import json
from operator import attrgetter
from django.conf import settings
from django.contrib.auth.mixins import LoginRequiredMixin
from django.core.cache import cache
from django.core.exceptions import ImproperlyConfigured, ObjectDoesNotExist, PermissionDenied
from django.db.models import Prefetch, Q
from django.http import Http404, HttpResponse, HttpResponseBadRequest, HttpResponseRedirect, JsonResponse
from django.shortcuts import get_object_or_404, render
from django.urls import reverse
from django.utils import timezone
from django.utils.functional import cached_property
from django.utils.html import escape, format_html
from django.utils.safestring import mark_safe
from django.utils.translation import gettext as _, gettext_lazy
from django.views.decorators.http import require_POST
from django.views.generic import DetailView, ListView
from judge import event_poster as event
from judge.highlight_code import highlight_code
from judge.models import Contest, Language, Problem, ProblemTranslation, Profile, Submission, ProblemData, ProblemTestCase
from judge.utils.problems import get_result_data, user_authored_ids, user_completed_ids, user_editable_ids
from judge.utils.raw_sql import use_straight_join
from judge.utils.views import DiggPaginatorMixin, TitleMixin
import os.path
import zipfile
def submission_related(queryset):
return queryset.select_related('user__user', 'problem', 'language') \
.only('id', 'user__user__username', 'user__display_rank', 'user__rating', 'problem__name',
'problem__code', 'problem__is_public', 'language__short_name', 'language__key', 'date', 'time', 'memory',
'points', 'result', 'status', 'case_points', 'case_total', 'current_testcase', 'contest_object')
class SubmissionMixin(object):
model = Submission
context_object_name = 'submission'
pk_url_kwarg = 'submission'
class SubmissionDetailBase(LoginRequiredMixin, TitleMixin, SubmissionMixin, DetailView):
def get_object(self, queryset=None):
submission = super(SubmissionDetailBase, self).get_object(queryset)
profile = self.request.profile
problem = submission.problem
if self.request.user.has_perm('judge.view_all_submission'):
return submission
if submission.user_id == profile.id:
return submission
if problem.is_editor(profile):
return submission
if problem.is_public or problem.testers.filter(id=profile.id).exists():
if Submission.objects.filter(user_id=profile.id, result='AC', problem_id=problem.id,
points=problem.points).exists():
return submission
raise PermissionDenied()
def get_title(self):
submission = self.object
return _('Submission of %(problem)s by %(user)s') % {
'problem': submission.problem.translated_name(self.request.LANGUAGE_CODE),
'user': submission.user.user.username,
}
def get_content_title(self):
submission = self.object
return mark_safe(escape(_('Submission of %(problem)s by %(user)s')) % {
'problem': format_html('{1}',
reverse('problem_detail', args=[
submission.problem.code]),
submission.problem.translated_name(self.request.LANGUAGE_CODE)),
'user': format_html('{1}',
reverse('user_page', args=[
submission.user.user.username]),
submission.user.user.username),
})
class SubmissionSource(SubmissionDetailBase):
template_name = 'submission/source.html'
def get_queryset(self):
return super().get_queryset().select_related('source')
def get_context_data(self, **kwargs):
context = super(SubmissionSource, self).get_context_data(**kwargs)
submission = self.object
context['raw_source'] = submission.source.source.rstrip('\n')
context['highlighted_source'] = highlight_code(
submission.source.source, submission.language.pygments)
return context
def make_batch(batch, cases):
result = {'id': batch, 'cases': cases}
if batch:
result['points'] = min(map(attrgetter('points'), cases))
result['total'] = max(map(attrgetter('total'), cases))
return result
def group_test_cases(cases):
result = []
buf = []
last = None
for case in cases:
if case.batch != last and buf:
result.append(make_batch(last, buf))
buf = []
buf.append(case)
last = case.batch
if buf:
result.append(make_batch(last, buf))
return result
def get_visible_content(data):
data = data or b''
data = data.replace(b'\r\n', b'\r').replace(b'\r', b'\n')
if (len(data) > settings.TESTCASE_VISIBLE_LENGTH):
data = data[:settings.TESTCASE_VISIBLE_LENGTH]
data += b'.' * 3
elif not data.endswith(b'\n'):
data += b'\n'
return data.decode('utf-8')
def in_out_ans(case, submission, archive):
result = {}
result['input'] = get_visible_content(archive.read(case.input_file))
result['answer'] = get_visible_content(archive.read(case.output_file))
return result
def get_problem_data(submission):
archive_path = os.path.join(settings.DMOJ_PROBLEM_DATA_ROOT,
str(submission.problem.data_files.zipfile))
if not os.path.exists(archive_path):
raise InvalidInitException(
'archive file "%s" does not exist' % archive_path)
try:
archive = zipfile.ZipFile(archive_path, 'r')
except zipfile.BadZipfile:
raise InvalidInitException('bad archive: "%s"' % archive_path)
testcases = ProblemTestCase.objects.filter(dataset=submission.problem)\
.order_by('order')
problem_data = {case.order: in_out_ans(case, submission, archive)
for case in testcases}
return problem_data
class SubmissionStatus(SubmissionDetailBase):
template_name = 'submission/status.html'
def get_context_data(self, **kwargs):
context = super(SubmissionStatus, self).get_context_data(**kwargs)
submission = self.object
context['last_msg'] = event.last()
context['batches'] = group_test_cases(submission.test_cases.all())
context['time_limit'] = submission.problem.time_limit
context['cases_data'] = get_problem_data(submission)
try:
lang_limit = submission.problem.language_limits.get(
language=submission.language)
except ObjectDoesNotExist:
pass
else:
context['time_limit'] = lang_limit.time_limit
return context
class SubmissionTestCaseQuery(SubmissionStatus):
template_name = 'submission/status-testcases.html'
def get(self, request, *args, **kwargs):
if 'id' not in request.GET or not request.GET['id'].isdigit():
return HttpResponseBadRequest()
self.kwargs[self.pk_url_kwarg] = kwargs[self.pk_url_kwarg] = int(
request.GET['id'])
return super(SubmissionTestCaseQuery, self).get(request, *args, **kwargs)
class SubmissionSourceRaw(SubmissionSource):
def get(self, request, *args, **kwargs):
submission = self.get_object()
return HttpResponse(submission.source.source, content_type='text/plain')
@require_POST
def abort_submission(request, submission):
submission = get_object_or_404(Submission, id=int(submission))
if (not request.user.is_authenticated or (submission.was_rejudged or (request.profile != submission.user)) and
not request.user.has_perm('abort_any_submission')):
raise PermissionDenied()
submission.abort()
return HttpResponseRedirect(reverse('submission_status', args=(submission.id,)))
class SubmissionsListBase(DiggPaginatorMixin, TitleMixin, ListView):
model = Submission
paginate_by = 50
show_problem = True
title = gettext_lazy('All submissions')
content_title = gettext_lazy('All submissions')
tab = 'all_submissions_list'
template_name = 'submission/list.html'
context_object_name = 'submissions'
first_page_href = None
def get_result_data(self):
result = self._get_result_data()
for category in result['categories']:
category['name'] = _(category['name'])
return result
def _get_result_data(self):
return get_result_data(self.get_queryset().order_by())
def access_check(self, request):
pass
@cached_property
def in_contest(self):
return self.request.user.is_authenticated and self.request.profile.current_contest is not None
@cached_property
def contest(self):
return self.request.profile.current_contest.contest
def _get_queryset(self):
queryset = Submission.objects.all()
use_straight_join(queryset)
queryset = submission_related(queryset.order_by('-id'))
if self.show_problem:
queryset = queryset.prefetch_related(Prefetch('problem__translations',
queryset=ProblemTranslation.objects.filter(
language=self.request.LANGUAGE_CODE), to_attr='_trans'))
if self.in_contest:
queryset = queryset.filter(
contest__participation__contest_id=self.contest.id)
if self.contest.hide_scoreboard and self.contest.is_in_contest(self.request.user):
queryset = queryset.filter(
contest__participation__user=self.request.profile)
else:
queryset = queryset.select_related(
'contest_object').defer('contest_object__description')
# This is not technically correct since contest organizers *should* see these, but
# the join would be far too messy
if not self.request.user.has_perm('judge.see_private_contest'):
queryset = queryset.exclude(
contest_object_id__in=Contest.objects.filter(hide_scoreboard=True))
if self.selected_languages:
queryset = queryset.filter(
language_id__in=Language.objects.filter(key__in=self.selected_languages))
if self.selected_statuses:
queryset = queryset.filter(result__in=self.selected_statuses)
return queryset
def get_queryset(self):
queryset = self._get_queryset()
if not self.in_contest:
if not self.request.user.has_perm('judge.see_private_problem'):
queryset = queryset.filter(problem__is_public=True)
if not self.request.user.has_perm('judge.see_organization_problem'):
filter = Q(problem__is_organization_private=False)
if self.request.user.is_authenticated:
filter |= Q(
problem__organizations__in=self.request.profile.organizations.all())
queryset = queryset.filter(filter)
return queryset
def get_my_submissions_page(self):
return None
def get_all_submissions_page(self):
return reverse('all_submissions')
def get_searchable_status_codes(self):
hidden_codes = ['SC']
if not self.request.user.is_superuser and not self.request.user.is_staff:
hidden_codes += ['IE']
return [(key, value) for key, value in Submission.RESULT if key not in hidden_codes]
def get_context_data(self, **kwargs):
context = super(SubmissionsListBase, self).get_context_data(**kwargs)
authenticated = self.request.user.is_authenticated
context['dynamic_update'] = False
context['show_problem'] = self.show_problem
context['completed_problem_ids'] = user_completed_ids(
self.request.profile) if authenticated else []
context['authored_problem_ids'] = user_authored_ids(
self.request.profile) if authenticated else []
context['editable_problem_ids'] = user_editable_ids(
self.request.profile) if authenticated else []
context['all_languages'] = Language.objects.all(
).values_list('key', 'name')
context['selected_languages'] = self.selected_languages
context['all_statuses'] = self.get_searchable_status_codes()
context['selected_statuses'] = self.selected_statuses
context['results_json'] = mark_safe(json.dumps(self.get_result_data()))
context['results_colors_json'] = mark_safe(
json.dumps(settings.DMOJ_STATS_SUBMISSION_RESULT_COLORS))
context['page_suffix'] = suffix = (
'?' + self.request.GET.urlencode()) if self.request.GET else ''
context['first_page_href'] = (self.first_page_href or '.') + suffix
context['my_submissions_link'] = self.get_my_submissions_page()
context['all_submissions_link'] = self.get_all_submissions_page()
context['tab'] = self.tab
return context
def get(self, request, *args, **kwargs):
check = self.access_check(request)
if check is not None:
return check
self.selected_languages = set(request.GET.getlist('language'))
self.selected_statuses = set(request.GET.getlist('status'))
if 'results' in request.GET:
return JsonResponse(self.get_result_data())
return super(SubmissionsListBase, self).get(request, *args, **kwargs)
class UserMixin(object):
def get(self, request, *args, **kwargs):
if 'user' not in kwargs:
raise ImproperlyConfigured('Must pass a user')
self.profile = get_object_or_404(
Profile, user__username=kwargs['user'])
self.username = kwargs['user']
return super(UserMixin, self).get(request, *args, **kwargs)
class ConditionalUserTabMixin(object):
def get_context_data(self, **kwargs):
context = super(ConditionalUserTabMixin,
self).get_context_data(**kwargs)
if self.request.user.is_authenticated and self.request.profile == self.profile:
context['tab'] = 'my_submissions_tab'
else:
context['tab'] = 'user_submissions_tab'
context['tab_username'] = self.profile.user.username
return context
class AllUserSubmissions(ConditionalUserTabMixin, UserMixin, SubmissionsListBase):
def get_queryset(self):
return super(AllUserSubmissions, self).get_queryset().filter(user_id=self.profile.id)
def get_title(self):
if self.request.user.is_authenticated and self.request.profile == self.profile:
return _('All my submissions')
return _('All submissions by %s') % self.username
def get_content_title(self):
if self.request.user.is_authenticated and self.request.profile == self.profile:
return format_html('All my submissions')
return format_html('All submissions by {0}', self.username,
reverse('user_page', args=[self.username]))
def get_my_submissions_page(self):
if self.request.user.is_authenticated:
return reverse('all_user_submissions', kwargs={'user': self.request.user.username})
def get_context_data(self, **kwargs):
context = super(AllUserSubmissions, self).get_context_data(**kwargs)
context['dynamic_update'] = context['page_obj'].number == 1
context['dynamic_user_id'] = self.profile.id
context['last_msg'] = event.last()
return context
class ProblemSubmissionsBase(SubmissionsListBase):
show_problem = False
dynamic_update = True
check_contest_in_access_check = True
def get_queryset(self):
if self.in_contest and not self.contest.contest_problems.filter(problem_id=self.problem.id).exists():
raise Http404()
return super(ProblemSubmissionsBase, self)._get_queryset().filter(problem_id=self.problem.id)
def get_title(self):
return _('All submissions for %s') % self.problem_name
def get_content_title(self):
return format_html('All submissions for {0}', self.problem_name,
reverse('problem_detail', args=[self.problem.code]))
def access_check_contest(self, request):
if self.in_contest and not self.contest.can_see_scoreboard(request.user):
raise Http404()
def access_check(self, request):
if not self.problem.is_accessible_by(request.user):
raise Http404()
if self.check_contest_in_access_check:
self.access_check_contest(request)
def get(self, request, *args, **kwargs):
if 'problem' not in kwargs:
raise ImproperlyConfigured(_('Must pass a problem'))
self.problem = get_object_or_404(Problem, code=kwargs['problem'])
self.problem_name = self.problem.translated_name(
self.request.LANGUAGE_CODE)
return super(ProblemSubmissionsBase, self).get(request, *args, **kwargs)
def get_all_submissions_page(self):
return reverse('chronological_submissions', kwargs={'problem': self.problem.code})
def get_context_data(self, **kwargs):
context = super(ProblemSubmissionsBase,
self).get_context_data(**kwargs)
if self.dynamic_update:
context['dynamic_update'] = context['page_obj'].number == 1
context['dynamic_problem_id'] = self.problem.id
context['last_msg'] = event.last()
context['best_submissions_link'] = reverse('ranked_submissions', kwargs={
'problem': self.problem.code})
return context
class ProblemSubmissions(ProblemSubmissionsBase):
def get_my_submissions_page(self):
if self.request.user.is_authenticated:
return reverse('user_submissions', kwargs={'problem': self.problem.code,
'user': self.request.user.username})
class UserProblemSubmissions(ConditionalUserTabMixin, UserMixin, ProblemSubmissions):
check_contest_in_access_check = False
@cached_property
def is_own(self):
return self.request.user.is_authenticated and self.request.profile == self.profile
def access_check(self, request):
super(UserProblemSubmissions, self).access_check(request)
if not self.is_own:
self.access_check_contest(request)
def get_queryset(self):
return super(UserProblemSubmissions, self).get_queryset().filter(user_id=self.profile.id)
def get_title(self):
if self.is_own:
return _("My submissions for %(problem)s") % {'problem': self.problem_name}
return _("%(user)s's submissions for %(problem)s") % {'user': self.username, 'problem': self.problem_name}
def get_content_title(self):
if self.request.user.is_authenticated and self.request.profile == self.profile:
return format_html('''My submissions for {2}''',
self.username, reverse(
'user_page', args=[self.username]),
self.problem_name, reverse('problem_detail', args=[self.problem.code]))
return format_html('''{0}'s submissions for {2}''',
self.username, reverse(
'user_page', args=[self.username]),
self.problem_name, reverse('problem_detail', args=[self.problem.code]))
def get_context_data(self, **kwargs):
context = super(UserProblemSubmissions,
self).get_context_data(**kwargs)
context['dynamic_user_id'] = self.profile.id
return context
def single_submission(request, submission_id, show_problem=True):
request.no_profile_update = True
authenticated = request.user.is_authenticated
submission = get_object_or_404(submission_related(
Submission.objects.all()), id=int(submission_id))
if not submission.problem.is_accessible_by(request.user):
raise Http404()
return render(request, 'submission/row.html', {
'submission': submission,
'authored_problem_ids': user_authored_ids(request.profile) if authenticated else [],
'completed_problem_ids': user_completed_ids(request.profile) if authenticated else [],
'editable_problem_ids': user_editable_ids(request.profile) if authenticated else [],
'show_problem': show_problem,
'problem_name': show_problem and submission.problem.translated_name(request.LANGUAGE_CODE),
'profile_id': request.profile.id if authenticated else 0,
})
def single_submission_query(request):
request.no_profile_update = True
if 'id' not in request.GET or not request.GET['id'].isdigit():
return HttpResponseBadRequest()
try:
show_problem = int(request.GET.get('show_problem', '1'))
except ValueError:
return HttpResponseBadRequest()
return single_submission(request, int(request.GET['id']), bool(show_problem))
class AllSubmissions(SubmissionsListBase):
stats_update_interval = 3600
def get_my_submissions_page(self):
if self.request.user.is_authenticated:
return reverse('all_user_submissions', kwargs={'user': self.request.user.username})
def get_context_data(self, **kwargs):
context = super(AllSubmissions, self).get_context_data(**kwargs)
context['dynamic_update'] = context['page_obj'].number == 1
context['last_msg'] = event.last()
context['stats_update_interval'] = self.stats_update_interval
return context
def _get_result_data(self):
if self.in_contest or self.selected_languages or self.selected_statuses:
return super(AllSubmissions, self)._get_result_data()
key = 'global_submission_result_data'
result = cache.get(key)
if result:
return result
result = super(AllSubmissions, self)._get_result_data()
cache.set(key, result, self.stats_update_interval)
return result
class ForceContestMixin(object):
@property
def in_contest(self):
return True
@property
def contest(self):
return self._contest
def access_check(self, request):
super(ForceContestMixin, self).access_check(request)
if not request.user.has_perm('judge.see_private_contest'):
if not self.contest.is_visible:
raise Http404()
if self.contest.start_time is not None and self.contest.start_time > timezone.now():
raise Http404()
def get_problem_number(self, problem):
return self.contest.contest_problems.select_related('problem').get(problem=problem).order
def get(self, request, *args, **kwargs):
if 'contest' not in kwargs:
raise ImproperlyConfigured(_('Must pass a contest'))
self._contest = get_object_or_404(Contest, key=kwargs['contest'])
return super(ForceContestMixin, self).get(request, *args, **kwargs)
class UserContestSubmissions(ForceContestMixin, UserProblemSubmissions):
def get_title(self):
if self.problem.is_accessible_by(self.request.user):
return "%s's submissions for %s in %s" % (self.username, self.problem_name, self.contest.name)
return "%s's submissions for problem %s in %s" % (
self.username, self.get_problem_number(self.problem), self.contest.name)
def access_check(self, request):
super(UserContestSubmissions, self).access_check(request)
if not self.contest.users.filter(user_id=self.profile.id).exists():
raise Http404()
def get_content_title(self):
if self.problem.is_accessible_by(self.request.user):
return format_html(_('{0}\'s submissions for '
'{2} in {4}'),
self.username, reverse(
'user_page', args=[self.username]),
self.problem_name, reverse(
'problem_detail', args=[self.problem.code]),
self.contest.name, reverse('contest_view', args=[self.contest.key]))
return format_html(_('{0}\'s submissions for '
'problem {2} in {3}'),
self.username, reverse(
'user_page', args=[self.username]),
self.get_problem_number(self.problem),
self.contest.name, reverse('contest_view', args=[self.contest.key]))