Update bridge (DMOJ)

This commit is contained in:
cuom1999 2020-07-19 16:27:14 -05:00
parent 77e64f1b85
commit 3629369fba
34 changed files with 770 additions and 2199 deletions

File diff suppressed because one or more lines are too long

20
dmoj_bridge_async.py Normal file
View file

@ -0,0 +1,20 @@
import os
import gevent.monkey # noqa: I100, gevent must be imported here
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dmoj.settings')
gevent.monkey.patch_all()
# noinspection PyUnresolvedReferences
import dmoj_install_pymysql # noqa: E402, F401, I100, I202, imported for side effect
import django # noqa: E402, F401, I100, I202, django must be imported here
django.setup()
# noinspection PyUnresolvedReferences
import django_2_2_pymysql_patch # noqa: E402, I100, F401, I202, imported for side effect
from judge.bridge.daemon import judge_daemon # noqa: E402, I100, I202, django code must be imported here
if __name__ == '__main__':
judge_daemon()

View file

@ -1,11 +0,0 @@
from .base_server import BaseServer
from .engines import *
from .handler import Handler
from .helpers import ProxyProtocolMixin, SizedPacketHandler, ZlibPacketHandler
def get_preferred_engine(choices=('epoll', 'poll', 'select')):
for choice in choices:
if choice in engines:
return engines[choice]
return engines['select']

View file

