NDOJ/event_socket_server/base_server.py
2020-01-21 15:35:58 +09:00

169 lines
5.3 KiB
Python

import errno
import logging
import socket
import threading
import time
from collections import defaultdict, deque
from functools import total_ordering
from heapq import heappop, heappush
# Logger used by the server code below for its debug and failure messages.
logger = logging.getLogger('event_socket_server')
class SendMessage(object):
    """One pending outgoing message.

    ``data`` holds the bytes that still have to be written; ``callback`` is
    an optional callable (or ``None``) associated with the message.
    """

    __slots__ = ('data', 'callback')

    def __init__(self, data, callback):
        self.data, self.callback = data, callback
@total_ordering
class ScheduledJob(object):
    """A call to ``func(*args, **kwargs)`` planned for the moment ``time``.

    Jobs are ordered (and compared for equality) by ``time`` only, which is
    what lets them live in a ``heapq``-managed list.

    Attributes:
        time: the value the job is ordered by (when it becomes due).
        func, args, kwargs: the call to perform.
        cancel: set to ``True`` to mark the job as cancelled.
        dispatched: set to ``True`` once the job has left the queue.
    """

    __slots__ = ('time', 'func', 'args', 'kwargs', 'cancel', 'dispatched')

    def __init__(self, time, func, args, kwargs):
        self.time = time
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.cancel = False
        self.dispatched = False

    def __eq__(self, other):
        # Returning NotImplemented (instead of touching ``other.time``) keeps
        # ``job == None`` and similar comparisons from raising AttributeError.
        if not isinstance(other, ScheduledJob):
            return NotImplemented
        return self.time == other.time

    def __lt__(self, other):
        if not isinstance(other, ScheduledJob):
            return NotImplemented
        return self.time < other.time
