Source code for gwcelery.tasks.gcn

"""Tasks to send and receive Gamma-ray Coordinates Network [GCN]_ notices.

References
----------
.. [GCN] https://gcn.gsfc.nasa.gov

"""
import difflib
import html
import urllib.parse

import gcn
import lxml.etree
from comet.utility.xml import xml_document
from gcn import NoticeType, get_notice_type
from twisted.internet import reactor

from .. import app
from ..voevent.signals import voevent_received
from . import gracedb
from .core import DispatchHandler


class _VOEventDispatchHandler(DispatchHandler):

    def process_args(self, event):
        notice_type = get_notice_type(event.element)

        # Just cast to enum for prettier log messages
        try:
            notice_type = NoticeType(notice_type)
        except ValueError:
            pass

        return notice_type, (event.raw_bytes,), {}


handler = _VOEventDispatchHandler()
r"""Function decorator to register a handler callback for specified GCN notice
types. The decorated function is turned into a Celery task, which will be
automatically called whenever a matching GCN notice is received.

Parameters
----------
\*keys
    List of GCN notice types to accept
\*\*kwargs
    Additional keyword arguments for :meth:`celery.Celery.task`.

Examples
--------
Declare a new handler like this::

    @gcn.handler(gcn.NoticeType.FERMI_GBM_GND_POS,
                 gcn.NoticeType.FERMI_GBM_FIN_POS)
    def handle_fermi(payload):
        root = lxml.etree.fromstring(payload)
        # do work here...
"""


@voevent_received.connect
def _on_voevent_received(xml_document, **kwargs):
    handler.dispatch(xml_document)


[docs]class SendingError(RuntimeError): """A generic error associated with sending VOEvents."""
[docs]@app.task(autoretry_for=(SendingError,), bind=True, default_retry_delay=20.0, ignore_result=True, queue='voevent', retry_backoff=True, retry_kwargs=dict(max_retries=10), shared=False) def send(self, message): """Send a VOEvent to GCN. This task will be retried several times if the VOEvent cannot be sent. See the Raises section below for circumstances that cause a retry. Parameters ---------- message : bytes The raw VOEvent file contents. Raises ------ SendingError If the VOEvent could not be sent because there were no network peers connected to the VOEvent broadcaster. """ broadcasters = self.app.conf['voevent_broadcaster_factory'].broadcasters if broadcasters: event = xml_document(message) for broadcaster in broadcasters: reactor.callFromThread(broadcaster.send_event, event) elif self.app.conf['voevent_broadcaster_whitelist']: raise SendingError('Not sending the event because there are no ' 'subscribers connected to the GCN broker.')
[docs]@handler(gcn.NoticeType.LVC_EARLY_WARNING, gcn.NoticeType.LVC_PRELIMINARY, gcn.NoticeType.LVC_INITIAL, gcn.NoticeType.LVC_UPDATE, gcn.NoticeType.LVC_RETRACTION, bind=True, shared=False) def validate(self, payload): """Validate LIGO/Virgo GCN notices. Check that the contents of a public LIGO/Virgo GCN matches the original VOEvent in GraceDB. Notes ----- If the VOEvent broadcaster is disabled by setting :obj:`~gwcelery.conf.voevent_broadcaster_whitelist` to an empty list, then this task becomes a no-op. """ if not self.app.conf['voevent_broadcaster_whitelist']: return root = lxml.etree.fromstring(payload) # Which GraceDB ID does this refer to? graceid = root.find("./What/Param[@name='GraceID']").attrib['value'] # Which VOEvent does this refer to? u = urllib.parse.urlparse(root.attrib['ivorn']) local_id = u.fragment filename = local_id + '.xml' # Download and parse original VOEvent orig = gracedb.download(filename, graceid) # Create a diff of the two VOEvents. diff = ''.join( difflib.unified_diff( *( [ line.decode('ascii', 'surrogateescape') for line in contents.splitlines(keepends=True) ] for contents in (orig, payload) ), fromfile='{} (sent)'.format(filename), tofile='{} (received)'.format(filename) ) ) if diff: # Write a log message to indicate that the event differed. msg = 'VOEvent received from GCN differs from what we sent.' gracedb.upload.delay( None, None, graceid, '{}<pre>{}</pre>'.format(msg, html.escape(diff)), ['em_follow']) raise ValueError('{}\n\n{}'.format(msg, diff)) else: # Tag the VOEvent to indicate that it was received correctly. gracedb.create_tag.delay(filename, 'gcn_received', graceid)