@ -1,169 +0,0 @@
import logging
import socket
import threading
import time
from collections import defaultdict, deque
from functools import total_ordering
from heapq import heappop, heappush
logger = logging.getLogger('event_socket_server')
class SendMessage(object):
__slots__ = ('data', 'callback')
def __init__(self, data, callback):
self.data = data
self.callback = callback
@total_ordering
class ScheduledJob(object):
__slots__ = ('time', 'func', 'args', 'kwargs', 'cancel', 'dispatched')
def __init__(self, time, func, args, kwargs):
self.time = time
self.func = func
self.args = args
self.kwargs = kwargs
self.cancel = False
self.dispatched = False
def __eq__(self, other):
return self.time == other.time
def __lt__(self, other):
return self.time < other.time
class BaseServer(object):
def __init__(self, addresses, client):
self._servers = set()
for address, port in addresses:
info = socket.getaddrinfo(address, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
for af, socktype, proto, canonname, sa in info:
sock = socket.socket(af, socktype, proto)
sock.setblocking(0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(sa)
self._servers.add(sock)
self._stop = threading.Event()
self._clients = set()
self._ClientClass = client
self._send_queue = defaultdict(deque)
self._job_queue = []
self._job_queue_lock = threading.Lock()
def _serve(self):
raise NotImplementedError()
def _accept(self, sock):
conn, address = sock.accept()
conn.setblocking(0)
client = self._ClientClass(self, conn)
self._clients.add(client)
return client
def schedule(self, delay, func, *args, **kwargs):
with self._job_queue_lock:
job = ScheduledJob(time.time() + delay, func, args, kwargs)
heappush(self._job_queue, job)
return job
def unschedule(self, job):
with self._job_queue_lock:
if job.dispatched or job.cancel:
return False
job.cancel = True
return True
def _register_write(self, client):
raise NotImplementedError()
def _register_read(self, client):
raise NotImplementedError()
def _clean_up_client(self, client, finalize=False):
try:
del self._send_queue[client.fileno()]
except KeyError:
pass
client.on_close()
client._socket.close()
if not finalize:
self._clients.remove(client)
def _dispatch_event(self):
t = time.time()
tasks = []
with self._job_queue_lock:
while True:
dt = self._job_queue[0].time - t if self._job_queue else 1
if dt > 0:
break
task = heappop(self._job_queue)
task.dispatched = True
if not task.cancel:
tasks.append(task)
for task in tasks:
logger.debug('Dispatching event: %r(*%r, **%r)', task.func, task.args, task.kwargs)
task.func(*task.args, **task.kwargs)
if not self._job_queue or dt > 1:
dt = 1
return dt
def _nonblock_read(self, client):
try:
data = client._socket.recv(1024)
except socket.error:
self._clean_up_client(client)
else:
logger.debug('Read from %s: %d bytes', client.client_address, len(data))
if not data:
self._clean_up_client(client)
else:
try:
client._recv_data(data)
except Exception:
logger.exception('Client recv_data failure')
self._clean_up_client(client)
def _nonblock_write(self, client):
fd = client.fileno()
queue = self._send_queue[fd]
try:
top = queue[0]
cb = client._socket.send(top.data)
top.data = top.data[cb:]
logger.debug('Send to %s: %d bytes', client.client_address, cb)
if not top.data:
logger.debug('Finished sending: %s', client.client_address)
if top.callback is not None:
logger.debug('Calling callback: %s: %r', client.client_address, top.callback)
try:
top.callback()
except Exception:
logger.exception('Client write callback failure')
self._clean_up_client(client)
return
queue.popleft()
if not queue:
self._register_read(client)
del self._send_queue[fd]
except socket.error:
self._clean_up_client(client)
def send(self, client, data, callback=None):
logger.debug('Writing %d bytes to client %s, callback: %s', len(data), client.client_address, callback)
self._send_queue[client.fileno()].append(SendMessage(data, callback))
self._register_write(client)
def stop(self):
self._stop.set()
def serve_forever(self):
self._serve()
def on_shutdown(self):
pass

View file

@ -1,17 +0,0 @@
import select
__author__ = 'Quantum'
engines = {}
from .select_server import SelectServer # noqa: E402, import not at top for consistency
engines['select'] = SelectServer
if hasattr(select, 'poll'):
from .poll_server import PollServer
engines['poll'] = PollServer
if hasattr(select, 'epoll'):
from .epoll_server import EpollServer
engines['epoll'] = EpollServer
del select

View file

@ -1,17 +0,0 @@
import select
__author__ = 'Quantum'
if not hasattr(select, 'epoll'):
raise ImportError('System does not support epoll')
from .poll_server import PollServer # noqa: E402, must be imported here
class EpollServer(PollServer):
poll = select.epoll
WRITE = select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP
READ = select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP
POLLIN = select.EPOLLIN
POLLOUT = select.EPOLLOUT
POLL_CLOSE = select.EPOLLHUP | select.EPOLLERR
NEED_CLOSE = True

View file

@ -1,97 +0,0 @@
import errno
import logging
import select
import threading
from ..base_server import BaseServer
logger = logging.getLogger('event_socket_server')
if not hasattr(select, 'poll'):
raise ImportError('System does not support poll')
class PollServer(BaseServer):
poll = select.poll
WRITE = select.POLLIN | select.POLLOUT | select.POLLERR | select.POLLHUP
READ = select.POLLIN | select.POLLERR | select.POLLHUP
POLLIN = select.POLLIN
POLLOUT = select.POLLOUT
POLL_CLOSE = select.POLLERR | select.POLLHUP
NEED_CLOSE = False
def __init__(self, *args, **kwargs):
super(PollServer, self).__init__(*args, **kwargs)
self._poll = self.poll()
self._fdmap = {}
self._server_fds = {sock.fileno(): sock for sock in self._servers}
self._close_lock = threading.RLock()
def _register_write(self, client):
logger.debug('On write mode: %s', client.client_address)
self._poll.modify(client.fileno(), self.WRITE)
def _register_read(self, client):
logger.debug('On read mode: %s', client.client_address)
self._poll.modify(client.fileno(), self.READ)
def _clean_up_client(self, client, finalize=False):
logger.debug('Taking close lock: cleanup')
with self._close_lock:
logger.debug('Cleaning up client: %s, finalize: %d', client.client_address, finalize)
fd = client.fileno()
try:
self._poll.unregister(fd)
except IOError as e:
if e.errno != errno.ENOENT:
raise
except KeyError:
pass
del self._fdmap[fd]
super(PollServer, self)._clean_up_client(client, finalize)
def _serve(self):
for fd, sock in self._server_fds.items():
self._poll.register(fd, self.POLLIN)
sock.listen(16)
try:
while not self._stop.is_set():
for fd, event in self._poll.poll(self._dispatch_event()):
if fd in self._server_fds:
client = self._accept(self._server_fds[fd])
logger.debug('Accepting: %s', client.client_address)
fd = client.fileno()
self._poll.register(fd, self.READ)
self._fdmap[fd] = client
elif event & self.POLL_CLOSE:
logger.debug('Client closed: %s', self._fdmap[fd].client_address)
self._clean_up_client(self._fdmap[fd])
else:
logger.debug('Taking close lock: event loop')
with self._close_lock:
try:
client = self._fdmap[fd]
except KeyError:
pass
else:
logger.debug('Client active: %s, read: %d, write: %d',
client.client_address,
event & self.POLLIN,
event & self.POLLOUT)
if event & self.POLLIN:
logger.debug('Non-blocking read on client: %s', client.client_address)
self._nonblock_read(client)
# Might be closed in the read handler.
if event & self.POLLOUT and fd in self._fdmap:
logger.debug('Non-blocking write on client: %s', client.client_address)
self._nonblock_write(client)
finally:
logger.info('Shutting down server')
self.on_shutdown()
for client in self._clients:
self._clean_up_client(client, True)
for fd, sock in self._server_fds.items():
self._poll.unregister(fd)
sock.close()
if self.NEED_CLOSE:
self._poll.close()

View file

@ -1,49 +0,0 @@
import select
from ..base_server import BaseServer
class SelectServer(BaseServer):
def __init__(self, *args, **kwargs):
super(SelectServer, self).__init__(*args, **kwargs)
self._reads = set(self._servers)
self._writes = set()
def _register_write(self, client):
self._writes.add(client)
def _register_read(self, client):
self._writes.remove(client)
def _clean_up_client(self, client, finalize=False):
self._writes.discard(client)
self._reads.remove(client)
super(SelectServer, self)._clean_up_client(client, finalize)
def _serve(self, select=select.select):
for server in self._servers:
server.listen(16)
try:
while not self._stop.is_set():
r, w, x = select(self._reads, self._writes, self._reads, self._dispatch_event())
for s in r:
if s in self._servers:
self._reads.add(self._accept(s))
else:
self._nonblock_read(s)
for client in w:
self._nonblock_write(client)
for s in x:
s.close()
if s in self._servers:
raise RuntimeError('Server is in exceptional condition')
else:
self._clean_up_client(s)
finally:
self.on_shutdown()
for client in self._clients:
self._clean_up_client(client, True)
for server in self._servers:
server.close()

View file

@ -1,27 +0,0 @@
__author__ = 'Quantum'
class Handler(object):
def __init__(self, server, socket):
self._socket = socket
self.server = server
self.client_address = socket.getpeername()
def fileno(self):
return self._socket.fileno()
def _recv_data(self, data):
raise NotImplementedError
def _send(self, data, callback=None):
return self.server.send(self, data, callback)
def close(self):
self.server._clean_up_client(self)
def on_close(self):
pass
@property
def socket(self):
return self._socket

View file

@ -1,125 +0,0 @@
import struct
import zlib
from judge.utils.unicode import utf8text
from .handler import Handler
size_pack = struct.Struct('!I')
class SizedPacketHandler(Handler):
def __init__(self, server, socket):
super(SizedPacketHandler, self).__init__(server, socket)
self._buffer = b''
self._packetlen = 0
def _packet(self, data):
raise NotImplementedError()
def _format_send(self, data):
return data
def _recv_data(self, data):
self._buffer += data
while len(self._buffer) >= self._packetlen if self._packetlen else len(self._buffer) >= size_pack.size:
if self._packetlen:
data = self._buffer[:self._packetlen]
self._buffer = self._buffer[self._packetlen:]
self._packetlen = 0
self._packet(data)
else:
data = self._buffer[:size_pack.size]
self._buffer = self._buffer[size_pack.size:]
self._packetlen = size_pack.unpack(data)[0]
def send(self, data, callback=None):
data = self._format_send(data)
self._send(size_pack.pack(len(data)) + data, callback)
class ZlibPacketHandler(SizedPacketHandler):
def _format_send(self, data):
return zlib.compress(data.encode('utf-8'))
def packet(self, data):
raise NotImplementedError()
def _packet(self, data):
try:
self.packet(zlib.decompress(data).decode('utf-8'))
except zlib.error as e:
self.malformed_packet(e)
def malformed_packet(self, exception):
self.close()
class ProxyProtocolMixin(object):
__UNKNOWN_TYPE = 0
__PROXY1 = 1
__PROXY2 = 2
__DATA = 3
__HEADER2 = b'\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A'
__HEADER2_LEN = len(__HEADER2)
_REAL_IP_SET = None
@classmethod
def with_proxy_set(cls, ranges):
from netaddr import IPSet, IPGlob
from itertools import chain
globs = []
addrs = []
for item in ranges:
if '*' in item or '-' in item:
globs.append(IPGlob(item))
else:
addrs.append(item)
ipset = IPSet(chain(chain.from_iterable(globs), addrs))
return type(cls.__name__, (cls,), {'_REAL_IP_SET': ipset})
def __init__(self, server, socket):
super(ProxyProtocolMixin, self).__init__(server, socket)
self.__buffer = b''
self.__type = (self.__UNKNOWN_TYPE if self._REAL_IP_SET and
self.client_address[0] in self._REAL_IP_SET else self.__DATA)
def __parse_proxy1(self, data):
self.__buffer += data
index = self.__buffer.find(b'\r\n')
if 0 <= index < 106:
proxy = data[:index].split()
if len(proxy) < 2:
return self.close()
if proxy[1] == b'TCP4':
if len(proxy) != 6:
return self.close()
self.client_address = (utf8text(proxy[2]), utf8text(proxy[4]))
self.server_address = (utf8text(proxy[3]), utf8text(proxy[5]))
elif proxy[1] == b'TCP6':
self.client_address = (utf8text(proxy[2]), utf8text(proxy[4]), 0, 0)
self.server_address = (utf8text(proxy[3]), utf8text(proxy[5]), 0, 0)
elif proxy[1] != b'UNKNOWN':
return self.close()
self.__type = self.__DATA
super(ProxyProtocolMixin, self)._recv_data(data[index + 2:])
elif len(self.__buffer) > 107 or index > 105:
self.close()
def _recv_data(self, data):
if self.__type == self.__DATA:
super(ProxyProtocolMixin, self)._recv_data(data)
elif self.__type == self.__UNKNOWN_TYPE:
if len(data) >= 16 and data[:self.__HEADER2_LEN] == self.__HEADER2:
self.close()
elif len(data) >= 8 and data[:5] == b'PROXY':
self.__type = self.__PROXY1
self.__parse_proxy1(data)
else:
self.__type = self.__DATA
super(ProxyProtocolMixin, self)._recv_data(data)
else:
self.__parse_proxy1(data)

View file

@ -1,54 +0,0 @@
from .engines import engines
from .helpers import ProxyProtocolMixin, ZlibPacketHandler
class EchoPacketHandler(ProxyProtocolMixin, ZlibPacketHandler):
def __init__(self, server, socket):
super(EchoPacketHandler, self).__init__(server, socket)
self._gotdata = False
self.server.schedule(5, self._kill_if_no_data)
def _kill_if_no_data(self):
if not self._gotdata:
print('Inactive client:', self._socket.getpeername())
self.close()
def packet(self, data):
self._gotdata = True
print('Data from %s: %r' % (self._socket.getpeername(), data[:30] if len(data) > 30 else data))
self.send(data)
def on_close(self):
self._gotdata = True
print('Closed client:', self._socket.getpeername())
def main():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-l', '--host', action='append')
parser.add_argument('-p', '--port', type=int, action='append')
parser.add_argument('-e', '--engine', default='select', choices=sorted(engines.keys()))
try:
import netaddr
except ImportError:
netaddr = None
else:
parser.add_argument('-P', '--proxy', action='append')
args = parser.parse_args()
class TestServer(engines[args.engine]):
def _accept(self, sock):
client = super(TestServer, self)._accept(sock)
print('New connection:', client.socket.getpeername())
return client
handler = EchoPacketHandler
if netaddr is not None and args.proxy:
handler = handler.with_proxy_set(args.proxy)
server = TestServer(list(zip(args.host, args.port)), handler)
server.serve_forever()
if __name__ == '__main__':
main()

View file

@ -108,8 +108,8 @@ class SubmissionSourceInline(admin.StackedInline):
class SubmissionAdmin(admin.ModelAdmin):
readonly_fields = ('user', 'problem', 'date')
fields = ('user', 'problem', 'date', 'time', 'memory', 'points', 'language', 'status', 'result',
readonly_fields = ('user', 'problem', 'date', 'judged_date')
fields = ('user', 'problem', 'date', 'judged_date', 'time', 'memory', 'points', 'language', 'status', 'result',
'case_points', 'case_total', 'judged_on', 'error')
actions = ('judge', 'recalculate_score')
list_display = ('id', 'problem_code', 'problem_name', 'user_column', 'execution_time', 'pretty_memory',

View file

@ -1,6 +0,0 @@
from .djangohandler import DjangoHandler
from .djangoserver import DjangoServer
from .judgecallback import DjangoJudgeHandler
from .judgehandler import JudgeHandler
from .judgelist import JudgeList
from .judgeserver import JudgeServer

View file

@ -0,0 +1,196 @@
import logging
import socket
import struct
import zlib
from itertools import chain
from netaddr import IPGlob, IPSet
from judge.utils.unicode import utf8text
logger = logging.getLogger('judge.bridge')
size_pack = struct.Struct('!I')
assert size_pack.size == 4
MAX_ALLOWED_PACKET_SIZE = 8 * 1024 * 1024
def proxy_list(human_readable):
globs = []
addrs = []
for item in human_readable:
if '*' in item or '-' in item:
globs.append(IPGlob(item))
else:
addrs.append(item)
return IPSet(chain(chain.from_iterable(globs), addrs))
class Disconnect(Exception):
pass
# socketserver.BaseRequestHandler does all the handling in __init__,
# making it impossible to inherit __init__ sanely. While it lets you
# use setup(), most tools will complain about uninitialized variables.
# This metaclass will allow sane __init__ behaviour while also magically
# calling the methods that handles the request.
class RequestHandlerMeta(type):
def __call__(cls, *args, **kwargs):
handler = super().__call__(*args, **kwargs)
handler.on_connect()
try:
handler.handle()
except BaseException:
logger.exception('Error in base packet handling')
raise
finally:
handler.on_disconnect()
class ZlibPacketHandler(metaclass=RequestHandlerMeta):
proxies = []
def __init__(self, request, client_address, server):
self.request = request
self.server = server
self.client_address = client_address
self.server_address = server.server_address
self._initial_tag = None
self._got_packet = False
@property
def timeout(self):
return self.request.gettimeout()
@timeout.setter
def timeout(self, timeout):
self.request.settimeout(timeout or None)
def read_sized_packet(self, size, initial=None):
if size > MAX_ALLOWED_PACKET_SIZE:
logger.log(logging.WARNING if self._got_packet else logging.INFO,
'Disconnecting client due to too-large message size (%d bytes): %s', size, self.client_address)
raise Disconnect()
buffer = []
remainder = size
if initial:
buffer.append(initial)
remainder -= len(initial)
assert remainder >= 0
while remainder:
data = self.request.recv(remainder)
remainder -= len(data)
buffer.append(data)
self._on_packet(b''.join(buffer))
def parse_proxy_protocol(self, line):
words = line.split()
if len(words) < 2:
raise Disconnect()
if words[1] == b'TCP4':
if len(words) != 6:
raise Disconnect()
self.client_address = (utf8text(words[2]), utf8text(words[4]))
self.server_address = (utf8text(words[3]), utf8text(words[5]))
elif words[1] == b'TCP6':
self.client_address = (utf8text(words[2]), utf8text(words[4]), 0, 0)
self.server_address = (utf8text(words[3]), utf8text(words[5]), 0, 0)
elif words[1] != b'UNKNOWN':
raise Disconnect()
def read_size(self, buffer=b''):
while len(buffer) < size_pack.size:
recv = self.request.recv(size_pack.size - len(buffer))
if not recv:
raise Disconnect()
buffer += recv
return size_pack.unpack(buffer)[0]
def read_proxy_header(self, buffer=b''):
# Max line length for PROXY protocol is 107, and we received 4 already.
while b'\r\n' not in buffer:
if len(buffer) > 107:
raise Disconnect()
data = self.request.recv(107)
if not data:
raise Disconnect()
buffer += data
return buffer
def _on_packet(self, data):
decompressed = zlib.decompress(data).decode('utf-8')
self._got_packet = True
self.on_packet(decompressed)
def on_packet(self, data):
raise NotImplementedError()
def on_connect(self):
pass
def on_disconnect(self):
pass
def on_timeout(self):
pass
def handle(self):
try:
tag = self.read_size()
self._initial_tag = size_pack.pack(tag)
if self.client_address[0] in self.proxies and self._initial_tag == b'PROX':
proxy, _, remainder = self.read_proxy_header(self._initial_tag).partition(b'\r\n')
self.parse_proxy_protocol(proxy)
while remainder:
while len(remainder) < size_pack.size:
self.read_sized_packet(self.read_size(remainder))
break
size = size_pack.unpack(remainder[:size_pack.size])[0]
remainder = remainder[size_pack.size:]
if len(remainder) <= size:
self.read_sized_packet(size, remainder)
break
self._on_packet(remainder[:size])
remainder = remainder[size:]
else:
self.read_sized_packet(tag)
while True:
self.read_sized_packet(self.read_size())
except Disconnect:
return
except zlib.error:
if self._got_packet:
logger.warning('Encountered zlib error during packet handling, disconnecting client: %s',
self.client_address, exc_info=True)
else:
logger.info('Potentially wrong protocol (zlib error): %s: %r', self.client_address, self._initial_tag,
exc_info=True)
except socket.timeout:
if self._got_packet:
logger.info('Socket timed out: %s', self.client_address)
self.on_timeout()
else:
logger.info('Potentially wrong protocol: %s: %r', self.client_address, self._initial_tag)
except socket.error as e:
# When a gevent socket is shutdown, gevent cancels all waits, causing recv to raise cancel_wait_ex.
if e.__class__.__name__ == 'cancel_wait_ex':
return
raise
def send(self, data):
compressed = zlib.compress(data.encode('utf-8'))
self.request.sendall(size_pack.pack(len(compressed)) + compressed)
def close(self):
self.request.shutdown(socket.SHUT_RDWR)

47
judge/bridge/daemon.py Normal file
View file

@ -0,0 +1,47 @@
import logging
import signal
import threading
from functools import partial
from django.conf import settings
from judge.bridge.django_handler import DjangoHandler
from judge.bridge.judge_handler import JudgeHandler
from judge.bridge.judge_list import JudgeList
from judge.bridge.server import Server
from judge.models import Judge, Submission
logger = logging.getLogger('judge.bridge')
def reset_judges():
Judge.objects.update(online=False, ping=None, load=None)
def judge_daemon():
reset_judges()
Submission.objects.filter(status__in=Submission.IN_PROGRESS_GRADING_STATUS) \
.update(status='IE', result='IE', error=None)
judges = JudgeList()
judge_server = Server(settings.BRIDGED_JUDGE_ADDRESS, partial(JudgeHandler, judges=judges))
django_server = Server(settings.BRIDGED_DJANGO_ADDRESS, partial(DjangoHandler, judges=judges))
threading.Thread(target=django_server.serve_forever).start()
threading.Thread(target=judge_server.serve_forever).start()
stop = threading.Event()
def signal_handler(signum, _):
logger.info('Exiting due to %s', signal.Signals(signum).name)
stop.set()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGQUIT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
stop.wait()
finally:
django_server.shutdown()
judge_server.shutdown()

View file

@ -2,63 +2,55 @@ import json
import logging
import struct
from event_socket_server import ZlibPacketHandler
from judge.bridge.base_handler import Disconnect, ZlibPacketHandler
logger = logging.getLogger('judge.bridge')
size_pack = struct.Struct('!I')
class DjangoHandler(ZlibPacketHandler):
def __init__(self, server, socket):
super(DjangoHandler, self).__init__(server, socket)
def __init__(self, request, client_address, server, judges):
super().__init__(request, client_address, server)
self.handlers = {
'submission-request': self.on_submission,
'terminate-submission': self.on_termination,
'disconnect-judge': self.on_disconnect,
'disconnect-judge': self.on_disconnect_request,
}
self._to_kill = True
# self.server.schedule(5, self._kill_if_no_request)
self.judges = judges
def _kill_if_no_request(self):
if self._to_kill:
logger.info('Killed inactive connection: %s', self._socket.getpeername())
self.close()
def send(self, data):
super().send(json.dumps(data, separators=(',', ':')))
def _format_send(self, data):
return super(DjangoHandler, self)._format_send(json.dumps(data, separators=(',', ':')))
def packet(self, packet):
self._to_kill = False
def on_packet(self, packet):
packet = json.loads(packet)
try:
result = self.handlers.get(packet.get('name', None), self.on_malformed)(packet)
except Exception:
logger.exception('Error in packet handling (Django-facing)')
result = {'name': 'bad-request'}
self.send(result, self._schedule_close)
def _schedule_close(self):
self.server.schedule(0, self.close)
self.send(result)
raise Disconnect()
def on_submission(self, data):
id = data['submission-id']
problem = data['problem-id']
language = data['language']
source = data['source']
judge_id = data['judge-id']
priority = data['priority']
if not self.server.judges.check_priority(priority):
if not self.judges.check_priority(priority):
return {'name': 'bad-request'}
self.server.judges.judge(id, problem, language, source, priority)
self.judges.judge(id, problem, language, source, judge_id, priority)
return {'name': 'submission-received', 'submission-id': id}
def on_termination(self, data):
return {'name': 'submission-received', 'judge-aborted': self.server.judges.abort(data['submission-id'])}
return {'name': 'submission-received', 'judge-aborted': self.judges.abort(data['submission-id'])}
def on_disconnect(self, data):
def on_disconnect_request(self, data):
judge_id = data['judge-id']
force = data['force']
self.server.judges.disconnect(judge_id, force=force)
self.judges.disconnect(judge_id, force=force)
def on_malformed(self, packet):
logger.error('Malformed packet: %s', packet)

View file

@ -1,7 +0,0 @@
from event_socket_server import get_preferred_engine
class DjangoServer(get_preferred_engine()):
def __init__(self, judges, *args, **kwargs):
super(DjangoServer, self).__init__(*args, **kwargs)
self.judges = judges

View file

@ -1,14 +1,10 @@
import ctypes
import os
import socket
import struct
import time
import zlib
size_pack = struct.Struct('!I')
try:
RtlGenRandom = ctypes.windll.advapi32.SystemFunction036
except AttributeError:
RtlGenRandom = None
def open_connection():
@ -27,15 +23,6 @@ def dezlibify(data, skip_head=True):
return zlib.decompress(data).decode('utf-8')
def random(length):
if RtlGenRandom is None:
with open('/dev/urandom') as f:
return f.read(length)
buf = ctypes.create_string_buffer(length)
RtlGenRandom(buf, length)
return buf.raw
def main():
global host, port
import argparse
@ -64,11 +51,11 @@ def main():
s2.close()
print('Large random data test:', end=' ')
s4 = open_connection()
data = random(1000000)
data = os.urandom(1000000).decode('iso-8859-1')
print('Generated', end=' ')
s4.sendall(zlibify(data))
print('Sent', end=' ')
result = ''
result = b''
while len(result) < size_pack.size:
result += s4.recv(1024)
size = size_pack.unpack(result[:size_pack.size])[0]
@ -81,7 +68,7 @@ def main():
s4.close()
print('Test malformed connection:', end=' ')
s5 = open_connection()
s5.sendall(data[:100000])
s5.sendall(data[:100000].encode('utf-8'))
s5.close()
print('Success')
print('Waiting for timeout to close idle connection:', end=' ')

View file

@ -0,0 +1,39 @@
from judge.bridge.base_handler import ZlibPacketHandler
class EchoPacketHandler(ZlibPacketHandler):
def on_connect(self):
print('New client:', self.client_address)
self.timeout = 5
def on_timeout(self):
print('Inactive client:', self.client_address)
def on_packet(self, data):
self.timeout = None
print('Data from %s: %r' % (self.client_address, data[:30] if len(data) > 30 else data))
self.send(data)
def on_disconnect(self):
print('Closed client:', self.client_address)
def main():
import argparse
from judge.bridge.server import Server
parser = argparse.ArgumentParser()
parser.add_argument('-l', '--host', action='append')
parser.add_argument('-p', '--port', type=int, action='append')
parser.add_argument('-P', '--proxy', action='append')
args = parser.parse_args()
class Handler(EchoPacketHandler):
proxies = args.proxy or []
server = Server(list(zip(args.host, args.port)), Handler)
server.serve_forever()
if __name__ == '__main__':
main()

View file

@ -1,22 +1,27 @@
import hmac
import json
import logging
import threading
import time
from collections import deque, namedtuple
from operator import itemgetter
from django import db
from django.conf import settings
from django.utils import timezone
from django.db.models import F
from judge import event_poster as event
from judge.bridge.base_handler import ZlibPacketHandler, proxy_list
from judge.caching import finished_submission
from judge.models import Judge, Language, LanguageLimit, Problem, RuntimeVersion, Submission, SubmissionTestCase
from .judgehandler import JudgeHandler, SubmissionData
logger = logging.getLogger('judge.bridge')
json_log = logging.getLogger('judge.json.bridge')
UPDATE_RATE_LIMIT = 5
UPDATE_RATE_TIME = 0.5
SubmissionData = namedtuple('SubmissionData', 'time memory short_circuit pretests_only contest_no attempt_no user_id')
def _ensure_connection():
@ -26,9 +31,42 @@ def _ensure_connection():
db.connection.close()
class DjangoJudgeHandler(JudgeHandler):
def __init__(self, server, socket):
super(DjangoJudgeHandler, self).__init__(server, socket)
class JudgeHandler(ZlibPacketHandler):
proxies = proxy_list(settings.BRIDGED_JUDGE_PROXIES or [])
def __init__(self, request, client_address, server, judges):
super().__init__(request, client_address, server)
self.judges = judges
self.handlers = {
'grading-begin': self.on_grading_begin,
'grading-end': self.on_grading_end,
'compile-error': self.on_compile_error,
'compile-message': self.on_compile_message,
'batch-begin': self.on_batch_begin,
'batch-end': self.on_batch_end,
'test-case-status': self.on_test_case,
'internal-error': self.on_internal_error,
'submission-terminated': self.on_submission_terminated,
'submission-acknowledged': self.on_submission_acknowledged,
'ping-response': self.on_ping_response,
'supported-problems': self.on_supported_problems,
'handshake': self.on_handshake,
}
self._working = False
self._no_response_job = None
self._problems = []
self.executors = {}
self.problems = {}
self.latency = None
self.time_delta = None
self.load = 1e100
self.name = None
self.batch_id = None
self.in_batch = False
self._stop_ping = threading.Event()
self._ping_average = deque(maxlen=6) # 1 minute average, just like load
self._time_delta = deque(maxlen=6)
# each value is (updates, last reset)
self.update_counter = {}
@ -38,60 +76,33 @@ class DjangoJudgeHandler(JudgeHandler):
self._submission_cache_id = None
self._submission_cache = {}
def on_connect(self):
self.timeout = 15
logger.info('Judge connected from: %s', self.client_address)
json_log.info(self._make_json_log(action='connect'))
def on_close(self):
super(DjangoJudgeHandler, self).on_close()
def on_disconnect(self):
self._stop_ping.set()
if self._working:
logger.error('Judge %s disconnected while handling submission %s', self.name, self._working)
self.judges.remove(self)
if self.name is not None:
self._disconnected()
logger.info('Judge disconnected from: %s with name %s', self.client_address, self.name)
json_log.info(self._make_json_log(action='disconnect', info='judge disconnected'))
if self._working:
Submission.objects.filter(id=self._working).update(status='IE', result='IE')
Submission.objects.filter(id=self._working).update(status='IE', result='IE', error='')
json_log.error(self._make_json_log(sub=self._working, action='close', info='IE due to shutdown on grading'))
def on_malformed(self, packet):
super(DjangoJudgeHandler, self).on_malformed(packet)
json_log.exception(self._make_json_log(sub=self._working, info='malformed zlib packet'))
def _packet_exception(self):
json_log.exception(self._make_json_log(sub=self._working, info='packet processing exception'))
def get_related_submission_data(self, submission):
_ensure_connection() # We are called from the django-facing daemon thread. Guess what happens.
try:
pid, time, memory, short_circuit, lid, is_pretested, sub_date, uid, part_virtual, part_id = (
Submission.objects.filter(id=submission)
.values_list('problem__id', 'problem__time_limit', 'problem__memory_limit',
'problem__short_circuit', 'language__id', 'is_pretested', 'date', 'user__id',
'contest__participation__virtual', 'contest__participation__id')).get()
except Submission.DoesNotExist:
logger.error('Submission vanished: %s', submission)
json_log.error(self._make_json_log(
sub=self._working, action='request',
info='submission vanished when fetching info',
))
return
attempt_no = Submission.objects.filter(problem__id=pid, contest__participation__id=part_id, user__id=uid,
date__lt=sub_date).exclude(status__in=('CE', 'IE')).count() + 1
try:
time, memory = (LanguageLimit.objects.filter(problem__id=pid, language__id=lid)
.values_list('time_limit', 'memory_limit').get())
except LanguageLimit.DoesNotExist:
pass
return SubmissionData(
time=time,
memory=memory,
short_circuit=short_circuit,
pretests_only=is_pretested,
contest_no=part_virtual,
attempt_no=attempt_no,
user_id=uid,
)
def _authenticate(self, id, key):
result = Judge.objects.filter(name=id, auth_key=key, is_blocked=False).exists()
try:
judge = Judge.objects.get(name=id, is_blocked=False)
except Judge.DoesNotExist:
result = False
else:
result = hmac.compare_digest(judge.auth_key, key)
if not result:
json_log.warning(self._make_json_log(action='auth', judge=id, info='judge failed authentication'))
return result
@ -130,26 +141,118 @@ class DjangoJudgeHandler(JudgeHandler):
if e.__class__.__name__ == 'OperationalError' and e.__module__ == '_mysql_exceptions' and e.args[0] == 2006:
db.connection.close()
def _post_update_submission(self, id, state, done=False):
if self._submission_cache_id == id:
data = self._submission_cache
else:
self._submission_cache = data = Submission.objects.filter(id=id).values(
'problem__is_public', 'contest__participation__contest__key',
'user_id', 'problem_id', 'status', 'language__key',
).get()
self._submission_cache_id = id
def send(self, data):
super().send(json.dumps(data, separators=(',', ':')))
if data['problem__is_public']:
event.post('submissions', {
'type': 'done-submission' if done else 'update-submission',
'state': state, 'id': id,
'contest': data['contest__participation__contest__key'],
'user': data['user_id'], 'problem': data['problem_id'],
'status': data['status'], 'language': data['language__key'],
})
def on_handshake(self, packet):
if 'id' not in packet or 'key' not in packet:
logger.warning('Malformed handshake: %s', self.client_address)
self.close()
return
if not self._authenticate(packet['id'], packet['key']):
logger.warning('Authentication failure: %s', self.client_address)
self.close()
return
self.timeout = 60
self._problems = packet['problems']
self.problems = dict(self._problems)
self.executors = packet['executors']
self.name = packet['id']
self.send({'name': 'handshake-success'})
logger.info('Judge authenticated: %s (%s)', self.client_address, packet['id'])
self.judges.register(self)
threading.Thread(target=self._ping_thread).start()
self._connected()
def can_judge(self, problem, executor, judge_id=None):
return problem in self.problems and executor in self.executors and (not judge_id or self.name == judge_id)
@property
def working(self):
return bool(self._working)
def get_related_submission_data(self, submission):
_ensure_connection()
try:
pid, time, memory, short_circuit, lid, is_pretested, sub_date, uid, part_virtual, part_id = (
Submission.objects.filter(id=submission)
.values_list('problem__id', 'problem__time_limit', 'problem__memory_limit',
'problem__short_circuit', 'language__id', 'is_pretested', 'date', 'user__id',
'contest__participation__virtual', 'contest__participation__id')).get()
except Submission.DoesNotExist:
logger.error('Submission vanished: %s', submission)
json_log.error(self._make_json_log(
sub=self._working, action='request',
info='submission vanished when fetching info',
))
return
attempt_no = Submission.objects.filter(problem__id=pid, contest__participation__id=part_id, user__id=uid,
date__lt=sub_date).exclude(status__in=('CE', 'IE')).count() + 1
try:
time, memory = (LanguageLimit.objects.filter(problem__id=pid, language__id=lid)
.values_list('time_limit', 'memory_limit').get())
except LanguageLimit.DoesNotExist:
pass
return SubmissionData(
time=time,
memory=memory,
short_circuit=short_circuit,
pretests_only=is_pretested,
contest_no=part_virtual,
attempt_no=attempt_no,
user_id=uid,
)
def disconnect(self, force=False):
if force:
# Yank the power out.
self.close()
else:
self.send({'name': 'disconnect'})
def submit(self, id, problem, language, source):
data = self.get_related_submission_data(id)
self._working = id
self._no_response_job = threading.Timer(20, self._kill_if_no_response)
self.send({
'name': 'submission-request',
'submission-id': id,
'problem-id': problem,
'language': language,
'source': source,
'time-limit': data.time,
'memory-limit': data.memory,
'short-circuit': data.short_circuit,
'meta': {
'pretests-only': data.pretests_only,
'in-contest': data.contest_no,
'attempt-no': data.attempt_no,
'user': data.user_id,
},
})
def _kill_if_no_response(self):
logger.error('Judge failed to acknowledge submission: %s: %s', self.name, self._working)
self.close()
def on_timeout(self):
if self.name:
logger.warning('Judge seems dead: %s: %s', self.name, self._working)
def malformed_packet(self, exception):
logger.exception('Judge sent malformed packet: %s', self.name)
super(JudgeHandler, self).malformed_packet(exception)
def on_submission_processing(self, packet):
_ensure_connection()
id = packet['submission-id']
if Submission.objects.filter(id=id).update(status='P', judged_on=self.judge):
event.post('sub_%s' % Submission.get_id_secret(id), {'type': 'processing'})
@ -161,12 +264,72 @@ class DjangoJudgeHandler(JudgeHandler):
def on_submission_wrong_acknowledge(self, packet, expected, got):
json_log.error(self._make_json_log(packet, action='processing', info='wrong-acknowledge', expected=expected))
Submission.objects.filter(id=expected).update(status='IE', result='IE', error=None)
Submission.objects.filter(id=got, status='QU').update(status='IE', result='IE', error=None)
def on_submission_acknowledged(self, packet):
if not packet.get('submission-id', None) == self._working:
logger.error('Wrong acknowledgement: %s: %s, expected: %s', self.name, packet.get('submission-id', None),
self._working)
self.on_submission_wrong_acknowledge(packet, self._working, packet.get('submission-id', None))
self.close()
logger.info('Submission acknowledged: %d', self._working)
if self._no_response_job:
self._no_response_job.cancel()
self._no_response_job = None
self.on_submission_processing(packet)
def abort(self):
self.send({'name': 'terminate-submission'})
def get_current_submission(self):
return self._working or None
def ping(self):
self.send({'name': 'ping', 'when': time.time()})
def on_packet(self, data):
try:
try:
data = json.loads(data)
if 'name' not in data:
raise ValueError
except ValueError:
self.on_malformed(data)
else:
handler = self.handlers.get(data['name'], self.on_malformed)
handler(data)
except Exception:
logger.exception('Error in packet handling (Judge-side): %s', self.name)
self._packet_exception()
# You can't crash here because you aren't so sure about the judges
# not being malicious or simply malforms. THIS IS A SERVER!
def _packet_exception(self):
json_log.exception(self._make_json_log(sub=self._working, info='packet processing exception'))
def _submission_is_batch(self, id):
if not Submission.objects.filter(id=id).update(batch=True):
logger.warning('Unknown submission: %s', id)
def on_supported_problems(self, packet):
logger.info('%s: Updated problem list', self.name)
self._problems = packet['problems']
self.problems = dict(self._problems)
if not self.working:
self.judges.update_problems(self)
self.judge.problems.set(Problem.objects.filter(code__in=list(self.problems.keys())))
json_log.info(self._make_json_log(action='update-problems', count=len(self.problems)))
def on_grading_begin(self, packet):
super(DjangoJudgeHandler, self).on_grading_begin(packet)
logger.info('%s: Grading has begun on: %s', self.name, packet['submission-id'])
self.batch_id = None
if Submission.objects.filter(id=packet['submission-id']).update(
status='G', is_pretested=packet['pretested'],
current_testcase=1, batch=False, points=0):
current_testcase=1, points=0,
batch=False, judged_date=timezone.now()):
SubmissionTestCase.objects.filter(submission_id=packet['submission-id']).delete()
event.post('sub_%s' % Submission.get_id_secret(packet['submission-id']), {'type': 'grading-begin'})
self._post_update_submission(packet['submission-id'], 'grading-begin')
@ -175,12 +338,10 @@ class DjangoJudgeHandler(JudgeHandler):
logger.warning('Unknown submission: %s', packet['submission-id'])
json_log.error(self._make_json_log(packet, action='grading-begin', info='unknown submission'))
def _submission_is_batch(self, id):
if not Submission.objects.filter(id=id).update(batch=True):
logger.warning('Unknown submission: %s', id)
def on_grading_end(self, packet):
super(DjangoJudgeHandler, self).on_grading_end(packet)
logger.info('%s: Grading has ended on: %s', self.name, packet['submission-id'])
self._free_self(packet)
self.batch_id = None
try:
submission = Submission.objects.get(id=packet['submission-id'])
@ -263,7 +424,8 @@ class DjangoJudgeHandler(JudgeHandler):
self._post_update_submission(submission.id, 'grading-end', done=True)
def on_compile_error(self, packet):
super(DjangoJudgeHandler, self).on_compile_error(packet)
logger.info('%s: Submission failed to compile: %s', self.name, packet['submission-id'])
self._free_self(packet)
if Submission.objects.filter(id=packet['submission-id']).update(status='CE', result='CE', error=packet['log']):
event.post('sub_%s' % Submission.get_id_secret(packet['submission-id']), {
@ -279,7 +441,7 @@ class DjangoJudgeHandler(JudgeHandler):
log=packet['log'], finish=True, result='CE'))
def on_compile_message(self, packet):
super(DjangoJudgeHandler, self).on_compile_message(packet)
logger.info('%s: Submission generated compiler messages: %s', self.name, packet['submission-id'])
if Submission.objects.filter(id=packet['submission-id']).update(error=packet['log']):
event.post('sub_%s' % Submission.get_id_secret(packet['submission-id']), {'type': 'compile-message'})
@ -290,7 +452,11 @@ class DjangoJudgeHandler(JudgeHandler):
log=packet['log']))
def on_internal_error(self, packet):
super(DjangoJudgeHandler, self).on_internal_error(packet)
try:
raise ValueError('\n\n' + packet['message'])
except ValueError:
logger.exception('Judge %s failed while handling submission %s', self.name, packet['submission-id'])
self._free_self(packet)
id = packet['submission-id']
if Submission.objects.filter(id=id).update(status='IE', result='IE', error=packet['message']):
@ -304,7 +470,8 @@ class DjangoJudgeHandler(JudgeHandler):
message=packet['message'], finish=True, result='IE'))
def on_submission_terminated(self, packet):
super(DjangoJudgeHandler, self).on_submission_terminated(packet)
logger.info('%s: Submission aborted: %s', self.name, packet['submission-id'])
self._free_self(packet)
if Submission.objects.filter(id=packet['submission-id']).update(status='AB', result='AB'):
event.post('sub_%s' % Submission.get_id_secret(packet['submission-id']), {'type': 'aborted-submission'})
@ -316,20 +483,29 @@ class DjangoJudgeHandler(JudgeHandler):
finish=True, result='AB'))
def on_batch_begin(self, packet):
super(DjangoJudgeHandler, self).on_batch_begin(packet)
logger.info('%s: Batch began on: %s', self.name, packet['submission-id'])
self.in_batch = True
if self.batch_id is None:
self.batch_id = 0
self._submission_is_batch(packet['submission-id'])
self.batch_id += 1
json_log.info(self._make_json_log(packet, action='batch-begin', batch=self.batch_id))
def on_batch_end(self, packet):
super(DjangoJudgeHandler, self).on_batch_end(packet)
self.in_batch = False
logger.info('%s: Batch ended on: %s', self.name, packet['submission-id'])
json_log.info(self._make_json_log(packet, action='batch-end', batch=self.batch_id))
def on_test_case(self, packet, max_feedback=SubmissionTestCase._meta.get_field('feedback').max_length):
super(DjangoJudgeHandler, self).on_test_case(packet)
logger.info('%s: %d test case(s) completed on: %s', self.name, len(packet['cases']), packet['submission-id'])
id = packet['submission-id']
updates = packet['cases']
max_position = max(map(itemgetter('position'), updates))
sum_points = sum(map(itemgetter('points'), updates))
if not Submission.objects.filter(id=id).update(current_testcase=max_position + 1, points=F('points') + sum_points):
logger.warning('Unknown submission: %s', id)
json_log.error(self._make_json_log(packet, action='test-case', info='unknown submission'))
@ -395,10 +571,32 @@ class DjangoJudgeHandler(JudgeHandler):
SubmissionTestCase.objects.bulk_create(bulk_test_case_updates)
def on_supported_problems(self, packet):
super(DjangoJudgeHandler, self).on_supported_problems(packet)
self.judge.problems.set(Problem.objects.filter(code__in=list(self.problems.keys())))
json_log.info(self._make_json_log(action='update-problems', count=len(self.problems)))
def on_malformed(self, packet):
logger.error('%s: Malformed packet: %s', self.name, packet)
json_log.exception(self._make_json_log(sub=self._working, info='malformed json packet'))
def on_ping_response(self, packet):
end = time.time()
self._ping_average.append(end - packet['when'])
self._time_delta.append((end + packet['when']) / 2 - packet['time'])
self.latency = sum(self._ping_average) / len(self._ping_average)
self.time_delta = sum(self._time_delta) / len(self._time_delta)
self.load = packet['load']
self._update_ping()
def _free_self(self, packet):
self.judges.on_judge_free(self, packet['submission-id'])
def _ping_thread(self):
try:
while True:
self.ping()
if self._stop_ping.wait(10):
break
except Exception:
logger.exception('Ping error in %s', self.name)
self.close()
raise
def _make_json_log(self, packet=None, sub=None, **kwargs):
data = {
@ -411,3 +609,22 @@ class DjangoJudgeHandler(JudgeHandler):
data['submission'] = sub
data.update(kwargs)
return json.dumps(data)
def _post_update_submission(self, id, state, done=False):
if self._submission_cache_id == id:
data = self._submission_cache
else:
self._submission_cache = data = Submission.objects.filter(id=id).values(
'problem__is_public', 'contest_object__key',
'user_id', 'problem_id', 'status', 'language__key',
).get()
self._submission_cache_id = id
if data['problem__is_public']:
event.post('submissions', {
'type': 'done-submission' if done else 'update-submission',
'state': state, 'id': id,
'contest': data['contest_object__key'],
'user': data['user_id'], 'problem': data['problem_id'],
'status': data['status'], 'language': data['language__key'],
})

View file

@ -29,8 +29,8 @@ class JudgeList(object):
node = self.queue.first
while node:
if not isinstance(node.value, PriorityMarker):
id, problem, language, source = node.value
if judge.can_judge(problem, language):
id, problem, language, source, judge_id = node.value
if judge.can_judge(problem, language, judge_id):
self.submission_map[id] = judge
logger.info('Dispatched queued submission %d: %s', id, judge.name)
try:
@ -52,9 +52,10 @@ class JudgeList(object):
self._handle_free_judge(judge)
def disconnect(self, judge_id, force=False):
for judge in self.judges:
if judge.name == judge_id:
judge.disconnect(force=force)
with self.lock:
for judge in self.judges:
if judge.name == judge_id:
judge.disconnect(force=force)
def update_problems(self, judge):
with self.lock:
@ -77,6 +78,7 @@ class JudgeList(object):
with self.lock:
logger.info('Judge available after grading %d: %s', submission, judge.name)
del self.submission_map[submission]
judge._working = False
self._handle_free_judge(judge)
def abort(self, submission):
@ -98,15 +100,20 @@ class JudgeList(object):
def check_priority(self, priority):
return 0 <= priority < self.priorities
def judge(self, id, problem, language, source, priority):
def judge(self, id, problem, language, source, judge_id, priority):
with self.lock:
if id in self.submission_map or id in self.node_map:
# Already judging, don't queue again. This can happen during batch rejudges, rejudges should be
# idempotent.
return
candidates = [judge for judge in self.judges if not judge.working and judge.can_judge(problem, language)]
logger.info('Free judges: %d', len(candidates))
candidates = [
judge for judge in self.judges if not judge.working and judge.can_judge(problem, language, judge_id)
]
if judge_id:
logger.info('Specified judge %s is%savailable', judge_id, ' ' if candidates else ' not ')
else:
logger.info('Free judges: %d', len(candidates))
if candidates:
# Schedule the submission on the judge reporting least load.
judge = min(candidates, key=attrgetter('load'))
@ -117,7 +124,10 @@ class JudgeList(object):
except Exception:
logger.exception('Failed to dispatch %d (%s, %s) to %s', id, problem, language, judge.name)
self.judges.discard(judge)
return self.judge(id, problem, language, source, priority)
return self.judge(id, problem, language, source, judge_id, priority)
else:
self.node_map[id] = self.queue.insert((id, problem, language, source), self.priority[priority])
self.node_map[id] = self.queue.insert(
(id, problem, language, source, judge_id),
self.priority[priority],
)
logger.info('Queued submission: %d', id)

View file

@ -1,268 +0,0 @@
import json
import logging
import time
from collections import deque, namedtuple
from event_socket_server import ProxyProtocolMixin, ZlibPacketHandler
logger = logging.getLogger('judge.bridge')
SubmissionData = namedtuple('SubmissionData', 'time memory short_circuit pretests_only contest_no attempt_no user_id')
class JudgeHandler(ProxyProtocolMixin, ZlibPacketHandler):
def __init__(self, server, socket):
super(JudgeHandler, self).__init__(server, socket)
self.handlers = {
'grading-begin': self.on_grading_begin,
'grading-end': self.on_grading_end,
'compile-error': self.on_compile_error,
'compile-message': self.on_compile_message,
'batch-begin': self.on_batch_begin,
'batch-end': self.on_batch_end,
'test-case-status': self.on_test_case,
'internal-error': self.on_internal_error,
'submission-terminated': self.on_submission_terminated,
'submission-acknowledged': self.on_submission_acknowledged,
'ping-response': self.on_ping_response,
'supported-problems': self.on_supported_problems,
'handshake': self.on_handshake,
}
self._to_kill = True
self._working = False
self._no_response_job = None
self._problems = []
self.executors = []
self.problems = {}
self.latency = None
self.time_delta = None
self.load = 1e100
self.name = None
self.batch_id = None
self.in_batch = False
self._ping_average = deque(maxlen=6) # 1 minute average, just like load
self._time_delta = deque(maxlen=6)
self.server.schedule(15, self._kill_if_no_auth)
logger.info('Judge connected from: %s', self.client_address)
def _kill_if_no_auth(self):
if self._to_kill:
logger.info('Judge not authenticated: %s', self.client_address)
self.close()
def on_close(self):
self._to_kill = False
if self._no_response_job:
self.server.unschedule(self._no_response_job)
self.server.judges.remove(self)
if self.name is not None:
self._disconnected()
logger.info('Judge disconnected from: %s', self.client_address)
def _authenticate(self, id, key):
return False
def _connected(self):
pass
def _disconnected(self):
pass
def _update_ping(self):
pass
def _format_send(self, data):
return super(JudgeHandler, self)._format_send(json.dumps(data, separators=(',', ':')))
def on_handshake(self, packet):
if 'id' not in packet or 'key' not in packet:
logger.warning('Malformed handshake: %s', self.client_address)
self.close()
return
if not self._authenticate(packet['id'], packet['key']):
logger.warning('Authentication failure: %s', self.client_address)
self.close()
return
self._to_kill = False
self._problems = packet['problems']
self.problems = dict(self._problems)
self.executors = packet['executors']
self.name = packet['id']
self.send({'name': 'handshake-success'})
logger.info('Judge authenticated: %s (%s)', self.client_address, packet['id'])
self.server.judges.register(self)
self._connected()
def can_judge(self, problem, executor):
return problem in self.problems and executor in self.executors
@property
def working(self):
return bool(self._working)
def get_related_submission_data(self, submission):
return SubmissionData(
time=2,
memory=16384,
short_circuit=False,
pretests_only=False,
contest_no=None,
attempt_no=1,
user_id=None,
)
def disconnect(self, force=False):
if force:
# Yank the power out.
self.close()
else:
self.send({'name': 'disconnect'})
def submit(self, id, problem, language, source):
data = self.get_related_submission_data(id)
self._working = id
self._no_response_job = self.server.schedule(20, self._kill_if_no_response)
self.send({
'name': 'submission-request',
'submission-id': id,
'problem-id': problem,
'language': language,
'source': source,
'time-limit': data.time,
'memory-limit': data.memory,
'short-circuit': data.short_circuit,
'meta': {
'pretests-only': data.pretests_only,
'in-contest': data.contest_no,
'attempt-no': data.attempt_no,
'user': data.user_id,
},
})
def _kill_if_no_response(self):
logger.error('Judge seems dead: %s: %s', self.name, self._working)
self.close()
def malformed_packet(self, exception):
logger.exception('Judge sent malformed packet: %s', self.name)
super(JudgeHandler, self).malformed_packet(exception)
def on_submission_processing(self, packet):
pass
def on_submission_wrong_acknowledge(self, packet, expected, got):
pass
def on_submission_acknowledged(self, packet):
if not packet.get('submission-id', None) == self._working:
logger.error('Wrong acknowledgement: %s: %s, expected: %s', self.name, packet.get('submission-id', None),
self._working)
self.on_submission_wrong_acknowledge(packet, self._working, packet.get('submission-id', None))
self.close()
logger.info('Submission acknowledged: %d', self._working)
if self._no_response_job:
self.server.unschedule(self._no_response_job)
self._no_response_job = None
self.on_submission_processing(packet)
def abort(self):
self.send({'name': 'terminate-submission'})
def get_current_submission(self):
return self._working or None
def ping(self):
self.send({'name': 'ping', 'when': time.time()})
def packet(self, data):
try:
try:
data = json.loads(data)
if 'name' not in data:
raise ValueError
except ValueError:
self.on_malformed(data)
else:
handler = self.handlers.get(data['name'], self.on_malformed)
handler(data)
except Exception:
logger.exception('Error in packet handling (Judge-side): %s', self.name)
self._packet_exception()
# You can't crash here because you aren't so sure about the judges
# not being malicious or simply malforms. THIS IS A SERVER!
def _packet_exception(self):
pass
def _submission_is_batch(self, id):
pass
def on_supported_problems(self, packet):
logger.info('%s: Updated problem list', self.name)
self._problems = packet['problems']
self.problems = dict(self._problems)
if not self.working:
self.server.judges.update_problems(self)
def on_grading_begin(self, packet):
logger.info('%s: Grading has begun on: %s', self.name, packet['submission-id'])
self.batch_id = None
def on_grading_end(self, packet):
logger.info('%s: Grading has ended on: %s', self.name, packet['submission-id'])
self._free_self(packet)
self.batch_id = None
def on_compile_error(self, packet):
logger.info('%s: Submission failed to compile: %s', self.name, packet['submission-id'])
self._free_self(packet)
def on_compile_message(self, packet):
logger.info('%s: Submission generated compiler messages: %s', self.name, packet['submission-id'])
def on_internal_error(self, packet):
try:
raise ValueError('\n\n' + packet['message'])
except ValueError:
logger.exception('Judge %s failed while handling submission %s', self.name, packet['submission-id'])
self._free_self(packet)
def on_submission_terminated(self, packet):
logger.info('%s: Submission aborted: %s', self.name, packet['submission-id'])
self._free_self(packet)
def on_batch_begin(self, packet):
logger.info('%s: Batch began on: %s', self.name, packet['submission-id'])
self.in_batch = True
if self.batch_id is None:
self.batch_id = 0
self._submission_is_batch(packet['submission-id'])
self.batch_id += 1
def on_batch_end(self, packet):
self.in_batch = False
logger.info('%s: Batch ended on: %s', self.name, packet['submission-id'])
def on_test_case(self, packet):
logger.info('%s: %d test case(s) completed on: %s', self.name, len(packet['cases']), packet['submission-id'])
def on_malformed(self, packet):
logger.error('%s: Malformed packet: %s', self.name, packet)
def on_ping_response(self, packet):
end = time.time()
self._ping_average.append(end - packet['when'])
self._time_delta.append((end + packet['when']) / 2 - packet['time'])
self.latency = sum(self._ping_average) / len(self._ping_average)
self.time_delta = sum(self._time_delta) / len(self._time_delta)
self.load = packet['load']
self._update_ping()
def _free_self(self, packet):
self._working = False
self.server.judges.on_judge_free(self, packet['submission-id'])

View file

@ -1,68 +0,0 @@
import logging
import os
import threading
import time
from event_socket_server import get_preferred_engine
from judge.models import Judge
from .judgelist import JudgeList
logger = logging.getLogger('judge.bridge')
def reset_judges():
Judge.objects.update(online=False, ping=None, load=None)
class JudgeServer(get_preferred_engine()):
def __init__(self, *args, **kwargs):
super(JudgeServer, self).__init__(*args, **kwargs)
reset_judges()
self.judges = JudgeList()
self.ping_judge_thread = threading.Thread(target=self.ping_judge, args=())
self.ping_judge_thread.daemon = True
self.ping_judge_thread.start()
def on_shutdown(self):
super(JudgeServer, self).on_shutdown()
reset_judges()
def ping_judge(self):
try:
while True:
for judge in self.judges:
judge.ping()
time.sleep(10)
except Exception:
logger.exception('Ping error')
raise
def main():
import argparse
import logging
from .judgehandler import JudgeHandler
format = '%(asctime)s:%(levelname)s:%(name)s:%(message)s'
logging.basicConfig(format=format)
logging.getLogger().setLevel(logging.INFO)
handler = logging.FileHandler(os.path.join(os.path.dirname(__file__), 'judgeserver.log'), encoding='utf-8')
handler.setFormatter(logging.Formatter(format))
handler.setLevel(logging.INFO)
logging.getLogger().addHandler(handler)
parser = argparse.ArgumentParser(description='''
Runs the bridge between DMOJ website and judges.
''')
parser.add_argument('judge_host', nargs='+', action='append',
help='host to listen for the judge')
parser.add_argument('-p', '--judge-port', type=int, action='append',
help='port to listen for the judge')
args = parser.parse_args()
server = JudgeServer(list(zip(args.judge_host, args.judge_port)), JudgeHandler)
server.serve_forever()
if __name__ == '__main__':
main()

30
judge/bridge/server.py Normal file
View file

@ -0,0 +1,30 @@
import threading
from socketserver import TCPServer, ThreadingMixIn
class ThreadingTCPListener(ThreadingMixIn, TCPServer):
allow_reuse_address = True
class Server:
def __init__(self, addresses, handler):
self.servers = [ThreadingTCPListener(address, handler) for address in addresses]
self._shutdown = threading.Event()
def serve_forever(self):
threads = [threading.Thread(target=server.serve_forever) for server in self.servers]
for thread in threads:
thread.daemon = True
thread.start()
try:
self._shutdown.wait()
except KeyboardInterrupt:
self.shutdown()
finally:
for thread in threads:
thread.join()
def shutdown(self):
for server in self.servers:
server.shutdown()
self._shutdown.set()

View file

@ -7,7 +7,7 @@ from django.contrib.auth.forms import AuthenticationForm
from django.core.exceptions import ValidationError
from django.core.validators import RegexValidator
from django.db.models import Q
from django.forms import CharField, Form, ModelForm
from django.forms import CharField, ChoiceField, Form, ModelForm
from django.urls import reverse_lazy
from django.utils.translation import gettext_lazy as _
@ -69,18 +69,23 @@ class ProfileForm(ModelForm):
class ProblemSubmitForm(ModelForm):
source = CharField(max_length=65536, widget=AceWidget(theme='twilight', no_ace_media=True))
judge = ChoiceField(choices=(), widget=forms.HiddenInput(), required=False)
def __init__(self, *args, **kwargs):
def __init__(self, *args, judge_choices=(), **kwargs):
super(ProblemSubmitForm, self).__init__(*args, **kwargs)
self.fields['problem'].empty_label = None
self.fields['problem'].widget = forms.HiddenInput()
self.fields['language'].empty_label = None
self.fields['language'].label_from_instance = attrgetter('display_name')
self.fields['language'].queryset = Language.objects.filter(judges__online=True).distinct()
if judge_choices:
self.fields['judge'].widget = Select2Widget(
attrs={'style': 'width: 150px', 'data-placeholder': _('Any judge')},
)
self.fields['judge'].choices = judge_choices
class Meta:
model = Submission
fields = ['problem', 'language']
fields = ['language']
class EditOrganizationForm(ModelForm):

View file

@ -48,7 +48,7 @@ def judge_request(packet, reply=True):
return result
def judge_submission(submission, rejudge, batch_rejudge=False):
def judge_submission(submission, rejudge=False, batch_rejudge=False, judge_id=None):
from .models import ContestSubmission, Submission, SubmissionTestCase
CONTEST_SUBMISSION_PRIORITY = 0
@ -61,8 +61,8 @@ def judge_submission(submission, rejudge, batch_rejudge=False):
try:
# This is set proactively; it might get unset in judgecallback's on_grading_begin if the problem doesn't
# actually have pretests stored on the judge.
updates['is_pretested'] = ContestSubmission.objects.filter(submission=submission) \
.values_list('problem__contest__run_pretests_only', flat=True)[0]
updates['is_pretested'] = all(ContestSubmission.objects.filter(submission=submission)
.values_list('problem__contest__run_pretests_only', 'problem__is_pretested')[0])
except IndexError:
priority = DEFAULT_PRIORITY
else:
@ -88,15 +88,16 @@ def judge_submission(submission, rejudge, batch_rejudge=False):
'problem-id': submission.problem.code,
'language': submission.language.key,
'source': submission.source.source,
'judge-id': judge_id,
'priority': BATCH_REJUDGE_PRIORITY if batch_rejudge else REJUDGE_PRIORITY if rejudge else priority,
})
except BaseException:
logger.exception('Failed to send request to judge')
Submission.objects.filter(id=submission.id).update(status='IE')
Submission.objects.filter(id=submission.id).update(status='IE', result='IE')
success = False
else:
if response['name'] != 'submission-received' or response['submission-id'] != submission.id:
Submission.objects.filter(id=submission.id).update(status='IE')
Submission.objects.filter(id=submission.id).update(status='IE', result='IE')
_post_update_submission(submission)
success = True
return success
@ -109,7 +110,7 @@ def disconnect_judge(judge, force=False):
def abort_submission(submission):
from .models import Submission
response = judge_request({'name': 'terminate-submission', 'submission-id': submission.id})
# This defaults to true, so that in the case the judgelist fails to remove the submission from the queue,
# This defaults to true, so that in the case the JudgeList fails to remove the submission from the queue,
# and returns a bad-request, the submission is not falsely shown as "Aborted" when it will still be judged.
if not response.get('judge-aborted', True):
Submission.objects.filter(id=submission.id).update(status='AB', result='AB')

