信息发布→ 登录 注册 退出

Python async/await 协程:CPU密集型任务的陷阱与解决方案

发布时间:2025-12-01

点击量:

python async/await 协程:cpu密集型任务的陷阱与解决方案

Python的`async/await`机制旨在通过协程实现并发,但其工作原理在处理CPU密集型任务时常引人困惑。本文将深入探讨为何`await`一个纯计算任务无法交出控制权,而`await asyncio.sleep(0)`却能实现任务切换。我们将剖析事件循环的协作机制,并提供针对CPU密集型任务的正确并发策略,帮助开发者避免常见陷阱。

1. asyncio 与协程概述

asyncio是Python中用于编写并发代码的库,它通过事件循环(event loop)和协程(coroutines)实现单线程内的协作式多任务。async/await语法是定义和等待协程的核心。其设计初衷是为了高效处理I/O密集型任务,例如网络请求、文件读写等。当一个协程遇到I/O操作并使用await关键字时,它会将控制权交还给事件循环,允许事件循环调度其他“准备就绪”的协程运行,从而避免了线程切换的开销,提高了资源利用率。

2. CPU密集型任务与 await 的误区

许多开发者在初次接触asyncio时,会误以为只要在函数前加上async,并在调用时使用await,就能实现任务的并发交替执行。然而,这并非总是如此,尤其是在处理CPU密集型任务时。

考虑以下示例代码:

import asyncio
import time

async def long_function():
    """一个纯粹的CPU密集型任务,不涉及任何I/O或异步操作。"""
    print(f"Task {asyncio.current_task().get_name()}: long_function started...")
    for _ in range(50_000_000): # 大量循环,模拟耗时计算
        pass
    print(f"Task {asyncio.current_task().get_name()}: long_function finished.")

async def count_blocking():
    """包含阻塞性CPU任务的协程。"""
    for x in range(3):
        print(f"Count {x} in {asyncio.current_task().get_name()}")
        await long_function() # 等待一个CPU密集型任务

async def main_blocking():
    """主协程,启动两个阻塞性计数任务。"""
    task1 = asyncio.create_task(count_blocking(), name="Count-A")
    task2 = asyncio.create_task(count_blocking(), name="Count-B")
    await asyncio.gather(task1, task2)

if __name__ == "__main__":
    start_time = time.perf_counter()
    asyncio.run(main_blocking())
    end_time = time.perf_counter()
    print(f"\n总执行时间 (阻塞): {end_time - start_time:.2f} 秒")

运行上述代码,你会发现输出结果是:一个count_blocking协程会完全执行完毕,包括其内部的long_function的所有迭代,然后另一个count_blocking协程才开始执行。输出顺序会是:

Count 0 in Count-A
Task Count-A: long_function started...
Task Count-A: long_function finished.
Count 1 in Count-A
Task Count-A: long_function started...
Task Count-A: long_function finished.
Count 2 in Count-A
Task Count-A: long_function started...
Task Count-A: long_function finished.
Count 0 in Count-B
Task Count-B: long_function started...
Task Count-B: long_function finished.
...

这与我们期望的交替输出(如0、0、1、1...)大相径庭。原因是await关键字本身并不具备“中断”正在执行的函数的能力。它仅仅表示“我正在等待某个异步操作完成,在此期间,你可以去执行其他准备就绪的协程”。而long_function内部是一个纯粹的计算循环,它没有任何I/O操作,也没有主动向事件循环报告它正在“等待”什么。因此,一旦事件循环将控制权交给long_function,它就会一直运行直到计算完成,期间不会释放控制权,从而阻塞了整个事件循环。

3. asyncio.sleep(0) 的作用:显式交出控制权

为了实现CPU密集型任务的协作式并发,我们需要在耗时计算中显式地将控制权交还给事件循环。asyncio.sleep(0)就是实现这一目的的常用技巧。

import asyncio
import time

async def long_function_cooperative():
    """一个协作式的CPU密集型任务,周期性地交出控制权。"""
    task_name = asyncio.current_task().get_name()
    # print(f"Task {task_name}: long_function_cooperative started...")
    for i in range(50_000_000):
        # 每隔一定次数的循环,显式地交出控制权
        if i % 10_000_000 == 0 and i != 0:
            # print(f"Task {task_name}: Yielding at iteration {i}")
            await asyncio.sleep(0) # 关键:交出控制权
    print(f"Task {task_name}: long_function_cooperative finished.")

async def count_cooperative():
    """包含协作性CPU任务的协程。"""
    for x in range(3):
        print(f"Count {x} in {asyncio.current_task().get_name()}")
        await long_function_cooperative() # 等待一个协作性CPU任务

async def main_cooperative():
    """主协程,启动两个协作性计数任务。"""
    task1 = asyncio.create_task(count_cooperative(), name="Count-X")
    task2 = asyncio.create_task(count_cooperative(), name="Count-Y")
    await asyncio.gather(task1, task2)

