首页

    python3 asyncio wait/gather

    标签:python

    coroutine

    async def coroutine(i):
        """Simulate a worker that sleeps for a random time and may fail.

        Draws ``t`` as ``random.randint(1, 10) / 10``, sleeps for ``t``
        seconds, then raises when ``t > 0.4`` and returns ``t`` otherwise.

        Args:
            i: Worker identifier; used only in the printed messages and in
                the exception text.

        Returns:
            The sleep duration ``t`` (only reached when ``t <= 0.4``).

        Raises:
            Exception: With the message ``'hehe <i>'`` when ``t > 0.4``.
        """
        t = random.randint(1, 10)/10
        print("worker:%s,cost:%s" % (i, t))
        await asyncio.sleep(t)
        # Workers that slept longer than 0.4s are the ones that fail.
        if t > 0.4:
            print('worker:%s trigger' % (i,))
            raise Exception('hehe '+str(i))
        else:
            print('worker:%s not trigger' % (i,))
    
        return t
    

    生成一个随机数t,如果

    • t>0.4,抛出异常
    • 否则正常执行

    wait

    asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

    return_when=FIRST_EXCEPTION

    这种情况只返回第一个异常,不执行完所有task.

    def main():
        """Run ten workers under asyncio.wait(FIRST_EXCEPTION) and report them.

        Prints how many workers are done and pending once the wait returns,
        then each finished worker's result or exception, then the elapsed time.
        """
        loop = asyncio.get_event_loop()
        tasks = []
    
        for i in range(10):
            # Bare coroutine objects are queued here, not Tasks.
            # NOTE(review): asyncio.wait() deprecated bare coroutine arguments
            # in Python 3.8 and rejects them from 3.11 - confirm the target
            # interpreter before reusing this snippet.
            tasks.append(coroutine(i))
    
        start = time.time()
    
        # Nothing runs yet: the wait is only driven by run_until_complete below.
        outer_future = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
        done, pending = loop.run_until_complete(outer_future)
    
        print('done', len(done))
        print('pending', len(pending))
    
        for d in done:
            # Check exception() first; result() would re-raise a stored exception.
            if d.exception() is None:
                print('get result', d.result())
            else:
                print('catch exception', d.exception())
    
        print('coroutine cost', time.time()-start)
    
    worker:0,cost:0.3
    worker:1,cost:0.8
    worker:2,cost:0.1
    worker:3,cost:0.5
    worker:4,cost:0.5
    worker:5,cost:0.7
    worker:6,cost:1.0
    worker:7,cost:0.3
    worker:8,cost:0.5
    worker:9,cost:0.6
    
    worker:2 not trigger
    worker:0 not trigger
    worker:7 not trigger
    worker:3 trigger
    worker:4 trigger
    worker:8 trigger
    done 6
    pending 4
    
    get result 0.3
    catch exception hehe 4
    catch exception hehe 8
    get result 0.1
    catch exception hehe 3
    get result 0.3
    coroutine cost 0.5069692134857178
    

    这里捕获了3个异常是因为worker4,worker8,worker3耗时相同

    注意tasks里面无论是Future

         tasks = []
        for i in range(10):
            future = asyncio.ensure_future(coroutine(i))
            tasks.append(future)
    
        outer_future = asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
        done, pending = loop.run_until_complete(outer_future)
        ...
    

    还是Coroutine

        tasks = []
        for i in range(10):
            tasks.append(coroutine(i))
    
        outer_future = asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
        done, pending = loop.run_until_complete(outer_future)
        ...
    

    done和pending都是元素为Task类型的集合(set),Task继承Future所有方法,除了set_result()和set_exception()

    asyncio.Task inherits from Future all of its APIs except Future.set_result() and Future.set_exception().

    因而可以使用result(),exception()获取coroutine返回和异常

        ...
        outer_future = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
        done, pending = loop.run_until_complete(outer_future)
        for d in done:
            if d.exception() is None:
                print('get result', d.result())
            else:
                print('catch exception', d.exception())
    

    另外,asyncio.wait返回的outer_future,类型总是coroutine object,所以outer_future不能使用future.add_done_callback方法,绑定所有task均执行完成的回调方法

    return_when=ALL_COMPLETED

    这种情况会返回所有异常,执行完所有task

    def main():
        """Run ten workers under asyncio.wait(ALL_COMPLETED) and report them.

        Prints the done/pending counts, then each worker's result or
        exception, then the elapsed time.
        """
        loop = asyncio.get_event_loop()
        tasks = []
    
        for i in range(10):
            # Bare coroutine objects are queued here, not Tasks.
            # NOTE(review): asyncio.wait() deprecated bare coroutine arguments
            # in Python 3.8 and rejects them from 3.11 - confirm the target
            # interpreter before reusing this snippet.
            tasks.append(coroutine(i))
    
        start = time.time()
    
        # Nothing runs yet: the wait is only driven by run_until_complete below.
        outer_future = asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
        done, pending = loop.run_until_complete(outer_future)
    
        print('done', len(done))
        print('pending', len(pending))
        for d in done:
            # Check exception() first; result() would re-raise a stored exception.
            if d.exception() is None:
                print('get result', d.result())
            else:
                print('catch exception', d.exception())
    
        print('coroutine cost', time.time()-start)
    
    worker:6,cost:0.5
    worker:9,cost:0.4
    worker:3,cost:0.3
    worker:1,cost:0.9
    worker:7,cost:0.8
    worker:4,cost:0.8
    worker:0,cost:1.0
    worker:8,cost:0.3
    worker:5,cost:0.9
    worker:2,cost:0.7
    
    worker:3 not trigger
    worker:8 not trigger
    worker:9 not trigger
    worker:6 trigger
    worker:2 trigger
    worker:7 trigger
    worker:4 trigger
    worker:1 trigger
    worker:5 trigger
    worker:0 trigger
    
    done 10
    pending 0
    
    catch exception hehe 0
    catch exception hehe 1
    catch exception hehe 6
    catch exception hehe 2
    get result 0.3
    catch exception hehe 7
    get result 0.4
    catch exception hehe 5
    catch exception hehe 4
    get result 0.3
    coroutine cost 1.003854751586914
    

    获取结果

    注意获取结果不能直接调用result(),如果是

    • return_when=asyncio.FIRST_EXCEPTION,会抛出第一个异常
        outer_future = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
        done, pending = loop.run_until_complete(outer_future)
    
        for d in done:
            print('直接调用', d.result())
    
    worker:9 not trigger
    worker:4 not trigger
    worker:7 not trigger
    worker:2 not trigger
    worker:3 trigger
    done 5
    pending 5
    直接调用 0.3
    Traceback (most recent call last):
      File "asyncio/wait.py", line 66, in <module>
        main()
      File "asyncio/wait.py", line 56, in main
        print('直接调用', d.result())
      File "asyncio/wait.py", line 19, in coroutine
        raise Exception('hehe '+str(i))
    Exception: hehe 3
    
    • return_when=asyncio.ALL_COMPLETED,会抛出所有异常
        outer_future = asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
        done, pending = loop.run_until_complete(outer_future)
    
        for d in done:
            print('直接调用', d.result())
    
    worker:6,cost:0.1
    worker:9,cost:0.4
    worker:3,cost:0.9
    worker:1,cost:1.0
    worker:7,cost:0.3
    worker:4,cost:0.2
    worker:0,cost:0.1
    worker:8,cost:0.8
    worker:5,cost:0.1
    worker:2,cost:0.8
    
    worker:6 not trigger
    worker:0 not trigger
    worker:5 not trigger
    worker:4 not trigger
    worker:7 not trigger
    worker:9 not trigger
    worker:8 trigger
    worker:2 trigger
    worker:3 trigger
    worker:1 trigger
    
    done 10
    pending 0
    
    直接调用 0.1
    Traceback (most recent call last):
      File "asyncio/wait.py", line 66, in <module>
        main()
      File "asyncio/wait.py", line 56, in main
        print('直接调用', d.result())
      File "asyncio/wait.py", line 19, in coroutine
        raise Exception('hehe '+str(i))
    Exception: hehe 1
    
    Task exception was never retrieved
    future: <Task finished coro=<coroutine() done, defined at asyncio/wait.py:13> exception=Exception('hehe 2')>
    Traceback (most recent call last):
      File "asyncio/wait.py", line 19, in coroutine
        raise Exception('hehe '+str(i))
    Exception: hehe 2
    
    Task exception was never retrieved
    future: <Task finished coro=<coroutine() done, defined at asyncio/wait.py:13> exception=Exception('hehe 8')>
    Traceback (most recent call last):
      File "asyncio/wait.py", line 19, in coroutine
        raise Exception('hehe '+str(i))
    Exception: hehe 8
    
    Task exception was never retrieved
    future: <Task finished coro=<coroutine() done, defined at asyncio/wait.py:13> exception=Exception('hehe 3')>
    Traceback (most recent call last):
      File "asyncio/wait.py", line 19, in coroutine
        raise Exception('hehe '+str(i))
    Exception: hehe 3
    

    done callback

    future设置了done callback,也一样不能直接调用result(),如果

    • return_when=asyncio.FIRST_EXCEPTION,会抛出先执行完的异常
    def get_result(future):
        """Done callback that prints the finished future's result.

        Calls future.result() without checking future.exception() first, so
        for a failed task the stored exception is re-raised inside this
        callback.
        """
        print('result in callback', future.result())
    
    def main():
        """Attach get_result to every worker Task, then wait with FIRST_EXCEPTION.

        The ``...`` lines stand for setup and reporting code elided from this
        excerpt.
        """
        ...
        for i in range(10):
            # Wrap the coroutine so there is an object to hang a callback on.
            future = asyncio.ensure_future(coroutine(i))
            # get_result is invoked with this future once it completes.
            future.add_done_callback(get_result)
            tasks.append(future)
    
        outer_future = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
        ...
    
    worker:0,cost:0.4
    worker:1,cost:1.0
    worker:2,cost:0.3
    worker:3,cost:0.1
    worker:4,cost:1.0
    worker:5,cost:0.8
    worker:6,cost:0.8
    worker:7,cost:0.3
    worker:8,cost:0.4
    worker:9,cost:0.2
    
    worker:3 not trigger
    result in callback 0.1
    worker:9 not trigger
    result in callback 0.2
    worker:2 not trigger
    worker:7 not trigger
    result in callback 0.3
    result in callback 0.3
    worker:0 not trigger
    worker:8 not trigger
    result in callback 0.4
    result in callback 0.4
    worker:5 trigger
    worker:6 trigger
    Exception in callback get_result(<Task finishe...ion('hehe 5')>) at asyncio/wait.py:8
    handle: <Handle get_result(<Task finishe...ion('hehe 5')>) at asyncio/wait.py:8>
    Traceback (most recent call last):
      File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
        self._context.run(self._callback, *self._args)
      File "asyncio/wait.py", line 9, in get_result
        print('result in callback', future.result())
      File "asyncio/wait.py", line 19, in coroutine
        raise Exception('hehe '+str(i))
    Exception: hehe 5
    
    Exception in callback get_result(<Task finishe...ion('hehe 6')>) at asyncio/wait.py:8
    handle: <Handle get_result(<Task finishe...ion('hehe 6')>) at asyncio/wait.py:8>
    Traceback (most recent call last):
      File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
        self._context.run(self._callback, *self._args)
      File "asyncio/wait.py", line 9, in get_result
        print('result in callback', future.result())
      File "asyncio/wait.py", line 19, in coroutine
        raise Exception('hehe '+str(i))
    Exception: hehe 6
    
    done 8
    pending 2
    coroutine cost 0.8055620193481445
    

    这里有2个回调异常,因为coroutine-5,coroutine-6的执行时间相同

    • return_when=asyncio.ALL_COMPLETED,会抛出所有的异常
    def get_result(future):
        """Done callback that prints the finished future's result.

        Calls future.result() without checking future.exception() first, so
        for a failed task the stored exception is re-raised inside this
        callback.
        """
        print('result in callback', future.result())
    
    def main():
        """Attach get_result to every worker Task, then wait with ALL_COMPLETED.

        The ``...`` lines stand for setup and reporting code elided from this
        excerpt.
        """
        ...
        for i in range(10):
            # Wrap the coroutine so there is an object to hang a callback on.
            future = asyncio.ensure_future(coroutine(i))
            # get_result is invoked with this future once it completes.
            future.add_done_callback(get_result)
            tasks.append(future)
    
        outer_future = asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
        ...
    
    worker:0,cost:0.2
    worker:1,cost:0.4
    worker:2,cost:0.1
    worker:3,cost:0.9
    worker:4,cost:0.9
    worker:5,cost:0.1
    worker:6,cost:0.8
    worker:7,cost:0.5
    worker:8,cost:1.0
    worker:9,cost:0.6
    
    worker:2 not trigger
    worker:5 not trigger
    result in callback 0.1
    result in callback 0.1
    worker:0 not trigger
    result in callback 0.2
    worker:1 not trigger
    result in callback 0.4
    worker:7 trigger
    worker:9 trigger
    worker:6 trigger
    worker:3 trigger
    worker:4 trigger
    worker:8 trigger
    
    Exception in callback get_result(<Task finishe...ion('hehe 7')>) at asyncio/wait.py:8
    handle: <Handle get_result(<Task finishe...ion('hehe 7')>) at asyncio/wait.py:8>
    Traceback (most recent call last):
      File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
        self._context.run(self._callback, *self._args)
      File "asyncio/wait.py", line 9, in get_result
        print('result in callback', future.result())
      File "asyncio/wait.py", line 19, in coroutine
        raise Exception('hehe '+str(i))
    Exception: hehe 7
    
    Exception in callback get_result(<Task finishe...ion('hehe 9')>) at asyncio/wait.py:8
    handle: <Handle get_result(<Task finishe...ion('hehe 9')>) at asyncio/wait.py:8>
    Traceback (most recent call last):
      File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
        self._context.run(self._callback, *self._args)
      File "asyncio/wait.py", line 9, in get_result
        print('result in callback', future.result())
      File "asyncio/wait.py", line 19, in coroutine
        raise Exception('hehe '+str(i))
    Exception: hehe 9
    
    Exception in callback get_result(<Task finishe...ion('hehe 6')>) at asyncio/wait.py:8
    handle: <Handle get_result(<Task finishe...ion('hehe 6')>) at asyncio/wait.py:8>
    Traceback (most recent call last):
      File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
        self._context.run(self._callback, *self._args)
      File "asyncio/wait.py", line 9, in get_result
        print('result in callback', future.result())
      File "asyncio/wait.py", line 19, in coroutine
        raise Exception('hehe '+str(i))
    Exception: hehe 6
    
    Exception in callback get_result(<Task finishe...ion('hehe 3')>) at asyncio/wait.py:8
    handle: <Handle get_result(<Task finishe...ion('hehe 3')>) at asyncio/wait.py:8>
    Traceback (most recent call last):
      File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
        self._context.run(self._callback, *self._args)
      File "asyncio/wait.py", line 9, in get_result
        print('result in callback', future.result())
      File "asyncio/wait.py", line 19, in coroutine
        raise Exception('hehe '+str(i))
    Exception: hehe 3
    
    Exception in callback get_result(<Task finishe...ion('hehe 4')>) at asyncio/wait.py:8
    handle: <Handle get_result(<Task finishe...ion('hehe 4')>) at asyncio/wait.py:8>
    Traceback (most recent call last):
      File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
        self._context.run(self._callback, *self._args)
      File "asyncio/wait.py", line 9, in get_result
        print('result in callback', future.result())
      File "asyncio/wait.py", line 19, in coroutine
        raise Exception('hehe '+str(i))
    Exception: hehe 4
    
    Exception in callback get_result(<Task finishe...ion('hehe 8')>) at asyncio/wait.py:8
    handle: <Handle get_result(<Task finishe...ion('hehe 8')>) at asyncio/wait.py:8>
    Traceback (most recent call last):
      File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
        self._context.run(self._callback, *self._args)
      File "asyncio/wait.py", line 9, in get_result
        print('result in callback', future.result())
      File "asyncio/wait.py", line 19, in coroutine
        raise Exception('hehe '+str(i))
    Exception: hehe 8
    

    gather

    asyncio.gather(*aws, loop=None, return_exceptions=False)

    return_exceptions=True

    这种情况会返回所有异常,执行完所有task

    def main():
        """Run ten workers under asyncio.gather(return_exceptions=True).

        After the loop finishes, prints each worker's result or exception in
        submission order, then the elapsed time.
        """
        loop = asyncio.get_event_loop()
        tasks = []
    
        for i in range(10):
            # Keep the wrapped futures so each one can be inspected after the run.
            future = asyncio.ensure_future(coroutine(i))
            tasks.append(future)
    
        start = time.time()
        outer_future = asyncio.gather(*tasks, return_exceptions=True)
        loop.run_until_complete(outer_future)
    
        # Outcomes are read from the individual tasks, not from outer_future.
        for t in tasks:
            # Check exception() first; result() would re-raise a stored exception.
            if t.exception() is None:
                print('result', t.result())
            else:
                print('catch exception', t.exception())
    
        print('coroutine cost', time.time()-start)
    
    worker:0,cost:0.2
    worker:1,cost:0.2
    worker:2,cost:0.7
    worker:3,cost:0.5
    worker:4,cost:0.6
    worker:5,cost:0.1
    worker:6,cost:0.3
    worker:7,cost:0.2
    worker:8,cost:1.0
    worker:9,cost:0.4
    
    worker:5 not trigger
    worker:0 not trigger
    worker:1 not trigger
    worker:7 not trigger
    worker:6 not trigger
    worker:9 not trigger
    worker:3 trigger
    worker:4 trigger
    worker:2 trigger
    worker:8 trigger
    
    result 0.2
    result 0.2
    catch exception hehe 2
    catch exception hehe 3
    catch exception hehe 4
    result 0.1
    result 0.3
    result 0.2
    catch exception hehe 8
    result 0.4
    coroutine cost 1.0015060901641846
    

    与上面asyncio.wait的tasks总是Task类型的列表不同,这里tasks的列表类型与传入的task有关,如果task是

    • coroutine,则tasks是Coroutine类型的列表。loop.run_until_complete执行后,只能通过Future获取结果(coroutine的返回结果)
        for i in range(10):
            tasks.append(coroutine(i))
    
        # With return_exceptions=True, failures are delivered as exception
        # objects inside the result list instead of being raised.
        outer_future = asyncio.gather(*tasks, return_exceptions=True)
        loop.run_until_complete(outer_future)
    
        for r in outer_future.result():
            # isinstance also matches Exception subclasses; an exact type
            # comparison would report e.g. a ValueError as a normal result.
            if isinstance(r, Exception):
                print('catch exception', r)
            else:
                print('result', r)
    
    • Task(通过asyncio.ensure_future创建)类型,则tasks是Task类型的列表.loop.run_until_complete执行后,获取结果和异常,可以
    1. 通过Future获取
        for i in range(10):
            future = asyncio.ensure_future(coroutine(i))
            tasks.append(future)
    
        # With return_exceptions=True, failures are delivered as exception
        # objects inside the result list instead of being raised.
        outer_future = asyncio.gather(*tasks, return_exceptions=True)
        loop.run_until_complete(outer_future)
    
        for r in outer_future.result():
            # isinstance also matches Exception subclasses; an exact type
            # comparison would report e.g. a ValueError as a normal result.
            if isinstance(r, Exception):
                print('catch exception', r)
            else:
                print('result', r)
    
    2. 通过tasks(list类型)获取
        ...
        outer_future = asyncio.gather(*tasks, return_exceptions=True)
        loop.run_until_complete(outer_future)
    
        for t in tasks:
            if t.exception() is None:
                print('result', t.result())
            else:
                print('catch exception', t.exception())
    

    return_exceptions=False

    这种情况只返回第一个异常,不执行完所有task.

        for i in range(10):
            tasks.append(coroutine(i))
    
        outer_future = asyncio.gather(*tasks, return_exceptions=False)
        loop.run_until_complete(outer_future)
    
    worker:0,cost:0.3
    worker:1,cost:0.3
    worker:2,cost:0.2
    worker:3,cost:0.4
    worker:4,cost:0.2
    worker:5,cost:0.1
    worker:6,cost:0.7
    worker:7,cost:0.1
    worker:8,cost:1.0
    worker:9,cost:0.6
    
    worker:5 not trigger
    worker:7 not trigger
    worker:2 not trigger
    worker:4 not trigger
    worker:0 not trigger
    worker:1 not trigger
    worker:3 not trigger
    worker:9 trigger
    Traceback (most recent call last):
      File "asyncio/gather.py", line 75, in <module>
        main()
      File "asyncio/gather.py", line 49, in main
        loop.run_until_complete(outer_future)
      File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
        return future.result()
      File "asyncio/gather.py", line 23, in coroutine
        raise Exception('hehe '+str(i))
    Exception: hehe 9
    

    获取结果

    与asyncio.wait类似,这里获取结果也不能直接调用result(),会抛出第一个异常,无论return_exceptions是True还是False

        for t in tasks:
            print('直接调用', t.result())
    
    worker:0,cost:1.0
    worker:1,cost:0.8
    worker:2,cost:0.5
    worker:3,cost:0.6
    worker:4,cost:0.9
    worker:5,cost:0.8
    worker:6,cost:0.8
    worker:7,cost:0.7
    worker:8,cost:0.5
    worker:9,cost:0.2
    
    worker:9 not trigger
    worker:2 trigger
    worker:8 trigger
    worker:3 trigger
    worker:7 trigger
    worker:1 trigger
    worker:5 trigger
    worker:6 trigger
    worker:4 trigger
    worker:0 trigger
    
    Traceback (most recent call last):
      File "asyncio/gather.py", line 75, in <module>
        main()
      File "asyncio/gather.py", line 64, in main
        print('直接调用', t.result())
      File "asyncio/gather.py", line 23, in coroutine
        raise Exception('hehe '+str(i))
    Exception: hehe 0
    

    但是return_exceptions=True时,通过Future获取不会有问题

        # return_exceptions=True is what makes reading outer_future.result()
        # safe: failures come back as exception objects in the list. With
        # False, run_until_complete would raise before this loop is reached.
        outer_future = asyncio.gather(*tasks, return_exceptions=True)
        loop.run_until_complete(outer_future)
        for r in outer_future.result():
            # isinstance also matches Exception subclasses; an exact type
            # comparison would report e.g. a ValueError as a normal result.
            if isinstance(r, Exception):
                print('catch exception', r)
            else:
                print('result', r)
    

    return_exceptions=False时还是有问题

    done callback

    def done_callback(future):
        """Print every entry of a finished gather future's result list.

        Args:
            future: A completed future whose ``result()`` is an iterable of
                per-task outcomes: return values, or exception objects for
                tasks that failed.

        Raises:
            Exception: Whatever ``future.result()`` raises when the future
                itself failed.
        """
        for r in future.result():
            # isinstance also matches Exception subclasses; an exact type
            # comparison would report e.g. a ValueError as a normal result.
            if isinstance(r, Exception):
                print('callback catch exception', r)
            else:
                print('callback result', r)
    
    def main():
        """Run ten workers under asyncio.gather and report via a done callback.

        Registers done_callback on the future returned by gather, then runs
        the loop until that future completes.
        """
        loop = asyncio.get_event_loop()
        tasks = []
    
        for i in range(10):
            future = asyncio.ensure_future(coroutine(i))
            tasks.append(future)
    
        outer_future = asyncio.gather(*tasks, return_exceptions=True)
        # The callback is attached to the gather future, not to each task.
        outer_future.add_done_callback(done_callback)
        loop.run_until_complete(outer_future)
    
    • return_exceptions=True
    worker:0,cost:1.0
    worker:1,cost:0.4
    worker:2,cost:0.1
    worker:3,cost:0.4
    worker:4,cost:0.6
    worker:5,cost:0.2
    worker:6,cost:0.8
    worker:7,cost:0.1
    worker:8,cost:0.2
    worker:9,cost:0.6
    
    worker:2 not trigger
    worker:7 not trigger
    worker:5 not trigger
    worker:8 not trigger
    worker:1 not trigger
    worker:3 not trigger
    worker:4 trigger
    worker:9 trigger
    worker:6 trigger
    worker:0 trigger
    
    callback catch exception hehe 0
    callback result 0.4
    callback result 0.1
    callback result 0.4
    callback catch exception hehe 4
    callback result 0.2
    callback catch exception hehe 6
    callback result 0.1
    callback result 0.2
    callback catch exception hehe 9
    

    这里done_callback的future是asyncio.gather返回的Future,future.result()是由各task的返回值或异常对象组成的列表

    • return_exceptions=False
    worker:0,cost:0.8
    worker:1,cost:0.4
    worker:2,cost:1.0
    worker:3,cost:0.7
    worker:4,cost:0.4
    worker:5,cost:0.3
    worker:6,cost:0.3
    worker:7,cost:1.0
    worker:8,cost:1.0
    worker:9,cost:0.4
    
    worker:5 not trigger
    worker:6 not trigger
    worker:1 not trigger
    worker:4 not trigger
    worker:9 not trigger
    worker:3 trigger
    
    Exception in callback done_callback(<_GatheringFu...ion('hehe 3')>) at asyncio/gather.py:12
    handle: <Handle done_callback(<_GatheringFu...ion('hehe 3')>) at asyncio/gather.py:12>
    Traceback (most recent call last):
      File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
        self._context.run(self._callback, *self._args)
      File "asyncio/gather.py", line 15, in done_callback
        for r in future.result():
      File "asyncio/gather.py", line 28, in coroutine
        raise Exception('hehe '+str(i))
    Exception: hehe 3
    
    Traceback (most recent call last):
      File "asyncio/gather.py", line 80, in <module>
        main()
      File "asyncio/gather.py", line 55, in main
        loop.run_until_complete(outer_future)
      File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
        return future.result()
      File "/usr/local/Cellar/python/3.7.2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 88, in _run
        self._context.run(self._callback, *self._args)
      File "asyncio/gather.py", line 15, in done_callback
        for r in future.result():
      File "asyncio/gather.py", line 28, in coroutine
        raise Exception('hehe '+str(i))
    Exception: hehe 3
    

    这里done_callback的future是Future类型


    不定期更新