系列 · Python 工程实践 · 第 6 篇

Python 工程实践(六):并发编程 —— 线程、进程与 asyncio

深入理解 GIL,掌握 threading、multiprocessing 和 asyncio。学会为 I/O 密集型与 CPU 密集型任务选择最合适的并发模型。

你的脚本一次只下载 100 个文件,每个约耗时 2 秒——绝大部分时间在等待服务器响应,总耗时 200 秒,而 CPU 99% 的时间处于空闲状态。你为网络延迟付费,却白白浪费了计算资源,并发编程正是为了解决这个问题而诞生的。

Python 提供三种并发模型,分别面向不同场景。选错模型不仅无法提速,还可能引发竞态条件。


GIL:它是什么?为何重要?#

全局解释器锁(Global Interpreter Lock,GIL)是保护 Python 对象访问的互斥锁(mutex),即使在多核机器上也仅允许一个线程执行 Python 字节码。

GIL 对并行性的影响

GIL 无法防止什么#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import threading

counter = 0

def increment():
    global counter
    for _ in range(1_000_000):
        counter += 1  # 这不是原子操作!

threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)
# 无 GIL:竞态条件,counter < 4_000_000
# 有 GIL:仍有竞态条件!counter < 4_000_000

等等,GIL 并不能防止这个?没错。counter += 1 编译为多个字节码指令(LOAD、ADD、STORE),而 GIL 可能在这些字节码指令执行的间隙被释放。GIL 保护的是解释器内部结构,而非你的应用逻辑。

GIL 实际能做什么#

  • 防止多线程同时破坏 Python 内部数据结构(如引用计数、对象分配)
  • 加速单线程代码执行(避免加锁开销)
  • 在 I/O 操作(文件读写、网络调用、sleep)期间自动释放

GIL 对你的实际影响#

工作负载类型ThreadingMultiprocessing
I/O 密集型(网络、磁盘、数据库)表现良好(I/O 期间 GIL 释放)可行但过度设计
CPU 密集型(数学计算、解析)无加速效果(GIL 阻塞并行执行)表现良好(独立进程,各自拥有 GIL)
混合型取决于 I/O 与 CPU 比例通常更稳妥的选择

未来:Free-Threaded Python(3.13+)#

PEP 703 引入了实验性的无 GIL CPython 构建。从 Python 3.13 开始,你可以安装"free-threaded"构建(python3.13t),实现线程级真正并行执行:

1
2
3
4
5
6
# 安装 free-threaded 构建(实验性)
$ pyenv install 3.13.0t

# 检查 GIL 是否已禁用
$ python3.13t -c "import sys; print(sys._is_gil_enabled())"
False

GIL 禁用后,前面的线程示例在 CPU 密集型任务上确实能获得真正的并行加速。然而截至 2025 年,生态系统仍在适配——许多 C 扩展假设 GIL 存在,可能会崩溃或产生错误结果。目前仅适合实验,不建议用于生产环境。计划在 Python 3.15 或 3.16 中将 free-threading 设为默认。

当前实践建议不变:I/O 用线程,CPU 用进程,高并发 I/O 用 asyncio。

Threading:面向 I/O 密集型任务的并发#

线程共享同一内存空间,开销轻量。由于 GIL 在 I/O 操作期间会释放,因此线程非常适合网络请求、文件操作和数据库查询等场景。

并发模型比较

基础线程用法#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import threading
import time

import requests


def download(url: str) -> int:
    """下载 URL,返回响应体大小。"""
    response = requests.get(url, timeout=10)
    return len(response.content)


urls = [
    "https://httpbin.org/delay/1",
]

# 串行执行:约 4 秒
start = time.perf_counter()
results = [download(url) for url in urls]
sequential_time = time.perf_counter() - start
print(f"Sequential: {sequential_time:.2f}s")

# 多线程:约 1 秒
start = time.perf_counter()
threads = []
for url in urls:
    t = threading.Thread(target=download, args=(url,))
    t.start()
    threads.append(t)
for t in threads:
    t.join()
threaded_time = time.perf_counter() - start
print(f"Threaded: {threaded_time:.2f}s")

输出:

1
2
Sequential: 4.12s
Threaded: 1.08s

ThreadPoolExecutor#

concurrent.futures 提供更高层的 API,并支持结果收集:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests


