"""
Class to set up a nicely formatted logger
Copyright Schrodinger, LLC. All rights reserved."""
# Contributor: Thomas F. Hughes
import base64
import logging
import sys
import textwrap
import time
import schrodinger
from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.utils import cmdline
from schrodinger.utils import fileutils
DESMOND_CUI_FLAG = ['trj', 'cms']
DEFAULT_LOG_LINE_WIDTH = 80
PLAIN_TEXT_COMMAND = 'PlainTextCmd'
DRIVER_LOG_EXT = '-driver.log'
[docs]def log_error(msg, logger=None):
"""
Add a message to the log file and exit with an error code
:type msg: str
:param msg: The message to log
:type logger: `logging.Logger`
:param logger: The logger object or None if msg should be printed
"""
log_msg(msg, logger=logger)
log_msg('Finished', timestamp=True, logger=logger)
sys.exit(1)
[docs]def log_msg(msg, timestamp=False, pad=False, pad_below=False, logger=None):
"""
Add a message to the log file
:type msg: str
:param msg: The message to log
:type timestamp: bool
:param timestamp: Whether to print a timestamp with the message
:type pad: bool
:param pad: Whether to pad above this message with a blank line
:type pad_below: bool
:param pad_below: Whether to pad below this message with a blank line
:type logger: `logging.Logger`
:param logger: The logger object or None if msg should be printed
"""
if timestamp:
msg = msg + ' at ' + time.ctime()
if pad:
log(logger, "")
log(logger, msg)
if pad_below:
log(logger, "")
[docs]def log(logger, msg, level=logging.INFO):
"""
Log a message if the logger is defined, else print it
:type logger: `logging.Logger`
:param logger: The logger object or None if no logger is defined
:type msg: str
:param msg: The message to log
:type level: int
:param level: The log level to use when logging. These are constants from
the Python logging module. The default is INFO.
"""
if logger:
logger.log(level, msg)
else:
print(msg)
NAMESPACE_HEADER = 'Command line options:'
JOB_HEADER = 'Job information'
[docs]def log_initial_data(logger,
namespace,
msgwidth=None,
namespace_header=NAMESPACE_HEADER,
job_header=JOB_HEADER):
"""
Log information about the currently running job, if running under job
control, and all information in the namespace object.
:type namespace: object
:param namespace: The object whose properties should be logged. In typical
use, this is an `argparse.Namespace` object. All the values in the
__dict__ of this object will be logged. Note that __dict__ does not contain
the double-underscore class methods.
:type logger: `logging.Logger`
:param logger: The logger to log with
:type msgwidth: int
:param msgwidth: The width of the log messages. Taken from the logger if not
supplied
:type namespace_header: str or None
:param namespace_header: If not None, this line will be printed above the
logged namespace properties
:type job_header: str or None
:param job_header: If not None, this line will be printed above the logged
job properties
"""
if not msgwidth:
msgwidth = logger.handlers[0].formatter.msgwidth
log_jobinfo(logger, msgwidth=msgwidth, header=job_header)
log(logger, "")
log_namespace(namespace, logger, msgwidth=msgwidth, header=namespace_header)
[docs]def log_namespace(namespace, logger, msgwidth=None, header=NAMESPACE_HEADER):
"""
Log the value of all properties on a object, such as logging all the
parameters of an `argparse.Namespace` object.
:type namespace: object
:param namespace: The object whose properties should be logged. In typical
use, this is an `argparse.Namespace` object. All the values in the
__dict__ of this object will be logged. Note that __dict__ does not contain
the double-underscore class methods.
:type logger: `logging.Logger`
:param logger: The logger to log with
:type msgwidth: int
:param msgwidth: The width of the log messages. Taken from the logger if not
supplied
:type header: str or None
:param header: If not None, this line will be printed above the logged
properties
"""
if not msgwidth:
msgwidth = logger.handlers[0].formatter.msgwidth
if header:
log(logger, header)
try:
keys = list(namespace.__dict__)
except AttributeError:
# This object has no __dict__ method, so nothing to log
return
# Log the properties in alphabetical order
keys.sort()
jc_options = {x.name for x in cmdline.Options}
for attribute in keys:
# Do not log job control properties (MATSCI-9553)
if str(attribute).upper() in jc_options:
continue
value = namespace.__dict__[attribute]
if isinstance(value, (list, tuple, set)):
if attribute in DESMOND_CUI_FLAG:
value = value[-1]
else:
value = ','.join([str(x) for x in value])
elif isinstance(value, dict):
value = ','.join(
['%s=%s' % (str(a), str(b)) for a, b in value.items()])
log(logger, get_param_string(attribute, value, msgwidth))
[docs]def log_jobinfo(logger, msgwidth=None, header=JOB_HEADER):
"""
Log the information about the current running job if there is one.
:type logger: `logging.Logger`
:param logger: The logger to log with
:type msgwidth: int
:param msgwidth: The width of the log messages. Taken from the logger if not
supplied
:type header: str or None
:param header: If not None, this line will be printed above the logged
properties
"""
backend = jobcontrol.get_backend()
if not backend:
return
if not msgwidth:
msgwidth = logger.handlers[0].formatter.msgwidth
if header:
log(logger, header)
job = backend.getJob()
jobinfo = job.getApplicationHeaderFields()
for key, value in jobinfo.items():
log(logger, get_param_string(key, value, msgwidth))
if key == 'Commandline' and 'BASE64' in value:
ptext, raw_text = decode_base64_cmd(value)
log(logger, get_param_string(PLAIN_TEXT_COMMAND, ptext, msgwidth))
log(logger, get_param_string('JobHostStartTime', time.ctime(), msgwidth))
[docs]def run_jobs_with_update_logging(jobq,
logfn,
fraction=0.10,
interval=3500,
add_subjob_files=True):
"""
Run all the current jobs in the JobDJ and logs progress information at
semi-regular intervals so the user can track status
:type jobq: `schrodinger.job.queue.JobDJ`
:param jobq: The queue to run
:type logfn: callable
:param logfn: A function that takes a string and a boolean timestamp
argument and logs the string
:type fraction: float
:param fraction: A value between 0 and 1. When this fraction of the total
jobs has completed since the last log update, a new update will be
triggered.
:type interval: int
:param interval: A new update will be triggered when this number of seconds
have passed since the last log update
:param bool add_subjob_files: Add subjob files to the backend when a subjob
completes (if a backend exists)
:note: fraction and interval are not obeyed strictly as updates can only be
triggered when a job completes
"""
if add_subjob_files:
# Delayed import to avoid circular imports
from schrodinger.application.matsci import jobutils
backend = jobcontrol.get_backend()
else:
backend = None
# The queue may have already run some of its jobs
already_done = jobq.total_finished + jobq.total_failed
num_jobs = float(jobq.total_added - already_done)
logfn('There are %d jobs to run' % num_jobs)
num_completed = 0
last_log_fraction = 0
last_log_time = time.time()
done_states = {queue.JobState.DONE, queue.JobState.FAILED}
def on_job_status_change(job):
"""
Process status changes for a JobDJ job
:param job: JobDJ's snapshot of job state upon going through a status
change
:type job: schrodinger.job.queue.BaseJob
"""
nonlocal num_completed
nonlocal last_log_fraction
nonlocal last_log_time
if job.state in done_states:
num_completed += 1
if backend:
jobutils.add_subjob_files_to_backend(job,
backend=backend,
path=jobutils.FROM_SUBJOB)
current_fraction = num_completed / num_jobs
fraction_elapsed = current_fraction - last_log_fraction
current_time = time.time()
time_elapsed = current_time - last_log_time
# Log status if more than fraction of jobs have completed or more than
# inverval seconds have passed since we last did a status log
if fraction_elapsed >= fraction or time_elapsed >= interval:
percent = int(100 * current_fraction)
logfn('%d%% of jobs have completed' % percent, timestamp=True)
last_log_fraction = current_fraction
last_log_time = current_time
jobq.run(status_change_callback=on_job_status_change)
[docs]def create_logger(logfilename=None,
extension=DRIVER_LOG_EXT,
related_filename=None,
verbose=False,
width=DEFAULT_LOG_LINE_WIDTH):
"""
Create a logger that can be used for logging information to a file
If running under job control, the log file this logger writes to will be
added to the backend as a Log File.
:type logfilename: str
:param logfilename: The name of the log file the logger should write to. If
not supplied, the name will be based off the Job name if running under job
control or based off of the basename of a related_file if not. As a last
resort, the logfile name will use 'logfile' as the base name.
:type extension: str
:param extension: The extension to add to the file name if logfilename is
not supplied.
:type related_filename: str
:param related_filanem: The basename of this filename will be used as the
base of the log file name IF logfilename is not supplied directly and we
are not running under job control.
:type verbose: bool, or None
:param verbose: Whether to enable verbose logging. If None, the verbosity
is automatically setup according to debugging mode.
:type width: int
:param width: Text width of the file
:rtype: (`logging.Logger`, str)
:return: The logger and the name of the file it logs to.
"""
if verbose is None:
verbose = schrodinger.in_dev_env()
backend = jobcontrol.get_backend()
if not logfilename:
if backend:
job = backend.getJob()
basename = job.Name
elif related_filename:
basename = fileutils.splitext(related_filename)[0]
else:
basename = 'logfile'
logfilename = basename + extension
if backend:
backend.addLogFile(logfilename)
loggerobj = GetLogger(logfilename, verbose, width)
return loggerobj.logger, logfilename
[docs]def get_param_string(identifier, value, msgwidth):
"""
Return a formatted line with identifier on the left and value
right-justified. A line of '...' will consume the intervening space.
:type identifier: str
:param identifier: description of the value
:type value: any
:param value: the value of the descriptor
:type msgwidth: int
:param msgwidth: the length of the msg
:rtype: str
:return: msg, formatted msg
"""
uvalue = str(value)
takenup = len(identifier) + len(uvalue) + 2
separatorlen = msgwidth - takenup
msg = ' '.join([identifier, separatorlen * '.', uvalue])
return msg
[docs]class GetLogger(object):
"""
Set up a textwrapped log file.
"""
[docs] def __init__(self, logfilename, verbose, msgwidth):
"""
Create a GetLogger instance and process it.
:type logfilename: str
:param logfilename: name of the log file
:type verbose: boolean
:param verbose: enable verbose logging
:type msgwidth: int
:param msgwidth: message width
"""
self.logfilename = logfilename
self.verbose = verbose
self.msgwidth = msgwidth
self.setItUp()
[docs] def setItUp(self):
"""
Set up the logger.
"""
# use logging.DEBUG category to log verbose printing options and
# use custom formatting.
logger = logging.getLogger(self.logfilename)
logger.setLevel(logging.INFO)
if self.verbose:
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler(self.logfilename, mode='w')
handler.setFormatter(self.MyFormatter(self.msgwidth))
logger.addHandler(handler)
self.logger = logger
[docs]def decode_base64_cmd(value):
"""
Extract a BASE64 encoded command line from the full command line in value
and return it as plain text string
:type value: str
:param value: A full job control command line containing a -cmd BASE64 etc.
encoded command line string
:rtype: str, str
:return: The second value is the raw decoded BASE64 value and the first
value is the raw value modified to be similar to what a user would type to
on the command line (double quotes, redirects, parens, "python" removed,
$SCHRODINGER/run added)
"""
value = value.replace('"', "").replace("'", "")
tokens = value.split()
try:
cmd_index = tokens.index('-cmd')
except ValueError:
raise ValueError('No -cmd flag found in value')
if tokens[cmd_index + 1] != 'BASE64':
raise ValueError('Command does not appear to be BASE64 encoded')
bstring = tokens[cmd_index + 2]
raw_command = base64.b64decode(bstring).decode()
# Now turn this into something the user would actually use
mod_command = raw_command.replace('"', "")
mod_tokens = mod_command.split()
# Remove the redirect
try:
mod_tokens = mod_tokens[:mod_tokens.index('>')]
except ValueError:
pass
# Remove the surrounding '()'
mod_tokens = [x for x in mod_tokens if x not in ['(', ')']]
for badstart in ['python', '%SCHRODINGER%/run']:
if mod_tokens[0] == badstart:
mod_tokens = mod_tokens[1:]
mod_tokens.insert(0, '$SCHRODINGER/run')
# Create the final command
final_cmd = ' '.join(mod_tokens)
return final_cmd, raw_command
[docs]def close_logger_file_handlers(logger):
"""
Close all logging.FileHandler's for the given logger.
:type logger: `logging.Logger`
:param logger: the logger object
"""
for handler in logger.handlers[:]:
if isinstance(handler, logging.FileHandler):
handler.close()
# Important for Windows
logger.removeHandler(handler)
[docs]def create_debug_logger(name, formatter=None, add_to_backend=None, stream=True):
"""
Create a logger for module level debug usage.
:param name: the logger name
:type name: str
:param formatter: the logging format
:type formatter: 'logging.Formatter'
:param add_to_backend: if True and logfile is created, add it to the backend.
If None, add the file to backend in debug environment.
:type add_to_backend: bool or None
:param stream: if True and the newly created logfile is added to the backend,
stream the file to the submission host
:type stream: bool
:return: The logger created for this function
:rtype: `logging.Logger`
"""
# Delayed import to avoid circular imports
from schrodinger.application.matsci import jobutils
logger = logging.getLogger(name)
is_debug = schrodinger.in_dev_env()
effective_level = logging.DEBUG if bool(is_debug) else logging.INFO
logger.setLevel(effective_level)
if logger.handlers:
return logger
logfile = name + '.log'
try:
fileutils.force_remove(logfile)
except PermissionError:
# PermissionError: [WinError 32] The process cannot access the file
# because it is being used by another process:
pass
backend = jobcontrol.get_backend()
if backend:
if add_to_backend is None:
add_to_backend = is_debug
if add_to_backend:
jobutils.add_outfile_to_backend(logfile, backend, stream=stream)
logger_handle = logging.FileHandler(logfile)
if not formatter:
formatter = logging.Formatter("%(levelname)s: %(message)s")
logger_handle.setFormatter(formatter)
logger.addHandler(logger_handle)
return logger