博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python异步非阻塞IO多路复用Select/Poll/Epoll使用
阅读量:6225 次
发布时间:2019-06-21

本文共 7058 字,大约阅读时间需要 23 分钟。

来源:

有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的。

下面记录下分别基于Select/Poll/Epoll的echo server实现。
Python Select Server,可监控事件数量有限制:

#!/usr/bin/python# -*- coding: utf-8 -*-import selectimport socketimport Queue server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)server.setblocking(False)server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR  , 1)server_address= ('192.168.1.5',8080)server.bind(server_address)server.listen(10) #select轮询等待读socket集合inputs = [server]#select轮询等待写socket集合outputs = []message_queues = {}#select超时时间timeout = 20 while True:    print "等待活动连接......"    readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)     if not (readable or writable or exceptional) :        print "select超时无活动连接,重新select...... "        continue;       #循环可读事件    for s in readable :        #如果是server监听的socket        if s is server:            #同意连接            connection, client_address = s.accept()            print "新连接: ", client_address            connection.setblocking(0)            #将连接加入到select可读事件队列            inputs.append(connection)            #新建连接为key的字典,写回读取到的消息            message_queues[connection] = Queue.Queue()        else:            #不是本机监听就是客户端发来的消息            data = s.recv(1024)            if data :                print "收到数据:" , data , "客户端:",s.getpeername()                message_queues[s].put(data)                if s not in outputs:                    #将读取到的socket加入到可写事件队列                    outputs.append(s)            else:                #空白消息,关闭连接                print "关闭连接:", client_address                if s in outputs :                    outputs.remove(s)                inputs.remove(s)                s.close()                del message_queues[s]    for s in writable:        try:            msg = message_queues[s].get_nowait()        except Queue.Empty:            print "连接:" , s.getpeername() , '消息队列为空'            outputs.remove(s)        else:            print "发送数据:" , msg , "到", s.getpeername()            s.send(msg)     for s in exceptional:        print "异常连接:", s.getpeername()        inputs.remove(s)        if s in outputs:            outputs.remove(s)        s.close()        del message_queues[s]
Python Poll Server,Select升级版,无可监控事件数量限制,还是要轮询所有事件:

