Source code for gwcelery.email.bootsteps

from threading import Thread

from celery import bootsteps
from celery.utils.log import get_logger
from safe_netrc import netrc

from .signals import email_received

__all__ = ('Receiver',)

log = get_logger(__name__)


class EmailBootStep(bootsteps.ConsumerStep):
    """Common base class for boot steps that belong to email workers.

    The step is meant to be included only in workers that are started with
    the ``--email`` command line option; the ``email`` constructor argument
    decides whether the step is enabled.
    """

    def __init__(self, consumer, email=False, **kwargs):
        """Store whether this step is enabled.

        Parameters
        ----------
        consumer
            The consumer this step is attached to (not used here).
        email
            Any value; its truthiness becomes ``self.enabled``.
        **kwargs
            Additional keyword arguments, accepted and ignored.
        """
        is_requested = bool(email)
        self.enabled = is_requested

    def start(self, consumer):
        """Write an info-level log entry announcing that the step starts."""
        step_name = self.name
        log.info('Starting %s', step_name)

    def stop(self, consumer):
        """Write an info-level log entry announcing that the step stops."""
        step_name = self.name
        log.info('Stopping %s', step_name)


class Receiver(EmailBootStep):
    """Run the global email receiver in background thread.

    ``start`` reads the IMAP host name from the ``email_host`` application
    setting and launches a thread running ``_runloop``; ``stop`` clears the
    running flag and joins that thread.
    """

    name = 'email client'

    def _runloop(self):
        """Poll the IMAP inbox and emit a signal for every message found.

        Runs until ``self._running`` becomes false. Each fetched message is
        passed to ``email_received`` (keyword ``rfc822``) and then deleted.
        Between polls the connection sits in IMAP IDLE. If the connection is
        aborted or reset, the error is logged and a new connection is opened.

        Raises
        ------
        LookupError
            If the netrc file has no credentials for ``self._host``.
        """
        # Imported lazily so that the module can be imported without
        # imapclient being loaded until the receiver actually runs.
        from imapclient import IMAPClient
        from imapclient.exceptions import IMAPClientAbortError

        # ``authenticators`` returns None when no entry matches the host;
        # fail with an explicit message instead of an opaque unpack error.
        credentials = netrc().authenticators(self._host)
        if credentials is None:
            raise LookupError(
                'No netrc entry found for email host %r' % (self._host,))
        username, _, password = credentials

        while self._running:
            try:
                log.debug('Starting new connection')
                with IMAPClient(self._host, use_uid=True, timeout=30) as conn:
                    log.debug('Logging in')
                    conn.login(username, password)
                    log.debug('Selecting inbox')
                    conn.select_folder('inbox')
                    while self._running:
                        log.debug('Searching for new messages')
                        messages = conn.search()
                        log.debug('Fetching new messages')
                        for msgid, data in conn.fetch(
                                messages, ['RFC822']).items():
                            log.debug('Sending signal for new email')
                            email_received.send(None, rfc822=data[b'RFC822'])
                            log.debug('Deleting email')
                            # NOTE(review): this presumably only flags the
                            # message as deleted; confirm the server expunges
                            # it so the next search does not return it again.
                            conn.delete_messages(msgid)
                        log.debug('Starting idle')
                        conn.idle()
                        # Stay in IDLE mode for at most 5 minutes
                        # (60 checks with a 5 second timeout each).
                        # According to the imapclient documentation:
                        #
                        # > Note that IMAPClient does not handle low-level
                        # > socket errors that can happen when maintaining
                        # > long-lived TCP connections. Users are advised to
                        # > renew the IDLE command every 10 minutes to avoid
                        # > the connection from being abruptly closed.
                        for _ in range(60):
                            # Leave IDLE early on shutdown or as soon as the
                            # server reports any activity.
                            if not self._running or \
                                    conn.idle_check(timeout=5):
                                break
                        log.debug('Idle done')
                        conn.idle_done()
            except IMAPClientAbortError:
                # Logged, then the outer loop opens a fresh connection.
                log.exception('IMAP connection aborted')
            except ConnectionResetError:
                log.exception('IMAP connection reset')

    def start(self, consumer):
        """Start the background thread that runs the receive loop.

        The IMAP host is taken from ``consumer.app.conf['email_host']``.
        """
        super().start(consumer)
        self._host = consumer.app.conf['email_host']
        # Flag polled by ``_runloop``; ``stop`` clears it to end the loop.
        self._running = True
        self._thread = Thread(target=self._runloop, name='EmailClientThread')
        self._thread.start()

    def stop(self, consumer):
        """Ask the receive loop to finish and wait for its thread to exit."""
        super().stop(consumer)
        self._running = False
        self._thread.join()