160 lines
5.6 KiB
Python
160 lines
5.6 KiB
Python
import logging
|
|
from collections import namedtuple
|
|
from operator import attrgetter
|
|
from threading import RLock
|
|
|
|
from judge.bridge.utils import VanishedSubmission
|
|
|
|
try:
|
|
from llist import dllist
|
|
except ImportError:
|
|
from pyllist import dllist
|
|
|
|
# Module-wide logger; every message in this file is emitted under "judge.bridge".
logger = logging.getLogger("judge.bridge")

# Sentinel value stored in JudgeList.queue, one per priority level. Its only
# field, ``priority``, is the level's index. Queue scans tell markers apart
# from queued submissions with ``isinstance(value, PriorityMarker)``.
PriorityMarker = namedtuple("PriorityMarker", "priority")
|
|
|
|
|
|
class JudgeList(object):
    """Registry of connected judges plus a priority queue of waiting submissions.

    A submission is either dispatched (recorded in ``submission_map``) or
    waiting (recorded in ``node_map`` and stored in ``queue``). ``queue`` is a
    doubly linked list that also contains one ``PriorityMarker`` per priority
    level; a waiting submission of priority ``p`` is inserted immediately
    before marker ``p``. Scanning from the head therefore visits lower
    priority numbers first and, within one level, older submissions first.

    Every method that touches shared state holds ``self.lock``. It is an
    ``RLock`` so that methods can call each other (and be re-entered from
    judge callbacks on the same thread) without deadlocking.
    """

    # Number of priority levels; valid priorities are 0 .. priorities - 1.
    priorities = 4

    def __init__(self):
        """Create an empty list with one queue marker per priority level."""
        self.queue = dllist()
        # priority[p] is the queue node holding the marker for level p.
        self.priority = [
            self.queue.append(PriorityMarker(level))
            for level in range(self.priorities)
        ]
        # Currently registered judge objects.
        self.judges = set()
        # submission id -> queue node, for submissions still waiting.
        self.node_map = {}
        # submission id -> judge, for submissions handed to a judge.
        self.submission_map = {}
        self.lock = RLock()

    def _handle_free_judge(self, judge):
        """Give ``judge`` the first queued submission it is able to grade.

        At most one submission is dispatched per call. If ``judge.submit``
        raises ``VanishedSubmission`` the entry is dropped from the queue and
        the scan stops. If it raises anything else, the failure is logged,
        ``judge`` is removed from the registry, and the submission stays
        queued for another judge.
        """
        with self.lock:
            node = self.queue.first
            while node:
                entry = node.value
                if not isinstance(entry, PriorityMarker):
                    sub_id, problem, language, source, judge_id = entry
                    if judge.can_judge(problem, language, judge_id):
                        # Recorded before submit() so the mapping already
                        # exists while the judge is being contacted.
                        self.submission_map[sub_id] = judge
                        logger.info(
                            "Dispatched queued submission %d: %s", sub_id, judge.name
                        )
                        try:
                            judge.submit(sub_id, problem, language, source)
                        except VanishedSubmission:
                            # The judge never took the submission; do not
                            # leave a mapping that abort() would follow.
                            self.submission_map.pop(sub_id, None)
                        except Exception:
                            logger.exception(
                                "Failed to dispatch %d (%s, %s) to %s",
                                sub_id,
                                problem,
                                language,
                                judge.name,
                            )
                            # The submission is still queued, so it must not
                            # look dispatched. discard() tolerates a judge
                            # that was already unregistered.
                            self.submission_map.pop(sub_id, None)
                            self.judges.discard(judge)
                            return
                        self.queue.remove(node)
                        del self.node_map[sub_id]
                        break
                node = node.next

    def register(self, judge):
        """Add ``judge`` to the registry and try to give it queued work.

        Any other registered judge with the same ``name`` is force-disconnected
        first, see <https://github.com/DMOJ/online-judge/issues/828>.
        """
        with self.lock:
            # Compare by name, and iterate over a snapshot: disconnecting a
            # judge may remove it from self.judges while we are looping.
            for existing in list(self.judges):
                if existing is not judge and existing.name == judge.name:
                    existing.disconnect(force=True)
            self.judges.add(judge)
            self._handle_free_judge(judge)

    def disconnect(self, judge_id, force=False):
        """Call ``disconnect(force=force)`` on every judge named ``judge_id``."""
        with self.lock:
            # Snapshot the set so a re-entrant remove() cannot invalidate the
            # iteration ("Set changed size during iteration").
            for judge in list(self.judges):
                if judge.name == judge_id:
                    judge.disconnect(force=force)

    def update_problems(self, judge):
        """Re-scan the queue for ``judge``, e.g. after what it can grade changed."""
        with self.lock:
            self._handle_free_judge(judge)

    def remove(self, judge):
        """Unregister ``judge`` and forget the submission it reports as current."""
        with self.lock:
            current = judge.get_current_submission()
            if current is not None:
                # The entry may already be gone; that is not an error here.
                self.submission_map.pop(current, None)
            self.judges.discard(judge)

    def __iter__(self):
        """Iterate over the registered judges (no lock is held while iterating)."""
        return iter(self.judges)

    def on_judge_free(self, judge, submission):
        """Record that ``judge`` finished ``submission`` and offer it queued work.

        Raises:
            KeyError: if ``submission`` is not recorded as dispatched.
        """
        with self.lock:
            logger.info("Judge available after grading %d: %s", submission, judge.name)
            del self.submission_map[submission]
            # Reset the judge's own bookkeeping before handing it new work.
            judge._working = False
            judge._working_data = {}
            self._handle_free_judge(judge)

    def abort(self, submission):
        """Abort ``submission`` wherever it currently is.

        Returns:
            True if the submission was dispatched and its judge was asked to
            abort it. False otherwise; in that case, if the submission was
            still waiting, it is removed from the queue.
        """
        with self.lock:
            logger.info("Abort request: %d", submission)
            try:
                judge = self.submission_map[submission]
            except KeyError:
                node = self.node_map.get(submission)
                if node is not None:
                    self.queue.remove(node)
                    del self.node_map[submission]
                return False
            # Kept outside the try so a KeyError raised inside the judge is
            # not mistaken for "submission was never dispatched".
            judge.abort()
            return True

    def check_priority(self, priority):
        """Return whether ``priority`` is a valid priority level."""
        return 0 <= priority < self.priorities

    def judge(self, id, problem, language, source, judge_id, priority):
        """Dispatch a submission to a free judge, or queue it if none can take it.

        Args:
            id: submission identifier; a submission that is already dispatched
                or queued is ignored.
            problem, language, source: forwarded to the judge's ``submit``.
            judge_id: forwarded to each judge's ``can_judge``; when truthy the
                log line reports availability of the "specified judge".
            priority: index into ``self.priority``; callers are expected to
                validate it with ``check_priority`` first.

        If dispatching fails, the failing judge is dropped from the registry
        and the submission is retried against the remaining judges.
        """
        with self.lock:
            if id in self.submission_map or id in self.node_map:
                # Already judging, don't queue again. This can happen during
                # batch rejudges, rejudges should be idempotent.
                return

            candidates = [
                candidate
                for candidate in self.judges
                if not candidate.working
                and candidate.can_judge(problem, language, judge_id)
            ]
            if judge_id:
                logger.info(
                    "Specified judge %s is%savailable",
                    judge_id,
                    " " if candidates else " not ",
                )
            else:
                logger.info("Free judges: %d", len(candidates))

            if not candidates:
                self.node_map[id] = self.queue.insert(
                    (id, problem, language, source, judge_id),
                    self.priority[priority],
                )
                logger.info("Queued submission: %d", id)
                return

            # Schedule the submission on the judge reporting least load.
            chosen = min(candidates, key=attrgetter("load"))
            logger.info("Dispatched submission %d to: %s", id, chosen.name)
            self.submission_map[id] = chosen
            try:
                chosen.submit(id, problem, language, source)
            except Exception:
                logger.exception(
                    "Failed to dispatch %d (%s, %s) to %s",
                    id,
                    problem,
                    language,
                    chosen.name,
                )
                self.judges.discard(chosen)
                # Forget the failed dispatch, otherwise the retry below would
                # see the id as "already judging" and silently drop it.
                self.submission_map.pop(id, None)
                return self.judge(id, problem, language, source, judge_id, priority)
|