Introduction to Asynchronous Programming in Python (Part 1)

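Coroutines run in a single thread. To make full use of the hardware, combine multiple cores with single-threaded coroutines (for example, one process per core, each running its own coroutines).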

Python's support for coroutines was originally built on top of generators.

Python 3.5 introduced the new async and await syntax, which makes coroutine code more concise and readable.
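
As a rough illustration of the generator heritage of coroutines, the sketch below first drives a plain generator with send() and then defines a comparable native coroutine with async def. The names averager and add are made up for this example and are not part of the original article.

```python
import asyncio


def averager():
    """Generator used as a coroutine: receives values through send()."""
    total = 0.0
    count = 0
    average = None
    while True:
        # Pause here; resume when the caller sends the next value
        value = yield average
        total += value
        count += 1
        average = total / count


gen = averager()
next(gen)                 # advance to the first yield ("prime" the generator)
first = gen.send(10)      # running average after one value
second = gen.send(20)     # running average after two values
print(first, second)


async def add(a, b):
    """Native coroutine: declared with async def, paused with await."""
    await asyncio.sleep(0)  # hand control back to the event loop once
    return a + b


total = asyncio.run(add(1, 2))
print(total)
```

In both cases the function body can pause and resume; async/await simply gives that pattern dedicated syntax.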

Asynchronous modules:

  • IO: asyncio
  • HTTP: aiohttp

Simple examples

  • hello world
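# -*- coding: utf-8 -*-
from datetime import datetime
import asyncio


async def hello():
    print("Hello world!", datetime.now())
    # Suspend this coroutine for 1 second without blocking the event loop
    await asyncio.sleep(1)
    print("Hello again!", datetime.now())


# Create an event loop (calling asyncio.get_event_loop() with no current
# loop is deprecated in recent Python versions)
loop = asyncio.new_event_loop()
# Run the coroutine until it completes
loop.run_until_complete(hello())
loop.close()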

Output:

Hello world! 2023-09-18 11:14:14.840318
Hello again! 2023-09-18 11:14:15.840848

asyncio.sleep(1) is itself a coroutine.
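
A small sketch to check this: calling asyncio.sleep() only creates a coroutine object, and nothing happens until it is awaited. The optional result argument of asyncio.sleep() is used here purely for illustration, and the delay is shortened so the example finishes quickly.

```python
import asyncio


async def main():
    # Calling asyncio.sleep() does not sleep yet; it returns a coroutine object
    coro = asyncio.sleep(0.1, result="done")
    is_coro = asyncio.iscoroutine(coro)
    # The sleep actually happens when the coroutine is awaited
    value = await coro
    return is_coro, value


is_coro, value = asyncio.run(main())
print(is_coro, value)
```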

From Python 3.7 onward, you can run a coroutine with asyncio.run():

# -*- coding: utf-8 -*-
from datetime import datetime
import asyncio


async def hello():
    print("Hello world!", datetime.now())
    await asyncio.sleep(1)
    print("Hello again!", datetime.now())


# asyncio.run() creates the event loop, runs the coroutine, and closes the loop
asyncio.run(hello())
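
To make the relationship between the two styles concrete, here is a minimal sketch that runs the same coroutine once with a manually managed loop and once with asyncio.run(). The coroutine compute and its return value are placeholders invented for this comparison.

```python
import asyncio


async def compute():
    await asyncio.sleep(0.01)
    return 42


# Manual style: create the loop, run the coroutine, always close the loop
loop = asyncio.new_event_loop()
try:
    manual_result = loop.run_until_complete(compute())
finally:
    loop.close()

# asyncio.run() handles loop creation and cleanup itself
run_result = asyncio.run(compute())

print(manual_result, run_result)
```

Both calls hand back the coroutine's return value, so asyncio.run() is usually the simpler choice for a program's entry point.
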
  • Running a group of tasks
# -*- coding: utf-8 -*-
from datetime import datetime
import asyncio
from random import randint
import threading


async def hello():
    print("Hello world!", datetime.now(), threading.current_thread())
    sleep_time = randint(0, 4)
    print(f"sleep time: {sleep_time}", threading.current_thread())
    await asyncio.sleep(sleep_time)
    print("Hello again!", datetime.now(), threading.current_thread())


# Create an event loop
loop = asyncio.new_event_loop()
# Wrap the coroutines in tasks: asyncio.wait() no longer accepts
# bare coroutine objects in Python 3.11+
tasks = [loop.create_task(hello()) for _ in range(4)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

Output:

Hello world! 2023-09-18 14:40:17.230477 <_MainThread(MainThread, started 8127873344)>
sleep time: 3 <_MainThread(MainThread, started 8127873344)>
Hello world! 2023-09-18 14:40:17.230515 <_MainThread(MainThread, started 8127873344)>
sleep time: 1 <_MainThread(MainThread, started 8127873344)>
Hello world! 2023-09-18 14:40:17.230528 <_MainThread(MainThread, started 8127873344)>
sleep time: 3 <_MainThread(MainThread, started 8127873344)>
Hello world! 2023-09-18 14:40:17.230539 <_MainThread(MainThread, started 8127873344)>
sleep time: 2 <_MainThread(MainThread, started 8127873344)>
Hello again! 2023-09-18 14:40:18.231054 <_MainThread(MainThread, started 8127873344)>
Hello again! 2023-09-18 14:40:19.231754 <_MainThread(MainThread, started 8127873344)>
Hello again! 2023-09-18 14:40:20.231809 <_MainThread(MainThread, started 8127873344)>
Hello again! 2023-09-18 14:40:20.231981 <_MainThread(MainThread, started 8127873344)>
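
The example above discards what asyncio.wait() returns. As a hedged sketch, the snippet below shows that it hands back two sets, the finished tasks and the still-pending ones, from which results can be read. The coroutine work and its short delays are invented for illustration.

```python
import asyncio


async def work(n):
    await asyncio.sleep(0.01 * n)
    return n * n


async def main():
    tasks = [asyncio.create_task(work(n)) for n in range(4)]
    # With the default return_when=ALL_COMPLETED, every task ends up in `done`
    done, pending = await asyncio.wait(tasks)
    # `done` is an unordered set, so sort the results for a stable display
    return sorted(t.result() for t in done), len(pending)


results, pending_count = asyncio.run(main())
print(results, pending_count)
```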

You can also run a group of tasks with asyncio.gather().

  • Using gather
# -*- coding: utf-8 -*-
from datetime import datetime
import asyncio
from random import randint


async def hello():
    print("Hello world!", datetime.now())
    sleep_time = randint(0, 4)
    print(f"sleep time: {sleep_time}")
    await asyncio.sleep(sleep_time)
    print("Hello again!", datetime.now())
    return sleep_time


async def group_task():
    tasks = [hello(), hello(), hello(), hello()]
    # gather() schedules all coroutines concurrently and waits for all of them
    await asyncio.gather(*tasks)


asyncio.run(group_task())

Output:

Hello world! 2023-09-19 13:36:34.779374
sleep time: 2
Hello world! 2023-09-19 13:36:34.779417
sleep time: 4
Hello world! 2023-09-19 13:36:34.779427
sleep time: 1
Hello world! 2023-09-19 13:36:34.779433
sleep time: 0
Hello again! 2023-09-19 13:36:34.779463
Hello again! 2023-09-19 13:36:35.780650
Hello again! 2023-09-19 13:36:36.780536
Hello again! 2023-09-19 13:36:38.780285

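gather also lets you collect the return values; they come back in the same order as the awaitables passed in, not in completion order.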

# -*- coding: utf-8 -*-
from datetime import datetime
import asyncio
from random import randint


async def hello():
    print("Hello world!", datetime.now())
    sleep_time = randint(0, 4)
    await asyncio.sleep(sleep_time)
    print("Hello again!", datetime.now())
    return sleep_time


async def group_task():
    tasks = [hello(), hello(), hello(), hello()]
    # gather() returns a list with one result per awaitable
    result = await asyncio.gather(*tasks)
    for v in result:
        print(f"result => {v}")


asyncio.run(group_task())

Output:

Hello world! 2023-09-19 13:54:38.360411
Hello world! 2023-09-19 13:54:38.360461
Hello world! 2023-09-19 13:54:38.360469
Hello world! 2023-09-19 13:54:38.360475
Hello again! 2023-09-19 13:54:39.361758
Hello again! 2023-09-19 13:54:40.361681
Hello again! 2023-09-19 13:54:41.361665
Hello again! 2023-09-19 13:54:42.361748
result => 3
result => 2
result => 4
result => 1
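
The result ordering seen above can be checked with deterministic delays. The sketch below also uses gather's return_exceptions=True option, which goes beyond what the article covers, to show a failure coming back as a result instead of propagating. The helper delayed is hypothetical.

```python
import asyncio


async def delayed(value, delay):
    await asyncio.sleep(delay)
    if value is None:
        raise ValueError("no value")
    return value


async def main():
    # "slow" finishes last but is still listed first, matching argument order
    ordered = await asyncio.gather(delayed("slow", 0.1), delayed("fast", 0.01))
    # With return_exceptions=True, the exception object appears in the result list
    mixed = await asyncio.gather(
        delayed("ok", 0.01),
        delayed(None, 0.01),
        return_exceptions=True,
    )
    return ordered, mixed


ordered, mixed = asyncio.run(main())
print(ordered)
print(mixed)
```
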
  • Timeouts

asyncio.wait_for() adds a timeout to an awaitable.

import asyncio


async def my_job():
    print('before sleep')
    await asyncio.sleep(2)
    # Never reached: the 1-second timeout fires first
    print('never print')


async def main():
    try:
        await asyncio.wait_for(my_job(), timeout=1)
    except asyncio.TimeoutError:
        print('timeout!')


asyncio.run(main())

Output:

before sleep
timeout!
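
What happens to the job itself when the timeout fires? As a sketch, the variant below records events in a list (the events list and the shortened delays are assumptions made for this example) to show that wait_for() cancels the inner coroutine before raising the timeout error to the caller.

```python
import asyncio

events = []


async def my_job():
    try:
        await asyncio.sleep(1)
    except asyncio.CancelledError:
        # wait_for() cancels the job when the timeout expires
        events.append("cancelled")
        raise  # re-raise so the cancellation completes normally


async def main():
    try:
        await asyncio.wait_for(my_job(), timeout=0.05)
    except asyncio.TimeoutError:
        events.append("timeout")


asyncio.run(main())
print(events)
```

This is why cleanup code in a job that may time out belongs in an except asyncio.CancelledError or finally block.
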
  • to_thread

When the event loop runs into long-running code that contains no await at which it could switch to other work, the event loop is blocked.

import asyncio
import threading

from time import sleep, time


def hard_work():
    print('thread id:', threading.get_ident())
    # time.sleep() is a blocking call: the event loop cannot run anything else
    sleep(10)


async def do_async_job():
    hard_work()
    await asyncio.sleep(1)
    print('job done!')


async def main():
    print(time())
    task1 = asyncio.create_task(do_async_job())
    task2 = asyncio.create_task(do_async_job())
    task3 = asyncio.create_task(do_async_job())
    await asyncio.gather(task1, task2, task3)
    print(time())


asyncio.run(main())

Output:

1695103401.266803
thread id: 8127873344
thread id: 8127873344
thread id: 8127873344
job done!
job done!
job done!
1695103432.283202
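
The run above takes about 31 seconds: the three 10-second blocking calls execute one after another, followed by the 1-second asyncio.sleep(). One way to observe the blocking directly is a "heartbeat" task that should tick at a steady interval; the sketch below (the names ticker and blocking_job and all delays are invented for illustration) measures the largest gap between ticks while a blocking call runs.

```python
import asyncio
import time


async def ticker(ticks, count=5, interval=0.02):
    # Record a timestamp at what should be a steady interval
    for _ in range(count):
        ticks.append(time.perf_counter())
        await asyncio.sleep(interval)


async def blocking_job():
    # Blocking call with no await: the event loop is stuck for 0.2 s
    time.sleep(0.2)


async def main():
    ticks = []
    heartbeat = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0)   # let the ticker record its first tick
    await blocking_job()     # the ticker cannot run during this call
    await heartbeat
    gaps = [later - earlier for earlier, later in zip(ticks, ticks[1:])]
    return max(gaps)


max_gap = asyncio.run(main())
print(f"largest gap between ticks: {max_gap:.3f}s")
```

The largest gap is roughly the length of the blocking call rather than the 0.02-second interval the ticker asked for.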

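To keep long-running code from blocking the event loop, Python 3.9 added asyncio.to_thread(), which runs the code in a separate thread outside the event loop. Each call is submitted to the loop's default thread pool, so the three concurrent calls in the example below run in three different worker threads.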

import asyncio
import threading

from time import sleep, time


def hard_work():
    print('thread id:', threading.get_ident())
    sleep(10)


async def do_async_job():
    # Run the blocking function in a worker thread and await its completion
    await asyncio.to_thread(hard_work)
    await asyncio.sleep(1)
    print('job done!')


async def main():
    print(time())
    task1 = asyncio.create_task(do_async_job())
    task2 = asyncio.create_task(do_async_job())
    task3 = asyncio.create_task(do_async_job())
    await asyncio.gather(task1, task2, task3)
    print(time())


asyncio.run(main())

Output:

1695103741.921301
thread id: 6153449472
thread id: 6170275840
thread id: 6187102208
job done!
job done!
job done!
1695103752.9301171
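
asyncio.to_thread() also forwards arguments to the function and returns its result. The following sketch (Python 3.9+; the function blocking_square and the short delay are hypothetical) collects results from several worker threads and checks that none of them ran on the event loop's own thread.

```python
import asyncio
import threading
import time


def blocking_square(x):
    # Ordinary blocking function: sleeps, then returns a value and its thread id
    time.sleep(0.1)
    return x * x, threading.get_ident()


async def main():
    loop_thread = threading.get_ident()
    start = time.perf_counter()
    # Positional arguments after the function are passed through to it
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_square, n) for n in range(3))
    )
    elapsed = time.perf_counter() - start
    squares = [square for square, _ in results]
    off_loop = all(thread_id != loop_thread for _, thread_id in results)
    return squares, off_loop, elapsed


squares, off_loop, elapsed = asyncio.run(main())
print(squares, off_loop, f"{elapsed:.2f}s")
```
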
  • aiohttp

https://example.com/ is a sample website used here to demonstrate HTTP requests.

Synchronous requests:

import requests
import time


def make_requests():
    resp = requests.get('https://example.com')
    print('example.com =>', resp.status_code)


def main():
    # Each request starts only after the previous one has finished
    for _ in range(0, 5):
        make_requests()


s_time = time.time()
main()
print(time.time() - s_time)

Output:

example.com => 200
example.com => 200
example.com => 200
example.com => 200
example.com => 200
5.089737892150879

Asynchronous HTTP requests with aiohttp:

import aiohttp
import asyncio
import time


async def make_requests(session):
    # async with releases the connection once the status has been read
    async with session.get('https://example.com') as resp:
        return resp.status


async def main():
    async with aiohttp.ClientSession() as session:
        tasks = []
        for _ in range(0, 5):
            tasks.append(make_requests(session))

        # All five requests are in flight at the same time
        results = await asyncio.gather(*tasks)
        for status in results:
            print('example.com =>', status)


s_time = time.time()
asyncio.run(main())
print(time.time() - s_time)

Output:

example.com => 200
example.com => 200
example.com => 200
example.com => 200
example.com => 200
0.8919808864593506
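
The gap between roughly 5 seconds and under 1 second comes from overlapping the waiting time of the requests. The sketch below reproduces the effect without any network access: asyncio.sleep() stands in for network latency, and fake_request, sequential, concurrent, and timed are made-up names, so the timings are only indicative.

```python
import asyncio
import time


async def fake_request(delay=0.05):
    # Stand-in for an HTTP round trip; returns a pretend status code
    await asyncio.sleep(delay)
    return 200


async def sequential(n):
    # Await each request before starting the next one
    return [await fake_request() for _ in range(n)]


async def concurrent(n):
    # Start all requests together and wait for all of them
    return await asyncio.gather(*(fake_request() for _ in range(n)))


def timed(make_coro):
    start = time.perf_counter()
    result = asyncio.run(make_coro())
    return result, time.perf_counter() - start


seq_status, seq_elapsed = timed(lambda: sequential(5))
conc_status, conc_elapsed = timed(lambda: concurrent(5))
print(f"sequential: {seq_elapsed:.2f}s, concurrent: {conc_elapsed:.2f}s")
```

The sequential version takes about five times the single-request delay, while the concurrent version takes about one.
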
  • Asynchronous generators
# SuperFastPython.com
# example of asynchronous generator with async for loop
import asyncio


# define an asynchronous generator
async def async_generator():
    # normal loop
    for i in range(10):
        # suspend to simulate doing work
        await asyncio.sleep(1)
        # yield the result
        yield i


# main coroutine
async def main():
    # loop over async generator with async for loop
    async for item in async_generator():
        print(item)


# execute the asyncio program
asyncio.run(main())
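
An asynchronous generator can also be consumed with an asynchronous comprehension instead of an explicit async for loop. A minimal sketch, assuming a hypothetical countdown generator with a short delay:

```python
import asyncio


async def countdown(n):
    # Asynchronous generator: may await between yields
    for i in range(n, 0, -1):
        await asyncio.sleep(0.01)
        yield i


async def main():
    # Asynchronous list comprehension; only valid inside a coroutine
    return [i async for i in countdown(3)]


items = asyncio.run(main())
print(items)
```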

https://percent4.github.io/Python异步编程入门(一)/
Author: Jclian91
Published: January 10, 2024