class BaseServer(object):
    """Base class for a non-blocking socket server with a timed job queue.

    The constructor binds one non-blocking socket per resolved address.  The
    event loop itself is not implemented here: subclasses must provide
    ``_serve``, ``_register_read`` and ``_register_write`` (all three raise
    ``NotImplementedError`` in this class).

    NOTE(review): the sockets are bound but never put into listening mode in
    this class; presumably ``_serve`` does that -- confirm in the subclasses.
    """

    # errno values that mean "nothing to do right now, try again later" on a
    # non-blocking socket; they must not be treated as a broken connection.
    _TRANSIENT_ERRNOS = frozenset((errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR))

    def __init__(self, addresses, client):
        """Bind a socket for every address and set up the bookkeeping.

        Args:
            addresses: iterable of ``(host, port)`` pairs; each pair is
                resolved with ``getaddrinfo`` and every result is bound.
            client: class instantiated as ``client(server, connection)`` for
                each accepted connection.

        Raises:
            Whatever ``getaddrinfo``/``socket``/``bind`` raise.  Sockets that
            were already created are closed before the error propagates.
        """
        self._servers = set()
        try:
            for address, port in addresses:
                info = socket.getaddrinfo(address, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
                for af, socktype, proto, canonname, sa in info:
                    sock = socket.socket(af, socktype, proto)
                    # Track the socket first so the handler below also closes
                    # it when configuring or binding it fails.
                    self._servers.add(sock)
                    sock.setblocking(0)
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                    sock.bind(sa)
        except Exception:
            for sock in self._servers:
                sock.close()
            self._servers.clear()
            raise

        self._stop = threading.Event()
        self._clients = set()
        self._ClientClass = client
        # file descriptor -> deque of SendMessage waiting to be written
        self._send_queue = defaultdict(deque)
        # heap of ScheduledJob, guarded by _job_queue_lock
        self._job_queue = []
        self._job_queue_lock = threading.Lock()

    def _serve(self):
        """Run the event loop; must be implemented by subclasses."""
        raise NotImplementedError()

    def _accept(self, sock):
        """Accept a connection on ``sock`` and wrap it in a client object.

        The connection is made non-blocking, the client is added to the set
        of known clients and returned.  If building the client fails, the
        accepted connection is closed before the exception propagates.
        """
        conn, address = sock.accept()
        try:
            conn.setblocking(0)
            client = self._ClientClass(self, conn)
        except Exception:
            conn.close()
            raise
        self._clients.add(client)
        return client

    def schedule(self, delay, func, *args, **kwargs):
        """Queue ``func(*args, **kwargs)`` to run ``delay`` seconds from now.

        Returns the ``ScheduledJob``, which can be passed to ``unschedule``.
        """
        with self._job_queue_lock:
            job = ScheduledJob(time.time() + delay, func, args, kwargs)
            heappush(self._job_queue, job)
        return job

    def unschedule(self, job):
        """Cancel ``job`` if it is still pending.

        Returns ``True`` when the job was cancelled by this call, ``False``
        when it was already dispatched or already cancelled.
        """
        with self._job_queue_lock:
            if job.dispatched or job.cancel:
                return False
            job.cancel = True
        return True

    def _register_write(self, client):
        """Start watching ``client`` for writability; subclass hook."""
        raise NotImplementedError()

    def _register_read(self, client):
        """Watch ``client`` for readability; subclass hook."""
        raise NotImplementedError()

    def _clean_up_client(self, client, finalize=False):
        """Drop ``client``: discard queued data, notify it, close its socket.

        Args:
            client: the client object to tear down.
            finalize: when true the client is left in ``self._clients``.
                NOTE(review): presumably so a caller can iterate over that
                set while cleaning up -- confirm against the callers.

        The socket is closed and the client forgotten even when
        ``client.on_close()`` raises (the exception still propagates), and
        cleaning up a client that is no longer registered is not an error.
        """
        self._send_queue.pop(client.fileno(), None)
        try:
            client.on_close()
        finally:
            client._socket.close()
            if not finalize:
                self._clients.discard(client)

    def _dispatch_event(self):
        """Run every due job and return how long to wait for the next one.

        Returns:
            Seconds until the earliest remaining job is due, clamped to the
            range 0..1; 1 when no job is queued.  The value is computed after
            the due jobs have run, so jobs they scheduled and the time they
            took are taken into account.
        """
        now = time.time()
        due = []
        with self._job_queue_lock:
            while self._job_queue and self._job_queue[0].time <= now:
                job = heappop(self._job_queue)
                job.dispatched = True
                if not job.cancel:
                    due.append(job)

        # Run outside the lock: it is not reentrant and a job may itself call
        # schedule() or unschedule().
        for job in due:
            logger.debug('Dispatching event: %r(*%r, **%r)', job.func, job.args, job.kwargs)
            job.func(*job.args, **job.kwargs)

        with self._job_queue_lock:
            if not self._job_queue:
                return 1
            remaining = self._job_queue[0].time - time.time()
        return min(max(remaining, 0), 1)

    def _nonblock_read(self, client):
        """Read up to 1024 bytes from ``client`` and hand them to it.

        The client is cleaned up when the peer closed the connection (empty
        read), when the socket fails, or when ``client._recv_data`` raises.
        A "would block"/interrupted error is not a failure: the method just
        returns and the read is retried on the next readiness notification.
        """
        try:
            data = client._socket.recv(1024)
        except socket.error as e:
            if e.errno not in self._TRANSIENT_ERRNOS:
                self._clean_up_client(client)
            return

        logger.debug('Read from %s: %d bytes', client.client_address, len(data))
        if not data:
            self._clean_up_client(client)
            return
        try:
            client._recv_data(data)
        except Exception:
            logger.exception('Client recv_data failure')
            self._clean_up_client(client)

    def _nonblock_write(self, client):
        """Write as much as possible of the client's oldest queued message.

        When the message is completely sent its callback (if any) is invoked
        and the message is removed; once the queue is empty the client is
        switched back to read-only watching.  A socket failure or a failing
        callback cleans the client up.  A "would block"/interrupted error
        leaves everything in place for the next writability notification.
        """
        fd = client.fileno()
        queue = self._send_queue.get(fd)
        if not queue:
            # Nothing to send: forget any empty entry and stop watching for
            # writability instead of failing with IndexError.
            self._send_queue.pop(fd, None)
            self._register_read(client)
            return

        head = queue[0]
        try:
            try:
                sent = client._socket.send(head.data)
            except socket.error as e:
                if e.errno in self._TRANSIENT_ERRNOS:
                    return
                raise
            head.data = head.data[sent:]
            logger.debug('Send to %s: %d bytes', client.client_address, sent)
            if head.data:
                return

            logger.debug('Finished sending: %s', client.client_address)
            if head.callback is not None:
                logger.debug('Calling callback: %s: %r', client.client_address, head.callback)
                try:
                    head.callback()
                except Exception:
                    logger.exception('Client write callback failure')
                    self._clean_up_client(client)
                    return
            queue.popleft()
            if not queue:
                self._register_read(client)
                # The entry may already be gone if the callback cleaned up.
                self._send_queue.pop(fd, None)
        except socket.error:
            self._clean_up_client(client)

    def send(self, client, data, callback=None):
        """Queue ``data`` for ``client`` and start watching it for writes.

        ``callback``, when given, is called with no arguments once the whole
        message has been written.
        """
        logger.debug('Writing %d bytes to client %s, callback: %s', len(data), client.client_address, callback)
        self._send_queue[client.fileno()].append(SendMessage(data, callback))
        self._register_write(client)

    def stop(self):
        """Set the stop event (presumably polled by ``_serve`` -- confirm)."""
        self._stop.set()

    def serve_forever(self):
        """Run the subclass event loop by delegating to ``_serve``."""
        self._serve()

    def on_shutdown(self):
        """Hook that does nothing by default; subclasses may override it."""
        pass