"""
Convenience functions for dealing with job server
    (starting, killing, writing config file).
"""
import datetime
import getpass
import json
import logging
import os
import posixpath
import re
import shlex
import shutil
import socket
import sys
import tempfile
import time
from collections import namedtuple
from contextlib import contextmanager
from typing import Optional
from unittest.mock import patch

import backoff
import paramiko
import psutil
import yaml

from schrodinger.application.licensing.licadmin import hostname_is_local
from schrodinger.job import jobcontrol
from schrodinger.job.server import jsc
from schrodinger.utils import sshconfig
from schrodinger.utils import subprocess
# Name of the environment variable that holds the path of the job server
# config file; read by create_job_server_config and temporarily patched by
# schrodinger_jobserver_config.
SCHRODINGER_JOBSERVER_CONFIG_FILE = "SCHRODINGER_JOBSERVER_CONFIG_FILE"
# Name of the environment variable that points at a localhost job server
# directory (cancel_active_jobs sets a variable of this name for jsc).
SCHRODINGER_LOCALHOST_JOBSERVER_DIRECTORY = "SCHRODINGER_LOCALHOST_JOBSERVER_DIRECTORY"
# Record describing a started job server, as returned by start_server and
# start_localhost_server.
ServerInfo = namedtuple('ServerInfo', [
    'hostname', 'schrodinger', 'job_server_directory', 'username', 'pid',
    'job_server_port'
])
# Module logger. NOTE: a StreamHandler is attached and the level is forced to
# INFO at import time, so importing this module changes logging output.
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
# Path separator used when joining components of paths that may live on a
# remote host (see job_server_exe: "constructs local and remote paths").
LINUX_PATH = "/"
# Template of the supervisord program configuration written for each job
# server. The placeholders proc_id, cmd and log are filled in by
# monitor_job_server_with_supervisord.
INI_CONTENTS = """[program:{proc_id}]
command = {cmd}
redirect_stderr = true
stdout_logfile = {log}
# Only give the process a second to start before declaring it failed.
startsecs = 1
# Don't retry if it doesn't start.
startretries = 0
"""
def running_compatibility_tests():
    """
    Report whether backwards compatibility tests are being run.

    This function is duplicated in server_management.py for backwards compatibility.

    :rtype: bool
    :returns: True when JOBSERVER_SCHRODINGER is present in the environment,
        False otherwise.
    """
    compat_schrodinger = os.environ.get("JOBSERVER_SCHRODINGER")
    return compat_schrodinger is not None
def get_user(hostname):
    """
    Return the username to use for the given host.

    :param hostname: name of the host; not consulted by this function.
    :type hostname: str
    :rtype: str
    :returns: "buildbot" when the current login name contains "buildbot",
        otherwise the current login name as reported by getpass.
    """
    login_name = getpass.getuser()
    # case for mac machines which has this account to use 'buildbot'
    return "buildbot" if "buildbot" in login_name else login_name
def get_supervisord_dir(ssh: paramiko.client.SSHClient, hostname: str) -> str:
    """
    Work out the remote directory used to start or interact with supervisord.

    The remote home directory is obtained by echoing "~" in a login shell, and
    the result is suffixed with "/supervisord-<hostname>".

    :param ssh: ssh client connection established with the hostname.
    :param hostname: name of the host where supervisord is being setup.
    :returns: the remote supervisord directory.
    """
    home_dir = run_command(ssh, ["echo", "~"], login=True).strip()
    return "{}/supervisord-{}".format(home_dir, hostname)
def supv(ssh: paramiko.client.SSHClient, hostname: str):
    """
    Build the supervisorctl command prefix for the remote supervisord.

    :param ssh: ssh client connection established with the hostname.
    :param hostname: name of the host where supervisord is setup.
    :returns: list of arguments (supervisorctl executable from the venv plus
        "-c <supervisord.conf>") to which a supervisorctl action is appended.
    """
    root = get_supervisord_dir(ssh, hostname)
    return [
        f"{root}/supervisord/venv/bin/supervisorctl",
        "-c",
        f"{root}/supervisord/supervisord.conf",
    ]
def get_ini_file(ssh: paramiko.client.SSHClient, hostname: str, proc_id: str):
    """
    Return the full remote path of the supervisord configuration file that
    manages the given program.

    :param ssh: ssh client connection established with the hostname
    :param hostname: name of the host where supervisord is setup.
    :param proc_id: program name used as the basename of the configuration
        file.
    :returns: "<supervisord dir>/supervisord/conf/<proc_id>.ini"
    """
    supervisord_root = get_supervisord_dir(ssh, hostname)
    return f"{supervisord_root}/supervisord/conf/{proc_id}.ini"
def get_log_dir(ssh: paramiko.client.SSHClient, hostname: str):
    """
    Return the full remote path of the directory holding the log files of
    processes managed by supervisord.

    :param ssh: ssh client connection established with the hostname
    :param hostname: name of the host where supervisord is setup.
    :returns: "<supervisord dir>/supervisord/logs"
    """
    supervisord_root = get_supervisord_dir(ssh, hostname)
    return f"{supervisord_root}/supervisord/logs"