def download(url: str) -> tuple[str, int]:
    response = requests.get(url, timeout=10)
    return url, len(response.content)


urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/3",
]

with ThreadPoolExecutor(max_workers=4) as executor:
    # 提交全部任务
    futures = {executor.submit(download, url): url for url in urls}

    # 按完成顺序处理结果
    for future in as_completed(futures):
        url = futures[future]
        try:
            result_url, size = future.result()
            print(f"Downloaded {result_url}: {size} bytes")
        except Exception as e:
            print(f"Failed {url}: {e}")

线程安全的数据结构#

当线程共享数据时,需使用锁或线程安全集合:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import threading
from collections import deque

# 用于保护共享状态的锁
lock = threading.Lock()
results = []

def worker(item):
    processed = expensive_computation(item)
    with lock:  # 同一时刻仅一个线程可执行此代码块
        results.append(processed)

# 生产者-消费者模式中使用的队列
from queue import Queue

work_queue: Queue[str] = Queue()
for url in urls:
    work_queue.put(url)

def consumer():
    while True:
        url = work_queue.get()
        if url is None:  # Poison pill(终止信号)
            break
        download(url)
        work_queue.task_done()

# 启动消费者线程
threads = [threading.Thread(target=consumer) for _ in range(4)]
for t in threads:
    t.start()

# 等待所有任务完成
work_queue.join()

# 发送终止信号
for _ in threads:
    work_queue.put(None)
for t in threads:
    t.join()

Multiprocessing:面向 CPU 密集型任务的并发#

每个进程拥有独立的 Python 解释器和 GIL,从而实现在多核 CPU 上真正的并行执行。

并发基准测试

基础用法#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import multiprocessing
import time


def cpu_intensive(n: int) -> int:
    """模拟 CPU 密集型工作。"""
    total = 0
    for i in range(n):
        total += i * i
    return total


numbers = [10_000_000] * 4

# 串行执行
start = time.perf_counter()
results = [cpu_intensive(n) for n in numbers]
print(f"Sequential: {time.perf_counter() - start:.2f}s")

# 并行执行
start = time.perf_counter()
with multiprocessing.Pool(processes=4) as pool:
    results = pool.map(cpu_intensive, numbers)
print(f"Parallel:   {time.perf_counter() - start:.2f}s")

在 4 核机器上的输出:

1
2
Sequential: 8.45s
Parallel:   2.31s

ProcessPoolExecutor#

API 与 ThreadPoolExecutor 完全一致,便于在两者间切换:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from concurrent.futures import ProcessPoolExecutor

def factorize(n: int) -> list[int]:
    """找出 n 的所有因数。"""
    factors = []
    for i in range(1, int(n**0.5) + 1):
        if n % i == 0:
            factors.append(i)
            if i != n // i:
                factors.append(n // i)
    return sorted(factors)


numbers = [112272535095293, 112582705942171, 115280095190773, 115797848077099]

with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(factorize, numbers))
    for n, factors in zip(numbers, results):
        print(f"{n}: {factors}")

Multiprocessing 开销#

进程比线程更“重”:

方面ThreadsProcesses
内存共享(轻量)独立(每个进程复制数据)
启动成本~1ms~50–100ms
通信方式直接(共享内存,但需加锁)序列化(通过管道 pickle)
GIL 限制是(CPU 密集型受限)否(独立解释器)
调试难度更难(共享状态引发的 bug)更易(状态隔离)

不要对小任务使用 multiprocessing,因为序列化与进程启动开销可能导致其比串行执行更慢。

进程间共享数据#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import multiprocessing

def worker(shared_array, index, value):
    shared_array[index] = value

if __name__ == "__main__":
    # 共享数组
    arr = multiprocessing.Array("i", 4)  # 4 个整数

    processes = []
    for i in range(4):
        p = multiprocessing.Process(target=worker, args=(arr, i, i * 10))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    print(list(arr))  # [0, 10, 20, 30]

注意 if __name__ == "__main__": 守卫语句。在 macOS 和 Windows 上这是必需的,因为 multiprocessing 使用 spawn 方式创建新进程,会重新导入模块。

concurrent.futures:统一 API#

Asyncio 事件循环像一个旋转轮处理协程

concurrent.futures 的精妙之处在于:在线程与进程间切换,只需改动一行代码:

并发决策树

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def process_item(item):
    # ... 执行某些工作 ...
    return result