View file

@ -1,33 +1,8 @@
import threading
from django.conf import settings
from django.core.management.base import BaseCommand
from judge.bridge import DjangoHandler, DjangoServer
from judge.bridge import DjangoJudgeHandler, JudgeServer
from judge.bridge.daemon import judge_daemon
class Command(BaseCommand):
def handle(self, *args, **options):
judge_handler = DjangoJudgeHandler
try:
import netaddr # noqa: F401, imported to see if it exists
except ImportError:
pass
else:
proxies = settings.BRIDGED_JUDGE_PROXIES
if proxies:
judge_handler = judge_handler.with_proxy_set(proxies)
judge_server = JudgeServer(settings.BRIDGED_JUDGE_ADDRESS, judge_handler)
django_server = DjangoServer(judge_server.judges, settings.BRIDGED_DJANGO_ADDRESS, DjangoHandler)
# TODO: Merge the two servers
threading.Thread(target=django_server.serve_forever).start()
try:
judge_server.serve_forever()
except KeyboardInterrupt:
pass
finally:
django_server.stop()
judge_daemon()

View file

@ -0,0 +1,18 @@
# Generated by Django 2.2.12 on 2020-07-19 21:07
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('judge', '0107_notification'),
]
operations = [
migrations.AddField(
model_name='submission',
name='judged_date',
field=models.DateTimeField(default=None, null=True, verbose_name='submission judge time'),
),
]

