From 3629369fbafda62427c6b9d4ab94a1f6a20a25ae Mon Sep 17 00:00:00 2001 From: cuom1999 Date: Sun, 19 Jul 2020 16:27:14 -0500 Subject: [PATCH] Update bridge (DMOJ) --- README.html | 1078 ----------------- dmoj_bridge_async.py | 20 + event_socket_server/__init__.py | 11 - event_socket_server/base_server.py | 169 --- event_socket_server/engines/__init__.py | 17 - event_socket_server/engines/epoll_server.py | 17 - event_socket_server/engines/poll_server.py | 97 -- event_socket_server/engines/select_server.py | 49 - event_socket_server/handler.py | 27 - event_socket_server/helpers.py | 125 -- event_socket_server/test_server.py | 54 - judge/admin/submission.py | 4 +- judge/bridge/__init__.py | 6 - judge/bridge/base_handler.py | 196 +++ judge/bridge/daemon.py | 47 + .../{djangohandler.py => django_handler.py} | 40 +- judge/bridge/djangoserver.py | 7 - .../bridge/echo_test_client.py | 21 +- judge/bridge/echo_test_server.py | 39 + .../{judgecallback.py => judge_handler.py} | 391 ++++-- judge/bridge/{judgelist.py => judge_list.py} | 30 +- judge/bridge/judgehandler.py | 268 ---- judge/bridge/judgeserver.py | 68 -- judge/bridge/server.py | 30 + judge/forms.py | 17 +- judge/judgeapi.py | 15 +- judge/management/commands/runbridged.py | 29 +- .../migrations/0108_submission_judged_date.py | 18 + judge/models/submission.py | 6 +- judge/views/problem.py | 13 +- requirements.txt | 1 + resources/problem.scss | 5 +- templates/problem/submit.html | 52 +- templates/submission/row.html | 2 +- 34 files changed, 770 insertions(+), 2199 deletions(-) delete mode 100644 README.html create mode 100644 dmoj_bridge_async.py delete mode 100644 event_socket_server/__init__.py delete mode 100644 event_socket_server/base_server.py delete mode 100644 event_socket_server/engines/__init__.py delete mode 100644 event_socket_server/engines/epoll_server.py delete mode 100644 event_socket_server/engines/poll_server.py delete mode 100644 event_socket_server/engines/select_server.py delete mode 100644 event_socket_server/handler.py delete mode 100644 event_socket_server/helpers.py delete mode 100644 event_socket_server/test_server.py create mode 100644 judge/bridge/base_handler.py create mode 100644 judge/bridge/daemon.py rename judge/bridge/{djangohandler.py => django_handler.py} (53%) delete mode 100644 judge/bridge/djangoserver.py rename event_socket_server/test_client.py => judge/bridge/echo_test_client.py (82%) create mode 100644 judge/bridge/echo_test_server.py rename judge/bridge/{judgecallback.py => judge_handler.py} (64%) rename judge/bridge/{judgelist.py => judge_list.py} (80%) delete mode 100644 judge/bridge/judgehandler.py delete mode 100644 judge/bridge/judgeserver.py create mode 100644 judge/bridge/server.py create mode 100644 judge/migrations/0108_submission_judged_date.py diff --git a/README.html b/README.html deleted file mode 100644 index 53d70c6..0000000 --- a/README.html +++ /dev/null @@ -1,1078 +0,0 @@ -README

online-judge

-

1. Activate virtualenv:

-

source dmojsite/bin/activate

-

2. Remember to change the local_settings

-

3. Run server:

-

python manage.py runserver 0.0.0.0:8000

-

4. Create configure file for judge:

-

python dmojauto-conf

-

5. Create folder for problems, change the dir in judge conf file and local_settings.py

-

6. Connect judge:

- -

7. Change vietnamese:

