# NDOJ/judge/bridge/judge_handler.py
# (file-listing header from the paste — "966 lines / 32 KiB / Python" — removed)
import hmac
import json
import logging
import threading
import time
from collections import deque, namedtuple
from operator import itemgetter
from django import db
from django.conf import settings
from django.utils import timezone
from django.db.models import F
from django.core.cache import cache
from judge import event_poster as event
from judge.bridge.base_handler import ZlibPacketHandler, proxy_list
from judge.utils.problems import finished_submission
from judge.models import (
Judge,
Language,
LanguageLimit,
Problem,
RuntimeVersion,
Submission,
SubmissionTestCase,
)
from judge.bridge.utils import VanishedSubmission
from judge.caching import cache_wrapper
logger = logging.getLogger("judge.bridge")  # human-readable bridge log
json_log = logging.getLogger("judge.json.bridge")  # structured JSON audit log
# Throttle for live "test-case" events: at most UPDATE_RATE_LIMIT posts per
# UPDATE_RATE_TIME seconds per submission (see on_test_case).
UPDATE_RATE_LIMIT = 5
UPDATE_RATE_TIME = 0.5
# Per-submission grading parameters resolved from the database before a
# submission is dispatched to a judge (see get_related_submission_data).
SubmissionData = namedtuple(
    "SubmissionData",
    "time memory short_circuit pretests_only contest_no attempt_no user_id",
)
def _ensure_connection():
    """Drop a stale/obsolete Django DB connection so the next query reconnects."""
    connection = db.connection
    connection.close_if_unusable_or_obsolete()
class JudgeHandler(ZlibPacketHandler):
proxies = proxy_list(settings.BRIDGED_JUDGE_PROXIES or [])
    def __init__(self, request, client_address, server, judges):
        """Set up per-connection state for one judge socket.

        judges: the judge registry this handler registers with after handshake.
        """
        super().__init__(request, client_address, server)
        self.judges = judges
        # Dispatch table: packet "name" field -> handler method (see on_packet).
        self.handlers = {
            "grading-begin": self.on_grading_begin,
            "grading-end": self.on_grading_end,
            "compile-error": self.on_compile_error,
            "compile-message": self.on_compile_message,
            "batch-begin": self.on_batch_begin,
            "batch-end": self.on_batch_end,
            "test-case-status": self.on_test_case,
            "internal-error": self.on_internal_error,
            "submission-terminated": self.on_submission_terminated,
            "submission-acknowledged": self.on_submission_acknowledged,
            "ping-response": self.on_ping_response,
            "supported-problems": self.on_supported_problems,
            "handshake": self.on_handshake,
        }
        self._working = False  # id of the submission being judged, or False when idle
        self._working_data = {}  # problem/language/source of the in-flight submission
        self._no_response_job = None  # timer armed until the judge acknowledges
        self.executors = {}  # runtimes reported in the handshake packet
        self.problems = set()  # problem codes this judge can grade
        self.latency = None  # rolling average ping, seconds
        self.time_delta = None  # rolling bridge<->judge clock offset
        self.load = 1e100  # judge-reported load; huge sentinel until first ping reply
        self.name = None  # judge name; set only after successful handshake
        self.batch_id = None  # batch counter within the current grading run
        self.in_batch = False
        self._stop_ping = threading.Event()  # signals _ping_thread to exit
        self._ping_average = deque(maxlen=6)  # 1 minute average, just like load
        self._time_delta = deque(maxlen=6)
        # each value is (updates, last reset)
        self.update_counter = {}
        self.judge = None  # Judge model row, assigned in _connected
        self.judge_address = None
        self._submission_cache_id = None  # memoizes _post_update_submission lookups
        self._submission_cache = {}
    def on_connect(self):
        """Socket accepted: give the peer a short window to authenticate."""
        # Unauthenticated peers get 15s; raised to 60s after handshake succeeds.
        self.timeout = 15
        logger.info("Judge connected from: %s", self.client_address)
        json_log.info(self._make_json_log(action="connect"))
    def on_disconnect(self):
        """Tear down this judge and requeue any submission it was grading."""
        self._stop_ping.set()  # stop the ping thread
        self.judges.remove(self)
        if self.name is not None:  # only authenticated judges have DB state to undo
            self._disconnected()
        logger.info(
            "Judge disconnected from: %s with name %s", self.client_address, self.name
        )
        json_log.info(
            self._make_json_log(action="disconnect", info="judge disconnected")
        )
        if self._working:
            # Hand the in-flight submission back to the judge list so another
            # judge can pick it up (trailing None/0 arguments mirror the
            # original dispatch; their semantics are defined by judges.judge).
            self.judges.judge(
                self._working,
                self._working_data["problem"],
                self._working_data["language"],
                self._working_data["source"],
                None,
                0,
            )
    def _authenticate(self, id, key):
        """Validate a handshake (id, key) pair against the Judge table.

        Returns True on success.  With BRIDGED_AUTO_CREATE_JUDGE set, an
        unknown judge name is registered on the fly with the offered key.
        Blocked judges always fail.  Failures are written to the JSON log.
        """
        try:
            judge = Judge.objects.get(name=id)
        except Judge.DoesNotExist:
            if settings.BRIDGED_AUTO_CREATE_JUDGE:
                judge = Judge()
                judge.name = id
                judge.auth_key = key
                judge.save()
                result = True
            else:
                result = False
        else:
            if judge.is_blocked:
                result = False
            else:
                # Constant-time comparison avoids leaking the key via timing.
                result = hmac.compare_digest(judge.auth_key, key)
        if not result:
            json_log.warning(
                self._make_json_log(
                    action="auth", judge=id, info="judge failed authentication"
                )
            )
        return result
