"""
A wrapper to the standard library subprocess module.
This module automatically looks for executables in the $SCHRODINGER and
$SCHRODINGER/utilities directories, makes their paths absolute when found, and
adds an interpreter to the command where appropriate.
A general limitation of this module is that the subprocess commands must be
specified as lists of strings. This is a conscious design decision meant to
avoid issues with quoting arguments that contain spaces.
Copyright Schrodinger, LLC. All rights reserved.
"""
import errno
import functools
import os
# This custom subprocess module should expose all the functions, classes and
# globals of the standard library subprocess module, hence the star import.
import subprocess
import sys
from subprocess import Popen as subprocPopen
from subprocess import * # noqa:F401,F403
from subprocess import call as subproccall
from subprocess import check_call as subproccheckcall
from subprocess import check_output as subproccheckoutput
from subprocess import list2cmdline # noqa:F401
import psutil
def _fix_call_cmd(cmd):
    """
    Resolve the program in `cmd` and prepend an interpreter if one is needed.

    If the program name isn't an absolute path, look for the program in the
    $SCHRODINGER and $SCHRODINGER/utilities directories (via
    `abs_schrodinger_path`).

    If the resolved program is an existing file with a .pl, .py or .sh
    extension, and the command does not already start with 'sh', 'perl' or
    'python', then 'perl', the basename of the running Python executable, or
    'sh' is inserted at the front of the command.

    Note that `cmd` is modified in place; the returned list is the very same
    object that was passed in.

    :type cmd: list of str
    :param cmd:
        The command to be run, specified as a list of strings.

    :rtype: list of str
    :return: The command line arguments that can be used to launch the
        command.

    :raises ValueError: If `cmd` is not a list, or is an empty list.
    """
    # Strings are rejected rather than split: this module only accepts
    # commands as lists, to avoid quoting problems with arguments that
    # contain spaces.
    if not isinstance(cmd, list):
        raise ValueError("Argument must be a list")
    if not cmd:
        # Fail with a clear message instead of an IndexError further down.
        raise ValueError("Argument must be a non-empty list")

    # By default the program to resolve is the first element of the list.
    arg_index = 0
    add_interpreter = True
    # When the caller already supplied an interpreter, the program is the
    # second element. A bare interpreter (e.g. ["python"]) has no second
    # element, so in that case the interpreter itself is the program.
    if cmd[0] in ("sh", "perl", "python") and len(cmd) > 1:
        add_interpreter = False
        arg_index = 1

    cmd[arg_index] = abs_schrodinger_path(cmd[arg_index])

    if add_interpreter and os.path.isfile(cmd[arg_index]):
        # Add the appropriate interpreter depending on file extension.
        # This is maintained for backward compatibility.
        extension = os.path.splitext(cmd[arg_index])[1]
        if extension == ".sh":
            cmd.insert(0, "sh")
        elif extension == ".py":
            cmd.insert(0, os.path.basename(sys.executable))
        elif extension == ".pl":
            cmd.insert(0, "perl")
    return cmd
def abs_schrodinger_path(prog_name):
    """
    Search for `prog_name` in the $SCHRODINGER and $SCHRODINGER/utilities
    directories. If found, returns `prog_name` prepended with the correct
    directory path. If not found, `prog_name` is returned unchanged.

    An absolute `prog_name` is returned as is, without consulting the
    environment. On Windows, a name without an ".exe" extension is also
    looked up with ".exe" appended.

    :param prog_name: name of the executable to search for
    :type prog_name: str

    :rtype: str
    :return: The path found under $SCHRODINGER, or `prog_name` unchanged.

    :raises KeyError: If `prog_name` is not absolute and the SCHRODINGER
        environment variable is not set.
    """
    # An absolute path needs no lookup, so $SCHRODINGER is not required.
    if os.path.isabs(prog_name):
        return prog_name

    # use $SCHRODINGER or $SCHRODINGER/utilities as
    # search path for the scripts.
    schrodinger = os.environ['SCHRODINGER']
    search_path = (schrodinger, os.path.join(schrodinger, "utilities"))
    on_windows = sys.platform == 'win32'

    for directory in search_path:
        candidate = os.path.join(directory, prog_name)
        if os.path.exists(candidate):
            return candidate
        # The exact name wins; only then try the ".exe" variant on Windows.
        if on_windows and os.path.splitext(candidate)[1] != ".exe":
            candidate_exe = candidate + ".exe"
            if os.path.exists(candidate_exe):
                return candidate_exe
    return prog_name
