diff --git a/README.html b/README.html
deleted file mode 100644
index 53d70c6..0000000
--- a/README.html
+++ /dev/null
@@ -1,1078 +0,0 @@
-
-online-judge
-1. Activate virtualenv:
-source dmojsite/bin/activate
-2. Remember to change the local_settings
-3. Run server:
-python manage.py runserver 0.0.0.0:8000
-
-python dmojauto-conf
-5. Create a folder for problems, and update its path in the judge config file and local_settings.py
-6. Connect judge:
-
-- python manage.py runbridged
-- dmoj 0.0.0.0 -p 9999 -c judge/conf1.yml (the port must match the one in local_settings.py)
-
-7. Update the Vietnamese translation:
-
-- go to /home/cuom1999/DMOJ/site/locale/vi
-- open .po file
-- python manage.py compilemessages
-- python manage.py compilejsi18n
-
\ No newline at end of file
diff --git a/dmoj_bridge_async.py b/dmoj_bridge_async.py
new file mode 100644
index 0000000..dee4112
--- /dev/null
+++ b/dmoj_bridge_async.py
@@ -0,0 +1,20 @@
+import os
+
+import gevent.monkey # noqa: I100, gevent must be imported here
+
+os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dmoj.settings')
+gevent.monkey.patch_all()
+
+# noinspection PyUnresolvedReferences
+import dmoj_install_pymysql # noqa: E402, F401, I100, I202, imported for side effect
+
+import django # noqa: E402, F401, I100, I202, django must be imported here
+django.setup()
+
+# noinspection PyUnresolvedReferences
+import django_2_2_pymysql_patch # noqa: E402, I100, F401, I202, imported for side effect
+
+from judge.bridge.daemon import judge_daemon # noqa: E402, I100, I202, django code must be imported here
+
+if __name__ == '__main__':
+ judge_daemon()
diff --git a/event_socket_server/__init__.py b/event_socket_server/__init__.py
deleted file mode 100644
index 88f7683..0000000
--- a/event_socket_server/__init__.py
+++ /dev/null
@@ -1,11 +0,0 @@
-from .base_server import BaseServer
-from .engines import *
-from .handler import Handler
-from .helpers import ProxyProtocolMixin, SizedPacketHandler, ZlibPacketHandler
-
-
-def get_preferred_engine(choices=('epoll', 'poll', 'select')):
- for choice in choices:
- if choice in engines:
- return engines[choice]
- return engines['select']
diff --git a/event_socket_server/base_server.py b/event_socket_server/base_server.py
deleted file mode 100644
index 667c627..0000000
--- a/event_socket_server/base_server.py
+++ /dev/null
@@ -1,169 +0,0 @@
-import logging
-import socket
-import threading
-import time
-from collections import defaultdict, deque
-from functools import total_ordering
-from heapq import heappop, heappush
-
-logger = logging.getLogger('event_socket_server')
-
-
-class SendMessage(object):
- __slots__ = ('data', 'callback')
-
- def __init__(self, data, callback):
- self.data = data
- self.callback = callback
-
-
-@total_ordering
-class ScheduledJob(object):
- __slots__ = ('time', 'func', 'args', 'kwargs', 'cancel', 'dispatched')
-
- def __init__(self, time, func, args, kwargs):
- self.time = time
- self.func = func
- self.args = args
- self.kwargs = kwargs
- self.cancel = False
- self.dispatched = False
-
- def __eq__(self, other):
- return self.time == other.time
-
- def __lt__(self, other):
- return self.time < other.time
-
-
-class BaseServer(object):
- def __init__(self, addresses, client):
- self._servers = set()
- for address, port in addresses:
- info = socket.getaddrinfo(address, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
- for af, socktype, proto, canonname, sa in info:
- sock = socket.socket(af, socktype, proto)
- sock.setblocking(0)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind(sa)
- self._servers.add(sock)
-
- self._stop = threading.Event()
- self._clients = set()
- self._ClientClass = client
- self._send_queue = defaultdict(deque)
- self._job_queue = []
- self._job_queue_lock = threading.Lock()
-
- def _serve(self):
- raise NotImplementedError()
-
- def _accept(self, sock):
- conn, address = sock.accept()
- conn.setblocking(0)
- client = self._ClientClass(self, conn)
- self._clients.add(client)
- return client
-
- def schedule(self, delay, func, *args, **kwargs):
- with self._job_queue_lock:
- job = ScheduledJob(time.time() + delay, func, args, kwargs)
- heappush(self._job_queue, job)
- return job
-
- def unschedule(self, job):
- with self._job_queue_lock:
- if job.dispatched or job.cancel:
- return False
- job.cancel = True
- return True
-
- def _register_write(self, client):
- raise NotImplementedError()
-
- def _register_read(self, client):
- raise NotImplementedError()
-
- def _clean_up_client(self, client, finalize=False):
- try:
- del self._send_queue[client.fileno()]
- except KeyError:
- pass
- client.on_close()
- client._socket.close()
- if not finalize:
- self._clients.remove(client)
-
- def _dispatch_event(self):
- t = time.time()
- tasks = []
- with self._job_queue_lock:
- while True:
- dt = self._job_queue[0].time - t if self._job_queue else 1
- if dt > 0:
- break
- task = heappop(self._job_queue)
- task.dispatched = True
- if not task.cancel:
- tasks.append(task)
- for task in tasks:
- logger.debug('Dispatching event: %r(*%r, **%r)', task.func, task.args, task.kwargs)
- task.func(*task.args, **task.kwargs)
- if not self._job_queue or dt > 1:
- dt = 1
- return dt
-
- def _nonblock_read(self, client):
- try:
- data = client._socket.recv(1024)
- except socket.error:
- self._clean_up_client(client)
- else:
- logger.debug('Read from %s: %d bytes', client.client_address, len(data))
- if not data:
- self._clean_up_client(client)
- else:
- try:
- client._recv_data(data)
- except Exception:
- logger.exception('Client recv_data failure')
- self._clean_up_client(client)
-
- def _nonblock_write(self, client):
- fd = client.fileno()
- queue = self._send_queue[fd]
- try:
- top = queue[0]
- cb = client._socket.send(top.data)
- top.data = top.data[cb:]
- logger.debug('Send to %s: %d bytes', client.client_address, cb)
- if not top.data:
- logger.debug('Finished sending: %s', client.client_address)
- if top.callback is not None:
- logger.debug('Calling callback: %s: %r', client.client_address, top.callback)
- try:
- top.callback()
- except Exception:
- logger.exception('Client write callback failure')
- self._clean_up_client(client)
- return
- queue.popleft()
- if not queue:
- self._register_read(client)
- del self._send_queue[fd]
- except socket.error:
- self._clean_up_client(client)
-
- def send(self, client, data, callback=None):
- logger.debug('Writing %d bytes to client %s, callback: %s', len(data), client.client_address, callback)
- self._send_queue[client.fileno()].append(SendMessage(data, callback))
- self._register_write(client)
-
- def stop(self):
- self._stop.set()
-
- def serve_forever(self):
- self._serve()
-
- def on_shutdown(self):
- pass
diff --git a/event_socket_server/engines/__init__.py b/event_socket_server/engines/__init__.py
deleted file mode 100644
index 5737684..0000000
--- a/event_socket_server/engines/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-import select
-
-__author__ = 'Quantum'
-engines = {}
-
-from .select_server import SelectServer # noqa: E402, import not at top for consistency
-engines['select'] = SelectServer
-
-if hasattr(select, 'poll'):
- from .poll_server import PollServer
- engines['poll'] = PollServer
-
-if hasattr(select, 'epoll'):
- from .epoll_server import EpollServer
- engines['epoll'] = EpollServer
-
-del select
diff --git a/event_socket_server/engines/epoll_server.py b/event_socket_server/engines/epoll_server.py
deleted file mode 100644
index a59755a..0000000
--- a/event_socket_server/engines/epoll_server.py
+++ /dev/null
@@ -1,17 +0,0 @@
-import select
-__author__ = 'Quantum'
-
-if not hasattr(select, 'epoll'):
- raise ImportError('System does not support epoll')
-
-from .poll_server import PollServer # noqa: E402, must be imported here
-
-
-class EpollServer(PollServer):
- poll = select.epoll
- WRITE = select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP
- READ = select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP
- POLLIN = select.EPOLLIN
- POLLOUT = select.EPOLLOUT
- POLL_CLOSE = select.EPOLLHUP | select.EPOLLERR
- NEED_CLOSE = True
diff --git a/event_socket_server/engines/poll_server.py b/event_socket_server/engines/poll_server.py
deleted file mode 100644
index aa4124b..0000000
--- a/event_socket_server/engines/poll_server.py
+++ /dev/null
@@ -1,97 +0,0 @@
-import errno
-import logging
-import select
-import threading
-
-from ..base_server import BaseServer
-
-logger = logging.getLogger('event_socket_server')
-
-if not hasattr(select, 'poll'):
- raise ImportError('System does not support poll')
-
-
-class PollServer(BaseServer):
- poll = select.poll
- WRITE = select.POLLIN | select.POLLOUT | select.POLLERR | select.POLLHUP
- READ = select.POLLIN | select.POLLERR | select.POLLHUP
- POLLIN = select.POLLIN
- POLLOUT = select.POLLOUT
- POLL_CLOSE = select.POLLERR | select.POLLHUP
- NEED_CLOSE = False
-
- def __init__(self, *args, **kwargs):
- super(PollServer, self).__init__(*args, **kwargs)
- self._poll = self.poll()
- self._fdmap = {}
- self._server_fds = {sock.fileno(): sock for sock in self._servers}
- self._close_lock = threading.RLock()
-
- def _register_write(self, client):
- logger.debug('On write mode: %s', client.client_address)
- self._poll.modify(client.fileno(), self.WRITE)
-
- def _register_read(self, client):
- logger.debug('On read mode: %s', client.client_address)
- self._poll.modify(client.fileno(), self.READ)
-
- def _clean_up_client(self, client, finalize=False):
- logger.debug('Taking close lock: cleanup')
- with self._close_lock:
- logger.debug('Cleaning up client: %s, finalize: %d', client.client_address, finalize)
- fd = client.fileno()
- try:
- self._poll.unregister(fd)
- except IOError as e:
- if e.errno != errno.ENOENT:
- raise
- except KeyError:
- pass
- del self._fdmap[fd]
- super(PollServer, self)._clean_up_client(client, finalize)
-
- def _serve(self):
- for fd, sock in self._server_fds.items():
- self._poll.register(fd, self.POLLIN)
- sock.listen(16)
- try:
- while not self._stop.is_set():
- for fd, event in self._poll.poll(self._dispatch_event()):
- if fd in self._server_fds:
- client = self._accept(self._server_fds[fd])
- logger.debug('Accepting: %s', client.client_address)
- fd = client.fileno()
- self._poll.register(fd, self.READ)
- self._fdmap[fd] = client
- elif event & self.POLL_CLOSE:
- logger.debug('Client closed: %s', self._fdmap[fd].client_address)
- self._clean_up_client(self._fdmap[fd])
- else:
- logger.debug('Taking close lock: event loop')
- with self._close_lock:
- try:
- client = self._fdmap[fd]
- except KeyError:
- pass
- else:
- logger.debug('Client active: %s, read: %d, write: %d',
- client.client_address,
- event & self.POLLIN,
- event & self.POLLOUT)
- if event & self.POLLIN:
- logger.debug('Non-blocking read on client: %s', client.client_address)
- self._nonblock_read(client)
- # Might be closed in the read handler.
- if event & self.POLLOUT and fd in self._fdmap:
- logger.debug('Non-blocking write on client: %s', client.client_address)
- self._nonblock_write(client)
- finally:
- logger.info('Shutting down server')
- self.on_shutdown()
- for client in self._clients:
- self._clean_up_client(client, True)
- for fd, sock in self._server_fds.items():
- self._poll.unregister(fd)
- sock.close()
- if self.NEED_CLOSE:
- self._poll.close()
diff --git a/event_socket_server/engines/select_server.py b/event_socket_server/engines/select_server.py
deleted file mode 100644
index 3ae9a4d..0000000
--- a/event_socket_server/engines/select_server.py
+++ /dev/null
@@ -1,49 +0,0 @@
-import select
-
-from ..base_server import BaseServer
-
-
-class SelectServer(BaseServer):
- def __init__(self, *args, **kwargs):
- super(SelectServer, self).__init__(*args, **kwargs)
- self._reads = set(self._servers)
- self._writes = set()
-
- def _register_write(self, client):
- self._writes.add(client)
-
- def _register_read(self, client):
- self._writes.remove(client)
-
- def _clean_up_client(self, client, finalize=False):
- self._writes.discard(client)
- self._reads.remove(client)
- super(SelectServer, self)._clean_up_client(client, finalize)
-
- def _serve(self, select=select.select):
- for server in self._servers:
- server.listen(16)
- try:
- while not self._stop.is_set():
- r, w, x = select(self._reads, self._writes, self._reads, self._dispatch_event())
- for s in r:
- if s in self._servers:
- self._reads.add(self._accept(s))
- else:
- self._nonblock_read(s)
-
- for client in w:
- self._nonblock_write(client)
-
- for s in x:
- s.close()
- if s in self._servers:
- raise RuntimeError('Server is in exceptional condition')
- else:
- self._clean_up_client(s)
- finally:
- self.on_shutdown()
- for client in self._clients:
- self._clean_up_client(client, True)
- for server in self._servers:
- server.close()
diff --git a/event_socket_server/handler.py b/event_socket_server/handler.py
deleted file mode 100644
index ecc3314..0000000
--- a/event_socket_server/handler.py
+++ /dev/null
@@ -1,27 +0,0 @@
-__author__ = 'Quantum'
-
-
-class Handler(object):
- def __init__(self, server, socket):
- self._socket = socket
- self.server = server
- self.client_address = socket.getpeername()
-
- def fileno(self):
- return self._socket.fileno()
-
- def _recv_data(self, data):
- raise NotImplementedError
-
- def _send(self, data, callback=None):
- return self.server.send(self, data, callback)
-
- def close(self):
- self.server._clean_up_client(self)
-
- def on_close(self):
- pass
-
- @property
- def socket(self):
- return self._socket
diff --git a/event_socket_server/helpers.py b/event_socket_server/helpers.py
deleted file mode 100644
index e8c3672..0000000
--- a/event_socket_server/helpers.py
+++ /dev/null
@@ -1,125 +0,0 @@
-import struct
-import zlib
-
-from judge.utils.unicode import utf8text
-from .handler import Handler
-
-size_pack = struct.Struct('!I')
-
-
-class SizedPacketHandler(Handler):
- def __init__(self, server, socket):
- super(SizedPacketHandler, self).__init__(server, socket)
- self._buffer = b''
- self._packetlen = 0
-
- def _packet(self, data):
- raise NotImplementedError()
-
- def _format_send(self, data):
- return data
-
- def _recv_data(self, data):
- self._buffer += data
- while len(self._buffer) >= self._packetlen if self._packetlen else len(self._buffer) >= size_pack.size:
- if self._packetlen:
- data = self._buffer[:self._packetlen]
- self._buffer = self._buffer[self._packetlen:]
- self._packetlen = 0
- self._packet(data)
- else:
- data = self._buffer[:size_pack.size]
- self._buffer = self._buffer[size_pack.size:]
- self._packetlen = size_pack.unpack(data)[0]
-
- def send(self, data, callback=None):
- data = self._format_send(data)
- self._send(size_pack.pack(len(data)) + data, callback)
-
-
-class ZlibPacketHandler(SizedPacketHandler):
- def _format_send(self, data):
- return zlib.compress(data.encode('utf-8'))
-
- def packet(self, data):
- raise NotImplementedError()
-
- def _packet(self, data):
- try:
- self.packet(zlib.decompress(data).decode('utf-8'))
- except zlib.error as e:
- self.malformed_packet(e)
-
- def malformed_packet(self, exception):
- self.close()
-
-
-class ProxyProtocolMixin(object):
- __UNKNOWN_TYPE = 0
- __PROXY1 = 1
- __PROXY2 = 2
- __DATA = 3
-
- __HEADER2 = b'\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A'
- __HEADER2_LEN = len(__HEADER2)
-
- _REAL_IP_SET = None
-
- @classmethod
- def with_proxy_set(cls, ranges):
- from netaddr import IPSet, IPGlob
- from itertools import chain
-
- globs = []
- addrs = []
- for item in ranges:
- if '*' in item or '-' in item:
- globs.append(IPGlob(item))
- else:
- addrs.append(item)
- ipset = IPSet(chain(chain.from_iterable(globs), addrs))
- return type(cls.__name__, (cls,), {'_REAL_IP_SET': ipset})
-
- def __init__(self, server, socket):
- super(ProxyProtocolMixin, self).__init__(server, socket)
- self.__buffer = b''
- self.__type = (self.__UNKNOWN_TYPE if self._REAL_IP_SET and
- self.client_address[0] in self._REAL_IP_SET else self.__DATA)
-
- def __parse_proxy1(self, data):
- self.__buffer += data
- index = self.__buffer.find(b'\r\n')
- if 0 <= index < 106:
- proxy = data[:index].split()
- if len(proxy) < 2:
- return self.close()
- if proxy[1] == b'TCP4':
- if len(proxy) != 6:
- return self.close()
- self.client_address = (utf8text(proxy[2]), utf8text(proxy[4]))
- self.server_address = (utf8text(proxy[3]), utf8text(proxy[5]))
- elif proxy[1] == b'TCP6':
- self.client_address = (utf8text(proxy[2]), utf8text(proxy[4]), 0, 0)
- self.server_address = (utf8text(proxy[3]), utf8text(proxy[5]), 0, 0)
- elif proxy[1] != b'UNKNOWN':
- return self.close()
-
- self.__type = self.__DATA
- super(ProxyProtocolMixin, self)._recv_data(data[index + 2:])
- elif len(self.__buffer) > 107 or index > 105:
- self.close()
-
- def _recv_data(self, data):
- if self.__type == self.__DATA:
- super(ProxyProtocolMixin, self)._recv_data(data)
- elif self.__type == self.__UNKNOWN_TYPE:
- if len(data) >= 16 and data[:self.__HEADER2_LEN] == self.__HEADER2:
- self.close()
- elif len(data) >= 8 and data[:5] == b'PROXY':
- self.__type = self.__PROXY1
- self.__parse_proxy1(data)
- else:
- self.__type = self.__DATA
- super(ProxyProtocolMixin, self)._recv_data(data)
- else:
- self.__parse_proxy1(data)
diff --git a/event_socket_server/test_server.py b/event_socket_server/test_server.py
deleted file mode 100644
index 7dec107..0000000
--- a/event_socket_server/test_server.py
+++ /dev/null
@@ -1,54 +0,0 @@
-from .engines import engines
-from .helpers import ProxyProtocolMixin, ZlibPacketHandler
-
-
-class EchoPacketHandler(ProxyProtocolMixin, ZlibPacketHandler):
- def __init__(self, server, socket):
- super(EchoPacketHandler, self).__init__(server, socket)
- self._gotdata = False
- self.server.schedule(5, self._kill_if_no_data)
-
- def _kill_if_no_data(self):
- if not self._gotdata:
- print('Inactive client:', self._socket.getpeername())
- self.close()
-
- def packet(self, data):
- self._gotdata = True
- print('Data from %s: %r' % (self._socket.getpeername(), data[:30] if len(data) > 30 else data))
- self.send(data)
-
- def on_close(self):
- self._gotdata = True
- print('Closed client:', self._socket.getpeername())
-
-
-def main():
- import argparse
- parser = argparse.ArgumentParser()
- parser.add_argument('-l', '--host', action='append')
- parser.add_argument('-p', '--port', type=int, action='append')
- parser.add_argument('-e', '--engine', default='select', choices=sorted(engines.keys()))
- try:
- import netaddr
- except ImportError:
- netaddr = None
- else:
- parser.add_argument('-P', '--proxy', action='append')
- args = parser.parse_args()
-
- class TestServer(engines[args.engine]):
- def _accept(self, sock):
- client = super(TestServer, self)._accept(sock)
- print('New connection:', client.socket.getpeername())
- return client
-
- handler = EchoPacketHandler
- if netaddr is not None and args.proxy:
- handler = handler.with_proxy_set(args.proxy)
- server = TestServer(list(zip(args.host, args.port)), handler)
- server.serve_forever()
-
-
-if __name__ == '__main__':
- main()
diff --git a/judge/admin/submission.py b/judge/admin/submission.py
index d774658..2689a76 100644
--- a/judge/admin/submission.py
+++ b/judge/admin/submission.py
@@ -108,8 +108,8 @@ class SubmissionSourceInline(admin.StackedInline):
class SubmissionAdmin(admin.ModelAdmin):
- readonly_fields = ('user', 'problem', 'date')
- fields = ('user', 'problem', 'date', 'time', 'memory', 'points', 'language', 'status', 'result',
+ readonly_fields = ('user', 'problem', 'date', 'judged_date')
+ fields = ('user', 'problem', 'date', 'judged_date', 'time', 'memory', 'points', 'language', 'status', 'result',
'case_points', 'case_total', 'judged_on', 'error')
actions = ('judge', 'recalculate_score')
list_display = ('id', 'problem_code', 'problem_name', 'user_column', 'execution_time', 'pretty_memory',
diff --git a/judge/bridge/__init__.py b/judge/bridge/__init__.py
index f1f1bfb..e69de29 100644
--- a/judge/bridge/__init__.py
+++ b/judge/bridge/__init__.py
@@ -1,6 +0,0 @@
-from .djangohandler import DjangoHandler
-from .djangoserver import DjangoServer
-from .judgecallback import DjangoJudgeHandler
-from .judgehandler import JudgeHandler
-from .judgelist import JudgeList
-from .judgeserver import JudgeServer
diff --git a/judge/bridge/base_handler.py b/judge/bridge/base_handler.py
new file mode 100644
index 0000000..c3fbd40
--- /dev/null
+++ b/judge/bridge/base_handler.py
@@ -0,0 +1,196 @@
+import logging
+import socket
+import struct
+import zlib
+from itertools import chain
+
+from netaddr import IPGlob, IPSet
+
+from judge.utils.unicode import utf8text
+
+logger = logging.getLogger('judge.bridge')
+
+size_pack = struct.Struct('!I')
+assert size_pack.size == 4
+
+MAX_ALLOWED_PACKET_SIZE = 8 * 1024 * 1024
+
+
+def proxy_list(human_readable):
+ globs = []
+ addrs = []
+ for item in human_readable:
+ if '*' in item or '-' in item:
+ globs.append(IPGlob(item))
+ else:
+ addrs.append(item)
+ return IPSet(chain(chain.from_iterable(globs), addrs))
+
+
+class Disconnect(Exception):
+ pass
+
+
+# socketserver.BaseRequestHandler does all the handling in __init__,
+# making it impossible to inherit __init__ sanely. While it lets you
+# use setup(), most tools will complain about uninitialized variables.
+# This metaclass will allow sane __init__ behaviour while also magically
+# calling the methods that handle the request.
+class RequestHandlerMeta(type):
+ def __call__(cls, *args, **kwargs):
+ handler = super().__call__(*args, **kwargs)
+ handler.on_connect()
+ try:
+ handler.handle()
+ except BaseException:
+ logger.exception('Error in base packet handling')
+ raise
+ finally:
+ handler.on_disconnect()
+
+
+class ZlibPacketHandler(metaclass=RequestHandlerMeta):
+ proxies = []
+
+ def __init__(self, request, client_address, server):
+ self.request = request
+ self.server = server
+ self.client_address = client_address
+ self.server_address = server.server_address
+ self._initial_tag = None
+ self._got_packet = False
+
+ @property
+ def timeout(self):
+ return self.request.gettimeout()
+
+ @timeout.setter
+ def timeout(self, timeout):
+ self.request.settimeout(timeout or None)
+
+ def read_sized_packet(self, size, initial=None):
+ if size > MAX_ALLOWED_PACKET_SIZE:
+ logger.log(logging.WARNING if self._got_packet else logging.INFO,
+ 'Disconnecting client due to too-large message size (%d bytes): %s', size, self.client_address)
+ raise Disconnect()
+
+ buffer = []
+ remainder = size
+
+ if initial:
+ buffer.append(initial)
+ remainder -= len(initial)
+ assert remainder >= 0
+
+ while remainder:
+ data = self.request.recv(remainder)
+ remainder -= len(data)
+ buffer.append(data)
+ self._on_packet(b''.join(buffer))
+
+ def parse_proxy_protocol(self, line):
+ words = line.split()
+
+ if len(words) < 2:
+ raise Disconnect()
+
+ if words[1] == b'TCP4':
+ if len(words) != 6:
+ raise Disconnect()
+ self.client_address = (utf8text(words[2]), utf8text(words[4]))
+ self.server_address = (utf8text(words[3]), utf8text(words[5]))
+ elif words[1] == b'TCP6':
+ self.client_address = (utf8text(words[2]), utf8text(words[4]), 0, 0)
+ self.server_address = (utf8text(words[3]), utf8text(words[5]), 0, 0)
+ elif words[1] != b'UNKNOWN':
+ raise Disconnect()
+
+ def read_size(self, buffer=b''):
+ while len(buffer) < size_pack.size:
+ recv = self.request.recv(size_pack.size - len(buffer))
+ if not recv:
+ raise Disconnect()
+ buffer += recv
+ return size_pack.unpack(buffer)[0]
+
+ def read_proxy_header(self, buffer=b''):
+ # Max line length for PROXY protocol is 107, and we received 4 already.
+ while b'\r\n' not in buffer:
+ if len(buffer) > 107:
+ raise Disconnect()
+ data = self.request.recv(107)
+ if not data:
+ raise Disconnect()
+ buffer += data
+ return buffer
+
+ def _on_packet(self, data):
+ decompressed = zlib.decompress(data).decode('utf-8')
+ self._got_packet = True
+ self.on_packet(decompressed)
+
+ def on_packet(self, data):
+ raise NotImplementedError()
+
+ def on_connect(self):
+ pass
+
+ def on_disconnect(self):
+ pass
+
+ def on_timeout(self):
+ pass
+
+ def handle(self):
+ try:
+ tag = self.read_size()
+ self._initial_tag = size_pack.pack(tag)
+ if self.client_address[0] in self.proxies and self._initial_tag == b'PROX':
+ proxy, _, remainder = self.read_proxy_header(self._initial_tag).partition(b'\r\n')
+ self.parse_proxy_protocol(proxy)
+
+ while remainder:
+ while len(remainder) < size_pack.size:
+ self.read_sized_packet(self.read_size(remainder))
+ break
+
+ size = size_pack.unpack(remainder[:size_pack.size])[0]
+ remainder = remainder[size_pack.size:]
+ if len(remainder) <= size:
+ self.read_sized_packet(size, remainder)
+ break
+
+ self._on_packet(remainder[:size])
+ remainder = remainder[size:]
+ else:
+ self.read_sized_packet(tag)
+
+ while True:
+ self.read_sized_packet(self.read_size())
+ except Disconnect:
+ return
+ except zlib.error:
+ if self._got_packet:
+ logger.warning('Encountered zlib error during packet handling, disconnecting client: %s',
+ self.client_address, exc_info=True)
+ else:
+ logger.info('Potentially wrong protocol (zlib error): %s: %r', self.client_address, self._initial_tag,
+ exc_info=True)
+ except socket.timeout:
+ if self._got_packet:
+ logger.info('Socket timed out: %s', self.client_address)
+ self.on_timeout()
+ else:
+ logger.info('Potentially wrong protocol: %s: %r', self.client_address, self._initial_tag)
+ except socket.error as e:
+ # When a gevent socket is shutdown, gevent cancels all waits, causing recv to raise cancel_wait_ex.
+ if e.__class__.__name__ == 'cancel_wait_ex':
+ return
+ raise
+
+ def send(self, data):
+ compressed = zlib.compress(data.encode('utf-8'))
+ self.request.sendall(size_pack.pack(len(compressed)) + compressed)
+
+ def close(self):
+ self.request.shutdown(socket.SHUT_RDWR)
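
For reference, the framing ZlibPacketHandler speaks is a 4-byte big-endian length prefix followed by a zlib-compressed UTF-8 payload (see size_pack, send(), and read_size()/read_sized_packet() above). A minimal client-side sketch of that framing, using only the standard library; the host and port in the usage comment are placeholders:

import socket
import struct
import zlib

size_pack = struct.Struct('!I')  # same 4-byte big-endian length prefix as base_handler.py


def send_packet(sock, text):
    # zlib-compress the UTF-8 payload and prefix it with the compressed length.
    payload = zlib.compress(text.encode('utf-8'))
    sock.sendall(size_pack.pack(len(payload)) + payload)


def recv_exact(sock, count):
    # Read exactly `count` bytes or fail.
    buf = b''
    while len(buf) < count:
        chunk = sock.recv(count - len(buf))
        if not chunk:
            raise ConnectionError('connection closed mid-packet')
        buf += chunk
    return buf


def recv_packet(sock):
    # Read the 4-byte size header, then exactly that many compressed bytes.
    size = size_pack.unpack(recv_exact(sock, size_pack.size))[0]
    return zlib.decompress(recv_exact(sock, size)).decode('utf-8')


# Usage against the echo test server added below (address is a placeholder):
# with socket.create_connection(('localhost', 9999)) as sock:
#     send_packet(sock, 'hello')
#     print(recv_packet(sock))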
diff --git a/judge/bridge/daemon.py b/judge/bridge/daemon.py
new file mode 100644
index 0000000..ce8a702
--- /dev/null
+++ b/judge/bridge/daemon.py
@@ -0,0 +1,47 @@
+import logging
+import signal
+import threading
+from functools import partial
+
+from django.conf import settings
+
+from judge.bridge.django_handler import DjangoHandler
+from judge.bridge.judge_handler import JudgeHandler
+from judge.bridge.judge_list import JudgeList
+from judge.bridge.server import Server
+from judge.models import Judge, Submission
+
+logger = logging.getLogger('judge.bridge')
+
+
+def reset_judges():
+ Judge.objects.update(online=False, ping=None, load=None)
+
+
+def judge_daemon():
+ reset_judges()
+ Submission.objects.filter(status__in=Submission.IN_PROGRESS_GRADING_STATUS) \
+ .update(status='IE', result='IE', error=None)
+ judges = JudgeList()
+
+ judge_server = Server(settings.BRIDGED_JUDGE_ADDRESS, partial(JudgeHandler, judges=judges))
+ django_server = Server(settings.BRIDGED_DJANGO_ADDRESS, partial(DjangoHandler, judges=judges))
+
+ threading.Thread(target=django_server.serve_forever).start()
+ threading.Thread(target=judge_server.serve_forever).start()
+
+ stop = threading.Event()
+
+ def signal_handler(signum, _):
+ logger.info('Exiting due to %s', signal.Signals(signum).name)
+ stop.set()
+
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGQUIT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ try:
+ stop.wait()
+ finally:
+ django_server.shutdown()
+ judge_server.shutdown()
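
judge_daemon() pulls both listener addresses from Django settings. A minimal local_settings.py sketch, assuming each address setting is a list of (host, port) pairs as passed to Server; the concrete hosts and ports below are placeholders:

# local_settings.py (sketch; values are examples only)
BRIDGED_JUDGE_ADDRESS = [('0.0.0.0', 9999)]     # judges (dmoj) connect here
BRIDGED_DJANGO_ADDRESS = [('127.0.0.1', 9998)]  # the site talks to the bridge here
BRIDGED_JUDGE_PROXIES = []                      # networks trusted to send PROXY protocol headers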
diff --git a/judge/bridge/djangohandler.py b/judge/bridge/django_handler.py
similarity index 53%
rename from judge/bridge/djangohandler.py
rename to judge/bridge/django_handler.py
index 8b18a53..b284b68 100644
--- a/judge/bridge/djangohandler.py
+++ b/judge/bridge/django_handler.py
@@ -2,63 +2,55 @@ import json
import logging
import struct
-from event_socket_server import ZlibPacketHandler
+from judge.bridge.base_handler import Disconnect, ZlibPacketHandler
logger = logging.getLogger('judge.bridge')
size_pack = struct.Struct('!I')
class DjangoHandler(ZlibPacketHandler):
- def __init__(self, server, socket):
- super(DjangoHandler, self).__init__(server, socket)
+ def __init__(self, request, client_address, server, judges):
+ super().__init__(request, client_address, server)
self.handlers = {
'submission-request': self.on_submission,
'terminate-submission': self.on_termination,
- 'disconnect-judge': self.on_disconnect,
+ 'disconnect-judge': self.on_disconnect_request,
}
- self._to_kill = True
- # self.server.schedule(5, self._kill_if_no_request)
+ self.judges = judges
- def _kill_if_no_request(self):
- if self._to_kill:
- logger.info('Killed inactive connection: %s', self._socket.getpeername())
- self.close()
+ def send(self, data):
+ super().send(json.dumps(data, separators=(',', ':')))
- def _format_send(self, data):
- return super(DjangoHandler, self)._format_send(json.dumps(data, separators=(',', ':')))
-
- def packet(self, packet):
- self._to_kill = False
+ def on_packet(self, packet):
packet = json.loads(packet)
try:
result = self.handlers.get(packet.get('name', None), self.on_malformed)(packet)
except Exception:
logger.exception('Error in packet handling (Django-facing)')
result = {'name': 'bad-request'}
- self.send(result, self._schedule_close)
-
- def _schedule_close(self):
- self.server.schedule(0, self.close)
+ self.send(result)
+ raise Disconnect()
def on_submission(self, data):
id = data['submission-id']
problem = data['problem-id']
language = data['language']
source = data['source']
+ judge_id = data['judge-id']
priority = data['priority']
- if not self.server.judges.check_priority(priority):
+ if not self.judges.check_priority(priority):
return {'name': 'bad-request'}
- self.server.judges.judge(id, problem, language, source, priority)
+ self.judges.judge(id, problem, language, source, judge_id, priority)
return {'name': 'submission-received', 'submission-id': id}
def on_termination(self, data):
- return {'name': 'submission-received', 'judge-aborted': self.server.judges.abort(data['submission-id'])}
+ return {'name': 'submission-received', 'judge-aborted': self.judges.abort(data['submission-id'])}
- def on_disconnect(self, data):
+ def on_disconnect_request(self, data):
judge_id = data['judge-id']
force = data['force']
- self.server.judges.disconnect(judge_id, force=force)
+ self.judges.disconnect(judge_id, force=force)
def on_malformed(self, packet):
logger.error('Malformed packet: %s', packet)
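
For reference, DjangoHandler.on_submission now also reads a judge-id field, which lets the site pin a submission to a specific judge. A sketch of the packet the site sends over the bridge; every value below is a placeholder:

# Packet dispatched to the 'submission-request' handler (placeholder values).
submission_request = {
    'name': 'submission-request',
    'submission-id': 12345,
    'problem-id': 'aplusb',
    'language': 'PY3',
    'source': 'print(sum(map(int, input().split())))',
    'judge-id': None,   # or a judge name, to route to that judge only
    'priority': 1,      # must satisfy JudgeList.check_priority
}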
diff --git a/judge/bridge/djangoserver.py b/judge/bridge/djangoserver.py
deleted file mode 100644
index 39f5787..0000000
--- a/judge/bridge/djangoserver.py
+++ /dev/null
@@ -1,7 +0,0 @@
-from event_socket_server import get_preferred_engine
-
-
-class DjangoServer(get_preferred_engine()):
- def __init__(self, judges, *args, **kwargs):
- super(DjangoServer, self).__init__(*args, **kwargs)
- self.judges = judges
diff --git a/event_socket_server/test_client.py b/judge/bridge/echo_test_client.py
similarity index 82%
rename from event_socket_server/test_client.py
rename to judge/bridge/echo_test_client.py
index 4f696e8..8fec692 100644
--- a/event_socket_server/test_client.py
+++ b/judge/bridge/echo_test_client.py
@@ -1,14 +1,10 @@
-import ctypes
+import os
import socket
import struct
import time
import zlib
size_pack = struct.Struct('!I')
-try:
- RtlGenRandom = ctypes.windll.advapi32.SystemFunction036
-except AttributeError:
- RtlGenRandom = None
def open_connection():
@@ -27,15 +23,6 @@ def dezlibify(data, skip_head=True):
return zlib.decompress(data).decode('utf-8')
-def random(length):
- if RtlGenRandom is None:
- with open('/dev/urandom') as f:
- return f.read(length)
- buf = ctypes.create_string_buffer(length)
- RtlGenRandom(buf, length)
- return buf.raw
-
-
def main():
global host, port
import argparse
@@ -64,11 +51,11 @@ def main():
s2.close()
print('Large random data test:', end=' ')
s4 = open_connection()
- data = random(1000000)
+ data = os.urandom(1000000).decode('iso-8859-1')
print('Generated', end=' ')
s4.sendall(zlibify(data))
print('Sent', end=' ')
- result = ''
+ result = b''
while len(result) < size_pack.size:
result += s4.recv(1024)
size = size_pack.unpack(result[:size_pack.size])[0]
@@ -81,7 +68,7 @@ def main():
s4.close()
print('Test malformed connection:', end=' ')
s5 = open_connection()
- s5.sendall(data[:100000])
+ s5.sendall(data[:100000].encode('utf-8'))
s5.close()
print('Success')
print('Waiting for timeout to close idle connection:', end=' ')
diff --git a/judge/bridge/echo_test_server.py b/judge/bridge/echo_test_server.py
new file mode 100644
index 0000000..59e21fa
--- /dev/null
+++ b/judge/bridge/echo_test_server.py
@@ -0,0 +1,39 @@
+from judge.bridge.base_handler import ZlibPacketHandler
+
+
+class EchoPacketHandler(ZlibPacketHandler):
+ def on_connect(self):
+ print('New client:', self.client_address)
+ self.timeout = 5
+
+ def on_timeout(self):
+ print('Inactive client:', self.client_address)
+
+ def on_packet(self, data):
+ self.timeout = None
+ print('Data from %s: %r' % (self.client_address, data[:30] if len(data) > 30 else data))
+ self.send(data)
+
+ def on_disconnect(self):
+ print('Closed client:', self.client_address)
+
+
+def main():
+ import argparse
+ from judge.bridge.server import Server
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-l', '--host', action='append')
+ parser.add_argument('-p', '--port', type=int, action='append')
+ parser.add_argument('-P', '--proxy', action='append')
+ args = parser.parse_args()
+
+ class Handler(EchoPacketHandler):
+ proxies = args.proxy or []
+
+ server = Server(list(zip(args.host, args.port)), Handler)
+ server.serve_forever()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/judge/bridge/judgecallback.py b/judge/bridge/judge_handler.py
similarity index 64%
rename from judge/bridge/judgecallback.py
rename to judge/bridge/judge_handler.py
index a7ddf69..86f53b4 100644
--- a/judge/bridge/judgecallback.py
+++ b/judge/bridge/judge_handler.py
@@ -1,22 +1,27 @@
+import hmac
import json
import logging
+import threading
import time
+from collections import deque, namedtuple
from operator import itemgetter
from django import db
+from django.conf import settings
from django.utils import timezone
from django.db.models import F
from judge import event_poster as event
+from judge.bridge.base_handler import ZlibPacketHandler, proxy_list
from judge.caching import finished_submission
from judge.models import Judge, Language, LanguageLimit, Problem, RuntimeVersion, Submission, SubmissionTestCase
-from .judgehandler import JudgeHandler, SubmissionData
logger = logging.getLogger('judge.bridge')
json_log = logging.getLogger('judge.json.bridge')
UPDATE_RATE_LIMIT = 5
UPDATE_RATE_TIME = 0.5
+SubmissionData = namedtuple('SubmissionData', 'time memory short_circuit pretests_only contest_no attempt_no user_id')
def _ensure_connection():
@@ -26,9 +31,42 @@ def _ensure_connection():
db.connection.close()
-class DjangoJudgeHandler(JudgeHandler):
- def __init__(self, server, socket):
- super(DjangoJudgeHandler, self).__init__(server, socket)
+class JudgeHandler(ZlibPacketHandler):
+ proxies = proxy_list(settings.BRIDGED_JUDGE_PROXIES or [])
+
+ def __init__(self, request, client_address, server, judges):
+ super().__init__(request, client_address, server)
+
+ self.judges = judges
+ self.handlers = {
+ 'grading-begin': self.on_grading_begin,
+ 'grading-end': self.on_grading_end,
+ 'compile-error': self.on_compile_error,
+ 'compile-message': self.on_compile_message,
+ 'batch-begin': self.on_batch_begin,
+ 'batch-end': self.on_batch_end,
+ 'test-case-status': self.on_test_case,
+ 'internal-error': self.on_internal_error,
+ 'submission-terminated': self.on_submission_terminated,
+ 'submission-acknowledged': self.on_submission_acknowledged,
+ 'ping-response': self.on_ping_response,
+ 'supported-problems': self.on_supported_problems,
+ 'handshake': self.on_handshake,
+ }
+ self._working = False
+ self._no_response_job = None
+ self._problems = []
+ self.executors = {}
+ self.problems = {}
+ self.latency = None
+ self.time_delta = None
+ self.load = 1e100
+ self.name = None
+ self.batch_id = None
+ self.in_batch = False
+ self._stop_ping = threading.Event()
+ self._ping_average = deque(maxlen=6) # 1 minute average, just like load
+ self._time_delta = deque(maxlen=6)
# each value is (updates, last reset)
self.update_counter = {}
@@ -38,60 +76,33 @@ class DjangoJudgeHandler(JudgeHandler):
self._submission_cache_id = None
self._submission_cache = {}
+ def on_connect(self):
+ self.timeout = 15
+ logger.info('Judge connected from: %s', self.client_address)
json_log.info(self._make_json_log(action='connect'))
- def on_close(self):
- super(DjangoJudgeHandler, self).on_close()
+ def on_disconnect(self):
+ self._stop_ping.set()
+ if self._working:
+ logger.error('Judge %s disconnected while handling submission %s', self.name, self._working)
+ self.judges.remove(self)
+ if self.name is not None:
+ self._disconnected()
+ logger.info('Judge disconnected from: %s with name %s', self.client_address, self.name)
+
json_log.info(self._make_json_log(action='disconnect', info='judge disconnected'))
if self._working:
- Submission.objects.filter(id=self._working).update(status='IE', result='IE')
+ Submission.objects.filter(id=self._working).update(status='IE', result='IE', error='')
json_log.error(self._make_json_log(sub=self._working, action='close', info='IE due to shutdown on grading'))
- def on_malformed(self, packet):
- super(DjangoJudgeHandler, self).on_malformed(packet)
- json_log.exception(self._make_json_log(sub=self._working, info='malformed zlib packet'))
-
- def _packet_exception(self):
- json_log.exception(self._make_json_log(sub=self._working, info='packet processing exception'))
-
- def get_related_submission_data(self, submission):
- _ensure_connection() # We are called from the django-facing daemon thread. Guess what happens.
-
- try:
- pid, time, memory, short_circuit, lid, is_pretested, sub_date, uid, part_virtual, part_id = (
- Submission.objects.filter(id=submission)
- .values_list('problem__id', 'problem__time_limit', 'problem__memory_limit',
- 'problem__short_circuit', 'language__id', 'is_pretested', 'date', 'user__id',
- 'contest__participation__virtual', 'contest__participation__id')).get()
- except Submission.DoesNotExist:
- logger.error('Submission vanished: %s', submission)
- json_log.error(self._make_json_log(
- sub=self._working, action='request',
- info='submission vanished when fetching info',
- ))
- return
-
- attempt_no = Submission.objects.filter(problem__id=pid, contest__participation__id=part_id, user__id=uid,
- date__lt=sub_date).exclude(status__in=('CE', 'IE')).count() + 1
-
- try:
- time, memory = (LanguageLimit.objects.filter(problem__id=pid, language__id=lid)
- .values_list('time_limit', 'memory_limit').get())
- except LanguageLimit.DoesNotExist:
- pass
-
- return SubmissionData(
- time=time,
- memory=memory,
- short_circuit=short_circuit,
- pretests_only=is_pretested,
- contest_no=part_virtual,
- attempt_no=attempt_no,
- user_id=uid,
- )
-
def _authenticate(self, id, key):
- result = Judge.objects.filter(name=id, auth_key=key, is_blocked=False).exists()
+ try:
+ judge = Judge.objects.get(name=id, is_blocked=False)
+ except Judge.DoesNotExist:
+ result = False
+ else:
+ result = hmac.compare_digest(judge.auth_key, key)
+
if not result:
json_log.warning(self._make_json_log(action='auth', judge=id, info='judge failed authentication'))
return result
@@ -130,26 +141,118 @@ class DjangoJudgeHandler(JudgeHandler):
if e.__class__.__name__ == 'OperationalError' and e.__module__ == '_mysql_exceptions' and e.args[0] == 2006:
db.connection.close()
- def _post_update_submission(self, id, state, done=False):
- if self._submission_cache_id == id:
- data = self._submission_cache
- else:
- self._submission_cache = data = Submission.objects.filter(id=id).values(
- 'problem__is_public', 'contest__participation__contest__key',
- 'user_id', 'problem_id', 'status', 'language__key',
- ).get()
- self._submission_cache_id = id
+ def send(self, data):
+ super().send(json.dumps(data, separators=(',', ':')))
- if data['problem__is_public']:
- event.post('submissions', {
- 'type': 'done-submission' if done else 'update-submission',
- 'state': state, 'id': id,
- 'contest': data['contest__participation__contest__key'],
- 'user': data['user_id'], 'problem': data['problem_id'],
- 'status': data['status'], 'language': data['language__key'],
- })
+ def on_handshake(self, packet):
+ if 'id' not in packet or 'key' not in packet:
+ logger.warning('Malformed handshake: %s', self.client_address)
+ self.close()
+ return
+
+ if not self._authenticate(packet['id'], packet['key']):
+ logger.warning('Authentication failure: %s', self.client_address)
+ self.close()
+ return
+
+ self.timeout = 60
+ self._problems = packet['problems']
+ self.problems = dict(self._problems)
+ self.executors = packet['executors']
+ self.name = packet['id']
+
+ self.send({'name': 'handshake-success'})
+ logger.info('Judge authenticated: %s (%s)', self.client_address, packet['id'])
+ self.judges.register(self)
+ threading.Thread(target=self._ping_thread).start()
+ self._connected()
+
+ def can_judge(self, problem, executor, judge_id=None):
+ return problem in self.problems and executor in self.executors and (not judge_id or self.name == judge_id)
+
+ @property
+ def working(self):
+ return bool(self._working)
+
+ def get_related_submission_data(self, submission):
+ _ensure_connection()
+
+ try:
+ pid, time, memory, short_circuit, lid, is_pretested, sub_date, uid, part_virtual, part_id = (
+ Submission.objects.filter(id=submission)
+ .values_list('problem__id', 'problem__time_limit', 'problem__memory_limit',
+ 'problem__short_circuit', 'language__id', 'is_pretested', 'date', 'user__id',
+ 'contest__participation__virtual', 'contest__participation__id')).get()
+ except Submission.DoesNotExist:
+ logger.error('Submission vanished: %s', submission)
+ json_log.error(self._make_json_log(
+ sub=self._working, action='request',
+ info='submission vanished when fetching info',
+ ))
+ return
+
+ attempt_no = Submission.objects.filter(problem__id=pid, contest__participation__id=part_id, user__id=uid,
+ date__lt=sub_date).exclude(status__in=('CE', 'IE')).count() + 1
+
+ try:
+ time, memory = (LanguageLimit.objects.filter(problem__id=pid, language__id=lid)
+ .values_list('time_limit', 'memory_limit').get())
+ except LanguageLimit.DoesNotExist:
+ pass
+
+ return SubmissionData(
+ time=time,
+ memory=memory,
+ short_circuit=short_circuit,
+ pretests_only=is_pretested,
+ contest_no=part_virtual,
+ attempt_no=attempt_no,
+ user_id=uid,
+ )
+
+ def disconnect(self, force=False):
+ if force:
+ # Yank the power out.
+ self.close()
+ else:
+ self.send({'name': 'disconnect'})
+
+ def submit(self, id, problem, language, source):
+ data = self.get_related_submission_data(id)
+ self._working = id
+        self._no_response_job = threading.Timer(20, self._kill_if_no_response)
+        self._no_response_job.start()
+ self.send({
+ 'name': 'submission-request',
+ 'submission-id': id,
+ 'problem-id': problem,
+ 'language': language,
+ 'source': source,
+ 'time-limit': data.time,
+ 'memory-limit': data.memory,
+ 'short-circuit': data.short_circuit,
+ 'meta': {
+ 'pretests-only': data.pretests_only,
+ 'in-contest': data.contest_no,
+ 'attempt-no': data.attempt_no,
+ 'user': data.user_id,
+ },
+ })
+
+ def _kill_if_no_response(self):
+ logger.error('Judge failed to acknowledge submission: %s: %s', self.name, self._working)
+ self.close()
+
+ def on_timeout(self):
+ if self.name:
+ logger.warning('Judge seems dead: %s: %s', self.name, self._working)
+
+ def malformed_packet(self, exception):
+ logger.exception('Judge sent malformed packet: %s', self.name)
+ super(JudgeHandler, self).malformed_packet(exception)
def on_submission_processing(self, packet):
+ _ensure_connection()
+
id = packet['submission-id']
if Submission.objects.filter(id=id).update(status='P', judged_on=self.judge):
event.post('sub_%s' % Submission.get_id_secret(id), {'type': 'processing'})
@@ -161,12 +264,72 @@ class DjangoJudgeHandler(JudgeHandler):
def on_submission_wrong_acknowledge(self, packet, expected, got):
json_log.error(self._make_json_log(packet, action='processing', info='wrong-acknowledge', expected=expected))
+ Submission.objects.filter(id=expected).update(status='IE', result='IE', error=None)
+ Submission.objects.filter(id=got, status='QU').update(status='IE', result='IE', error=None)
+
+ def on_submission_acknowledged(self, packet):
+ if not packet.get('submission-id', None) == self._working:
+ logger.error('Wrong acknowledgement: %s: %s, expected: %s', self.name, packet.get('submission-id', None),
+ self._working)
+ self.on_submission_wrong_acknowledge(packet, self._working, packet.get('submission-id', None))
+ self.close()
+ logger.info('Submission acknowledged: %d', self._working)
+ if self._no_response_job:
+ self._no_response_job.cancel()
+ self._no_response_job = None
+ self.on_submission_processing(packet)
+
+ def abort(self):
+ self.send({'name': 'terminate-submission'})
+
+ def get_current_submission(self):
+ return self._working or None
+
+ def ping(self):
+ self.send({'name': 'ping', 'when': time.time()})
+
+ def on_packet(self, data):
+ try:
+ try:
+ data = json.loads(data)
+ if 'name' not in data:
+ raise ValueError
+ except ValueError:
+ self.on_malformed(data)
+ else:
+ handler = self.handlers.get(data['name'], self.on_malformed)
+ handler(data)
+ except Exception:
+ logger.exception('Error in packet handling (Judge-side): %s', self.name)
+ self._packet_exception()
+ # You can't crash here because you aren't so sure about the judges
+            # not being malicious or simply sending malformed packets. THIS IS A SERVER!
+
+ def _packet_exception(self):
+ json_log.exception(self._make_json_log(sub=self._working, info='packet processing exception'))
+
+ def _submission_is_batch(self, id):
+ if not Submission.objects.filter(id=id).update(batch=True):
+ logger.warning('Unknown submission: %s', id)
+
+ def on_supported_problems(self, packet):
+ logger.info('%s: Updated problem list', self.name)
+ self._problems = packet['problems']
+ self.problems = dict(self._problems)
+ if not self.working:
+ self.judges.update_problems(self)
+
+ self.judge.problems.set(Problem.objects.filter(code__in=list(self.problems.keys())))
+ json_log.info(self._make_json_log(action='update-problems', count=len(self.problems)))
def on_grading_begin(self, packet):
- super(DjangoJudgeHandler, self).on_grading_begin(packet)
+ logger.info('%s: Grading has begun on: %s', self.name, packet['submission-id'])
+ self.batch_id = None
+
if Submission.objects.filter(id=packet['submission-id']).update(
- status='G', is_pretested=packet['pretested'],
- current_testcase=1, batch=False, points=0):
+ status='G', is_pretested=packet['pretested'],
+ current_testcase=1, points=0,
+ batch=False, judged_date=timezone.now()):
SubmissionTestCase.objects.filter(submission_id=packet['submission-id']).delete()
event.post('sub_%s' % Submission.get_id_secret(packet['submission-id']), {'type': 'grading-begin'})
self._post_update_submission(packet['submission-id'], 'grading-begin')
@@ -175,12 +338,10 @@ class DjangoJudgeHandler(JudgeHandler):
logger.warning('Unknown submission: %s', packet['submission-id'])
json_log.error(self._make_json_log(packet, action='grading-begin', info='unknown submission'))
- def _submission_is_batch(self, id):
- if not Submission.objects.filter(id=id).update(batch=True):
- logger.warning('Unknown submission: %s', id)
-
def on_grading_end(self, packet):
- super(DjangoJudgeHandler, self).on_grading_end(packet)
+ logger.info('%s: Grading has ended on: %s', self.name, packet['submission-id'])
+ self._free_self(packet)
+ self.batch_id = None
try:
submission = Submission.objects.get(id=packet['submission-id'])
@@ -263,7 +424,8 @@ class DjangoJudgeHandler(JudgeHandler):
self._post_update_submission(submission.id, 'grading-end', done=True)
def on_compile_error(self, packet):
- super(DjangoJudgeHandler, self).on_compile_error(packet)
+ logger.info('%s: Submission failed to compile: %s', self.name, packet['submission-id'])
+ self._free_self(packet)
if Submission.objects.filter(id=packet['submission-id']).update(status='CE', result='CE', error=packet['log']):
event.post('sub_%s' % Submission.get_id_secret(packet['submission-id']), {
@@ -279,7 +441,7 @@ class DjangoJudgeHandler(JudgeHandler):
log=packet['log'], finish=True, result='CE'))
def on_compile_message(self, packet):
- super(DjangoJudgeHandler, self).on_compile_message(packet)
+ logger.info('%s: Submission generated compiler messages: %s', self.name, packet['submission-id'])
if Submission.objects.filter(id=packet['submission-id']).update(error=packet['log']):
event.post('sub_%s' % Submission.get_id_secret(packet['submission-id']), {'type': 'compile-message'})
@@ -290,7 +452,11 @@ class DjangoJudgeHandler(JudgeHandler):
log=packet['log']))
def on_internal_error(self, packet):
- super(DjangoJudgeHandler, self).on_internal_error(packet)
+ try:
+ raise ValueError('\n\n' + packet['message'])
+ except ValueError:
+ logger.exception('Judge %s failed while handling submission %s', self.name, packet['submission-id'])
+ self._free_self(packet)
id = packet['submission-id']
if Submission.objects.filter(id=id).update(status='IE', result='IE', error=packet['message']):
@@ -304,7 +470,8 @@ class DjangoJudgeHandler(JudgeHandler):
message=packet['message'], finish=True, result='IE'))
def on_submission_terminated(self, packet):
- super(DjangoJudgeHandler, self).on_submission_terminated(packet)
+ logger.info('%s: Submission aborted: %s', self.name, packet['submission-id'])
+ self._free_self(packet)
if Submission.objects.filter(id=packet['submission-id']).update(status='AB', result='AB'):
event.post('sub_%s' % Submission.get_id_secret(packet['submission-id']), {'type': 'aborted-submission'})
@@ -316,20 +483,29 @@ class DjangoJudgeHandler(JudgeHandler):
finish=True, result='AB'))
def on_batch_begin(self, packet):
- super(DjangoJudgeHandler, self).on_batch_begin(packet)
+ logger.info('%s: Batch began on: %s', self.name, packet['submission-id'])
+ self.in_batch = True
+ if self.batch_id is None:
+ self.batch_id = 0
+ self._submission_is_batch(packet['submission-id'])
+ self.batch_id += 1
+
json_log.info(self._make_json_log(packet, action='batch-begin', batch=self.batch_id))
def on_batch_end(self, packet):
- super(DjangoJudgeHandler, self).on_batch_end(packet)
+ self.in_batch = False
+ logger.info('%s: Batch ended on: %s', self.name, packet['submission-id'])
json_log.info(self._make_json_log(packet, action='batch-end', batch=self.batch_id))
def on_test_case(self, packet, max_feedback=SubmissionTestCase._meta.get_field('feedback').max_length):
- super(DjangoJudgeHandler, self).on_test_case(packet)
+ logger.info('%s: %d test case(s) completed on: %s', self.name, len(packet['cases']), packet['submission-id'])
+
id = packet['submission-id']
updates = packet['cases']
max_position = max(map(itemgetter('position'), updates))
sum_points = sum(map(itemgetter('points'), updates))
+
if not Submission.objects.filter(id=id).update(current_testcase=max_position + 1, points=F('points') + sum_points):
logger.warning('Unknown submission: %s', id)
json_log.error(self._make_json_log(packet, action='test-case', info='unknown submission'))
@@ -395,10 +571,32 @@ class DjangoJudgeHandler(JudgeHandler):
SubmissionTestCase.objects.bulk_create(bulk_test_case_updates)
- def on_supported_problems(self, packet):
- super(DjangoJudgeHandler, self).on_supported_problems(packet)
- self.judge.problems.set(Problem.objects.filter(code__in=list(self.problems.keys())))
- json_log.info(self._make_json_log(action='update-problems', count=len(self.problems)))
+ def on_malformed(self, packet):
+ logger.error('%s: Malformed packet: %s', self.name, packet)
+ json_log.exception(self._make_json_log(sub=self._working, info='malformed json packet'))
+
+ def on_ping_response(self, packet):
+ end = time.time()
+ self._ping_average.append(end - packet['when'])
+ self._time_delta.append((end + packet['when']) / 2 - packet['time'])
+ self.latency = sum(self._ping_average) / len(self._ping_average)
+ self.time_delta = sum(self._time_delta) / len(self._time_delta)
+ self.load = packet['load']
+ self._update_ping()
+
+ def _free_self(self, packet):
+ self.judges.on_judge_free(self, packet['submission-id'])
+
+ def _ping_thread(self):
+ try:
+ while True:
+ self.ping()
+ if self._stop_ping.wait(10):
+ break
+ except Exception:
+ logger.exception('Ping error in %s', self.name)
+ self.close()
+ raise
def _make_json_log(self, packet=None, sub=None, **kwargs):
data = {
@@ -411,3 +609,22 @@ class DjangoJudgeHandler(JudgeHandler):
data['submission'] = sub
data.update(kwargs)
return json.dumps(data)
+
+ def _post_update_submission(self, id, state, done=False):
+ if self._submission_cache_id == id:
+ data = self._submission_cache
+ else:
+ self._submission_cache = data = Submission.objects.filter(id=id).values(
+ 'problem__is_public', 'contest_object__key',
+ 'user_id', 'problem_id', 'status', 'language__key',
+ ).get()
+ self._submission_cache_id = id
+
+ if data['problem__is_public']:
+ event.post('submissions', {
+ 'type': 'done-submission' if done else 'update-submission',
+ 'state': state, 'id': id,
+ 'contest': data['contest_object__key'],
+ 'user': data['user_id'], 'problem': data['problem_id'],
+ 'status': data['status'], 'language': data['language__key'],
+ })
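
on_ping_response above keeps six-sample moving averages of round-trip latency and of the bridge/judge clock offset; with one ping every 10 seconds that is roughly a one-minute window, matching the load average. A worked sample with made-up timestamps:

# One ping / ping-response exchange (illustrative numbers only).
when = 1000.00        # bridge clock when 'ping' was sent (packet['when'])
judge_time = 1000.30  # judge clock when it answered (packet['time'])
end = 1000.20         # bridge clock when the response arrived

latency_sample = end - when                          # 0.20 s round trip
time_delta_sample = (end + when) / 2 - judge_time    # -0.20 s: the judge clock runs ahead

# Both samples feed deque(maxlen=6) windows; self.latency and self.time_delta are
# the averages over those windows, and packet['load'] is taken as-is.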
diff --git a/judge/bridge/judgelist.py b/judge/bridge/judge_list.py
similarity index 80%
rename from judge/bridge/judgelist.py
rename to judge/bridge/judge_list.py
index ec3cebd..828bb83 100644
--- a/judge/bridge/judgelist.py
+++ b/judge/bridge/judge_list.py
@@ -29,8 +29,8 @@ class JudgeList(object):
node = self.queue.first
while node:
if not isinstance(node.value, PriorityMarker):
- id, problem, language, source = node.value
- if judge.can_judge(problem, language):
+ id, problem, language, source, judge_id = node.value
+ if judge.can_judge(problem, language, judge_id):
self.submission_map[id] = judge
logger.info('Dispatched queued submission %d: %s', id, judge.name)
try:
@@ -52,9 +52,10 @@ class JudgeList(object):
self._handle_free_judge(judge)
def disconnect(self, judge_id, force=False):
- for judge in self.judges:
- if judge.name == judge_id:
- judge.disconnect(force=force)
+ with self.lock:
+ for judge in self.judges:
+ if judge.name == judge_id:
+ judge.disconnect(force=force)
def update_problems(self, judge):
with self.lock:
@@ -77,6 +78,7 @@ class JudgeList(object):
with self.lock:
logger.info('Judge available after grading %d: %s', submission, judge.name)
del self.submission_map[submission]
+ judge._working = False
self._handle_free_judge(judge)
def abort(self, submission):
@@ -98,15 +100,20 @@ class JudgeList(object):
def check_priority(self, priority):
return 0 <= priority < self.priorities
- def judge(self, id, problem, language, source, priority):
+ def judge(self, id, problem, language, source, judge_id, priority):
with self.lock:
if id in self.submission_map or id in self.node_map:
# Already judging, don't queue again. This can happen during batch rejudges, rejudges should be
# idempotent.
return
- candidates = [judge for judge in self.judges if not judge.working and judge.can_judge(problem, language)]
- logger.info('Free judges: %d', len(candidates))
+ candidates = [
+ judge for judge in self.judges if not judge.working and judge.can_judge(problem, language, judge_id)
+ ]
+ if judge_id:
+ logger.info('Specified judge %s is%savailable', judge_id, ' ' if candidates else ' not ')
+ else:
+ logger.info('Free judges: %d', len(candidates))
if candidates:
# Schedule the submission on the judge reporting least load.
judge = min(candidates, key=attrgetter('load'))
@@ -117,7 +124,10 @@ class JudgeList(object):
except Exception:
logger.exception('Failed to dispatch %d (%s, %s) to %s', id, problem, language, judge.name)
self.judges.discard(judge)
- return self.judge(id, problem, language, source, priority)
+ return self.judge(id, problem, language, source, judge_id, priority)
else:
- self.node_map[id] = self.queue.insert((id, problem, language, source), self.priority[priority])
+ self.node_map[id] = self.queue.insert(
+ (id, problem, language, source, judge_id),
+ self.priority[priority],
+ )
logger.info('Queued submission: %d', id)
diff --git a/judge/bridge/judgehandler.py b/judge/bridge/judgehandler.py
deleted file mode 100644
index 2a09218..0000000
--- a/judge/bridge/judgehandler.py
+++ /dev/null
@@ -1,268 +0,0 @@
-import json
-import logging
-import time
-from collections import deque, namedtuple
-
-from event_socket_server import ProxyProtocolMixin, ZlibPacketHandler
-
-logger = logging.getLogger('judge.bridge')
-
-SubmissionData = namedtuple('SubmissionData', 'time memory short_circuit pretests_only contest_no attempt_no user_id')
-
-
-class JudgeHandler(ProxyProtocolMixin, ZlibPacketHandler):
- def __init__(self, server, socket):
- super(JudgeHandler, self).__init__(server, socket)
-
- self.handlers = {
- 'grading-begin': self.on_grading_begin,
- 'grading-end': self.on_grading_end,
- 'compile-error': self.on_compile_error,
- 'compile-message': self.on_compile_message,
- 'batch-begin': self.on_batch_begin,
- 'batch-end': self.on_batch_end,
- 'test-case-status': self.on_test_case,
- 'internal-error': self.on_internal_error,
- 'submission-terminated': self.on_submission_terminated,
- 'submission-acknowledged': self.on_submission_acknowledged,
- 'ping-response': self.on_ping_response,
- 'supported-problems': self.on_supported_problems,
- 'handshake': self.on_handshake,
- }
- self._to_kill = True
- self._working = False
- self._no_response_job = None
- self._problems = []
- self.executors = []
- self.problems = {}
- self.latency = None
- self.time_delta = None
- self.load = 1e100
- self.name = None
- self.batch_id = None
- self.in_batch = False
- self._ping_average = deque(maxlen=6) # 1 minute average, just like load
- self._time_delta = deque(maxlen=6)
-
- self.server.schedule(15, self._kill_if_no_auth)
- logger.info('Judge connected from: %s', self.client_address)
-
- def _kill_if_no_auth(self):
- if self._to_kill:
- logger.info('Judge not authenticated: %s', self.client_address)
- self.close()
-
- def on_close(self):
- self._to_kill = False
- if self._no_response_job:
- self.server.unschedule(self._no_response_job)
- self.server.judges.remove(self)
- if self.name is not None:
- self._disconnected()
- logger.info('Judge disconnected from: %s', self.client_address)
-
- def _authenticate(self, id, key):
- return False
-
- def _connected(self):
- pass
-
- def _disconnected(self):
- pass
-
- def _update_ping(self):
- pass
-
- def _format_send(self, data):
- return super(JudgeHandler, self)._format_send(json.dumps(data, separators=(',', ':')))
-
- def on_handshake(self, packet):
- if 'id' not in packet or 'key' not in packet:
- logger.warning('Malformed handshake: %s', self.client_address)
- self.close()
- return
-
- if not self._authenticate(packet['id'], packet['key']):
- logger.warning('Authentication failure: %s', self.client_address)
- self.close()
- return
-
- self._to_kill = False
- self._problems = packet['problems']
- self.problems = dict(self._problems)
- self.executors = packet['executors']
- self.name = packet['id']
-
- self.send({'name': 'handshake-success'})
- logger.info('Judge authenticated: %s (%s)', self.client_address, packet['id'])
- self.server.judges.register(self)
- self._connected()
-
- def can_judge(self, problem, executor):
- return problem in self.problems and executor in self.executors
-
- @property
- def working(self):
- return bool(self._working)
-
- def get_related_submission_data(self, submission):
- return SubmissionData(
- time=2,
- memory=16384,
- short_circuit=False,
- pretests_only=False,
- contest_no=None,
- attempt_no=1,
- user_id=None,
- )
-
- def disconnect(self, force=False):
- if force:
- # Yank the power out.
- self.close()
- else:
- self.send({'name': 'disconnect'})
-
- def submit(self, id, problem, language, source):
- data = self.get_related_submission_data(id)
- self._working = id
- self._no_response_job = self.server.schedule(20, self._kill_if_no_response)
- self.send({
- 'name': 'submission-request',
- 'submission-id': id,
- 'problem-id': problem,
- 'language': language,
- 'source': source,
- 'time-limit': data.time,
- 'memory-limit': data.memory,
- 'short-circuit': data.short_circuit,
- 'meta': {
- 'pretests-only': data.pretests_only,
- 'in-contest': data.contest_no,
- 'attempt-no': data.attempt_no,
- 'user': data.user_id,
- },
- })
-
- def _kill_if_no_response(self):
- logger.error('Judge seems dead: %s: %s', self.name, self._working)
- self.close()
-
- def malformed_packet(self, exception):
- logger.exception('Judge sent malformed packet: %s', self.name)
- super(JudgeHandler, self).malformed_packet(exception)
-
- def on_submission_processing(self, packet):
- pass
-
- def on_submission_wrong_acknowledge(self, packet, expected, got):
- pass
-
- def on_submission_acknowledged(self, packet):
- if not packet.get('submission-id', None) == self._working:
- logger.error('Wrong acknowledgement: %s: %s, expected: %s', self.name, packet.get('submission-id', None),
- self._working)
- self.on_submission_wrong_acknowledge(packet, self._working, packet.get('submission-id', None))
- self.close()
- logger.info('Submission acknowledged: %d', self._working)
- if self._no_response_job:
- self.server.unschedule(self._no_response_job)
- self._no_response_job = None
- self.on_submission_processing(packet)
-
- def abort(self):
- self.send({'name': 'terminate-submission'})
-
- def get_current_submission(self):
- return self._working or None
-
- def ping(self):
- self.send({'name': 'ping', 'when': time.time()})
-
- def packet(self, data):
- try:
- try:
- data = json.loads(data)
- if 'name' not in data:
- raise ValueError
- except ValueError:
- self.on_malformed(data)
- else:
- handler = self.handlers.get(data['name'], self.on_malformed)
- handler(data)
- except Exception:
- logger.exception('Error in packet handling (Judge-side): %s', self.name)
- self._packet_exception()
- # You can't crash here because you aren't so sure about the judges
- # not being malicious or simply malforms. THIS IS A SERVER!
-
- def _packet_exception(self):
- pass
-
- def _submission_is_batch(self, id):
- pass
-
- def on_supported_problems(self, packet):
- logger.info('%s: Updated problem list', self.name)
- self._problems = packet['problems']
- self.problems = dict(self._problems)
- if not self.working:
- self.server.judges.update_problems(self)
-
- def on_grading_begin(self, packet):
- logger.info('%s: Grading has begun on: %s', self.name, packet['submission-id'])
- self.batch_id = None
-
- def on_grading_end(self, packet):
- logger.info('%s: Grading has ended on: %s', self.name, packet['submission-id'])
- self._free_self(packet)
- self.batch_id = None
-
- def on_compile_error(self, packet):
- logger.info('%s: Submission failed to compile: %s', self.name, packet['submission-id'])
- self._free_self(packet)
-
- def on_compile_message(self, packet):
- logger.info('%s: Submission generated compiler messages: %s', self.name, packet['submission-id'])
-
- def on_internal_error(self, packet):
- try:
- raise ValueError('\n\n' + packet['message'])
- except ValueError:
- logger.exception('Judge %s failed while handling submission %s', self.name, packet['submission-id'])
- self._free_self(packet)
-
- def on_submission_terminated(self, packet):
- logger.info('%s: Submission aborted: %s', self.name, packet['submission-id'])
- self._free_self(packet)
-
- def on_batch_begin(self, packet):
- logger.info('%s: Batch began on: %s', self.name, packet['submission-id'])
- self.in_batch = True
- if self.batch_id is None:
- self.batch_id = 0
- self._submission_is_batch(packet['submission-id'])
- self.batch_id += 1
-
- def on_batch_end(self, packet):
- self.in_batch = False
- logger.info('%s: Batch ended on: %s', self.name, packet['submission-id'])
-
- def on_test_case(self, packet):
- logger.info('%s: %d test case(s) completed on: %s', self.name, len(packet['cases']), packet['submission-id'])
-
- def on_malformed(self, packet):
- logger.error('%s: Malformed packet: %s', self.name, packet)
-
- def on_ping_response(self, packet):
- end = time.time()
- self._ping_average.append(end - packet['when'])
- self._time_delta.append((end + packet['when']) / 2 - packet['time'])
- self.latency = sum(self._ping_average) / len(self._ping_average)
- self.time_delta = sum(self._time_delta) / len(self._time_delta)
- self.load = packet['load']
- self._update_ping()
-
- def _free_self(self, packet):
- self._working = False
- self.server.judges.on_judge_free(self, packet['submission-id'])
diff --git a/judge/bridge/judgeserver.py b/judge/bridge/judgeserver.py
deleted file mode 100644
index 251a431..0000000
--- a/judge/bridge/judgeserver.py
+++ /dev/null
@@ -1,68 +0,0 @@
-import logging
-import os
-import threading
-import time
-
-from event_socket_server import get_preferred_engine
-from judge.models import Judge
-from .judgelist import JudgeList
-
-logger = logging.getLogger('judge.bridge')
-
-
-def reset_judges():
- Judge.objects.update(online=False, ping=None, load=None)
-
-
-class JudgeServer(get_preferred_engine()):
- def __init__(self, *args, **kwargs):
- super(JudgeServer, self).__init__(*args, **kwargs)
- reset_judges()
- self.judges = JudgeList()
- self.ping_judge_thread = threading.Thread(target=self.ping_judge, args=())
- self.ping_judge_thread.daemon = True
- self.ping_judge_thread.start()
-
- def on_shutdown(self):
- super(JudgeServer, self).on_shutdown()
- reset_judges()
-
- def ping_judge(self):
- try:
- while True:
- for judge in self.judges:
- judge.ping()
- time.sleep(10)
- except Exception:
- logger.exception('Ping error')
- raise
-
-
-def main():
- import argparse
- import logging
- from .judgehandler import JudgeHandler
-
- format = '%(asctime)s:%(levelname)s:%(name)s:%(message)s'
- logging.basicConfig(format=format)
- logging.getLogger().setLevel(logging.INFO)
- handler = logging.FileHandler(os.path.join(os.path.dirname(__file__), 'judgeserver.log'), encoding='utf-8')
- handler.setFormatter(logging.Formatter(format))
- handler.setLevel(logging.INFO)
- logging.getLogger().addHandler(handler)
-
- parser = argparse.ArgumentParser(description='''
- Runs the bridge between DMOJ website and judges.
- ''')
- parser.add_argument('judge_host', nargs='+', action='append',
- help='host to listen for the judge')
- parser.add_argument('-p', '--judge-port', type=int, action='append',
- help='port to listen for the judge')
-
- args = parser.parse_args()
- server = JudgeServer(list(zip(args.judge_host, args.judge_port)), JudgeHandler)
- server.serve_forever()
-
-
-if __name__ == '__main__':
- main()
diff --git a/judge/bridge/server.py b/judge/bridge/server.py
new file mode 100644
index 0000000..cc83f84
--- /dev/null
+++ b/judge/bridge/server.py
@@ -0,0 +1,30 @@
+import threading
+from socketserver import TCPServer, ThreadingMixIn
+
+
+class ThreadingTCPListener(ThreadingMixIn, TCPServer):
+ allow_reuse_address = True
+
+
+class Server:
+ def __init__(self, addresses, handler):
+ self.servers = [ThreadingTCPListener(address, handler) for address in addresses]
+ self._shutdown = threading.Event()
+
+ def serve_forever(self):
+ threads = [threading.Thread(target=server.serve_forever) for server in self.servers]
+ for thread in threads:
+ thread.daemon = True
+ thread.start()
+ try:
+ self._shutdown.wait()
+ except KeyboardInterrupt:
+ self.shutdown()
+ finally:
+ for thread in threads:
+ thread.join()
+
+ def shutdown(self):
+ for server in self.servers:
+ server.shutdown()
+ self._shutdown.set()
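
The new Server wraps one ThreadingTCPListener per (host, port) pair, serves each on a daemon thread, and blocks on a shutdown event until either shutdown() is called or a KeyboardInterrupt arrives. A minimal usage sketch, assuming a trivial stand-in handler (the real judge and Django handlers are wired up in judge.bridge.daemon, which is not shown in this section):

# Minimal usage sketch; EchoHandler is a stand-in for the real handlers.
from socketserver import StreamRequestHandler

from judge.bridge.server import Server


class EchoHandler(StreamRequestHandler):
    def handle(self):
        # Echo one line back to the client, then let the connection close.
        self.wfile.write(self.rfile.readline())


if __name__ == '__main__':
    # Each address gets its own listening socket and serving thread;
    # Ctrl-C calls shutdown() on every listener and joins the threads.
    Server([('localhost', 9999)], EchoHandler).serve_forever()
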
diff --git a/judge/forms.py b/judge/forms.py
index 11aa092..67ccd12 100644
--- a/judge/forms.py
+++ b/judge/forms.py
@@ -7,7 +7,7 @@ from django.contrib.auth.forms import AuthenticationForm
from django.core.exceptions import ValidationError
from django.core.validators import RegexValidator
from django.db.models import Q
-from django.forms import CharField, Form, ModelForm
+from django.forms import CharField, ChoiceField, Form, ModelForm
from django.urls import reverse_lazy
from django.utils.translation import gettext_lazy as _
@@ -69,18 +69,23 @@ class ProfileForm(ModelForm):
class ProblemSubmitForm(ModelForm):
source = CharField(max_length=65536, widget=AceWidget(theme='twilight', no_ace_media=True))
+ judge = ChoiceField(choices=(), widget=forms.HiddenInput(), required=False)
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args, judge_choices=(), **kwargs):
super(ProblemSubmitForm, self).__init__(*args, **kwargs)
- self.fields['problem'].empty_label = None
- self.fields['problem'].widget = forms.HiddenInput()
self.fields['language'].empty_label = None
self.fields['language'].label_from_instance = attrgetter('display_name')
self.fields['language'].queryset = Language.objects.filter(judges__online=True).distinct()
+ if judge_choices:
+ self.fields['judge'].widget = Select2Widget(
+ attrs={'style': 'width: 150px', 'data-placeholder': _('Any judge')},
+ )
+ self.fields['judge'].choices = judge_choices
+
class Meta:
model = Submission
- fields = ['problem', 'language']
+ fields = ['language']
class EditOrganizationForm(ModelForm):
@@ -156,4 +161,4 @@ class ContestCloneForm(Form):
key = self.cleaned_data['key']
if Contest.objects.filter(key=key).exists():
raise ValidationError(_('Contest with key already exists.'))
- return key
+ return key
\ No newline at end of file
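
ProblemSubmitForm now drops the hidden problem field (the Submission instance supplies the problem) and adds an optional judge field that stays hidden unless the caller passes judge_choices, in which case it becomes a Select2 dropdown. A short construction sketch with made-up judge names; the real call site is the problem_submit view further down in this diff:

# Construction sketch; the judge names are hypothetical.
choices = (('judge-1', 'judge-1'), ('judge-2', 'judge-2'))
form = ProblemSubmitForm(request.POST, judge_choices=choices,
                         instance=Submission(user=profile, problem=problem))
if form.is_valid():
    pinned_judge = form.cleaned_data['judge']  # '' when "Any judge" is chosen
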
diff --git a/judge/judgeapi.py b/judge/judgeapi.py
index ca49460..57627b2 100644
--- a/judge/judgeapi.py
+++ b/judge/judgeapi.py
@@ -48,7 +48,7 @@ def judge_request(packet, reply=True):
return result
-def judge_submission(submission, rejudge, batch_rejudge=False):
+def judge_submission(submission, rejudge=False, batch_rejudge=False, judge_id=None):
from .models import ContestSubmission, Submission, SubmissionTestCase
CONTEST_SUBMISSION_PRIORITY = 0
@@ -61,8 +61,8 @@ def judge_submission(submission, rejudge, batch_rejudge=False):
try:
# This is set proactively; it might get unset in judgecallback's on_grading_begin if the problem doesn't
# actually have pretests stored on the judge.
- updates['is_pretested'] = ContestSubmission.objects.filter(submission=submission) \
- .values_list('problem__contest__run_pretests_only', flat=True)[0]
+ updates['is_pretested'] = all(ContestSubmission.objects.filter(submission=submission)
+ .values_list('problem__contest__run_pretests_only', 'problem__is_pretested')[0])
except IndexError:
priority = DEFAULT_PRIORITY
else:
@@ -88,15 +88,16 @@ def judge_submission(submission, rejudge, batch_rejudge=False):
'problem-id': submission.problem.code,
'language': submission.language.key,
'source': submission.source.source,
+ 'judge-id': judge_id,
'priority': BATCH_REJUDGE_PRIORITY if batch_rejudge else REJUDGE_PRIORITY if rejudge else priority,
})
except BaseException:
logger.exception('Failed to send request to judge')
- Submission.objects.filter(id=submission.id).update(status='IE')
+ Submission.objects.filter(id=submission.id).update(status='IE', result='IE')
success = False
else:
if response['name'] != 'submission-received' or response['submission-id'] != submission.id:
- Submission.objects.filter(id=submission.id).update(status='IE')
+ Submission.objects.filter(id=submission.id).update(status='IE', result='IE')
_post_update_submission(submission)
success = True
return success
@@ -109,9 +110,9 @@ def disconnect_judge(judge, force=False):
def abort_submission(submission):
from .models import Submission
response = judge_request({'name': 'terminate-submission', 'submission-id': submission.id})
- # This defaults to true, so that in the case the judgelist fails to remove the submission from the queue,
+ # This defaults to true, so that in the case the JudgeList fails to remove the submission from the queue,
# and returns a bad-request, the submission is not falsely shown as "Aborted" when it will still be judged.
if not response.get('judge-aborted', True):
Submission.objects.filter(id=submission.id).update(status='AB', result='AB')
event.post('sub_%s' % Submission.get_id_secret(submission.id), {'type': 'aborted-submission'})
- _post_update_submission(submission, done=True)
+ _post_update_submission(submission, done=True)
\ No newline at end of file
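
Two behavioural notes on the judge_submission changes: failures now set result='IE' alongside status='IE', and is_pretested is now the conjunction of the contest-level run_pretests_only flag and the per-problem is_pretested flag, which the all(...) call expresses compactly. An equivalent, unpacked spelling of that line:

# Equivalent, unpacked form of the all(...) expression above.
run_pretests_only, problem_pretested = (
    ContestSubmission.objects
    .filter(submission=submission)
    .values_list('problem__contest__run_pretests_only', 'problem__is_pretested')[0]
)
updates['is_pretested'] = bool(run_pretests_only and problem_pretested)
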
diff --git a/judge/management/commands/runbridged.py b/judge/management/commands/runbridged.py
index f20a9fe..c6f4536 100644
--- a/judge/management/commands/runbridged.py
+++ b/judge/management/commands/runbridged.py
@@ -1,33 +1,8 @@
-import threading
-
-from django.conf import settings
from django.core.management.base import BaseCommand
-from judge.bridge import DjangoHandler, DjangoServer
-from judge.bridge import DjangoJudgeHandler, JudgeServer
+from judge.bridge.daemon import judge_daemon
class Command(BaseCommand):
def handle(self, *args, **options):
- judge_handler = DjangoJudgeHandler
-
- try:
- import netaddr # noqa: F401, imported to see if it exists
- except ImportError:
- pass
- else:
- proxies = settings.BRIDGED_JUDGE_PROXIES
- if proxies:
- judge_handler = judge_handler.with_proxy_set(proxies)
-
- judge_server = JudgeServer(settings.BRIDGED_JUDGE_ADDRESS, judge_handler)
- django_server = DjangoServer(judge_server.judges, settings.BRIDGED_DJANGO_ADDRESS, DjangoHandler)
-
- # TODO: Merge the two servers
- threading.Thread(target=django_server.serve_forever).start()
- try:
- judge_server.serve_forever()
- except KeyboardInterrupt:
- pass
- finally:
- django_server.stop()
+ judge_daemon()
\ No newline at end of file
diff --git a/judge/migrations/0108_submission_judged_date.py b/judge/migrations/0108_submission_judged_date.py
new file mode 100644
index 0000000..5794ace
--- /dev/null
+++ b/judge/migrations/0108_submission_judged_date.py
@@ -0,0 +1,18 @@
+# Generated by Django 2.2.12 on 2020-07-19 21:07
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('judge', '0107_notification'),
+ ]
+
+ operations = [
+ migrations.AddField(
+ model_name='submission',
+ name='judged_date',
+ field=models.DateTimeField(default=None, null=True, verbose_name='submission judge time'),
+ ),
+ ]
diff --git a/judge/models/submission.py b/judge/models/submission.py
index 0941eb2..e6c0208 100644
--- a/judge/models/submission.py
+++ b/judge/models/submission.py
@@ -78,6 +78,7 @@ class Submission(models.Model):
case_total = models.FloatField(verbose_name=_('test case total points'), default=0)
judged_on = models.ForeignKey('Judge', verbose_name=_('judged on'), null=True, blank=True,
on_delete=models.SET_NULL)
+ judged_date = models.DateTimeField(verbose_name=_('submission judge time'), default=None, null=True)
was_rejudged = models.BooleanField(verbose_name=_('was rejudged by admin'), default=False)
is_pretested = models.BooleanField(verbose_name=_('was ran on pretests only'), default=False)
contest_object = models.ForeignKey('Contest', verbose_name=_('contest'), null=True, blank=True,
@@ -112,8 +113,9 @@ class Submission(models.Model):
def long_status(self):
return Submission.USER_DISPLAY_CODES.get(self.short_status, '')
- def judge(self, rejudge=False, batch_rejudge=False):
- judge_submission(self, rejudge, batch_rejudge)
+ def judge(self, *args, **kwargs):
+ judge_submission(self, *args, **kwargs)
+
judge.alters_data = True
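
Since judge() now forwards its arguments straight to judge_submission(), callers can pass the new judge_id keyword without the model needing to know about it. A usage sketch (the judge name is hypothetical):

# Usage sketch for the forwarding wrapper above.
submission.judge(rejudge=False, judge_id='judge-1')  # pin to one named judge
submission.judge(batch_rejudge=True)                 # existing call paths still work
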
diff --git a/judge/views/problem.py b/judge/views/problem.py
index 639c799..4a82399 100644
--- a/judge/views/problem.py
+++ b/judge/views/problem.py
@@ -537,8 +537,15 @@ def problem_submit(request, problem=None, submission=None):
raise PermissionDenied()
profile = request.profile
+
+ if problem.is_editable_by(request.user):
+ judge_choices = tuple(Judge.objects.filter(online=True, problems=problem).values_list('name', 'name'))
+ else:
+ judge_choices = ()
+
if request.method == 'POST':
- form = ProblemSubmitForm(request.POST, instance=Submission(user=profile))
+ form = ProblemSubmitForm(request.POST, judge_choices=judge_choices,
+ instance=Submission(user=profile, problem=problem))
if form.is_valid():
if (not request.user.has_perm('judge.spam_submission') and
Submission.objects.filter(user=profile, was_rejudged=False)
@@ -586,7 +593,7 @@ def problem_submit(request, problem=None, submission=None):
# Save a query
model.source = source
- model.judge(rejudge=False)
+ model.judge(rejudge=False, judge_id=form.cleaned_data['judge'])
return HttpResponseRedirect(reverse('submission_status', args=[str(model.id)]))
else:
@@ -610,7 +617,7 @@ def problem_submit(request, problem=None, submission=None):
initial['language'] = sub.language
except ValueError:
raise Http404()
- form = ProblemSubmitForm(initial=initial)
+ form = ProblemSubmitForm(judge_choices=judge_choices, initial=initial)
form_data = initial
if 'problem' in form_data:
form.fields['language'].queryset = (
diff --git a/requirements.txt b/requirements.txt
index a7b9d9c..035bad7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -34,3 +34,4 @@ channels
channels-redis
docker
python-memcached
+netaddr
diff --git a/resources/problem.scss b/resources/problem.scss
index 7ab1a63..9c13337 100644
--- a/resources/problem.scss
+++ b/resources/problem.scss
@@ -222,9 +222,12 @@ ul.problem-list {
box-sizing: border-box;
.button {
- float: right;
+ display: inline-block !important;
padding: 6px 12px;
}
+ .submit-bar {
+ float: right;
+ }
}
@media (max-width: 550px) {
diff --git a/templates/problem/submit.html b/templates/problem/submit.html
index cf24c3c..3f4a217 100644
--- a/templates/problem/submit.html
+++ b/templates/problem/submit.html
@@ -2,8 +2,8 @@
{% block js_media %}
+ {{ form.media.js }}
{% compress js %}
- {{ form.media.js }}
{% endcompress %}
{% endblock %}
{% block media %}
+ {{ form.media.css }}
{% compress css %}
- {{ form.media.css }}