
    asyncio task cancel problem in aiomysql

    Tags: python

    asyncio task cancel

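    I currently run SQL queries concurrently in batches with asyncio wait/gather. If one query is especially slow, the whole batch waits on it (the weakest-link effect) and the delays accumulate, so even with asyncio the impact on the system is large. I therefore want a timeout on the concurrently running queries: any query (there may be several) that runs past the timeout gets its task cancelled.
    The example here uses wait(), which accepts a timeout argument directly. gather() can be made to work too, but it takes a bit more effort.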

    import asyncio
    import random
    import time
    
    
    async def coroutine(i):
        t = random.randint(1, 10)/10
        print("worker:%s,cost:%s,start" % (i, t))
    
        await asyncio.sleep(t)
        print("worker:%s,cost:%s" % (i, t))
        return True
    
    
    async def main():
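        # asyncio.wait() needs Task objects; bare coroutines are rejected since Python 3.11
        tasks = [asyncio.create_task(coroutine(i)) for i in range(10)]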
    
        start = time.time()
        done, pending = await asyncio.wait(tasks, timeout=0.5)
        print('done', len(done))
        print('pending', len(pending))
    
        for t in pending:
            t.cancel()
    
        print('coroutine cost', time.time()-start)
    
    
    if __name__ == '__main__':
        # asyncio.run() creates and closes the event loop for us
        asyncio.run(main())
    

    Output

    worker:3,cost:0.5,start
    worker:7,cost:0.1,start
    worker:4,cost:0.4,start
    worker:8,cost:1.0,start
    worker:1,cost:0.2,start
    worker:5,cost:0.9,start
    worker:0,cost:0.6,start
    worker:6,cost:1.0,start
    worker:9,cost:0.8,start
    worker:2,cost:0.4,start
    worker:7,cost:0.1
    worker:1,cost:0.2
    worker:4,cost:0.4
    worker:2,cost:0.4
    done 4
    pending 6
    

    As the output shows, none of the tasks that needed 0.5s or longer ran to completion.
    Note that every task in pending has to be cancelled:

    done, pending = await asyncio.wait(tasks, timeout=0.5)
    for t in pending:
        t.cancel()
    

    Otherwise you may see something like this:

    worker:5,cost:1.0,start
    worker:2,cost:1.0,start
    worker:0,cost:0.5,start
    worker:6,cost:0.4,start
    worker:9,cost:0.9,start
    worker:3,cost:0.5,start
    worker:7,cost:0.2,start
    worker:4,cost:0.2,start
    worker:8,cost:0.8,start
    worker:1,cost:0.5,start
    worker:7,cost:0.2
    worker:4,cost:0.2
    worker:6,cost:0.4
    done 3
    pending 7
    coroutine cost 0.5051000118255615
    worker:0,cost:0.5
    worker:3,cost:0.5
    worker:1,cost:0.5
    

    Some of the timed-out tasks kept running instead of being cancelled.
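
    The gather() route mentioned at the start could look roughly like the sketch below. gather() has no timeout parameter, so here it is wrapped in asyncio.wait_for(), which cancels the whole group when the timeout expires. The names worker and run_batch are made up for illustration. Unlike wait(), this approach does not hand back done/pending sets, so the results are read off the individual tasks afterwards.

```python
import asyncio


async def worker(i, delay):
    # Stand-in for a SQL query that takes `delay` seconds.
    await asyncio.sleep(delay)
    return i


async def run_batch(delays, timeout):
    tasks = [asyncio.create_task(worker(i, d)) for i, d in enumerate(delays)]
    try:
        # wait_for() cancels the gather() future on timeout, and gather()
        # passes that cancellation on to every task that is still running.
        await asyncio.wait_for(asyncio.gather(*tasks), timeout)
    except asyncio.TimeoutError:
        pass
    # Finished tasks keep their result; cancelled ones are reported as None.
    return [None if t.cancelled() else t.result() for t in tasks]


if __name__ == '__main__':
    print(asyncio.run(run_batch([0.05, 1.0, 0.1, 2.0], timeout=0.3)))
```

    This is the "bit more effort" part: the task list has to be kept around so that the finished results can still be recovered after the timeout.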

    task CancelledError

    When a task is cancelled, a CancelledError is thrown into the coroutine it wraps. From the asyncio documentation:

    To cancel a running Task use the cancel() method. Calling it will cause the Task to throw a CancelledError exception into the wrapped coroutine. If a coroutine is awaiting on a Future object during cancellation, the Future object will be cancelled.

    import asyncio
    import random
    import time
    
    
    async def coroutine(i):
        t = random.randint(1, 10)/10
        print("worker:%s,cost:%s,start" % (i, t))
    
        try:
            await asyncio.sleep(t)
            print("worker:%s,cost:%s" % (i, t))
            return True
        except asyncio.CancelledError:
            print('outer timeout error,worker:', i)
            raise  # re-raise so the task is actually marked as cancelled
    
    
    async def main():
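        # asyncio.wait() needs Task objects; bare coroutines are rejected since Python 3.11
        tasks = [asyncio.create_task(coroutine(i)) for i in range(10)]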
    
        start = time.time()
        try:
            done, pending = await asyncio.wait(tasks, timeout=0.5)
    
            print('done', len(done))
            print('pending', len(pending))
    
            for t in pending:
                t.cancel()
        except asyncio.CancelledError:
            print('wait timeout error')
    
        print('coroutine cost', time.time()-start)
    
    
    if __name__ == '__main__':
        # asyncio.run() creates and closes the event loop for us
        asyncio.run(main())
    

    Output

    worker:3,cost:0.5,start
    worker:7,cost:0.2,start
    worker:9,cost:0.1,start
    worker:6,cost:0.3,start
    worker:0,cost:0.9,start
    worker:4,cost:0.4,start
    worker:8,cost:0.2,start
    worker:1,cost:0.9,start
    worker:5,cost:0.8,start
    worker:2,cost:0.6,start
    worker:9,cost:0.1
    worker:7,cost:0.2
    worker:8,cost:0.2
    worker:6,cost:0.3
    worker:4,cost:0.4
    done 5
    pending 5
    coroutine cost 0.5010318756103516
    outer timeout error,worker: 3
    outer timeout error,worker: 1
    outer timeout error,worker: 0
    outer timeout error,worker: 5
    outer timeout error,worker: 2
    

    Every cancelled task caught a CancelledError, while the outer try/except caught nothing, which shows that wait() does not raise an exception when it times out.
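
    Since wait() reports a timeout only through the pending set, the results have to be read from done by hand. One possible helper is sketched below; fake_query and run_with_timeout are illustrative names, not part of asyncio or aiomysql. After cancelling, it also awaits the pending tasks with gather(return_exceptions=True), so their cancellation has finished by the time the function returns.

```python
import asyncio


async def fake_query(i, delay):
    # Stand-in for a SQL query that takes `delay` seconds.
    await asyncio.sleep(delay)
    return i


async def run_with_timeout(coros, timeout):
    tasks = [asyncio.create_task(c) for c in coros]
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for t in pending:
        t.cancel()
    # Let the cancelled tasks unwind; each CancelledError comes back as a value.
    await asyncio.gather(*pending, return_exceptions=True)
    # result() would re-raise if a finished task had failed with an exception.
    results = sorted(t.result() for t in done)
    return results, len(pending)


if __name__ == '__main__':
    coros = [fake_query(i, d) for i, d in enumerate([0.05, 1.0, 0.1, 2.0])]
    print(asyncio.run(run_with_timeout(coros, timeout=0.3)))
```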

    aiomysql cancel problem

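    At work, the tasks above are aiomysql queries. They failed with the error Cancelled during execution, and a little while later no further SQL could be executed at all. So I looked at the aiomysql source: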

    class Connection:
        ...
        async def _read_packet(self, packet_type=MysqlPacket):
            while True:
                try:
                    packet_header = await self._read_bytes(4)
                except asyncio.CancelledError:
                    self._close_on_cancel()
                    raise
            ...
    
        def _close_on_cancel(self):
            self.close()
            self._close_reason = "Cancelled during execution"
    

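    As the code shows, when aiomysql is reading a MySQL packet and learns that the query is being cancelled (it catches CancelledError), it closes the connection and then re-raises CancelledError. We, in turn, had wrapped aiomysql in a layer of our own: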

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.commit()
        await self._cur.close()
        await self.pool.release(self._conn)
    

    If aiomysql raises CancelledError, execution never reaches

    await self.pool.release(self._conn)
    

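    so the pool cannot reclaim the connection. Once the pool hits its maximum number of connections, later queries can no longer obtain one. With that understood, the fix is simple: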

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            await self.commit()
            await self._cur.close()
        finally:
            # Always hand the connection back to the pool, even on CancelledError
            await self.pool.release(self._conn)
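
    To see why the finally matters, here is a small self-contained simulation that does not use aiomysql at all. FakeConn, FakePool, LeakyQuery and SafeQuery are invented stand-ins, and FakeConn.commit() is simply assumed to fail once the connection has been closed by a cancellation. It is only a sketch of the leak, not a reproduction of aiomysql's internals.

```python
import asyncio


class FakeConn:
    def __init__(self):
        self.closed = False

    async def commit(self):
        # Assumption for this sketch: committing on a closed connection fails.
        if self.closed:
            raise RuntimeError('connection closed')

    async def query(self, delay):
        try:
            await asyncio.sleep(delay)
        except asyncio.CancelledError:
            self.closed = True  # mimic "close the connection on cancel"
            raise


class FakePool:
    def __init__(self):
        self.in_use = 0  # connections handed out and not yet released

    async def acquire(self):
        self.in_use += 1
        return FakeConn()

    async def release(self, conn):
        self.in_use -= 1


class LeakyQuery:
    def __init__(self, pool):
        self.pool = pool

    async def __aenter__(self):
        self._conn = await self.pool.acquire()
        return self._conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._conn.commit()            # raises after a cancel
        await self.pool.release(self._conn)  # so this line is skipped


class SafeQuery(LeakyQuery):
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            await self._conn.commit()
        finally:
            await self.pool.release(self._conn)  # runs no matter what


async def leaked_after_cancel(wrapper_cls):
    pool = FakePool()

    async def job():
        async with wrapper_cls(pool) as conn:
            await conn.query(10)

    task = asyncio.create_task(job())
    await asyncio.sleep(0.01)  # give the query time to start
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return pool.in_use


if __name__ == '__main__':
    print('leaky:', asyncio.run(leaked_after_cancel(LeakyQuery)))
    print('safe: ', asyncio.run(leaked_after_cancel(SafeQuery)))
```

    With the leaky wrapper the pool still counts one connection as in use after the cancellation; with the try/finally version the count drops back to zero.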