Django中使用celery执行定时和周期任务
定时和周期任务在我们日常开发中是经常会用到的两个功能,比如你每天9点某系统发给你前一天的账单,又比如你预约的某场会议还有半小时就要开始了,系统自动给你发送了一个提醒,这都是今天我们所要实现的。
Celery的工作原理
Celery是一个高效的基于分布式消息传递的作业队列。它主要通过消息(messages)传递任务,通常使用一个叫Broker(中间人)来协调client(任务的发出者)和worker(任务的处理者)。 clients发出消息到队列中,broker将队列中的信息派发给 Celery worker来处理。Celery本身不提供消息服务,它支持的消息服务(Broker)有RabbitMQ
和Redis
。
安装项目依赖文件
我们这里使用Redis
做消息队列的broker
,所以还需要安装redis
。另外如果你要设置定时或周期性任务,还需要安装django-celery-beat
。
此处也没有特意去装某个版本,可以看到我django
都4版本的了,直接没指定版本,拉得都是最新的,个人有要求就版本要求去装即可。
1 | celery 5.2.7 |
Celery配置
在正式使用celery
和django-celery-beat
之前,你需要做基础的配置。假如你的Django
项目文件夹布局如下所示,你首先需要在myproject/myproject
目录下新增celery.py
并修改__init__.py
。
1 | - myproject/ |
新建celery.py
,添加如下代码:
1 | import os |
修改__init__.py
,如下所示:
1 | from .celery import app as celery_app |
接下来修改Django
项目的settings.py
,先将celery_beat和celery_results
加到install_app
中
1 | INSTALLED_APPS = [ |
然后添加Celery有关配置选项,如下所示:
1 | # 最重要的配置,设置消息broker,格式为:db://user:password@host:port/dbname |
其它Celery常用配置选项包括:
1 | # 为django_celery_results存储Celery任务执行结果设置后台 |
完整配置选项见:
- https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-result_expires
注意:
- 在
Django
中正式编写和执行自己的异步任务前,一定要先测试redis
和celery
是否安装好并配置成功。 - 一个无限期阻塞的任务会使得工作单元无法再做其他事情,建议给任务设置超时时间。
测试Celery是否工作正常
首先你要启动redis
服务。windows进入redis
所在目录(比如C:\redis
),使用redis-server.exe
启动redis
。Linux下使用./redis-server redis.conf
启动,也可修改redis.conf
将daemonize
设置为yes, 确保守护进程开启。
启动redis
服务后,你要先进入项目所在文件夹运行python manage.py runserver
命令启动Django
服务器(无需创建任何app
),然后再打开一个终端terminal窗口输入celery命令,启动worker。
1 | Linux下测试,启动Celery |
如果你能看到[tasks]下所列异步任务清单如debug_task
,以及最后一句celery@xxxx ready, 说明你的redis和celery都配置好了,可以开始正式工作了。
1 |
|
编写任务
celery
配置完成后,我们就可以编写任务了。Django
项目中所有需要Celery执行的异步或周期性任务都放在tasks.py
文件里,该文件可以位于project目录下,也可以位于各个app
的目录下。专属于某个Celery实例化项目的task可以使用@app.task
装饰器定义,各个app
目录下可以复用的task建议使用@shared_task
定义。
两个示例如下所示:
1 | # myproject/tasks.py |
上面我们定义一个名为add
的任务,它接收两个参数,并返回计算结果。为了模拟耗时任务,我们中途让其sleep 2秒。现在已经定义了一个耗时任务,我们希望在Django
的视图或其它地方中以异步方式调用执行它,应该怎么做呢? 下面我们将给出答案。
注意:
- 使用celery定义任务时,避免在一个任务中调用另一个异步任务,容易造成阻塞。
- 当我们使用
@app.task
装饰器定义我们的异步任务时,那么这个任务依赖于根据项目名myproject
生成的Celery实例。然而我们在进行Django
开发时为了保证每个app
的可重用性,我们经常会在每个app
文件夹下编写异步任务,这些任务并不依赖于具体的Django
项目名。使用@shared_task
装饰器能让我们避免对某个项目名对应Celery实例的依赖,使app
的可移植性更强。
异步调用任务
Celery提供了2种以异步方式调用任务的方法,delay
和apply_async
方法,如下所示:
1 | # 方法一:delay方法 |
我们接下来看一个具体的例子。我们编写了一个Django
视图函数,使用delay
方法调用add
任务。
1 | # app/views.py |
当你通过浏览器访问/test/链接时,你根本感受不到2s
的延迟,页面可以秒开,同时你会发现终端的输出如下所示,显示任务执行成功。
我们现在再次使用apply_async
方法调用add
任务,不过还要打印初任务的id (task.id
)和状态status。Celery会为每个加入到队列的任务分配一个独一无二的uuid
, 你可以通过task.status
获取状态和task.result
获取结果。注意:apply_async
传递参数的方式与delay方法不同。
1 | # app/views.py |
Django
返回响应结果如下所示。这是在预期之内的,因为Django
返回响应时任务还未执行完毕。
那么问题来了,这个异步任务执行了,返回了个计算结果(8),那么我们系统性地了解任务状态并获取这个执行结果呢? 答案是django-celery-results
。
查看任务执行状态及结果
通过pip安装django-celery-results
后,需要将其加入到INSTALLED_APPS
并使用migrate
命令迁移创建数据表。以下几项配置选项是与这个库相关的。
1 | # 支持数据库django-db和缓存django-cache存储任务状态及结果 |
安装配置完成后,进入Django admin
后台,你就可以详细看到每个任务的id、名称及状态。
点击单个任务id,你可以看到有关这个任务的更多信息,比如传递的参数和返回结果,如下所示:
除了在Django admin
后台中查看任务状态和结果,你还可以在视图中通过AsyncResult
方法获取任务执行状态和结果,它需要接收一个任务的task_id
(通常为uuid
格式)。
1 | from celery.result import AsyncResult |
设置定时和周期性任务
借助于装django-celery-beat
后, 你可以将任一Celery任务设置为定时任务或周期性任务。使用它你只需要通过pip安装它,并加入INSTALLED_APPS
里去。
django-celery-beat
提供了两种添加定时或周期性任务的方式,一是直接在settings.py
中添加,二是通过Django admin
后台添加。
配置文件添加任务
同一任务可以设置成不同的调用周期,给它们不同的任务名就好了,以下代码添加至settings文件中
1 | from datetime import timedelta |
Django Admin
添加周期性任务
先在settings.py
中将任务调度器设为DatabaseScheduler
1 | CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' |
然后进入Periodic Task表添加和修改周期性任务即可。
通过Crontab
设置定时任务
如果你希望在特定的时间(某月某周或某天)执行一个任务,你可以通过crontab
设置定时任务,如下例所示:
1 | CELERY_BEAT_SCHEDULE = { |
更多Crontab定义案例如下所示:
例子 | 含义 |
---|---|
crontab() | 每分 |
crontab(minute=0, hour=0) | 每天午夜 |
crontab(minute=0, hour='*/3') | 能被3整除的小时数,3,6,9点等等 |
crontab(minute=0,``hour='0,3,6,9,12,15,18,21') | 与前面相同,指定小时 |
crontab(minute='*/15') | 每15分钟 |
crontab(day_of_week='sunday') | 星期日每分钟 |
crontab(minute='*',``hour='*', day_of_week='sun') | 同上 |
crontab(minute='*/10',``hour='3,17,22', day_of_week='thu,fri') | 每10分钟运行一次, 但仅限于周四或周五的 3-4 am, 5-6 pm, 和10-11 pm. |
crontab(minute=0, hour='*/2,*/3') | 可以被2或3整除的小时数,除了 1am , 5am , 7am , 11am , 1pm , 5pm , 7pm , 11pm |
crontab(minute=0, hour='*/5') | 可以被5整除的小时 |
crontab(minute=0, hour='*/3,8-17') | 8am-5pm 之间可以被3整除的小时 |
crontab(0, 0, day_of_month='2') | 每个月的第2天 |
crontab(0, 0,``day_of_month='2-30/2') | 每月的偶数日 |
crontab(0, 0,``day_of_month='1-7,15-21') | 每月的第一和第三周 |
crontab(0, 0, day_of_month='11',``month_of_year='5') | 每年的5月11日 |
crontab(0, 0,``month_of_year='*/3') | 每个季度首个月份每天 |
Crontab
也可以通过Django Admin
添加,然后与任务进行绑定。
如果你变换了时区timezone,比如从’UTC’变成了’Asia/Shanghai’,需重置周期性任务,这非常重要。
1 | 调整timezone后重置任务 |
前面我们只是添加了定时或周期性任务,我们还需要启动任务调度器beat分发定时和周期任务给Celery的worker。
启动任务调度器beat
多开几个终端,一个用来启动任务调度器beat,另一个启动celery worker,你的任务就可以在后台执行啦。
1 | Celery -A myproject beat |
使用模型动态添加定时任务
定时任务和周期任务设置类似,上面虽然讲的是周期任务,但是定时任务也同样的设置方式,但是可以看到是有明显不灵活的。
以上任务都是在事先明确的情况下添加的,不管是从django-admin还是settings里配置,都是提前知道这个任务的运行时间,这样才能去手动配置好。
如最开始我讲的场景,我的系统里设计了一个会议模型,我的需求是会议开始前30分钟给所有报名这个会议的用户发一个会议即将开始的消息提醒,那你告诉我我这么多会议,每个会议时间都不一致,你不可能每次添加完一个会议就去settings或者django-admin里去手动把这个任务加上吧,稍微想下其实就明白了了,大家从django-admin里已经看到了任务列表、crontab列表、任务结果列表,很明显是通过model做到的,那我们创建会议的时候,同步添加一个任务到celery任务列表中不就行了,干……
任务文件app.tasks.py
1 |
|
视图文件app.views.py
1 | def create_metting(request): |
确保该视图已添加至路由
将该视图添加至路由,然后添加会议,观察django-admin
中的crontab
和periodic task
,都多了一条记录,celery worker和celery beat跟之前一样都是启动的状态,等到会议前30分钟,再观察task result,可以看到,也多了一条任务成功的记录,至此定时任务已完成。
Celery高级用法与注意事项
给任务设置最大重试次数
定义任务时可以通过max_retries
设置最大重试次数,并调用self.retry
方法调用。因为要调用self
这个参数,定义任务时必须设置bind=True
。
1 |
|
不同任务交由不同Queue处理
不同的任务所需要的资源和时间不一样的。为了防止一些非常占用资源或耗时的任务阻塞任务队列导致一些简单任务也无法执行,可以将不同任务交由不同的Queue处理。下例定义了两个Queue队列,default执行普通任务,heavy_tasks执行重型任务。
1 | CELERY_TASK_DEFAULT_QUEUE = 'default' |
忽略不想要的结果
如果你不在意任务的返回结果,可以设置 ignore_result
选项,因为存储结果耗费时间和资源。你还可以可以通过 task_ignore_result
设置全局忽略任务结果。
1 |
|
避免启动同步子任务
让一个任务等待另外一个任务的返回结果是很低效的,并且如果工作单元池被耗尽的话这将会导致死锁。
1 | # 话例子 |
# 好例子
1 | def update_page_info(url): |
在好例子里,我们将不同的任务签名链接起来创建一个任务链,三个子任务按顺序执行。
Django
的模型对象不应该作为参数传递
Django
的模型对象不应该作为参数传递给任务。几乎总是在任务运行时从数据库获取对象是最好的,因为老的数据会导致竞态条件。假象有这样一个场景,你有一篇文章,以及自动展开文章中缩写的任务:
1 | class Article(models.Model): |
首先,作者创建一篇文章并保存,这时作者点击一个按钮初始化一个缩写展开任务:
1 | id=102) article = Article.objects.get( |
现在,队列非常忙,所以任务在2分钟内都不会运行。与此同时,另一个作者修改了这篇文章,当这个任务最终运行,因为老版本的文章作为参数传递给了这个任务,所以这篇文章会回滚到老的版本。修复这个竞态条件很简单,只要参数传递文章的 id 即可,此时可以在任务中重新获取这篇文章:
1 |
|
使用on_commit函数处理事务
我们再看另外一个celery中处理事务的例子。这是在数据库中创建一个文章对象的 Django
视图,此时传递主键给任务。它使用 commit_on_success
装饰器,当视图返回时该事务会被提交,当视图抛出异常时会进行回滚。
1 | from django.db import transaction |
如果在事务提交之前任务已经开始执行会产生一个竞态条件;数据库对象还不存在。解决方案是使用 on_commit
回调函数来在所有事务提交成功后启动任务。
1 | from django.db.transaction import on_commit |
本文引自:https://pythondjango.cn/django/advanced/12-sync-periodic-tasks-with-celery/
内容上对于定时任务做了补充,如果牵涉到版权,请联系本人删除。