import json import os.path import zipfile from operator import attrgetter from django.conf import settings from django.contrib.auth.mixins import LoginRequiredMixin from django.core.cache import cache from django.core.exceptions import ImproperlyConfigured from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import PermissionDenied from django.db.models import Prefetch from django.db.models import Q from django.http import Http404 from django.http import HttpResponse from django.http import HttpResponseBadRequest from django.http import HttpResponseRedirect from django.http import JsonResponse from django.shortcuts import get_object_or_404 from django.shortcuts import render from django.urls import reverse from django.utils import timezone from django.utils.functional import cached_property from django.utils.html import escape from django.utils.html import format_html from django.utils.safestring import mark_safe from django.utils.translation import gettext as _ from django.utils.translation import gettext_lazy from django.views.decorators.http import require_POST from django.views.generic import DetailView from django.views.generic import ListView from judge import event_poster as event from judge.highlight_code import highlight_code from judge.models import Contest from judge.models import Language from judge.models import Problem from judge.models import ProblemTestCase from judge.models import ProblemTranslation from judge.models import Profile from judge.models import Submission from judge.utils.problems import get_result_data from judge.utils.problems import user_authored_ids from judge.utils.problems import user_completed_ids from judge.utils.problems import user_editable_ids from judge.utils.problem_data import get_problem_case from judge.utils.raw_sql import join_sql_subquery, use_straight_join from judge.utils.views import DiggPaginatorMixin from judge.utils.views import TitleMixin def submission_related(queryset): return queryset.select_related("user__user", "problem", "language").only( "id", "user__user__username", "user__display_rank", "user__rating", "problem__name", "problem__code", "problem__is_public", "language__short_name", "language__key", "date", "time", "memory", "points", "result", "status", "case_points", "case_total", "current_testcase", "contest_object", ) class SubmissionMixin(object): model = Submission context_object_name = "submission" pk_url_kwarg = "submission" class SubmissionDetailBase(LoginRequiredMixin, TitleMixin, SubmissionMixin, DetailView): def get_object(self, queryset=None): submission = super(SubmissionDetailBase, self).get_object(queryset) profile = self.request.profile problem = submission.problem if self.request.user.has_perm("judge.view_all_submission"): return submission if submission.user_id == profile.id: return submission if problem.is_editor(profile): return submission if problem.is_public or problem.testers.filter(id=profile.id).exists(): if Submission.objects.filter( user_id=profile.id, result="AC", problem_id=problem.id, points=problem.points, ).exists(): return submission if hasattr( submission, "contest" ) and submission.contest.participation.contest.is_editable_by( self.request.user ): return submission raise PermissionDenied() def get_title(self): submission = self.object return _("Submission of %(problem)s by %(user)s") % { "problem": submission.problem.translated_name(self.request.LANGUAGE_CODE), "user": submission.user.user.username, } def get_content_title(self): submission = self.object return mark_safe( escape(_("Submission of %(problem)s by %(user)s")) % { "problem": format_html( '{1}', reverse("problem_detail", args=[submission.problem.code]), submission.problem.translated_name(self.request.LANGUAGE_CODE), ), "user": format_html( '{1}', reverse("user_page", args=[submission.user.user.username]), submission.user.user.username, ), } ) class SubmissionSource(SubmissionDetailBase): template_name = "submission/source.html" def get_queryset(self): return super().get_queryset().select_related("source") def get_context_data(self, **kwargs): context = super(SubmissionSource, self).get_context_data(**kwargs) submission = self.object context["raw_source"] = submission.source.source.rstrip("\n") context["highlighted_source"] = highlight_code( submission.source.source, submission.language.pygments, linenos=False ) return context def make_batch(batch, cases): result = {"id": batch, "cases": cases} if batch: result["points"] = min(map(attrgetter("points"), cases)) result["total"] = max(map(attrgetter("total"), cases)) return result def group_test_cases(cases): result = [] buf = [] last = None for case in cases: if case.batch != last and buf: result.append(make_batch(last, buf)) buf = [] buf.append(case) last = case.batch if buf: result.append(make_batch(last, buf)) return result def get_cases_data(submission): testcases = ProblemTestCase.objects.filter(dataset=submission.problem).order_by( "order" ) if submission.is_pretested: testcases = testcases.filter(is_pretest=True) files = [] for case in testcases: if case.input_file: files.append(case.input_file) if case.output_file: files.append(case.output_file) case_data = get_problem_case(submission.problem, files) problem_data = {} count = 0 for case in testcases: if case.type != "C": continue count += 1 problem_data[count] = { "input": case_data[case.input_file] if case.input_file else "", "answer": case_data[case.output_file] if case.output_file else "", } return problem_data class SubmissionStatus(SubmissionDetailBase): template_name = "submission/status.html" def access_testcases_in_contest(self): contest = self.object.contest_or_none if contest is None: return False if contest.problem.problem.is_editable_by(self.request.user): return True if contest.problem.contest.is_in_contest(self.request.user): return False if contest.participation.ended: return True return False def get_context_data(self, **kwargs): context = super(SubmissionStatus, self).get_context_data(**kwargs) submission = self.object context["last_msg"] = event.last() context["batches"] = group_test_cases(submission.test_cases.all()) context["time_limit"] = submission.problem.time_limit context["can_see_testcases"] = False contest = submission.contest_or_none prefix_length = 0 can_see_testcases = self.access_testcases_in_contest() if contest is not None: prefix_length = contest.problem.output_prefix_override if contest is None or prefix_length > 0 or can_see_testcases: context["cases_data"] = get_cases_data(submission) context["can_see_testcases"] = True try: lang_limit = submission.problem.language_limits.get( language=submission.language ) except ObjectDoesNotExist: pass else: context["time_limit"] = lang_limit.time_limit return context class SubmissionTestCaseQuery(SubmissionStatus): template_name = "submission/status-testcases.html" def get(self, request, *args, **kwargs): if "id" not in request.GET or not request.GET["id"].isdigit(): return HttpResponseBadRequest() self.kwargs[self.pk_url_kwarg] = kwargs[self.pk_url_kwarg] = int( request.GET["id"] ) return super(SubmissionTestCaseQuery, self).get(request, *args, **kwargs) class SubmissionSourceRaw(SubmissionSource): def get(self, request, *args, **kwargs): submission = self.get_object() return HttpResponse(submission.source.source, content_type="text/plain") @require_POST def abort_submission(request, submission): submission = get_object_or_404(Submission, id=int(submission)) # if (not request.user.is_authenticated or (submission.was_rejudged or (request.profile != submission.user)) and # not request.user.has_perm('abort_any_submission')): # raise PermissionDenied() if not request.user.is_authenticated or not request.user.has_perm( "abort_any_submission" ): raise PermissionDenied() submission.abort() return HttpResponseRedirect(reverse("submission_status", args=(submission.id,))) class SubmissionsListBase(DiggPaginatorMixin, TitleMixin, ListView): model = Submission paginate_by = 50 show_problem = True title = gettext_lazy("All submissions") content_title = gettext_lazy("All submissions") tab = "all_submissions_list" template_name = "submission/list.html" context_object_name = "submissions" first_page_href = None def get_result_data(self): result = self._get_result_data() for category in result["categories"]: category["name"] = _(category["name"]) return result def _get_result_data(self): return get_result_data(self.get_queryset().order_by()) def access_check(self, request): pass @cached_property def in_contest(self): return ( self.request.user.is_authenticated and self.request.profile.current_contest is not None and self.request.in_contest_mode ) @cached_property def contest(self): return self.request.profile.current_contest.contest def _get_queryset(self): queryset = Submission.objects.all() use_straight_join(queryset) queryset = submission_related(queryset.order_by("-id")) if self.show_problem: queryset = queryset.prefetch_related( Prefetch( "problem__translations", queryset=ProblemTranslation.objects.filter( language=self.request.LANGUAGE_CODE ), to_attr="_trans", ) ) if self.in_contest: queryset = queryset.filter(contest_object=self.contest) if not self.contest.can_see_full_scoreboard(self.request.user): queryset = queryset.filter(user=self.request.profile) else: queryset = queryset.select_related("contest_object").defer( "contest_object__description" ) # This is not technically correct since contest organizers *should* see these, but # the join would be far too messy if not self.request.user.has_perm("judge.see_private_contest"): # Show submissions for any contest you can edit or visible scoreboard contest_queryset = Contest.objects.filter( Q(authors=self.request.profile) | Q(curators=self.request.profile) | Q(scoreboard_visibility=Contest.SCOREBOARD_VISIBLE) | Q(end_time__lt=timezone.now()) ).distinct() queryset = queryset.filter( Q(user=self.request.profile) | Q(contest_object__in=contest_queryset) | Q(contest_object__isnull=True) ) if self.selected_languages: queryset = queryset.filter( language__in=Language.objects.filter(key__in=self.selected_languages) ) if self.selected_statuses: queryset = queryset.filter(result__in=self.selected_statuses) return queryset def get_queryset(self): queryset = self._get_queryset() if not self.in_contest: join_sql_subquery( queryset, subquery=str( Problem.get_visible_problems(self.request.user) .distinct() .only("id") .query ), params=[], join_fields=[("problem_id", "id")], alias="visible_problems", ) return queryset def get_my_submissions_page(self): return None def get_all_submissions_page(self): return reverse("all_submissions") def get_searchable_status_codes(self): hidden_codes = ["SC"] if not self.request.user.is_superuser and not self.request.user.is_staff: hidden_codes += ["IE"] return [ (key, value) for key, value in Submission.RESULT if key not in hidden_codes ] def get_context_data(self, **kwargs): context = super(SubmissionsListBase, self).get_context_data(**kwargs) authenticated = self.request.user.is_authenticated context["dynamic_update"] = False context["show_problem"] = self.show_problem context["completed_problem_ids"] = ( user_completed_ids(self.request.profile) if authenticated else [] ) context["authored_problem_ids"] = ( user_authored_ids(self.request.profile) if authenticated else [] ) context["editable_problem_ids"] = ( user_editable_ids(self.request.profile) if authenticated else [] ) context["all_languages"] = Language.objects.all().values_list("key", "name") context["selected_languages"] = self.selected_languages context["all_statuses"] = self.get_searchable_status_codes() context["selected_statuses"] = self.selected_statuses context["results_json"] = mark_safe(json.dumps(self.get_result_data())) context["results_colors_json"] = mark_safe( json.dumps(settings.DMOJ_STATS_SUBMISSION_RESULT_COLORS) ) context["page_suffix"] = suffix = ( ("?" + self.request.GET.urlencode()) if self.request.GET else "" ) context["first_page_href"] = (self.first_page_href or ".") + suffix context["my_submissions_link"] = self.get_my_submissions_page() context["all_submissions_link"] = self.get_all_submissions_page() context["tab"] = self.tab return context def get(self, request, *args, **kwargs): check = self.access_check(request) if check is not None: return check self.selected_languages = set(request.GET.getlist("language")) self.selected_statuses = set(request.GET.getlist("status")) if "results" in request.GET: return JsonResponse(self.get_result_data()) return super(SubmissionsListBase, self).get(request, *args, **kwargs) class UserMixin(object): def get(self, request, *args, **kwargs): if "user" not in kwargs: raise ImproperlyConfigured("Must pass a user") self.profile = get_object_or_404(Profile, user__username=kwargs["user"]) self.username = kwargs["user"] return super(UserMixin, self).get(request, *args, **kwargs) class ConditionalUserTabMixin(object): def get_context_data(self, **kwargs): context = super(ConditionalUserTabMixin, self).get_context_data(**kwargs) if self.request.user.is_authenticated and self.request.profile == self.profile: context["tab"] = "my_submissions_tab" else: context["tab"] = "user_submissions_tab" context["tab_username"] = self.profile.user.username return context class AllUserSubmissions(ConditionalUserTabMixin, UserMixin, SubmissionsListBase): def get_queryset(self): return ( super(AllUserSubmissions, self) .get_queryset() .filter(user_id=self.profile.id) ) def get_title(self): if self.request.user.is_authenticated and self.request.profile == self.profile: return _("All my submissions") return _("All submissions by %s") % self.username def get_content_title(self): if self.request.user.is_authenticated and self.request.profile == self.profile: return format_html("All my submissions") return format_html( 'All submissions by {0}', self.username, reverse("user_page", args=[self.username]), ) def get_my_submissions_page(self): if self.request.user.is_authenticated: return reverse( "all_user_submissions", kwargs={"user": self.request.user.username} ) def get_context_data(self, **kwargs): context = super(AllUserSubmissions, self).get_context_data(**kwargs) context["dynamic_update"] = context["page_obj"].number == 1 context["dynamic_user_id"] = self.profile.id context["last_msg"] = event.last() return context class ProblemSubmissionsBase(SubmissionsListBase): show_problem = False dynamic_update = True check_contest_in_access_check = True def get_queryset(self): if ( self.in_contest and not self.contest.contest_problems.filter( problem_id=self.problem.id ).exists() ): raise Http404() return ( super(ProblemSubmissionsBase, self) ._get_queryset() .filter(problem_id=self.problem.id) ) def get_title(self): return _("All submissions for %s") % self.problem_name def get_content_title(self): return format_html( 'All submissions for {0}', self.problem_name, reverse("problem_detail", args=[self.problem.code]), ) def access_check_contest(self, request): if self.in_contest and not self.contest.can_see_own_scoreboard(request.user): raise Http404() def access_check(self, request): if not self.problem.is_accessible_by(request.user): raise Http404() if self.check_contest_in_access_check: self.access_check_contest(request) def get(self, request, *args, **kwargs): if "problem" not in kwargs: raise ImproperlyConfigured(_("Must pass a problem")) self.problem = get_object_or_404(Problem, code=kwargs["problem"]) self.problem_name = self.problem.translated_name(self.request.LANGUAGE_CODE) return super(ProblemSubmissionsBase, self).get(request, *args, **kwargs) def get_all_submissions_page(self): return reverse( "chronological_submissions", kwargs={"problem": self.problem.code} ) def get_context_data(self, **kwargs): context = super(ProblemSubmissionsBase, self).get_context_data(**kwargs) if self.dynamic_update: context["dynamic_update"] = context["page_obj"].number == 1 context["dynamic_problem_id"] = self.problem.id context["last_msg"] = event.last() context["best_submissions_link"] = reverse( "ranked_submissions", kwargs={"problem": self.problem.code} ) return context class ProblemSubmissions(ProblemSubmissionsBase): def get_my_submissions_page(self): if self.request.user.is_authenticated: return reverse( "user_submissions", kwargs={ "problem": self.problem.code, "user": self.request.user.username, }, ) class UserProblemSubmissions(ConditionalUserTabMixin, UserMixin, ProblemSubmissions): check_contest_in_access_check = False @cached_property def is_own(self): return ( self.request.user.is_authenticated and self.request.profile == self.profile ) def access_check(self, request): super(UserProblemSubmissions, self).access_check(request) if not self.is_own: self.access_check_contest(request) def get_queryset(self): return ( super(UserProblemSubmissions, self) .get_queryset() .filter(user_id=self.profile.id) ) def get_title(self): if self.is_own: return _("My submissions for %(problem)s") % {"problem": self.problem_name} return _("%(user)s's submissions for %(problem)s") % { "user": self.username, "problem": self.problem_name, } def get_content_title(self): if self.request.user.is_authenticated and self.request.profile == self.profile: return format_html( """My submissions for {2}""", self.username, reverse("user_page", args=[self.username]), self.problem_name, reverse("problem_detail", args=[self.problem.code]), ) return format_html( """{0}'s submissions for {2}""", self.username, reverse("user_page", args=[self.username]), self.problem_name, reverse("problem_detail", args=[self.problem.code]), ) def get_context_data(self, **kwargs): context = super(UserProblemSubmissions, self).get_context_data(**kwargs) context["dynamic_user_id"] = self.profile.id return context def single_submission(request, submission_id, show_problem=True): request.no_profile_update = True authenticated = request.user.is_authenticated submission = get_object_or_404( submission_related(Submission.objects.all()), id=int(submission_id) ) if not submission.problem.is_accessible_by(request.user): raise Http404() return render( request, "submission/row.html", { "submission": submission, "authored_problem_ids": user_authored_ids(request.profile) if authenticated else [], "completed_problem_ids": user_completed_ids(request.profile) if authenticated else [], "editable_problem_ids": user_editable_ids(request.profile) if authenticated else [], "show_problem": show_problem, "problem_name": show_problem and submission.problem.translated_name(request.LANGUAGE_CODE), "profile_id": request.profile.id if authenticated else 0, }, ) def single_submission_query(request): request.no_profile_update = True if "id" not in request.GET or not request.GET["id"].isdigit(): return HttpResponseBadRequest() try: show_problem = int(request.GET.get("show_problem", "1")) except ValueError: return HttpResponseBadRequest() return single_submission(request, int(request.GET["id"]), bool(show_problem)) class AllSubmissions(SubmissionsListBase): stats_update_interval = 3600 def get_my_submissions_page(self): if self.request.user.is_authenticated: return reverse( "all_user_submissions", kwargs={"user": self.request.user.username} ) def get_context_data(self, **kwargs): context = super(AllSubmissions, self).get_context_data(**kwargs) context["dynamic_update"] = context["page_obj"].number == 1 context["last_msg"] = event.last() context["stats_update_interval"] = self.stats_update_interval return context def _get_result_data(self): if self.in_contest or self.selected_languages or self.selected_statuses: return super(AllSubmissions, self)._get_result_data() key = "global_submission_result_data" result = cache.get(key) if result: return result result = super(AllSubmissions, self)._get_result_data() cache.set(key, result, self.stats_update_interval) return result class ForceContestMixin(object): @property def in_contest(self): return True @property def contest(self): return self._contest def access_check(self, request): super(ForceContestMixin, self).access_check(request) if not request.user.has_perm("judge.see_private_contest"): if not self.contest.is_visible: raise Http404() if ( self.contest.start_time is not None and self.contest.start_time > timezone.now() ): raise Http404() def get_problem_number(self, problem): return ( self.contest.contest_problems.select_related("problem") .get(problem=problem) .order ) def get(self, request, *args, **kwargs): if "contest" not in kwargs: raise ImproperlyConfigured(_("Must pass a contest")) self._contest = get_object_or_404(Contest, key=kwargs["contest"]) return super(ForceContestMixin, self).get(request, *args, **kwargs) class UserContestSubmissions(ForceContestMixin, UserProblemSubmissions): def get_title(self): if self.problem.is_accessible_by(self.request.user): return "%s's submissions for %s in %s" % ( self.username, self.problem_name, self.contest.name, ) return "%s's submissions for problem %s in %s" % ( self.username, self.get_problem_number(self.problem), self.contest.name, ) def access_check(self, request): super(UserContestSubmissions, self).access_check(request) if not self.contest.users.filter(user_id=self.profile.id).exists(): raise Http404() def get_content_title(self): if self.problem.is_accessible_by(self.request.user): return format_html( _( '{0}\'s submissions for ' '{2} in {4}' ), self.username, reverse("user_page", args=[self.username]), self.problem_name, reverse("problem_detail", args=[self.problem.code]), self.contest.name, reverse("contest_view", args=[self.contest.key]), ) return format_html( _( '{0}\'s submissions for ' 'problem {2} in {3}' ), self.username, reverse("user_page", args=[self.username]), self.get_problem_number(self.problem), self.contest.name, reverse("contest_view", args=[self.contest.key]), )