# Source code for gwcelery.tasks.first2years_external

"""Create mock external events to be in coincidence
   with MDC superevents."""

import random
import re
from pathlib import Path

import numpy as np
from astropy.time import Time
from lxml import etree

from .. import app
from ..tests import data
from ..util import read_json
from . import external_triggers, igwn_alert, raven


def create_upload_external_event(gpstime, pipeline, ext_search):
    """Create a random external event and upload it as if it was received.

    A random sky position and error radius are drawn, the matching template
    (Kafka JSON for the ``SubGRBTargeted`` search, VOEvent XML otherwise) is
    filled in with the requested time, and the result is passed to the
    corresponding ``external_triggers`` handler.

    Parameters
    ----------
    gpstime : float
        Event's GPS time
    pipeline : str
        External trigger pipeline name
    ext_search : str
        Search field for external event

    Returns
    -------
    event
        For the ``SubGRBTargeted`` search, the first element returned by
        ``external_triggers._kafka_to_voevent`` for the generated Kafka
        alert (a VOEvent, returned for testing until GraceDB natively
        ingests Kafka alerts). Otherwise the XML GCN notice as serialized
        by ``etree.tostring`` with UTF-8 encoding.

    Raises
    ------
    ValueError
        If ``ext_search`` is ``'SubGRBTargeted'`` and ``pipeline`` is
        neither ``'Fermi'`` nor ``'Swift'``.

    """
    new_date = str(Time(gpstime, format='gps', scale='utc').isot) + 'Z'
    new_TrigID = str(int(gpstime))

    # NOTE: keep the order of the random draws (ra, dec, error) stable so
    # that seeded runs stay reproducible.
    ra = random.uniform(0, 360)
    # Draw the declination from a grid, weighted by cos(dec)
    thetas = np.arange(-np.pi / 2, np.pi / 2, .01)
    dec = random.choices(np.rad2deg(thetas),
                         weights=np.cos(thetas) / sum(np.cos(thetas)))[0]
    error = .05 if pipeline == 'Swift' else random.uniform(1, 30)

    is_grb = pipeline in {'Fermi', 'Swift', 'INTEGRAL'}

    # If SubGRBTargeted modify alert packet from Kafka
    if ext_search == 'SubGRBTargeted':
        if pipeline == 'Fermi':
            # Template based on:
            # https://github.com/joshuarwood/gcn-schema/tree/main/gcn/notices/
            # fermi/gbm
            alert = read_json(data, 'kafka_alert_fermi.json')
            # Remove sky map file to create our own sky map
            alert.pop('healpix_file')
            alert['dec_uncertainty'] = [error]
        elif pipeline == 'Swift':
            # Template based on:
            # https://github.com/nasa-gcn/gcn-schema/tree/main/gcn/notices/
            # swift/bat/guano
            alert = read_json(data, 'kafka_alert_swift.json')
            alert['ra_uncertainty'] = [error]
        else:
            raise ValueError(
                'Only use Fermi or Swift for SubGRBTargeted search')

        alert['trigger_time'] = new_date
        alert['alert_datetime'] = new_date
        alert['id'] = [new_TrigID]
        alert['ra'] = ra
        alert['dec'] = dec
        # Generate FAR from max threshold and trials factors
        alert['far'] = \
            (app.conf['raven_targeted_far_thresholds']['GRB'][pipeline] *
             random.uniform(0, 1))
        external_triggers.handle_targeted_kafka_alert(alert)
        # Return VOEvent for testing until GraceDB natively ingests kafka
        # alerts
        return external_triggers._kafka_to_voevent(alert)[0]

    # Otherwise modify respective VOEvent template
    subthresh_tag = '_subthresh' if ext_search == 'SubGRB' else ''
    fname = str(Path(__file__).parent /
                '../tests/data/{0}{1}_{2}gcn.xml'.format(
                    pipeline.lower(),
                    subthresh_tag,
                    'grb_' if is_grb else ''))
    root = etree.parse(fname)

    # Change ivorn to indicate if this is an MDC event or O3 replay event
    root.xpath('.')[0].attrib['ivorn'] = \
        'ivo://lvk.internal/{0}#{1}{2}_event_{3}'.format(
            pipeline if pipeline != 'Swift' else 'SWIFT',
            subthresh_tag,
            'MDC-test' if ext_search == 'MDC' else 'O3-replay',
            new_date).encode()

    # Common prefix of the coordinate elements edited below
    coords = ("./WhereWhen/ObsDataLocation/"
              "ObservationLocation/AstroCoords/")

    # Change times to chosen time
    root.find("./Who/Date").text = new_date.encode()
    root.find(coords + "Time/TimeInstant/ISOTime").text = new_date.encode()

    # Sub-threshold notices carry the ID in 'Trans_Num' instead of 'TrigID'
    id_param = 'Trans_Num' if ext_search == 'SubGRB' else 'TrigID'
    root.find("./What/Param[@name='{0}']".format(id_param)
              ).attrib['value'] = new_TrigID.encode()

    if is_grb:
        # Give random sky position
        root.find(coords + "Position2D/Value2/C1").text = str(ra).encode()
        root.find(coords + "Position2D/Value2/C2").text = str(dec).encode()
        # The Swift template's error radius is left untouched
        if pipeline != 'Swift':
            root.find(coords + "Position2D/Error2Radius").text = \
                str(error).encode()

    event = etree.tostring(root, xml_declaration=True, encoding="UTF-8",
                           pretty_print=True)

    # Upload as from GCN
    if is_grb:
        external_triggers.handle_grb_gcn(event)
    else:
        external_triggers.handle_snews_gcn(event)
    return event
