"""Source Parameter Estimation with LALInference, Bilby, and RapidPE."""
import glob
import json
import os
import subprocess
import urllib
from distutils.dir_util import mkpath
from distutils.spawn import find_executable

import numpy as np
from bilby_pipe.bilbyargparser import BilbyConfigFileParser
from bilby_pipe.utils import convert_string_to_dict
from celery import group
from celery.exceptions import Ignore

from .. import app
from ..jinja import env
from . import condor, gracedb

_RAPIDPE_NO_GSTLAL_TRIGGER_EXIT_CODE = 100

RAPIDPE_GETENV = [
    "DEFAULT_SEGMENT_SERVER", "GWDATAFIND_SERVER",
    "LAL_DATA_PATH", "LD_LIBRARY_PATH", "LIBRARY_PATH",
    "NDSSERVER", "PATH", "PYTHONPATH",
]
"""Names of environment variables to include in RapidPE HTCondor submit
files."""

RAPIDPE_ENVIRONMENT = {
    "RIFT_LOWLATENCY": "True",
}
"""Names and values of environment variables to include in RapidPE HTCondor
submit files."""
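

# Illustrative sketch only: after passing through the rapidpe.jinja2 template
# and ``condor_submit_dag -include_env`` (see below), the two constants above
# end up as HTCondor submit commands roughly like
#
#     getenv = DEFAULT_SEGMENT_SERVER, GWDATAFIND_SERVER, ...
#     environment = "RIFT_LOWLATENCY=True"
#
# The exact rendering is an assumption here; it depends on the template and
# the HTCondor version.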


def _find_appropriate_cal_env(trigtime, dir_name):
    """Return the path to the calibration uncertainties estimated at the time
    before and closest to the trigger time. If there are no calibration
    uncertainties estimated before the trigger time, return the oldest one. The
    gpstimes at which the calibration uncertainties were estimated and the
    names of the files containing the uncertainties are saved in
    [HLV]_CalEnvs.txt.

    Parameters
    ----------
    trigtime : float
        The trigger time of a target event
    dir_name : str
        The path to the directory where files containing calibration
        uncertainties exist

    Returns
    -------
    path : str
        The path to the calibration uncertainties appropriate for a target
        event

    """
    filename, = glob.glob(os.path.join(dir_name, '[HLV]_CalEnvs.txt'))
    calibration_index = np.atleast_1d(
        np.recfromtxt(filename, names=['gpstime', 'filename'])
    )
    gpstimes = calibration_index['gpstime']
    candidate_gpstimes = gpstimes < trigtime
    if np.any(candidate_gpstimes):
        idx = np.argmax(gpstimes * candidate_gpstimes)
        appropriate_cal = calibration_index['filename'][idx]
    else:
        appropriate_cal = calibration_index['filename'][np.argmin(gpstimes)]
    return os.path.join(dir_name, appropriate_cal.decode('utf-8'))
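

# Usage sketch with hypothetical file contents: if ``H_CalEnvs.txt`` under
# ``dir_name`` reads
#
#     1126259462 H_CalEnv_early.txt
#     1187008882 H_CalEnv_late.txt
#
# then ``_find_appropriate_cal_env(1187100000., dir_name)`` returns the path
# to ``H_CalEnv_late.txt``, the latest envelope estimated before the trigger
# time, while any trigger time earlier than both entries falls back to the
# oldest entry, ``H_CalEnv_early.txt``.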


def prepare_lalinference_ini(event, superevent_id):
    """Determine LALInference configurations and return ini file content

    Parameters
    ----------
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    ini_contents : str

    """
    # Get template of .ini file
    ini_template = env.get_template('lalinference.jinja2')

    # fill out the ini template and return the resultant content
    singleinspiraltable = event['extra_attributes']['SingleInspiral']
    trigtime = event['gpstime']
    executables = {'datafind': 'gw_data_find',
                   'mergeNSscript': 'lalinference_nest2pos',
                   'mergeMCMCscript': 'cbcBayesMCMC2pos',
                   'combinePTMCMCh5script': 'cbcBayesCombinePTMCMCh5s',
                   'resultspage': 'cbcBayesPostProc',
                   'segfind': 'ligolw_segment_query',
                   'ligolw_print': 'ligolw_print',
                   'coherencetest': 'lalinference_coherence_test',
                   'lalinferencenest': 'lalinference_nest',
                   'lalinferencemcmc': 'lalinference_mcmc',
                   'lalinferencebambi': 'lalinference_bambi',
                   'lalinferencedatadump': 'lalinference_datadump',
                   'ligo-skymap-from-samples': 'true',
                   'ligo-skymap-plot': 'true',
                   'processareas': 'process_areas',
                   'computeroqweights': 'lalinference_compute_roq_weights',
                   'mpiwrapper': 'lalinference_mpi_wrapper',
                   'gracedb': 'gracedb',
                   'ppanalysis': 'cbcBayesPPAnalysis',
                   'pos_to_sim_inspiral': 'cbcBayesPosToSimInspiral',
                   'bayeswave': 'BayesWave',
                   'bayeswavepost': 'BayesWavePost'}
    ini_settings = {
        'gracedb_host': app.conf['gracedb_host'],
        'types': app.conf['low_latency_frame_types'],
        'channels': app.conf['strain_channel_names'],
        'state_vector_channels': app.conf['state_vector_channel_names'],
        'webdir': os.path.join(
            app.conf['pe_results_path'], superevent_id, 'lalinference'
        ),
        'paths': [{'name': name, 'path': find_executable(executable)}
                  for name, executable in executables.items()],
        'h1_calibration': _find_appropriate_cal_env(
            trigtime,
            '/home/cbc/pe/O3/calibrationenvelopes/LIGO_Hanford'
        ),
        'l1_calibration': _find_appropriate_cal_env(
            trigtime,
            '/home/cbc/pe/O3/calibrationenvelopes/LIGO_Livingston'
        ),
        'v1_calibration': _find_appropriate_cal_env(
            trigtime,
            '/home/cbc/pe/O3/calibrationenvelopes/Virgo'
        ),
        'mc': min([sngl['mchirp'] for sngl in singleinspiraltable]),
        'q': min([sngl['mass2'] / sngl['mass1']
                  for sngl in singleinspiraltable]),
        'mpirun': find_executable('mpirun')
    }
    return ini_template.render(ini_settings)


@app.task(shared=False)
def _setup_dag_for_lalinference(coinc, rundir, event, superevent_id):
    """Create DAG for a lalinference run and return the path to DAG.

    Parameters
    ----------
    coinc : bytes
        Byte contents of ``coinc.xml``. The PSD is expected to be embedded.
    rundir : str
        The path to a run directory where the DAG file is created.
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    """
    # write down coinc.xml in the run directory
    path_to_coinc = os.path.join(rundir, 'coinc.xml')
    with open(path_to_coinc, 'wb') as f:
        f.write(coinc)

    # write down and upload ini file
    ini_contents = prepare_lalinference_ini(event, superevent_id)
    path_to_ini = os.path.join(rundir, 'online_lalinference_pe.ini')
    with open(path_to_ini, 'w') as f:
        f.write(ini_contents)
    gracedb.upload.delay(
        ini_contents, filename=os.path.basename(path_to_ini),
        graceid=superevent_id,
        message=('Automatically generated LALInference configuration file'
                 ' for this event.'),
        tags='pe')

    try:
        subprocess.run(
            ['lalinference_pipe', '--run-path', rundir,
             '--coinc', path_to_coinc, path_to_ini, '--psd', path_to_coinc],
            capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr
        gracedb.upload.delay(
            filecontents=contents, filename='lalinference_dag.log',
            graceid=superevent_id,
            message='Failed to prepare DAG for lalinference', tags='pe'
        )
        raise

    return os.path.join(rundir, 'multidag.dag')


@app.task(shared=False)
def _setup_dag_for_bilby(
    coinc_bayestar, rundir, event, superevent_id, mode="production"
):
    """Create DAG for a bilby run and return the path to DAG.

    Parameters
    ----------
    coinc_bayestar : tuple
        Byte contents of ``coinc.xml`` and ``bayestar.multiorder.fits``.
    rundir : str
        The path to a run directory where the DAG file is created
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent
    mode : str
        Analysis mode, allowed options are "production" and "fast_test",
        default is "production".

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    Notes
    -----
    `--channel-dict o3replay` is added to bilby_pipe_gracedb arguments when
    the gracedb host is different from `gracedb.ligo.org` or
    `gracedb-test.ligo.org`. Condor queue is set to `Online_PE` if gracedb
    host is `gracedb.ligo.org`, and `Online_PE_MDC` otherwise.

    """
    path_to_json = os.path.join(rundir, 'event.json')
    with open(path_to_json, 'w') as f:
        json.dump(event, f, indent=2)

    coinc, bayestar = coinc_bayestar
    path_to_psd = os.path.join(rundir, 'coinc.xml')
    with open(path_to_psd, 'wb') as f:
        f.write(coinc)
    path_to_bayestar = os.path.join(rundir, 'bayestar.multiorder.fits')
    with open(path_to_bayestar, 'wb') as f:
        f.write(bayestar)

    path_to_webdir = os.path.join(
        app.conf['pe_results_path'], superevent_id, 'bilby', mode
    )

    path_to_settings = os.path.join(rundir, 'settings.json')
    setup_arg = ['bilby_pipe_gracedb', '--webdir', path_to_webdir,
                 '--outdir', rundir, '--json', path_to_json,
                 '--psd-file', path_to_psd, '--skymap-file', path_to_bayestar,
                 '--settings', path_to_settings]

    settings = {'summarypages_arguments': {'gracedb': event['graceid'],
                                           'no_ligo_skymap': True},
                'accounting_user': 'soichiro.morisaki'}
    if app.conf['gracedb_host'] != 'gracedb.ligo.org':
        settings['queue'] = 'Online_PE_MDC'
    else:
        settings['queue'] = 'Online_PE'
        settings['accounting'] = 'ligo.prod.o4.cbc.pe.bilby'

    # FIXME: using live data for gracedb-test events should be reconsidered
    # when we have a better idea to differentiate MDC and real events.
    if app.conf['gracedb_host'] not in [
        'gracedb.ligo.org', 'gracedb-test.ligo.org'
    ]:
        setup_arg += ['--channel-dict', 'o3replay']

    trigger_chirp_mass = event['extra_attributes']['CoincInspiral']['mchirp']
    if trigger_chirp_mass < 0.6:
        raise ValueError(
            "No bilby settings available for trigger chirp mass of"
            f" {trigger_chirp_mass}Msun."
        )
    if mode == 'production':
        settings.update(
            {
                'sampler_kwargs': {'naccept': 60, 'nlive': 500,
                                   'npool': 24, 'sample': 'acceptance-walk'},
                'n_parallel': 3,
                'request_cpus': 24,
                'spline_calibration_nodes': 10,
                'request_memory_generation': 8.0
            }
        )
        # use low-spin IMRPhenomD below chirp mass of m1=3Msun, m2=1Msun
        # assuming binary neutron star
        if trigger_chirp_mass < 1.465:
            likelihood_mode = 'lowspin_phenomd_fhigh1024_roq'
            settings['sampler_kwargs']['naccept'] = 10
        # use IMRPhenomPv2 with mass ratio upper bound of 8 below chirp mass
        # of m1=8Msun, m2=1Msun
        elif trigger_chirp_mass < 2.243:
            likelihood_mode = 'phenompv2_bns_roq'
        # use IMRPhenomPv2 with mass ratio upper bound of 20 in chirp-mass
        # range where IMRPhenomXPHM ROQ bases are not available
        elif trigger_chirp_mass < 12:
            likelihood_mode = 'low_q_phenompv2_roq'
        else:
            likelihood_mode = 'phenomxphm_roq'
            settings['request_memory_generation'] = 36.0
            settings['request_memory'] = 16.0
        setup_arg += ['--cbc-likelihood-mode', likelihood_mode]
    elif mode == 'fast_test':
        setup_arg += ["--sampler-kwargs", "FastTest"]
        if trigger_chirp_mass < 3.9:
            setup_arg += ['--cbc-likelihood-mode', 'phenompv2_bns_roq']
            settings['request_memory_generation'] = 8.0
    else:
        raise ValueError(f"mode: {mode} not recognized.")

    with open(path_to_settings, 'w') as f:
        json.dump(settings, f, indent=2)

    try:
        subprocess.run(setup_arg, capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr
        gracedb.upload.delay(
            filecontents=contents, filename='bilby_dag.log',
            graceid=superevent_id,
            message=f'Failed to prepare DAG for {mode}-mode bilby', tags='pe'
        )
        raise
    else:
        # Uploads bilby ini file to GraceDB
        with open(os.path.join(rundir, 'bilby_config.ini'), 'r') as f:
            ini_contents = f.read()
        if mode == 'production':
            filename = 'bilby_config.ini'
        else:
            filename = f'bilby_{mode}_config.ini'
        gracedb.upload.delay(
            ini_contents, filename=filename, graceid=superevent_id,
            message=(f'Automatically generated {mode}-mode Bilby'
                     ' configuration file for this event.'),
            tags='pe')

    path_to_dag, = glob.glob(os.path.join(rundir, 'submit/dag*.submit'))
    return path_to_dag


@app.task(shared=False)
def _setup_dag_for_rapidpe(rundir, superevent_id, event):
    """Create DAG for a rapidpe run and return the path to DAG.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file is created
    superevent_id : str
        The GraceDB ID of a target superevent
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose SNR is used to determine analysis
        settings.

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    """
    gracedb_host = app.conf['gracedb_host']
    settings = app.conf['rapidpe_settings']
    trigger_snr = event['extra_attributes']['CoincInspiral']['snr']
    high_snr_trigger = trigger_snr >= 37.5

    # dump ini file
    ini_template = env.get_template('rapidpe.jinja2')
    ini_contents = ini_template.render(
        {'rundir': rundir,
         'webdir': os.path.join(
             app.conf['pe_results_path'], superevent_id, 'rapidpe'
         ),
         'gracedb_url': f'https://{gracedb_host}/api',
         'superevent_id': superevent_id,
         'run_mode': settings['run_mode'],
         'frame_data_types': app.conf['low_latency_frame_types'],
         'accounting_group': settings['accounting_group'],
         'use_cprofile': settings['use_cprofile'],
         'gracedb_host': gracedb_host,
         'high_snr_trigger': high_snr_trigger,
         'getenv': RAPIDPE_GETENV,
         'environment': RAPIDPE_ENVIRONMENT})
    path_to_ini = os.path.join(rundir, 'rapidpe.ini')
    with open(path_to_ini, 'w') as f:
        f.write(ini_contents)
    gracedb.upload.delay(
        ini_contents, filename=os.path.basename(path_to_ini),
        graceid=superevent_id,
        message=('Automatically generated RapidPE-RIFT configuration file'
                 ' for this event.'),
        tags='pe')

    # set up dag
    try:
        subprocess.run(['rapidpe-rift-pipe', path_to_ini],
                       capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr

        message = 'Failed to prepare DAG for Rapid PE'

        fail_gracefully = False
        if e.returncode == _RAPIDPE_NO_GSTLAL_TRIGGER_EXIT_CODE:
            fail_gracefully = True
            message += ": no GstLAL trigger available"

        gracedb.upload.delay(
            filecontents=contents, filename='rapidpe_dag.log',
            graceid=superevent_id,
            message=message, tags='pe'
        )

        if fail_gracefully:
            # Ends task but without logging as a failure
            raise Ignore()
        else:
            # Ends task with the unhandled error logged
            raise

    # return path to dag
    dag = os.path.join(rundir, "event_all_iterations.dag")
    return dag


@app.task(shared=False)
def _condor_no_submit(path_to_dag, include_env=None):
    """Run 'condor_submit_dag -no_submit' and return the path to .sub file."""
    args = ['condor_submit_dag']

    if include_env is not None:
        args += ['-include_env', ','.join(include_env)]

    args += ['-no_submit', path_to_dag]
    subprocess.run(args, capture_output=True, check=True)
    return '{}.condor.sub'.format(path_to_dag)

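
# For example, ``_condor_no_submit('/path/to/multidag.dag',
# include_env=['PATH', 'PYTHONPATH'])`` runs
#
#     condor_submit_dag -include_env PATH,PYTHONPATH -no_submit \
#         /path/to/multidag.dag
#
# and returns '/path/to/multidag.dag.condor.sub', the submit file HTCondor
# writes next to the DAG ('/path/to' is a placeholder).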

def dag_prepare_task(rundir, event, superevent_id, pe_pipeline, **kwargs):
    """Return a canvas of tasks to prepare DAG.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file is created
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used, lalinference, bilby, or
        rapidpe.

    Returns
    -------
    canvas : canvas of tasks
        The canvas of tasks to prepare DAG

    """
    # List of environment variables `condor_submit_dag` should be aware of.
    include_env = None

    if pe_pipeline == 'lalinference':
        canvas = gracedb.download.si('coinc.xml', event['graceid']) | \
            _setup_dag_for_lalinference.s(rundir, event, superevent_id)
    elif pe_pipeline == 'bilby':
        canvas = group(
            gracedb.download.si('coinc.xml', event['graceid']),
            gracedb.download.si('bayestar.multiorder.fits', event['graceid'])
        ) | _setup_dag_for_bilby.s(
            rundir, event, superevent_id, kwargs['bilby_mode']
        )
    elif pe_pipeline == 'rapidpe':
        canvas = _setup_dag_for_rapidpe.s(rundir, superevent_id, event)
        include_env = RAPIDPE_GETENV
    else:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')

    canvas |= _condor_no_submit.s(include_env=include_env)

    return canvas

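
# A canvas built here is chained and run by ``start_pe`` below, roughly like
# (schematic, with a hypothetical superevent ID):
#
#     dag_prepare_task(rundir, event, 'S230101ab', 'bilby',
#                      bilby_mode='production').delay()
#
# i.e. download inputs -> write the DAG -> condor_submit_dag -no_submit.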

def _find_paths_from_name(directory, name):
    """Return the paths of files or directories with given name under the
    specified directory

    Parameters
    ----------
    directory : str
        Name of directory under which the target file or directory is
        searched for.
    name : str
        Name of target files or directories

    Returns
    -------
    paths : generator
        Paths to the target files or directories

    """
    return glob.iglob(os.path.join(directory, '**', name), recursive=True)

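
# For example, ``_find_paths_from_name(rundir, '*.err')`` lazily yields every
# ``.err`` file at any depth under ``rundir``, e.g.
# ``<rundir>/submit/foo.err`` (hypothetical path); the upload loops below
# consume this generator.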

@app.task(ignore_result=True, shared=False)
def _clean_up_bilby(rundir):
    """Remove large data products produced by bilby

    Parameters
    ----------
    rundir : str

    """
    for p in glob.glob(
        os.path.join(rundir, "data/*_generation_roq_weights.hdf5")
    ):
        os.remove(p)
    for p in glob.glob(
        os.path.join(rundir, "data/*_generation_data_dump.pickle")
    ):
        os.remove(p)


@app.task(ignore_result=True, shared=False)
def job_error_notification(request, exc, traceback,
                           superevent_id, rundir, analysis):
    """Upload notification when condor.submit terminates unexpectedly.

    Parameters
    ----------
    request : Context (placeholder)
        Task request variables
    exc : Exception
        Exception raised by condor.submit
    traceback : str (placeholder)
        Traceback message from a task
    superevent_id : str
        The GraceDB ID of a target superevent
    rundir : str
        The run directory for PE
    analysis : str
        Analysis name used as a label in uploaded messages

    Notes
    -----
    Some large bilby data products are cleaned up after the notification if
    the gracedb host is different from `gracedb.ligo.org`.

    """
    if isinstance(exc, condor.JobRunning):
        subprocess.run(['condor_rm', str(exc.args[0]['Cluster'])])
        canvas = gracedb.upload.si(
            filecontents=None, filename=None, graceid=superevent_id,
            tags='pe',
            message=f'The {analysis} condor job was aborted by gwcelery, '
                    'due to its long run time.'
        )
    elif isinstance(exc, condor.JobAborted):
        canvas = gracedb.upload.si(
            filecontents=None, filename=None, graceid=superevent_id,
            tags='pe',
            message=f'The {analysis} condor job was aborted.'
        )
    else:
        canvas = gracedb.upload.si(
            filecontents=None, filename=None, graceid=superevent_id,
            tags='pe',
            message=f'The {analysis} condor job failed.'
        )

    if analysis == "rapidpe":
        to_upload = [
            'event_all_iterations.dag.lib.err',
            'marginalize_extrinsic_parameters_iteration_*.dag.lib.err'
        ]
    else:
        to_upload = ['*.log', '*.err', '*.out']

    for filename in to_upload:
        tasks = []
        for path in _find_paths_from_name(rundir, filename):
            with open(path, 'rb') as f:
                contents = f.read()
            if contents:
                # put .log suffix in log file names so that users can directly
                # read the contents instead of downloading them when they
                # click file names
                tasks.append(gracedb.upload.si(
                    filecontents=contents,
                    filename=os.path.basename(path) + '.log',
                    graceid=superevent_id,
                    message=f'A log file for {analysis} condor job.',
                    tags='pe'
                ))
        canvas |= group(tasks)

    if "bilby" in analysis and app.conf['gracedb_host'] != 'gracedb.ligo.org':
        canvas |= _clean_up_bilby.si(rundir)

    canvas.delay()


def _upload_tasks_lalinference(rundir, superevent_id):
    """Return canvas of tasks to upload LALInference results

    Parameters
    ----------
    rundir : str
        The path to a run directory
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    tasks : canvas
        The workflow for uploading LALInference results

    """
    pe_results_path = os.path.join(
        app.conf['pe_results_path'], superevent_id, 'lalinference'
    )

    # posterior samples
    path, = glob.glob(
        os.path.join(rundir, '**', 'posterior*.hdf5'), recursive=True)
    with open(path, 'rb') as f:
        canvas = gracedb.upload.si(
            f.read(), 'LALInference.posterior_samples.hdf5',
            superevent_id, 'LALInference posterior samples', 'pe')

    # plots
    tasks = []
    for filename, message in [
        ('extrinsic.png',
         'LALInference corner plot for extrinsic parameters'),
        ('intrinsic.png',
         'LALInference corner plot for intrinsic parameters')
    ]:
        # Here it is not required that only a single png file exists, so that
        # posterior samples are uploaded regardless. The same applies to the
        # other files.
        for path in _find_paths_from_name(pe_results_path, filename):
            with open(path, 'rb') as f:
                tasks.append(gracedb.upload.si(
                    f.read(), f'LALInference.{filename}', superevent_id,
                    message, 'pe'
                ))
    canvas |= group(tasks)

    # psd
    tasks = []
    for path in _find_paths_from_name(
            rundir, 'glitch_median_PSD_forLI_*.dat'):
        with open(path, 'r') as f:
            tasks.append(gracedb.upload.si(
                f.read(), os.path.basename(path), superevent_id,
                'Bayeswave PSD used for LALInference PE', 'pe'
            ))
    canvas |= group(tasks)

    # dag
    tasks = []
    for path in _find_paths_from_name(rundir, 'lalinference*.dag'):
        with open(path, 'r') as f:
            tasks.append(gracedb.upload.si(
                f.read(), os.path.basename(path), superevent_id,
                'LALInference DAG', 'pe'
            ))
    canvas |= group(tasks)

    # link to results page
    tasks = []
    for path in _find_paths_from_name(pe_results_path, 'posplots.html'):
        baseurl = urllib.parse.urljoin(
            app.conf['pe_results_url'],
            os.path.relpath(path, app.conf['pe_results_path'])
        )
        tasks.append(gracedb.upload.si(
            None, None, superevent_id,
            'Online lalinference parameter estimation finished. '
            f'<a href={baseurl}>results</a>'
        ))
    canvas |= group(tasks)

    return canvas


def _upload_tasks_bilby(rundir, superevent_id, mode):
    """Return canvas of tasks to upload Bilby results

    Parameters
    ----------
    rundir : str
        The path to a run directory
    superevent_id : str
        The GraceDB ID of a target superevent
    mode : str
        Analysis mode

    Returns
    -------
    tasks : canvas
        The workflow for uploading Bilby results

    Notes
    -----
    Some large bilby data products are cleaned up after the posterior file
    is uploaded if the gracedb host is different from `gracedb.ligo.org`.

    """
    # convert bilby sample file into one compatible with ligo-skymap
    samples_dir = os.path.join(rundir, 'final_result')
    if mode == 'production':
        samples_filename = 'Bilby.posterior_samples.hdf5'
    else:
        samples_filename = f'Bilby.{mode}.posterior_samples.hdf5'
    out_samples = os.path.join(samples_dir, samples_filename)
    in_samples, = glob.glob(os.path.join(samples_dir, '*result.hdf5'))
    subprocess.run(
        ['bilby_pipe_to_ligo_skymap_samples', in_samples, '--out',
         out_samples]
    )

    with open(out_samples, 'rb') as f:
        canvas = gracedb.upload.si(
            f.read(), samples_filename,
            superevent_id, f'{mode}-mode Bilby posterior samples', 'pe')

    if app.conf['gracedb_host'] != 'gracedb.ligo.org':
        canvas |= _clean_up_bilby.si(rundir)

    # pesummary
    pesummary_kwargs = {}
    path_to_ini, = glob.glob(os.path.join(rundir, "*_complete.ini"))
    pesummary_kwargs["config"] = path_to_ini
    config_parser = BilbyConfigFileParser()
    with open(path_to_ini, "r") as f:
        config_content, _, _, _ = config_parser.parse(f)
    pesummary_kwargs["psd"] = convert_string_to_dict(
        config_content["psd-dict"]
    )
    pesummary_kwargs["calibration"] = convert_string_to_dict(
        config_content["spline-calibration-envelope-dict"]
    )
    pesummary_kwargs["approximant"] = config_content["waveform-approximant"]
    pesummary_kwargs["f_low"] = config_content["minimum-frequency"]
    pesummary_kwargs["f_ref"] = config_content["reference-frequency"]
    pesummary_kwargs["label"] = "online"

    webdir = os.path.join(config_content["webdir"], 'pesummary')
    url = urllib.parse.urljoin(
        app.conf['pe_results_url'],
        os.path.relpath(
            os.path.join(webdir, 'home.html'),
            app.conf['pe_results_path']
        )
    )
    canvas = group(
        canvas,
        _pesummary_task(webdir, in_samples, **pesummary_kwargs)
        |
        gracedb.upload.si(
            None, None, superevent_id,
            f'PESummary page for {mode}-mode Bilby is available '
            f'<a href={url}>here</a>',
            'pe'
        )
    )

    return canvas


def _upload_tasks_rapidpe(rundir, superevent_id):
    """Return canvas of tasks to upload RapidPE-RIFT results

    Parameters
    ----------
    rundir : str
        The path to a run directory
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    tasks : canvas
        The workflow for uploading RapidPE-RIFT results

    """
    summary_path = os.path.join(rundir, "summary")

    url = urllib.parse.urljoin(
        app.conf['pe_results_url'],
        os.path.join(superevent_id, 'rapidpe', 'summarypage.html')
    )
    canvas = gracedb.upload.si(
        None, None, superevent_id,
        f'Summary page for RapidPE-RIFT is available <a href={url}>here</a>',
        ('pe',))
    to_upload = [
        (
            "p_astro.json", "RapidPE_RIFT.p_astro.json",
            "RapidPE-RIFT Pastro results",
            ("pe", "p_astro", "public"),
        ),
    ]
    tasks = []
    for src_basename, dst_filename, description, tags in to_upload:
        src_filename = os.path.join(summary_path, src_basename)
        if os.path.isfile(src_filename):
            with open(src_filename, "rb") as f:
                tasks.append(
                    gracedb.upload.si(
                        f.read(), dst_filename,
                        superevent_id, description, tags))

    canvas |= group(tasks)

    return canvas


@app.task(ignore_result=True, shared=False)
def dag_finished(rundir, superevent_id, pe_pipeline, **kwargs):
    """Upload PE results

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file exists
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used, lalinference, bilby, or
        rapidpe.

    """
    if pe_pipeline == 'lalinference':
        canvas = _upload_tasks_lalinference(rundir, superevent_id)
    elif pe_pipeline == 'bilby':
        canvas = _upload_tasks_bilby(
            rundir, superevent_id, kwargs['bilby_mode'])
    elif pe_pipeline == 'rapidpe':
        canvas = _upload_tasks_rapidpe(rundir, superevent_id)
    else:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')

    canvas.delay()

    # NOTE: check if this should include rapidpe as well
    if pe_pipeline == 'bilby':
        gracedb.create_label.delay('PE_READY', superevent_id)


def _pesummary_task(webdir, samples, **pesummary_kwargs):
    """Return a celery task to submit a pesummary condor job.

    Parameters
    ----------
    webdir : str
        output directory
    samples : str
        path to posterior sample file
    **pesummary_kwargs
        Extra arguments of summarypages

    Returns
    -------
    celery task

    Notes
    -----
    `--disable_interactive --disable_expert` are added and
    `--redshift_method exact --evolve_spins_forwards` are not added to
    `summarypages` arguments when the gracedb host is different from
    `gracedb.ligo.org`. Condor queue is set to `Online_PE` if gracedb host
    is `gracedb.ligo.org`, and `Online_PE_MDC` otherwise.

    """
    args = [
        "summarypages", "--webdir", webdir, "--samples", samples, "--gw",
        "--no_ligo_skymap", "--multi_process", "6"
    ]
    for key in pesummary_kwargs:
        if key in ["psd", "calibration"]:
            args += [f"--{key}"]
            for ifo in pesummary_kwargs[key]:
                args += [f'{ifo}:{pesummary_kwargs[key][ifo]}']
        else:
            args += [f"--{key}", pesummary_kwargs[key]]
    condor_kwargs = dict(
        request_memory=16000, request_disk=5000, request_cpus=6,
        accounting_group_user='soichiro.morisaki'
    )
    if app.conf['gracedb_host'] != 'gracedb.ligo.org':
        condor_kwargs['accounting_group'] = 'ligo.dev.o4.cbc.pe.bilby'
        condor_kwargs['requirements'] = '((TARGET.Online_PE_MDC =?= True))'
        condor_kwargs['+Online_PE_MDC'] = True
        args += ["--disable_interactive", "--disable_expert"]
    else:
        condor_kwargs['accounting_group'] = 'ligo.prod.o4.cbc.pe.bilby'
        condor_kwargs['requirements'] = '((TARGET.Online_PE =?= True))'
        condor_kwargs['+Online_PE'] = True
        args += ["--redshift_method", "exact", "--evolve_spins_forwards"]
    return condor.check_output.si(args, **condor_kwargs)


# Modified version of condor.submit task with retry kwargs overridden with
# RapidPE-specific settings.
submit_rapidpe = app.task(
    **condor.submit_kwargs,
    **app.conf['rapidpe_condor_retry_kwargs'],
)(condor.submit.run)

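
# Sketch of the command assembled by ``_pesummary_task`` above, with
# hypothetical inputs pesummary_kwargs = {'psd': {'H1': 'h1_psd.dat'},
# 'label': 'online'}; the condor job would run roughly
#
#     summarypages --webdir <webdir> --samples <samples> --gw \
#         --no_ligo_skymap --multi_process 6 --psd H1:h1_psd.dat \
#         --label online
#
# plus the host-dependent flags described in its docstring.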

@app.task(ignore_result=True, shared=False)
def start_pe(event, superevent_id, pe_pipeline):
    """Run Parameter Estimation on a given event.

    Parameters
    ----------
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used, lalinference, bilby, or
        rapidpe.

    """
    # make an event directory
    pipeline_dir = os.path.expanduser('~/.cache/{}'.format(pe_pipeline))
    mkpath(pipeline_dir)
    event_dir = os.path.join(pipeline_dir, superevent_id)

    if pe_pipeline == 'bilby':
        if (
            app.conf['gracedb_host'] == 'gracedb-playground.ligo.org' and
            event['extra_attributes']['CoincInspiral']['mchirp'] >= 12
        ):
            # Count the number of BBH jobs and do not start a run if it
            # exceeds 5 so that we do not use up disk space. We assume that
            # the job is running if a data dump pickle file exists under the
            # run directory, which is the largest file produced by PE and
            # removed when the run completes.
            number_of_bbh_running = 0
            for p in glob.glob(
                os.path.join(
                    pipeline_dir,
                    "*/*/data/*_generation_data_dump.pickle"
                )
            ):
                path_to_ev = os.path.join(os.path.dirname(p), "../event.json")
                if os.path.exists(path_to_ev):
                    with open(path_to_ev, "r") as f:
                        ev = json.load(f)
                    mc = ev['extra_attributes']['CoincInspiral']['mchirp']
                    if mc >= 12:
                        number_of_bbh_running += 1
            if number_of_bbh_running > 5:
                gracedb.upload.delay(
                    filecontents=None, filename=None, graceid=superevent_id,
                    message='Parameter estimation will not start to save disk '
                            f'space (There are {number_of_bbh_running} BBH '
                            'jobs running).',
                    tags='pe'
                )
                return
        modes = ["production"]
        rundirs = [os.path.join(event_dir, m) for m in modes]
        kwargs_list = [{'bilby_mode': m} for m in modes]
        analyses = [f'{m}-mode bilby' for m in modes]
        condor_submit_task = condor.submit
    elif pe_pipeline == 'rapidpe':
        rundirs = [event_dir]
        kwargs_list = [{'event_pipeline': event["pipeline"]}]
        analyses = [pe_pipeline]
        condor_submit_task = submit_rapidpe
    else:
        rundirs = [event_dir]
        kwargs_list = [{}]
        analyses = [pe_pipeline]
        condor_submit_task = condor.submit

    os.mkdir(event_dir)
    for rundir, kwargs, analysis in zip(rundirs, kwargs_list, analyses):
        mkpath(rundir)
        gracedb.upload.delay(
            filecontents=None, filename=None, graceid=superevent_id,
            message=(f'Starting {analysis} parameter estimation '
                     f'for {event["graceid"]}'),
            tags='pe'
        )
        (
            dag_prepare_task(
                rundir, event, superevent_id, pe_pipeline, **kwargs
            )
            |
            condor_submit_task.s().on_error(
                job_error_notification.s(superevent_id, rundir, analysis)
            )
            |
            dag_finished.si(rundir, superevent_id, pe_pipeline, **kwargs)
        ).delay()

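
# Typical entry point (illustrative superevent ID): the orchestrator calls
#
#     start_pe.delay(event, 'S230101ab', 'bilby')
#
# which uploads a start message, prepares and submits the DAG, and finally
# runs ``dag_finished`` to upload the results.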