如何在web中使用celery

提到celery离不开谈生产者消费者设计模式,生产者和消费者之间通过缓冲区(通常是一个阻塞队列)实现通讯, 生产者将产生的数据放入缓冲区,消费者从缓冲区中获取数据。

正常的网络处理都是同步进行的,上一个处理完后才开始处理下一个。

举个不知是否恰当的例子:饭店送外卖,最开始没有外卖员存在的时候都是店员派送,到一定量的订单才开始去派送,这样一来第一个下单的顾客等的时间太久了不满意服务。后来有了骑手,我接到一单外卖坐好就直接放到取餐处,外卖员接单就可以直接送了,顾客不用等太久。

在这个案例中,最开始的模式就好像我们网络处理的正常同步模式,我全部做好了才给你送。后面改进的就是异步进行的,特别符合我们的这个生产者消费者模式。我们作为生产者将任务发布在外卖平台,骑手从外卖平台去执行我的任务。

celery

  • 一个简单、灵活且可靠、处理大量消息的分布式系统,可以在一台或者多台机器上运行。
  • 单个 Celery 进程每分钟可处理数以百万计的任务。
  • 通过消息进行通信,使用消息队列(broker)客户端消费者之间进行协调。

安装celery

1
pip install Celery

flask使用celery实例

我们这里以flask为例,django也一样的操作。此处我们用导出文件来演示。

原始代码

导出代码文件,我们这里使用单例类来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020/12/9 15:06
# @Author : 托小尼
# @Email : 646547989@qq.com
# @URI : https://www.diandian100.cn
# @File : export_clsss.py
import os
import time
from pathlib import Path


class ExportClass(object):
# log文件生成目录
logs_path = os.path.join(Path(__file__).resolve().parent.parent.parent, 'log')
"""单例类"""
def __new__(cls, *args, **kwargs):
# 判断是否已有对象,没有则创建,最后返回该对象
if not hasattr(cls, "_instance"):
cls._instance = super().__new__(cls, *args, **kwargs)
return cls._instance

def save_file(self, data, filepath):
"""
# 写入文件
:param data: 文件内容对象(待处理的列表)
# :param filepath: 文件名
:return: 消息
"""
try:
# 打开文件准备写入
with open(os.path.join(self.logs_path, filepath), 'w') as f:
content = ""
# 依次循环列表,每次停留0.1秒,内容格式化
for line in data:
time.sleep(0.1)
content += "%s==\r\n" % line
# 将格式化好的内容写入文件中
f.write(content)
except Exception as e:
return "错误:%s" % e
return "成功"


if __name__ == '__main__':
ExportClass().save_file(range(100), 'aaa.logs')
flask入口主文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from flask import Flask

from celery_task.export_file.export_clsss import ExportClass

app = Flask(__name__)


@app.route('/')
def hello_world():
# 生成一个200长的列表
data = [item for item in range(200)]
# 我们想将刚才生成的列表依次换行写入data.logs文件中
ExportClass().save_file(data, 'data.logs')
return 'Hello World!'


if __name__ == '__main__':
app.run()

运行flask,我们大概要20秒以后才能看到hello world显示在页面上,很显然,文件导出操作阻塞了我们的响应视图,我们使用celery改进。

目录结构
1
2
3
4
5
6
7
8
9
10
app.py									flask主程序入口
log 文件导出目录
celery_task celery主目录
├─main.py celery入口文件
├─config.py celery配置文件
├─__init__.py 包默认文件
├─export_file 导出文件任务目录
│ ├─export_class.py 导出功能类文件
│ ├─tasks.py 导出功能任务入口
│ ├─__init__.py 包默认文件
创建celery_task目录

名称随意定,只要不起celery覆盖celery包就可以。

config.py

在该文件中配置celery,我们这里只有一行代码,配置了下消息队列的位置。

1
2
3
4
5
6
7
8
9
10
11
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020/12/9 14:57
# @Author : 托小尼
# @Email : 646547989@qq.com
# @URI : https://www.diandian100.cn
# @File : config.py
# @Desc : Celery配置文件

# 指定消息队列位置,此处使用redis
broker_url = "redis://127.0.0.1:6379/5"
main.py

celery入口文件,我们在该文件中实例化celery,注册任务。

celery_obj.autodiscover_tasks([‘celery_task.export_file’])这样我们就简单将任务放进了消息队列中,该任务即上面目录中的export_file任务包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020/12/9 14:57
# @Author : 托小尼
# @Email : 646547989@qq.com
# @URI : https://www.diandian100.cn
# @File : main.py
# @desc : Celery入口文件
from celery import Celery