def _wrap_command(subprocess_function, cmd, **kwargs):
    """
    Fix up `cmd` with `_fix_call_cmd` and run it with `subprocess_function`.

    :param subprocess_function: Callable invoked as
        ``subprocess_function(fixed_cmd, **kwargs)``.
    :type cmd: list of str
    :param cmd: The command to be run, specified as a list of strings.
    :return: Whatever `subprocess_function` returns.

    :raises ValueError: If ``shell=True`` is requested and the fixed-up
        command has more than one element.
    :raises FileNotFoundError: If running the command fails with ENOENT. The
        exception names the program (first element of the fixed-up command)
        and is chained to the original error.
    """
    fixed_cmd = _fix_call_cmd(cmd)
    # NOTE(review): PYTHONIOENCODING is not set here. The historical
    # rationale was that these helpers are not used with subprocess.PIPE;
    # confirm that this still holds for check_output() and run().

    # Validate the command as it will actually be launched, i.e. after an
    # interpreter may have been prepended.
    if len(fixed_cmd) > 1 and kwargs.get('shell', False):
        raise ValueError(
            f"Cannot use shell=True with a multi-part cmd: {fixed_cmd}")
    try:
        return subprocess_function(fixed_cmd, **kwargs)
    except OSError as err:
        if err.errno == errno.ENOENT:
            # Ev:115437 Raise an exception that makes it clear WHICH file
            # does not exist. Keep the FileNotFoundError type and errno so
            # callers can still catch it specifically, and chain the
            # original error so its details are not lost.
            raise FileNotFoundError(errno.ENOENT, "No such file or directory",
                                    fixed_cmd[0]) from err
        raise
def call(cmd, **kwargs):
    """
    Execute a command, block until it finishes and hand back its exit status.

    The executable is looked up in $SCHRODINGER and $SCHRODINGER/utilities,
    and an interpreter is supplied if one is needed. All keyword arguments
    are forwarded to the standard library's subprocess.call.

    :type cmd: list of str
    :param cmd: The command to be run, specified as a list of strings.

    :rtype: int
    :return: Exit status of the command.
    """
    exit_status = _wrap_command(subproccall, cmd, **kwargs)
    return exit_status
def check_call(cmd, **kwargs):
    """
    Execute a command and block until it finishes, raising an exception if
    it exits with a non-zero exit status. Otherwise the exit status is
    returned.

    The executable is looked up in $SCHRODINGER and $SCHRODINGER/utilities,
    and an interpreter is supplied if one is needed. All keyword arguments
    are forwarded to the standard library's subprocess.check_call.

    :type cmd: list of str
    :param cmd: The command to be run, specified as a list of strings.

    :rtype: int
    :return: Exit status of the command.

    :raises subprocess.CalledProcessError: If the exit code is not 0.
    """
    exit_status = _wrap_command(subproccheckcall, cmd, **kwargs)
    return exit_status
def check_output(cmd, **kwargs):
    """
    Execute a command, block until it finishes and return what it wrote to
    standard output. An exception is raised if the command exits with a
    non-zero exit status.

    The executable is looked up in $SCHRODINGER and $SCHRODINGER/utilities,
    and an interpreter is supplied if one is needed. All keyword arguments
    are forwarded to the standard library's subprocess.check_output.

    :type cmd: list of str
    :param cmd: The command to be run, specified as a list of strings.

    :rtype: bytes or str
    :return: Standard output of the command, exactly as returned by
        subprocess.check_output (bytes unless a text mode is requested
        through the keyword arguments).

    :raises subprocess.CalledProcessError: If the exit code is not 0.
    """
    captured_stdout = _wrap_command(subproccheckoutput, cmd, **kwargs)
    return captured_stdout
@functools.wraps(subprocess.run)
def run(cmd, **kwargs):
    # functools.wraps copies subprocess.run's metadata (including its
    # docstring) onto this wrapper, so no separate docstring is written here.
    # The command is fixed up and launched through _wrap_command, like the
    # other blocking helpers in this module.
    completed = _wrap_command(subprocess.run, cmd, **kwargs)
    return completed
def Popen(cmd, **kwargs):
    """
    Start a process through the standard library's subprocess.Popen class
    and return the resulting Popen instance. All keyword arguments are
    passed on to the underlying class.

    Compared to calling subprocess.Popen directly, the executable is
    automatically looked up in $SCHRODINGER and $SCHRODINGER/utilities, an
    interpreter is supplied if needed, and a copy of the current environment
    is passed as `env` when the caller did not provide one.

    :type cmd: list of str
    :param cmd:
        The command to be run, specified as a list of strings.
    """
    # Only fill in the environment when the caller did not pass one; an
    # explicitly supplied value (whatever it is) is left untouched.
    if 'env' not in kwargs:
        kwargs['env'] = os.environ.copy()
    launch_cmd = _fix_call_cmd(cmd)
    return subprocPopen(launch_cmd, **kwargs)
def kill_process(pid):
    """
    Kill the process and all its children.

    Processes that no longer exist, or that cannot be accessed, are silently
    skipped. After sending the kill requests, waits up to 5 seconds for the
    processes to terminate.

    :param int pid: Process id to kill.
    """
    try:
        parent = psutil.Process(pid)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        # If the process doesn't exist anymore or we can't access it,
        # we do nothing.
        return

    try:
        descendants = parent.children(recursive=True)
    except psutil.NoSuchProcess:
        # The parent can disappear between the lookup above and this call;
        # treat that the same way as a process that never existed.
        return

    # Children first, the parent last.
    all_processes = [*descendants, parent]
    for proc in all_processes:
        try:
            proc.kill()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass

    # Call to kill and actual killing of the process can differ, we wait for
    # all process to finish. Add a timeout for safety measures.
    psutil.wait_procs(all_processes, timeout=5)