python3 asyncio wait/gather
标签:python
coroutine
async def coroutine(i):
t = random.randint(1, 10)/10
print("worker:%s,cost:%s" % (i, t))
await asyncio.sleep(t)
if t > 0.4:
print('worker:%s trigger' % (i,))
raise Exception('hehe '+str(i))
else:
print('worker:%s not trigger' % (i,))
return t
生成一个随机数t,如果
- t>0.4,抛出异常
- 否则正常执行
wait
asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
return_when=FIRST_EXCEPTION
这种情况只返回第一个异常,不执行完所有task.
def main():
loop = asyncio.get_event_loop()
tasks = []
for i in range(10):
tasks.append(coroutine(i))
start = time.time()
outer_future = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
done, pending = loop.run_until_complete(outer_future)
print('done', len(done))
print('pending', len(pending))
for d in done:
if d.exception() is None:
print('get result', d.result())
else:
print('catch exception', d.exception())
print('coroutine cost', time.time()-start)
worker:0,cost:0.3
worker:1,cost:0.8
worker:2,cost:0.1
worker:3,cost:0.5
worker:4,cost:0.5
worker:5,cost:0.7
worker:6,cost:1.0
worker:7,cost:0.3
worker:8,cost:0.5
worker:9,cost:0.6
worker:2 not trigger
worker:0 not trigger
worker:7 not trigger
worker:3 trigger
worker:4 trigger
worker:8 trigger
done 6
pending 4
get result 0.3
catch exception hehe 4
catch exception hehe 8
get result 0.1
catch exception hehe 3
get result 0.3
coroutine cost 0.5069692134857178
这里捕获了3个异常是因为worker4,worker8,worker3耗时相同
注意tasks里面无论是Future
tasks = []
for i in range(10):
future = asyncio.ensure_future(coroutine(i))
tasks.append(future)
outer_future = asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
done, pending = loop.run_until_complete(outer_future)
...
还是Coroutine
tasks = []
for i in range(10):
tasks.append(coroutine(i))
outer_future = asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
done, pending = loop.run_until_complete(outer_future)
...
done和pending都是里面为Task
类型的列表,Task
继承Future
所有方法,除了set_result()
和set_exception()
asyncio.Task inherits from Future all of its APIs except Future.set_result() and Future.set_exception().
因而可以使用result()
,exception()
获取coroutine返回和异常
...
outer_future = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
done, pending = loop.run_until_complete(outer_future)
for d in done:
if d.exception() is None:
print('get result', d.result())
else:
print('catch exception', d.exception())
另外,asyncio.wait返回的outer_future
,类型总是coroutine object
,所以outer_future
不能使用future.add_done_callback方法
,绑定所有task均执行完成的回调方法
return_when=ALL_COMPLETED
这种情况会返回所有异常,执行完所有task
def main():
loop = asyncio.get_event_loop()
tasks = []
for i in range(10):
tasks.append(coroutine(i))
start = time.time()
outer_future = asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
done, pending = loop.run_until_complete(outer_future)
print('done', len(done))
print('pending', len(pending))
for d in done:
if d.exception() is None:
print('get result', d.result())
else:
print('catch exception', d.exception())
print('coroutine cost', time.time()-start)
worker:6,cost:0.5
worker:9,cost:0.4
worker:3,cost:0.3
worker:1,cost:0.9
worker:7,cost:0.8
worker:4,cost:0.8
worker:0,cost:1.0
worker:8,cost:0.3
worker:5,cost:0.9
worker:2,cost:0.7
worker:3 not trigger
worker:8 not trigger
worker:9 not trigger
worker:6 trigger
worker:2 trigger
worker:7 trigger
worker:4 trigger
worker:1 trigger
worker:5 trigger
worker:0 trigger
done 10
pending 0
catch exception hehe 0
catch exception hehe 1
catch exception hehe 6
catch exception hehe 2
get result 0.3
catch exception hehe 7
get result 0.4
catch exception hehe 5
catch exception hehe 4
get result 0.3
coroutine cost 1.003854751586914
获取结果
注意获取结果不能直接调用result(),如果是
- return_when=asyncio.FIRST_EXCEPTION,会抛出第一个异常
outer_future = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
done, pending = loop.run_until_complete(outer_future)
for d in done:
print('直接调用', d.result())
worker:9 not trigger
worker:4 not trigger
worker:7 not trigger
worker:2 not trigger
worker:3 trigger
done 5
pending 5
直接调用 0.3
Traceback (most recent call last):
File "asyncio/wait.py", line 66, in <module>
main()
File "asyncio/wait.py", line 56, in main
print('直接调用', d.result())
File "asyncio/wait.py", line 19, in coroutine
raise Exception('hehe '+str(i))
Exception: hehe 3
- return_when=asyncio.FIRST_EXCEPTION,会抛出所有异常
outer_future = asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
done, pending = loop.run_until_complete(outer_future)
for d in done:
print('直接调用', d.result())
worker:6,cost:0.1
worker:9,cost:0.4
worker:3,cost:0.9
worker:1,cost:1.0
worker:7,cost:0.3
worker:4,cost:0.2
worker:0,cost:0.1
worker:8,cost:0.8
worker:5,cost:0.1
worker:2,cost:0.8
worker:6 not trigger
worker:0 not trigger
worker:5 not trigger
worker:4 not trigger
worker:7 not trigger
worker:9 not trigger
worker:8 trigger
worker:2 trigger
worker:3 trigger
worker:1 trigger
done 10
pending 0
直接调用 0.1
Traceback (most recent call last):
File "asyncio/wait.py", line 66, in <module>
main()
File "asyncio/wait.py", line 56, in main
print('直接调用', d.result())
File "asyncio/wait.py", line 19, in coroutine
raise Exception('hehe '+str(i))
Exception: hehe 1
Task exception was never retrieved
future: <Task finished coro=<coroutine() done, defined at asyncio/wait.py:13> exception=Exception('hehe 2')>
Traceback (most recent call last):
File "asyncio/wait.py", line 19, in coroutine
raise Exception('hehe '+str(i))
Exception: hehe 2
Task exception was never retrieved
future: <Task finished coro=<coroutine() done, defined at asyncio/wait.py:13> exception=Exception('hehe 8')>
Traceback (most recent call last):
File "asyncio/wait.py", line 19, in coroutine
raise Exception('hehe '+str(i))
Exception: hehe 8
Task exception was never retrieved
future: <Task finished coro=<coroutine() done, defined at asyncio/wait.py:13> exception=Exception('hehe 3')>
Traceback (most recent call last):
File "asyncio/wait.py", line 19, in coroutine
raise Exception('hehe '+str(i))
Exception: hehe 3
done callback
future设置了done callback,也一样不能直接调用result(),如果
- return_when=asyncio.FIRST_EXCEPTION,会抛出先执行完的异常
def get_result(future):
print('result in callback', future.result())
def main():
...
for i in range(10):
future = asyncio.ensure_future(coroutine(i))
future.add_done_callback(get_result)
tasks.append(future)
outer_future = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
...
worker:0,cost:0.4
worker:1,cost:1.0
worker:2,cost:0.3
worker:3,cost:0.1
worker:4,cost:1.0
worker:5,cost:0.8
worker:6,cost:0.8
worker:7,cost:0.3
worker:8,cost:0.4
worker:9,cost:0.2
worker:3 not trigger
result in callback 0.1
worker:9 not trigger
result in callback 0.2
worker:2 not trigger
worker:7 not trigger
result in callback 0.3
result in callback 0.3
worker:0 not trigger
worker:8 not trigger
result in callback 0.4
result in callback 0.4
worker:5 trigger
worker:6 trigger
Exception in callback get_result(<Task finishe...ion('hehe 5')>) at asyncio/wait.py:8
handle: <Handle get_result(<Task finishe...ion('hehe 5')>) at asyncio/wait.py:8>
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
self._context.run(self._callback, *self._args)
File "asyncio/wait.py", line 9, in get_result
print('result in callback', future.result())
File "asyncio/wait.py", line 19, in coroutine
raise Exception('hehe '+str(i))
Exception: hehe 5
Exception in callback get_result(<Task finishe...ion('hehe 6')>) at asyncio/wait.py:8
handle: <Handle get_result(<Task finishe...ion('hehe 6')>) at asyncio/wait.py:8>
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
self._context.run(self._callback, *self._args)
File "asyncio/wait.py", line 9, in get_result
print('result in callback', future.result())
File "asyncio/wait.py", line 19, in coroutine
raise Exception('hehe '+str(i))
Exception: hehe 6
done 8
pending 2
coroutine cost 0.8055620193481445
这里有2个回调异常,因为coroutine-5,coroutine-6的执行时间相同
- return_when=asyncio.ALL_COMPLETED,会抛出所有的异常
def get_result(future):
print('result in callback', future.result())
def main():
...
for i in range(10):
future = asyncio.ensure_future(coroutine(i))
future.add_done_callback(get_result)
tasks.append(future)
outer_future = asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
...
worker:0,cost:0.2
worker:1,cost:0.4
worker:2,cost:0.1
worker:3,cost:0.9
worker:4,cost:0.9
worker:5,cost:0.1
worker:6,cost:0.8
worker:7,cost:0.5
worker:8,cost:1.0
worker:9,cost:0.6
worker:2 not trigger
worker:5 not trigger
result in callback 0.1
result in callback 0.1
worker:0 not trigger
result in callback 0.2
worker:1 not trigger
result in callback 0.4
worker:7 trigger
worker:9 trigger
worker:6 trigger
worker:3 trigger
worker:4 trigger
worker:8 trigger
Exception in callback get_result(<Task finishe...ion('hehe 7')>) at asyncio/wait.py:8
handle: <Handle get_result(<Task finishe...ion('hehe 7')>) at asyncio/wait.py:8>
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
self._context.run(self._callback, *self._args)
File "asyncio/wait.py", line 9, in get_result
print('result in callback', future.result())
File "asyncio/wait.py", line 19, in coroutine
raise Exception('hehe '+str(i))
Exception: hehe 7
Exception in callback get_result(<Task finishe...ion('hehe 9')>) at asyncio/wait.py:8
handle: <Handle get_result(<Task finishe...ion('hehe 9')>) at asyncio/wait.py:8>
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
self._context.run(self._callback, *self._args)
File "asyncio/wait.py", line 9, in get_result
print('result in callback', future.result())
File "asyncio/wait.py", line 19, in coroutine
raise Exception('hehe '+str(i))
Exception: hehe 9
Exception in callback get_result(<Task finishe...ion('hehe 6')>) at asyncio/wait.py:8
handle: <Handle get_result(<Task finishe...ion('hehe 6')>) at asyncio/wait.py:8>
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
self._context.run(self._callback, *self._args)
File "asyncio/wait.py", line 9, in get_result
print('result in callback', future.result())
File "asyncio/wait.py", line 19, in coroutine
raise Exception('hehe '+str(i))
Exception: hehe 6
Exception in callback get_result(<Task finishe...ion('hehe 3')>) at asyncio/wait.py:8
handle: <Handle get_result(<Task finishe...ion('hehe 3')>) at asyncio/wait.py:8>
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
self._context.run(self._callback, *self._args)
File "asyncio/wait.py", line 9, in get_result
print('result in callback', future.result())
File "asyncio/wait.py", line 19, in coroutine
raise Exception('hehe '+str(i))
Exception: hehe 3
Exception in callback get_result(<Task finishe...ion('hehe 4')>) at asyncio/wait.py:8
handle: <Handle get_result(<Task finishe...ion('hehe 4')>) at asyncio/wait.py:8>
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
self._context.run(self._callback, *self._args)
File "asyncio/wait.py", line 9, in get_result
print('result in callback', future.result())
File "asyncio/wait.py", line 19, in coroutine
raise Exception('hehe '+str(i))
Exception: hehe 4
Exception in callback get_result(<Task finishe...ion('hehe 8')>) at asyncio/wait.py:8
handle: <Handle get_result(<Task finishe...ion('hehe 8')>) at asyncio/wait.py:8>
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
self._context.run(self._callback, *self._args)
File "asyncio/wait.py", line 9, in get_result
print('result in callback', future.result())
File "asyncio/wait.py", line 19, in coroutine
raise Exception('hehe '+str(i))
Exception: hehe 8
gather
asyncio.gather(*aws, loop=None, return_exceptions=False)
return_exceptions=True
这种情况会返回所有异常,执行完所有task
def main():
loop = asyncio.get_event_loop()
tasks = []
for i in range(10):
future = asyncio.ensure_future(coroutine(i))
tasks.append(future)
start = time.time()
outer_future = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(outer_future)
for t in tasks:
if t.exception() is None:
print('result', t.result())
else:
print('catch exception', t.exception())
print('coroutine cost', time.time()-start)
worker:0,cost:0.2
worker:1,cost:0.2
worker:2,cost:0.7
worker:3,cost:0.5
worker:4,cost:0.6
worker:5,cost:0.1
worker:6,cost:0.3
worker:7,cost:0.2
worker:8,cost:1.0
worker:9,cost:0.4
worker:5 not trigger
worker:0 not trigger
worker:1 not trigger
worker:7 not trigger
worker:6 not trigger
worker:9 not trigger
worker:3 trigger
worker:4 trigger
worker:2 trigger
worker:8 trigger
result 0.2
result 0.2
catch exception hehe 2
catch exception hehe 3
catch exception hehe 4
result 0.1
result 0.3
result 0.2
catch exception hehe 8
result 0.4
coroutine cost 1.0015060901641846
与上面asyncio.wait的tasks
总是Task
类型的列表不同,这里tasks
的列表类型与传入的task有关,如果task是
- coroutine,则tasks是
Coroutine
类型的列表。loop.run_until_complete执行后,只能通过Future获取结果(coroutine的返回结果)
for i in range(10):
tasks.append(coroutine(i))
outer_future = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(outer_future)
for r in outer_future.result():
if type(r) is not Exception:
print('result', r)
else:
print('catch exception', r)
- Task(通过asyncio.ensure_future创建)类型,则tasks是
Task
类型的列表.loop.run_until_complete执行后,获取结果和异常,可以
- 通过Future获取
for i in range(10):
future = asyncio.ensure_future(coroutine(i))
tasks.append(future)
outer_future = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(outer_future)
for r in outer_future.result():
if type(r) is not Exception:
print('result', r)
else:
print('catch exception', r)
- 通过tasks(list类型)获取
...
outer_future = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(outer_future)
for t in tasks:
if t.exception() is None:
print('result', t.result())
else:
print('catch exception', t.exception())
return_exceptions=False
这种情况只返回第一个异常,不执行完所有task.
for i in range(10):
tasks.append(coroutine(i))
outer_future = asyncio.gather(*tasks, return_exceptions=False)
loop.run_until_complete(outer_future)
worker:0,cost:0.3
worker:1,cost:0.3
worker:2,cost:0.2
worker:3,cost:0.4
worker:4,cost:0.2
worker:5,cost:0.1
worker:6,cost:0.7
worker:7,cost:0.1
worker:8,cost:1.0
worker:9,cost:0.6
worker:5 not trigger
worker:7 not trigger
worker:2 not trigger
worker:4 not trigger
worker:0 not trigger
worker:1 not trigger
worker:3 not trigger
worker:9 trigger
Traceback (most recent call last):
File "asyncio/gather.py", line 75, in <module>
main()
File "asyncio/gather.py", line 49, in main
loop.run_until_complete(outer_future)
File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "asyncio/gather.py", line 23, in coroutine
raise Exception('hehe '+str(i))
Exception: hehe 9
获取结果
与asyncio.wait类似,这里获取结果也不能直接调用result(),会抛出第一个异常,无论return_exceptions是True还是False
for t in tasks:
print('直接调用', t.result())
worker:0,cost:1.0
worker:1,cost:0.8
worker:2,cost:0.5
worker:3,cost:0.6
worker:4,cost:0.9
worker:5,cost:0.8
worker:6,cost:0.8
worker:7,cost:0.7
worker:8,cost:0.5
worker:9,cost:0.2
worker:9 not trigger
worker:2 trigger
worker:8 trigger
worker:3 trigger
worker:7 trigger
worker:1 trigger
worker:5 trigger
worker:6 trigger
worker:4 trigger
worker:0 trigger
Traceback (most recent call last):
File "asyncio/gather.py", line 75, in <module>
main()
File "asyncio/gather.py", line 64, in main
print('直接调用', t.result())
File "asyncio/gather.py", line 23, in coroutine
raise Exception('hehe '+str(i))
Exception: hehe 0
但是return_exceptions=True
时,通过Future获取不会有问题
outer_future = asyncio.gather(*tasks, return_exceptions=False)
loop.run_until_complete(outer_future)
for r in outer_future.result():
if type(r) is not Exception:
print('result', r)
else:
print('catch exception', r)
return_exceptions=False
时还是有问题
done callcack
def done_callback(future):
for r in future.result():
if type(r) is not Exception:
print('callback result', r)
else:
print('callback catch exception', r)
def main():
loop = asyncio.get_event_loop()
tasks = []
for i in range(10):
future = asyncio.ensure_future(coroutine(i))
tasks.append(future)
outer_future = asyncio.gather(*tasks, return_exceptions=True)
outer_future.add_done_callback(done_callback)
loop.run_until_complete(outer_future)
- return_exceptions=True
worker:0,cost:1.0
worker:1,cost:0.4
worker:2,cost:0.1
worker:3,cost:0.4
worker:4,cost:0.6
worker:5,cost:0.2
worker:6,cost:0.8
worker:7,cost:0.1
worker:8,cost:0.2
worker:9,cost:0.6
worker:2 not trigger
worker:7 not trigger
worker:5 not trigger
worker:8 not trigger
worker:1 not trigger
worker:3 not trigger
worker:4 trigger
worker:9 trigger
worker:6 trigger
worker:0 trigger
callback catch exception hehe 0
callback result 0.4
callback result 0.1
callback result 0.4
callback catch exception hehe 4
callback result 0.2
callback catch exception hehe 6
callback result 0.1
callback result 0.2
callback catch exception hehe 9
这里done_callback的future是里面元素为Future的列表
- return_exceptions=False
worker:0,cost:0.8
worker:1,cost:0.4
worker:2,cost:1.0
worker:3,cost:0.7
worker:4,cost:0.4
worker:5,cost:0.3
worker:6,cost:0.3
worker:7,cost:1.0
worker:8,cost:1.0
worker:9,cost:0.4
worker:5 not trigger
worker:6 not trigger
worker:1 not trigger
worker:4 not trigger
worker:9 not trigger
worker:3 trigger
Exception in callback done_callback(<_GatheringFu...ion('hehe 3')>) at asyncio/gather.py:12
handle: <Handle done_callback(<_GatheringFu...ion('hehe 3')>) at asyncio/gather.py:12>
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
self._context.run(self._callback, *self._args)
File "asyncio/gather.py", line 15, in done_callback
for r in future.result():
File "asyncio/gather.py", line 28, in coroutine
raise Exception('hehe '+str(i))
Exception: hehe 3
Traceback (most recent call last):
File "asyncio/gather.py", line 80, in <module>
main()
File "asyncio/gather.py", line 55, in main
loop.run_until_complete(outer_future)
File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
self._context.run(self._callback, *self._args)
File "asyncio/gather.py", line 15, in done_callback
for r in future.result():
File "asyncio/gather.py", line 28, in coroutine
raise Exception('hehe '+str(i))
Exception: hehe 3
这里done_callback的future是Future类型
不定期更新