# 实例化celery
celery_obj = Celery("my_flask")
# 加载celery配置
celery_obj.config_from_object('celery_task.config')
# 注册要执行的celery任务
celery_obj.autodiscover_tasks(['celery_task.export_file'])
export_file/tasks.py

该文件为导出文件任务的入口,我们在该文件中定义导出文件方法,引入原始导出文件方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020/12/9 15:20
# @Author : 托小尼
# @Email : 646547989@qq.com
# @URI : https://www.diandian100.cn
# @File : tasks.py
from .export_clsss import ExportClass
from celery_task.main import celery_obj

# name:任务名
@celery_obj.task(name="celery_export_file")
def to_export_file(content, filepath):
"""
写入文件
:param content: 文件内容数据
:param filepath: 文件名
:return: 文件处理结果
"""
# 执行原始文件写入方法,并返回结果
return ExportClass().save_file(content, filepath)

export_file/export_class.py

该文件跟最上面原始文件一样,不用改动什么,目录有变化的改变下导入包的路径就可以了。

app.py

flask入口文件,我们在路由函数中要使用celery导出文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from flask import Flask

from celery_task.export_file.tasks import to_export_file

app = Flask(__name__)


@app.route('/')
def hello_world():
# 生成一个200长的列表
data = [item for item in range(200)]
# 我们想将刚才生成的列表依次换行写入data.logs文件中,此处使用celery的写入文件方法异步执行
to_export_file.delay(data, 'data.logs')
return 'Hello World!'


if __name__ == '__main__':
app.run()

执行flask
1
python -m flask run
执行celery

celery worker工作模式

  • 默认是进程池方式,进程数以当前机器的CPU核数为参考,每个CPU开四个进程。
  • 如何自己指定进程数:celery worker -A proj --concurrency=4
  • 如何改变进程池方式为协程方式:celery worker -A proj --concurrency=1000 -P eventlet -c 1000
1
2
3
4
5
# 安装eventlet模块
$ pip install eventlet

# 启用 Eventlet 池
$ celery -A celery_tasks.main worker -l info -P eventlet -c 1000

我们这里使用默认

1
celery -A celery_task.main worker -l info

我们看下运行后的响应状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 
-------------- celery@Tony-iMac.local v5.0.4 (singularity)
--- ***** -----
-- ******* ---- macOS-10.13.6-x86_64-i386-64bit 2020-12-09 15:53:09
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: my_flask:0x10237c340
- ** ---------- .> transport: redis://127.0.0.1:6379/5
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks]
. celery_export_file

[2020-12-09 15:53:09,300: INFO/MainProcess] Connected to redis://127.0.0.1:6379/5
[2020-12-09 15:53:09,311: INFO/MainProcess] mingle: searching for neighbors
[2020-12-09 15:53:10,336: INFO/MainProcess] mingle: all alone
[2020-12-09 15:53:10,347: INFO/MainProcess] celery@Tony-iMac.local ready.

从响应结果可以看到任务列表,消息队列redis地址,进程数等等,万事俱备,只欠任务了。

发送任务请求

访问首页,即请求导出文件任务

访问首页直接看到响应的helloworld字符串,导出的文件也还暂未生成,等大概20秒左右,文件生成,内容正常写入。celery响应结果也多了两行

1
2
[2020-12-09 15:53:14,938: INFO/MainProcess] Received task: celery_export_file[9e61efcc-5635-485b-a429-7d0a162edbdb]  
[2020-12-09 15:53:35,079: INFO/ForkPoolWorker-8] Task celery_export_file[9e61efcc-5635-485b-a429-7d0a162edbdb] succeeded in 20.139845922s: '成功'

执行成功,还输出了我们之前导出文件的返回内容。以上就是整个celery的使用流程,django中一样使用方法。

celery不能读取框架配置文件

有时候在我们celery任务代码中可能会用到flask或者django的配置文件内容,我们这里就以django为例进行说明:

常规我们使用django的配置文件会如下写:

1
2
3
from django.conf import setting

# 然后在任务中比如调用setting.EMAIL_FROM

运行的时候却提示找不到setting.EMAIL_FROM,这是因为celery作为一个独立的模块是无法使用django模块中的配置文件的,我们需要在celery主入口main.py加入以下代码:

1
2
3
4
# 为celery使用django配置文件进行设置
import os
if not os.getenv('DJANGO_SETTINGS_MODULE'):
os.environ['DJANGO_SETTINGS_MODULE'] = 'myproject.settings'

配置以上代码就可以在celery中使用django的配置文件内容了