#
# Copyright (C) 2018-2020 Leo P. Singer <leo.singer@ligo.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import asyncio
import getpass
import logging
import uuid
import pkg_resources
from safe_netrc import netrc, NetrcParseError
import slixmpp
from ._version import get_versions
__all__ = ('LVAlertClient',)
__version__ = get_versions()['version']
del get_versions
log = logging.getLogger(__name__)
DEFAULT_SERVER = 'lvalert.cgca.uwm.edu'
def _get_default_login(netrcfile, server):
try:
netrcfile = netrc(netrcfile)
except (OSError, NetrcParseError):
log.exception('Cannot load netrc file: %s', netrcfile)
return None, None
auth = netrcfile.authenticators(server)
if auth is None:
log.warn('No netrc entry found for server: %s', server)
return None, None
default_username, _, default_password = auth
return default_username, default_password
def _get_login(username, password, netrcfile, interactive, server):
default_username, default_password = _get_default_login(netrcfile, server)
prompt = 'password for {}@{}: '.format(username, server)
if username is not None and password is not None:
return username, password
elif username is None and default_username is None:
raise RuntimeError('Username not specified')
elif username is None or username == default_username:
return default_username, default_password
elif interactive:
return username, getpass.getpass(prompt)
else:
raise RuntimeError('Password not specified')
[docs]class LVAlertClient(slixmpp.ClientXMPP):
"""An XMPP client configured for LVAlert.
Parameters
----------
username : str, optional
The XMPP username, or :obj:`None` to look up from the netrc_ file.
password : str, optional
The XMPP password, or :obj:`None` to look up from the netrc_ file.
resource : str, optional
The XMPP resource ID, or :obj:`None` to generate a random one.
netrc : str, optional
The netrc_ file. The default is to consult the ``NETRC`` environment
variable or use the default path of ``~/.netrc``.
interactive : bool, optional
If :obj:`True`, then fall back to asking for the password on the
command line if necessary.
server : str, optional
The LVAlert server hostname.
Example
-------
Usage of the :class:`LVAlertClient` class typically has three phases:
1. Create a client instance. Pass any desired connection options (server,
username, password) to the constructor.
2. Configure a pubsub listener by calling the :meth:`listen` method and
one or more event handlers by calling the
:meth:`~slixmpp.xmlstream.xmlstream.XMLStream.add_event_handler`
method.
3. Start the client run loop by calling the :meth:`start` method. The
client run loop continues processing until it is interrupted by a
:exc:`KeyboardInterrupt` or a call to the :meth:`stop` method either
from one of the event handlers or from another thread.
The simplest use case is a client that runs a callback for each LVAlert
message that is received:
.. code-block:: python
def process_alert(node, payload):
if node == 'cbc_gstlal':
alert = json.loads(payload)
print(alert)
client = LVAlertClient()
client.listen(process_alert)
client.start() # Runs until interrupted with Ctrl-C
Typically, if you want to call one of the administrative methods
(:meth:`get_nodes`, :meth:`get_subscriptions`, :meth:`subscribe`, or
:meth:`unsubscribe`), you will add them to a callback for the
``session_start`` event. Since these four methods are :term:`coroutines
<coroutine>`, the callback should be defined using the ``async``/``await``
syntax:
.. code-block:: python
client = LVAlertClient()
async def callback(*args):
subscriptions = await client.get_subscriptions()
print('Subscribed to:', subscriptions)
client.add_event_handler('session_start', callback)
client.start() # Runs until interrupted with Ctrl-C
To register a *single-shot* callback, pass ``disposable=True`` to
:meth:`~slixmpp.xmlstream.xmlstream.XMLStream.add_event_handler`. This is
most useful if you want to perform some action once, then immediately
disconnect and stop:
.. code-block:: python
client = LVAlertClient()
async def callback(*args):
await client.subscribe('cbc_gstlal', 'cbc_pycbc')
client.stop()
client.add_event_handler('session_start', callback, disposable=True)
client.start() # Stops after callback reaches client.stop()
"""
def __init__(self, username=None, password=None, resource=None, netrc=None,
interactive=False, server=DEFAULT_SERVER):
username, password = _get_login(
username, password, netrc, interactive, server)
if resource is None:
resource = uuid.uuid4().hex
jid = '{}@{}/{}'.format(username, server, resource)
super().__init__(jid, password)
self.register_plugin('xep_0060') # Activate PubSub plugin
self.add_event_handler("session_start", self._session_start)
self.ca_certs = pkg_resources.resource_filename(__name__, 'certs.pem')
self._stopped = None
async def _session_start(self, event):
self.send_presence()
await self.get_roster()
[docs] def listen(self, callback):
"""Set a callback to be executed for each pubsub item received.
Parameters
----------
callback : callable
A function of two arguments: the node and the alert payload.
"""
self._callback = callback
self.add_event_handler('pubsub_publish', self._pubsub_publish)
[docs] def start(self):
"""Run the client until :meth:`stop` is called.
Establish a connection, process all events, and run all event handlers,
until :meth:`stop` is called or the current thread is interrupted
(e.g., by a :exc:`KeyboardInterrupt`).
If the connection is ever dropped, it is re-established automatically.
Once processing stops, the connection is closed cleanly before this
method returns.
"""
self._stopped = self.loop.create_future()
self.init_plugins()
self.connect()
try:
self.loop.run_until_complete(self._stopped)
finally:
self.disconnect()
self.loop.run_until_complete(self.disconnected)
def _stop(self):
if self._stopped is not None:
self._stopped.set_result(True)
self._stopped = None
[docs] def stop(self):
"""Stop the client.
If the client has been started by calling :meth:`start`, then
:meth:`start` will return and the connection will be closed.
Notes
-----
This method is thread safe, so you can use it to stop the client from
another thread. For example:
.. code-block:: python
from threading import Thread
from time import sleep
client = LVAlertClient()
def wait_then_stop():
sleep(5)
client.stop()
Thread(target=wait_then_stop).start()
client.start()
"""
self.loop.call_soon_threadsafe(self._stop)
def _pubsub_publish(self, msg):
node = msg['pubsub_event']['items']['node']
text = msg['pubsub_event']['items']['item']['payload'].text
try:
self._callback(node, text)
except: # noqa: E722
log.exception('Exception occurred in callback')
@property
def _pubsub_server(self):
return 'pubsub.{}'.format(self.boundjid.server)
[docs] async def get_nodes(self):
"""Get a list of all available pubsub nodes.
Returns
-------
list
A list of strings naming the available pubsub nodes
"""
result = await self['xep_0060'].get_nodes(self._pubsub_server)
return [item for _, item, _ in result['disco_items']['items']]
[docs] async def get_subscriptions(self):
"""Get a list of your subscriptions.
Returns
-------
list
A list of strings naming the subscribed pubsub nodes
"""
result = await self['xep_0060'].get_subscriptions(self._pubsub_server)
return sorted({stanza['node'] for stanza in
result['pubsub']['subscriptions']['substanzas']})
async def _subscribe(self, node):
await self['xep_0060'].subscribe(self._pubsub_server, node)
[docs] async def subscribe(self, *nodes):
"""Subscribe to one or more pubsub nodes.
Parameters
----------
*args : list
A list of strings naming the pubsub nodes to which to subscribe
"""
await asyncio.gather(*(self._subscribe(node) for node in nodes))
async def _unsubscribe(self, node):
subs = await self['xep_0060'].get_subscriptions(
self._pubsub_server, node)
subs = subs['pubsub']['subscriptions']['substanzas']
subids = [sub['subid'] for sub in subs]
await asyncio.gather(*(
self['xep_0060'].unsubscribe(self._pubsub_server, node, subid)
for subid in subids))
[docs] async def unsubscribe(self, *nodes):
"""Unsubscribe from one or more pubsub nodes.
Parameters
----------
*args : list
A list of strings naming the pubsub nodes from which to unsubscribe
"""
await asyncio.gather(*(self._unsubscribe(node) for node in nodes))