# Source code for gwcelery.tasks.condor

"""Submit and monitor HTCondor jobs [1]_.

Notes
-----
Internally, we use the XML condor log format [2]_ for easier parsing.

References
----------
.. [1] http://research.cs.wisc.edu/htcondor/manual/latest/condor_submit.html
.. [2] http://research.cs.wisc.edu/htcondor/classad/refman/node3.html

"""
import os
import subprocess
import tempfile
from distutils.dir_util import mkpath

import lxml.etree

from .. import app


def _escape_arg(arg):
    """Escape a command line argument for an HTCondor submit file.

    The result is meant to be joined with other escaped arguments and
    enclosed in double quotes (see :func:`_escape_args`). Double quotes are
    therefore doubled. The argument is enclosed in single quotes, with any
    embedded single quotes doubled, whenever it is empty, contains
    whitespace, or contains a single quote.

    Parameters
    ----------
    arg : str
        A single command line argument.

    Returns
    -------
    str
        The escaped argument.
    """
    arg = arg.replace('"', '""')
    # Single-quote the argument when it is:
    # - empty, because otherwise it would disappear from the argument list;
    # - contains whitespace, because otherwise it would be split;
    # - contains a single quote, because a doubled single quote is a literal
    #   quote only *inside* a single-quoted argument. Outside one, ``''``
    #   reads as an empty quoted section and the quote is silently dropped.
    if not arg or any(char in arg for char in " \t\n\r'"):
        arg = "'" + arg.replace("'", "''") + "'"
    return arg


def _escape_args(args):
    """Escape a list of command line arguments for an HTCondor submit file.

    Every element of *args* is escaped with :func:`_escape_arg`. The results
    are joined with single spaces and the whole string is enclosed in double
    quotes, which is HTCondor's double-quoted ``arguments`` syntax.
    """
    escaped = [_escape_arg(arg) for arg in args]
    return '"{}"'.format(' '.join(escaped))


def _mklog(suffix):
    """Create a unique path for an HTCondor log.

    The path lies in ``~/.cache/condor`` (created if necessary) and ends
    with *suffix*. The file itself does not exist when this function
    returns. The temporary file used to reserve a unique name is deleted
    when it is closed, which leaves the path free for HTCondor to create.

    Parameters
    ----------
    suffix : str
        Filename suffix, e.g. ``'.log'``.

    Returns
    -------
    str
        The unique path.
    """
    condor_dir = os.path.expanduser('~/.cache/condor')
    # Use os.makedirs instead of distutils.dir_util.mkpath for two reasons:
    # distutils is deprecated (PEP 632) and removed in Python 3.12, and mkpath
    # remembers the directories it has created, so it would not recreate this
    # one if it were deleted while the process is running.
    os.makedirs(condor_dir, exist_ok=True)
    with tempfile.NamedTemporaryFile(dir=condor_dir, suffix=suffix) as f:
        return f.name


def _read(filename):
    """Return the entire text content of the file at *filename*."""
    with open(filename) as stream:
        contents = stream.read()
    return contents


def _rm_f(*args):
    """Delete each path given in *args*, in the spirit of ``rm -f``.

    Cleanup is best-effort: any :class:`OSError` raised while deleting a
    path (missing file, directory, insufficient permissions, ...) is
    ignored, and the remaining paths are still processed.
    """
    for path in args:
        try:
            os.remove(path)
        except OSError:
            # Deliberately ignored; see docstring.
            continue


def _parse_classad(c):
    """Turn a ClassAd XML fragment into a dictionary of Python values.

    Yields ``(name, value)`` pairs, suitable for passing to :class:`dict`,
    one for each ``<a n="name">`` attribute element that is a direct child
    of *c*. If *c* is None, nothing is yielded.

    Note that this supports only the small subset of the ClassAd XML
    syntax [2]_ that we need to determine if a job succeeded or failed:

    * ``<s>`` strings become :class:`str` (an empty element gives ``''``),
    * ``<b v="t"/>`` booleans become :class:`bool`,
    * ``<i>`` integers become :class:`int`.

    Attributes of any other type are skipped.
    """
    if c is None:
        return
    for a in c.findall('a'):
        key = a.attrib['n']
        # Each attribute element wraps exactly one value element. Iterate the
        # element directly. ``getchildren()`` is deprecated in lxml and has
        # been removed from the standard library's ElementTree.
        child, = a
        if child.tag == 's':
            # An empty <s></s> has ``text`` of None. Its value is the empty
            # string, not the string 'None'.
            value = child.text or ''
        elif child.tag == 'b':
            value = (child.attrib['v'] == 't')
        elif child.tag == 'i':
            value = int(child.text)
        else:
            # Coverage skipped below because the Python compiler optimizes
            # away ``continue`` statements.
            #
            # See <https://bitbucket.org/ned/coveragepy/issues/198>.
            continue  # pragma: no cover
        yield key, value


