"""Tasks to validate the GCN Notice types of the e-mail formats [GCN e-mail]_.
References
----------
.. [GCN e-mail] https://gcn.gsfc.nasa.gov/lvc.html#tc13
"""
import email
import email.policy
from math import isclose
import lxml.etree
from celery.utils.log import get_task_logger
from .. import app
from ..email.signals import email_received
from . import gracedb
log = get_task_logger(__name__)
def _trigger_datetime(gcn_notice_mail):
"""Get trigger data and time from a GCN email notice."""
# We now add Z to ISOtime
# TRIGGER_DATE: 20123 TJD; 179 DOY; 2023/06/28 (yyyy/mm/dd)
# TRIGGER_TIME: 83520.000000 SOD {23:12:00.000000} UT
# <ISOTime>2023-06-28T23:12:00Z</ISOTime>
trigger_date = gcn_notice_mail[
"TRIGGER_DATE"].split()[4].replace("/", "-")
# FIXME: replace with a regular expression.
trigger_time = gcn_notice_mail["TRIGGER_TIME"].split()[2]
trigger_time = trigger_time.replace("{", "").replace("}", "")
trigger_time = trigger_time.split('.')[0]
trigger_datetime = (f'{trigger_date}T{trigger_time}Z')
return trigger_datetime
def _vo_match_notice(gcn_notice_mail, params_vo, trigger_time_vo):
"""Match the notice-email and the VOtable keywords."""
dict_checks = {}
# TRIGGER_DATE+TRIGGER_TIME
trigger_datetime_notice_mail = _trigger_datetime(gcn_notice_mail)
match_trigger_datetime = (
trigger_datetime_notice_mail == trigger_time_vo)
dict_checks['TRIGGER_DATETIME'] = match_trigger_datetime
# SEQUENCE_NUM
match_sequence_num = (
gcn_notice_mail["SEQUENCE_NUM"].split()[0] == params_vo["Pkt_Ser_Num"])
dict_checks['SEQUENCE_NUM'] = match_sequence_num
if params_vo['AlertType'] == 'Retraction':
return dict_checks
# Notice keywords
notice_keys = ({"types": ["GROUP_TYPE", "PIPELINE_TYPE", "SEARCH_TYPE"],
"classif_props_cbc": ["PROB_NS", "PROB_REMNANT",
"PROB_BNS", "PROB_NSBH", "PROB_BBH",
"PROB_TERRES"],
"urls": ["SKYMAP_FITS_URL", "EVENTPAGE_URL"],
"classif_props_burst": ["CENTRAL_FREQ", "DURATION"]})
# Votable keywords
vo_keys = ({"types": ["Group", "Pipeline", "Search"],
"classif_props_cbc": ["HasNS", "HasRemnant", "BNS",
"NSBH", "BBH", "Terrestrial"],
"urls": ["skymap_fits", "EventPage"],
"classif_props_burst": ["CentralFreq", "Duration"]})
# FAR
far_notice = float(gcn_notice_mail["FAR"].split()[0])
match_far = isclose(far_notice, float(params_vo["FAR"]), rel_tol=0.001)
dict_checks['FAR'] = match_far
# Group and pipeline types
for notice_key, vo_key in zip(notice_keys["types"], vo_keys["types"]):
value_notice = gcn_notice_mail[notice_key].split()[2]
match = (value_notice == params_vo[vo_key])
dict_checks[notice_key] = match
# EventPage/EVENTPAGE_URL and skymap_fits/SKYMAP_FITS_URL
for notice_key, vo_key in zip(notice_keys["urls"], vo_keys["urls"]):
value_notice = gcn_notice_mail[notice_key]
match = (value_notice == params_vo[vo_key])
dict_checks[notice_key] = match
# CBC classification and properties
if params_vo['Group'] == 'CBC':
for notice_key, vo_key, in zip(notice_keys["classif_props_cbc"],
vo_keys["classif_props_cbc"]):
value_notice = float(gcn_notice_mail[notice_key].split()[0])
match = isclose(value_notice, float(params_vo[vo_key]),
abs_tol=0.01)
dict_checks[notice_key] = match
# Burst Properties
if params_vo['Group'] == 'Burst':
for notice_key, vo_key in zip(notice_keys["classif_props_burst"],
vo_keys["classif_props_burst"]):
value_notice = float(gcn_notice_mail[notice_key].split()[0])
match = isclose(value_notice,
float(params_vo[vo_key]), rel_tol=0.001)
dict_checks[notice_key] = match
return dict_checks
def _vo_match_comments(gcn_notice_mail, params_vo):
"""Check the notice-email comments for the contributed instruments."""
dict_check_comments = {}
comments_notice_mail = gcn_notice_mail.get_all("COMMENTS")
instruments_vo = params_vo["Instruments"]
text = ' contributed to this candidate event.'
gcn_to_vo_instruments = {'LIGO-Hanford Observatory': 'H1',
'LIGO-Livingston Observatory': 'L1',
'VIRGO Observatory': 'V1'}
instrument_comments = (line.strip() for line in comments_notice_mail)
instruments_gcn = {gcn_to_vo_instruments[line[:-len(text)]]
for line in instrument_comments if line.endswith(text)}
instruments_vo = set(instruments_vo.split(','))
match_instruments = (instruments_gcn == instruments_vo)
dict_check_comments["INSTRUMENT"] = match_instruments
return dict_check_comments
[docs]@email_received.connect
def on_email_received(rfc822, **kwargs):
"""Read the RFC822 email."""
message = email.message_from_bytes(rfc822, policy=email.policy.default)
validate_text_notice.s(message).delay()
[docs]@app.task(shared=False)
def validate_text_notice(message):
"""Validate LIGO/Virgo GCN e-mail notice format.
Check that the contents of a public LIGO/Virgo GCN e-mail notice format
matches the original VOEvent in GraceDB.
"""
# Filter from address and subject
if message['From'] != 'Bacodine <vxw@capella2.gsfc.nasa.gov>':
log.info('Email is not from BACODINE. Subject:%s', message['Subject'])
log.info('Sender is: %s', message['From'])
return
# Write message log
log.info('Validating Notice: Subject:%s', message['Subject'])
# Parse body email
bodymsg = message.get_payload()
notice = email.message_from_string(bodymsg)
# Get notice type
notice_type = notice['NOTICE_TYPE']
if notice_type.split(" ")[-1] == "Skymap":
notice_type = notice_type.split(" ")[-2]
else:
notice_type = notice_type.split(" ")[-1]
# GCN e-mail notice type for EarlyWarning is Early_Warning
# while we have the ivo://gwnet/LVC#S231030av-1-EarlyWarning
# No underscore in ivo Fix IT
notice_type = notice_type.replace('_', '')
# Get gracedb id and sequence number
trigger_num = notice['TRIGGER_NUM']
sequence_num = notice['SEQUENCE_NUM']
# Download VOevent
filename = f'{trigger_num}-{sequence_num}-{notice_type}.xml'
payload = gracedb.download(filename, trigger_num)
# Parse VOevent
root = lxml.etree.fromstring(payload)
params_vo = {elem.attrib['name']:
elem.attrib['value']
for elem in root.iterfind('.//Param')}
trigger_time_vo = root.findtext('.//ISOTime')
# Match
filename_email = 'email_' + filename.replace('.xml', '.txt')
gracedb.upload.delay(bodymsg, filename_email, trigger_num,
'email notice corresponding to ' + filename,
tags=['em_follow'])
error = None
try:
if notice_type == 'Retraction':
match = _vo_match_notice(notice, params_vo, trigger_time_vo)
elif params_vo['Group'] in ["CBC", "Burst"]:
match = {**_vo_match_notice(notice, params_vo, trigger_time_vo),
**_vo_match_comments(notice, params_vo)}
else:
match = {}
error = f'Email notice {filename} has unknown notice type'
mismatched = ' '.join(key for key, value in match.items() if not value)
if mismatched:
error = \
f'Email notice {filename} has mismatched keys: {mismatched}'
except KeyError as err:
# Since there was an exeception, the gcn was not annnotated
error = f'Email notice {filename} missing key: {err}'
except Exception as err:
# Since there are other possible exceptions
# we also catch generic error not to stop exection
# and record the exception
error = f'Email notice {filename} generated exception: {err}'
if error:
gracedb.create_tag.delay(filename, 'gcn_email_notok', trigger_num)
gracedb.upload.delay(None, None, trigger_num,
error, tags=['em_follow'])
else:
gracedb.create_tag.delay(filename, 'gcn_email_ok', trigger_num)