def get_log_location(ssh, hostname, proc_id):
    """
    Return the full remote path of the log file of a program managed by
    supervisord.

    :param ssh: ssh client connection established with the hostname
    :param hostname: name of the host where supervisord is setup.
    :param proc_id: program name used as the basename of the log file.
    :returns: "<log dir>/<proc_id>.log"
    """
    # The log lives on the remote (Linux) host, so always join with "/"
    # regardless of the platform this launcher runs on; os.path.join would
    # insert a backslash when run from Windows.
    return posixpath.join(get_log_dir(ssh, hostname), f"{proc_id}.log")
def get_tls_config(cert_dir):
    """
    Build the web server TLS settings for the wildcard certificate.

    :param cert_dir: path to wildcard certificates
    :type cert_dir: str
    :rtype: dict
    :returns: dict with the "certificate_key_file" and
        "certificate_chain_file" paths inside cert_dir
    """
    wildcard_base = cert_dir + "/wild.schrodinger.com"
    return {
        "certificate_key_file": wildcard_base + ".key",
        "certificate_chain_file": wildcard_base + ".pem",
    }
def get_job_server_directory(basedir: str, username: str, hostname: str) -> str:
    """
    Create and return a unique directory to store job server data in.

    The directory is created under "<basedir>/<username>" with a name of the
    form "jobserver.<YYYYMMDD>.<random suffix>". It is created locally with
    tempfile for a local hostname, and with "mktemp" over ssh otherwise.

    :param basedir: prefix of path, usually tmpdir on remote
    :param username: user owning the directory; also used for the ssh
        connection to a remote host
    :param hostname: hostname on which to create the job server directory
    :returns: Path to the newly created directory
    """
    user_dir = f"{basedir}/{username}"
    datestamp = datetime.datetime.now().strftime("%Y%m%d")
    dir_prefix = f"jobserver.{datestamp}."
    if not hostname_is_local(hostname):
        with get_ssh_client(hostname, username) as ssh:
            # Ensure the parent directory exists
            run_command(ssh, ["mkdir", "-p", user_dir])
            # Remote servers are all currently linux so should have the "mktemp" command available.
            created = run_command(ssh, [
                "mktemp", "-d", f"--tmpdir={user_dir}", "-t",
                f"{dir_prefix}XXXXXXXXX"
            ])
            return created.strip()
    os.makedirs(user_dir, exist_ok=True)
    return tempfile.mkdtemp(prefix=dir_prefix, dir=user_dir)
def job_server_exe(schrodinger):
    """
    Return the path of the jobserverd executable inside an installation.

    :param schrodinger: path to SCHRODINGER
    :type schrodinger: str
    :rtype: str
    """
    # Use / as pathsep because this constructs local and remote paths
    components = (schrodinger, "internal", "bin", "job_server", "jobserverd")
    return LINUX_PATH.join(components)
def job_server_setup_exe(schrodinger):
    """
    Return the path of the jsc_admin executable inside an installation.

    :param schrodinger: path to SCHRODINGER
    :type schrodinger: str
    :rtype: str
    """
    components = (schrodinger, "internal", "bin", "job_server", "jsc_admin")
    return LINUX_PATH.join(components)
def run(schrodinger):
    """
    Return the path of the "run" launcher inside an installation.

    :param schrodinger: path to SCHRODINGER
    :type schrodinger: str
    :rtype: str
    """
    return "{}/run".format(schrodinger)
def get_job_server_config(job_server_directory):
    """
    Return the path of the server-specific job server config file
    ("jobserver.test.config") inside the job server directory.

    :param job_server_directory: base directory for job server
    :type job_server_directory: str
    :rtype: str
    """
    config_name = "jobserver.test.config"
    return posixpath.join(job_server_directory, config_name)
def get_queue_type(hostname: str) -> str:
    """
    Return a queue type in the format for jsc_admin for a given hostname.

    The type is picked from a case-insensitive substring of the hostname,
    checked in the order slurm, torque, lsf, pbs; "UGE" is returned when none
    of them occurs.
    """
    lowered = hostname.lower()
    # Order matters: the first marker found in the hostname wins.
    markers = (
        ("slurm", "Slurm"),
        ("torque", "Torque"),
        ("lsf", "LSF"),
        ("pbs", "PBS"),
    )
    for marker, queue_type in markers:
        if marker in lowered:
            return queue_type
    return "UGE"
def setup_host(hostname, schrodinger, job_server_directory, username,
               serve_queue_jobs):
    """
    Set up authentication in a new directory.

    Runs "jsc_admin setup-server" on the host to create a new job_server
    config directory, which includes authentication information.

    :param hostname: name of host to set up jobserver
    :param schrodinger: path to SCHRODINGER
    :param job_server_directory: base directory for job server
    :param username: username to use on hostname
    :param serve_queue_jobs: If True, the queue type is derived from the
        hostname; otherwise the "local" queue is used.
    """
    setup_cmd = [
        run(schrodinger),
        job_server_setup_exe(schrodinger),
        'setup-server',
        '-host',
        hostname,
        "-dir",
        job_server_directory,
        "-queue",
    ]
    setup_cmd.append(get_queue_type(hostname) if serve_queue_jobs else "local")
    with get_ssh_client(hostname, username) as ssh:
        run_command(ssh, setup_cmd)
