| from datetime import timedelta |
|
|
| from celery import Celery, Task |
| from celery.schedules import crontab |
| from flask import Flask |
|
|
| from configs import dify_config |
|
|
|
|
def init_app(app: Flask) -> Celery:
    """Build the Celery application for ``app`` and register it as an extension.

    The Celery instance is configured from ``dify_config`` (broker, result
    backend, optional Redis Sentinel transport options, optional broker SSL),
    made the default app via ``set_default()``, stored under
    ``app.extensions["celery"]``, and given the periodic beat schedule.

    Args:
        app: The Flask application whose context every task runs in.

    Returns:
        The configured Celery application.
    """

    class FlaskTask(Task):
        """Task base class that runs the task body inside ``app``'s application context."""

        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    # Only Sentinel deployments pass extra transport options to the broker.
    transport_options = (
        {
            "master_name": dify_config.CELERY_SENTINEL_MASTER_NAME,
            "sentinel_kwargs": {
                "socket_timeout": dify_config.CELERY_SENTINEL_SOCKET_TIMEOUT,
            },
        }
        if dify_config.CELERY_USE_SENTINEL
        else {}
    )

    queue_app = Celery(
        app.name,
        task_cls=FlaskTask,
        broker=dify_config.CELERY_BROKER_URL,
        backend=dify_config.CELERY_BACKEND,
        task_ignore_result=True,
    )

    queue_app.conf.update(
        result_backend=dify_config.CELERY_RESULT_BACKEND,
        broker_transport_options=transport_options,
        broker_connection_retry_on_startup=True,
    )

    if dify_config.BROKER_USE_SSL:
        # Enable broker SSL with every certificate-related option set to None.
        queue_app.conf.update(
            broker_use_ssl=dict.fromkeys(
                ("ssl_cert_reqs", "ssl_ca_certs", "ssl_certfile", "ssl_keyfile")
            ),
        )

    queue_app.set_default()
    app.extensions["celery"] = queue_app

    # Modules Celery must import so the scheduled tasks below are registered.
    task_modules = [
        "schedule.clean_embedding_cache_task",
        "schedule.clean_unused_datasets_task",
        "schedule.create_tidb_serverless_task",
        "schedule.update_tidb_serverless_status_task",
    ]

    # The cleanup jobs share one interval, configured as a number of days.
    cleanup_interval = timedelta(days=dify_config.CELERY_BEAT_SCHEDULER_TIME)
    # The TiDB jobs run hourly, at minute 0 and minute 30 respectively.
    every_hour_on_the_hour = crontab(minute="0", hour="*")
    every_hour_at_half_past = crontab(minute="30", hour="*")

    schedule_table = {
        "clean_embedding_cache_task": {
            "task": "schedule.clean_embedding_cache_task.clean_embedding_cache_task",
            "schedule": cleanup_interval,
        },
        "clean_unused_datasets_task": {
            "task": "schedule.clean_unused_datasets_task.clean_unused_datasets_task",
            "schedule": cleanup_interval,
        },
        "create_tidb_serverless_task": {
            "task": "schedule.create_tidb_serverless_task.create_tidb_serverless_task",
            "schedule": every_hour_on_the_hour,
        },
        "update_tidb_serverless_status_task": {
            "task": "schedule.update_tidb_serverless_status_task.update_tidb_serverless_status_task",
            "schedule": every_hour_at_half_past,
        },
    }
    queue_app.conf.update(beat_schedule=schedule_table, imports=task_modules)

    return queue_app
|
|