NDOJ/judge/utils/problem_data.py

294 lines
10 KiB
Python
Raw Normal View History

2021-06-28 04:44:25 +00:00
import hashlib
2020-01-21 06:35:58 +00:00
import json
import os
import re
2020-03-18 02:41:09 +00:00
import shutil
2020-01-21 06:35:58 +00:00
import yaml
2021-06-01 20:34:42 +00:00
import zipfile
2020-01-21 06:35:58 +00:00
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import FileSystemStorage
from django.urls import reverse
from django.utils.translation import gettext as _
2021-06-01 20:34:42 +00:00
from django.core.cache import cache
2020-03-18 02:41:09 +00:00
VALIDATOR_TEMPLATE_PATH = 'validator_template/template.py'
2020-01-21 06:35:58 +00:00
if os.altsep:
def split_path_first(path, repath=re.compile('[%s]' % re.escape(os.sep + os.altsep))):
return repath.split(path, 1)
else:
def split_path_first(path):
return path.split(os.sep, 1)
class ProblemDataStorage(FileSystemStorage):
def __init__(self):
super(ProblemDataStorage, self).__init__(settings.DMOJ_PROBLEM_DATA_ROOT)
def url(self, name):
path = split_path_first(name)
if len(path) != 2:
raise ValueError('This file is not accessible via a URL.')
return reverse('problem_data_file', args=path)
def _save(self, name, content):
if self.exists(name):
self.delete(name)
return super(ProblemDataStorage, self)._save(name, content)
def get_available_name(self, name, max_length=None):
return name
def rename(self, old, new):
return os.rename(self.path(old), self.path(new))
class ProblemDataError(Exception):
def __init__(self, message):
super(ProblemDataError, self).__init__(message)
self.message = message
class ProblemDataCompiler(object):
def __init__(self, problem, data, cases, files):
self.problem = problem
self.data = data
self.cases = cases
self.files = files
self.generator = data.generator
def make_init(self):
cases = []
batch = None
def end_batch():
if not batch['batched']:
raise ProblemDataError(_('Empty batches not allowed.'))
cases.append(batch)
2020-03-18 02:41:09 +00:00
def make_checker_for_validator(case):
checker_name = "cppvalidator.py"
validator_path = split_path_first(case.custom_validator.name)
if len(validator_path) != 2:
raise ProblemDataError(_('How did you corrupt the custom checker path?'))
checker = os.path.join(settings.DMOJ_PROBLEM_DATA_ROOT,
validator_path[0],
checker_name)
validator_name = validator_path[1]
shutil.copy(VALIDATOR_TEMPLATE_PATH, checker)
# replace {{filecpp}} and {{problemid}} in checker file
filedata = open(checker, 'r').read()
filedata = filedata.replace('{{filecpp}}', "\'%s\'" % validator_name)
filedata = filedata.replace('{{problemid}}', "\'%s\'" % validator_path[0])
open(checker, 'w').write(filedata)
return checker_name
2020-01-21 06:35:58 +00:00
def make_checker(case):
2020-01-24 02:32:46 +00:00
if (case.checker == 'custom'):
custom_checker_path = split_path_first(case.custom_checker.name)
if len(custom_checker_path) != 2:
raise ProblemDataError(_('How did you corrupt the custom checker path?'))
return(custom_checker_path[1])
2020-03-18 02:41:09 +00:00
if (case.checker == 'customval'):
2020-03-18 02:41:09 +00:00
return make_checker_for_validator(case)
2020-01-21 06:35:58 +00:00
if case.checker_args:
return {
'name': case.checker,
'args': json.loads(case.checker_args),
}
return case.checker
for i, case in enumerate(self.cases, 1):
if case.type == 'C':
data = {}
if batch:
case.points = None
case.is_pretest = batch['is_pretest']
else:
if case.points is None:
raise ProblemDataError(_('Points must be defined for non-batch case #%d.') % i)
data['is_pretest'] = case.is_pretest
if not self.generator:
if case.input_file not in self.files:
raise ProblemDataError(_('Input file for case %d does not exist: %s') %
(i, case.input_file))
if case.output_file not in self.files:
raise ProblemDataError(_('Output file for case %d does not exist: %s') %
(i, case.output_file))
if case.input_file:
data['in'] = case.input_file
if case.output_file:
data['out'] = case.output_file
if case.points is not None:
data['points'] = case.points
if case.generator_args:
data['generator_args'] = case.generator_args.splitlines()
if case.output_limit is not None:
data['output_limit_length'] = case.output_limit
if case.output_prefix is not None:
data['output_prefix_length'] = case.output_prefix
if case.checker:
data['checker'] = make_checker(case)
else:
case.checker_args = ''
case.save(update_fields=('checker_args', 'is_pretest'))
(batch['batched'] if batch else cases).append(data)
elif case.type == 'S':
if batch:
end_batch()
if case.points is None:
raise ProblemDataError(_('Batch start case #%d requires points.') % i)
batch = {
'points': case.points,
'batched': [],
'is_pretest': case.is_pretest,
}
if case.generator_args:
batch['generator_args'] = case.generator_args.splitlines()
if case.output_limit is not None:
batch['output_limit_length'] = case.output_limit
if case.output_prefix is not None:
batch['output_prefix_length'] = case.output_prefix
if case.checker:
batch['checker'] = make_checker(case)
else:
case.checker_args = ''
case.input_file = ''
case.output_file = ''
case.save(update_fields=('checker_args', 'input_file', 'output_file'))
elif case.type == 'E':
if not batch:
raise ProblemDataError(_('Attempt to end batch outside of one in case #%d') % i)
case.is_pretest = batch['is_pretest']
case.input_file = ''
case.output_file = ''
case.generator_args = ''
case.checker = ''
case.checker_args = ''
case.save()
end_batch()
batch = None
if batch:
end_batch()
init = {}
if self.data.zipfile:
zippath = split_path_first(self.data.zipfile.name)
if len(zippath) != 2:
raise ProblemDataError(_('How did you corrupt the zip path?'))
init['archive'] = zippath[1]
if self.generator:
generator_path = split_path_first(self.generator.name)
if len(generator_path) != 2:
raise ProblemDataError(_('How did you corrupt the generator path?'))
init['generator'] = generator_path[1]
pretests = [case for case in cases if case['is_pretest']]
for case in cases:
del case['is_pretest']
if pretests:
init['pretest_test_cases'] = pretests
if cases:
init['test_cases'] = cases
if self.data.output_limit is not None:
init['output_limit_length'] = self.data.output_limit
if self.data.output_prefix is not None:
init['output_prefix_length'] = self.data.output_prefix
if self.data.checker:
init['checker'] = make_checker(self.data)
else:
self.data.checker_args = ''
return init
def compile(self):
from judge.models import problem_data_storage
yml_file = '%s/init.yml' % self.problem.code
try:
init = yaml.safe_dump(self.make_init())
except ProblemDataError as e:
self.data.feedback = e.message
self.data.save()
problem_data_storage.delete(yml_file)
else:
self.data.feedback = ''
self.data.save()
problem_data_storage.save(yml_file, ContentFile(init))
@classmethod
def generate(cls, *args, **kwargs):
self = cls(*args, **kwargs)
self.compile()
2021-06-01 20:34:42 +00:00
def get_visible_content(data):
data = data or b''
data = data.replace(b'\r\n', b'\r').replace(b'\r', b'\n')
data = data.decode('utf-8')
if (len(data) > settings.TESTCASE_VISIBLE_LENGTH):
data = data[:settings.TESTCASE_VISIBLE_LENGTH]
data += '.' * 3
return data
def get_file_cachekey(file):
2021-06-28 04:44:25 +00:00
return hashlib.sha1(file.encode()).hexdigest()
2021-06-01 20:34:42 +00:00
def get_problem_case(problem, files):
result = {}
uncached_files = []
for file in files:
cache_key = 'problem_archive:%s:%s' % (problem.code, get_file_cachekey(file))
qs = cache.get(cache_key)
if qs is None:
uncached_files.append(file)
else:
result[file] = qs
if not uncached_files:
return result
archive_path = os.path.join(settings.DMOJ_PROBLEM_DATA_ROOT,
str(problem.data_files.zipfile))
if not os.path.exists(archive_path):
raise Exception(
'archive file "%s" does not exist' % archive_path)
try:
archive = zipfile.ZipFile(archive_path, 'r')
except zipfile.BadZipfile:
raise Exception('bad archive: "%s"' % archive_path)
for file in uncached_files:
cache_key = 'problem_archive:%s:%s' % (problem.code, get_file_cachekey(file))
with archive.open(file) as f:
s = f.read(settings.TESTCASE_VISIBLE_LENGTH + 3)
# add this so there are no characters left behind (ex, 'á' = 2 utf-8 chars)
while True:
try:
s.decode('utf-8')
break
except UnicodeDecodeError:
s += f.read(1)
qs = get_visible_content(s)
cache.set(cache_key, qs, 86400)
result[file] = qs
2021-11-28 03:28:48 +00:00
return result