python多任务异步协程

python多任务异步协程常用方式,笔记

asyncio中4个概念

特殊的函数

  • 如果一个函数的定义被async修饰后,则该函数就变成了一个特殊的函数
  • 特殊之处:
    • 该特殊的函数调用后,函数内部的实现语句不会被立即执行
    • 该特殊函数被调用后会返回一个协程对象

协程对象

  • 通过特殊喊的调用返回一个协程对象
  • 协程 == 特殊函数 == 一组指定的操作
  • 协程 == 一组特定的操作

任务对象

  • 本质就是一个高级的协程对象(是对协程对象的进一步封装)

  • 任务 == 协程 == 特殊函数 == 一组指定的操作

  • 任务 == 一组指定的操作

  • 如何创建一个任务对象

    asyncio.ensure_future(协程对象)

事件循环对象

  • 可以将多个任务对象注册/装载到时间循环对象中

  • 如果开启了时间循环后,则其内部注册/装载的任务对象表示的指定操作就会被基于异步的被执行

  • 创建方式:

    loop = asyncio.get_event_loop()

  • 启动方式

    loop.run_until_complete(任务对象)

wait方法的作用

将任务列表中的任务对象赋予可被挂起的权限。只有任务对象被赋予了可被挂起的权限后,该任务对象才可以被挂起

  • 挂起:将当前任务对象交出cpu的使用权

注意事项:

在特殊函数内部,不可以出现不支持异步模块对应的代码,否则会中断整个异步效果。

await关键字:

在特殊函数内部,凡是阻塞操作前都必须使用await进行修饰。await就可以保证阻塞操作在异步执行的过程中不会被跳过。

单任务协程示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Uyynot
# @Email : uyynot@qq.com
# @Time : 2022/11/18 9:52
# @File : ttt.py
# @Project : test
# @Desc :
import asyncio
import time


async def get_response(url):
print("开始爬取:", url)
await asyncio.sleep(2)
print(f"爬取{url}完成")
return f"我是{url}响应的结果"

# 回调函数
def call_back(task):
print("我是任务对象的回调函数:", task, task.result())

if __name__ == '__main__':
start = time.time()
# 单任务使用协程
# 协程对象
coroutine_obj = get_response('www.aaa.com')
# 任务对象
task = asyncio.ensure_future(coroutine_obj)
# 任务执行完成后执行回调函数
task.add_done_callback(call_back)
# 事件循环对象
loop = asyncio.get_event_loop()
# 将任务对象注册到事件循环中,然后启动事件循环
loop.run_until_complete(task)
print(f"耗时:{time.time()-start}")

执行结果:

1
2
3
4
开始爬取: www.aaa.com
爬取www.aaa.com完成
我是任务对象的回调函数: <Task finished name='Task-1' coro=<get_response() done, defined at D:/pycharm_projects/dev/test/ttt.py:13> result='我是www.aaa.com响应的结果'> 我是www.aaa.com响应的结果
耗时:2.016199827194214

多任务协程示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Uyynot
# @Email : uyynot@qq.com
# @Time : 2022/11/18 9:52
# @File : ttt.py
# @Project : test
# @Desc :
import asyncio
import time


async def get_response(url):
print("开始爬取:", url)
await asyncio.sleep(2)
print(f"爬取{url}完成")
return f"我是{url}响应的结果"

# 回调函数
def call_back(task):
print("我是任务对象的回调函数:", task, task.result())

if __name__ == '__main__':
start = time.time()
# 多任务使用协程
urls = ["www.a.com", "www.b.com", "www.c.com", "www.d.com", "www.f.com"]
# 用来接收任务对象列表
tasks = []
for url in urls:
# 获取协程对象
coroutine_obj = get_response(url)
# 创建任务对象
task_obj = asyncio.ensure_future(coroutine_obj)
# 任务执行完成后执行回调函数
task_obj.add_done_callback(call_back)
# 添加至任务对象列表
tasks.append(task_obj)
# 创建事件循环对象
loop = asyncio.get_event_loop()
# 将任务对象列表注册到事件循环中,然后启动事件循环
loop.run_until_complete(asyncio.wait(tasks))
print(f"耗时:{time.time()-start}")

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
开始爬取: www.a.com
开始爬取: www.b.com
开始爬取: www.c.com
开始爬取: www.d.com
开始爬取: www.f.com
爬取www.a.com完成
爬取www.c.com完成
爬取www.f.com完成
爬取www.b.com完成
爬取www.d.com完成
我是任务对象的回调函数: <Task finished name='Task-1' coro=<get_response() done, defined at D:/pycharm_projects/dev/test/ttt.py:13> result='我是www.a.com响应的结果'> 我是www.a.com响应的结果
我是任务对象的回调函数: <Task finished name='Task-3' coro=<get_response() done, defined at D:/pycharm_projects/dev/test/ttt.py:13> result='我是www.c.com响应的结果'> 我是www.c.com响应的结果
我是任务对象的回调函数: <Task finished name='Task-5' coro=<get_response() done, defined at D:/pycharm_projects/dev/test/ttt.py:13> result='我是www.f.com响应的结果'> 我是www.f.com响应的结果
我是任务对象的回调函数: <Task finished name='Task-2' coro=<get_response() done, defined at D:/pycharm_projects/dev/test/ttt.py:13> result='我是www.b.com响应的结果'> 我是www.b.com响应的结果
我是任务对象的回调函数: <Task finished name='Task-4' coro=<get_response() done, defined at D:/pycharm_projects/dev/test/ttt.py:13> result='我是www.d.com响应的结果'> 我是www.d.com响应的结果
耗时:2.015181541442871