items = range(100)

# 用于 I/O 密集型任务:
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(process_item, items))

# 用于 CPU 密集型任务(仅改这一行):
with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_item, items))

超时与异常处理#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def slow_download(url):
    import time
    time.sleep(10)
    return f"Done: {url}"

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(slow_download, "https://example.com")

    try:
        result = future.result(timeout=5)  # 最多等待 5 秒
    except TimeoutError:
        print("Task timed out!")
        future.cancel()
    except Exception as e:
        print(f"Task failed: {e}")

asyncio:协作式并发#

asyncio 基于单线程与事件循环(event loop)运行。协程函数在 await 点主动让出控制权,使其他任务得以运行。无需线程、无需锁、无需担忧 GIL。

Asyncio 事件循环

基础 async/await#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio


async def say_after(delay: float, message: str) -> str:
    """等待后返回消息。"""
    await asyncio.sleep(delay)
    return message


async def main():
    # 串行执行:耗时 3 秒
    result1 = await say_after(1, "hello")
    result2 = await say_after(2, "world")
    print(result1, result2)

    # 并发执行:耗时 2 秒(取最大延迟)
    results = await asyncio.gather(
        say_after(1, "hello"),
        say_after(2, "world"),
    )
    print(results)  # ['hello', 'world']


asyncio.run(main())

创建任务(Tasks)#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio


async def download(url: str) -> str:
    print(f"Start: {url}")
    await asyncio.sleep(1)  # 模拟网络 I/O
    print(f"Done: {url}")
    return f"Content of {url}"


async def main():
    # 创建任务(立即开始执行)
    tasks = [
        asyncio.create_task(download(f"https://example.com/{i}"))
        for i in range(5)
    ]

    # 等待全部完成
    results = await asyncio.gather(*tasks)
    print(f"Downloaded {len(results)} pages")


asyncio.run(main())

输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Start: https://example.com/0
Start: https://example.com/1
Start: https://example.com/2
Start: https://example.com/3
Start: https://example.com/4
Done: https://example.com/0
Done: https://example.com/1
Done: https://example.com/2
Done: https://example.com/3
Done: https://example.com/4
Downloaded 5 pages

五个下载任务几乎同时发起,并在约 1 秒后全部完成。

aiohttp:异步 HTTP 客户端#

requests 是同步库。要进行异步 HTTP 请求,请使用 aiohttp

1
(.venv) $ pip install aiohttp
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
import time

import aiohttp


async def download(session: aiohttp.ClientSession, url: str) -> int:
    async with session.get(url) as response:
        content = await response.read()
        return len(content)


async def main():
    urls = [f"https://httpbin.org/delay/1" for _ in range(10)]

    async with aiohttp.ClientSession() as session:
        tasks = [download(session, url) for url in urls]
        start = time.perf_counter()
        results = await asyncio.gather(*tasks)
        elapsed = time.perf_counter() - start

    print(f"Downloaded {len(results)} URLs in {elapsed:.2f}s")
    print(f"Total bytes: {sum(results)}")


asyncio.run(main())

输出:

1
2
Downloaded 10 URLs in 1.15s
Total bytes: 4230

10 个各延迟 1 秒的 URL,在略超 1 秒内全部完成。

Semaphore:控制并发数量#

不加限制的并发可能压垮服务端或触发速率限制:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
import aiohttp


async def download(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: asyncio.Semaphore,
) -> int:
    async with semaphore:  # 最多 N 个并发下载
        async with session.get(url) as response:
            content = await response.read()
            return len(content)


async def main():
    urls = [f"https://httpbin.org/delay/1" for _ in range(100)]
    semaphore = asyncio.Semaphore(10)  # 最多 10 个并发请求

    async with aiohttp.ClientSession() as session:
        tasks = [download(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
    print(f"Downloaded {len(results)} URLs")


asyncio.run(main())

asyncio 中的超时处理#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import asyncio


async def slow_operation():
    await asyncio.sleep(10)
    return "done"


async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=3.0)
    except asyncio.TimeoutError:
        print("Operation timed out after 3 seconds")


asyncio.run(main())

现代 asyncio 模式(Python 3.11+)#

Python 3.9–3.11 引入了三个从根本上改善异步代码的特性。如果你使用 3.11+,优先选择这些新模式。

asyncio.to_thread():无样板代码运行阻塞操作#

Python 3.9 之前,在异步上下文中运行阻塞代码需要 loop.run_in_executor()。现在有了更简洁的方式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
import time


def blocking_io() -> str:
    """模拟阻塞 I/O 操作(遗留库、文件 I/O 等)。"""
    time.sleep(2)
    return "阻塞调用的结果"


async def main():
    # 旧方式(冗长):
    # loop = asyncio.get_event_loop()
    # result = await loop.run_in_executor(None, blocking_io)

    # 新方式(Python 3.9+):
    result = await asyncio.to_thread(blocking_io)
    print(result)

    # 并发运行多个阻塞调用:
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.to_thread(blocking_io),
        asyncio.to_thread(blocking_io),
    )
    # 总共约 2 秒,而非 6 秒


