# Source code for gwcelery.tasks.bayestar

"""Rapid sky localization with :mod:`BAYESTAR <ligo.skymap.bayestar>`."""
import io
import logging
import urllib.parse

from celery.exceptions import Ignore
from ligo.lw.utils import load_fileobj
from ligo.skymap import bayestar as _bayestar
from ligo.skymap.io import events, fits

from .. import app
from . import gracedb

# Module-level logger; every message emitted by this module is recorded under
# the logger name 'BAYESTAR'.
log = logging.getLogger('BAYESTAR')


@app.task(queue='openmp', shared=False)
def localize(coinc_psd, graceid, disabled_detectors=None):
    """Produce a rapid sky map for one event with
    :mod:`BAYESTAR <ligo.skymap.bayestar>`.

    Parameters
    ----------
    coinc_psd : bytes
        Byte contents of the event's ``coinc.xml`` file. The same document
        is also used as the source of the PSD.
    graceid : str
        The GraceDB ID. It is stored in the FITS metadata (``objid``) and
        used to build the event URL stored alongside it (``url``).
    disabled_detectors : list, optional
        Detectors to disable. When empty or ``None`` the event is used as
        read from the document.

    Returns
    -------
    bytes
        The byte contents of the finished FITS file.

    Raises
    ------
    celery.exceptions.Ignore
        Raised in place of ``events.DetectorDisabledError``.
    ValueError
        If the document does not contain exactly one event (the single
        event is obtained by unpacking).

    Notes
    -----
    This task is adapted from the command-line tool
    :doc:`bayestar-localize-lvalert
    <ligo.skymap:tool/bayestar_localize_lvalert>`.

    It should execute in a special queue for computationally intensive,
    multithreaded, OpenMP tasks.

    """
    # Event pages live under '<scheme>://<netloc>/events' on the GraceDB
    # server the client is configured for; only the first two URL components
    # of the client URL are kept.
    scheme, netloc = urllib.parse.urlparse(gracedb.client.url)[:2]
    url_parts = (scheme, netloc, 'events', '', '', '')
    base_url = urllib.parse.urlunparse(url_parts)

    try:
        # A little bit of Cylon humor
        log.info('by your command...')

        # Parse the raw bytes into a LIGO-LW document.
        xml_stream = io.BytesIO(coinc_psd)
        doc = load_fileobj(
            xml_stream, contenthandler=events.ligolw.ContentHandler)

        # Build the event source; the PSD is read from the same document.
        source = events.ligolw.open(doc, psd_file=doc, coinc_def=None)
        if disabled_detectors:
            source = events.detector_disabled.open(source, disabled_detectors)

        # Exactly one event is expected; unpacking enforces that.
        (event,) = source.values()

        # Run BAYESTAR
        log.info('starting sky localization')
        # FIXME: the low frequency cutoff should not be hardcoded.
        # It should be provided in the coinc.xml file.
        skymap = _bayestar.localize(event, f_low=15.0)
        skymap.meta['objid'] = str(graceid)
        skymap.meta['url'] = '{}/{}'.format(base_url, graceid)
        log.info('sky localization complete')

        # Serialize the sky map to an in-memory FITS file. The bytes must be
        # captured before the buffer is closed on leaving the ``with`` block.
        with io.BytesIO() as fits_buffer:
            fits.write_sky_map(fits_buffer, skymap, moc=True)
            fits_bytes = fits_buffer.getvalue()
        return fits_bytes
    except events.DetectorDisabledError:
        raise Ignore()