Source code for gwcelery.tools.flask
"""Flask web application for manually triggering certain tasks."""
from contextlib import ExitStack, nullcontext
import click
from celery.bin.base import LOG_LEVEL, CeleryDaemonCommand, CeleryOption
from celery.platforms import detached
from flask.cli import FlaskGroup
from .. import views as _ # noqa: F401
from ..flask import app
[docs]class CeleryDaemonFlaskGroup(FlaskGroup):
def __init__(self, *args, **kwargs):
super().__init__(self, *args, **kwargs)
self.params.extend(CeleryDaemonCommand(name='flask').params)
[docs]def maybe_detached(*, detach, **kwargs):
return detached(**kwargs) if detach else nullcontext()
@click.group(cls=CeleryDaemonFlaskGroup, help=__doc__,
context_settings={'auto_envvar_prefix': 'FLASK'})
@click.option('-D',
'--detach',
cls=CeleryOption,
is_flag=True,
default=False,
help="Start as a background process.")
@click.option('-l',
'--loglevel',
default='WARNING',
cls=CeleryOption,
type=LOG_LEVEL,
help="Logging level.")
@click.pass_context
def flask(ctx, detach=None, loglevel=None, logfile=None,
pidfile=None, uid=None, gid=None, umask=None, **kwargs):
# Look up Celery app
celery_app = ctx.parent.obj.app
# # Prepare to pass Flask app to Flask CLI
ctx.obj.create_app = lambda *args, **kwargs: app
# Detach from the tty if requested.
# FIXME: After we update to click >= 8, replace the elaborate construction
# below with ctx.with_resource(maybe_detached(...))
exit_stack = ExitStack()
ctx.call_on_close(exit_stack.close)
exit_stack.enter_context(maybe_detached(detach=detach,
logfile=logfile, pidfile=pidfile,
uid=uid, gid=gid, umask=umask))
celery_app.log.setup(loglevel, logfile)