您现在的位置是:网站首页> 编程资料编程资料
Python Asyncio中Coroutines,Tasks,Future可等待对象的关系及作用_python_
2023-05-26
362人已围观
简介 Python Asyncio中Coroutines,Tasks,Future可等待对象的关系及作用_python_
前记
上一遍文章《Python中Async语法协程的实现》介绍了Python是如何以生成器来实现协程的以及Python Asyncio通过Future和Task的封装来实现协程的调度,而在Python Asyncio之中Coroutines, Tasks和Future都属于可等待对象,在使用的Asyncio的过程中,经常涉及到三者的转换和调度,开发者容易在概念和作用上犯迷糊,本文主要阐述的是三者之间的关系以及他们的作用。
1.Asyncio的入口
协程是线程中的一种特例,协程的入口和切换都是靠事件循环来调度的,在新版的Python中协程的入口是Asyncio.run,当程序运行到Asyncio.run后,可以简单的理解为程序由线程模式切换为协程模式(只是方便理解,对于计算机而言,并没有这样区分),
以下是一个最小的协程例子代码:
import asyncio async def main(): await asyncio.sleep(0) asyncio.run(main())
在这段代码中,main函数和asyncio.sleep都属于Coroutine,main是通过asyncio.run进行调用的,接下来程序也进入一个协程模式,asyncio.run的核心调用是Runner.run,它的代码如下:
class Runner: ... def run(self, coro, *, context=None): """Run a coroutine inside the embedded event loop.""" # 省略代码 ... # 把coroutine转为task task = self._loop.create_task(coro, context=context) # 省略代码 ... try: # 如果传入的是Future或者coroutine,也会专为task return self._loop.run_until_complete(task) except exceptions.CancelledError: # 省略代码 ...
这段代码中删去了部分其它功能和初始化的代码,可以看到这段函数的主要功能是通过loop.create_task方法把一个Coroutine对象转为一个Task对象,然后通过loop.run_until_complete等待这个Task运行结束。
可以看到,Asycnio并不会直接去调度Coroutine,而是把它转为Task再进行调度,这是因为在Asyncio中事件循环的最小调度对象就是Task。不过在Asyncio中并不是所有的Coroutine的调用都会先被转为Task对象再等待,比如示例代码中的asyncio.sleep,由于它是在main函数中直接await的,所以它不会被进行转换,而是直接等待,通过调用工具分析展示的图如下:

