2020-01-21 06:35:58 +00:00
|
|
|
import logging
|
|
|
|
from collections import namedtuple
|
|
|
|
from operator import attrgetter
|
|
|
|
from threading import RLock
|
|
|
|
|
|
|
|
try:
|
|
|
|
from llist import dllist
|
|
|
|
except ImportError:
|
|
|
|
from pyllist import dllist
|
|
|
|
|
|
|
|
logger = logging.getLogger('judge.bridge')
|
|
|
|
|
|
|
|
PriorityMarker = namedtuple('PriorityMarker', 'priority')
|
|
|
|
|
|
|
|
|
|
|
|
class JudgeList(object):
|
|
|
|
priorities = 4
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
self.queue = dllist()
|
|
|
|
self.priority = [self.queue.append(PriorityMarker(i)) for i in range(self.priorities)]
|
|
|
|
self.judges = set()
|
|
|
|
self.node_map = {}
|
|
|
|
self.submission_map = {}
|
|
|
|
self.lock = RLock()
|
|
|
|
|
|
|
|
def _handle_free_judge(self, judge):
|
|
|
|
with self.lock:
|
|
|
|
node = self.queue.first
|
|
|
|
while node:
|
|
|
|
if not isinstance(node.value, PriorityMarker):
|
2020-07-19 21:27:14 +00:00
|
|
|
id, problem, language, source, judge_id = node.value
|
|
|
|
if judge.can_judge(problem, language, judge_id):
|
2020-01-21 06:35:58 +00:00
|
|
|
self.submission_map[id] = judge
|
|
|
|
logger.info('Dispatched queued submission %d: %s', id, judge.name)
|
|
|
|
try:
|
|
|
|
judge.submit(id, problem, language, source)
|
|
|
|
except Exception:
|
|
|
|
logger.exception('Failed to dispatch %d (%s, %s) to %s', id, problem, language, judge.name)
|
|
|
|
self.judges.remove(judge)
|
|
|
|
return
|
|
|
|
self.queue.remove(node)
|
|
|
|
del self.node_map[id]
|
|
|
|
break
|
|
|
|
node = node.next
|
|
|
|
|
|
|
|
def register(self, judge):
|
|
|
|
with self.lock:
|
|
|
|
# Disconnect all judges with the same name, see <https://github.com/DMOJ/online-judge/issues/828>
|
|
|
|
self.disconnect(judge, force=True)
|
|
|
|
self.judges.add(judge)
|
|
|
|
self._handle_free_judge(judge)
|
|
|
|
|
|
|
|
def disconnect(self, judge_id, force=False):
|
2020-07-19 21:27:14 +00:00
|
|
|
with self.lock:
|
|
|
|
for judge in self.judges:
|
|
|
|
if judge.name == judge_id:
|
|
|
|
judge.disconnect(force=force)
|
2020-01-21 06:35:58 +00:00
|
|
|
|
|
|
|
def update_problems(self, judge):
|
|
|
|
with self.lock:
|
|
|
|
self._handle_free_judge(judge)
|
|
|
|
|
|
|
|
def remove(self, judge):
|
|
|
|
with self.lock:
|
|
|
|
sub = judge.get_current_submission()
|
|
|
|
if sub is not None:
|
|
|
|
try:
|
|
|
|
del self.submission_map[sub]
|
|
|
|
except KeyError:
|
|
|
|
pass
|
|
|
|
self.judges.discard(judge)
|
|
|
|
|
|
|
|
def __iter__(self):
|
|
|
|
return iter(self.judges)
|
|
|
|
|
|
|
|
def on_judge_free(self, judge, submission):
|
|
|
|
with self.lock:
|
|
|
|
logger.info('Judge available after grading %d: %s', submission, judge.name)
|
|
|
|
del self.submission_map[submission]
|
2020-07-19 21:27:14 +00:00
|
|
|
judge._working = False
|
2020-01-21 06:35:58 +00:00
|
|
|
self._handle_free_judge(judge)
|
|
|
|
|
|
|
|
def abort(self, submission):
|
|
|
|
with self.lock:
|
|
|
|
logger.info('Abort request: %d', submission)
|
|
|
|
try:
|
|
|
|
self.submission_map[submission].abort()
|
|
|
|
return True
|
|
|
|
except KeyError:
|
|
|
|
try:
|
|
|
|
node = self.node_map[submission]
|
|
|
|
except KeyError:
|
|
|
|
pass
|
|
|
|
else:
|
|
|
|
self.queue.remove(node)
|
|
|
|
del self.node_map[submission]
|
|
|
|
return False
|
|
|
|
|
|
|
|
def check_priority(self, priority):
|
|
|
|
return 0 <= priority < self.priorities
|
|
|
|
|
2020-07-19 21:27:14 +00:00
|
|
|
def judge(self, id, problem, language, source, judge_id, priority):
|
2020-01-21 06:35:58 +00:00
|
|
|
with self.lock:
|
|
|
|
if id in self.submission_map or id in self.node_map:
|
|
|
|
# Already judging, don't queue again. This can happen during batch rejudges, rejudges should be
|
|
|
|
# idempotent.
|
|
|
|
return
|
|
|
|
|
2020-07-19 21:27:14 +00:00
|
|
|
candidates = [
|
|
|
|
judge for judge in self.judges if not judge.working and judge.can_judge(problem, language, judge_id)
|
|
|
|
]
|
|
|
|
if judge_id:
|
|
|
|
logger.info('Specified judge %s is%savailable', judge_id, ' ' if candidates else ' not ')
|
|
|
|
else:
|
|
|
|
logger.info('Free judges: %d', len(candidates))
|
2020-01-21 06:35:58 +00:00
|
|
|
if candidates:
|
|
|
|
# Schedule the submission on the judge reporting least load.
|
|
|
|
judge = min(candidates, key=attrgetter('load'))
|
|
|
|
logger.info('Dispatched submission %d to: %s', id, judge.name)
|
|
|
|
self.submission_map[id] = judge
|
|
|
|
try:
|
|
|
|
judge.submit(id, problem, language, source)
|
|
|
|
except Exception:
|
|
|
|
logger.exception('Failed to dispatch %d (%s, %s) to %s', id, problem, language, judge.name)
|
|
|
|
self.judges.discard(judge)
|
2020-07-19 21:27:14 +00:00
|
|
|
return self.judge(id, problem, language, source, judge_id, priority)
|
2020-01-21 06:35:58 +00:00
|
|
|
else:
|
2020-07-19 21:27:14 +00:00
|
|
|
self.node_map[id] = self.queue.insert(
|
|
|
|
(id, problem, language, source, judge_id),
|
|
|
|
self.priority[priority],
|
|
|
|
)
|
2020-01-21 06:35:58 +00:00
|
|
|
logger.info('Queued submission: %d', id)
|