-
\ No newline at end of file diff --git a/dmoj_bridge_async.py b/dmoj_bridge_async.py new file mode 100644 index 0000000..dee4112 --- /dev/null +++ b/dmoj_bridge_async.py @@ -0,0 +1,20 @@ +import os + +import gevent.monkey # noqa: I100, gevent must be imported here + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dmoj.settings') +gevent.monkey.patch_all() + +# noinspection PyUnresolvedReferences +import dmoj_install_pymysql # noqa: E402, F401, I100, I202, imported for side effect + +import django # noqa: E402, F401, I100, I202, django must be imported here +django.setup() + +# noinspection PyUnresolvedReferences +import django_2_2_pymysql_patch # noqa: E402, I100, F401, I202, imported for side effect + +from judge.bridge.daemon import judge_daemon # noqa: E402, I100, I202, django code must be imported here + +if __name__ == '__main__': + judge_daemon() diff --git a/event_socket_server/__init__.py b/event_socket_server/__init__.py deleted file mode 100644 index 88f7683..0000000 --- a/event_socket_server/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -from .base_server import BaseServer -from .engines import * -from .handler import Handler -from .helpers import ProxyProtocolMixin, SizedPacketHandler, ZlibPacketHandler - - -def get_preferred_engine(choices=('epoll', 'poll', 'select')): - for choice in choices: - if choice in engines: - return engines[choice] - return engines['select'] diff --git a/event_socket_server/base_server.py b/event_socket_server/base_server.py deleted file mode 100644 index 667c627..0000000 --- a/event_socket_server/base_server.py +++ /dev/null @@ -1,169 +0,0 @@ -import logging -import socket -import threading -import time -from collections import defaultdict, deque -from functools import total_ordering -from heapq import heappop, heappush - -logger = logging.getLogger('event_socket_server') - - -class SendMessage(object): - __slots__ = ('data', 'callback') - - def __init__(self, data, callback): - self.data = data - self.callback = callback - - -@total_ordering -class ScheduledJob(object): - __slots__ = ('time', 'func', 'args', 'kwargs', 'cancel', 'dispatched') - - def __init__(self, time, func, args, kwargs): - self.time = time - self.func = func - self.args = args - self.kwargs = kwargs - self.cancel = False - self.dispatched = False - - def __eq__(self, other): - return self.time == other.time - - def __lt__(self, other): - return self.time < other.time - - -class BaseServer(object): - def __init__(self, addresses, client): - self._servers = set() - for address, port in addresses: - info = socket.getaddrinfo(address, port, socket.AF_UNSPEC, socket.SOCK_STREAM) - for af, socktype, proto, canonname, sa in info: - sock = socket.socket(af, socktype, proto) - sock.setblocking(0) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(sa) - self._servers.add(sock) - - self._stop = threading.Event() - self._clients = set() - self._ClientClass = client - self._send_queue = defaultdict(deque) - self._job_queue = [] - self._job_queue_lock = threading.Lock() - - def _serve(self): - raise NotImplementedError() - - def _accept(self, sock): - conn, address = sock.accept() - conn.setblocking(0) - client = self._ClientClass(self, conn) - self._clients.add(client) - return client - - def schedule(self, delay, func, *args, **kwargs): - with self._job_queue_lock: - job = ScheduledJob(time.time() + delay, func, args, kwargs) - heappush(self._job_queue, job) - return job - - def unschedule(self, job): - with self._job_queue_lock: - if job.dispatched or job.cancel: - return False - job.cancel = True - return True - - def _register_write(self, client): - raise NotImplementedError() - - def _register_read(self, client): - raise NotImplementedError() - - def _clean_up_client(self, client, finalize=False): - try: - del self._send_queue[client.fileno()] - except KeyError: - pass - client.on_close() - client._socket.close() - if not finalize: - self._clients.remove(client) - - def _dispatch_event(self): - t = time.time() - tasks = [] - with self._job_queue_lock: - while True: - dt = self._job_queue[0].time - t if self._job_queue else 1 - if dt > 0: - break - task = heappop(self._job_queue) - task.dispatched = True - if not task.cancel: - tasks.append(task) - for task in tasks: - logger.debug('Dispatching event: %r(*%r, **%r)', task.func, task.args, task.kwargs) - task.func(*task.args, **task.kwargs) - if not self._job_queue or dt > 1: - dt = 1 - return dt - - def _nonblock_read(self, client): - try: - data = client._socket.recv(1024) - except socket.error: - self._clean_up_client(client) - else: - logger.debug('Read from %s: %d bytes', client.client_address, len(data)) - if not data: - self._clean_up_client(client) - else: - try: - client._recv_data(data) - except Exception: - logger.exception('Client recv_data failure') - self._clean_up_client(client) - - def _nonblock_write(self, client): - fd = client.fileno() - queue = self._send_queue[fd] - try: - top = queue[0] - cb = client._socket.send(top.data) - top.data = top.data[cb:] - logger.debug('Send to %s: %d bytes', client.client_address, cb) - if not top.data: - logger.debug('Finished sending: %s', client.client_address) - if top.callback is not None: - logger.debug('Calling callback: %s: %r', client.client_address, top.callback) - try: - top.callback() - except Exception: - logger.exception('Client write callback failure') - self._clean_up_client(client) - return - queue.popleft() - if not queue: - self._register_read(client) - del self._send_queue[fd] - except socket.error: - self._clean_up_client(client) - - def send(self, client, data, callback=None): - logger.debug('Writing %d bytes to client %s, callback: %s', len(data), client.client_address, callback) - self._send_queue[client.fileno()].append(SendMessage(data, callback)) - self._register_write(client) - - def stop(self): - self._stop.set() - - def serve_forever(self): - self._serve() - - def on_shutdown(self): - pass diff --git a/event_socket_server/engines/__init__.py b/event_socket_server/engines/__init__.py deleted file mode 100644 index 5737684..0000000 --- a/event_socket_server/engines/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -import select - -__author__ = 'Quantum' -engines = {} - -from .select_server import SelectServer # noqa: E402, import not at top for consistency -engines['select'] = SelectServer - -if hasattr(select, 'poll'): - from .poll_server import PollServer - engines['poll'] = PollServer - -if hasattr(select, 'epoll'): - from .epoll_server import EpollServer - engines['epoll'] = EpollServer - -del select diff --git a/event_socket_server/engines/epoll_server.py b/event_socket_server/engines/epoll_server.py deleted file mode 100644 index a59755a..0000000 --- a/event_socket_server/engines/epoll_server.py +++ /dev/null @@ -1,17 +0,0 @@ -import select -__author__ = 'Quantum' - -if not hasattr(select, 'epoll'): - raise ImportError('System does not support epoll') - -from .poll_server import PollServer # noqa: E402, must be imported here - - -class EpollServer(PollServer): - poll = select.epoll - WRITE = select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP - READ = select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP - POLLIN = select.EPOLLIN - POLLOUT = select.EPOLLOUT - POLL_CLOSE = select.EPOLLHUP | select.EPOLLERR - NEED_CLOSE = True diff --git a/event_socket_server/engines/poll_server.py b/event_socket_server/engines/poll_server.py deleted file mode 100644 index aa4124b..0000000 --- a/event_socket_server/engines/poll_server.py +++ /dev/null @@ -1,97 +0,0 @@ -import errno -import logging -import select -import threading - -from ..base_server import BaseServer - -logger = logging.getLogger('event_socket_server') - -if not hasattr(select, 'poll'): - raise ImportError('System does not support poll') - - -class PollServer(BaseServer): - poll = select.poll - WRITE = select.POLLIN | select.POLLOUT | select.POLLERR | select.POLLHUP - READ = select.POLLIN | select.POLLERR | select.POLLHUP - POLLIN = select.POLLIN - POLLOUT = select.POLLOUT - POLL_CLOSE = select.POLLERR | select.POLLHUP - NEED_CLOSE = False - - def __init__(self, *args, **kwargs): - super(PollServer, self).__init__(*args, **kwargs) - self._poll = self.poll() - self._fdmap = {} - self._server_fds = {sock.fileno(): sock for sock in self._servers} - self._close_lock = threading.RLock() - - def _register_write(self, client): - logger.debug('On write mode: %s', client.client_address) - self._poll.modify(client.fileno(), self.WRITE) - - def _register_read(self, client): - logger.debug('On read mode: %s', client.client_address) - self._poll.modify(client.fileno(), self.READ) - - def _clean_up_client(self, client, finalize=False): - logger.debug('Taking close lock: cleanup') - with self._close_lock: - logger.debug('Cleaning up client: %s, finalize: %d', client.client_address, finalize) - fd = client.fileno() - try: - self._poll.unregister(fd) - except IOError as e: - if e.errno != errno.ENOENT: - raise - except KeyError: - pass - del self._fdmap[fd] - super(PollServer, self)._clean_up_client(client, finalize) - - def _serve(self): - for fd, sock in self._server_fds.items(): - self._poll.register(fd, self.POLLIN) - sock.listen(16) - try: - while not self._stop.is_set(): - for fd, event in self._poll.poll(self._dispatch_event()): - if fd in self._server_fds: - client = self._accept(self._server_fds[fd]) - logger.debug('Accepting: %s', client.client_address) - fd = client.fileno() - self._poll.register(fd, self.READ) - self._fdmap[fd] = client - elif event & self.POLL_CLOSE: - logger.debug('Client closed: %s', self._fdmap[fd].client_address) - self._clean_up_client(self._fdmap[fd]) - else: - logger.debug('Taking close lock: event loop') - with self._close_lock: - try: - client = self._fdmap[fd] - except KeyError: - pass - else: - logger.debug('Client active: %s, read: %d, write: %d', - client.client_address, - event & self.POLLIN, - event & self.POLLOUT) - if event & self.POLLIN: - logger.debug('Non-blocking read on client: %s', client.client_address) - self._nonblock_read(client) - # Might be closed in the read handler. - if event & self.POLLOUT and fd in self._fdmap: - logger.debug('Non-blocking write on client: %s', client.client_address) - self._nonblock_write(client) - finally: - logger.info('Shutting down server') - self.on_shutdown() - for client in self._clients: - self._clean_up_client(client, True) - for fd, sock in self._server_fds.items(): - self._poll.unregister(fd) - sock.close() - if self.NEED_CLOSE: - self._poll.close() diff --git a/event_socket_server/engines/select_server.py b/event_socket_server/engines/select_server.py deleted file mode 100644 index 3ae9a4d..0000000 --- a/event_socket_server/engines/select_server.py +++ /dev/null @@ -1,49 +0,0 @@ -import select - -from ..base_server import BaseServer - - -class SelectServer(BaseServer): - def __init__(self, *args, **kwargs): - super(SelectServer, self).__init__(*args, **kwargs) - self._reads = set(self._servers) - self._writes = set() - - def _register_write(self, client): - self._writes.add(client) - - def _register_read(self, client): - self._writes.remove(client) - - def _clean_up_client(self, client, finalize=False): - self._writes.discard(client) - self._reads.remove(client) - super(SelectServer, self)._clean_up_client(client, finalize) - - def _serve(self, select=select.select): - for server in self._servers: - server.listen(16) - try: - while not self._stop.is_set(): - r, w, x = select(self._reads, self._writes, self._reads, self._dispatch_event()) - for s in r: - if s in self._servers: - self._reads.add(self._accept(s)) - else: - self._nonblock_read(s) - - for client in w: - self._nonblock_write(client) - - for s in x: - s.close() - if s in self._servers: - raise RuntimeError('Server is in exceptional condition') - else: - self._clean_up_client(s) - finally: - self.on_shutdown() - for client in self._clients: - self._clean_up_client(client, True) - for server in self._servers: - server.close() diff --git a/event_socket_server/handler.py b/event_socket_server/handler.py deleted file mode 100644 index ecc3314..0000000 --- a/event_socket_server/handler.py +++ /dev/null @@ -1,27 +0,0 @@ -__author__ = 'Quantum' - - -class Handler(object): - def __init__(self, server, socket): - self._socket = socket - self.server = server - self.client_address = socket.getpeername() - - def fileno(self): - return self._socket.fileno() - - def _recv_data(self, data): - raise NotImplementedError - - def _send(self, data, callback=None): - return self.server.send(self, data, callback) - - def close(self): - self.server._clean_up_client(self) - - def on_close(self): - pass - - @property - def socket(self): - return self._socket diff --git a/event_socket_server/helpers.py b/event_socket_server/helpers.py deleted file mode 100644 index e8c3672..0000000 --- a/event_socket_server/helpers.py +++ /dev/null @@ -1,125 +0,0 @@ -import struct -import zlib - -from judge.utils.unicode import utf8text -from .handler import Handler - -size_pack = struct.Struct('!I') - - -class SizedPacketHandler(Handler): - def __init__(self, server, socket): - super(SizedPacketHandler, self).__init__(server, socket) - self._buffer = b'' - self._packetlen = 0 - - def _packet(self, data): - raise NotImplementedError() - - def _format_send(self, data): - return data - - def _recv_data(self, data): - self._buffer += data - while len(self._buffer) >= self._packetlen if self._packetlen else len(self._buffer) >= size_pack.size: - if self._packetlen: - data = self._buffer[:self._packetlen] - self._buffer = self._buffer[self._packetlen:] - self._packetlen = 0 - self._packet(data) - else: - data = self._buffer[:size_pack.size] - self._buffer = self._buffer[size_pack.size:] - self._packetlen = size_pack.unpack(data)[0] - - def send(self, data, callback=None): - data = self._format_send(data) - self._send(size_pack.pack(len(data)) + data, callback) - - -class ZlibPacketHandler(SizedPacketHandler): - def _format_send(self, data): - return zlib.compress(data.encode('utf-8')) - - def packet(self, data): - raise NotImplementedError() - - def _packet(self, data): - try: - self.packet(zlib.decompress(data).decode('utf-8')) - except zlib.error as e: - self.malformed_packet(e) - - def malformed_packet(self, exception): - self.close() - - -class ProxyProtocolMixin(object): - __UNKNOWN_TYPE = 0 - __PROXY1 = 1 - __PROXY2 = 2 - __DATA = 3 - - __HEADER2 = b'\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A' - __HEADER2_LEN = len(__HEADER2) - - _REAL_IP_SET = None - - @classmethod - def with_proxy_set(cls, ranges): - from netaddr import IPSet, IPGlob - from itertools import chain - - globs = [] - addrs = [] - for item in ranges: - if '*' in item or '-' in item: - globs.append(IPGlob(item)) - else: - addrs.append(item) - ipset = IPSet(chain(chain.from_iterable(globs), addrs)) - return type(cls.__name__, (cls,), {'_REAL_IP_SET': ipset}) - - def __init__(self, server, socket): - super(ProxyProtocolMixin, self).__init__(server, socket) - self.__buffer = b'' - self.__type = (self.__UNKNOWN_TYPE if self._REAL_IP_SET and - self.client_address[0] in self._REAL_IP_SET else self.__DATA) - - def __parse_proxy1(self, data): - self.__buffer += data - index = self.__buffer.find(b'\r\n') - if 0 <= index < 106: - proxy = data[:index].split() - if len(proxy) < 2: - return self.close() - if proxy[1] == b'TCP4': - if len(proxy) != 6: - return self.close() - self.client_address = (utf8text(proxy[2]), utf8text(proxy[4])) - self.server_address = (utf8text(proxy[3]), utf8text(proxy[5])) - elif proxy[1] == b'TCP6': - self.client_address = (utf8text(proxy[2]), utf8text(proxy[4]), 0, 0) - self.server_address = (utf8text(proxy[3]), utf8text(proxy[5]), 0, 0) - elif proxy[1] != b'UNKNOWN': - return self.close() - - self.__type = self.__DATA - super(ProxyProtocolMixin, self)._recv_data(data[index + 2:]) - elif len(self.__buffer) > 107 or index > 105: - self.close() - - def _recv_data(self, data): - if self.__type == self.__DATA: - super(ProxyProtocolMixin, self)._recv_data(data) - elif self.__type == self.__UNKNOWN_TYPE: - if len(data) >= 16 and data[:self.__HEADER2_LEN] == self.__HEADER2: - self.close() - elif len(data) >= 8 and data[:5] == b'PROXY': - self.__type = self.__PROXY1 - self.__parse_proxy1(data) - else: - self.__type = self.__DATA - super(ProxyProtocolMixin, self)._recv_data(data) - else: - self.__parse_proxy1(data) diff --git a/event_socket_server/test_server.py b/event_socket_server/test_server.py deleted file mode 100644 index 7dec107..0000000 --- a/event_socket_server/test_server.py +++ /dev/null @@ -1,54 +0,0 @@ -from .engines import engines -from .helpers import ProxyProtocolMixin, ZlibPacketHandler - - -class EchoPacketHandler(ProxyProtocolMixin, ZlibPacketHandler): - def __init__(self, server, socket): - super(EchoPacketHandler, self).__init__(server, socket) - self._gotdata = False - self.server.schedule(5, self._kill_if_no_data) - - def _kill_if_no_data(self): - if not self._gotdata: - print('Inactive client:', self._socket.getpeername()) - self.close() - - def packet(self, data): - self._gotdata = True - print('Data from %s: %r' % (self._socket.getpeername(), data[:30] if len(data) > 30 else data)) - self.send(data) - - def on_close(self): - self._gotdata = True - print('Closed client:', self._socket.getpeername()) - - -def main(): - import argparse - parser = argparse.ArgumentParser() - parser.add_argument('-l', '--host', action='append') - parser.add_argument('-p', '--port', type=int, action='append') - parser.add_argument('-e', '--engine', default='select', choices=sorted(engines.keys())) - try: - import netaddr - except ImportError: - netaddr = None - else: - parser.add_argument('-P', '--proxy', action='append') - args = parser.parse_args() - - class TestServer(engines[args.engine]): - def _accept(self, sock): - client = super(TestServer, self)._accept(sock) - print('New connection:', client.socket.getpeername()) - return client - - handler = EchoPacketHandler - if netaddr is not None and args.proxy: - handler = handler.with_proxy_set(args.proxy) - server = TestServer(list(zip(args.host, args.port)), handler) - server.serve_forever() - - -if __name__ == '__main__': - main() diff --git a/judge/admin/submission.py b/judge/admin/submission.py index d774658..2689a76 100644 --- a/judge/admin/submission.py +++ b/judge/admin/submission.py @@ -108,8 +108,8 @@ class SubmissionSourceInline(admin.StackedInline): class SubmissionAdmin(admin.ModelAdmin): - readonly_fields = ('user', 'problem', 'date') - fields = ('user', 'problem', 'date', 'time', 'memory', 'points', 'language', 'status', 'result', + readonly_fields = ('user', 'problem', 'date', 'judged_date') + fields = ('user', 'problem', 'date', 'judged_date', 'time', 'memory', 'points', 'language', 'status', 'result', 'case_points', 'case_total', 'judged_on', 'error') actions = ('judge', 'recalculate_score') list_display = ('id', 'problem_code', 'problem_name', 'user_column', 'execution_time', 'pretty_memory', diff --git a/judge/bridge/__init__.py b/judge/bridge/__init__.py index f1f1bfb..e69de29 100644 --- a/judge/bridge/__init__.py +++ b/judge/bridge/__init__.py @@ -1,6 +0,0 @@ -from .djangohandler import DjangoHandler -from .djangoserver import DjangoServer -from .judgecallback import DjangoJudgeHandler -from .judgehandler import JudgeHandler -from .judgelist import JudgeList -from .judgeserver import JudgeServer diff --git a/judge/bridge/base_handler.py b/judge/bridge/base_handler.py new file mode 100644 index 0000000..c3fbd40 --- /dev/null +++ b/judge/bridge/base_handler.py @@ -0,0 +1,196 @@ +import logging +import socket +import struct +import zlib +from itertools import chain + +from netaddr import IPGlob, IPSet + +from judge.utils.unicode import utf8text + +logger = logging.getLogger('judge.bridge') + +size_pack = struct.Struct('!I') +assert size_pack.size == 4 + +MAX_ALLOWED_PACKET_SIZE = 8 * 1024 * 1024 + + +def proxy_list(human_readable): + globs = [] + addrs = [] + for item in human_readable: + if '*' in item or '-' in item: + globs.append(IPGlob(item)) + else: + addrs.append(item) + return IPSet(chain(chain.from_iterable(globs), addrs)) + + +class Disconnect(Exception): + pass + + +# socketserver.BaseRequestHandler does all the handling in __init__, +# making it impossible to inherit __init__ sanely. While it lets you +# use setup(), most tools will complain about uninitialized variables. +# This metaclass will allow sane __init__ behaviour while also magically +# calling the methods that handles the request. +class RequestHandlerMeta(type): + def __call__(cls, *args, **kwargs): + handler = super().__call__(*args, **kwargs) + handler.on_connect() + try: + handler.handle() + except BaseException: + logger.exception('Error in base packet handling') + raise + finally: + handler.on_disconnect() + + +class ZlibPacketHandler(metaclass=RequestHandlerMeta): + proxies = [] + + def __init__(self, request, client_address, server): + self.request = request + self.server = server + self.client_address = client_address + self.server_address = server.server_address + self._initial_tag = None + self._got_packet = False + + @property + def timeout(self): + return self.request.gettimeout() + + @timeout.setter + def timeout(self, timeout): + self.request.settimeout(timeout or None) + + def read_sized_packet(self, size, initial=None): + if size > MAX_ALLOWED_PACKET_SIZE: + logger.log(logging.WARNING if self._got_packet else logging.INFO, + 'Disconnecting client due to too-large message size (%d bytes): %s', size, self.client_address) + raise Disconnect() + + buffer = [] + remainder = size + + if initial: + buffer.append(initial) + remainder -= len(initial) + assert remainder >= 0 + + while remainder: + data = self.request.recv(remainder) + remainder -= len(data) + buffer.append(data) + self._on_packet(b''.join(buffer)) + + def parse_proxy_protocol(self, line): + words = line.split() + + if len(words) < 2: + raise Disconnect() + + if words[1] == b'TCP4': + if len(words) != 6: + raise Disconnect() + self.client_address = (utf8text(words[2]), utf8text(words[4])) + self.server_address = (utf8text(words[3]), utf8text(words[5])) + elif words[1] == b'TCP6': + self.client_address = (utf8text(words[2]), utf8text(words[4]), 0, 0) + self.server_address = (utf8text(words[3]), utf8text(words[5]), 0, 0) + elif words[1] != b'UNKNOWN': + raise Disconnect() + + def read_size(self, buffer=b''): + while len(buffer) < size_pack.size: + recv = self.request.recv(size_pack.size - len(buffer)) + if not recv: + raise Disconnect() + buffer += recv + return size_pack.unpack(buffer)[0] + + def read_proxy_header(self, buffer=b''): + # Max line length for PROXY protocol is 107, and we received 4 already. + while b'\r\n' not in buffer: + if len(buffer) > 107: + raise Disconnect() + data = self.request.recv(107) + if not data: + raise Disconnect() + buffer += data + return buffer + + def _on_packet(self, data): + decompressed = zlib.decompress(data).decode('utf-8') + self._got_packet = True + self.on_packet(decompressed) + + def on_packet(self, data): + raise NotImplementedError() + + def on_connect(self): + pass + + def on_disconnect(self): + pass + + def on_timeout(self): + pass + + def handle(self): + try: + tag = self.read_size() + self._initial_tag = size_pack.pack(tag) + if self.client_address[0] in self.proxies and self._initial_tag == b'PROX': + proxy, _, remainder = self.read_proxy_header(self._initial_tag).partition(b'\r\n') + self.parse_proxy_protocol(proxy) + + while remainder: + while len(remainder) < size_pack.size: + self.read_sized_packet(self.read_size(remainder)) + break + + size = size_pack.unpack(remainder[:size_pack.size])[0] + remainder = remainder[size_pack.size:] + if len(remainder) <= size: + self.read_sized_packet(size, remainder) + break + + self._on_packet(remainder[:size]) + remainder = remainder[size:] + else: + self.read_sized_packet(tag) + + while True: + self.read_sized_packet(self.read_size()) + except Disconnect: + return + except zlib.error: + if self._got_packet: + logger.warning('Encountered zlib error during packet handling, disconnecting client: %s', + self.client_address, exc_info=True) + else: + logger.info('Potentially wrong protocol (zlib error): %s: %r', self.client_address, self._initial_tag, + exc_info=True) + except socket.timeout: + if self._got_packet: + logger.info('Socket timed out: %s', self.client_address) + self.on_timeout() + else: + logger.info('Potentially wrong protocol: %s: %r', self.client_address, self._initial_tag) + except socket.error as e: + # When a gevent socket is shutdown, gevent cancels all waits, causing recv to raise cancel_wait_ex. + if e.__class__.__name__ == 'cancel_wait_ex': + return + raise + + def send(self, data): + compressed = zlib.compress(data.encode('utf-8')) + self.request.sendall(size_pack.pack(len(compressed)) + compressed) + + def close(self): + self.request.shutdown(socket.SHUT_RDWR) diff --git a/judge/bridge/daemon.py b/judge/bridge/daemon.py new file mode 100644 index 0000000..ce8a702 --- /dev/null +++ b/judge/bridge/daemon.py @@ -0,0 +1,47 @@ +import logging +import signal +import threading +from functools import partial + +from django.conf import settings + +from judge.bridge.django_handler import DjangoHandler +from judge.bridge.judge_handler import JudgeHandler +from judge.bridge.judge_list import JudgeList +from judge.bridge.server import Server +from judge.models import Judge, Submission + +logger = logging.getLogger('judge.bridge') + + +def reset_judges(): + Judge.objects.update(online=False, ping=None, load=None) + + +def judge_daemon(): + reset_judges() + Submission.objects.filter(status__in=Submission.IN_PROGRESS_GRADING_STATUS) \ + .update(status='IE', result='IE', error=None) + judges = JudgeList() + + judge_server = Server(settings.BRIDGED_JUDGE_ADDRESS, partial(JudgeHandler, judges=judges)) + django_server = Server(settings.BRIDGED_DJANGO_ADDRESS, partial(DjangoHandler, judges=judges)) + + threading.Thread(target=django_server.serve_forever).start() + threading.Thread(target=judge_server.serve_forever).start() + + stop = threading.Event() + + def signal_handler(signum, _): + logger.info('Exiting due to %s', signal.Signals(signum).name) + stop.set() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGQUIT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + stop.wait() + finally: + django_server.shutdown() + judge_server.shutdown() diff --git a/judge/bridge/djangohandler.py b/judge/bridge/django_handler.py similarity index 53% rename from judge/bridge/djangohandler.py rename to judge/bridge/django_handler.py index 8b18a53..b284b68 100644 --- a/judge/bridge/djangohandler.py +++ b/judge/bridge/django_handler.py @@ -2,63 +2,55 @@ import json import logging import struct -from event_socket_server import ZlibPacketHandler +from judge.bridge.base_handler import Disconnect, ZlibPacketHandler logger = logging.getLogger('judge.bridge') size_pack = struct.Struct('!I') class DjangoHandler(ZlibPacketHandler): - def __init__(self, server, socket): - super(DjangoHandler, self).__init__(server, socket) + def __init__(self, request, client_address, server, judges): + super().__init__(request, client_address, server) self.handlers = { 'submission-request': self.on_submission, 'terminate-submission': self.on_termination, - 'disconnect-judge': self.on_disconnect, + 'disconnect-judge': self.on_disconnect_request, } - self._to_kill = True - # self.server.schedule(5, self._kill_if_no_request) + self.judges = judges - def _kill_if_no_request(self): - if self._to_kill: - logger.info('Killed inactive connection: %s', self._socket.getpeername()) - self.close() + def send(self, data): + super().send(json.dumps(data, separators=(',', ':'))) - def _format_send(self, data): - return super(DjangoHandler, self)._format_send(json.dumps(data, separators=(',', ':'))) - - def packet(self, packet): - self._to_kill = False + def on_packet(self, packet): packet = json.loads(packet) try: result = self.handlers.get(packet.get('name', None), self.on_malformed)(packet) except Exception: logger.exception('Error in packet handling (Django-facing)') result = {'name': 'bad-request'} - self.send(result, self._schedule_close) - - def _schedule_close(self): - self.server.schedule(0, self.close) + self.send(result) + raise Disconnect() def on_submission(self, data): id = data['submission-id'] problem = data['problem-id'] language = data['language'] source = data['source'] + judge_id = data['judge-id'] priority = data['priority'] - if not self.server.judges.check_priority(priority): + if not self.judges.check_priority(priority): return {'name': 'bad-request'} - self.server.judges.judge(id, problem, language, source, priority) + self.judges.judge(id, problem, language, source, judge_id, priority) return {'name': 'submission-received', 'submission-id': id} def on_termination(self, data): - return {'name': 'submission-received', 'judge-aborted': self.server.judges.abort(data['submission-id'])} + return {'name': 'submission-received', 'judge-aborted': self.judges.abort(data['submission-id'])} - def on_disconnect(self, data): + def on_disconnect_request(self, data): judge_id = data['judge-id'] force = data['force'] - self.server.judges.disconnect(judge_id, force=force) + self.judges.disconnect(judge_id, force=force) def on_malformed(self, packet): logger.error('Malformed packet: %s', packet) diff --git a/judge/bridge/djangoserver.py b/judge/bridge/djangoserver.py deleted file mode 100644 index 39f5787..0000000 --- a/judge/bridge/djangoserver.py +++ /dev/null @@ -1,7 +0,0 @@ -from event_socket_server import get_preferred_engine - - -class DjangoServer(get_preferred_engine()): - def __init__(self, judges, *args, **kwargs): - super(DjangoServer, self).__init__(*args, **kwargs) - self.judges = judges diff --git a/event_socket_server/test_client.py b/judge/bridge/echo_test_client.py similarity index 82% rename from event_socket_server/test_client.py rename to judge/bridge/echo_test_client.py index 4f696e8..8fec692 100644 --- a/event_socket_server/test_client.py +++ b/judge/bridge/echo_test_client.py @@ -1,14 +1,10 @@ -import ctypes +import os import socket import struct import time import zlib size_pack = struct.Struct('!I') -try: - RtlGenRandom = ctypes.windll.advapi32.SystemFunction036 -except AttributeError: - RtlGenRandom = None def open_connection(): @@ -27,15 +23,6 @@ def dezlibify(data, skip_head=True): return zlib.decompress(data).decode('utf-8') -def random(length): - if RtlGenRandom is None: - with open('/dev/urandom') as f: - return f.read(length) - buf = ctypes.create_string_buffer(length) - RtlGenRandom(buf, length) - return buf.raw - - def main(): global host, port import argparse @@ -64,11 +51,11 @@ def main(): s2.close() print('Large random data test:', end=' ') s4 = open_connection() - data = random(1000000) + data = os.urandom(1000000).decode('iso-8859-1') print('Generated', end=' ') s4.sendall(zlibify(data)) print('Sent', end=' ') - result = '' + result = b'' while len(result) < size_pack.size: result += s4.recv(1024) size = size_pack.unpack(result[:size_pack.size])[0] @@ -81,7 +68,7 @@ def main(): s4.close() print('Test malformed connection:', end=' ') s5 = open_connection() - s5.sendall(data[:100000]) + s5.sendall(data[:100000].encode('utf-8')) s5.close() print('Success') print('Waiting for timeout to close idle connection:', end=' ') diff --git a/judge/bridge/echo_test_server.py b/judge/bridge/echo_test_server.py new file mode 100644 index 0000000..59e21fa --- /dev/null +++ b/judge/bridge/echo_test_server.py @@ -0,0 +1,39 @@ +from judge.bridge.base_handler import ZlibPacketHandler + + +class EchoPacketHandler(ZlibPacketHandler): + def on_connect(self): + print('New client:', self.client_address) + self.timeout = 5 + + def on_timeout(self): + print('Inactive client:', self.client_address) + + def on_packet(self, data): + self.timeout = None + print('Data from %s: %r' % (self.client_address, data[:30] if len(data) > 30 else data)) + self.send(data) + + def on_disconnect(self): + print('Closed client:', self.client_address) + + +def main(): + import argparse + from judge.bridge.server import Server + + parser = argparse.ArgumentParser() + parser.add_argument('-l', '--host', action='append') + parser.add_argument('-p', '--port', type=int, action='append') + parser.add_argument('-P', '--proxy', action='append') + args = parser.parse_args() + + class Handler(EchoPacketHandler): + proxies = args.proxy or [] + + server = Server(list(zip(args.host, args.port)), Handler) + server.serve_forever() + + +if __name__ == '__main__': + main() diff --git a/judge/bridge/judgecallback.py b/judge/bridge/judge_handler.py similarity index 64% rename from judge/bridge/judgecallback.py rename to judge/bridge/judge_handler.py index a7ddf69..86f53b4 100644 --- a/judge/bridge/judgecallback.py +++ b/judge/bridge/judge_handler.py @@ -1,22 +1,27 @@ +import hmac import json import logging +import threading import time +from collections import deque, namedtuple from operator import itemgetter from django import db +from django.conf import settings from django.utils import timezone from django.db.models import F from judge import event_poster as event +from judge.bridge.base_handler import ZlibPacketHandler, proxy_list from judge.caching import finished_submission from judge.models import Judge, Language, LanguageLimit, Problem, RuntimeVersion, Submission, SubmissionTestCase -from .judgehandler import JudgeHandler, SubmissionData logger = logging.getLogger('judge.bridge') json_log = logging.getLogger('judge.json.bridge') UPDATE_RATE_LIMIT = 5 UPDATE_RATE_TIME = 0.5 +SubmissionData = namedtuple('SubmissionData', 'time memory short_circuit pretests_only contest_no attempt_no user_id') def _ensure_connection(): @@ -26,9 +31,42 @@ def _ensure_connection(): db.connection.close() -class DjangoJudgeHandler(JudgeHandler): - def __init__(self, server, socket): - super(DjangoJudgeHandler, self).__init__(server, socket) +class JudgeHandler(ZlibPacketHandler): + proxies = proxy_list(settings.BRIDGED_JUDGE_PROXIES or []) + + def __init__(self, request, client_address, server, judges): + super().__init__(request, client_address, server) + + self.judges = judges + self.handlers = { + 'grading-begin': self.on_grading_begin, + 'grading-end': self.on_grading_end, + 'compile-error': self.on_compile_error, + 'compile-message': self.on_compile_message, + 'batch-begin': self.on_batch_begin, + 'batch-end': self.on_batch_end, + 'test-case-status': self.on_test_case, + 'internal-error': self.on_internal_error, + 'submission-terminated': self.on_submission_terminated, + 'submission-acknowledged': self.on_submission_acknowledged, + 'ping-response': self.on_ping_response, + 'supported-problems': self.on_supported_problems, + 'handshake': self.on_handshake, + } + self._working = False + self._no_response_job = None + self._problems = [] + self.executors = {} + self.problems = {} + self.latency = None + self.time_delta = None + self.load = 1e100 + self.name = None + self.batch_id = None + self.in_batch = False + self._stop_ping = threading.Event() + self._ping_average = deque(maxlen=6) # 1 minute average, just like load + self._time_delta = deque(maxlen=6) # each value is (updates, last reset) self.update_counter = {} @@ -38,60 +76,33 @@ class DjangoJudgeHandler(JudgeHandler): self._submission_cache_id = None self._submission_cache = {} + def on_connect(self): + self.timeout = 15 + logger.info('Judge connected from: %s', self.client_address) json_log.info(self._make_json_log(action='connect')) - def on_close(self): - super(DjangoJudgeHandler, self).on_close() + def on_disconnect(self): + self._stop_ping.set() + if self._working: + logger.error('Judge %s disconnected while handling submission %s', self.name, self._working) + self.judges.remove(self) + if self.name is not None: + self._disconnected() + logger.info('Judge disconnected from: %s with name %s', self.client_address, self.name) + json_log.info(self._make_json_log(action='disconnect', info='judge disconnected')) if self._working: - Submission.objects.filter(id=self._working).update(status='IE', result='IE') + Submission.objects.filter(id=self._working).update(status='IE', result='IE', error='') json_log.error(self._make_json_log(sub=self._working, action='close', info='IE due to shutdown on grading')) - def on_malformed(self, packet): - super(DjangoJudgeHandler, self).on_malformed(packet) - json_log.exception(self._make_json_log(sub=self._working, info='malformed zlib packet')) - - def _packet_exception(self): - json_log.exception(self._make_json_log(sub=self._working, info='packet processing exception')) - - def get_related_submission_data(self, submission): - _ensure_connection() # We are called from the django-facing daemon thread. Guess what happens. - - try: - pid, time, memory, short_circuit, lid, is_pretested, sub_date, uid, part_virtual, part_id = ( - Submission.objects.filter(id=submission) - .values_list('problem__id', 'problem__time_limit', 'problem__memory_limit', - 'problem__short_circuit', 'language__id', 'is_pretested', 'date', 'user__id', - 'contest__participation__virtual', 'contest__participation__id')).get() - except Submission.DoesNotExist: - logger.error('Submission vanished: %s', submission) - json_log.error(self._make_json_log( - sub=self._working, action='request', - info='submission vanished when fetching info', - )) - return - - attempt_no = Submission.objects.filter(problem__id=pid, contest__participation__id=part_id, user__id=uid, - date__lt=sub_date).exclude(status__in=('CE', 'IE')).count() + 1 - - try: - time, memory = (LanguageLimit.objects.filter(problem__id=pid, language__id=lid) - .values_list('time_limit', 'memory_limit').get()) - except LanguageLimit.DoesNotExist: - pass - - return SubmissionData( - time=time, - memory=memory, - short_circuit=short_circuit, - pretests_only=is_pretested, - contest_no=part_virtual, - attempt_no=attempt_no, - user_id=uid, - ) - def _authenticate(self, id, key): - result = Judge.objects.filter(name=id, auth_key=key, is_blocked=False).exists() + try: + judge = Judge.objects.get(name=id, is_blocked=False) + except Judge.DoesNotExist: + result = False + else: + result = hmac.compare_digest(judge.auth_key, key) + if not result: json_log.warning(self._make_json_log(action='auth', judge=id, info='judge failed authentication')) return result @@ -130,26 +141,118 @@ class DjangoJudgeHandler(JudgeHandler): if e.__class__.__name__ == 'OperationalError' and e.__module__ == '_mysql_exceptions' and e.args[0] == 2006: db.connection.close() - def _post_update_submission(self, id, state, done=False): - if self._submission_cache_id == id: - data = self._submission_cache - else: - self._submission_cache = data = Submission.objects.filter(id=id).values( - 'problem__is_public', 'contest__participation__contest__key', - 'user_id', 'problem_id', 'status', 'language__key', - ).get() - self._submission_cache_id = id + def send(self, data): + super().send(json.dumps(data, separators=(',', ':'))) - if data['problem__is_public']: - event.post('submissions', { - 'type': 'done-submission' if done else 'update-submission', - 'state': state, 'id': id, - 'contest': data['contest__participation__contest__key'], - 'user': data['user_id'], 'problem': data['problem_id'], - 'status': data['status'], 'language': data['language__key'], - }) + def on_handshake(self, packet): + if 'id' not in packet or 'key' not in packet: + logger.warning('Malformed handshake: %s', self.client_address) + self.close() + return + + if not self._authenticate(packet['id'], packet['key']): + logger.warning('Authentication failure: %s', self.client_address) + self.close() + return + + self.timeout = 60 + self._problems = packet['problems'] + self.problems = dict(self._problems) + self.executors = packet['executors'] + self.name = packet['id'] + + self.send({'name': 'handshake-success'}) + logger.info('Judge authenticated: %s (%s)', self.client_address, packet['id']) + self.judges.register(self) + threading.Thread(target=self._ping_thread).start() + self._connected() + + def can_judge(self, problem, executor, judge_id=None): + return problem in self.problems and executor in self.executors and (not judge_id or self.name == judge_id) + + @property + def working(self): + return bool(self._working) + + def get_related_submission_data(self, submission): + _ensure_connection() + + try: + pid, time, memory, short_circuit, lid, is_pretested, sub_date, uid, part_virtual, part_id = ( + Submission.objects.filter(id=submission) + .values_list('problem__id', 'problem__time_limit', 'problem__memory_limit', + 'problem__short_circuit', 'language__id', 'is_pretested', 'date', 'user__id', + 'contest__participation__virtual', 'contest__participation__id')).get() + except Submission.DoesNotExist: + logger.error('Submission vanished: %s', submission) + json_log.error(self._make_json_log( + sub=self._working, action='request', + info='submission vanished when fetching info', + )) + return + + attempt_no = Submission.objects.filter(problem__id=pid, contest__participation__id=part_id, user__id=uid, + date__lt=sub_date).exclude(status__in=('CE', 'IE')).count() + 1 + + try: + time, memory = (LanguageLimit.objects.filter(problem__id=pid, language__id=lid) + .values_list('time_limit', 'memory_limit').get()) + except LanguageLimit.DoesNotExist: + pass + + return SubmissionData( + time=time, + memory=memory, + short_circuit=short_circuit, + pretests_only=is_pretested, + contest_no=part_virtual, + attempt_no=attempt_no, + user_id=uid, + ) + + def disconnect(self, force=False): + if force: + # Yank the power out. + self.close() + else: + self.send({'name': 'disconnect'}) + + def submit(self, id, problem, language, source): + data = self.get_related_submission_data(id) + self._working = id + self._no_response_job = threading.Timer(20, self._kill_if_no_response) + self.send({ + 'name': 'submission-request', + 'submission-id': id, + 'problem-id': problem, + 'language': language, + 'source': source, + 'time-limit': data.time, + 'memory-limit': data.memory, + 'short-circuit': data.short_circuit, + 'meta': { + 'pretests-only': data.pretests_only, + 'in-contest': data.contest_no, + 'attempt-no': data.attempt_no, + 'user': data.user_id, + }, + }) + + def _kill_if_no_response(self): + logger.error('Judge failed to acknowledge submission: %s: %s', self.name, self._working) + self.close() + + def on_timeout(self): + if self.name: + logger.warning('Judge seems dead: %s: %s', self.name, self._working) + + def malformed_packet(self, exception): + logger.exception('Judge sent malformed packet: %s', self.name) + super(JudgeHandler, self).malformed_packet(exception) def on_submission_processing(self, packet): + _ensure_connection() + id = packet['submission-id'] if Submission.objects.filter(id=id).update(status='P', judged_on=self.judge): event.post('sub_%s' % Submission.get_id_secret(id), {'type': 'processing'}) @@ -161,12 +264,72 @@ class DjangoJudgeHandler(JudgeHandler): def on_submission_wrong_acknowledge(self, packet, expected, got): json_log.error(self._make_json_log(packet, action='processing', info='wrong-acknowledge', expected=expected)) + Submission.objects.filter(id=expected).update(status='IE', result='IE', error=None) + Submission.objects.filter(id=got, status='QU').update(status='IE', result='IE', error=None) + + def on_submission_acknowledged(self, packet): + if not packet.get('submission-id', None) == self._working: + logger.error('Wrong acknowledgement: %s: %s, expected: %s', self.name, packet.get('submission-id', None), + self._working) + self.on_submission_wrong_acknowledge(packet, self._working, packet.get('submission-id', None)) + self.close() + logger.info('Submission acknowledged: %d', self._working) + if self._no_response_job: + self._no_response_job.cancel() + self._no_response_job = None + self.on_submission_processing(packet) + + def abort(self): + self.send({'name': 'terminate-submission'}) + + def get_current_submission(self): + return self._working or None + + def ping(self): + self.send({'name': 'ping', 'when': time.time()}) + + def on_packet(self, data): + try: + try: + data = json.loads(data) + if 'name' not in data: + raise ValueError + except ValueError: + self.on_malformed(data) + else: + handler = self.handlers.get(data['name'], self.on_malformed) + handler(data) + except Exception: + logger.exception('Error in packet handling (Judge-side): %s', self.name) + self._packet_exception() + # You can't crash here because you aren't so sure about the judges + # not being malicious or simply malforms. THIS IS A SERVER! + + def _packet_exception(self): + json_log.exception(self._make_json_log(sub=self._working, info='packet processing exception')) + + def _submission_is_batch(self, id): + if not Submission.objects.filter(id=id).update(batch=True): + logger.warning('Unknown submission: %s', id) + + def on_supported_problems(self, packet): + logger.info('%s: Updated problem list', self.name) + self._problems = packet['problems'] + self.problems = dict(self._problems) + if not self.working: + self.judges.update_problems(self) + + self.judge.problems.set(Problem.objects.filter(code__in=list(self.problems.keys()))) + json_log.info(self._make_json_log(action='update-problems', count=len(self.problems))) def on_grading_begin(self, packet): - super(DjangoJudgeHandler, self).on_grading_begin(packet) + logger.info('%s: Grading has begun on: %s', self.name, packet['submission-id']) + self.batch_id = None + if Submission.objects.filter(id=packet['submission-id']).update( - status='G', is_pretested=packet['pretested'], - current_testcase=1, batch=False, points=0): + status='G', is_pretested=packet['pretested'], + current_testcase=1, points=0, + batch=False, judged_date=timezone.now()): SubmissionTestCase.objects.filter(submission_id=packet['submission-id']).delete() event.post('sub_%s' % Submission.get_id_secret(packet['submission-id']), {'type': 'grading-begin'}) self._post_update_submission(packet['submission-id'], 'grading-begin') @@ -175,12 +338,10 @@ class DjangoJudgeHandler(JudgeHandler): logger.warning('Unknown submission: %s', packet['submission-id']) json_log.error(self._make_json_log(packet, action='grading-begin', info='unknown submission')) - def _submission_is_batch(self, id): - if not Submission.objects.filter(id=id).update(batch=True): - logger.warning('Unknown submission: %s', id) - def on_grading_end(self, packet): - super(DjangoJudgeHandler, self).on_grading_end(packet) + logger.info('%s: Grading has ended on: %s', self.name, packet['submission-id']) + self._free_self(packet) + self.batch_id = None try: submission = Submission.objects.get(id=packet['submission-id']) @@ -263,7 +424,8 @@ class DjangoJudgeHandler(JudgeHandler): self._post_update_submission(submission.id, 'grading-end', done=True) def on_compile_error(self, packet): - super(DjangoJudgeHandler, self).on_compile_error(packet) + logger.info('%s: Submission failed to compile: %s', self.name, packet['submission-id']) + self._free_self(packet) if Submission.objects.filter(id=packet['submission-id']).update(status='CE', result='CE', error=packet['log']): event.post('sub_%s' % Submission.get_id_secret(packet['submission-id']), { @@ -279,7 +441,7 @@ class DjangoJudgeHandler(JudgeHandler): log=packet['log'], finish=True, result='CE')) def on_compile_message(self, packet): - super(DjangoJudgeHandler, self).on_compile_message(packet) + logger.info('%s: Submission generated compiler messages: %s', self.name, packet['submission-id']) if Submission.objects.filter(id=packet['submission-id']).update(error=packet['log']): event.post('sub_%s' % Submission.get_id_secret(packet['submission-id']), {'type': 'compile-message'}) @@ -290,7 +452,11 @@ class DjangoJudgeHandler(JudgeHandler): log=packet['log'])) def on_internal_error(self, packet): - super(DjangoJudgeHandler, self).on_internal_error(packet) + try: + raise ValueError('\n\n' + packet['message']) + except ValueError: + logger.exception('Judge %s failed while handling submission %s', self.name, packet['submission-id']) + self._free_self(packet) id = packet['submission-id'] if Submission.objects.filter(id=id).update(status='IE', result='IE', error=packet['message']): @@ -304,7 +470,8 @@ class DjangoJudgeHandler(JudgeHandler): message=packet['message'], finish=True, result='IE')) def on_submission_terminated(self, packet): - super(DjangoJudgeHandler, self).on_submission_terminated(packet) + logger.info('%s: Submission aborted: %s', self.name, packet['submission-id']) + self._free_self(packet) if Submission.objects.filter(id=packet['submission-id']).update(status='AB', result='AB'): event.post('sub_%s' % Submission.get_id_secret(packet['submission-id']), {'type': 'aborted-submission'}) @@ -316,20 +483,29 @@ class DjangoJudgeHandler(JudgeHandler): finish=True, result='AB')) def on_batch_begin(self, packet): - super(DjangoJudgeHandler, self).on_batch_begin(packet) + logger.info('%s: Batch began on: %s', self.name, packet['submission-id']) + self.in_batch = True + if self.batch_id is None: + self.batch_id = 0 + self._submission_is_batch(packet['submission-id']) + self.batch_id += 1 + json_log.info(self._make_json_log(packet, action='batch-begin', batch=self.batch_id)) def on_batch_end(self, packet): - super(DjangoJudgeHandler, self).on_batch_end(packet) + self.in_batch = False + logger.info('%s: Batch ended on: %s', self.name, packet['submission-id']) json_log.info(self._make_json_log(packet, action='batch-end', batch=self.batch_id)) def on_test_case(self, packet, max_feedback=SubmissionTestCase._meta.get_field('feedback').max_length): - super(DjangoJudgeHandler, self).on_test_case(packet) + logger.info('%s: %d test case(s) completed on: %s', self.name, len(packet['cases']), packet['submission-id']) + id = packet['submission-id'] updates = packet['cases'] max_position = max(map(itemgetter('position'), updates)) sum_points = sum(map(itemgetter('points'), updates)) + if not Submission.objects.filter(id=id).update(current_testcase=max_position + 1, points=F('points') + sum_points): logger.warning('Unknown submission: %s', id) json_log.error(self._make_json_log(packet, action='test-case', info='unknown submission')) @@ -395,10 +571,32 @@ class DjangoJudgeHandler(JudgeHandler): SubmissionTestCase.objects.bulk_create(bulk_test_case_updates) - def on_supported_problems(self, packet): - super(DjangoJudgeHandler, self).on_supported_problems(packet) - self.judge.problems.set(Problem.objects.filter(code__in=list(self.problems.keys()))) - json_log.info(self._make_json_log(action='update-problems', count=len(self.problems))) + def on_malformed(self, packet): + logger.error('%s: Malformed packet: %s', self.name, packet) + json_log.exception(self._make_json_log(sub=self._working, info='malformed json packet')) + + def on_ping_response(self, packet): + end = time.time() + self._ping_average.append(end - packet['when']) + self._time_delta.append((end + packet['when']) / 2 - packet['time']) + self.latency = sum(self._ping_average) / len(self._ping_average) + self.time_delta = sum(self._time_delta) / len(self._time_delta) + self.load = packet['load'] + self._update_ping() + + def _free_self(self, packet): + self.judges.on_judge_free(self, packet['submission-id']) + + def _ping_thread(self): + try: + while True: + self.ping() + if self._stop_ping.wait(10): + break + except Exception: + logger.exception('Ping error in %s', self.name) + self.close() + raise def _make_json_log(self, packet=None, sub=None, **kwargs): data = { @@ -411,3 +609,22 @@ class DjangoJudgeHandler(JudgeHandler): data['submission'] = sub data.update(kwargs) return json.dumps(data) + + def _post_update_submission(self, id, state, done=False): + if self._submission_cache_id == id: + data = self._submission_cache + else: + self._submission_cache = data = Submission.objects.filter(id=id).values( + 'problem__is_public', 'contest_object__key', + 'user_id', 'problem_id', 'status', 'language__key', + ).get() + self._submission_cache_id = id + + if data['problem__is_public']: + event.post('submissions', { + 'type': 'done-submission' if done else 'update-submission', + 'state': state, 'id': id, + 'contest': data['contest_object__key'], + 'user': data['user_id'], 'problem': data['problem_id'], + 'status': data['status'], 'language': data['language__key'], + }) diff --git a/judge/bridge/judgelist.py b/judge/bridge/judge_list.py similarity index 80% rename from judge/bridge/judgelist.py rename to judge/bridge/judge_list.py index ec3cebd..828bb83 100644 --- a/judge/bridge/judgelist.py +++ b/judge/bridge/judge_list.py @@ -29,8 +29,8 @@ class JudgeList(object): node = self.queue.first while node: if not isinstance(node.value, PriorityMarker): - id, problem, language, source = node.value - if judge.can_judge(problem, language): + id, problem, language, source, judge_id = node.value + if judge.can_judge(problem, language, judge_id): self.submission_map[id] = judge logger.info('Dispatched queued submission %d: %s', id, judge.name) try: @@ -52,9 +52,10 @@ class JudgeList(object): self._handle_free_judge(judge) def disconnect(self, judge_id, force=False): - for judge in self.judges: - if judge.name == judge_id: - judge.disconnect(force=force) + with self.lock: + for judge in self.judges: + if judge.name == judge_id: + judge.disconnect(force=force) def update_problems(self, judge): with self.lock: @@ -77,6 +78,7 @@ class JudgeList(object): with self.lock: logger.info('Judge available after grading %d: %s', submission, judge.name) del self.submission_map[submission] + judge._working = False self._handle_free_judge(judge) def abort(self, submission): @@ -98,15 +100,20 @@ class JudgeList(object): def check_priority(self, priority): return 0 <= priority < self.priorities - def judge(self, id, problem, language, source, priority): + def judge(self, id, problem, language, source, judge_id, priority): with self.lock: if id in self.submission_map or id in self.node_map: # Already judging, don't queue again. This can happen during batch rejudges, rejudges should be # idempotent. return - candidates = [judge for judge in self.judges if not judge.working and judge.can_judge(problem, language)] - logger.info('Free judges: %d', len(candidates)) + candidates = [ + judge for judge in self.judges if not judge.working and judge.can_judge(problem, language, judge_id) + ] + if judge_id: + logger.info('Specified judge %s is%savailable', judge_id, ' ' if candidates else ' not ') + else: + logger.info('Free judges: %d', len(candidates)) if candidates: # Schedule the submission on the judge reporting least load. judge = min(candidates, key=attrgetter('load')) @@ -117,7 +124,10 @@ class JudgeList(object): except Exception: logger.exception('Failed to dispatch %d (%s, %s) to %s', id, problem, language, judge.name) self.judges.discard(judge) - return self.judge(id, problem, language, source, priority) + return self.judge(id, problem, language, source, judge_id, priority) else: - self.node_map[id] = self.queue.insert((id, problem, language, source), self.priority[priority]) + self.node_map[id] = self.queue.insert( + (id, problem, language, source, judge_id), + self.priority[priority], + ) logger.info('Queued submission: %d', id) diff --git a/judge/bridge/judgehandler.py b/judge/bridge/judgehandler.py deleted file mode 100644 index 2a09218..0000000 --- a/judge/bridge/judgehandler.py +++ /dev/null @@ -1,268 +0,0 @@ -import json -import logging -import time -from collections import deque, namedtuple - -from event_socket_server import ProxyProtocolMixin, ZlibPacketHandler - -logger = logging.getLogger('judge.bridge') - -SubmissionData = namedtuple('SubmissionData', 'time memory short_circuit pretests_only contest_no attempt_no user_id') - - -class JudgeHandler(ProxyProtocolMixin, ZlibPacketHandler): - def __init__(self, server, socket): - super(JudgeHandler, self).__init__(server, socket) - - self.handlers = { - 'grading-begin': self.on_grading_begin, - 'grading-end': self.on_grading_end, - 'compile-error': self.on_compile_error, - 'compile-message': self.on_compile_message, - 'batch-begin': self.on_batch_begin, - 'batch-end': self.on_batch_end, - 'test-case-status': self.on_test_case, - 'internal-error': self.on_internal_error, - 'submission-terminated': self.on_submission_terminated, - 'submission-acknowledged': self.on_submission_acknowledged, - 'ping-response': self.on_ping_response, - 'supported-problems': self.on_supported_problems, - 'handshake': self.on_handshake, - } - self._to_kill = True - self._working = False - self._no_response_job = None - self._problems = [] - self.executors = [] - self.problems = {} - self.latency = None - self.time_delta = None - self.load = 1e100 - self.name = None - self.batch_id = None - self.in_batch = False - self._ping_average = deque(maxlen=6) # 1 minute average, just like load - self._time_delta = deque(maxlen=6) - - self.server.schedule(15, self._kill_if_no_auth) - logger.info('Judge connected from: %s', self.client_address) - - def _kill_if_no_auth(self): - if self._to_kill: - logger.info('Judge not authenticated: %s', self.client_address) - self.close() - - def on_close(self): - self._to_kill = False - if self._no_response_job: - self.server.unschedule(self._no_response_job) - self.server.judges.remove(self) - if self.name is not None: - self._disconnected() - logger.info('Judge disconnected from: %s', self.client_address) - - def _authenticate(self, id, key): - return False - - def _connected(self): - pass - - def _disconnected(self): - pass - - def _update_ping(self): - pass - - def _format_send(self, data): - return super(JudgeHandler, self)._format_send(json.dumps(data, separators=(',', ':'))) - - def on_handshake(self, packet): - if 'id' not in packet or 'key' not in packet: - logger.warning('Malformed handshake: %s', self.client_address) - self.close() - return - - if not self._authenticate(packet['id'], packet['key']): - logger.warning('Authentication failure: %s', self.client_address) - self.close() - return - - self._to_kill = False - self._problems = packet['problems'] - self.problems = dict(self._problems) - self.executors = packet['executors'] - self.name = packet['id'] - - self.send({'name': 'handshake-success'}) - logger.info('Judge authenticated: %s (%s)', self.client_address, packet['id']) - self.server.judges.register(self) - self._connected() - - def can_judge(self, problem, executor): - return problem in self.problems and executor in self.executors - - @property - def working(self): - return bool(self._working) - - def get_related_submission_data(self, submission): - return SubmissionData( - time=2, - memory=16384, - short_circuit=False, - pretests_only=False, - contest_no=None, - attempt_no=1, - user_id=None, - ) - - def disconnect(self, force=False): - if force: - # Yank the power out. - self.close() - else: - self.send({'name': 'disconnect'}) - - def submit(self, id, problem, language, source): - data = self.get_related_submission_data(id) - self._working = id - self._no_response_job = self.server.schedule(20, self._kill_if_no_response) - self.send({ - 'name': 'submission-request', - 'submission-id': id, - 'problem-id': problem, - 'language': language, - 'source': source, - 'time-limit': data.time, - 'memory-limit': data.memory, - 'short-circuit': data.short_circuit, - 'meta': { - 'pretests-only': data.pretests_only, - 'in-contest': data.contest_no, - 'attempt-no': data.attempt_no, - 'user': data.user_id, - }, - }) - - def _kill_if_no_response(self): - logger.error('Judge seems dead: %s: %s', self.name, self._working) - self.close() - - def malformed_packet(self, exception): - logger.exception('Judge sent malformed packet: %s', self.name) - super(JudgeHandler, self).malformed_packet(exception) - - def on_submission_processing(self, packet): - pass - - def on_submission_wrong_acknowledge(self, packet, expected, got): - pass - - def on_submission_acknowledged(self, packet): - if not packet.get('submission-id', None) == self._working: - logger.error('Wrong acknowledgement: %s: %s, expected: %s', self.name, packet.get('submission-id', None), - self._working) - self.on_submission_wrong_acknowledge(packet, self._working, packet.get('submission-id', None)) - self.close() - logger.info('Submission acknowledged: %d', self._working) - if self._no_response_job: - self.server.unschedule(self._no_response_job) - self._no_response_job = None - self.on_submission_processing(packet) - - def abort(self): - self.send({'name': 'terminate-submission'}) - - def get_current_submission(self): - return self._working or None - - def ping(self): - self.send({'name': 'ping', 'when': time.time()}) - - def packet(self, data): - try: - try: - data = json.loads(data) - if 'name' not in data: - raise ValueError - except ValueError: - self.on_malformed(data) - else: - handler = self.handlers.get(data['name'], self.on_malformed) - handler(data) - except Exception: - logger.exception('Error in packet handling (Judge-side): %s', self.name) - self._packet_exception() - # You can't crash here because you aren't so sure about the judges - # not being malicious or simply malforms. THIS IS A SERVER! - - def _packet_exception(self): - pass - - def _submission_is_batch(self, id): - pass - - def on_supported_problems(self, packet): - logger.info('%s: Updated problem list', self.name) - self._problems = packet['problems'] - self.problems = dict(self._problems) - if not self.working: - self.server.judges.update_problems(self) - - def on_grading_begin(self, packet): - logger.info('%s: Grading has begun on: %s', self.name, packet['submission-id']) - self.batch_id = None - - def on_grading_end(self, packet): - logger.info('%s: Grading has ended on: %s', self.name, packet['submission-id']) - self._free_self(packet) - self.batch_id = None - - def on_compile_error(self, packet): - logger.info('%s: Submission failed to compile: %s', self.name, packet['submission-id']) - self._free_self(packet) - - def on_compile_message(self, packet): - logger.info('%s: Submission generated compiler messages: %s', self.name, packet['submission-id']) - - def on_internal_error(self, packet): - try: - raise ValueError('\n\n' + packet['message']) - except ValueError: - logger.exception('Judge %s failed while handling submission %s', self.name, packet['submission-id']) - self._free_self(packet) - - def on_submission_terminated(self, packet): - logger.info('%s: Submission aborted: %s', self.name, packet['submission-id']) - self._free_self(packet) - - def on_batch_begin(self, packet): - logger.info('%s: Batch began on: %s', self.name, packet['submission-id']) - self.in_batch = True - if self.batch_id is None: - self.batch_id = 0 - self._submission_is_batch(packet['submission-id']) - self.batch_id += 1 - - def on_batch_end(self, packet): - self.in_batch = False - logger.info('%s: Batch ended on: %s', self.name, packet['submission-id']) - - def on_test_case(self, packet): - logger.info('%s: %d test case(s) completed on: %s', self.name, len(packet['cases']), packet['submission-id']) - - def on_malformed(self, packet): - logger.error('%s: Malformed packet: %s', self.name, packet) - - def on_ping_response(self, packet): - end = time.time() - self._ping_average.append(end - packet['when']) - self._time_delta.append((end + packet['when']) / 2 - packet['time']) - self.latency = sum(self._ping_average) / len(self._ping_average) - self.time_delta = sum(self._time_delta) / len(self._time_delta) - self.load = packet['load'] - self._update_ping() - - def _free_self(self, packet): - self._working = False - self.server.judges.on_judge_free(self, packet['submission-id']) diff --git a/judge/bridge/judgeserver.py b/judge/bridge/judgeserver.py deleted file mode 100644 index 251a431..0000000 --- a/judge/bridge/judgeserver.py +++ /dev/null @@ -1,68 +0,0 @@ -import logging -import os -import threading -import time - -from event_socket_server import get_preferred_engine -from judge.models import Judge -from .judgelist import JudgeList - -logger = logging.getLogger('judge.bridge') - - -def reset_judges(): - Judge.objects.update(online=False, ping=None, load=None) - - -class JudgeServer(get_preferred_engine()): - def __init__(self, *args, **kwargs): - super(JudgeServer, self).__init__(*args, **kwargs) - reset_judges() - self.judges = JudgeList() - self.ping_judge_thread = threading.Thread(target=self.ping_judge, args=()) - self.ping_judge_thread.daemon = True - self.ping_judge_thread.start() - - def on_shutdown(self): - super(JudgeServer, self).on_shutdown() - reset_judges() - - def ping_judge(self): - try: - while True: - for judge in self.judges: - judge.ping() - time.sleep(10) - except Exception: - logger.exception('Ping error') - raise - - -def main(): - import argparse - import logging - from .judgehandler import JudgeHandler - - format = '%(asctime)s:%(levelname)s:%(name)s:%(message)s' - logging.basicConfig(format=format) - logging.getLogger().setLevel(logging.INFO) - handler = logging.FileHandler(os.path.join(os.path.dirname(__file__), 'judgeserver.log'), encoding='utf-8') - handler.setFormatter(logging.Formatter(format)) - handler.setLevel(logging.INFO) - logging.getLogger().addHandler(handler) - - parser = argparse.ArgumentParser(description=''' - Runs the bridge between DMOJ website and judges. - ''') - parser.add_argument('judge_host', nargs='+', action='append', - help='host to listen for the judge') - parser.add_argument('-p', '--judge-port', type=int, action='append', - help='port to listen for the judge') - - args = parser.parse_args() - server = JudgeServer(list(zip(args.judge_host, args.judge_port)), JudgeHandler) - server.serve_forever() - - -if __name__ == '__main__': - main() diff --git a/judge/bridge/server.py b/judge/bridge/server.py new file mode 100644 index 0000000..cc83f84 --- /dev/null +++ b/judge/bridge/server.py @@ -0,0 +1,30 @@ +import threading +from socketserver import TCPServer, ThreadingMixIn + + +class ThreadingTCPListener(ThreadingMixIn, TCPServer): + allow_reuse_address = True + + +class Server: + def __init__(self, addresses, handler): + self.servers = [ThreadingTCPListener(address, handler) for address in addresses] + self._shutdown = threading.Event() + + def serve_forever(self): + threads = [threading.Thread(target=server.serve_forever) for server in self.servers] + for thread in threads: + thread.daemon = True + thread.start() + try: + self._shutdown.wait() + except KeyboardInterrupt: + self.shutdown() + finally: + for thread in threads: + thread.join() + + def shutdown(self): + for server in self.servers: + server.shutdown() + self._shutdown.set() diff --git a/judge/forms.py b/judge/forms.py index 11aa092..67ccd12 100644 --- a/judge/forms.py +++ b/judge/forms.py @@ -7,7 +7,7 @@ from django.contrib.auth.forms import AuthenticationForm from django.core.exceptions import ValidationError from django.core.validators import RegexValidator from django.db.models import Q -from django.forms import CharField, Form, ModelForm +from django.forms import CharField, ChoiceField, Form, ModelForm from django.urls import reverse_lazy from django.utils.translation import gettext_lazy as _ @@ -69,18 +69,23 @@ class ProfileForm(ModelForm): class ProblemSubmitForm(ModelForm): source = CharField(max_length=65536, widget=AceWidget(theme='twilight', no_ace_media=True)) + judge = ChoiceField(choices=(), widget=forms.HiddenInput(), required=False) - def __init__(self, *args, **kwargs): + def __init__(self, *args, judge_choices=(), **kwargs): super(ProblemSubmitForm, self).__init__(*args, **kwargs) - self.fields['problem'].empty_label = None - self.fields['problem'].widget = forms.HiddenInput() self.fields['language'].empty_label = None self.fields['language'].label_from_instance = attrgetter('display_name') self.fields['language'].queryset = Language.objects.filter(judges__online=True).distinct() + if judge_choices: + self.fields['judge'].widget = Select2Widget( + attrs={'style': 'width: 150px', 'data-placeholder': _('Any judge')}, + ) + self.fields['judge'].choices = judge_choices + class Meta: model = Submission - fields = ['problem', 'language'] + fields = ['language'] class EditOrganizationForm(ModelForm): @@ -156,4 +161,4 @@ class ContestCloneForm(Form): key = self.cleaned_data['key'] if Contest.objects.filter(key=key).exists(): raise ValidationError(_('Contest with key already exists.')) - return key + return key \ No newline at end of file diff --git a/judge/judgeapi.py b/judge/judgeapi.py index ca49460..57627b2 100644 --- a/judge/judgeapi.py +++ b/judge/judgeapi.py @@ -48,7 +48,7 @@ def judge_request(packet, reply=True): return result -def judge_submission(submission, rejudge, batch_rejudge=False): +def judge_submission(submission, rejudge=False, batch_rejudge=False, judge_id=None): from .models import ContestSubmission, Submission, SubmissionTestCase CONTEST_SUBMISSION_PRIORITY = 0 @@ -61,8 +61,8 @@ def judge_submission(submission, rejudge, batch_rejudge=False): try: # This is set proactively; it might get unset in judgecallback's on_grading_begin if the problem doesn't # actually have pretests stored on the judge. - updates['is_pretested'] = ContestSubmission.objects.filter(submission=submission) \ - .values_list('problem__contest__run_pretests_only', flat=True)[0] + updates['is_pretested'] = all(ContestSubmission.objects.filter(submission=submission) + .values_list('problem__contest__run_pretests_only', 'problem__is_pretested')[0]) except IndexError: priority = DEFAULT_PRIORITY else: @@ -88,15 +88,16 @@ def judge_submission(submission, rejudge, batch_rejudge=False): 'problem-id': submission.problem.code, 'language': submission.language.key, 'source': submission.source.source, + 'judge-id': judge_id, 'priority': BATCH_REJUDGE_PRIORITY if batch_rejudge else REJUDGE_PRIORITY if rejudge else priority, }) except BaseException: logger.exception('Failed to send request to judge') - Submission.objects.filter(id=submission.id).update(status='IE') + Submission.objects.filter(id=submission.id).update(status='IE', result='IE') success = False else: if response['name'] != 'submission-received' or response['submission-id'] != submission.id: - Submission.objects.filter(id=submission.id).update(status='IE') + Submission.objects.filter(id=submission.id).update(status='IE', result='IE') _post_update_submission(submission) success = True return success @@ -109,9 +110,9 @@ def disconnect_judge(judge, force=False): def abort_submission(submission): from .models import Submission response = judge_request({'name': 'terminate-submission', 'submission-id': submission.id}) - # This defaults to true, so that in the case the judgelist fails to remove the submission from the queue, + # This defaults to true, so that in the case the JudgeList fails to remove the submission from the queue, # and returns a bad-request, the submission is not falsely shown as "Aborted" when it will still be judged. if not response.get('judge-aborted', True): Submission.objects.filter(id=submission.id).update(status='AB', result='AB') event.post('sub_%s' % Submission.get_id_secret(submission.id), {'type': 'aborted-submission'}) - _post_update_submission(submission, done=True) + _post_update_submission(submission, done=True) \ No newline at end of file diff --git a/judge/management/commands/runbridged.py b/judge/management/commands/runbridged.py index f20a9fe..c6f4536 100644 --- a/judge/management/commands/runbridged.py +++ b/judge/management/commands/runbridged.py @@ -1,33 +1,8 @@ -import threading - -from django.conf import settings from django.core.management.base import BaseCommand -from judge.bridge import DjangoHandler, DjangoServer -from judge.bridge import DjangoJudgeHandler, JudgeServer +from judge.bridge.daemon import judge_daemon class Command(BaseCommand): def handle(self, *args, **options): - judge_handler = DjangoJudgeHandler - - try: - import netaddr # noqa: F401, imported to see if it exists - except ImportError: - pass - else: - proxies = settings.BRIDGED_JUDGE_PROXIES - if proxies: - judge_handler = judge_handler.with_proxy_set(proxies) - - judge_server = JudgeServer(settings.BRIDGED_JUDGE_ADDRESS, judge_handler) - django_server = DjangoServer(judge_server.judges, settings.BRIDGED_DJANGO_ADDRESS, DjangoHandler) - - # TODO: Merge the two servers - threading.Thread(target=django_server.serve_forever).start() - try: - judge_server.serve_forever() - except KeyboardInterrupt: - pass - finally: - django_server.stop() + judge_daemon() \ No newline at end of file diff --git a/judge/migrations/0108_submission_judged_date.py b/judge/migrations/0108_submission_judged_date.py new file mode 100644 index 0000000..5794ace --- /dev/null +++ b/judge/migrations/0108_submission_judged_date.py @@ -0,0 +1,18 @@ +# Generated by Django 2.2.12 on 2020-07-19 21:07 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('judge', '0107_notification'), + ] + + operations = [ + migrations.AddField( + model_name='submission', + name='judged_date', + field=models.DateTimeField(default=None, null=True, verbose_name='submission judge time'), + ), + ] diff --git a/judge/models/submission.py b/judge/models/submission.py index 0941eb2..e6c0208 100644 --- a/judge/models/submission.py +++ b/judge/models/submission.py @@ -78,6 +78,7 @@ class Submission(models.Model): case_total = models.FloatField(verbose_name=_('test case total points'), default=0) judged_on = models.ForeignKey('Judge', verbose_name=_('judged on'), null=True, blank=True, on_delete=models.SET_NULL) + judged_date = models.DateTimeField(verbose_name=_('submission judge time'), default=None, null=True) was_rejudged = models.BooleanField(verbose_name=_('was rejudged by admin'), default=False) is_pretested = models.BooleanField(verbose_name=_('was ran on pretests only'), default=False) contest_object = models.ForeignKey('Contest', verbose_name=_('contest'), null=True, blank=True, @@ -112,8 +113,9 @@ class Submission(models.Model): def long_status(self): return Submission.USER_DISPLAY_CODES.get(self.short_status, '') - def judge(self, rejudge=False, batch_rejudge=False): - judge_submission(self, rejudge, batch_rejudge) + def judge(self, *args, **kwargs): + judge_submission(self, *args, **kwargs) + judge.alters_data = True diff --git a/judge/views/problem.py b/judge/views/problem.py index 639c799..4a82399 100644 --- a/judge/views/problem.py +++ b/judge/views/problem.py @@ -537,8 +537,15 @@ def problem_submit(request, problem=None, submission=None): raise PermissionDenied() profile = request.profile + + if problem.is_editable_by(request.user): + judge_choices = tuple(Judge.objects.filter(online=True, problems=problem).values_list('name', 'name')) + else: + judge_choices = () + if request.method == 'POST': - form = ProblemSubmitForm(request.POST, instance=Submission(user=profile)) + form = ProblemSubmitForm(request.POST, judge_choices=judge_choices, + instance=Submission(user=profile, problem=problem)) if form.is_valid(): if (not request.user.has_perm('judge.spam_submission') and Submission.objects.filter(user=profile, was_rejudged=False) @@ -586,7 +593,7 @@ def problem_submit(request, problem=None, submission=None): # Save a query model.source = source - model.judge(rejudge=False) + model.judge(rejudge=False, judge_id=form.cleaned_data['judge']) return HttpResponseRedirect(reverse('submission_status', args=[str(model.id)])) else: @@ -610,7 +617,7 @@ def problem_submit(request, problem=None, submission=None): initial['language'] = sub.language except ValueError: raise Http404() - form = ProblemSubmitForm(initial=initial) + form = ProblemSubmitForm(judge_choices=judge_choices, initial=initial) form_data = initial if 'problem' in form_data: form.fields['language'].queryset = ( diff --git a/requirements.txt b/requirements.txt index a7b9d9c..035bad7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -34,3 +34,4 @@ channels channels-redis docker python-memcached +netaddr diff --git a/resources/problem.scss b/resources/problem.scss index 7ab1a63..9c13337 100644 --- a/resources/problem.scss +++ b/resources/problem.scss @@ -222,9 +222,12 @@ ul.problem-list { box-sizing: border-box; .button { - float: right; + display: inline-block !important; padding: 6px 12px; } + .submit-bar { + float: right; + } } @media (max-width: 550px) { diff --git a/templates/problem/submit.html b/templates/problem/submit.html index cf24c3c..3f4a217 100644 --- a/templates/problem/submit.html +++ b/templates/problem/submit.html @@ -2,8 +2,8 @@ {% block js_media %} + {{ form.media.js }} {% compress js %} - {{ form.media.js }} {% endcompress %} {% endblock %} {% block media %} + {{ form.media.css }} {% compress css %} - {{ form.media.css }}