def _offset_time(gpstime, group, pipeline, ext_search):
    """Offset the given GPS time by a random amount within a time window,
    determined by the search being done.

    Parameters
    ----------
    gpstime : float
        Event's GPS time
    group : str
        Burst or CBC
    pipeline : str
        Pipeline field for external event
    ext_search : str
        Search field for external event

    Returns
    -------
    gpstime_adjusted : float
        Event's original GPS time plus a random number within the
        determined search window

    """
    # NOTE(review): 'S1' looks like a placeholder superevent ID used only to
    # look up the window -- confirm against raven._time_window.
    tl, th = raven._time_window('S1', group, [pipeline], [ext_search])
    return gpstime + random.uniform(tl, th)


def _is_joint_mdc(graceid, se_search):
    """Determine whether to upload an external event using the user-defined
    frequency of MDC or AllSky superevents.

    Looks at the ending letters of a superevent (e.g. 'ac' from
    'MS190124ac'), converts to a number, and checks if divisible by a number
    given in the configuration file.

    For example, if the configuration number
    :obj:`~gwcelery.conf.joint_mdc_freq` is 10, this means joint events with
    superevents ending with 'j', 't', 'ad', etc.

    Parameters
    ----------
    graceid : str
        GraceDB ID of superevent
    se_search : str
        Search field for preferred event in superevent. ``'MDC'`` selects
        ``joint_mdc_freq``; any other value selects
        ``joint_O3_replay_freq``.

    Returns
    -------
    is_joint_mdc : bool
        Returns True if the GraceDB ID matches pre-determined frequency
        rates

    Notes
    -----
    An ID with nothing after its last run of digits maps to 0, which is
    divisible by any non-zero frequency, so True is returned for it.

    """
    # Everything after the last run of digits, case-insensitively
    end_string = re.split(r'\d+', graceid)[-1].lower()

    # Read the suffix as a number with digits a=1 ... z=26, most significant
    # letter first (ord('a') - 96 == 1)
    val = 0
    for letter in end_string:
        val = val * 26 + (ord(letter) - 96)

    freq_key = ('joint_mdc_freq' if se_search == 'MDC'
                else 'joint_O3_replay_freq')
    return val % int(app.conf[freq_key]) == 0
@igwn_alert.handler('mdc_superevent',
                    'superevent',
                    shared=False)