if __name__ == "__main__":
    start_time = time.perf_counter()
    asyncio.run(main_cooperative())
    end_time = time.perf_counter()
    print(f"\n总执行时间 (协作): {end_time - start_time:.2f} 秒")

现在,运行这段代码,你会看到期望的交替输出:

Count 0 in Count-X
Count 0 in Count-Y
Count 1 in Count-X
Count 1 in Count-Y
Count 2 in Count-X
Count 2 in Count-Y
Task Count-X: long_function_cooperative finished.
Task Count-Y: long_function_cooperative finished.
...

await asyncio.sleep(0)的原理是:它是一个非阻塞的异步操作,告诉事件循环“我暂时不需要CPU,你可以去检查是否有其他协程准备好了”。即使是sleep(0),它也触发了事件循环的调度机制,允许其他等待中的协程获得执行机会。这正是asyncio协作式多任务的核心体现。

Seede AI Seede AI

AI 驱动的设计工具

Seede AI 713 查看详情 Seede AI

4. 事件循环的工作机制

asyncio的事件循环是单线程的,它维护一个任务队列。当一个协程通过await等待一个异步操作(如网络I/O、定时器或asyncio.sleep(0))时,它会暂停执行,并将控制权交还给事件循环。事件循环会检查任务队列,选择下一个“准备就绪”的协程来运行。

  • I/O密集型任务: 当协程等待网络响应时,操作系统会处理网络通信,而Python线程可以去执行其他协程。当网络数据到达时,事件循环会收到通知,然后将等待该数据的协程标记为“准备就绪”,并在合适的时机重新调度它。
  • CPU密集型任务: 如果一个协程正在执行纯粹的CPU计算,它不会自动释放控制权。它会一直占用CPU,直到计算完成。除非它内部显式地调用await一个异步操作(如asyncio.sleep(0)),否则事件循环无法介入并切换到其他协程。

因此,asyncio的并发性是“协作式”的,而不是“抢占式”的。协程必须主动选择何时交出控制权。

5. CPU密集型任务的真正解决方案

虽然asyncio.sleep(0)可以在一定程度上缓解CPU密集型任务的阻塞问题,但它并不能真正实现并行计算,因为asyncio事件循环仍然运行在单个线程中。对于需要充分利用多核CPU的重度CPU密集型任务,真正的解决方案是使用多进程(multiprocessing)。

concurrent.futures模块提供了ProcessPoolExecutor,可以方便地将CPU密集型任务提交到独立的进程中执行,从而绕过Python的全局解释器锁(GIL)限制,实现真正的并行。

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def blocking_cpu_task(task_id, iterations):
    """一个阻塞的CPU密集型函数,适合在进程池中运行。"""
    print(f"Process {task_id}: Starting CPU-bound task with {iterations} iterations...")
    result = 0
    for i in range(iterations):
        result += i # 执行一些计算
    print(f"Process {task_id}: Finished CPU-bound task. Result: {result % 1000}")
    return f"Task {task_id} completed."

async def run_cpu_tasks_with_pool():
    """使用ProcessPoolExecutor异步运行CPU密集型任务。"""
    # 使用ProcessPoolExecutor创建进程池,max_workers=None表示使用CPU核心数
    with ProcessPoolExecutor(max_workers=2) as executor:
        loop = asyncio.get_running_loop()

        # 将CPU密集型任务提交到进程池,并等待其完成
        tasks = [
            loop.run_in_executor(executor, blocking_cpu_task, "Alpha", 50_000_000),
            loop.run_in_executor(executor, blocking_cpu_task, "Beta", 50_000_000)
        ]

        results = await asyncio.gather(*tasks)
        print("\n所有CPU密集型任务通过ProcessPoolExecutor完成:")
        for res in results:
            print(res)

if __name__ == "__main__":
    start_time = time.perf_counter()
    asyncio.run(run_cpu_tasks_with_pool())
    end_time = time.perf_counter()
    print(f"\n总执行时间 (ProcessPoolExecutor): {end_time - start_time:.2f} 秒")

运行此代码,你会看到两个blocking_cpu_task几乎同时开始执行,并且总执行时间会接近单个任务的执行时间,因为它们在不同的CPU核心上并行运行。

注意事项:

  • ThreadPoolExecutor vs ProcessPoolExecutor: concurrent.futures.ThreadPoolExecutor用于线程池。虽然它可以用于将阻塞I/O操作移出主事件循环,但由于GIL的存在,对于纯Python的CPU密集型任务,线程池无法实现真正的并行计算。因此,对于CPU密集型任务,应优先考虑ProcessPoolExecutor。
  • 任务粒度: 如果CPU密集型任务可以被细分为许多小块,并且每小块的执行时间较短,那么在每小块结束后插入await asyncio.sleep(0)可能是一种权宜之计,但它增加了上下文切换的开销。对于长时间运行的、不可中断的CPU计算,使用进程池是更健壮的选择。

