"""
Contains workups relating to the exit status of the job. Most of these will
not be explicitly passed in the workup specification, but will be run
implicitly.
@copyright: Schrodinger, LLC. All rights reserved.
"""
import csv
import os
from collections import namedtuple
from itertools import zip_longest
from schrodinger.test.stu import common
from schrodinger.test.stu import constants
from schrodinger.utils.csv_unicode import reader_open
from .failures import LicenseFailure
logger = common.logger
[docs]def is_structure_output_file(filename):
"""
Validate that the StructureOutputFile is present in the job and that its
name matches the filename provided.
:param filename: Expected name for the structure output file. Should match
exactly with what's in the job.
:type filename: str
"""
# Note: job_dj_job is available in this scope because it is added to
# __builtins__ by check_workup.
if not hasattr(job_dj_job, 'getJob'): # noqa: F821
msg = 'Job has type {}. No StructureOutputFile is available for this type of job'
raise TypeError(msg.format(type(job_dj_job))) # noqa: F821
job_record = job_dj_job.getJob() # noqa: F821
if not job_record:
raise AssertionError('No job record found for this test')
if filename != job_record.StructureOutputFile:
msg = 'Expected filename "{}" does not match StructureOutputFile "{}"'
raise AssertionError(
msg.format(filename, job_record.StructureOutputFile))
return True
[docs]def license_check(license_check_file=constants.LICENSE_CHECK_FILE,
ref_license_check_file=constants.REF_LICENSE_CHECK_FILE):
"""
Compare the license file against a standard. If either does not exist,
return True. If the two match, return True. The reference license file
should be called "license_check_reference.txt" and the license file being
tested should be called "license_check.txt".
"""
if not os.path.isfile(ref_license_check_file):
return True
if not os.path.isfile(license_check_file):
return True
reference = _LicenseInfo(ref_license_check_file)
check = _LicenseInfo(license_check_file)
reference.assertSame(check)
return True
# This is used to capture important info from a single license action
# (a Checkout, Checkin, or Exist).
LicenseAction = namedtuple('LicenseAction', 'feature count tag action')
class _LicenseInfo:
"""
Extract an in-memory representation of the license checkouts from a named
license checkout file. See LIC-312 for details.
"""
def __init__(self, filename):
self.filename = filename
self.existence_checks = set()
self.mmshare = set()
self.product_actions = []
with reader_open(filename) as fh:
reader = csv.DictReader(fh)
for lineno, row in enumerate(reader, start=1):
if row['Action'] not in ('Checkin', 'Checkout',
'Quick Checkout', 'Exist'):
msg = ('Unrecognized action {action} on line {lineno} '
'in {filename}.')
raise LicenseFailure(
msg.format(action=row['Action'],
lineno=lineno,
filename=filename))
if row['Action'] == 'Exist':
self.existence_checks.add(
LicenseAction(feature=row['Feature'],
count=int(row['Count']),
tag=row['Tag'],
action=row['Action']))
elif row['Feature'] == 'MMLIBS':
# For MMLIBS, we don't care about the count or tags
self.mmshare.add(
LicenseAction(feature=row['Feature'],
count=None,
tag=None,
action=row['Action']))
else:
self.product_actions.append(
(LicenseAction(feature=row['Feature'],
count=int(row['Count']),
tag=row['Tag'],
action=row['Action']), lineno))
def __str__(self):
return self.filename
def assertSame(self, other):
"""
Check that two License objects match. This is insensitive to the NUMBER
of mmshare checkins/checkouts/exists checks, but it is sensitive to
whether each type of mmshare check occurred at all. It is sensitive to
the NUMBER and ORDER of all product checks. See LIC-312, SHARED-2861,
and SHARED-3136.
:raise: LicenseFailure with a description of the differences between
them.
"""
failures = []
# Setwise comparison for MMLIBS and Exist actions
failures.extend(self._compare_sets(other, 'existence_checks'))
failures.extend(self._compare_sets(other, 'mmshare'))
# Compare all other actions line by line
for self_action, other_action in zip_longest(self.product_actions,
other.product_actions,
fillvalue=None):
if self_action is None or other_action is None:
failures.append(
self._mismatched_length(other, self_action, other_action))
continue
if self_action[:-1] != other_action[:-1]:
msg = '"{self}:{self_action}" does not match '\
'"{other}:{other_action}" on line {lineno}.'
failures.append(
msg.format(self=self.filename,
self_action=self_action[:-1],
other=other.filename,
other_action=other_action[:-1],
lineno=self_action[-1]))
if len(failures) > 0:
raise LicenseFailure('\n\n'.join(failures))
def _compare_sets(self, other, attribute):
"""
Compares two sets of license actions. Used to look at Exist actions
and MMLIBS Checkin/Checkout actions
"""
failures = []
self_set = getattr(self, attribute)
other_set = getattr(other, attribute)
for action in self_set.symmetric_difference(other_set):
message = '{feature} {action} is present in {present} but is not '\
'present in {missing}.'
if action in self_set:
failures.append(
message.format(feature=action.feature,
action=action.action,
present=self,
missing=other))
else:
failures.append(
message.format(feature=action.feature,
action=action.action,
present=other,
missing=self))
return failures
def _mismatched_length(self, other, self_action, other_action):
"""
Generates an error message when either self_action or other_action is
None.
"""
msg = '{present}: line {lineno} ({action}) has no match in {missing}.'
if self_action is None:
return msg.format(present=other,
lineno=other_action[1],
action=other_action[0],
missing=self)
elif other_action is None:
return msg.format(present=self,
lineno=self_action[1],
action=self_action[0],
missing=other)
else:
raise ValueError(
'This method should only be called if one of the actions is None.'
)