Source code for gwcelery.voevent.util
"""VOEvent-related utilities."""
import ipaddress
import socket
[docs]def get_host_port(address):
"""Split a network address of the form ``host:port``.
Parameters
----------
network : str
The network address.
Returns
-------
host : str
The hostname, or an empty string if missing.
port : int, None
The port number, or None if missing.
"""
host, _, port = address.partition(':')
return host, (int(port) if port else None)
[docs]def get_local_ivo(app):
"""Create an IVOID to identify this application in VOEvent Transport
Protocol packets.
Returns
-------
str
A local IVOID composed of the machine's fully qualified domain name and
the Celery application name (for example,
`ivo://emfollow.ligo.caltech.edu/gwcelery`).
"""
return 'ivo://{}/{}'.format(socket.getfqdn(), app.main)
[docs]def get_network(address):
"""Find the IP network prefix for a hostname or CIDR notation.
Parameters
----------
address : str
A hostname, such as ``ligo.org``, or an IP address prefix in CIDR
notation, such as ``127.0.0.0/8``.
Returns
-------
ipaddress.IPv4Network
An object representing the IP address prefix.
"""
try:
net = ipaddress.ip_network(address, strict=False)
except ValueError:
net = ipaddress.ip_network(socket.gethostbyname(address), strict=False)
return net