asyncio.run(main())

当你需要从异步代码中调用同步库(数据库驱动、文件解析器、遗留 SDK)而不阻塞事件循环时,使用 asyncio.to_thread()

asyncio.TaskGroup:结构化并发#

asyncio.gather() 有一个问题:如果某个任务抛出异常,其他任务会继续运行(或被不一致地取消)。TaskGroup(Python 3.11+)通过结构化并发解决了这个问题——组内所有任务保证在代码块退出前完成:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio


async def fetch(url: str) -> str:
    await asyncio.sleep(1)
    if "bad" in url:
        raise ValueError(f"无效 URL: {url}")
    return f"内容: {url}"


async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(fetch("https://example.com/a"))
            task2 = tg.create_task(fetch("https://example.com/b"))
            task3 = tg.create_task(fetch("https://example.com/bad"))
    except* ValueError as eg:
        # ExceptionGroup:统一处理所有 ValueError
        for exc in eg.exceptions:
            print(f"捕获: {exc}")
    else:
        print(task1.result(), task2.result(), task3.result())


asyncio.run(main())

gather() 的关键区别:

特性asyncio.gather()asyncio.TaskGroup
失败时取消仅在 return_exceptions=False始终取消剩余任务
异常处理第一个异常传播,其余丢失ExceptionGroup 收集全部
清理保证无——任务可能泄漏有——块退出时所有任务已完成
动态创建任务否(固定列表)是(块内 tg.create_task()
Python 版本3.4+3.11+

Python 3.11+ 的新代码优先使用 TaskGroup 而非 gather() 它能防止困扰 gather() 代码的"发射后不管"类 bug。

gather() 中的异常处理#

如果需要支持 Python < 3.11,显式处理 gather() 中的失败:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio


async def risky_download(url: str) -> str:
    await asyncio.sleep(1)
    if "fail" in url:
        raise ConnectionError(f"无法连接 {url}")
    return f"OK: {url}"


async def main():
    urls = ["https://a.com", "https://fail.com", "https://b.com"]

    # 方案 1:return_exceptions=True(收集全部,手动检查)
    results = await asyncio.gather(
        *[risky_download(url) for url in urls],
        return_exceptions=True,
    )
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            print(f"失败 {url}: {result}")
        else:
            print(f"成功 {url}: {result}")


asyncio.run(main())

输出:

1
2
3
成功 https://a.com: OK: https://a.com
失败 https://fail.com: 无法连接 https://fail.com
成功 https://b.com: OK: https://b.com

生产环境模式#

指数退避重试#

网络请求不可避免会失败。成熟的重试机制使用指数退避来避免压垮下游服务:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
import random
from typing import TypeVar, Callable, Awaitable

T = TypeVar("T")

async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> T:
    """带指数退避和抖动的重试。"""
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except Exception as e:
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.1)
            await asyncio.sleep(delay + jitter)

# 使用示例
async def fetch_data():
    result = await retry_with_backoff(
        lambda: api_client.get("/unstable-endpoint"),
        max_retries=5,
        base_delay=0.5,
    )

关键设计要点:

  • 指数增长:每次失败后等待时间翻倍,避免短时间内大量重试
  • 随机抖动:防止"惊群效应"——多个客户端在同一时刻同时重试
  • 最大延迟上限:防止退避时间无限增长

令牌桶限速器#

当调用有 QPS 限制的外部 API 时,令牌桶算法是最灵活的限速方案:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio
import time