def _read_last_event(log):
    """Return the most recent event recorded in an HTCondor XML log file.

    The log text is wrapped in a ``<classads>`` root element and parsed.
    The last ``<c>`` element is then converted to a dictionary with
    :func:`_parse_classad`. If the log holds no ``<c>`` elements, the result
    is an empty dictionary.

    FIXME: It would be more efficient in terms of I/O and file descriptors to
    use a single HTCondor log file for all jobs and use the inotify
    capabilities of ``htcondor.read_events`` to avoid unnecessary polling.
    """
    document = '<classads>' + _read(log) + '</classads>'
    root = lxml.etree.fromstring(document)
    last_event = root.find('c[last()]')
    return dict(_parse_classad(last_event))


def _submit(submit_file=None, **kwargs):
    """Run ``condor_submit``.

    Each keyword argument is passed as a ``-append key=value`` submit
    command. If *submit_file* is None, the description is read from
    ``/dev/null`` with ``-queue 1``, so the job is described entirely by the
    keyword arguments. Otherwise *submit_file* is submitted.

    Raises
    ------
    subprocess.CalledProcessError
        If ``condor_submit`` exits with a nonzero status. Its stdout and
        stderr are captured rather than echoed.
    """
    command = ['condor_submit']
    for name, val in kwargs.items():
        command.extend(('-append', '{}={}'.format(name, val)))
    if submit_file is not None:
        command.append(submit_file)
    else:
        command.extend(('/dev/null', '-queue', '1'))
    subprocess.run(command, capture_output=True, check=True)


class JobAborted(Exception):
    """Exception raised when an HTCondor job is aborted, e.g. by ``condor_rm``."""
class JobRunning(Exception):
    """Exception raised while an HTCondor job has not yet finished.

    The tasks in this module list it in ``autoretry_for``, so raising it
    re-queues the task.
    """
class JobFailed(subprocess.CalledProcessError):
    """Exception raised when an HTCondor job fails.

    As a :class:`subprocess.CalledProcessError`, it carries ``returncode``,
    ``cmd`` and, when supplied, ``output`` and ``stderr``.
    """
# Celery task options for :func:`submit`. It is a bound task that is
# automatically retried while the job is still running (:class:`JobRunning`).
submit_kwargs = {
    'bind': True,
    'autoretry_for': (JobRunning,),
    'ignore_result': True,
    'shared': False,
}
@app.task(
    **submit_kwargs,
    **app.conf['condor_retry_kwargs']
)
def submit(self, submit_file, log=None):
    """Submit a job using HTCondor.

    On the first call (``log`` is None), the job is submitted with an XML
    event log and the task re-queues itself with the log's path. On later
    calls, the last event in that log decides whether the job is still
    running, has terminated, or was aborted.

    Parameters
    ----------
    submit_file : str
        Path of the submit file.
    log: str
        Used internally to track job state. Caller should not set.

    Raises
    ------
    :class:`JobAborted`
        If the job was aborted (e.g. by running ``condor_rm``).
    :class:`JobFailed`
        If the job terminates and returns a nonzero exit code, or terminates
        abnormally (e.g. is killed by a signal). In the abnormal case,
        ``returncode`` is the negative signal number, following the
        :mod:`subprocess` convention, or None if the log does not record one.
    :class:`JobRunning`
        If the job is still running. Causes the task to be re-queued until
        the job is complete.
    subprocess.CalledProcessError
        If ``condor_submit`` itself fails.

    Example
    -------
    >>> submit.s('example.sub')
    """
    if log is None:
        log = _mklog('.log')
        try:
            _submit(submit_file, log_xml='true', log=log)
        except subprocess.CalledProcessError:
            # Submission failed, so no job will ever write to this log.
            _rm_f(log)
            raise
        # Re-queue this task with the log path so that later runs poll the
        # job's state instead of submitting it again.
        self.retry((submit_file,), dict(log=log))
    else:
        event = _read_last_event(log)
        if event.get('MyType') == 'JobTerminatedEvent':
            _rm_f(log)
            if not event['TerminatedNormally']:
                # Abnormal termination (e.g. killed by a signal) is a failure,
                # not a success. HTCondor is expected to record
                # ``TerminatedBySignal`` rather than ``ReturnValue`` in this
                # case, so look it up defensively.
                signal_number = event.get('TerminatedBySignal')
                raise JobFailed(
                    None if signal_number is None else -signal_number,
                    (submit_file,))
            if event['ReturnValue'] != 0:
                raise JobFailed(event['ReturnValue'], (submit_file,))
        elif event.get('MyType') == 'JobAbortedEvent':
            _rm_f(log)
            raise JobAborted(event)
        else:
            raise JobRunning(event)