def _update_supported_problems(self, problem_packet):
# problem_packet is a dict {code: mtimes} from judge-server
self.problems = set(p for p, _ in problem_packet)
    def _update_judge_problems(self):
        """Diff self.problems against the DB and sync the judge's problem M2M.

        Works in chunks of 500 codes to bound query size, and busts the cached
        code set only when at least one row actually changed.
        """
        chunk_size = 500
        target_problem_codes = self.problems
        current_problems = _get_judge_problems(self.judge)
        updated = False
        problems_to_add = list(target_problem_codes - current_problems)
        problems_to_remove = list(current_problems - target_problem_codes)
        if problems_to_add:
            for i in range(0, len(problems_to_add), chunk_size):
                chunk = problems_to_add[i : i + chunk_size]
                problem_ids = Problem.objects.filter(code__in=chunk).values_list(
                    "id", flat=True
                )
                # Codes unknown to the site resolve to no ids; nothing to add.
                if not problem_ids:
                    continue
                logger.info("%s: Add %d problems", self.name, len(problem_ids))
                self.judge.problems.add(*problem_ids)
                updated = True
        if problems_to_remove:
            for i in range(0, len(problems_to_remove), chunk_size):
                chunk = problems_to_remove[i : i + chunk_size]
                problem_ids = Problem.objects.filter(code__in=chunk).values_list(
                    "id", flat=True
                )
                if not problem_ids:
                    continue
                logger.info("%s: Remove %d problems", self.name, len(problem_ids))
                self.judge.problems.remove(*problem_ids)
                updated = True
        if updated:
            # Invalidate the cached problem-code set (see _get_judge_problems).
            _get_judge_problems.dirty(self.judge)
    def _connected(self):
        """Persist DB state for a freshly authenticated judge.

        Marks the judge online, syncs its problems and runtimes, rebuilds its
        RuntimeVersion rows, and records its IP.
        """
        judge = self.judge = Judge.objects.get(name=self.name)
        judge.start_time = timezone.now()
        judge.online = True
        self._update_judge_problems()
        judge.runtimes.set(Language.objects.filter(key__in=list(self.executors.keys())))
        # Delete now in case we somehow crashed and left some over from the last connection
        RuntimeVersion.objects.filter(judge=judge).delete()
        versions = []
        for lang in judge.runtimes.all():
            # priority preserves the order the judge reported its runtimes in.
            versions += [
                RuntimeVersion(
                    language=lang,
                    name=name,
                    version=".".join(map(str, version)),
                    priority=idx,
                    judge=judge,
                )
                for idx, (name, version) in enumerate(self.executors[lang.key])
            ]
        RuntimeVersion.objects.bulk_create(versions)
        judge.last_ip = self.client_address[0]
        judge.save()
        self.judge_address = "[%s]:%s" % (
            self.client_address[0],
            self.client_address[1],
        )
        json_log.info(
            self._make_json_log(
                action="auth",
                info="judge successfully authenticated",
                executors=list(self.executors.keys()),
            )
        )
    def _disconnected(self):
        """Mark this judge offline and drop its runtime/problem DB state."""
        Judge.objects.filter(id=self.judge.id).update(online=False)
        RuntimeVersion.objects.filter(judge=self.judge).delete()
        self.judge.problems.clear()
        _get_judge_problems.dirty(self.judge)  # invalidate the cached code set
    def _update_ping(self):
        """Persist the latest latency/load figures for this judge."""
        try:
            Judge.objects.filter(name=self.name).update(
                ping=self.latency, load=self.load
            )
        except Exception as e:
            # What can I do? I don't want to tie this to MySQL.
            # Structural match for MySQL error 2006 ("server has gone away")
            # keeps this module free of a hard MySQL-driver dependency.
            if (
                e.__class__.__name__ == "OperationalError"
                and e.__module__ == "_mysql_exceptions"
                and e.args[0] == 2006
            ):
                db.connection.close()