6. 总结

asyncio和async/await是Python实现高效并发的强大工具,但它们主要适用于I/O密集型任务。理解其协作式多任务的本质至关重要:

  • await关键字只有在等待一个异步操作(如I/O、定时器或显式地交出控制权)时,才会让出控制权。
  • 纯粹的CPU密集型计算会阻塞整个asyncio事件循环,直到其完成。
  • await asyncio.sleep(0)可以作为一种显式交出控制权的机制,使事件循环有机会调度其他协程。
  • 对于需要真正并行执行的CPU密集型任务,应使用concurrent.futures.ProcessPoolExecutor将任务提交到独立的进程中运行。

正确区分任务类型并选择合适的并发策略,是编写高效、响应迅速的Python异步应用程序的关键。

以上就是Python async/await 协程:CPU密集型任务的陷阱与解决方案的详细内容,更多请关注其它相关文章!


相关文章: win11开机启动修复循环怎么办 Win11无法进入系统高级启动解决方法【修复】  LINUX下如何进行磁盘分区_fdisk与parted工具在LINUX中的使用对比  Sublime怎么配置Nim语言环境_Sublime Nim代码高亮与补全  J*aScript井字棋(Tic-Tac-Toe)核心交互逻辑实现教程  QQ邮箱网页版邮箱入口 QQ邮箱官方登录平台  在WordPress中通过REST API获取BasicAuth保护的远程文章  小米Civi 4录制视频过暗_小米Civi 4亮度优化  使用 Pandas 高效处理 .dat 文件:字符清理与数据计算  yy漫画网页版官方入口_yy漫画官网登录页面链接  Python多线程中正确使用sigwait处理SIGALRM信号  处理动态列数据:J*a ArrayList的正确初始化与字符累加教程  Lar*el Form Request 中唯一性验证更新操作的正确实践  深入理解rpy2中的类型转换:优化Python对象到R矩阵的映射  Python vgamepad库按键模拟:正确使用XUSB_BUTTON常量  解决移动端滚动问题的overflow属性应用指南  在J*a中如何隐藏复杂性_使用门面模式组织对象交互  蛙漫正版漫画平台入口_蛙漫免费阅读全站漫画资源  解决Bootstrap卡片顶部边距导致背景图下移的问题  如何使用CaptainHook和Composer管理Git钩子_在提交前自动运行代码检查的Composer配置  使用Pandas转换并合并DataFrame:多列映射至统一结构  Go语言中动态执行代码字符串的策略与实践  HTML长属性值处理:表单action路径优化与代码规范应对  mysql如何分析事务日志_mysql事务日志分析方法  怎么在html里运行vbs脚本_html中运行vbs脚本方法【教程】  cad如何更改注释性对象的比例_cad注释性比例调整方法  qq游戏跨平台入口_qq游戏多设备同步登录  在VS Code中配置和运行Dart程序的完整步骤  Win11怎么关闭触摸屏_Windows 11禁用HID符合标准触摸屏  如何高效处理PHP中的Excel数据导入导出?PortPHP/Spreadsheet助你轻松搞定!  知乎APP怎么管理已购盐选内容_知乎APP盐选内容购买记录与查看方法  谷歌邮箱网页版官方页面入口 谷歌邮箱网页端快速访问  J*aScript中localStorage数据的获取、清洗与格式化教程  Excel组合图表怎么做 Excel创建柱状图与折线组合图教程【图表】  PHP字符串中复杂变量插值的最佳实践与语法解析  Win11怎么关闭快速启动_Win11彻底关机设置教程  Tabulator表格中精确实现日期时间排序的指南  如何在离线环境中使用Composer_Composer离线安装依赖包的技巧与策略  在J*a中如何在J*a中使用异常机制记录错误日志_异常日志实践经验  蛙漫官网漫画入口地址_蛙漫在线畅读无广告弹窗  163邮箱网页版入口导航平台 163邮箱网页版登录入口官网导航  一加 14R 快充无反应_一加 14R 充电优化  夸克浏览器图书入口 夸克手机浏览器阅读入口  一加手机拍照效果不好怎么办 一加哈苏影像调校与专业模式使用教程【高手篇】  漫蛙官网正版漫画入口 漫蛙2官方网页登录地址  抖音极速版最新版本 抖音极速版官方下载地址  EMS快递官网app_中国邮政速递物流手机客户端  Selenium Python中处理点击后新窗口加载冻结问题的策略与实践  Lar*el Eloquent:基于关联关系是否存在进行父模型过滤与删除  深入理解与实现最大堆的Heapify过程:常见错误与修正  如何在PHP中实现基于MySQL的动态分页查询 

在线客服
服务热线

服务热线

4008988990

微信咨询
二维码
返回顶部
×二维码

截屏,微信识别二维码

打开微信

微信号已复制,请打开微信添加咨询详情!