def setup_supervisord(hostname, username):
    """
    Setup supervisord to start jobserver in given hostname.

    Creates the virtualenv when it is missing, installs supervisor into it
    when supervisorctl is missing, writes supervisord.conf and finally starts
    the daemon. When supervisorctl already exists, only the daemon is started.

    :param hostname: name of the host where supervisord is being setup.
    :param username: username to use on hostname
    """
    with get_ssh_client(hostname, username) as ssh:
        supervisord_root = get_supervisord_dir(ssh, hostname)
        install_dir = f"{supervisord_root}/supervisord"
        venv_dir = f"{install_dir}/venv"
        try:
            run_command(ssh, ['ls', venv_dir])
        except RuntimeError:
            # No venv yet: create one with python3, falling back to the
            # python2.7 virtualenv script.
            try:
                run_command(ssh, ["python3", "-mvenv", venv_dir])
            except RuntimeError:
                run_command(ssh, [
                    "/utils/bin/python2.7",
                    "/home/buildbot/scripts/virtualenv.py", venv_dir
                ])

        supervisorctl_path = f"{venv_dir}/bin/supervisorctl"
        try:
            run_command(ssh, ['ls', supervisorctl_path])
        except RuntimeError:
            already_installed = False
        else:
            already_installed = True

        if already_installed:
            logger.info("supervisorctl script exists - "
                        f"{supervisorctl_path}; skipping supervisord setup")
            run_supervisord(ssh, hostname)
            return

        run_command(
            ssh, ["bash", "-lc", f"{venv_dir}/bin/pip install supervisor"])
        conf_text = f"""
[unix_http_server]
file={install_dir}/supervisor.sock
[supervisord]
logfile={install_dir}/supervisord.log
pidfile={install_dir}/supervisord.pid
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix://{install_dir}/supervisor.sock ; use a unix:// URL  for a unix socket
[include]
files = conf/*.ini
"""
        with ssh.open_sftp() as ftp:
            with ftp.file(f"{install_dir}/supervisord.conf", "w") as fh:
                fh.write(conf_text)
        run_supervisord(ssh, hostname)
def setup_server(hostname,
                 schrodinger,
                 job_server_directory,
                 username,
                 append=True,
                 licensing=False,
                 use_certs=False,
                 certs_dir=None,
                 use_shared_supervisors=False,
                 use_ldap=True,
                 use_socket_auth=True):
    """
    Setup new server on arbitrary ports.

    For a non-local hostname the host is prepared with jsc_admin, supervisord
    is set up, jobserver.yml is adjusted, the server is started under
    supervisord and an entry is written to the local job server config file.
    For a local hostname only a localhost server is started.

    :param hostname: name of host to set up jobserver
    :type hostname: str
    :param schrodinger: path to SCHRODINGER
    :type schrodinger: str
    :param job_server_directory: base directory for job server
    :type job_server_directory: str
    :param username: username to use on hostname
    :type username: str
    :param append: If True, add server config to jobserver.config, If False,
        overwrite jobserver.config.
        NOTE(review): not referenced in this function body - confirm intent.
    :type append: bool
    :param licensing: If True, pass licensing check params to queued jobs
    :type licensing: bool
    :param use_certs: If True, use wildcard certificates in standard internal
        locations
    :type use_certs: bool
    :param certs_dir: directory on the host holding the wildcard
        certificates; defaults to "<supervisord dir>/supervisord/cert" when
        use_certs is set and no directory is given
    :type certs_dir: str or None
    :param bool use_shared_supervisors: Use shared supervisor executables
    :param bool use_ldap: Enable LDAP authentication
    :param bool use_socket_auth: Enable unix socket authentication
    :returns: information about the started server
    :rtype: ServerInfo
    :raises RuntimeError: if certificates were requested but the wildcard
        certificate is not found on the host
    """
    # For local jobserver, this cannot be a localhost for linux and need to be
    # proper address that will map to the machine.
    if sys.platform.startswith("linux") and hostname == "localhost":
        hostname = socket.getfqdn()
    serve_queue_jobs = not hostname_is_local(hostname)
    if serve_queue_jobs:
        setup_host(hostname, schrodinger, job_server_directory, username,
                   serve_queue_jobs)
        setup_supervisord(hostname, username)
        if use_certs or certs_dir:
            with get_ssh_client(hostname, username) as ssh:
                if certs_dir is None:
                    # Resolved outside the try below so that a failure to
                    # query the remote home directory is not misreported as
                    # a missing certificate.
                    base_supervisord_dir = get_supervisord_dir(ssh, hostname)
                    certs_dir = f"{base_supervisord_dir}/supervisord/cert"
                try:
                    run_command(ssh,
                                ["ls", f"{certs_dir}/wild.schrodinger.com.pem"])
                except RuntimeError as err:
                    raise RuntimeError(
                        f"You need a wildcard cert on {hostname} before "
                        "setting up jobserver under supervisord. You can "
                        "specify the directory with --certs-dir.") from err
        modify_jobserver_yml(hostname, job_server_directory, username,
                             licensing, use_shared_supervisors, certs_dir,
                             use_ldap, use_socket_auth)
    server = start_server(hostname,
                          schrodinger,
                          job_server_directory,
                          username,
                          serve_queue_jobs=serve_queue_jobs)
    if serve_queue_jobs:
        create_job_server_config(hostname, username, job_server_directory)
    return server
