同步示例代码

import socket

HOST = 'localhost'
PORT = 8888
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((HOST, PORT))
    s.listen(128)
    while True:
        conn, addr = s.accept()
        print('connected by', addr)
        with conn:
            while 1:
                msg = conn.recv(1024)
                if not msg:
                    break
                conn.sendall(msg)

IO复用的方式进行改造

这里使用 python3 提供的 selectros 来改造它,这个模块封装了操作系统底层提供的 I/O 复用机制(同步非阻塞),比如 linux 上使用了 epoll。通过 I/O 复用机制我们可以监听多个文件描述符的可读写事件并且注册回调函数,拥有更好的并发性能。

import selectors
import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:  # EventLoop
    events = sel.select()
    for key, mask in events:  # key代表selectorKey对象, mask代表read or write方法
        # callback相当于调accept函数
        callback = key.data
        # 获取函数内存地址,加入参数
        # key.fileobj = 文件句柄
        callback(key.fileobj, mask)

实现EventLoop类的改造方案

import selectors
import socket


class EventLoop:
    def __init__(self, selector=None):
        if selector is None:
            selector = selectors.DefaultSelector()
        self.selector = selector

    def run_forever(self):
        while True:  # EventLoop
            event = self.selector.select()
            for key, mask in event:
                if mask == selectors.EVENT_READ:  # 当监听到是读事件
                    callback = key.data  # callback is _on_read or accept
                    callback(key.fileobj)
                else:
                    callback, msg = key.data  # 当监听到是写事件
                    callback(key.fileobj, msg)  # callback is _on_write


class TCPEchoServer:
    def __init__(self, host, port, loop):
        self.host = host
        self.port = port
        self._loop = loop
        self.s = socket.socket()

    def run(self):
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.s.bind((self.host, self.port))
        self.s.listen(128)
        self.s.setblocking(False)
        self._loop.selector.register(self.s, selectors.EVENT_READ, self._accept)
        self._loop.run_forever()

    def _accept(self, sock):
        conn, addr = sock.accept()
        print('accepted', conn, 'from', addr)
        conn.setblocking(False)
        self._loop.selector.register(conn, selectors.EVENT_READ, self._on_read)

    def _on_read(self, conn):
        msg = conn.recv(1024)
        if msg:
            print('echoing', repr(msg), 'to', conn)
            self._loop.selector.modify(conn, selectors.EVENT_WRITE, (self._on_write, msg))
        else:
            print('closing', conn)
            self._loop.selector.unregister(conn)
            conn.close()

    def _on_write(self, conn, msg):
        conn.sendall(msg)
        self._loop.selector.modify(conn, selectors.EVENT_READ, self._on_read)


event_loop = EventLoop()
echo_server = TCPEchoServer('localhost', 8888, event_loop)
echo_server.run()

有些人可能会纠结,为何_on_write后面为何要再注册_on_read,其实当你debug之后,会一目了然。

整个调度过程,你可以把eventloop比喻成一个发动机,它需要借一个力去启动,这个力就是_accept

当我们注册了accept到event中后,整个发动机便开始工作,eventloop会遍历event,accetp会对当前的conn注册它的读事件(_on_read),但是当前循环并不会调用它的写事件,直到下一次循环的时候,才会callback上一次conn的写事件,同时再次注册读事件,等待当前的conn是否还有其他的信息,如果有就循环,没有就会对当前的conn进行unregister。然后一整个event循环结束,下次再进入event,accept一个链接,以此往复。

所以总结下来可以归结为如下步骤
第一次(event中只有_accept):

  1. 调用accpet,注册当前conn的on_read

第二次(event中有_accept,上一次conn的on_read)

  1. 调用accept,接受一个新的conn,注册当前conn的on_read
  2. 调用第一次conn的on_read方法,注册第一次conn的on_write

第三次

  1. 调用accpet,接受一个新的conn,注册当前conn的on_read
  2. 调用第二次conn的on_read方法,注册第二次conn的on_write
  3. 调用第一次conn的on_write方法,注册第一个conn的on_read方法(这时这个conn已经没有消息了,将执行这个conn的unregister的方法)

以此往复

那么我们继续深入测试一下只有一个连接,发现第二次循环event的时候,并没有执行到accept里面,这是为什么呢?

我们就需要了解一下什么时候是EVENT_READ,什么时候是EVENT_WRITE了。
对于服务端socket来说,只有当新的一个客户端的conn进来的时候,才能保证当前是可读的状态,也就是这时候才会触发accept到event中。