def send(self, data):
super().send(json.dumps(data, separators=(",", ":")))
    def on_handshake(self, packet):
        """Authenticate the judge and bring it into service.

        Closes the connection on a malformed packet or bad credentials;
        otherwise relaxes the timeout, records capabilities, registers with
        the judge list, starts the ping thread, and syncs DB state.
        """
        if "id" not in packet or "key" not in packet:
            logger.warning("Malformed handshake: %s", self.client_address)
            self.close()
            return
        if not self._authenticate(packet["id"], packet["key"]):
            logger.warning("Authentication failure: %s", self.client_address)
            self.close()
            return
        # Authenticated judges get the relaxed 60s socket timeout.
        self.timeout = 60
        self._update_supported_problems(packet["problems"])
        self.executors = packet["executors"]
        self.name = packet["id"]
        self.send({"name": "handshake-success"})
        logger.info("Judge authenticated: %s (%s)", self.client_address, packet["id"])
        self.judges.register(self)
        threading.Thread(target=self._ping_thread).start()
        self._connected()
def can_judge(self, problem, executor, judge_id=None):
return (
problem in self.problems
and executor in self.executors
and (not judge_id or self.name == judge_id)
)
@property
def working(self):
return bool(self._working)
    def get_related_submission_data(self, submission):
        """Fetch everything needed to dispatch submission *submission*.

        Returns a SubmissionData tuple, or None when the submission row has
        vanished.  Per-language limit overrides replace the problem defaults.
        """
        _ensure_connection()
        try:
            (
                pid,
                time,
                memory,
                short_circuit,
                lid,
                is_pretested,
                sub_date,
                uid,
                part_virtual,
                part_id,
            ) = (
                Submission.objects.filter(id=submission).values_list(
                    "problem__id",
                    "problem__time_limit",
                    "problem__memory_limit",
                    "problem__short_circuit",
                    "language__id",
                    "is_pretested",
                    "date",
                    "user__id",
                    "contest__participation__virtual",
                    "contest__participation__id",
                )
            ).get()
        except Submission.DoesNotExist:
            logger.error("Submission vanished: %s", submission)
            json_log.error(
                self._make_json_log(
                    sub=self._working,
                    action="request",
                    info="submission vanished when fetching info",
                )
            )
            return
        # Attempt number: earlier non-CE/IE submissions by the same user to
        # the same problem in the same participation, plus one.
        attempt_no = (
            Submission.objects.filter(
                problem__id=pid,
                contest__participation__id=part_id,
                user__id=uid,
                date__lt=sub_date,
            )
            .exclude(status__in=("CE", "IE"))
            .count()
            + 1
        )
        try:
            # Language-specific limits override the problem-wide ones.
            time, memory = (
                LanguageLimit.objects.filter(problem__id=pid, language__id=lid)
                .values_list("time_limit", "memory_limit")
                .get()
            )
        except LanguageLimit.DoesNotExist:
            pass
        return SubmissionData(
            time=time,
            memory=memory,
            short_circuit=short_circuit,
            pretests_only=is_pretested,
            contest_no=part_virtual,
            attempt_no=attempt_no,
            user_id=uid,
        )