def run_command(ssh, command, login=False):
    """
    Runs a command.

    :param ssh: a paramiko.SSHClient with an established connection to the
        remote machine. If ssh is None, the command will be invoked by
        subprocess.run
    :type ssh: paramiko.SSHClient
    :param command: The command to run as a list of string arguments
    :type command: list[str]
    :param login: If True, command requires login shell for ssh
    :type login: bool
    :return: The output of the executed command over ssh; None if local using
        subprocess
    :rtype: str, or None
    :raises RuntimeError: if the remote command exits with a non-zero status;
        the message contains the command, the status and the captured stderr.

    This function is duplicated in server_management.py for backward
    compatibility.
    """
    logger.info(f"Running {command}")
    if not ssh:
        subprocess.run(command, universal_newlines=True, check=True)
        return
    env = {"PYTHONIOENCODING": "utf-8"}
    # list2cmdline is kept on purpose: callers rely on unquoted arguments
    # such as "~" being expanded by the remote shell.
    command = subprocess.list2cmdline(command)
    if login:
        # Quote the whole command line as ONE shell word for "bash -c".
        # Plain '...' wrapping broke as soon as the command itself contained
        # a single quote.
        command = f"bash --login -c {shlex.quote(command)}"
    _, out, err = ssh.exec_command(command, environment=env)
    # NOTE(review): stdout is drained to EOF before stderr is read; a command
    # writing a very large amount to stderr could presumably stall here.
    output = out.readlines()
    error = err.readlines()
    logger.info(f"Stdout: {output}")
    exit_status = out.channel.recv_exit_status()
    if exit_status:
        raise RuntimeError(
            f"{command} exited with {exit_status}; subprocess stderr: {error}")
    elif error:
        logger.info(f"Stderr: {error}")
    # readlines() keeps each line's trailing newline, so the lines are
    # concatenated as-is; joining them with "\n" doubled every line break.
    return ''.join(output)
def setup_log_dir(log_dir, ssh=None):
    """
    Make sure the log directory exists, creating missing parents as needed.

    :param log_dir: path of the log directory to create
    :type log_dir: str
    :param ssh: open ssh client to create the directory on the remote host
        with "mkdir -p", or None to create it locally
    :type ssh: paramiko.SSHClient
    :raises FileExistsError: locally, if log_dir exists but is not a directory
    """
    if ssh:
        run_command(ssh, ["mkdir", "-p", log_dir])
    else:
        # exist_ok avoids the check-then-create race of testing
        # os.path.exists() first, and no longer silently accepts a regular
        # file sitting at the log directory path.
        os.makedirs(log_dir, exist_ok=True)
def write_ini_file(ini_file, ini_contents, ssh=None):
    """
    Write the configuration file to manage jobserver.

    The parent directory of the file is created first when the path has one.

    :param ini_file: path of the configuration file to write
    :type ini_file: str
    :param ini_contents: text to write into the file
    :type ini_contents: str
    :param ssh: open ssh client to write on the remote host over sftp, or
        None to write locally
    :type ssh: paramiko.SSHClient
    """
    logger.info(f"Writing {ini_file}")
    dirname = os.path.dirname(ini_file)
    if ssh:
        # A bare filename has no directory part; "mkdir -p ''" would fail.
        if dirname:
            run_command(ssh, ["mkdir", "-p", dirname])
        with ssh.open_sftp() as ftp:
            with ftp.file(ini_file, "w") as fh:
                fh.write(ini_contents)
    else:
        # os.makedirs("") raises FileNotFoundError, so skip directory
        # creation for a bare filename; exist_ok avoids the exists()/
        # makedirs() race.
        if dirname:
            os.makedirs(dirname, exist_ok=True)
        with open(ini_file, "w") as fh:
            fh.write(ini_contents)
def run_supervisord(ssh: paramiko.client.SSHClient, hostname: str):
    """
    Start the supervisord daemon with the configuration stored in the
    supervisord directory.

    :param ssh: a paramiko.SSHClient with an established connection to the
                remote machine.
    :param hostname: name of the host where supervisord is being setup.
    :raises RuntimeError: if starting the daemon fails for any reason other
                          than another program already listening on the port.
    """
    root = get_supervisord_dir(ssh, hostname)
    daemon = f'"{root}/supervisord/venv/bin/supervisord"'
    conf = f'"{root}/supervisord/supervisord.conf"'
    launch = f'{daemon} -c {conf}'
    try:
        # This is to pick path for queue commands.
        run_command(ssh, ["bash", "-lc", launch])
    except RuntimeError as err:
        if "Another program is already listening" in str(err):
            # The daemon is already up; nothing more to do.
            return
        raise