def upload_external_event(alert, ext_search=None):
    """Upload a random GRB event(s) for a certain fraction of MDC
    or O3-replay superevents.

    Notes
    -----
    Every n superevents, upload a GRB candidate within the standard CBC-GRB
    or Burst-GRB search window, where the frequency n is determined by the
    configuration variable :obj:`~gwcelery.conf.joint_mdc_freq` or
    :obj:`~gwcelery.conf.joint_O3_replay_freq`.

    For O3 replay testing with RAVEN pipeline, only runs on
    gracedb-playground.

    Parameters
    ----------
    alert : dict
        IGWN alert packet
    ext_search : str
        Search field for external event. If None, it is set to ``'MDC'``
        for MDC superevents and drawn at random from ``'GRB'``,
        ``'SubGRB'`` and ``'SubGRBTargeted'`` otherwise.

    Returns
    -------
    events, pipelines : tuple
        Returns tuple of the list of external events created and the list of
        pipelines chosen for each event. None is returned instead when the
        alert is not of type ``'new'`` or the superevent is not selected
        for a joint upload.

    Raises
    ------
    ValueError
        If an ``ext_search`` other than ``'MDC'`` is given for an MDC
        superevent, or if ``ext_search`` is not one of the supported
        searches.

    """
    if alert['alert_type'] != 'new':
        return
    se_search = alert['object']['preferred_event_data']['search']
    group = alert['object']['preferred_event_data']['group']
    is_gracedb_playground = app.conf['gracedb_host'] \
        == 'gracedb-playground.ligo.org'
    joint_mdc_alert = se_search == 'MDC' and _is_joint_mdc(alert['uid'],
                                                           'MDC')
    joint_allsky_alert = se_search == 'AllSky' and \
        _is_joint_mdc(alert['uid'], 'AllSky') and is_gracedb_playground and \
        group in {'CBC', 'Burst'}
    if not (joint_mdc_alert or joint_allsky_alert):
        return

    # NOTE: keep the order of the np.random draws below (count, search,
    # pipelines) stable so that seeded runs stay reproducible.

    # Potentially upload 1, 2, or 3 GRB events
    num = 1 + np.random.choice(np.arange(3), p=[.6, .3, .1])

    if joint_mdc_alert:
        # If joint MDC alert, make external event MDC
        if ext_search is None:
            ext_search = 'MDC'
        elif ext_search != 'MDC':
            raise ValueError('External search must be "MDC" if MDC superevent')
    # If O3 replay, choose search from acceptable list
    elif ext_search is None:
        # Determine search for external event and then pipelines.
        # Reaching this branch implies a joint AllSky alert, since the
        # function has already returned when neither flag was set.
        ext_search = np.random.choice(['GRB', 'SubGRB', 'SubGRBTargeted'],
                                      p=[.6, .1, .3])

    # Choose pipeline(s) based on search
    if ext_search in {'GRB', 'MDC'}:
        pipelines = np.random.choice(['Fermi', 'Swift', 'INTEGRAL'],
                                     p=[.6, .3, .1],
                                     size=num, replace=False)
    elif ext_search == 'SubGRB':
        pipelines = np.full(num, 'Fermi')
    elif ext_search == 'SubGRBTargeted':
        # Only two current pipelines in targeted search so map 3 => 2
        num = min(num, 2)
        pipelines = np.random.choice(['Fermi', 'Swift'],
                                     p=[.5, .5],
                                     size=num, replace=False)
    else:
        # Fail with a clear message instead of an UnboundLocalError on
        # ``pipelines`` further down
        raise ValueError(
            'Unsupported external search: {0!r}'.format(ext_search))

    # The superevent time is the same for every simulated event
    gpstime = float(alert['object']['t_0'])

    events = []
    for pipeline in pipelines:
        new_time = _offset_time(gpstime, group, pipeline, ext_search)

        # Choose external grb pipeline to simulate
        ext_event = create_upload_external_event(
            new_time, pipeline, ext_search)

        events.append(ext_event)

    return events, pipelines
@app.task(ignore_result=True, shared=False)
def upload_snews_event():
    """Generate and upload a SNEWS-like MDC external event stamped with the
    current time.

    Returns
    -------
    ext_event
        The value returned by :func:`create_upload_external_event` for the
        ``'SNEWS'`` pipeline and the ``'MDC'`` search, i.e. the XML GCN
        notice alert packet that was built and uploaded.

    """
    # Express the present moment as a GPS time value
    now_gps = Time(Time.now(), format='gps').value
    return create_upload_external_event(now_gps, 'SNEWS', 'MDC')