首页

    tornado分析-ioloop asyncio方式实现

    标签:python

    version:5.1

    asyncio方式最简化代码

    import asyncio
    import socket
    
    # Minimal echo server driven directly by asyncio's reader callbacks.
    # The listening socket is non-blocking so accept() never stalls the loop.
    sock = socket.socket()
    sock.setblocking(False)
    sock.bind(('127.0.0.1', 5500))
    sock.listen(100)
    # Create the loop explicitly: relying on get_event_loop() to create one
    # implicitly is deprecated and fails on recent Python versions.
    loop = asyncio.new_event_loop()
    
    def accept(sock):
        """Accept one pending connection and watch it for incoming data."""
        conn, addr = sock.accept()
        print('accepted', conn, 'from', addr)
        conn.setblocking(False)
        loop.add_reader(conn, read, conn)
    
    def read(conn):
        """Echo received bytes back; on EOF unregister and close the connection."""
        data = conn.recv(1000)
        if data:
            print('echoing', repr(data), 'to', conn)
            # NOTE: send() may write fewer bytes than given; acceptable for a demo.
            conn.send(data)
        else:
            print('closing', conn)
            loop.remove_reader(conn)
            conn.close()
    
    loop.add_reader(sock, accept, sock)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket and the loop's selector on exit.
        loop.remove_reader(sock)
        sock.close()
        loop.close()
    

    下面就来看看tornado是怎样对应上面的最简化实现

    http server监听

    # Wrap the application in an HTTPServer and start listening on port 10001.
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(10001)
    

    httpserver.HTTPServer继承tcpserver.TCPServer

    class TCPServer(object):
        """Excerpt of TCPServer: tracks listening sockets and their accept handlers."""
        def __init__(self, ssl_options=None, max_buffer_size=None,
                     read_chunk_size=None):
            self._sockets = {}   # fd -> socket object
            self._handlers = {}  # fd -> remove_handler callable
        ...
        def listen(self, port, address=""):
            """Create listening sockets for ``port``/``address`` and register them."""
            sockets = bind_sockets(port, address=address)
            self.add_sockets(sockets)
    
        def add_sockets(self, sockets):
            """Record each socket by fd and install an accept handler for it.

            The callable returned by ``add_accept_handler`` is stored in
            ``self._handlers`` under the socket's fd.
            """
            for sock in sockets:
                self._sockets[sock.fileno()] = sock
                # self._handle_connection is the callback given to the accept handler.
                self._handlers[sock.fileno()] = add_accept_handler(
                    sock, self._handle_connection)
    

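    listen方法中的bind_sockets(port, address=address)相当于最简化代码的sock=socket.socket(),但对不同系统和socket配置做了兼容处理
    add_sockets方法中的add_accept_handler(sock, self._handle_connection)会把_handle_connection作为回调,注册到监听socket的IO read事件上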

    def add_accept_handler(sock, callback):
        """Register an accept handler for ``sock`` on the current IOLoop.

        When ``sock`` is readable, connections are accepted and each one is
        passed to ``callback(connection, address)``.

        Returns a callable that removes the handler from the IOLoop.
        """
        io_loop = IOLoop.current()
        # One-element list so the nested functions can share and mutate the flag.
        removed = [False]
    
        def accept_handler(fd, events):
            # Accept at most _DEFAULT_BACKLOG connections per invocation.
            for i in xrange(_DEFAULT_BACKLOG):
                if removed[0]:
                    # The socket was probably closed
                    return
                try:
                    connection, address = sock.accept()
                except socket.error as e:
                    # (specific error handling is elided in this excerpt)
                    ...
                    raise
                set_close_exec(connection.fileno())
                callback(connection, address)
    
        def remove_handler():
            # Unregister from the loop and flag it so a running accept loop stops.
            io_loop.remove_handler(sock)
            removed[0] = True
    
        io_loop.add_handler(sock, accept_handler, IOLoop.READ)
        return remove_handler
    

    _handle_connection方法作为callback被accept_handler包装:IO可读时,accept_handler调用sock.accept()接收连接,再将connection传入_handle_connection方法
    对于io_loop.add_handler方法,asyncio.BaseAsyncIOLoop和ioloop.PollIOLoop都实现了.实际上,它们都实现了这3个方法

    def add_handler(self, fd, handler, events):  # register handler for the given events on fd
    def update_handler(self, fd, events):  # presumably changes the events watched for fd
    def remove_handler(self, fd):  # presumably stops watching fd
    

    fd,handler会以{fd:(fileobj, handler)}形式被保存,上面3个方法是对它们的操作

    class BaseAsyncIOLoop(IOLoop):
        ...
        def add_handler(self, fd, handler, events):
            """Register ``handler`` for ``events`` on ``fd`` through the asyncio loop.

            Raises ValueError if ``fd`` is already registered.
            """
            fd, fileobj = self.split_fd(fd)
            if fd in self.handlers:
                raise ValueError("fd %s added twice" % fd)
            # Keep the file object so it can be handed back to the handler later.
            self.handlers[fd] = (fileobj, stack_context.wrap(handler))
            if events & IOLoop.READ:
                self.asyncio_loop.add_reader(
                    fd, self._handle_events, fd, IOLoop.READ)
                self.readers.add(fd)
            if events & IOLoop.WRITE:
                self.asyncio_loop.add_writer(
                    fd, self._handle_events, fd, IOLoop.WRITE)
                self.writers.add(fd)
    
        def _handle_events(self, fd, events):
            """Look up the handler stored for ``fd`` and call it with (fileobj, events)."""
            fileobj, handler_func = self.handlers[fd]
            handler_func(fileobj, events)
    

    上面这些过程相当于最简化代码里的

    def accept(sock):
        # Called when the listening socket is readable.
        conn, addr = sock.accept()
        print('accepted', conn, 'from', addr)
    
    ...
    # Watch the listening socket; accept(sock) is the read callback.
    loop.add_reader(sock, accept,sock)
    

    继续。_handle_connection方法被触发

    def _handle_connection(self, connection, address):  # callback for each accepted connection
        ...
        try:
            if self.ssl_options is not None:  # SSL options set: use SSLIOStream
                stream = SSLIOStream(connection,
                                     max_buffer_size=self.max_buffer_size,
                                     read_chunk_size=self.read_chunk_size)
            else:  # no SSL options: plain IOStream
                stream = IOStream(connection,
                                  max_buffer_size=self.max_buffer_size,
                                  read_chunk_size=self.read_chunk_size)
    
            future = self.handle_stream(stream, address)  # hand the wrapped stream to handle_stream
            ...
    

    connection(socket)传入iostream.IOStream,封装成non-blocking socket

    class IOStream(BaseIOStream):
        """Stream over a socket, which is switched to non-blocking mode on construction."""
        def __init__(self, socket, *args, **kwargs):
            self.socket = socket
            # Force non-blocking mode on the wrapped socket.
            self.socket.setblocking(False)
            super(IOStream, self).__init__(*args, **kwargs)
    
    class BaseIOStream(object):
        def __init__(self, max_buffer_size=None,
                     read_chunk_size=None, max_write_buffer_size=None):
            # Capture the IOLoop that is current when the stream is constructed.
            self.io_loop = ioloop.IOLoop.current()
            ...
    

    然后是上面第13行 future = self.handle_stream(stream, address) 调用的handle_stream方法

    class HTTPServer(TCPServer, Configurable,httputil.HTTPServerConnectionDelegate):
        def handle_stream(self, stream, address):  # called from _handle_connection with the new stream
            context = _HTTPRequestContext(stream, address,
                                          self.protocol,
                                          self.trusted_downstream)
            conn = HTTP1ServerConnection(
                stream, self.conn_params, context)
            self._connections.add(conn)
            conn.start_serving(self)  # this server is passed as the connection's delegate
    
        def start_request(self, server_conn, request_conn):
            if isinstance(self.request_callback, httputil.HTTPServerConnectionDelegate):
                """
                self.request_callback是
                tornado.web.Application([
                    (r".*", YourHandler)
                ])
                """
                delegate = self.request_callback.start_request(server_conn, request_conn)
            else:
            ...
    
            # Original note: returns the web.Application instance (NOTE(review): it is whatever request_callback.start_request returned; confirm)
            return delegate
    
    class HTTP1ServerConnection(object):
        def __init__(self, stream, params=None, context=None):
            self.stream = stream
            ...
    
        def start_serving(self, delegate):
            assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
            self._serving_future = self._server_request_loop(delegate)
            # Register the future on the IOLoop so its errors get logged.
            self.stream.io_loop.add_future(self._serving_future,
                                           lambda f: f.result())
    
        @gen.coroutine
        def _server_request_loop(self, delegate):
            try:
                while True:  # each pass creates a fresh HTTP1Connection on the same stream
                    conn = HTTP1Connection(self.stream, False,
                                           self.params, self.context)
                    # delegate is the HTTPServer instance,
                    # so this call enters HTTPServer.start_request shown above
                    request_delegate = delegate.start_request(self, conn)
                    try:
                        ret = yield conn.read_response(request_delegate)  # request parsing starts here
                    ...
                    if not ret:  # a falsy result ends the serving loop
                        return
                    yield gen.moment
            finally:
                delegate.on_close(self)
    

    这里暂时不研究@gen.coroutine,事实上,gen包是tornado基于生成器编写的协程工具包
    上面_server_request_loop中的 ret = yield conn.read_response(request_delegate) 开始解析请求

    class HTTP1Connection(httputil.HTTPConnection):
        def read_response(self, delegate):
            ...
            # Hands off to the _read_message coroutine.
            return self._read_message(delegate)
    
        @gen.coroutine
        def _read_message(self, delegate):
            need_delegate_close = False
            try:
                # Read up to the blank line (LF or CRLF endings) that ends the
                # header block; max_header_size is passed as the max_bytes limit.
                header_future = self.stream.read_until_regex(
                    b"\r?\n\r?\n",
                    max_bytes=self.params.max_header_size)
                ...
                start_line, headers = self._parse_headers(header_data)
                ...
    
    class BaseIOStream(object):
        ...
        def read_until_regex(self, regex, callback=None, max_bytes=None):
            ...
            try:
                # Attempt the read via _try_inline_read (details elided in this excerpt).
                self._try_inline_read()
            ...
    
        def _try_inline_read(self):
            ...
            # We couldn't satisfy the read inline, so either close the stream
            # or listen for new data.
            if self.closed():
                ...
            else:
                self._add_io_state(ioloop.IOLoop.READ)
    
        def _add_io_state(self, state):
            """Make sure the IOLoop handler for this stream includes ``state``."""
            ...
            if self._state is None:
                # No handler yet: register one, always including ERROR.
                self._state = ioloop.IOLoop.ERROR | state
                with stack_context.NullContext():
                    self.io_loop.add_handler(
                        self.fileno(), self._handle_events, self._state)
            elif not self._state & state:
                # Handler exists but lacks this state: add it to the event mask.
                self._state = self._state | state
                self.io_loop.update_handler(self.fileno(), self._state)
    
    class IOStream(BaseIOStream):
        def __init__(self, socket, *args, **kwargs):
            self.socket = socket
            # Force non-blocking mode on the wrapped socket.
            self.socket.setblocking(False)
            super(IOStream, self).__init__(*args, **kwargs)
    
        def fileno(self):
            # Returns the socket object itself rather than an integer descriptor.
            return self.socket
    

    _add_io_state方法为self.fileno()添加或修改相应IO事件绑定,self.fileno()是之前sock.accept()接收的连接(也就是connection),相当于最简化代码的

    def accept(sock):
        conn, addr = sock.accept()
        ...
        # Register the accepted connection; read(conn) is its read callback.
        loop.add_reader(conn, read,conn)
    
    def read(conn):
        ...
    

    不定期更新