def monitor_job_server_with_supervisord(cmd, hostname, job_server_directory,
                                        username):
    """
    Monitor the given job server command under supervisord.

    Writes a program configuration named after the basename of the job server
    directory, asks supervisord to pick it up and checks its status. On any
    failure the configuration file is removed again before re-raising.

    :param cmd: job server command as a list of string arguments
    :param hostname: name of the host where supervisord is setup.
    :param job_server_directory: base directory for job server
    :param username: username to use on hostname
    :returns: the parsed runstate of the started job server (see
        get_ports_from_file)
    """
    program = os.path.basename(job_server_directory)
    with get_ssh_client(hostname, username) as ssh:
        log_path = get_log_location(ssh, hostname, program)
        ini_path = get_ini_file(ssh, hostname, program)
        rendered = INI_CONTENTS.format(proc_id=program,
                                       cmd=" ".join(cmd),
                                       log=log_path)
        setup_log_dir(get_log_dir(ssh, hostname), ssh)
        write_ini_file(ini_path, rendered, ssh)
        try:
            update_supervisord(ssh, hostname, program)
            logger.info(f"Show status of newly started program {program}")
            try:
                run_command(ssh, supv(ssh, hostname) + ["status", program])
            except RuntimeError:
                logger.info("Supervisord status command failed with an error; "
                            "attempting to tail the process logs.")
                run_command(ssh, supv(ssh, hostname) + ["tail", program])
                raise
        except BaseException:
            # cleanup the ini file on exception, so the next run doesn't pick this one.
            run_command(ssh, ["rm", "-rf", ini_path])
            raise
        return get_ports_from_file(job_server_directory, ssh=ssh)
@contextmanager
def _get_file_handle(ssh, log_filename):
    """
    :param ssh: open ssh client, or None to read locally
    :type ssh: paramiko.SSHClient
    :param log_filename: path of file to open
    :type log_filename: str
    :rtype: yields readable file-like object
    """
    if ssh:
        with ssh.open_sftp() as ftp:
            with ftp.file(log_filename, "r") as fh:
                yield fh
    else:
        with open(log_filename) as fh:
            yield fh
def get_ports_from_file(job_server_directory, ssh=None):
    """
    Read the "runstate" file of a job server to get its pid and ports.

    The file may not exist yet right after the server was started, so a
    missing file is retried every 2 seconds for about 60 seconds.

    :param job_server_directory: job server directory containing the
        "runstate" file
    :type job_server_directory: str
    :param ssh: open ssh client, or None to read locally
    :type ssh: paramiko.SSHClient
    :rtype: dict
    :returns: the parsed JSON content of the runstate file; callers in this
        module read its 'pid', 'job_server_port' and 'user_certificate'
        entries.
    :raises FileNotFoundError: if the file still does not exist after the
        retry period
    """
    filename = job_server_directory + "/runstate"
    host = "localhost"
    if ssh:
        _, out, _ = ssh.exec_command("hostname")
        raw_host = out.read()
        # The channel hands back bytes; decode so the log line shows the
        # hostname rather than a b'...' repr.
        if isinstance(raw_host, bytes):
            raw_host = raw_host.decode("utf-8", errors="replace")
        host = raw_host.strip()
    logger.info(f"Reading {filename} from {host}")
    waited = 0
    sleep_time = 2
    while True:
        try:
            with _get_file_handle(ssh, filename) as fh:
                return json.loads(fh.read())
        except FileNotFoundError:
            # wait for successful read for 60s
            if waited >= 60:
                raise
            logger.info(f"Failed to read ports from {filename} from {host}, "
                        f"retry in {sleep_time}s")
            time.sleep(sleep_time)
            waited += sleep_time
def start_server(hostname,
                 schrodinger,
                 job_server_directory,
                 username,
                 serve_queue_jobs=False):
    """
    Start a job_server in the `schrodinger` directory on the `hostname`
    provided, using the `job_server_directory` as its local storage.

    :param serve_queue_jobs: If False, a localhost server is started;
        otherwise jobserverd is started under supervisord on the host.
    :returns: information about the started server
    :rtype: ServerInfo
    """
    if serve_queue_jobs:
        jobserverd_cmd = [
            run(schrodinger),
            job_server_exe(schrodinger),
            "--dir",
            job_server_directory,
            "--with-low-performance-db",
        ]
        state = monitor_job_server_with_supervisord(jobserverd_cmd, hostname,
                                                    job_server_directory,
                                                    username)
        return ServerInfo(hostname=hostname,
                          schrodinger=schrodinger,
                          job_server_directory=job_server_directory,
                          username=username,
                          pid=state['pid'],
                          job_server_port=state['job_server_port'])
    return start_localhost_server(hostname, schrodinger, job_server_directory)
