关于fastapi中await的使用理解

有关async / await的说明,大家可以参考https://fastapi.tiangolo.com/zh/async/ 里的介绍。今天这里主要以几个例子来说明其使用场景。

案例1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
app = FastAPI()


@app.middleware("http")
async def TimesMiddleware(request: Request, call_next):
start = time.time()
print(request.url, '开始', start)

response = await call_next(request)
end = time.time()
process_time = end - start
response.headers["X-Process-Time"] = str(process_time)
print(request.url, '结束', end)
return response


@app.get("/")
async def root(q: str):
time.sleep(10)
return {"message": "Hello World-"+q}

上述是一个很简单的带有中间件的简单示例,中间件仅仅统计了接口用时,接口里使用time.sleep模拟了一个阻塞操作。

如上,左边窗口启动了我们的web服务器,右边两个窗口用来模拟两个客户端访问web。

大概流程如下:

  1. 启动web服务器
  2. 客户端1请求web(随后客户端2请求web,被阻塞等待)
  3. 服务器接收到客户端发来的请求,先执行了中间件的请求,开始计时
  4. 执行路由函数,执行sleep
  5. sleep执行完毕,执行中间件的相应,结束计时,并设置响应头,路由函数返回响应给前端
  6. web服务器处理完客户端1的请求开始接收客户端2的请求
  7. 依然是中间件请求-》路由函数sleep-》中间件响应-》路由函数返回响应给前端

从流程上来看,两个客户端请求耗时都是10秒钟,但是客户端1发起请求后,客户端2随即发起了请求,在这里我们任务两次请求无限接近,趋近于同时发起请求,但是web服务器先收到了客户端1的请求,所以要把客户端1的请求全部处理完才会继续处理客户端2的请求,而客户端1和客户端2请求路由处理分别都耗时了10秒钟,但是客户端2同时又要等客户端1处理完才开始,所以客户端2相当于等待了20秒才拿到了结果,综上10个客户端同时发起请求,最后一个要等100秒才能处理完。

案例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
app = FastAPI()


@app.middleware("http")
async def TimesMiddleware(request: Request, call_next):
start = time.time()
print(request.url, '开始', start)

response = await call_next(request)
end = time.time()
process_time = end - start
response.headers["X-Process-Time"] = str(process_time)
print(request.url, '结束', end)
return response


@app.get("/")
async def root(q: str):
await asyncio.sleep(10)
return {"message": "Hello World-"+q}

本次我们采用了异步的方式执行,从视频中我们分析下整个流程:

  1. web服务器启动
  2. 客户端1发起请求(随后客户端2发起请求)
  3. 中间件请求执行,打印客户端1请求开始时间
  4. 执行客户端1请求的路由函数,即异步sleep
  5. 因为是异步的,检测到有阻塞,web服务直接开始处理其他请求
  6. 此时中间件处理客户端2的请求,打印客户端2开始时间
  7. web服务器处理客户端2请求的路由函数,即异步sleep
  8. 遇到阻塞,web服务器处理其他请求
  9. 此时检测到客户端1的请求处理完毕,中间件计算耗时,路由函数返回响应,客户端1的请求处理完毕
  10. 此时检测到客户端2的请求处理完毕,中间件计算耗时,路由函数返回响应,客户端2的请求处理完毕

分析:两次请求耗时时间都是10秒钟;跟案例1的差异是,此时web服务器遇到客户端1请求路由函数里的阻塞操作,随即开始处理客户端2的请求,我们同样假设两次请求无限接近,服务端检测到客户端1阻塞结束了,随即继续执行客户端1的请求,返回响应结束,接着检测到客户端2阻塞任务完成,又继续执行客户端2的请求,返回响应结束。请求开始无限时间一致,处理时间一致,结束时间趋于一致,都是耗时接近10秒钟,这样多个同时发起的请求,耗时基本完全一致,客户体验相对于方案1有了质的提升。

方案3

上述方案还有不完美的地方,不过要看场景,我们这里的代码里路由函数响应结果不依赖于阻塞任务,假设这是一个发送验证码的任务,我们无需让客户端等待发送的过程,我只需要马上告诉客户端发送验证码已经在执行了即可,发送验证码的任务后台执行。

这里我们用asyncio中的create_task处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
app = FastAPI()


@app.middleware("http")
async def TimesMiddleware(request: Request, call_next):
start = time.time()
print(request.url, '开始', start)

response = await call_next(request)
end = time.time()
process_time = end - start
response.headers["X-Process-Time"] = str(process_time)
print(request.url, '结束', end)
return response


@app.get("/")
async def root(q: str):
asyncio.create_task(background_task(q, 10))
return {"message": "Hello World" + q}


async def background_task(name:str, duration: int = 3):
print(f"【{name}】开始执行后台任务", time.time())
await asyncio.sleep(duration)
print(f'【{name}】后台任务执行完成', time.time())

该方案遇到请求中的阻塞直接先去处理其他任务了,所以在客户端响应里可以看到处理时间都是无限接近于0的,而且在响应给客户端后,阻塞任务一直在执行,直到完成。

另外这里单独放出来执行流程,重点可以看阻塞任务的执行时机,都是在接口响应前,让大家看这个主要是为了和fastapi的background task作比较。

方案4

fastapi的后台任务大概做了下述介绍:

你可以定义在返回响应后运行的后台任务。

这对需要在请求之后执行的操作很有用,但客户端不必在接收响应之前等待操作完成。

包括这些例子:

  • 执行操作后发送的电子邮件通知:
    • 由于连接到电子邮件服务器并发送电子邮件往往很“慢”(几秒钟),您可以立即返回响应并在后台发送电子邮件通知。
  • 处理数据:
    • 例如,假设您收到的文件必须经过一个缓慢的过程,您可以返回一个”Accepted”(HTTP 202)响应并在后台处理它。

使用方式也很简单,我们这里主要跟方案3做下对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
app = FastAPI()


@app.middleware("http")
async def TimesMiddleware(request: Request, call_next):
start = time.time()
print(request.url, '开始', start)

response = await call_next(request)
end = time.time()
process_time = end - start
response.headers["X-Process-Time"] = str(process_time)
print(request.url, '结束', end)
return response


@app.get("/")
async def root(q: str, background_tasks: BackgroundTasks):
background_tasks.add_task(mytask, q, 10)
return {"message": "Hello World" + q}


async def mytask(name:str, duration: int = 3):
print(f"【{name}】开始执行后台任务", time.time())
await asyncio.sleep(duration)
print(f'【{name}】后台任务执行完成', time.time())

服务端遇到阻塞同样开始执行其他任务,只是阻塞任务的执行时机不同。

跟方案3比,方案4的执行时机实在客户端响应后开始。