class TokenBucket:
    """异步令牌桶限速器。"""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate          # 每秒补充令牌数
        self.capacity = capacity  # 桶容量
        self.tokens = capacity
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1):
        async with self._lock:
            self._refill()
            while self.tokens < tokens:
                wait = (tokens - self.tokens) / self.rate
                await asyncio.sleep(wait)
                self._refill()
            self.tokens -= tokens

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now

# 使用:限制为每秒 10 次请求
limiter = TokenBucket(rate=10, capacity=10)

async def rate_limited_request(url: str):
    await limiter.acquire()
    return await http_client.get(url)

优雅关闭#

生产服务需要在接收到终止信号时干净地关闭——完成进行中的请求,释放资源:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
import signal

class GracefulShutdown:
    def __init__(self):
        self._shutdown_event = asyncio.Event()

    def install_handlers(self):
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._shutdown_event.set)

    async def wait(self):
        await self._shutdown_event.wait()

async def main():
    shutdown = GracefulShutdown()
    shutdown.install_handlers()

    workers = [asyncio.create_task(worker(i)) for i in range(4)]

    # 等待关闭信号
    await shutdown.wait()
    print("收到关闭信号,等待任务完成...")

    # 给 worker 时间完成当前工作
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    print("干净关闭完成")

异步生产者-消费者#

asyncio.Queue 天然适合解耦数据生产和消费,实现背压控制:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import asyncio
from dataclasses import dataclass

@dataclass
class WorkItem:
    url: str
    priority: int = 0

async def producer(queue: asyncio.Queue, urls: list[str]):
    for url in urls:
        await queue.put(WorkItem(url=url))
    # 发送毒丸通知消费者退出
    await queue.put(None)

async def consumer(queue: asyncio.Queue, consumer_id: int):
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # 传递给下一个消费者
            break
        try:
            result = await process(item.url)
            print(f"消费者 {consumer_id}: 处理完成 {item.url}")
        except Exception as e:
            print(f"消费者 {consumer_id}: 处理失败 {item.url}: {e}")
        finally:
            queue.task_done()

async def pipeline(urls: list[str], num_consumers: int = 5):
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)  # 背压:最多缓冲100项

    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue, urls))
        for i in range(num_consumers):
            tg.create_task(consumer(queue, i))

maxsize=100 提供背压:当队列满时,生产者会自动暂停,防止内存无限增长。

测试异步代码#

使用 pytest-asyncio#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import pytest
import asyncio

@pytest.mark.asyncio
async def test_fetch_user():
    user = await fetch_user(user_id=42)
    assert user.name == "Alice"

@pytest.mark.asyncio
async def test_concurrent_fetches():
    users = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )
    assert len(users) == 3

Mock 异步函数#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_with_mock_api():
    mock_client = AsyncMock()
    mock_client.get.return_value = {"status": "ok", "data": [1, 2, 3]}

    result = await fetch_data(client=mock_client)
    assert result == [1, 2, 3]
    mock_client.get.assert_called_once_with("/api/data")

@pytest.mark.asyncio
async def test_timeout_handling():
    mock_client = AsyncMock()
    mock_client.get.side_effect = asyncio.TimeoutError()

    with pytest.raises(ServiceUnavailableError):
        await fetch_data(client=mock_client)

如何选择正确的并发模型?#

决策取决于工作负载类型:

I/O 密集型任务(网络、磁盘、数据库):

  • 并发任务数较少(< 50):使用 ThreadPoolExecutor
  • 并发任务数较多(50+):使用 asyncio
  • 简单脚本或一次性任务:ThreadPoolExecutor
  • Web 服务器、长期运行的服务:asyncio

CPU 密集型任务(数学计算、图像处理、文本解析):

  • 始终使用 ProcessPoolExecutormultiprocessing.Pool

混合型工作负载:

  • asyncio 处理 I/O,将 CPU 密集型工作卸载至 ProcessPoolExecutor
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import asyncio
from concurrent.futures import ProcessPoolExecutor

import aiohttp


def cpu_work(data: bytes) -> dict:
    """CPU 密集型处理(在独立进程中运行)。"""
    # 解析、转换、计算...
    return {"result": len(data)}


async def fetch_and_process(session, url, process_pool):
    """异步获取数据(I/O),再交由进程池处理(CPU)。"""
    async with session.get(url) as response:
        data = await response.read()

    # 将 CPU 工作卸载至进程池
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(process_pool, cpu_work, data)
    return result