def create_job_server_config(hostname, username, job_server_directory):
    """
    Copy the user_authentication that is created automatically at server
    setup from the remote server machine to the local launch host and dump
    it into a job server config file.

    The config file path is taken from the SCHRODINGER_JOBSERVER_CONFIG_FILE
    environment variable. Any existing entry for the same hostname is
    replaced.

    :param hostname: name of hostname where job server config is located
    :type hostname: str
    :param username: username to use on hostname
    :type username: str
    :param job_server_directory: path to job server on hostname
    :type job_server_directory: str
    """
    with get_ssh_client(hostname, username) as ssh:
        with ssh.open_sftp() as sftp:
            runstate_path = posixpath.join(job_server_directory, "runstate")
            with sftp.file(runstate_path, "r") as fh:
                runstate = json.loads(fh.read())
            with sftp.file(runstate["user_certificate"], "r") as fh:
                raw_auth = fh.read()
                auth = json.loads(
                    str(raw_auth, encoding="ascii", errors="strict"))
    jobport = runstate["job_server_port"]
    config_path = os.environ[SCHRODINGER_JOBSERVER_CONFIG_FILE]

    # A missing or empty config file is treated as an empty list of servers.
    existing = []
    if os.path.exists(config_path) and os.path.getsize(config_path) > 0:
        with open(config_path) as fh:
            existing = json.loads(fh.read())

    # Drop stale entries for this host before adding the fresh one.
    entries = [
        entry for entry in existing if hostname != entry.get("hostname", "")
    ]
    entries.append({"hostname": hostname, "jobport": jobport, "auth": auth})
    with open(config_path, "w") as fh:
        json.dump(entries, fh)
def modify_jobserver_yml(hostname,
                         job_server_directory,
                         username,
                         licensing,
                         use_shared_supervisors,
                         certs_dir=None,
                         use_ldap=True,
                         use_socket_auth=True):
    """
    Modify the jobserver config to respect licensing and web server
    certificates for a multi-user queue server.

    The current jobserver.yml is first copied to jobserver.yml.orig; the
    modified configuration is then written back to jobserver.yml.

    :param hostname: name of hostname where job server config is located
    :type hostname: str
    :param job_server_directory: path to job server on hostname
    :type job_server_directory: str
    :param username: username to use on hostname
    :type username: str
    :param licensing: If True, enable license checking on job_server
    :type licensing: bool
    :param certs_dir: If provided, use wildcard certificates from that
        directory.
    :type certs_dir: str
    :param bool use_shared_supervisors: Use shared supervisor executables
    :param bool use_ldap: Enable LDAP authentication
    :param bool use_socket_auth: Enable unix socket authentication
    :raises RuntimeError: if the backup copy of jobserver.yml cannot be made
    """
    config_directory = LINUX_PATH.join([job_server_directory, "config"])
    jobserver_yml = LINUX_PATH.join([config_directory, "jobserver.yml"])
    jobserver_yml_orig = LINUX_PATH.join(
        [config_directory, "jobserver.yml.orig"])
    with get_ssh_client(hostname, username) as ssh:
        # run_command waits for the exit status (and raises on failure).
        # A bare ssh.exec_command() returns immediately, so the copy could
        # still be in flight when the .orig file is opened below.
        run_command(ssh, ["cp", jobserver_yml, jobserver_yml_orig])
        with ssh.open_sftp() as ftp:
            with ftp.file(jobserver_yml_orig, "r") as fh:
                # safe_load: yaml.load() without an explicit Loader is
                # deprecated and rejected by newer PyYAML releases.
                config = yaml.safe_load(fh.read())
            config["server_mode"] = "multi-user"
            # Port 0 for each service: see setup_server ("Setup new server on
            # arbitrary ports").
            config["job_server"]["port"] = 0
            config["file_store"]["port"] = 0
            config["web_server"]["port"] = 0
            config["job_server"]["use_local_auth"] = use_socket_auth
            if use_ldap:
                config["ldap_auth"]["addr"] = "ldap1.schrodinger.com:636"
                config["ldap_auth"][
                    "bind_dn_template"] = "uid={{.User}},ou=people,dc=schrodinger,dc=com"
                config["ldap_auth"]["insecure_tls_skip_verify"] = True
            else:
                # This field is set by default in jsc_admin setup-server for
                # internal multi-user deployments so needs to be explicitly disabled.
                config["ldap_auth"]["addr"] = ""
            config["job_server"]["license_checking"] = bool(licensing)
            if certs_dir:
                config["web_server"]["tls"] = get_tls_config(certs_dir)
            if use_shared_supervisors:
                config["job_server"][
                    "supervisor_directory"] = "/nfs/working/builds/job_server_execs"
            if hostname.startswith("pdxgpu"):
                config["job_server"][
                    "schrodinger_hosts_hostname"] = "pdxgpusub1.schrodinger.com"
            # allow deployment of test job servers on test systems which don't
            # have enough fds
            config["file_descriptors"] = 1024
            with ftp.file(jobserver_yml, "w") as fh:
                fh.write(yaml.dump(config, default_flow_style=False))
