Celery: A Python Asynchronous Task Scheduling Tool

Celery stands for Distributed Task Queue. "Distributed" means there can be multiple workers; "queue" means it operates asynchronously: there is a foreman who comes up with tasks, and a crew of workers waiting to be assigned work.

When setting up Celery in Python we introduce a Broker, literally a "middleman", and that is exactly the role it plays here. When the foreman issues tasks, they all go into the Broker; at the other end of the Broker, a crew of workers pull the tasks out one by one and get to work.

This model makes the whole system an open loop: the foreman has no idea how well the workers did their jobs. So we introduce a Backend to store the result of each task. The Backend is a bit like the Broker in that it also stores task information, except what it stores are the tasks' return values. We can choose to have only failed tasks write their results to the Backend; fetching those results then tells us how many tasks failed.

Celery (named after the vegetable) is an asynchronous task queue / job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well. Celery is used in production systems to process millions of tasks a day. It is written in Python, but the protocol can be implemented in any language, and it can interoperate with other languages via webhooks. The recommended message broker is RabbitMQ, with limited support for Redis, Beanstalk, MongoDB, CouchDB, and databases (via SQLAlchemy or the Django ORM). Celery integrates easily with Django, Pylons, and Flask through the django-celery, celery-pylons, and Flask-Celery add-on packages.

An Introduction to Celery

A few basic Celery concepts need explaining first, otherwise it is not clear why we install the things below. The concepts: Broker and Backend.

What is a broker?

A broker is message-transport middleware; you can think of it as a mailbox. Whenever the application calls one of Celery's asynchronous tasks, it drops a message into the broker, and a Celery worker later picks that message up and executes the corresponding code. So yes, the mailbox is really a message queue. "Broker" literally means middleman, but in practice it is the message queue mentioned at the start, used to send and receive messages. There are several broker options to choose from: RabbitMQ (a message queue), Redis (an in-memory cache/database), a relational database (not recommended), and so on.

What is a backend?

Normally, once a message is sent, that is the end of it; the sender may never know whether it was received. Celery therefore implements a backend to store those messages along with Celery's execution details and results. The backend corresponds to the configuration option CELERY_RESULT_BACKEND; it saves results and task state. If you need to track a task's state, set this option. It can be a database backend or a cache backend; see the CELERY_RESULT_BACKEND documentation for details.

For the broker, the official recommendation is RabbitMQ or Redis; for the backend, a database. To keep things simple, we will use Redis for both.
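
For a quick orientation, here is a minimal configuration sketch (the Redis URL assumes a default local install; the app name 'demo' is just a placeholder):

from celery import Celery

# Broker and backend both point at a hypothetical local Redis instance
app = Celery('demo',
             broker='redis://127.0.0.1:6379/0',
             backend='redis://127.0.0.1:6379/1')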

First Steps with Celery

Creating a worker

Filename: CeleryTask.py

from celery import Celery
import time

redis_cli = "redis://127.0.0.1:6379"

# Create a Celery instance -- this is our application, the "app"
my_task = Celery('TestTasks', broker=redis_cli, backend=redis_cli)

# Define the tasks
@my_task.task
def func1(type):
    time.sleep(20)
    return f"Return value of func1: finished task [{type}]"

@my_task.task
def func2():
    return "Return value of func2"

@my_task.task
def func3():
    return "Return value of func3"

With the worker defined, we need to start it. Celery 4 and above apparently does not support Windows out of the box, so on Windows we need to install the eventlet extension package (Linux does not need it); then start the worker with the following command:
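
On Windows, install eventlet first (this is the standard pip package name):

pip install eventlet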

celery worker -A CeleryTask -l INFO -P eventlet

On Linux the -P flag is not needed. With that, the worker has started successfully.

Publishing a task

The foreman assigns a task. Filename: RunTask.py

from CeleryTask import func1, func2, func3

# Handing the task to a Celery worker returns an object carrying the task id
res = func1.delay("grocery shopping")
print(res, type(res))
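
The object delay() returns is an AsyncResult. As a quick sketch, two attributes worth inspecting right after publishing (both part of the real AsyncResult API):

# res is the AsyncResult returned by delay()
print(res.id)      # the task id, a UUID string
print(res.status)  # current state: PENDING, STARTED, SUCCESS, FAILURE, ...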

Checking the task

Now we check whether a task has finished. Filename: WatchTask.py

from celery.result import AsyncResult
from CeleryTasks import my_task

# Look up the task result by id
res = AsyncResult(id="c17ab9d7-b4d5-41ed-88b3-120f0c00ebf5", app=my_task)
print('result:', res)

# Fetch the return value (blocks until the task finishes)
print('get:', res.get())
# Did the task complete successfully?
print('success:', res.successful())

Using the task id published earlier, we check the task's state by running the checking script.

Within the first 20 seconds, while the task is still running, it prints:

"D:\Program Files\Python37\python.exe" E:/site/WatchTask.py
result: c17ab9d7-b4d5-41ed-88b3-120f0c00ebf5

Running the file repeatedly, after roughly 20 seconds it prints:

"D:\Program Files\Python37\python.exe" E:/site/WatchTask.py
result: c17ab9d7-b4d5-41ed-88b3-120f0c00ebf5
get: Return value of func1: finished task [grocery shopping]
success: True

Meanwhile, the worker logged the result for us:

[2019-10-22 14:11:04,229: INFO/MainProcess] Task CeleryTask.func1[c17ab9d7-b4d5-41ed-88b3-120f0c00ebf5] succeeded in 20.0s: 'Return value of func1'
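
Note that res.get() blocks until the task finishes. If you would rather poll without blocking, a small sketch using the real AsyncResult API could look like this:

import time
from celery.result import AsyncResult
from CeleryTask import my_task

res = AsyncResult(id="c17ab9d7-b4d5-41ed-88b3-120f0c00ebf5", app=my_task)
# ready() returns True once the task has finished (successfully or not)
while not res.ready():
    print('state:', res.state)
    time.sleep(1)
print('get:', res.get())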

A distributed crawler

In an earlier article, **从零开发手机app之爬取数据**, we built a simple crawler; here we reimplement it with Celery.

The original code

First, the original code:

import requests, time, os
from uuid import uuid4

# Headers to avoid being blocked
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.131 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3'
}

# URL of the Douyin hot-song chart
music_list_url = 'http://www.kuwo.cn/api/www/bang/bang/musicList?bangId=158&pn=1&rn=30&reqId=a27b7ee0-ebd3-11e9-92b8-13a9816f0dc5'

# Crawl the music list
def kuwo():
    # Call the song-list API
    res = requests.get(music_list_url, headers=headers)
    # Extract the song list from the response
    music_datas = res.json().get('data').get('musicList')
    # Walk through each song entry
    for item in music_datas:
        # Sleep one second to avoid getting kicked
        time.sleep(1)
        # Build a unique filename
        filename = uuid4()
        # Path for the cover image to save
        img = os.path.join('img', f"{filename}.jpg")
        # Download the song's cover image
        download(item.get('pic'), img)
    print('Collected successfully...')

# Download a file
def download(url, path):
    '''
    Download a file
    :param url: URL of the file to download
    :param path: where to save it
    :return:
    '''
    res = requests.get(url).content
    with open(path, 'wb') as f:
        f.write(res)

if __name__ == '__main__':
    start = time.time()
    # Start crawling
    kuwo()
    print("Elapsed:", time.time() - start)

This run crawled 30 resources; the test function took just over 30 seconds in total:

Collected successfully...
Elapsed: 32.57835030555725

Improving it with Celery

The task file: spider.py

import requests, time, os
from uuid import uuid4

from celery import Celery

redis_cli = "redis://127.0.0.1:6379"

my_task = Celery('TestTasks', broker=redis_cli, backend=redis_cli)

# The crawler task
@my_task.task
def run(url):
    # Sleep one second to avoid getting kicked
    time.sleep(1)
    # Build a unique filename
    filename = uuid4()
    # Path for the cover image to save
    img = os.path.join('img', f"{filename}.jpg")
    # Download the song's cover image
    res = requests.get(url).content
    with open(img, 'wb') as f:
        f.write(res)
    return f"File [{img}] collected successfully!"
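
Downloads fail from time to time. Though not part of the original code, Celery supports automatic retries via bind=True and self.retry(); here is a hedged sketch in the same spider.py context (run_with_retry is a hypothetical name):

@my_task.task(bind=True, max_retries=3)
def run_with_retry(self, url):
    try:
        res = requests.get(url, timeout=10)
        res.raise_for_status()
    except requests.RequestException as exc:
        # Re-queue this task and try again in 5 seconds, up to 3 times
        raise self.retry(exc=exc, countdown=5)
    return f"Fetched {len(res.content)} bytes from {url}"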

Distributing the tasks

Filename: PublishTask.py

import requests, time, os

from spider import run

# Headers to avoid being blocked
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.131 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3'
}

# URL of the Douyin hot-song chart
music_list_url = 'http://www.kuwo.cn/api/www/bang/bang/musicList?bangId=158&pn=1&rn=30&reqId=a27b7ee0-ebd3-11e9-92b8-13a9816f0dc5'
# Call the song-list API
res = requests.get(music_list_url, headers=headers)
# Extract the song list from the response
music_datas = res.json().get('data').get('musicList')
# Publish one task per song
for item in music_datas:
    res = run.delay(item.get('pic'))
    print(res)
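
If you also want to gather every result in one place, Celery's group primitive (part of its canvas API) can publish the whole batch and wait on it. A sketch under the same PublishTask.py context:

from celery import group

# One signature per image URL; apply_async publishes them all at once
job = group(run.s(item.get('pic')) for item in music_datas)
result = job.apply_async()
# Blocks until every task finishes, then returns the list of return values
print(result.get())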

Running the test

Start the worker with this command:

# Up to 25 concurrent worker threads
celery worker -A spider -c 25 -l INFO -P eventlet

Run the PublishTask.py file and all 30 task ids print out in an instant. Let's look at the Celery log output:

[2019-10-22 15:46:55,760: INFO/MainProcess] Received task: spider.run[afa14131-972b-45fc-bac0-23c5636119af]
[2019-10-22 15:46:55,762: INFO/MainProcess] Received task: spider.run[2ab02b40-ea01-44b1-a4a3-2ef153daf832]
[2019-10-22 15:46:55,765: INFO/MainProcess] Received task: spider.run[bc7a97d5-f152-4d98-b2a0-81a972cda3a7]
[2019-10-22 15:46:55,766: INFO/MainProcess] Received task: spider.run[5daeb4f6-ba0e-479d-8a5d-8323c19a6aa3]
[2019-10-22 15:46:55,768: INFO/MainProcess] Received task: spider.run[8c8a1c78-a5ee-416c-8a13-9524b0033e40]
[2019-10-22 15:46:55,769: INFO/MainProcess] Received task: spider.run[326cc297-9a97-4d4a-90ae-ce713884aec2]
[2019-10-22 15:46:55,771: INFO/MainProcess] Received task: spider.run[a41c6f9c-3295-44ab-98e3-297d7bf74d28]
[2019-10-22 15:46:55,772: INFO/MainProcess] Received task: spider.run[055ed399-59c9-49ff-8366-364103a1cea6]
[2019-10-22 15:46:55,774: INFO/MainProcess] Received task: spider.run[d49c02e8-e29d-4ff8-a13a-f4b4842c62a9]
[2019-10-22 15:46:55,775: INFO/MainProcess] Received task: spider.run[a3f65009-4b54-4f73-bce6-4323cd43a4b3]
[2019-10-22 15:46:55,777: INFO/MainProcess] Received task: spider.run[32e1af6b-0bfa-4283-bfac-c706019a7700]
[2019-10-22 15:46:55,778: INFO/MainProcess] Received task: spider.run[b7fa1843-c342-4be3-993d-c14e75a219ee]
[2019-10-22 15:46:55,779: INFO/MainProcess] Received task: spider.run[0334b7ed-71cf-4f50-8eba-c98f5a886fa6]
[2019-10-22 15:46:55,781: INFO/MainProcess] Received task: spider.run[54cee9f2-6e2e-4ecf-a3ee-ddaaf5a3407e]
[2019-10-22 15:46:55,782: INFO/MainProcess] Received task: spider.run[2802ec81-ffe1-4e6b-a618-58aea0f437d4]
[2019-10-22 15:46:55,783: INFO/MainProcess] Received task: spider.run[348fe088-6e5b-4644-975c-1ae5ddcb8ea3]
[2019-10-22 15:46:55,785: INFO/MainProcess] Received task: spider.run[cbfd2619-ce7d-4e82-becf-7b99d664a201]
[2019-10-22 15:46:55,786: INFO/MainProcess] Received task: spider.run[f228e417-ea25-405c-8885-af1ec26f6566]
[2019-10-22 15:46:55,788: INFO/MainProcess] Received task: spider.run[736bfc0c-86e2-4cb4-9584-07fbe7d1d511]
[2019-10-22 15:46:55,790: INFO/MainProcess] Received task: spider.run[38240675-df53-430f-bb6f-0eac4eaf81c3]
[2019-10-22 15:46:55,793: INFO/MainProcess] Received task: spider.run[d71929cf-dc76-4b75-8d6c-ff21acdf0d38]
[2019-10-22 15:46:55,794: INFO/MainProcess] Received task: spider.run[d8079d5c-2fde-426e-ba7d-3e04846ffe8e]
[2019-10-22 15:46:55,796: INFO/MainProcess] Received task: spider.run[f0d95992-026b-4ca0-853f-d8d20bb31380]
[2019-10-22 15:46:55,798: INFO/MainProcess] Received task: spider.run[20c35a7d-3749-4add-b430-9ec72954e741]
[2019-10-22 15:46:55,801: INFO/MainProcess] Received task: spider.run[b85a0bfd-0d81-4efa-846f-8de835eb0022]
[2019-10-22 15:46:55,802: INFO/MainProcess] Received task: spider.run[9bd97385-52d9-421f-afaf-d5ced31e154d]
[2019-10-22 15:46:57,093: INFO/MainProcess] Task spider.run[bc7a97d5-f152-4d98-b2a0-81a972cda3a7] succeeded in 1.3280000000013388s: 'File [img\4069fc3f-f823-4c89-ac26-5c1d2d5c2b9d.jpg] collected successfully!'
[2019-10-22 15:46:57,094: INFO/MainProcess] Task spider.run[afa14131-972b-45fc-bac0-23c5636119af] succeeded in 1.3280000000013388s: 'File [img\4aa78a6f-632c-42a5-8c05-229ac3f8dcc8.jpg] collected successfully!'
[2019-10-22 15:46:57,095: INFO/MainProcess] Task spider.run[8c8a1c78-a5ee-416c-8a13-9524b0033e40] succeeded in 1.3280000000013388s: 'File [img\036d0e76-a05c-443b-af32-fbe30f7b4b6d.jpg] collected successfully!'
[2019-10-22 15:46:57,095: INFO/MainProcess] Task spider.run[d49c02e8-e29d-4ff8-a13a-f4b4842c62a9] succeeded in 1.3130000000019209s: 'File [img\c47f5d02-da2d-46b2-84c5-5849d657fef1.jpg] collected successfully!'
[2019-10-22 15:46:57,096: INFO/MainProcess] Task spider.run[f0d95992-026b-4ca0-853f-d8d20bb31380] succeeded in 1.2969999999986612s: 'File [img\4578c584-7497-40e1-ba13-3d714b85f72c.jpg] collected successfully!'
[2019-10-22 15:46:57,096: INFO/MainProcess] Task spider.run[2ab02b40-ea01-44b1-a4a3-2ef153daf832] succeeded in 1.3280000000013388s: 'File [img\39032b88-89f4-4e4d-856e-af55571bd814.jpg] collected successfully!'
[2019-10-22 15:46:57,096: INFO/MainProcess] Task spider.run[2802ec81-ffe1-4e6b-a618-58aea0f437d4] succeeded in 1.3130000000019209s: 'File [img\978ec002-92b1-490e-b050-26db727e3cf2.jpg] collected successfully!'
[2019-10-22 15:46:57,097: INFO/MainProcess] Task spider.run[38240675-df53-430f-bb6f-0eac4eaf81c3] succeeded in 1.2969999999986612s: 'File [img\5ad330c7-9f7f-4c09-bafc-92cc3d164ece.jpg] collected successfully!'
[2019-10-22 15:46:57,097: INFO/MainProcess] Task spider.run[a41c6f9c-3295-44ab-98e3-297d7bf74d28] succeeded in 1.3280000000013388s: 'File [img\d73cf877-4c10-4819-b7b9-38ef1c56b953.jpg] collected successfully!'
[2019-10-22 15:46:57,097: INFO/MainProcess] Task spider.run[f228e417-ea25-405c-8885-af1ec26f6566] succeeded in 1.3130000000019209s: 'File [img\c8a78ab7-e1a5-4fc3-b9ff-7c5d26e778d1.jpg] collected successfully!'
[2019-10-22 15:46:57,098: INFO/MainProcess] Task spider.run[5daeb4f6-ba0e-479d-8a5d-8323c19a6aa3] succeeded in 1.3280000000013388s: 'File [img\bd65a09b-184d-4306-9609-132cd73344e4.jpg] collected successfully!'
[2019-10-22 15:46:57,098: INFO/MainProcess] Task spider.run[055ed399-59c9-49ff-8366-364103a1cea6] succeeded in 1.3280000000013388s: 'File [img\e8d2d0cf-65e9-4373-9b19-653df23e7eef.jpg] collected successfully!'
[2019-10-22 15:46:57,098: INFO/MainProcess] Task spider.run[0334b7ed-71cf-4f50-8eba-c98f5a886fa6] succeeded in 1.3130000000019209s: 'File [img\bfda03cb-4749-4dfd-81b1-63b88a4863c5.jpg] collected successfully!'
[2019-10-22 15:46:57,099: INFO/MainProcess] Task spider.run[cbfd2619-ce7d-4e82-becf-7b99d664a201] succeeded in 1.3130000000019209s: 'File [img\732e9d6d-c6b6-45c2-abcc-d6862bbfa72e.jpg] collected successfully!'
[2019-10-22 15:46:57,099: INFO/MainProcess] Task spider.run[32e1af6b-0bfa-4283-bfac-c706019a7700] succeeded in 1.3130000000019209s: 'File [img\77d7dde5-9a36-482a-97ab-fbcf94e3c021.jpg] collected successfully!'
[2019-10-22 15:46:57,101: INFO/MainProcess] Task spider.run[54cee9f2-6e2e-4ecf-a3ee-ddaaf5a3407e] succeeded in 1.3290000000015425s: 'File [img\b772f99f-36ca-4776-9b6f-f3aefc834dd9.jpg] collected successfully!'
[2019-10-22 15:46:57,102: INFO/MainProcess] Received task: spider.run[cd758c80-e8f3-4eef-95a9-26ac56131adf]
[2019-10-22 15:46:57,137: INFO/MainProcess] Task spider.run[326cc297-9a97-4d4a-90ae-ce713884aec2] succeeded in 1.375s: 'File [img\682170a0-d428-4039-90a0-59744f2918b9.jpg] collected successfully!'
[2019-10-22 15:46:57,138: INFO/MainProcess] Task spider.run[d71929cf-dc76-4b75-8d6c-ff21acdf0d38] succeeded in 1.3439999999973224s: 'File [img\e72af2f6-6541-4ed2-936f-31c83aaa8a93.jpg] collected successfully!'
[2019-10-22 15:46:57,140: INFO/MainProcess] Task spider.run[a3f65009-4b54-4f73-bce6-4323cd43a4b3] succeeded in 1.360000000000582s: 'File [img\dbdcc404-aac9-4741-a58b-25f7d3eee8f3.jpg] collected successfully!'
[2019-10-22 15:46:57,141: INFO/MainProcess] Received task: spider.run[0cc84de9-d41b-4958-b9ab-81aad2c696d4]
[2019-10-22 15:46:57,143: INFO/MainProcess] Received task: spider.run[00d7909d-1305-46a1-952c-49e540bf470b]
[2019-10-22 15:46:57,145: INFO/MainProcess] Received task: spider.run[68874ab0-119a-4206-bc68-ab71c21c1fe6]
[2019-10-22 15:46:57,156: INFO/MainProcess] Task spider.run[348fe088-6e5b-4644-975c-1ae5ddcb8ea3] succeeded in 1.375s: 'File [img\d0c553d9-95a4-480c-b61e-ee2c6cdfa57e.jpg] collected successfully!'
[2019-10-22 15:46:58,164: INFO/MainProcess] Task spider.run[cd758c80-e8f3-4eef-95a9-26ac56131adf] succeeded in 1.0459999999984575s: 'File [img\826a21be-02b8-4d6a-9bd3-4b489d81c64d.jpg] collected successfully!'
[2019-10-22 15:46:58,173: INFO/MainProcess] Task spider.run[9bd97385-52d9-421f-afaf-d5ced31e154d] succeeded in 1.0620000000017171s: 'File [img\f83af5be-e641-49d2-b5ef-d2fc655ce166.jpg] collected successfully!'
[2019-10-22 15:46:58,190: INFO/MainProcess] Task spider.run[68874ab0-119a-4206-bc68-ab71c21c1fe6] succeeded in 1.0470000000022992s: 'File [img\15f4732b-2306-4fe0-a90d-62c3cd4579ed.jpg] collected successfully!'
[2019-10-22 15:46:58,206: INFO/MainProcess] Task spider.run[0cc84de9-d41b-4958-b9ab-81aad2c696d4] succeeded in 1.0620000000017171s: 'File [img\9911ce58-146b-46b1-a163-b6104c4a06e6.jpg] collected successfully!'
[2019-10-22 15:46:58,207: INFO/MainProcess] Task spider.run[00d7909d-1305-46a1-952c-49e540bf470b] succeeded in 1.0620000000017171s: 'File [img\6d3dd6dd-717b-478e-8b01-9ec12a35e496.jpg] collected successfully!'
[2019-10-22 15:46:58,868: INFO/MainProcess] Task spider.run[d8079d5c-2fde-426e-ba7d-3e04846ffe8e] succeeded in 3.077999999997701s: 'File [img\a195ffb5-5d0d-4df2-86b7-52672a8a3842.jpg] collected successfully!'
[2019-10-22 15:46:58,889: INFO/MainProcess] Task spider.run[20c35a7d-3749-4add-b430-9ec72954e741] succeeded in 3.0939999999973224s: 'File [img\e68898dd-0224-4f3d-a48b-e7501f72e9ab.jpg] collected successfully!'
[2019-10-22 15:46:58,890: INFO/MainProcess] Task spider.run[b85a0bfd-0d81-4efa-846f-8de835eb0022] succeeded in 3.0939999999973224s: 'File [img\5cd32d97-33f4-4e2a-8595-d17eb1ca1302.jpg] collected successfully!'
[2019-10-22 15:46:58,936: INFO/MainProcess] Task spider.run[b7fa1843-c342-4be3-993d-c14e75a219ee] succeeded in 3.1570000000028813s: 'File [img\307c87e9-7a61-4d80-bce9-07700726b713.jpg] collected successfully!'
[2019-10-22 15:47:00,923: INFO/MainProcess] Task spider.run[736bfc0c-86e2-4cb4-9584-07fbe7d1d511] succeeded in 5.125s: 'File [img\18f9b55c-4877-45db-97a7-a67c2707cd23.jpg] collected successfully!'

As shown above, the longest task took 5.125 s. Distributed really is fast.

Celery directory structure

Usually we run more than one task, or more than one kind of task, and they live in different files. In that case we can set up a dedicated Celery package to hold them all.

Creating the files

The directory name can be anything you like; our project structure looks like this:

├── CeleryTasks          # the Celery package
│   ├── celery.py        # main Celery file (must be named celery.py)
│   ├── Task1.py         # task file 1
│   └── Task2.py         # task file 2
└── CeleryRun.py         # task-publishing script

The celery.py file

from celery import Celery

redis_cli = "redis://127.0.0.1:6379"

# The include parameter tells Celery where to find all the tasks in this package
my_task = Celery('TestTasks', broker=redis_cli, backend=redis_cli, include=["CeleryTasks.Task1", "CeleryTasks.Task2"])

The Task1.py file

from CeleryTasks.celery import my_task

# Define the task
@my_task.task
def funcOne(name):
    return f"Result of funcOne in Task1: [{name}]"

The Task2.py file

from CeleryTasks.celery import my_task

# Define the tasks
@my_task.task
def funcTwo(name):
    return f"Result of funcTwo in Task2: [{name}]"

@my_task.task
def download(name):
    return f"Result of download in Task2: [{name}]"

The CeleryRun.py file

from CeleryTasks.Task1 import funcOne
from CeleryTasks.Task2 import funcTwo, download

funcOne.delay("doing laundry")
funcTwo.delay("cooking")
download.delay("aaa.jpg")
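
As an aside, you do not have to import the task functions at all: app.send_task (a real Celery API) publishes by registered task name. A minimal sketch:

from CeleryTasks.celery import my_task

# Publish by task name instead of importing the function
my_task.send_task("CeleryTasks.Task1.funcOne", args=["doing laundry"])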

Starting the worker

Previously we pointed -A at a file; now we simply point it at the CeleryTasks package. The startup banner also shows that every task in the task files has been registered:

E:\site>Celery worker -A CeleryTasks -c 25 -l INFO -P eventlet

-------------- celery@TONY v4.3.0 (rhubarb)
---- **** -----
--- * *** * -- Windows-10-10.0.18362-SP0 2019-10-22 16:53:45
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: TestTasks:0x1baf1be6470
- ** ---------- .> transport: redis://127.0.0.1:6379//
- ** ---------- .> results: redis://127.0.0.1:6379/
- *** --- * --- .> concurrency: 25 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks]
. CeleryTasks.Task1.funcOne
. CeleryTasks.Task2.download
. CeleryTasks.Task2.funcTwo

[2019-10-22 16:53:45,151: INFO/MainProcess] Connected to redis://127.0.0.1:6379//
[2019-10-22 16:53:45,161: INFO/MainProcess] mingle: searching for neighbors
[2019-10-22 16:53:46,178: INFO/MainProcess] mingle: all alone
[2019-10-22 16:53:46,190: INFO/MainProcess] celery@TONY ready.
[2019-10-22 16:53:46,191: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379//.

Publishing the tasks

Run the CeleryRun.py file, then look at the worker console again:

[2019-10-22 16:54:27,027: INFO/MainProcess] Received task: CeleryTasks.Task1.funcOne[6d84b3b3-244d-4ee4-b361-4cb3c6917fb1]
[2019-10-22 16:54:27,029: INFO/MainProcess] Received task: CeleryTasks.Task2.funcTwo[b2c52bf9-fb61-468c-adfa-877a069cff69]
[2019-10-22 16:54:27,031: INFO/MainProcess] Task CeleryTasks.Task1.funcOne[6d84b3b3-244d-4ee4-b361-4cb3c6917fb1] succeeded in 0.0s: 'Result of funcOne in Task1: [doing laundry]'
[2019-10-22 16:54:27,032: INFO/MainProcess] Task CeleryTasks.Task2.funcTwo[b2c52bf9-fb61-468c-adfa-877a069cff69] succeeded in 0.01600000000325963s: 'Result of funcTwo in Task2: [cooking]'
[2019-10-22 16:54:27,033: INFO/MainProcess] Received task: CeleryTasks.Task2.download[dab30fe3-4dbf-46fd-941c-7d20a6386618]
[2019-10-22 16:54:27,034: INFO/MainProcess] Task CeleryTasks.Task2.download[dab30fe3-4dbf-46fd-941c-7d20a6386618] succeeded in 0.0s: 'Result of download in Task2: [aaa.jpg]'

All the tasks were received and executed.

Timed tasks

TimingWorker.py

from celery import Celery

redis_cli = "redis://127.0.0.1:6379"

my_task = Celery('TestTasks', broker=redis_cli, backend=redis_cli)

# Define the task
@my_task.task
def func1(name):
    return f"Return value of func1: I did [{name}]"

TimingTask.py

from TimingWorker import func1

# For timed tasks we no longer use delay(), which hands the task over for
# immediate execution; instead we use apply_async() with a scheduled time.

# First, work out when the task should run
import datetime, time

# Current timestamp (local time here is UTC+8)
ctime = time.time()
# Convert to UTC -- the eta argument must always be given in UTC
utc_time = datetime.datetime.utcfromtimestamp(ctime)
# Add 10 seconds to the current time
add_time = datetime.timedelta(seconds=10)
action_time = utc_time + add_time

# action_time is 10 seconds in the future
# Now schedule the task with apply_async
res = func1.apply_async(args=["fixing the planet"], eta=action_time)
print(res.id)
# func1 will now run 10 seconds from now

Testing

Start the worker with celery worker -A TimingWorker -c 25 -l INFO -P eventlet, run the TimingTask.py file, and watch the Celery console:

[2019-10-22 17:18:22,202: INFO/MainProcess] Received task: TimingWorker.func1[d4fdbba9-1a8b-416b-ac00-96a2e683727b]  ETA:[2019-10-22 09:18:31.716313+00:00]
[2019-10-22 17:18:31,723: INFO/MainProcess] Task TimingWorker.func1[d4fdbba9-1a8b-416b-ac00-96a2e683727b] succeeded in 0.0s: 'Return value of func1: I did [fixing the planet]'

As you can see, the task was executed 10 seconds after it was received.
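
For a simple relative delay there is a shorter route: apply_async also accepts a countdown in seconds (a real Celery parameter), which avoids the UTC conversion entirely:

# Equivalent to the eta example above: run 10 seconds from now
res = func1.apply_async(args=["fixing the planet"], countdown=10)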

Periodic tasks

Taking the Celery directory structure as our example again, we only need to modify the celery.py file:

from celery import Celery
from celery.schedules import crontab

redis_cli = "redis://127.0.0.1:6379"

# The include parameter tells Celery where to find all the tasks in this package
my_task = Celery('TestTasks', broker=redis_cli, backend=redis_cli, include=["CeleryTasks.Task1", "CeleryTasks.Task2"])

my_task.conf.beat_schedule = {
    "each10s_task": {
        "task": "CeleryTasks.Task1.funcOne",
        "schedule": 10,  # run every 10 seconds
        "args": ("eating",)
    },
    "each13s_task": {
        "task": "CeleryTasks.Task2.funcTwo",
        "schedule": 13,  # run every 13 seconds
        "args": ("sleeping",)
    },
    "each1m_task": {
        "task": "CeleryTasks.Task2.download",
        "schedule": crontab(),  # run every minute
        "args": ("teasing Doudou",)
    }
}
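
The integer schedules above are interpreted as seconds. A datetime.timedelta also works as a schedule value and reads more explicitly; a sketch of the first entry rewritten that way (assuming the my_task app from the celery.py above):

from datetime import timedelta

my_task.conf.beat_schedule["each10s_task"] = {
    "task": "CeleryTasks.Task1.funcOne",
    "schedule": timedelta(seconds=10),  # identical to "schedule": 10
    "args": ("eating",)
}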

Once this is configured there is one more important point: we cannot simply start a worker, because periodic tasks first need something to produce them.

celery beat -A CeleryTasks
celery worker -A CeleryTasks -l INFO -P eventlet

The way we create the worker has not changed, but note that a task now has to be produced at each interval for the worker to execute; this calls for a producer, beat.

celery beat -A CeleryTasks  # start the beat producer -- launch it from wherever your schedule is defined

E:\site>celery beat -A CeleryTasks
celery beat v4.3.0 (rhubarb) is starting.
__ - ... __ - _
LocalTime -> 2019-10-23 09:26:26
Configuration ->
. broker -> redis://127.0.0.1:6379//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)

Creating the worker

celery worker -A CeleryTasks -l INFO -P eventlet

E:\site>celery worker -A CeleryTasks -l INFO -P eventlet

-------------- celery@TONY v4.3.0 (rhubarb)
---- **** -----
--- * *** * -- Windows-10-10.0.18362-SP0 2019-10-23 09:26:31
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: TestTasks:0x173f00d76a0
- ** ---------- .> transport: redis://127.0.0.1:6379//
- ** ---------- .> results: redis://127.0.0.1:6379/
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks]
. CeleryTasks.Task1.funcOne
. CeleryTasks.Task2.download
. CeleryTasks.Task2.funcTwo

[2019-10-23 09:26:31,620: INFO/MainProcess] Connected to redis://127.0.0.1:6379//
[2019-10-23 09:26:31,628: INFO/MainProcess] mingle: searching for neighbors
[2019-10-23 09:26:32,652: INFO/MainProcess] mingle: all alone
[2019-10-23 09:26:32,706: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379//.
[2019-10-23 09:26:32,716: INFO/MainProcess] celery@TONY ready.

Once the worker is up, beat produces a task at each interval for the worker to execute:

[2019-10-23 09:36:57,109: INFO/MainProcess] Received task: CeleryTasks.Task1.funcOne[e75a3d5e-bbcd-429c-98c7-dfda971fc3c1]
[2019-10-23 09:36:57,115: INFO/MainProcess] Task CeleryTasks.Task1.funcOne[e75a3d5e-bbcd-429c-98c7-dfda971fc3c1] succeeded in 0.01600000000325963s: 'Result of funcOne in Task1: [eating]'
[2019-10-23 09:37:00,002: INFO/MainProcess] Received task: CeleryTasks.Task2.download[092ee768-9ba9-4a74-ad4f-377d4f4dd046]
[2019-10-23 09:37:00,004: INFO/MainProcess] Task CeleryTasks.Task2.download[092ee768-9ba9-4a74-ad4f-377d4f4dd046] succeeded in 0.0s: 'Result of download in Task2: [teasing Doudou]'
[2019-10-23 09:37:07,106: INFO/MainProcess] Received task: CeleryTasks.Task1.funcOne[3dd25333-8c54-4def-8c3b-30b92308ed86]
[2019-10-23 09:37:07,109: INFO/MainProcess] Task CeleryTasks.Task1.funcOne[3dd25333-8c54-4def-8c3b-30b92308ed86] succeeded in 0.0s: 'Result of funcOne in Task1: [eating]'
[2019-10-23 09:37:10,090: INFO/MainProcess] Received task: CeleryTasks.Task2.funcTwo[923b6585-2aa5-4871-9dbe-2b18b99b0e75]
[2019-10-23 09:37:10,091: INFO/MainProcess] Task CeleryTasks.Task2.funcTwo[923b6585-2aa5-4871-9dbe-2b18b99b0e75] succeeded in 0.0s: 'Result of funcTwo in Task2: [sleeping]'
[2019-10-23 09:37:17,110: INFO/MainProcess] Received task: CeleryTasks.Task1.funcOne[024acf7a-c8da-4e8a-923d-00af60b5c036]
[2019-10-23 09:37:17,114: INFO/MainProcess] Task CeleryTasks.Task1.funcOne[024acf7a-c8da-4e8a-923d-00af60b5c036] succeeded in 0.0s: 'Result of funcOne in Task1: [eating]'
[2019-10-23 09:37:23,097: INFO/MainProcess] Received task: CeleryTasks.Task2.funcTwo[0a3f7e23-b86e-437a-9818-1da9acd33a3f]
[2019-10-23 09:37:23,103: INFO/MainProcess] Task CeleryTasks.Task2.funcTwo[0a3f7e23-b86e-437a-9818-1da9acd33a3f] succeeded in 0.0s: 'Result of funcTwo in Task2: [sleeping]'
[2019-10-23 09:37:27,106: INFO/MainProcess] Received task: CeleryTasks.Task1.funcOne[7c3a34a0-be22-4784-b84b-c2c41297a564]
[2019-10-23 09:37:27,107: INFO/MainProcess] Task CeleryTasks.Task1.funcOne[7c3a34a0-be22-4784-b84b-c2c41297a564] succeeded in 0.0s: 'Result of funcOne in Task1: [eating]'
[2019-10-23 09:37:36,099: INFO/MainProcess] Received task: CeleryTasks.Task2.funcTwo[b192069c-ede6-4d57-b3ff-1df9e65415cb]
[2019-10-23 09:37:36,104: INFO/MainProcess] Task CeleryTasks.Task2.funcTwo[b192069c-ede6-4d57-b3ff-1df9e65415cb] succeeded in 0.0s: 'Result of funcTwo in Task2: [sleeping]'
[2019-10-23 09:37:37,112: INFO/MainProcess] Received task: CeleryTasks.Task1.funcOne[e9512dbd-345e-486c-ae5d-0489625475a8]
[2019-10-23 09:37:37,119: INFO/MainProcess] Task CeleryTasks.Task1.funcOne[e9512dbd-345e-486c-ae5d-0489625475a8] succeeded in 0.0s: 'Result of funcOne in Task1: [eating]'
[2019-10-23 09:37:47,109: INFO/MainProcess] Received task: CeleryTasks.Task1.funcOne[0e95a5e8-99d7-4773-9cbb-69959434b385]
[2019-10-23 09:37:47,112: INFO/MainProcess] Task CeleryTasks.Task1.funcOne[0e95a5e8-99d7-4773-9cbb-69959434b385] succeeded in 0.0s: 'Result of funcOne in Task1: [eating]'
[2019-10-23 09:37:49,097: INFO/MainProcess] Received task: CeleryTasks.Task2.funcTwo[d30c1676-021d-42ac-baf5-5bae10ce9cd8]
[2019-10-23 09:37:49,100: INFO/MainProcess] Task CeleryTasks.Task2.funcTwo[d30c1676-021d-42ac-baf5-5bae10ce9cd8] succeeded in 0.0s: 'Result of funcTwo in Task2: [sleeping]'
[2019-10-23 09:37:57,112: INFO/MainProcess] Received task: CeleryTasks.Task1.funcOne[eb59efb9-b3df-45e8-8f24-20e74b9325fd]
[2019-10-23 09:37:57,114: INFO/MainProcess] Task CeleryTasks.Task1.funcOne[eb59efb9-b3df-45e8-8f24-20e74b9325fd] succeeded in 0.0s: 'Result of funcOne in Task1: [eating]'
[2019-10-23 09:38:00,008: INFO/MainProcess] Received task: CeleryTasks.Task2.download[535c8db4-2bf8-460f-800f-e3e045b2fa78]
[2019-10-23 09:38:00,013: INFO/MainProcess] Task CeleryTasks.Task2.download[535c8db4-2bf8-460f-800f-e3e045b2fa78] succeeded in 0.0s: 'Result of download in Task2: [teasing Doudou]'
[2019-10-23 09:38:02,094: INFO/MainProcess] Received task: CeleryTasks.Task2.funcTwo[14893fe7-ef36-4cd3-8c37-244e0b6263c5]
[2019-10-23 09:38:02,095: INFO/MainProcess] Task CeleryTasks.Task2.funcTwo[14893fe7-ef36-4cd3-8c37-244e0b6263c5] succeeded in 0.0s: 'Result of funcTwo in Task2: [sleeping]'
[2019-10-23 09:38:07,113: INFO/MainProcess] Received task: CeleryTasks.Task1.funcOne[8b10c310-9b18-446a-856c-a4bf49914570]
[2019-10-23 09:38:07,117: INFO/MainProcess] Task CeleryTasks.Task1.funcOne[8b10c310-9b18-446a-856c-a4bf49914570] succeeded in 0.0s: 'Result of funcOne in Task1: [eating]'

The crontab scheduler

When you need finer control over how often a task runs, for example at a particular time of day or on a particular day of the week, you can use the crontab schedule type. Crontab expressions are very flexible; the table below summarizes common patterns that you can adapt to your own needs.

"each24hours_task": {
"task": "CeleryTasks.Task2.download",
"schedule": crontab(),#每分钟执行一次
"args": ("打豆豆",)
}

The periodic task above uses crontab; common crontab rules are shown below:

| Example | Meaning |
| --- | --- |
| crontab() | Execute every minute. |
| crontab(minute=0, hour=0) | Execute daily at midnight. |
| crontab(minute=0, hour='*/3') | Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm. |
| crontab(minute=0, hour='0,3,6,9,12,15,18,21') | Same as previous. |
| crontab(minute='*/15') | Execute every fifteen minutes. |
| crontab(day_of_week='sunday') | Execute every minute on Sundays. |
| crontab(minute='*', hour='*', day_of_week='sun') | Same as previous. |
| crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri') | Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays and Fridays. |
| crontab(minute=0, hour='*/2,*/3') | Execute every even hour and every hour divisible by three; that is, every hour except 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm. |
| crontab(minute=0, hour='*/5') | Execute every hour divisible by 5; e.g., it fires at 3pm rather than 5pm (3pm is hour 15 on the 24-hour clock, which is divisible by 5). |
| crontab(minute=0, hour='*/3,8-17') | Execute every hour divisible by 3, and every hour during office hours (8am-5pm). |
| crontab(0, 0, day_of_month='2') | Execute on the second day of every month. |
| crontab(0, 0, day_of_month='2-30/3') | Execute on every even numbered day of the month. |
| crontab(0, 0, day_of_month='1-7,15-21') | Execute during the first and third weeks of the month. |
| crontab(0, 0, day_of_month='11', month_of_year='5') | Execute on the eleventh of May every year. |
| crontab(0, 0, month_of_year='*/3') | Execute on the first month of every quarter. |
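
Putting it together, here is a sketch of a beat_schedule entry (the key daily_630_task is a made-up name) that runs the download task from the package example every day at 6:30 am:

from celery.schedules import crontab

my_task.conf.beat_schedule = {
    "daily_630_task": {
        "task": "CeleryTasks.Task2.download",
        "schedule": crontab(minute=30, hour=6),  # every day at 6:30 am
        "args": ("teasing Doudou",)
    }
}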