def disconnect(self, force=False):
if force:
# Yank the power out.
self.close()
else:
self.send({"name": "disconnect"})
def submit(self, id, problem, language, source):
data = self.get_related_submission_data(id)
if not data:
self._update_internal_error_submission(id, "Submission vanished")
raise VanishedSubmission()
self._working = id
self._working_data = {
"problem": problem,
"language": language,
"source": source,
}
self._no_response_job = threading.Timer(20, self._kill_if_no_response)
self.send(
{
"name": "submission-request",
"submission-id": id,
"problem-id": problem,
"language": language,
"source": source,
"time-limit": data.time,
"memory-limit": data.memory,
"short-circuit": data.short_circuit,
"meta": {
"pretests-only": data.pretests_only,
"in-contest": data.contest_no,
"attempt-no": data.attempt_no,
"user": data.user_id,
},
}
)
    def _kill_if_no_response(self):
        """Timer callback: the judge never acknowledged; drop the connection."""
        logger.error(
            "Judge failed to acknowledge submission: %s: %s", self.name, self._working
        )
        self.close()
def on_timeout(self):
if self.name:
logger.warning("Judge seems dead: %s: %s", self.name, self._working)
def malformed_packet(self, exception):
logger.exception("Judge sent malformed packet: %s", self.name)
super(JudgeHandler, self).malformed_packet(exception)
    def on_submission_processing(self, packet):
        """Mark a submission as processing ("P") and notify listeners."""
        _ensure_connection()
        id = packet["submission-id"]
        # update() returns the number of affected rows; 0 means unknown id.
        if Submission.objects.filter(id=id).update(status="P", judged_on=self.judge):
            event.post("sub_%s" % Submission.get_id_secret(id), {"type": "processing"})
            self._post_update_submission(id, "processing")
            json_log.info(self._make_json_log(packet, action="processing"))
        else:
            logger.warning("Unknown submission: %s", id)
            json_log.error(
                self._make_json_log(
                    packet, action="processing", info="unknown submission"
                )
            )
    def on_submission_wrong_acknowledge(self, packet, expected, got):
        """Flag both sides of a mismatched acknowledgement as internal errors."""
        json_log.error(
            self._make_json_log(
                packet, action="processing", info="wrong-acknowledge", expected=expected
            )
        )
        Submission.objects.filter(id=expected).update(
            status="IE", result="IE", error=None
        )
        # Only clobber the unexpected submission while it is still queued.
        Submission.objects.filter(id=got, status="QU").update(
            status="IE", result="IE", error=None
        )
def on_submission_acknowledged(self, packet):
if not packet.get("submission-id", None) == self._working:
logger.error(
"Wrong acknowledgement: %s: %s, expected: %s",
self.name,
packet.get("submission-id", None),
self._working,
)
self.on_submission_wrong_acknowledge(
packet, self._working, packet.get("submission-id", None)
)
self.close()
logger.info("Submission acknowledged: %d", self._working)
if self._no_response_job:
self._no_response_job.cancel()
self._no_response_job = None
self.on_submission_processing(packet)
    def abort(self):
        """Ask the judge to stop grading its current submission."""
        self.send({"name": "terminate-submission"})