而当一个链接建立好之后<socket.socket fd=120, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8888), raddr=('127.0.0.1', 7846)> from ('127.0.0.1', 7846)

对于conn来说,客户端必须发送数据,只有服务端接受到完整的消息时,才处于可读就绪状态。也就是数据缓冲区必须有数据可读。
而可写状态是一直就绪的。也就是建立好conn后,只要你注册了可写事件,就一定会回调,只要这个链接不异常断开。

基于future对象的异步模式

上述的EventLoop模型都是通过采用监听,回调的方式获取到异步调用的结果呢?python提供了一个叫做 Future的对象,当异步调用执行完的时候,用来保存它的结果。 Future 对象的 result 用来保存未来的执行结果,setresult 用来设置 result并且运行给 future 对象添加的回调。

这里使用了原生的协程实现方式——yield from。

import selectors
import socket


class Future:  # 自定义future对象
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):  # 设置结果同时,执行回调自己
        self.result = result
        for callback in self._callbacks:
            callback(self)

    def __iter__(self):
        yield self  # 产出自己
        return self.result   # yield from 将把 result 值返回作为 yield from 表达式的值


class Task:
    """管理生成器的执行"""

    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:  # 把当前 future 的结果发送给协程作为 yield from 表达式的值,同时执行到下一个 future 处
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)


class TCPEchoServer:
    def __init__(self, host, port, loop):
        self.host = host
        self.port = port
        self._loop = loop
        self.s = socket.socket()

    def run(self):
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.s.bind((self.host, self.port))
        self.s.listen(128)
        self.s.setblocking(False)

        while True:
            conn, addr = yield from self.accept()
            msg = yield from self.read(conn)
            if msg:
                yield from self.sendall(conn, msg)
            else:
                conn.close()

    def accept(self):
        f = Future()

        def on_accept():
            conn, addr = self.s.accept()
            print('accepted', conn, 'from', addr)
            conn.setblocking(False)
            f.set_result((conn, addr))  # accept 的 result 是接受连接的新对象 conn, addr

        self._loop.selector.register(self.s, selectors.EVENT_READ, on_accept)
        conn, addr = yield from f  # 委派给 future 对象,直到 future 执行了 socket.accept() 并且把 result 返回
        self._loop.selector.unregister(self.s)
        return conn, addr

    def read(self, conn):
        f = Future()

        def on_read():
            msg = conn.recv(1024)
            f.set_result(msg)

        self._loop.selector.register(conn, selectors.EVENT_READ, on_read)
        msg = yield from f
        return msg

    def sendall(self, conn, msg):
        f = Future()

        def on_write():
            conn.sendall(msg)
            f.set_result(None)
            self._loop.selector.unregister(conn)
            conn.close()

        self._loop.selector.modify(conn, selectors.EVENT_WRITE, on_write)
        yield from f


class EventLoop:
    def __init__(self, selector=None):
        if selector is None:
            selector = selectors.DefaultSelector()
        self.selector = selector

    def create_task(self, coro):
        return Task(coro)

    def run_forever(self):
        while 1:
            events = self.selector.select()
            for event_key, event_mask in events:
                callback = event_key.data
                callback()


event_loop = EventLoop()
echo_server = TCPEchoServer('localhost', 8888, event_loop)
task = Task(echo_server.run())
event_loop.run_forever()

性能测试

建立10000个连接的tcp客户端

# Echo client program
import socket
import concurrent.futures
import time

HOST = '127.0.0.1'  # The remote host
PORT = 8888  # The same port as used by the server
time_list = []


def run_time(func):
    def wrapper(i, *args, **kwargs):
        start_time = time.perf_counter()
        func(i)
        end_time = time.perf_counter()
        time_list.append(end_time-start_time)

    return wrapper


@run_time
def send_message(i):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((HOST, PORT))
        s.sendall(('hello world {}\n'.format(i)).encode('utf-8'))
        data = s.recv(1024)
        # print('Received', repr(data))


with concurrent.futures.ThreadPoolExecutor(max_workers=128) as executors:
    executors.map(send_message, [i for i in range(10000)])

all_time = 0
for i in time_list:
    all_time += i
print(all_time)

时间结果

同步测试时间: 368.0034764000006
select IO复用:154.46833750000008
异步+IO复用:69.28810920000004

版权声明:本文为原创文章,版权归 heroyf 所有
本文链接: https://www.heroyf.club/2019/11/03/python_coro/


“苹果是给那些为了爱选择死亡的人的奖励”