Source code for gwcelery.voevent.bootsteps

import threading

from celery import bootsteps
from celery.concurrency import solo

from .logging import log
from .signals import voevent_received
from .util import get_host_port, get_local_ivo, get_network

__all__ = ('Broadcaster', 'Reactor', 'Receiver')


class VOEventBootStep(bootsteps.ConsumerStep):
    """Generic boot step to limit us to appropriate kinds of workers."""

    def include_if(self, consumer):
        """Only include this bootstep in workers that are configured to listen
        to the ``voevent`` queue.
        """
        return 'voevent' in consumer.app.amqp.queues

    def create(self, consumer):
        if not isinstance(consumer.pool, solo.TaskPool):
            raise RuntimeError(
                'The VOEvent broker only works with the "solo" task pool. '
                'Start the worker with "--queues=voevent --pool=solo".')

    def start(self, consumer):
        log.info('Starting %s', self.name)

    def stop(self, consumer):
        log.info('Stopping %s', self.name)


[docs]class Reactor(VOEventBootStep): """Run the global Twisted reactor in background thread. The Twisted reactor is a global run loop that drives all Twisted services and operations. This boot step starts the Twisted reactor in a background thread when the Celery consumer starts, and stops the thread when the Consumer terminates. """ name = 'Twisted reactor' def __init__(self, consumer, **kwargs): self._thread = None
[docs] def start(self, consumer): super().start(consumer) from twisted.internet import reactor self._thread = threading.Thread(target=reactor.run, args=(False,), name='TwistedReactorThread') self._thread.start()
[docs] def stop(self, consumer): from twisted.internet import reactor super().stop(consumer) reactor.callFromThread(reactor.stop) self._thread.join()
class TwistedService(VOEventBootStep): """A generic bootstep to create, start, and stop a Twisted service.""" requires = VOEventBootStep.requires + (Reactor,) def __init__(self, consumer, **kwargs): self._service = None def create_service(self, consumer): raise NotImplementedError def start(self, consumer): from twisted.internet import reactor super().start(consumer) self._service = self.create_service(consumer) reactor.callFromThread(self._service.startService) def stop(self, consumer): from twisted.internet import reactor super().stop(consumer) reactor.callFromThread(self._service.stopService)
[docs]class Broadcaster(TwistedService): """Comet-based VOEvent broadcaster. Run a Comet-based VOEvent broadcaster (:class:`comet.protocol.broadcaster.VOEventBroadcasterFactory`). Starts after the :class:`~gwcelery.voevent.bootsteps.Reactor` bootstep. A few :doc:`configuration options <configuration>` are available: * ``voevent_broadcaster_address``: The address to bind to, in :samp:`{host}:{port}` format. * ``voevent_broadcaster_whitelist``: A list of hostnames, IP addresses, or CIDR address ranges from which to accept connections. The list of active connections is made available :ref:`inspection <celery:worker-inspect>` with the ``gwcelery inspect stats`` command under the ``voevent-broker-peers`` key. """ name = 'VOEvent broadcaster'
[docs] def create_service(self, consumer): from comet.protocol.broadcaster import VOEventBroadcasterFactory from comet.utility import WhitelistingFactory from twisted.application.internet import TCPServer conf = consumer.app.conf local_ivo = get_local_ivo(consumer.app) host, port = get_host_port(conf['voevent_broadcaster_address']) allow = [get_network(a) for a in conf['voevent_broadcaster_whitelist']] conf['voevent_broadcaster_factory'] = self._factory = factory = \ VOEventBroadcasterFactory(local_ivo, 0) if allow: factory = WhitelistingFactory(factory, allow, 'subscription') return TCPServer(port, factory, interface=host)
[docs] def info(self, consumer): try: peers = [ b.transport.getPeer().host for b in self._factory.broadcasters] except: # noqa: E722 log.exception('failed to get info for voevent-broker-peers') peers = [] return {'voevent-broker-peers': peers}
[docs]class Receiver(TwistedService): """VOEvent receiver. Run a Comet-based VOEvent receiver (:class:`comet.protocol.subscriber.VOEventSubscriberFactory`). Starts after the :class:`~gwcelery.voevent.bootsteps.Reactor` bootstep. A few :doc:`configuration options <configuration>` are available: * ``voevent_receiver_address``: The address to connect to, in :samp:`{host}:{port}` format. The list of active connections is made available :ref:`inspection <celery:worker-inspect>` with the ``gwcelery inspect stats`` command under the ``voevent-receiver-peers`` key. """ name = 'VOEvent receiver' requires = TwistedService.requires + ( 'celery.worker.consumer.tasks:Tasks',)
[docs] def create_service(self, consumer): from comet.icomet import IHandler from twisted.application.internet import TCPClient from zope.interface import implementer from .subscriber import VOEventSubscriberFactory @implementer(IHandler) class Handler: def __call__(self, event): from twisted.internet import reactor reactor.callInThread( voevent_received.send, sender=None, xml_document=event) conf = consumer.app.conf local_ivo = get_local_ivo(consumer.app) host, port = get_host_port(conf['voevent_receiver_address']) self._factory = factory = VOEventSubscriberFactory( local_ivo=local_ivo, handlers=[Handler()]) return TCPClient(host, port, factory)
[docs] def info(self, consumer): try: peers = [ b.transport.getPeer().host for b in self._factory.subscribers] except: # noqa: E722 log.exception('failed to get info for voevent-receiver-peers') peers = [] return {'voevent-receiver-peers': peers}