基于 taskiq 的异步任务队列
插件自动读取项目配置,优先使用 TASKIQ_BROKER,否则使用 CELERY_BROKER:
# Redis
TASKIQ_BROKER = 'redis' # 或 CELERY_BROKER
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = ''
CELERY_BROKER_REDIS_DATABASE = 0
# RabbitMQ
TASKIQ_BROKER = 'rabbitmq' # 或 CELERY_BROKER
CELERY_RABBITMQ_HOST = 'localhost'
CELERY_RABBITMQ_PORT = 5672
CELERY_RABBITMQ_USERNAME = 'guest'
CELERY_RABBITMQ_PASSWORD = 'guest'
CELERY_RABBITMQ_VHOST = '/'在 backend/core/registrar.py 的 register_init 函数中添加:
from backend.plugin.task.broker import taskiq_broker
async def register_init(app):
await taskiq_broker.startup()
yield
await taskiq_broker.shutdown()taskiq worker backend.plugin.task.broker:taskiq_broker -fsdtaskiq scheduler backend.plugin.task.scheduler:taskiq_scheduler -fsd在 backend/plugin/task/tasks/beat.py 中定义定时任务:
@taskiq_broker.task(task_name='my_task', schedule=[{'cron': '*/5 * * * *'}])
async def my_task() -> str:
return 'done'