@app.task(bind=True, autoretry_for=(JobRunning,), default_retry_delay=1,
          max_retries=None, retry_backoff=True, shared=False)
def check_output(self, args, log=None, error=None, output=None, **kwargs):
    """Call a process using HTCondor.

    Call an external process using HTCondor, in a manner patterned after
    :func:`subprocess.check_output`. If successful, returns its output on
    stdout. On failure, raise an exception.

    The command runs as ``/usr/bin/env <args>`` in the vanilla universe with
    ``getenv = true``. The submit commands ``universe``, ``executable``,
    ``getenv``, ``log_xml``, ``arguments``, ``log``, ``error`` and ``output``
    are always set by this task and override any values given in *kwargs*.

    Parameters
    ----------
    args : list
        Command line arguments, as if passed to
        :func:`subprocess.check_output`.
    log, error, output : str
        Used internally to track job state. Caller should not set.
    **kwargs
        Extra submit description file commands. See the documentation for
        ``condor_submit`` for possible values.

    Returns
    -------
    str
        Captured output from command.

    Raises
    ------
    :class:`JobAborted`
        If the job was aborted (e.g. by running ``condor_rm``).
    :class:`JobFailed`
        If the job terminates and returns a nonzero exit code, or terminates
        abnormally (e.g. is killed by a signal). In the abnormal case,
        ``returncode`` is the negative signal number, following the
        :mod:`subprocess` convention, or None if the log does not record one.
        The exception carries the captured stdout and stderr.
    :class:`JobRunning`
        If the job is still running. Causes the task to be re-queued until
        the job is complete.
    subprocess.CalledProcessError
        If ``condor_submit`` itself fails.

    Example
    -------
    >>> check_output.s(['sleep', '10'],
    ...                accounting_group='ligo.dev.o3.cbc.explore.test')
    """
    # FIXME: Refactor to reuse common code from this task and
    # gwcelery.tasks.condor.submit.
    if log is None:
        log = _mklog('.log')
        error = _mklog('.err')
        output = _mklog('.out')
        kwargs = dict(kwargs,
                      universe='vanilla',
                      executable='/usr/bin/env',
                      getenv='true',
                      log_xml='true',
                      arguments=_escape_args(args),
                      log=log, error=error, output=output)
        try:
            _submit(**kwargs)
        except subprocess.CalledProcessError:
            # Submission failed, so no job will ever write to these files.
            _rm_f(log, error, output)
            raise
        # Re-queue this task with the file paths so that later runs poll the
        # job's state instead of submitting it again.
        self.retry((args,), kwargs)
    else:
        event = _read_last_event(log)
        if event.get('MyType') == 'JobTerminatedEvent':
            # Remove the temporary files even if reading them fails.
            try:
                captured_error = _read(error)
                captured_output = _read(output)
            finally:
                _rm_f(log, error, output)
            if event['TerminatedNormally']:
                returncode = event['ReturnValue']
                if returncode == 0:
                    return captured_output
            else:
                # Abnormal termination (e.g. killed by a signal). HTCondor is
                # expected to record ``TerminatedBySignal`` rather than
                # ``ReturnValue`` here, so ``event['ReturnValue']`` would
                # raise KeyError instead of the intended JobFailed.
                signal_number = event.get('TerminatedBySignal')
                returncode = (
                    None if signal_number is None else -signal_number)
            raise JobFailed(returncode, args,
                            captured_output,
                            captured_error)
        elif event.get('MyType') == 'JobAbortedEvent':
            _rm_f(log, error, output)
            raise JobAborted(event)
        else:
            raise JobRunning(event)