Python并发系统学习路线第215讲_核心原理与实战案例详解【技巧】

Python并发稳定需避免隐式阻塞、上下文切换失控、资源竞争未加锁及协程混入同步I/O;asyncio.run()不可嵌套调用,因内部检查并拒绝已在运行的事件循环;应顶层调用,内部用await或create_task();跨同步异步边界宜用to_thread()(Py3.9+)或run_in_executor();协程间共享状态必须用asyncio.Lock且按资源独立实例化。

Python 并发不是靠多开几个 threading.Thread 或塞一堆 asyncio.create_task() 就能稳的——真正卡住系统的,往往是隐式阻塞、上下文切换失控、资源竞争没加锁,或者协程里混进了同步 I/O。

为什么 asyncio.run() 不能嵌套调用

这是初学者最常撞上的硬错误。当你在已运行的事件循环里再调用 asyncio.run(),会直接抛出 RuntimeError: asyncio.run() cannot be called from a running event loop

  • 根本原因:asyncio.run() 内部会检查当前线程是否已有运行中的事件循环,有就拒绝启动
  • 常见场景:在 Flask/FastAPI 的请求处理函数中、或某个已由 asyncio.run() 启动的协程里,又写了 asyncio.run(some_coro())
  • 正确做法:顶层用 asyncio.run() 启动;内部一律用 awaitasyncio.create_task();若需从同步代码“切”进异步上下文(如线程中),改用 asyncio.get_event_loop().create_task()(注意循环存在性)或 asyncio.to_thread()(Py3.9+)

concurrent.futures.ThreadPoolExecutorasyncio.to_thread() 怎么选

两者都用于把阻塞操作“搬出”主线程,但适用阶段和控制粒度不同。

  • ThreadPoolExecutor 是传统线程池,适合长期复用、需精细控制线程数(如 max_workers=4)、或要统一管理生命周期(.shutdown(wait=True))的场景
  • asyncio.to_thread()(Py3.9+)更轻量,每次调用自动分配线程并 await 返回,适合零散、偶发的阻塞调用(如读文件、调用旧版同步 SDK)
  • 关键区别:前者不返回协程对象,需配合 loop.run_in_executor() 才能 await;后者直接返回可 await 的协程
import asyncio
import time

错误:直接 await 同步函数 → 主线程卡死

await time.sleep(1)

正确(Py3.9+)

await asyncio.to_thread(time.sleep, 1)

或兼容老版本

loop = asyncio.get_running_loop() await loop.run_in_executor(None, time.sleep, 1)

asyncio.Lock 保护共享状态,但别锁错对象

协程并发下,threading.Lock 完全无效——它只对线程起作用,而多个协程可能跑在同一个线程里。

  • 必须用 asyncio.Lock,且每个需要保护的共享变量/资源应配独立的锁实例
  • 典型错误:所有协程共用一个全局 lock = asyncio.Lock(),结果本该互斥的两个不同资源(比如用户余额和订单号生成器)被强绑定,造成不必要等待
  • 更隐蔽的问题:在类方法中定义 self.lock = asyncio.Lock(),但忘记 await self.lock.acquire(),或忘了 try/finally 释放
class Counter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()  # 每个实例独享锁
async def increment(self):
    async with self.lock:  # 自动 acquire/release
        self.value += 1
        return self.value

并发真正的复杂点不在语法,而在你是否清楚每一行代码执行时,CPU 在哪、IO 在哪、锁在哪、事件循环有没有被意外阻塞——这些不会报错,但会让吞吐量掉一半、延迟毛刺翻倍、压测时连接池耗尽。留心那些没显式写 await 却实际会阻塞的调用,比如某些数据库驱动的 .execute()、日志库的同步刷盘、甚至 json.loads() 处理超大字符串。