def get_current_submission(self):
return self._working or None
    def ping(self):
        """Send a ping carrying our send timestamp (echoed back in the reply)."""
        self.send({"name": "ping", "when": time.time()})
def on_packet(self, data):
try:
try:
data = json.loads(data)
if "name" not in data:
raise ValueError
except ValueError:
self.on_malformed(data)
else:
handler = self.handlers.get(data["name"], self.on_malformed)
handler(data)
except Exception:
logger.exception("Error in packet handling (Judge-side): %s", self.name)
self._packet_exception()
# You can't crash here because you aren't so sure about the judges
# not being malicious or simply malforms. THIS IS A SERVER!
    def _packet_exception(self):
        """Record the exception currently being handled in the JSON log."""
        json_log.exception(
            self._make_json_log(sub=self._working, info="packet processing exception")
        )
def _submission_is_batch(self, id):
if not Submission.objects.filter(id=id).update(batch=True):
logger.warning("Unknown submission: %s", id)
    def on_supported_problems(self, packet):
        """The judge pushed a fresh problem list; sync registry and DB."""
        logger.info("%s: Updated problem list", self.name)
        self._update_supported_problems(packet["problems"])
        if not self.working:
            # Only re-advertise availability to the judge list while idle.
            self.judges.update_problems(self)
        self._update_judge_problems()
        json_log.info(
            self._make_json_log(action="update-problems", count=len(self.problems))
        )
    def on_grading_begin(self, packet):
        """Reset a submission into the grading ("G") state and clear old cases."""
        logger.info("%s: Grading has begun on: %s", self.name, packet["submission-id"])
        self.batch_id = None
        if Submission.objects.filter(id=packet["submission-id"]).update(
            status="G",
            is_pretested=packet["pretested"],
            current_testcase=1,
            points=0,
            batch=False,
            judged_date=timezone.now(),
        ):
            # Drop stale test-case rows from any previous judging run.
            SubmissionTestCase.objects.filter(
                submission_id=packet["submission-id"]
            ).delete()
            event.post(
                "sub_%s" % Submission.get_id_secret(packet["submission-id"]),
                {"type": "grading-begin"},
            )
            self._post_update_submission(packet["submission-id"], "grading-begin")
            json_log.info(self._make_json_log(packet, action="grading-begin"))
        else:
            logger.warning("Unknown submission: %s", packet["submission-id"])
            json_log.error(
                self._make_json_log(
                    packet, action="grading-begin", info="unknown submission"
                )
            )