#!/usr/bin/python# -*- coding: utf-8 -*-import socketimport selectimport Queue server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server.setblocking(False)server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server_address = ("192.168.1.5", 8080)server.bind(server_address)server.listen(5)print  "服务器启动成功,监听IP:" , server_addressmessage_queues = {}#超时,毫秒timeout = 5000#监听哪些事件READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)READ_WRITE = (READ_ONLY|select.POLLOUT)#新建轮询事件对象poller = select.poll()#注册本机监听socket到等待可读事件事件集合poller.register(server,READ_ONLY)#文件描述符到socket映射fd_to_socket = {server.fileno():server,}while True:    print "等待活动连接......"    #轮询注册的事件集合    events = poller.poll(timeout)    if not events:      print "poll超时,无活动连接,重新poll......"      continue    print "有" , len(events), "个新事件,开始处理......"    for fd ,flag in events:        s = fd_to_socket[fd]        #可读事件        if flag & (select.POLLIN | select.POLLPRI) :            if s is server :                #如果socket是监听的server代表有新连接                connection , client_address = s.accept()                print "新连接:" , client_address                connection.setblocking(False)                 fd_to_socket[connection.fileno()] = connection                #加入到等待读事件集合                poller.register(connection,READ_ONLY)                message_queues[connection]  = Queue.Queue()            else :                #接收客户端发送的数据                data = s.recv(1024)                if data:                    print "收到数据:" , data , "客户端:" , s.getpeername()                    message_queues[s].put(data)                    #修改读取到消息的连接到等待写事件集合                    poller.modify(s,READ_WRITE)                else :                    # Close the connection                    print "  closing" , s.getpeername()                    # Stop listening for input on the connection                    poller.unregister(s)                    s.close()                    del message_queues[s]        #连接关闭事件        elif flag & select.POLLHUP :            print " Closing ", s.getpeername() ,"(HUP)"            poller.unregister(s)            s.close()        #可写事件        elif flag & select.POLLOUT :            try:                msg = message_queues[s].get_nowait()            except Queue.Empty:                print s.getpeername() , " queue empty"                poller.modify(s,READ_ONLY)            else :                print "发送数据:" , data , "客户端:" , s.getpeername()                s.send(msg)        #异常事件        elif flag & select.POLLERR:            print "  exception on" , s.getpeername()            poller.unregister(s)            s.close()            del message_queues[s]
Python Epoll Server,基于回调的事件通知模式,轻松管理大量连接:
#!/usr/bin/python# -*- coding: utf-8 -*-import socket, selectimport Queue serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server_address = ("192.168.1.5", 8080)serversocket.bind(server_address)serversocket.listen(1)print  "服务器启动成功,监听IP:" , server_addressserversocket.setblocking(0)timeout = 10#新建epoll事件对象,后续要监控的事件添加到其中epoll = select.epoll()#添加服务器监听fd到等待读事件集合epoll.register(serversocket.fileno(), select.EPOLLIN)message_queues = {} fd_to_socket = {serversocket.fileno():serversocket,}while True:  print "等待活动连接......"  #轮询注册的事件集合  events = epoll.poll(timeout)  if not events:     print "epoll超时无活动连接,重新轮询......"     continue  print "有" , len(events), "个新事件,开始处理......"  for fd, event in events:     socket = fd_to_socket[fd]     #可读事件     if event & select.EPOLLIN:         #如果活动socket为服务器所监听,有新连接         if socket == serversocket:            connection, address = serversocket.accept()            print "新连接:" , address            connection.setblocking(0)            #注册新连接fd到待读事件集合            epoll.register(connection.fileno(), select.EPOLLIN)            fd_to_socket[connection.fileno()] = connection            message_queues[connection]  = Queue.Queue()         #否则为客户端发送的数据         else:            data = socket.recv(1024)            if data:               print "收到数据:" , data , "客户端:" , socket.getpeername()               message_queues[socket].put(data)               #修改读取到消息的连接到等待写事件集合               epoll.modify(fd, select.EPOLLOUT)     #可写事件     elif event & select.EPOLLOUT:        try:           msg = message_queues[socket].get_nowait()        except Queue.Empty:           print socket.getpeername() , " queue empty"           epoll.modify(fd, select.EPOLLIN)        else :           print "发送数据:" , data , "客户端:" , socket.getpeername()           socket.send(msg)     #关闭事件     elif event & select.EPOLLHUP:        epoll.unregister(fd)        fd_to_socket[fd].close()        del fd_to_socket[fd]epoll.unregister(serversocket.fileno())epoll.close()serversocket.close()

转载地址:http://hofna.baihongyu.com/

你可能感兴趣的文章
作业:实现简单的shell sed替换功能和修改haproxy配置文件
查看>>
spring配置多数据源问题
查看>>
Altium 拼板方法以及 注意的 地方
查看>>
简明Linux命令行笔记:tail
查看>>
PMP考试的过与只是
查看>>
java 监控 收集资料3(收集中)
查看>>
Apache Pulsar中的地域复制,第1篇:概念和功能
查看>>
getRealPath()和getContextPath()的区别
查看>>
Hadoop MapReduce编程 API入门系列之wordcount版本2(六)
查看>>
一个页面标题和过滤输出的解决方案(上)
查看>>
python pip install 出现 OSError: [Errno 1] Operation not permitted
查看>>
oracle12C 重做日志
查看>>
Linux ubuntu lamp安装配置环境phpmyadmin
查看>>
data guard 的部署
查看>>
枚举、模拟、递推
查看>>
sublime text 3安装
查看>>
awk-sed
查看>>
EXTJS4-----前言
查看>>
iOS11里判断Safari浏览器是无痕模式还是正常模式?
查看>>
zookeeper与kafka安装部署及java环境搭建(发布订阅模式)
查看>>