def copy_server_log(server, destdir):
    """
    Copy jobserverd log file(s).

    Every entry of the server's "logs" directory is copied into destdir,
    with shutil for a local server and over sftp for a remote one.

    :param server: The metadata maintained about the managed jobserver.
    :type server: ServerInfo
    :param destdir: local directory to copy the log files into
    :type destdir: str
    """
    if not hostname_is_local(server.hostname):
        with get_ssh_client(server.hostname, server.username) as sshclient:
            with sshclient.open_sftp() as sftp:
                remote_logs = server.job_server_directory + "/logs"
                for name in sftp.listdir(remote_logs):
                    sftp.get(f'{remote_logs}/{name}', f'{destdir}/{name}')
        return
    local_logs = os.path.join(server.job_server_directory, "logs")
    for name in os.listdir(local_logs):
        shutil.copy(os.path.join(local_logs, name), destdir)
def clean_localhost_server(server):
    """
    Clean localhost jobserver.

    Currently a no-op: nothing is removed and None is returned.

    :param server: The metadata maintained about the managed jobserver.
    """
    return None
def clean_remotehost_server(ssh, server):
    """
    Clean remotehost jobserver.

    Removes the server's supervisord configuration file and log file, then
    asks supervisord to update so the program is no longer tracked.

    :param ssh: ssh client connection established with the server's host
    :param server: The metadata maintained about the managed jobserver.
    """
    # clean server files in supervisord
    program = os.path.basename(server.job_server_directory)
    host = server.hostname
    log_path = get_log_location(ssh, host, program)
    ini_path = get_ini_file(ssh, host, program)
    run_command(ssh, ["rm", "-rf", ini_path])
    run_command(ssh, ["rm", "-rf", log_path])
    update_supervisord(ssh, host, program)
def cancel_active_jobs(server: ServerInfo):
    """
    Cancels all active jobs managed by the given localhost server. And waits
    for the job supervisor processes to exit before the function returns.

    :param server: The metadata maintained about the managed jobserver.
    :raises RuntimeError: if jobs were canceled but did not disappear from
        the job list before the wait gave up.
    """
    job_env = os.environ.copy()
    job_env["SCHRODINGER_LOCALHOST_JOBSERVER_DIRECTORY"] = server.job_server_directory
    job_env["SCHRODINGER_JOBSERVER_CONFIG_FILE"] = "not-existing"

    @backoff.on_predicate(backoff.expo, max_time=128)
    def wait_for_canceled_jobs(env):
        # Falls through (returning None) while "list" still succeeds, which
        # makes backoff poll again.
        try:
            subprocess.check_output([jsc(server.schrodinger), "list"],
                                    stderr=subprocess.STDOUT,
                                    env=env)
        except subprocess.CalledProcessError as err:
            if 'No active jobs were found' not in str(err.output):
                raise
            # This is to allow job_supervisor(s) to wrap up after their
            # respective job is marked as completed.
            time.sleep(120)
            return True

    cancel_output = subprocess.check_output(
        [jsc(server.schrodinger), "cancel", "all"], env=job_env)
    canceled = re.findall("canceling job .*", str(cancel_output))
    # If there are active jobs that we initiated kill request, wait for the jobs to cancel
    # before we return.
    if canceled and not wait_for_canceled_jobs(job_env):
        raise RuntimeError(
            f"timed out in canceling localhost jobs managed by the server {server}"
        )
def kill_server(server: ServerInfo, cancel_jobs: Optional[bool] = False):
    """
    Kill the job_server on `server.hostname` that is using `server.job_server_directory`
    as its local storage.

    A remote server is stopped through supervisorctl and its supervisord
    files are cleaned up; a local server is stopped with jsc.

    :param server: The metadata maintained about the managed jobserver.
    :param cancel_jobs: cancel the active jobs before killing the jobserver.
        Only consulted for a localhost server.
    """
    if not hostname_is_local(server.hostname):
        with get_ssh_client(server.hostname, server.username) as ssh:
            program = os.path.basename(server.job_server_directory)
            run_command(ssh, supv(ssh, server.hostname) + ["stop", program])
            clean_remotehost_server(ssh, server)
        return

    if cancel_jobs:
        cancel_active_jobs(server)
    stop_localhost_server(server)
    if sys.platform != 'win32':
        clean_localhost_server(server)
def clean_server_dir(server):
    """
    Remove `job_server_directory` on `hostname`.

    :param server: The metadata maintained about the managed jobserver.
    :type server: ServerInfo
    """

    def print_job_supervisor_info(action, name, exc):
        """
        Get information about the job_supervisor/job_supervisord processes
        to know the job using it and preventing the cleanup. And raise the
        original exception.
        """
        basename = os.path.basename(name).lower()
        if basename in ("job_supervisor.exe", "job_supervisord.exe"):
            # Prefetch "name" together with "cmdline": values in proc.info
            # are filled in by process_iter, so a process that vanished or
            # denies access cannot raise here and mask the real rmtree error
            # (calling proc.name() directly could).
            for proc in psutil.process_iter(attrs=["name", "cmdline"]):
                proc_name = proc.info["name"] or ""
                if basename in proc_name.lower():
                    print(
                        f"DEBUG {name} running with command line {proc.info['cmdline']}"
                    )
                    break
        # Re-raise the error rmtree is currently handling (this callback is
        # invoked while that exception is active).
        raise

    if hostname_is_local(server.hostname):
        shutil.rmtree(server.job_server_directory,
                      onerror=print_job_supervisor_info)
        return
    cmd = ["rm", "-r", server.job_server_directory]
    with get_ssh_client(server.hostname, server.username) as ssh:
        run_command(ssh, cmd)
