from celery import Task class HookTask(Task): """ 任务钩子: 监控任务的执行 """ # 成功时执行 def on_success(self, ret_val, task_id, args, kwargs): print('ret_val', ret_val) print(f'task id:{task_id} , arg:{args} , successful !') # 失败时执行 def on_failure(self, exc, task_id, args, kwargs, error_info): print(f'task id:{task_id} , arg:{args} , failed ! errors: {exc}') # 任务重试时执行 def on_retry(self, exc, task_id, args, kwargs, error_info): print(f'task id:{task_id} , arg:{args} , retry ! errors: {exc}')