def on_grading_end(self, packet):
logger.info("%s: Grading has ended on: %s", self.name, packet["submission-id"])
self._free_self(packet)
self.batch_id = None
try:
submission = Submission.objects.get(id=packet["submission-id"])
except Submission.DoesNotExist:
logger.warning("Unknown submission: %s", packet["submission-id"])
json_log.error(
self._make_json_log(
packet, action="grading-end", info="unknown submission"
)
)
return
time = 0
memory = 0
points = 0.0
total = 0
status = 0
status_codes = ["SC", "AC", "WA", "MLE", "TLE", "IR", "RTE", "OLE"]
batches = {} # batch number: (points, total)
for case in SubmissionTestCase.objects.filter(submission=submission):
time += case.time
if not case.batch:
points += case.points
total += case.total
else:
if case.batch in batches:
batches[case.batch][0] += case.points
batches[case.batch][1] += case.total
else:
batches[case.batch] = [case.points, case.total]
memory = max(memory, case.memory)
i = status_codes.index(case.status)
if i > status:
status = i
for i in batches:
points += batches[i][0]
total += batches[i][1]
points = points
total = total
submission.case_points = points
submission.case_total = total
problem = submission.problem
sub_points = round(points / total * problem.points if total > 0 else 0, 3)
if not problem.partial and sub_points != problem.points:
sub_points = 0
submission.status = "D"
submission.time = time
submission.memory = memory
submission.points = sub_points
submission.result = status_codes[status]
submission.save()
json_log.info(
self._make_json_log(
packet,
action="grading-end",
time=time,
memory=memory,
points=sub_points,
total=problem.points,
result=submission.result,
case_points=points,
case_total=total,
user=submission.user_id,
problem=problem.code,
finish=True,
)
)
submission.user._updating_stats_only = True
submission.user.calculate_points()
problem._updating_stats_only = True
problem.update_stats()
submission.update_contest()
finished_submission(submission)
event.post(
"sub_%s" % submission.id_secret,
{
"type": "grading-end",
"time": time,
"memory": memory,
"points": float(points),
"total": float(problem.points),
"result": submission.result,
},
)
if hasattr(submission, "contest"):
participation = submission.contest.participation
event.post("contest_%d" % participation.contest_id, {"type": "update"})
self._post_update_submission(submission.id, "grading-end", done=True)
    def on_compile_error(self, packet):
        """Record a compile failure (CE) along with its compiler log."""
        logger.info(
            "%s: Submission failed to compile: %s", self.name, packet["submission-id"]
        )
        self._free_self(packet)
        # update() returns the number of affected rows; 0 means unknown id.
        if Submission.objects.filter(id=packet["submission-id"]).update(
            status="CE", result="CE", error=packet["log"]
        ):
            event.post(
                "sub_%s" % Submission.get_id_secret(packet["submission-id"]),
                {
                    "type": "compile-error",
                    "log": packet["log"],
                },
            )
            self._post_update_submission(
                packet["submission-id"], "compile-error", done=True
            )
            json_log.info(
                self._make_json_log(
                    packet,
                    action="compile-error",
                    log=packet["log"],
                    finish=True,
                    result="CE",
                )
            )
        else:
            logger.warning("Unknown submission: %s", packet["submission-id"])
            json_log.error(
                self._make_json_log(
                    packet,
                    action="compile-error",
                    info="unknown submission",
                    log=packet["log"],
                    finish=True,
                    result="CE",
                )
            )
    def on_compile_message(self, packet):
        """Store compiler warnings for a submission that still compiled."""
        logger.info(
            "%s: Submission generated compiler messages: %s",
            self.name,
            packet["submission-id"],
        )
        if Submission.objects.filter(id=packet["submission-id"]).update(
            error=packet["log"]
        ):
            event.post(
                "sub_%s" % Submission.get_id_secret(packet["submission-id"]),
                {"type": "compile-message"},
            )
            json_log.info(
                self._make_json_log(packet, action="compile-message", log=packet["log"])
            )
        else:
            logger.warning("Unknown submission: %s", packet["submission-id"])
            json_log.error(
                self._make_json_log(
                    packet,
                    action="compile-message",
                    info="unknown submission",
                    log=packet["log"],
                )
            )
    def on_internal_error(self, packet):
        """The judge reported an internal error while grading a submission."""
        try:
            # Raise-and-catch so logger.exception records the judge's message
            # together with a traceback in the bridge log.
            raise ValueError("\n\n" + packet["message"])
        except ValueError:
            logger.exception(
                "Judge %s failed while handling submission %s",
                self.name,
                packet["submission-id"],
            )
        self._free_self(packet)
        id = packet["submission-id"]
        self._update_internal_error_submission(id, packet["message"])
    def _update_internal_error_submission(self, id, message):
        """Mark submission *id* as an internal error (IE) and notify listeners."""
        if Submission.objects.filter(id=id).update(
            status="IE", result="IE", error=message
        ):
            event.post(
                "sub_%s" % Submission.get_id_secret(id), {"type": "internal-error"}
            )
            self._post_update_submission(id, "internal-error", done=True)
            json_log.info(
                self._make_json_log(
                    sub=id,
                    action="internal-error",
                    message=message,
                    finish=True,
                    result="IE",
                )
            )
        else:
            logger.warning("Unknown submission: %s", id)
            json_log.error(
                self._make_json_log(
                    sub=id,
                    action="internal-error",
                    info="unknown submission",
                    message=message,
                    finish=True,
                    result="IE",
                )
            )
    def on_submission_terminated(self, packet):
        """The judge aborted a submission; mark it AB and notify listeners."""
        logger.info("%s: Submission aborted: %s", self.name, packet["submission-id"])
        self._free_self(packet)
        if Submission.objects.filter(id=packet["submission-id"]).update(
            status="AB", result="AB"
        ):
            event.post(
                "sub_%s" % Submission.get_id_secret(packet["submission-id"]),
                {"type": "aborted-submission"},
            )
            self._post_update_submission(
                packet["submission-id"], "terminated", done=True
            )
            json_log.info(
                self._make_json_log(packet, action="aborted", finish=True, result="AB")
            )
        else:
            logger.warning("Unknown submission: %s", packet["submission-id"])
            json_log.error(
                self._make_json_log(
                    packet,
                    action="aborted",
                    info="unknown submission",
                    finish=True,
                    result="AB",
                )
            )
    def on_batch_begin(self, packet):
        """A batch of test cases is starting; bump the per-run batch counter."""
        logger.info("%s: Batch began on: %s", self.name, packet["submission-id"])
        self.in_batch = True
        if self.batch_id is None:
            # First batch of this grading run: also flag the submission batched.
            self.batch_id = 0
            self._submission_is_batch(packet["submission-id"])
        self.batch_id += 1
        json_log.info(
            self._make_json_log(packet, action="batch-begin", batch=self.batch_id)
        )
    def on_batch_end(self, packet):
        """The current batch finished; subsequent cases are unbatched."""
        self.in_batch = False
        logger.info("%s: Batch ended on: %s", self.name, packet["submission-id"])
        json_log.info(
            self._make_json_log(packet, action="batch-end", batch=self.batch_id)
        )
    def on_test_case(
        self,
        packet,
        # Bound once at definition time to the feedback column's max length.
        max_feedback=SubmissionTestCase._meta.get_field("feedback").max_length,
    ):
        """Persist a batch of test-case results and throttle live updates."""
        logger.info(
            "%s: %d test case(s) completed on: %s",
            self.name,
            len(packet["cases"]),
            packet["submission-id"],
        )
        id = packet["submission-id"]
        updates = packet["cases"]
        max_position = max(map(itemgetter("position"), updates))
        sum_points = sum(map(itemgetter("points"), updates))
        # One UPDATE advances the testcase pointer and accumulates points.
        if not Submission.objects.filter(id=id).update(
            current_testcase=max_position + 1, points=F("points") + sum_points
        ):
            logger.warning("Unknown submission: %s", id)
            json_log.error(
                self._make_json_log(
                    packet, action="test-case", info="unknown submission"
                )
            )
            return
        bulk_test_case_updates = []
        for result in updates:
            test_case = SubmissionTestCase(submission_id=id, case=result["position"])
            # `status` is a bitmask from the judge; flags are checked in
            # severity order so the worst applicable verdict is chosen.
            status = result["status"]
            if status & 4:
                test_case.status = "TLE"
            elif status & 8:
                test_case.status = "MLE"
            elif status & 64:
                test_case.status = "OLE"
            elif status & 2:
                test_case.status = "RTE"
            elif status & 16:
                test_case.status = "IR"
            elif status & 1:
                test_case.status = "WA"
            elif status & 32:
                test_case.status = "SC"
            else:
                test_case.status = "AC"
            test_case.time = result["time"]
            test_case.memory = result["memory"]
            test_case.points = result["points"]
            test_case.total = result["total-points"]
            test_case.batch = self.batch_id if self.in_batch else None
            # The feedback column is bounded; truncate to the model's limit.
            test_case.feedback = (result.get("feedback") or "")[:max_feedback]
            test_case.extended_feedback = result.get("extended-feedback") or ""
            test_case.output = result["output"]
            bulk_test_case_updates.append(test_case)
            json_log.info(
                self._make_json_log(
                    packet,
                    action="test-case",
                    case=test_case.case,
                    batch=test_case.batch,
                    time=test_case.time,
                    memory=test_case.memory,
                    feedback=test_case.feedback,
                    extended_feedback=test_case.extended_feedback,
                    output=test_case.output,
                    points=test_case.points,
                    total=test_case.total,
                    status=test_case.status,
                )
            )
        # Rate-limit spectator events: at most UPDATE_RATE_LIMIT posts per
        # UPDATE_RATE_TIME seconds per submission.
        do_post = True
        if id in self.update_counter:
            cnt, reset = self.update_counter[id]
            cnt += 1
            if time.monotonic() - reset > UPDATE_RATE_TIME:
                # Window expired: forget the counter so it restarts below.
                del self.update_counter[id]
            else:
                self.update_counter[id] = (cnt, reset)
                if cnt > UPDATE_RATE_LIMIT:
                    do_post = False
        if id not in self.update_counter:
            self.update_counter[id] = (1, time.monotonic())
        if do_post:
            event.post(
                "sub_%s" % Submission.get_id_secret(id),
                {
                    "type": "test-case",
                    "id": max_position,
                },
            )
            self._post_update_submission(id, state="test-case")
        SubmissionTestCase.objects.bulk_create(bulk_test_case_updates)
    def on_malformed(self, packet):
        """Fallback handler for unparseable or unrecognized packets."""
        logger.error("%s: Malformed packet: %s", self.name, packet)
        # NOTE(review): .exception() logs "NoneType: None" when no exception is
        # active (e.g. unknown packet name dispatched here) — confirm intended.
        json_log.exception(
            self._make_json_log(sub=self._working, info="malformed json packet")
        )