常规使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
import time


async def request(url):
print("开始请求url", url)
# 模拟请求耗时
time.sleep(3)
print("成功请求url", url)
return url

# async修饰的函数,调用之后返回一个协程对象
coroutine_obj = request('http://www.baidu.com')
print(coroutine_obj)

# 创建一个事件循环对象
loop = asyncio.get_event_loop()
# # 将协程对象注册到事件循环中,然后启动事件循环
loop.run_until_complete(coroutine_obj)

使用task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio
import time


async def request(url):
print("开始请求url", url)
# 模拟请求耗时
time.sleep(3)
print("成功请求url", url)
return url

# async修饰的函数,调用之后返回一个协程对象
coroutine_obj = request('http://www.baidu.com')
print(coroutine_obj)

# 创建一个事件循环对象
loop = asyncio.get_event_loop()
# 基于事件循环创建一个task对象
task = loop.create_task(coroutine_obj)
# 将协程对象注册到事件循环中,然后启动事件循环
loop.run_until_complete(task)

使用feture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
import time


async def request(url):
print("开始请求url", url)
# 模拟请求耗时
time.sleep(3)
print("成功请求url", url)
return url

# async修饰的函数,调用之后返回一个协程对象
coroutine_obj = request('http://www.baidu.com')
print(coroutine_obj)

# 创建一个事件循环对象
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine_obj)
# 将协程对象注册到事件循环中,然后启动事件循环
loop.run_until_complete(task)

绑定回调函数

常规使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
import time


async def request(url):
print("开始请求url", url)
# 模拟请求耗时
time.sleep(3)
print("成功请求url", url)
return url
# 定义回调函数
def callback_func(task):
print("该异步方法运行完成后的返回值为:", task.result())
# async修饰的函数,调用之后返回一个协程对象
coroutine_obj = request('http://www.baidu.com')
print(coroutine_obj)

# 创建一个事件循环对象
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine_obj)
task.add_done_callback(callback_func)
# # 将协程对象注册到事件循环中,然后启动事件循环
loop.run_until_complete(task)

使用task

常规使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
import time


async def request(url):
print("开始请求url", url)
# 模拟请求耗时
# 在异步协程中如果出现了同步模块相关的代码,那么就无法实现异步
# time.sleep(3)
# 当在asyncio中遇到阻塞操作必须进行手动挂起
await asyncio.sleep(3)
print("成功请求url", url)
return url

urls = [
'http://www.aaa.com',
'http://www.bbb.com',
'http://www.ccc.com',
'http://www.ddd.com',
]

# 创建一个事件循环对象
loop = asyncio.get_event_loop()
# 异步协程对象列表
tasks = []
for i in urls:
# tasks.append(asyncio.ensure_future(request(i)))
# 使不使用ensure_future结果一样
tasks.append(request(i))
# # 将协程对象注册到事件循环中,然后启动事件循环
loop.run_until_complete(asyncio.wait(tasks))

aiohttp多任务异步协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# @Time    : 2021/9/22 17:29
# @Author : 托小尼
# @Email : 646547989@qq.com
# @URI : https://www.diandian100.cn
# @File : falsk模拟网络接口.py
# @Desc :
import time

from flask import Flask

app = Flask(__name__)


@app.route('/a')
def a():
time.sleep(3)
return '我是aaaaa'


@app.route('/b')
def b():
time.sleep(3)
return '我是bbbbb'


@app.route('/c')
def c():
time.sleep(3)
return '我是ccccc'


@app.route('/d')
def d():
time.sleep(3)
return '我是ddddd'


if __name__ == '__main__':
app.run(debug=True)

aiohttp请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# @Time    : 2021/9/22 17:28
# @Author : 托小尼
# @Email : 646547989@qq.com
# @URI : https://www.diandian100.cn
# @File : aiohttp多任务异步协程.py
# @Desc :aiohttp多任务异步协程
import asyncio
import time

import aiohttp


async def request(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
body = await response.text()
print(body)

tasks = []
urls = [
'http://127.0.0.1:5000/a',
'http://127.0.0.1:5000/b',
'http://127.0.0.1:5000/c',
'http://127.0.0.1:5000/d',
]
start = time.time()
for i in urls:
tasks.append(asyncio.ensure_future(request(i)))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print("总耗时:", time.time()-start)