97 lines
4 KiB
Python
97 lines
4 KiB
Python
import errno
|
|
import logging
|
|
import select
|
|
import threading
|
|
|
|
from ..base_server import BaseServer
|
|
|
|
logger = logging.getLogger('event_socket_server')
|
|
|
|
if not hasattr(select, 'poll'):
|
|
raise ImportError('System does not support poll')
|
|
|
|
|
|
class PollServer(BaseServer):
|
|
poll = select.poll
|
|
WRITE = select.POLLIN | select.POLLOUT | select.POLLERR | select.POLLHUP
|
|
READ = select.POLLIN | select.POLLERR | select.POLLHUP
|
|
POLLIN = select.POLLIN
|
|
POLLOUT = select.POLLOUT
|
|
POLL_CLOSE = select.POLLERR | select.POLLHUP
|
|
NEED_CLOSE = False
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super(PollServer, self).__init__(*args, **kwargs)
|
|
self._poll = self.poll()
|
|
self._fdmap = {}
|
|
self._server_fds = {sock.fileno(): sock for sock in self._servers}
|
|
self._close_lock = threading.RLock()
|
|
|
|
def _register_write(self, client):
|
|
logger.debug('On write mode: %s', client.client_address)
|
|
self._poll.modify(client.fileno(), self.WRITE)
|
|
|
|
def _register_read(self, client):
|
|
logger.debug('On read mode: %s', client.client_address)
|
|
self._poll.modify(client.fileno(), self.READ)
|
|
|
|
def _clean_up_client(self, client, finalize=False):
|
|
logger.debug('Taking close lock: cleanup')
|
|
with self._close_lock:
|
|
logger.debug('Cleaning up client: %s, finalize: %d', client.client_address, finalize)
|
|
fd = client.fileno()
|
|
try:
|
|
self._poll.unregister(fd)
|
|
except IOError as e:
|
|
if e.errno != errno.ENOENT:
|
|
raise
|
|
except KeyError:
|
|
pass
|
|
del self._fdmap[fd]
|
|
super(PollServer, self)._clean_up_client(client, finalize)
|
|
|
|
def _serve(self):
|
|
for fd, sock in self._server_fds.items():
|
|
self._poll.register(fd, self.POLLIN)
|
|
sock.listen(16)
|
|
try:
|
|
while not self._stop.is_set():
|
|
for fd, event in self._poll.poll(self._dispatch_event()):
|
|
if fd in self._server_fds:
|
|
client = self._accept(self._server_fds[fd])
|
|
logger.debug('Accepting: %s', client.client_address)
|
|
fd = client.fileno()
|
|
self._poll.register(fd, self.READ)
|
|
self._fdmap[fd] = client
|
|
elif event & self.POLL_CLOSE:
|
|
logger.debug('Client closed: %s', self._fdmap[fd].client_address)
|
|
self._clean_up_client(self._fdmap[fd])
|
|
else:
|
|
logger.debug('Taking close lock: event loop')
|
|
with self._close_lock:
|
|
try:
|
|
client = self._fdmap[fd]
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
logger.debug('Client active: %s, read: %d, write: %d',
|
|
client.client_address,
|
|
event & self.POLLIN,
|
|
event & self.POLLOUT)
|
|
if event & self.POLLIN:
|
|
logger.debug('Non-blocking read on client: %s', client.client_address)
|
|
self._nonblock_read(client)
|
|
# Might be closed in the read handler.
|
|
if event & self.POLLOUT and fd in self._fdmap:
|
|
logger.debug('Non-blocking write on client: %s', client.client_address)
|
|
self._nonblock_write(client)
|
|
finally:
|
|
logger.info('Shutting down server')
|
|
self.on_shutdown()
|
|
for client in self._clients:
|
|
self._clean_up_client(client, True)
|
|
for fd, sock in self._server_fds.items():
|
|
self._poll.unregister(fd)
|
|
sock.close()
|
|
if self.NEED_CLOSE:
|
|
self._poll.close()
|