async def main():
    urls = [f"https://example.com/{i}" for i in range(20)]

    with ProcessPoolExecutor(max_workers=4) as process_pool:
        async with aiohttp.ClientSession() as session:
            tasks = [
                fetch_and_process(session, url, process_pool)
                for url in urls
            ]
            results = await asyncio.gather(*tasks)

真实基准测试:串行 vs 线程 vs 异步#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
"""基准测试:用不同并发模型下载 20 个 URL。"""

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

import aiohttp
import requests

URL = "https://httpbin.org/delay/1"
COUNT = 20


def sequential():
    for _ in range(COUNT):
        requests.get(URL, timeout=10)


def threaded():
    def download(url):
        requests.get(url, timeout=10)

    with ThreadPoolExecutor(max_workers=COUNT) as executor:
        list(executor.map(download, [URL] * COUNT))


async def async_download():
    async with aiohttp.ClientSession() as session:
        tasks = []
        for _ in range(COUNT):
            tasks.append(session.get(URL))
        responses = await asyncio.gather(*tasks)
        for r in responses:
            await r.read()
            r.close()


def benchmark(name, func):
    start = time.perf_counter()
    func()
    elapsed = time.perf_counter() - start
    print(f"{name:12s}: {elapsed:.2f}s")


benchmark("Sequential", sequential)
benchmark("Threaded", threaded)
benchmark("Async", lambda: asyncio.run(async_download()))

结果:

1
2
3
Sequential  : 21.34s
Threaded    : 1.18s
Async       : 1.09s

线程与异步均在约 1 秒内完成(等于服务器延迟)。异步方案系统资源占用更低——没有线程栈、没有上下文切换开销。

对比汇总表#

Python GIL 瓶颈:单车道桥,线程等待通过

特性threadingmultiprocessingasyncio
并行类型并发(受 GIL 限制)真正并行协作式并发
最佳适用场景I/O 密集型CPU 密集型I/O 密集型(大量任务)
内存开销低(~8KB/线程)高(~30MB/进程)极低(~1KB/协程)
实际可支撑任务数~100–1000等于 CPU 核心数~10,000+
共享状态是(需加锁)否(需序列化)是(无需锁,单线程)
调试难度难(竞态条件)中等(隔离性有助调试)中等(堆栈跟踪不够清晰)
生态兼容性所有库均可使用所有库均可使用需要异步兼容库
启动成本~1ms~50–100ms~0.01ms
是否受 GIL 影响不适用(单线程)

常见陷阱#

陷阱表现症状解决方案
对 CPU 密集型任务使用线程无性能提升,单核 100% 占用切换至 ProcessPoolExecutor
创建过多线程内存耗尽、上下文切换变慢限制线程池大小(通常 10–50)
忘记 if __name__ == "__main__":macOS/Windows 报 RuntimeError所有 multiprocessing 代码必须加此守卫
同步与异步混用RuntimeError: This event loop is already running仅在顶层调用 asyncio.run()
忘记 await 协程RuntimeWarning: coroutine was never awaited所有异步函数调用必须 await
在异步代码中执行阻塞操作事件循环冻结,其他任务饿死将阻塞操作卸载至线程/进程池
共享状态引发竞态条件数据不一致、偶发 Bug使用锁、队列或不可变数据
死锁程序永久挂起按固定顺序获取锁,使用超时机制

下一步#

你的代码现已具备并发能力且高效运行。但在发布前,还需正确打包。下一篇文章将介绍如何构建可分发的 Python 包、发布到 PyPI、创建 Docker 镜像,并搭建完整的分发流水线。

本系列

Python 工程实践 8 篇

  1. 01 Python 工程实践(一):环境搭建——pyenv、venv 与依赖地狱
  2. 02 Python 工程实践(二):项目结构 —— 从脚本到包
  3. 03 Python 工程实践(三):测试——pytest、Fixture 与信心循环
  4. 04 Python 工程实践(四):类型提示、代码检查与质量保障
  5. 05 Python 工程实践(五):I/O、序列化与数据格式
  6. 06 Python 工程实践(六):并发编程 —— 线程、进程与 asyncio 当前
  7. 07 Python 工程实践(七):打包分发 —— 从 pip install 到 PyPI
  8. 08 Python 工程实践(八):性能优化 —— 性能分析、缓存与适时收手

读有所得?

GitHub 关注我 → 新文周更

GitHub