def update_supervisord(ssh: paramiko.client.SSHClient, hostname: str,
                       proc_id: str):
    """
    Run "supervisorctl update <proc_id>" on the host, which re-reads the
    configuration file corresponding to the given process group name and
    restarts the supervised program if the configuration has changed on disk.

    :param ssh: ssh client connection established with the hostname.
    :param hostname: name of the host where supervisord is setup to monitor
                     the given supervised program.
    :param proc_id: group name of the supervised program.
    """
    supervisorctl = supv(ssh, hostname)
    run_command(ssh, [*supervisorctl, "update", proc_id])
def start_localhost_server(hostname, schrodinger, job_server_directory):
    """
    Start a localhost job server with "jsc local-server-start" and report
    its runstate.

    :param hostname: name recorded as the hostname of the returned server
    :param schrodinger: path to SCHRODINGER
    :param job_server_directory: base directory for job server
    :returns: information about the started server; the username is taken
        from the "localhost" host entry.
    :rtype: ServerInfo
    """
    start_cmd = [
        jsc(schrodinger), "local-server-start", "-dir", job_server_directory
    ]
    run_command(None, start_cmd)
    local_user = get_username_from_host_entry("localhost")
    state = get_ports_from_file(job_server_directory, ssh=None)
    return ServerInfo(
        hostname=hostname,
        schrodinger=schrodinger,
        job_server_directory=job_server_directory,
        username=local_user,
        pid=state['pid'],
        job_server_port=state['job_server_port'],
    )
def stop_localhost_server(server):
    """
    Stop the jobserver running in localhost with "jsc local-server-stop".

    :param server: The metadata maintained about the managed jobserver.
    :type server: ServerInfo
    """
    stop_cmd = [
        jsc(server.schrodinger),
        "local-server-stop",
        "-dir",
        server.job_server_directory,
    ]
    run_command(None, stop_cmd)
@contextmanager
def _get_temp_ssh_key():
    if sys.platform != "win32":
        yield None
    else:
        with tempfile.NamedTemporaryFile(delete=False) as fh:
            temp_file = fh.name
        try:
            ppk_file, _ = sshconfig.find_key_pair()
            sshconfig._convert_ppk_openssh(ppk_file, temp_file)
            yield temp_file
        finally:
            os.remove(temp_file)
@contextmanager
def get_ssh_client(hostname, username):
    """
    Return ssh client for hostname. Closes ssh connection automatically.

    For a local hostname no connection is made and None is yielded.

    :param hostname: name of remote host
    :type hostname: str
    :param username: username to connect as
    :type username: str
    :rtype: yields paramiko.SSHClient, or None for a local hostname
    """
    if hostname_is_local(hostname):
        yield
        return
    ssh = paramiko.SSHClient()
    # NOTE(review): unknown host keys are accepted automatically.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    with _get_temp_ssh_key() as key_filename:
        logger.info(f"Connecting to {username}@{hostname}")
        try:
            ssh.connect(hostname, username=username, key_filename=key_filename)
            yield ssh
        finally:
            # Always release the connection, including when connecting or
            # the body of the "with" block raises; without the finally the
            # client was leaked on any error.
            logger.info(f"Disconnecting from {username}@{hostname}")
            ssh.close()
def get_username_from_host_entry(host_entry_name):
    """
    Return username from host_entry_name to find the correct remote user. This
    user name is useful for directory creation, and remote authentication.

    The user of the host entry wins when it is set; otherwise the current
    login name is used, reduced to the part after the last "+" if present.

    :param host_entry_name: name of host entry (localhost, bolt-gpu)
    :type host_entry_name: str
    :rtype: str
    """
    entry = jobcontrol.get_host(host_entry_name)
    if entry.user:
        return entry.user
    login_name = getpass.getuser()
    # msys2 username on domain: keep only what follows the last "+"
    return login_name.rsplit("+", 1)[-1]
@contextmanager
def schrodinger_jobserver_config():
    """
    Point the job server configuration at a temporary file for the duration
    of the context, and remove that file afterwards.

    The SCHRODINGER_JOBSERVER_CONFIG_FILE environment variable is patched to
    the temporary path and restored on exit.

    This function is duplicated in server_management.py for backwards compatibility.
    """
    fd, config_path = tempfile.mkstemp(prefix="")
    # Only the path is needed; release the descriptor right away.
    os.close(fd)
    env_override = {SCHRODINGER_JOBSERVER_CONFIG_FILE: config_path}
    with patch.dict(os.environ, env_override):
        try:
            yield
        finally:
            os.remove(config_path)