在这个图示中,从main函数到asyncio.sleep函数中没有明显的loop.create_task等把Coroutine转为Task调用,这里之所以不用进行转换的原因不是做了一些特殊优化,而是本因如此, 这个await asyncio.sleep函数实际上还是会被main这个Coroutine转换成的Task继续调度到。
2.两种Coroutine调用方法的区别
在了解Task的调度原理之前,还是先回到最初的调用示例,看看直接用Task调用和直接用Coroutine调用的区别是什么。
如下代码,我们显示的执行一个Coroutine转为Task的操作再等待,那么代码会变成下面这样:
import asyncio async def main(): await asyncio.create_task(asyncio.sleep(0)) asyncio.run(main())
这样的代码看起来跟最初的调用示例很像,没啥区别,但是如果进行一些改变,比如增加一些休眠时间和Coroutine的调用,就能看出Task对象的作用了,现在编写两份文件,
他们的代码如下:
# demo_coro.py import asyncio import time async def main(): await asyncio.sleep(1) await asyncio.sleep(2) s_t = time.time() asyncio.run(main()) print(time.time() - s_t) # // Output: 3.0028765201568604 # demo_task.py import asyncio import time async def main(): task_1 = asyncio.create_task(asyncio.sleep(1)) task_2 = asyncio.create_task(asyncio.sleep(2)) await task_1 await task_2 s_t = time.time() asyncio.run(main()) print(time.time() - s_t) # // Output: 2.0027475357055664
其中demo_coro.py进行了两次await调用,程序的运行总时长为3秒,而demo_task.py则是先把两个Coroutine对象转为Task对象,然后再进行两次await调用,程序的运行总时长为2秒。可以发现,demo_task.py的运行时长近似于其中运行最久的Task对象时长,而demo_coro.py的运行时长则是近似于两个Coroutine对象的总运行时长。
之所以会是这样的结果,是因为直接awaitCoroutine对象时,这段程序会一直等待,直到Coroutine对象执行完毕再继续往下走,而Task对象的不同之处就是在创建的那一刻,就已经把自己注册到事件循环之中等待被安排运行了,然后返回一个task对象供开发者等待,由于asyncio.sleep是一个纯IO类型的调用,所以在这个程序中,两个asyncio.sleepCoroutine被转为Task从而实现了并发调用。
3.Task与Future
上述的代码之所以通过Task能实现并发调用,是因为Task中出现了一些与事件循环交互的函数,正是这些函数架起了Coroutine并发调用的可能, 不过Task是Future的一个子对象,所以在了解Task之前,需要先了解Future。
3.1.Future
与Coroutine只有让步和接收结果不同的是Future除了让步和接收结果功能外,它还是一个只会被动进行事件调用且带有状态的容器,它在初始化时就是Pending状态,这时可以被取消,被设置结果和设置异常。而在被设定对应的操作后,Future会被转化到一个不可逆的对应状态,并通过loop.call_sonn来调用所有注册到本身上的回调函数,同时它带有__iter__和__await__方法使其可以被await和yield from调用,它的主要代码如下:
class Future: ... def set_result(self, result): """设置结果,并安排下一个调用""" if self._state != _PENDING: raise exceptions.InvalidStateError(f'{self._state}: {self!r}') self._result = result self._state = _FINISHED self.__schedule_callbacks() def set_exception(self, exception): """设置异常,并安排下一个调用""" if self._state != _PENDING: raise exceptions.InvalidStateError(f'{self._state}: {self!r}') if isinstance(exception, type): exception = exception() if type(exception) is StopIteration: raise TypeError("StopIteration interacts badly with generators " "and cannot be raised into a Future") self._exception = exception self._state = _FINISHED self.__schedule_callbacks() self.__log_traceback = True def __await__(self): """设置为blocking,并接受await或者yield from调用""" if not self.done(): self._asyncio_future_blocking = True yield self # This tells Task to wait for completion. if not self.done(): raise RuntimeError("await wasn't used with future") return self.result() # May raise too. __iter__ = __await__ # make compatible with 'yield from'.单看这段代码是很难理解为什么下面这个future被调用set_result后就能继续往下走:
async def demo(future: asyncio.Future): await future print("aha")这是因为Future跟Coroutine一样,没有主动调度的能力,只能通过Task和事件循环联手被调度。
3.2.Task
Task是Future的子类,除了继承了Future的所有方法,它还多了两个重要的方法__step和__wakeup,通过这两个方法赋予了Task调度能力,这是Coroutine和Future没有的,Task的涉及到调度的主要代码如下(说明见注释):
class Task(futures._PyFuture): # Inherit Python Task implementation # from a Python Future implementation. _log_destroy_pending = True def __init__(self, coro, *, loop=None, name=None, context=None): super().__init__(loop=loop) # 省略部分初始化代码 ... # 托管的coroutine self._coro = coro if context is None: self._context = contextvars.copy_context() else: self._context = context # 通过loop.call_sonn,在Task初始化后马上就通知事件循环在下次有空的时候执行自己的__step函数 self._loop.call_soon(self.__step, context=self._context) def __step(self, exc=None): coro = self._coro # 方便asyncio自省 _enter_task(self._loop, self) # Call either coro.throw(exc) or coro.send(None). try: if exc is None: # 通过send预激托管的coroutine # 这时候只会得到coroutine yield回来的数据或者收到一个StopIteration的异常 # 对于Future或者Task返回的是Self result = coro.send(None) else: # 发送异常给coroutine result = coro.throw(exc) except StopIteration as exc: # StopIteration代表Coroutine运行完毕 if self._must_cancel: # coroutine在停止之前被执行了取消操作,则需要显示的执行取消操作 self._must_cancel = False super().cancel(msg=self._cancel_message) else: # 把运行完毕的值发送到结果值中 super().set_result(exc.value) # 省略其它异常封装 ... else: # 如果没有异常抛出 blocking = getattr(result, '_asyncio_future_blocking', None) if blocking is not None: # 通过Future代码可以判断,如果带有_asyncio_future_blocking属性,则代表当前result是Future或者是Task # 意味着这个Task里面裹着另外一个的Future或者Task # 省略Future判断 ... if blocking: # 代表这这个Future或者Task处于卡住的状态, # 此时的Task放弃了自己对事件循环的控制权,等待这个卡住的Future或者Task执行完成时唤醒一下自己 result._asyncio_future_blocking = False result.add_done_callback(self.__wakeup, context=self._context) self._fut_waiter = result if self._must_cancel: if self._fut_waiter.cancel(msg=self._cancel_message): self._must_cancel = False else: # 不能被await两次 new_exc = RuntimeError( f'yield was used instead of yield from ' f'in task {self!r} with {result!r}') self._loop.call_soon( self.__step, new_exc, context=self._context) elif result is None: # 放弃了对事件循环的控制权,代表自己托管的coroutine可能有个coroutine在运行,接下来会把控制权交给他和事件循环 # 当前的coroutine里面即使没有Future或者Task,但是子Future可能有 self._loop.call_soon(self.__step, context=self._context) finally: _leave_task(self._loop, self) self = None # Needed to break cycles when an exception occurs. def __wakeup(self, future): # 其它Task和Future完成后会调用到该函数,接下来进行一些处理 try: # 回收Future的状态,如果Future发生了异常,则把异常传回给自己 future.result() except BaseException as exc: # This may also be a cancellation. self.__step(exc) else: # Task并不需要自己托管的Future的结果值,而且如下注释,这样能使调度变得更快 # Don't pass the value of `future.result()` explicitly, # as `Future.__iter__` and `Future.__await__` don't need it. # If we call `_step(value, None)` instead of `_step()`, # Python eval loop would use `.send(value)` method call, # instead of `__next__()`, which is slower for futures # that return non-generator iterators from their `__iter__`. self.__step() self = None # Needed to break cycles when an exception occurs.这份源码的Task对象中的__setp方法比较长,经过精简后可以发现他主要做的工作有三个:
- 1.通过
send或者throw来驱动Coroutine进行下一步 - 2.通过给被自己托管的Future或者Task添加回调来获得完成的通知并重新获取控制权
- 3.通过
loop.call_soon来让步,把控制权交给事件循环
单通过源码分析可能很难明白, 以下是以两种Coroutine的代码为例子,简单的阐述Task与事件循环调度的过程,首先是demo_coro,这个例子中只有一个Task:
# demo_coro.py import asyncio import time async def main(): await asyncio.sleep(1) await asyncio.sleep(2) s_t = time.time() asyncio.run(main()) print(time.time() - s_t) # // Output: 3.0028765201568604
这个例子中第一步是把
main转为一个Task,然后调用到了对应的__step方法,这时候__step方法会会调用main()这个Coroutine的send(None)方法。
之后整个程序的逻辑会直接转到main函数中的await asyncio.sleep(1)这个Coroutine中,await asy
相关内容
- Python报错:ModuleNotFoundError的解决办法_python_
- Python实现线程池工作模式的案例详解_python_
- python数字图像处理之基本图形的绘制_python_
- python数字图像处理之图像自动阈值分割示例_python_
- Python 读取千万级数据自动写入 MySQL 数据库_python_
- python数字图像处理之图像简单滤波实现_python_
- python实现一个简单的贪吃蛇游戏附代码_python_
- python数字图像处理之对比度与亮度调整示例_python_
- python数字图像处理实现图像的形变与缩放_python_
- 在python中读取和写入CSV文件详情_python_