View file

@ -78,6 +78,7 @@ class Submission(models.Model):
case_total = models.FloatField(verbose_name=_('test case total points'), default=0)
judged_on = models.ForeignKey('Judge', verbose_name=_('judged on'), null=True, blank=True,
on_delete=models.SET_NULL)
judged_date = models.DateTimeField(verbose_name=_('submission judge time'), default=None, null=True)
was_rejudged = models.BooleanField(verbose_name=_('was rejudged by admin'), default=False)
is_pretested = models.BooleanField(verbose_name=_('was ran on pretests only'), default=False)
contest_object = models.ForeignKey('Contest', verbose_name=_('contest'), null=True, blank=True,
@ -112,8 +113,9 @@ class Submission(models.Model):
def long_status(self):
return Submission.USER_DISPLAY_CODES.get(self.short_status, '')
def judge(self, rejudge=False, batch_rejudge=False):
judge_submission(self, rejudge, batch_rejudge)
def judge(self, *args, **kwargs):
judge_submission(self, *args, **kwargs)
judge.alters_data = True

View file

@ -537,8 +537,15 @@ def problem_submit(request, problem=None, submission=None):
raise PermissionDenied()
profile = request.profile
if problem.is_editable_by(request.user):
judge_choices = tuple(Judge.objects.filter(online=True, problems=problem).values_list('name', 'name'))
else:
judge_choices = ()
if request.method == 'POST':
form = ProblemSubmitForm(request.POST, instance=Submission(user=profile))
form = ProblemSubmitForm(request.POST, judge_choices=judge_choices,
instance=Submission(user=profile, problem=problem))
if form.is_valid():
if (not request.user.has_perm('judge.spam_submission') and
Submission.objects.filter(user=profile, was_rejudged=False)
@ -586,7 +593,7 @@ def problem_submit(request, problem=None, submission=None):
# Save a query
model.source = source
model.judge(rejudge=False)
model.judge(rejudge=False, judge_id=form.cleaned_data['judge'])
return HttpResponseRedirect(reverse('submission_status', args=[str(model.id)]))
else:
@ -610,7 +617,7 @@ def problem_submit(request, problem=None, submission=None):
initial['language'] = sub.language
except ValueError:
raise Http404()
form = ProblemSubmitForm(initial=initial)
form = ProblemSubmitForm(judge_choices=judge_choices, initial=initial)
form_data = initial
if 'problem' in form_data:
form.fields['language'].queryset = (

View file

@ -34,3 +34,4 @@ channels
channels-redis
docker
python-memcached
netaddr

View file

@ -222,9 +222,12 @@ ul.problem-list {
box-sizing: border-box;
.button {
float: right;
display: inline-block !important;
padding: 6px 12px;
}
.submit-bar {
float: right;
}
}
@media (max-width: 550px) {

View file

@ -2,8 +2,8 @@
{% block js_media %}
<script type="text/javascript" src="{{ ACE_URL }}/ace.js"></script>
{{ form.media.js }}
{% compress js %}
{{ form.media.js }}
<script type="text/javascript">
$(function () {
function format(state) {
@ -81,6 +81,13 @@
var dropdown = $('.select2-dropdown');
if (!$('#result-version-info').length)
dropdown.append($("<span id=\"result-version-info\">"));
dropdown.attr('id', 'language-select2');
});
$('#id_judge').on('select2:open', function (evt) {
var dropdown = $('.select2-dropdown');
$('#result-version-info').remove();
dropdown.attr('id', 'judge-select2');
});
$('#id_language').change(function () {
@ -107,20 +114,28 @@
$(window).resize(function () {
$('#ace_source').height(Math.max($(window).height() - 353, 100));
}).resize();
$('#problem_submit').submit(function (event) {
if ($('#id_source').val().length > 65536) {
alert("{{ _('Your source code must contain at most 65536 characters.') }}");
event.preventDefault();
$('#problem_submit').find(':submit').attr('disabled', false);
}
});
});
</script>
{% endcompress %}
{% endblock %}
{% block media %}
{{ form.media.css }}
{% compress css %}
{{ form.media.css }}
<style media="screen">
#submit-wrapper {
margin-top: 0.7em;
}
#submit-wrapper #problem-id, #submit-wrapper #editor, #submit-wrapper #language {
#submit-wrapper #editor, #submit-wrapper #language {
margin-top: 4px;
}
@ -137,42 +152,42 @@
text-align: right;
}
.select2-results__message {
#language-select2 .select2-results__message {
white-space: nowrap
}
.select2-dropdown--above {
#language-select2 .select2-dropdown--above {
display: flex;
flex-direction: column-reverse;
}
.select2-results__option {
#language-select2 .select2-results__option {
color: #757575 !important;
background: white !important;
}
.select2-results__option--highlighted {
#language-select2 .select2-results__option--highlighted {
text-decoration: underline;
}
.select2-results__option[aria-selected=true] {
#language-select2 .select2-results__option[aria-selected=true] {
font-weight: bold;
color: black !important;
}
.select2-results__option {
#language-select2 .select2-results__option {
padding: 4px 0px;
}
.select2-results__options {
#language-select2 .select2-results__options {
overflow-y: visible !important;
}
.select2-results__option {
#language-select2 .select2-results__option {
break-inside: avoid-column;
}
.select2-results {
#language-select2 .select2-results {
-webkit-columns: 10 7em;
-moz-columns: 10 7em;
columns: 10 7em;
@ -219,10 +234,6 @@
{% csrf_token %}
{{ form.non_field_errors() }}
<div id="submit-wrapper">
<div id="problem-id">
{{ form.problem.errors }}
{{ form.problem }}
</div>
<div id="editor">
{{ form.source.errors }}
{{ form.source }}
@ -249,8 +260,11 @@
{% if no_judges %}
<span style="color: red">{{ _('No judge is available for this problem.') }}</span>
{% else %}
<input type="submit" value="{{ _('Submit!') }}" class="button"
{% if request.in_contest and submission_limit and not submissions_left %}disabled{% endif %}>
<div class="submit-bar">
{{ form.judge }}
<input type="submit" value="{{ _('Submit!') }}" class="button"
{% if request.in_contest and submission_limit and not submissions_left %}disabled{% endif %}>
</div>
{% endif %}
</form>
{% endblock %}

View file

@ -36,7 +36,7 @@
{% if submission.status == 'G' %}
<div class="sub-testcase">
{%- if submission.current_testcase > 0 -%}
{{ _('Point %(point)s / Case #%(case)s', point=submission.points, case=submission.current_testcase) }}
{{ _('Point %(point)s / Case #%(case)s', point=submission.points|floatformat(1), case=submission.current_testcase) }}
{%- else -%}
...
{%- endif -%}