提到celery离不开谈生产者消费者设计模式,生产者和消费者之间通过缓冲区(通常是一个阻塞队列)实现通讯, 生产者将产生的数据放入缓冲区,消费者从缓冲区中获取数据。
正常的网络处理都是同步进行的,上一个处理完后才开始处理下一个。
举个不知是否恰当的例子:饭店送外卖,最开始没有外卖员存在的时候都是店员派送,到一定量的订单才开始去派送,这样一来第一个下单的顾客等的时间太久了不满意服务。后来有了骑手,我接到一单外卖坐好就直接放到取餐处,外卖员接单就可以直接送了,顾客不用等太久。
在这个案例中,最开始的模式就好像我们网络处理的正常同步模式,我全部做好了才给你送。后面改进的就是异步进行的,特别符合我们的这个生产者消费者模式。我们作为生产者将任务发布在外卖平台,骑手从外卖平台去执行我的任务。
![]()
celery
- 一个简单、灵活且可靠、处理大量消息的分布式系统,可以在一台或者多台机器上运行。
- 单个 Celery 进程每分钟可处理数以百万计的任务。
- 通过消息进行通信,使用
消息队列(broker)
在客户端
和消费者
之间进行协调。
安装celery
flask使用celery实例
我们这里以flask为例,django也一样的操作。此处我们用导出文件来演示。
原始代码
导出代码文件,我们这里使用单例类来实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
|
import os import time from pathlib import Path
class ExportClass(object): logs_path = os.path.join(Path(__file__).resolve().parent.parent.parent, 'log') """单例类""" def __new__(cls, *args, **kwargs): if not hasattr(cls, "_instance"): cls._instance = super().__new__(cls, *args, **kwargs) return cls._instance
def save_file(self, data, filepath): """ # 写入文件 :param data: 文件内容对象(待处理的列表) # :param filepath: 文件名 :return: 消息 """ try: with open(os.path.join(self.logs_path, filepath), 'w') as f: content = "" for line in data: time.sleep(0.1) content += "%s==\r\n" % line f.write(content) except Exception as e: return "错误:%s" % e return "成功"
if __name__ == '__main__': ExportClass().save_file(range(100), 'aaa.logs')
|
flask入口主文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| from flask import Flask
from celery_task.export_file.export_clsss import ExportClass
app = Flask(__name__)
@app.route('/') def hello_world(): data = [item for item in range(200)] ExportClass().save_file(data, 'data.logs') return 'Hello World!'
if __name__ == '__main__': app.run()
|
运行flask,我们大概要20秒以后才能看到hello world显示在页面上,很显然,文件导出操作阻塞了我们的响应视图,我们使用celery改进。
目录结构
1 2 3 4 5 6 7 8 9 10
| app.py flask主程序入口 log 文件导出目录 celery_task celery主目录 ├─main.py celery入口文件 ├─config.py celery配置文件 ├─__init__.py 包默认文件 ├─export_file 导出文件任务目录 │ ├─export_class.py 导出功能类文件 │ ├─tasks.py 导出功能任务入口 │ ├─__init__.py 包默认文件
|
创建celery_task目录
名称随意定,只要不起celery覆盖celery包就可以。
config.py
在该文件中配置celery,我们这里只有一行代码,配置了下消息队列的位置。
1 2 3 4 5 6 7 8 9 10 11
|
broker_url = "redis://127.0.0.1:6379/5"
|
main.py
celery入口文件,我们在该文件中实例化celery,注册任务。
celery_obj.autodiscover_tasks([‘celery_task.export_file’])这样我们就简单将任务放进了消息队列中,该任务即上面目录中的export_file任务包。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
from celery import Celery
celery_obj = Celery("my_flask")
celery_obj.config_from_object('celery_task.config')
celery_obj.autodiscover_tasks(['celery_task.export_file'])
|
export_file/tasks.py
该文件为导出文件任务的入口,我们在该文件中定义导出文件方法,引入原始导出文件方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
from .export_clsss import ExportClass from celery_task.main import celery_obj
@celery_obj.task(name="celery_export_file") def to_export_file(content, filepath): """ 写入文件 :param content: 文件内容数据 :param filepath: 文件名 :return: 文件处理结果 """ return ExportClass().save_file(content, filepath)
|
export_file/export_class.py
该文件跟最上面原始文件一样,不用改动什么,目录有变化的改变下导入包的路径就可以了。
app.py
flask入口文件,我们在路由函数中要使用celery导出文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| from flask import Flask
from celery_task.export_file.tasks import to_export_file
app = Flask(__name__)
@app.route('/') def hello_world(): data = [item for item in range(200)] to_export_file.delay(data, 'data.logs') return 'Hello World!'
if __name__ == '__main__': app.run()
|
执行flask
执行celery
celery worker工作模式
- 默认是进程池方式,进程数以当前机器的CPU核数为参考,每个CPU开四个进程。
- 如何自己指定进程数:
celery worker -A proj --concurrency=4
- 如何改变进程池方式为协程方式:
celery worker -A proj --concurrency=1000 -P eventlet -c 1000
1 2 3 4 5
| $ pip install eventlet
$ celery -A celery_tasks.main worker -l info -P eventlet -c 1000
|
我们这里使用默认
1
| celery -A celery_task.main worker -l info
|
我们看下运行后的响应状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| -------------- celery@Tony-iMac.local v5.0.4 (singularity) --- ***** ----- -- ******* ---- macOS-10.13.6-x86_64-i386-64bit 2020-12-09 15:53:09 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: my_flask:0x10237c340 - ** ---------- .> transport: redis://127.0.0.1:6379/5 - ** ---------- .> results: disabled:// - *** --- * --- .> concurrency: 8 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery
[tasks] . celery_export_file
[2020-12-09 15:53:09,300: INFO/MainProcess] Connected to redis://127.0.0.1:6379/5 [2020-12-09 15:53:09,311: INFO/MainProcess] mingle: searching for neighbors [2020-12-09 15:53:10,336: INFO/MainProcess] mingle: all alone [2020-12-09 15:53:10,347: INFO/MainProcess] celery@Tony-iMac.local ready.
|
从响应结果可以看到任务列表,消息队列redis地址,进程数等等,万事俱备,只欠任务了。
发送任务请求
访问首页,即请求导出文件任务
访问首页直接看到响应的helloworld字符串,导出的文件也还暂未生成,等大概20秒左右,文件生成,内容正常写入。celery响应结果也多了两行
1 2
| [2020-12-09 15:53:14,938: INFO/MainProcess] Received task: celery_export_file[9e61efcc-5635-485b-a429-7d0a162edbdb] [2020-12-09 15:53:35,079: INFO/ForkPoolWorker-8] Task celery_export_file[9e61efcc-5635-485b-a429-7d0a162edbdb] succeeded in 20.139845922s: '成功'
|
执行成功,还输出了我们之前导出文件的返回内容。以上就是整个celery的使用流程,django中一样使用方法。
celery不能读取框架配置文件
有时候在我们celery任务代码中可能会用到flask或者django的配置文件内容,我们这里就以django为例进行说明:
常规我们使用django的配置文件会如下写:
1 2 3
| from django.conf import setting
|
运行的时候却提示找不到setting.EMAIL_FROM,这是因为celery作为一个独立的模块是无法使用django模块中的配置文件的,我们需要在celery主入口main.py加入以下代码:
1 2 3 4
| import os if not os.getenv('DJANGO_SETTINGS_MODULE'): os.environ['DJANGO_SETTINGS_MODULE'] = 'myproject.settings'
|
配置以上代码就可以在celery中使用django的配置文件内容了