def on_ping_response(self, packet):
end = time.time()
self._ping_average.append(end - packet["when"])
self._time_delta.append((end + packet["when"]) / 2 - packet["time"])
self.latency = sum(self._ping_average) / len(self._ping_average)
self.time_delta = sum(self._time_delta) / len(self._time_delta)
self.load = packet["load"]
self._update_ping()
def _free_self(self, packet):
self.judges.on_judge_free(self, packet["submission-id"])
def _ping_thread(self):
try:
while True:
self.ping()
if self._stop_ping.wait(10):
break
except Exception:
logger.exception("Ping error in %s", self.name)
self.close()
raise
def _make_json_log(self, packet=None, sub=None, **kwargs):
data = {
"judge": self.name,
"address": self.judge_address,
}
if sub is None and packet is not None:
sub = packet.get("submission-id")
if sub is not None:
data["submission"] = sub
data.update(kwargs)
return json.dumps(data)
    def _post_update_submission(self, id, state, done=False):
        """Broadcast an update for submission *id* on the global feed.

        A per-handler cache avoids re-querying the same submission for every
        test case.  NOTE(review): cached fields (e.g. status) can be stale for
        later events of the same submission — presumably acceptable; confirm.
        """
        if self._submission_cache_id == id:
            data = self._submission_cache
        else:
            self._submission_cache = data = (
                Submission.objects.filter(id=id)
                .values(
                    "problem__is_public",
                    "contest_object__key",
                    "user_id",
                    "problem_id",
                    "status",
                    "language__key",
                )
                .get()
            )
            self._submission_cache_id = id
        # Only public problems are broadcast on the global submission feed.
        if data["problem__is_public"]:
            event.post(
                "submissions",
                {
                    "type": "done-submission" if done else "update-submission",
                    "state": state,
                    "id": id,
                    "contest": data["contest_object__key"],
                    "user": data["user_id"],
                    "problem": data["problem_id"],
                    "status": data["status"],
                    "language": data["language__key"],
                },
            )
    def on_cleanup(self):
        """Connection teardown: release this thread's DB connection."""
        db.connection.close()
@cache_wrapper(prefix="gjp", timeout=3600)
def _get_judge_problems(judge):
    """Cached (1h) set of problem codes currently assigned to *judge*."""
    codes = judge.problems.values_list("code", flat=True)
    return set(codes)