#!/usr/bin/env python
"""
The sshconfig module contains code to configure Passwordless SSH.
Example:
A) Get the keypair to use using find_key_pair(..).
B) Check the keypair is okay before local host ssh setup using
check_key_pair(..).
C) If the keypair is not okay, generate keypair in local system using
configure_local_ssh_setup().
D) Setup passwordless SSH to desired host using install_remote_pubkey(..).
::
from schrodinger.utils import sshconfig
(priv_file, pub_file) = sshconfig.find_key_pair()
pub_key = sshconfig.check_key_pair(priv_file, pub_file)
if not pub_key:
pub_key = sshconfig.configure_local_ssh_setup(priv_file, pub_file)
sshconfig.install_remote_pubkey("bobio", "rajagopa", "blah-blah", pub_key)
"""
import base64
import getpass
import os
import re
import shutil
import stat
import sys
import warnings
from schrodinger.utils import fileutils
from . import subprocess
# Remove warning filter after BLDMGR-2748
with warnings.catch_warnings():
try:
import Crypto.pct_warnings
warnings.simplefilter("ignore", Crypto.pct_warnings.PowmInsecureWarning)
except ImportError: # once paramiko is updated, it doesn't depend on pycrypto
pass
import paramiko
GENERATE_KEY_BITS = 4096
##########################################################################
def _get_pub_key(pub_file):
"""
Get the public key from SSH keypair setup on your local machine.
:type pub_file: string
:param pub_file:
Public key file to read and get the data.
:returntype: string
:return:
public key as base64 string.
"""
pub_key_text = ""
try:
with open(pub_file) as f:
if sys.platform == "win32":
pub_footer = ""
for line in f:
line = line.rstrip()
if line.find("BEGIN ") != -1:
pub_key_text = "ssh-rsa "
continue
elif line.find("END ") != -1:
continue
elif line.find("Comment:") == 0:
continue
else:
pub_key_text += line
pub_key_text += pub_footer
else:
pub_key_text = f.read()
pub_key_text = pub_key_text.rstrip()
except OSError:
pass
return pub_key_text
def _convert_ppk_openssh(ppk_file, priv_file):
"""
Convert ppk to OpenSSH.
:type ppk_file: string
:param ppk_file:
file to convert the ppk format.
:type priv_file: string
:param priv_file:
file to write the OpenSSH format.
:raise OSError:
if plink_keygen binary to convert ppk to OpenSSH doesn't exist
:raise RuntimeError:
if the command to convert ppk to OpenSSH failed
"""
plink_keygen = os.path.join(os.environ["SCHRODINGER"],
r"utilities\plink-keygen.exe")
proc = subprocess.Popen([
plink_keygen, "-b",
"%d" % GENERATE_KEY_BITS, ppk_file, "-O", "private-openssh", "-o",
priv_file
],
stderr=subprocess.PIPE)
stderr = proc.communicate()[1]
if proc.returncode:
raise RuntimeError("Command to convert ppk to OpenSSH failed "
"with exit code - {}. Error is - {}"\
.format(proc.returncode, stderr))
def _convert_openssh_ppk(priv_file, ppk_file):
"""
Convert OpenSSH to ppk.
:type priv_file: string
:param priv_file:
file to convert the OpenSSH format.
:type ppk_file: string
:param ppk_file:
file to write the ppk format.
:raise OSError:
if plink_keygen binary to convert OpenSSH to ppk doesn't exist
:raise RuntimeError:
if the command to convert OpenSSH to ppk failed
"""
plink_keygen = os.path.join(os.environ["SCHRODINGER"],
r"utilities\plink-keygen.exe")
proc = subprocess.Popen([
plink_keygen, "-b",
"%d" % GENERATE_KEY_BITS, priv_file, "-O", "private", "-o", ppk_file
],
stderr=subprocess.PIPE)
stderr = proc.communicate()[1]
if proc.returncode:
raise RuntimeError("Command to convert OpenSSH to ppk failed "
"with exit code - {}. Error is - {}"\
.format(proc.returncode, stderr))
def _grep(filename, search_string):
"""
Grep the given search string in provided filename.
:type filename: string
:param filename:
filename to search the given pattern.
:type search_string: string
:param search_string:
Pattern to search in the given filename.
:returntype: bool
:return:
True if search string is found.
False if search string is not found or given filename doesn't exist.
"""
if not os.path.isfile(filename):
return False
pattern = re.compile(search_string)
with open(filename) as f:
for line in f:
if pattern.search(line):
return True
return False
############################################################################
[docs]def find_key_pair():
"""
Get the private and public key filename to use.
:returntype: tuple
:return:
A tuple containing private and public key filename.
"""
home = fileutils.get_directory_path(fileutils.HOME)
ssh_identity = os.environ.get("SCHRODINGER_SSH_IDENTITY", "")
if ssh_identity:
return (ssh_identity, ssh_identity + ".pub")
if sys.platform == 'win32':
priv_file = os.path.join(home, getpass.getuser() + ".ppk")
pub_file = os.path.join(home, getpass.getuser() + ".pub")
else:
priv_file = os.path.join(home, ".ssh", "id_rsa")
pub_file = os.path.join(home, ".ssh", "id_rsa.pub")
return (priv_file, pub_file)
[docs]def check_key_pair(priv_file, pub_file):
"""
Check the given private and public key file to make sure they match.
For Windows, the private key file is assumed to be in ppk understandable
format.
:type priv_file: string
:param priv_file:
Private key file.
:type pub_file: string
:param pub_file:
Public key file.
:returntype: string
:return:
base64 string containing public part of the key pair, on success
empty string otherwise
"""
pub_key = ""
try:
if sys.platform == 'win32':
ppk_file = priv_file
priv_file = ppk_file + "%s.openssh" % os.getpid()
_convert_ppk_openssh(ppk_file, priv_file)
# Compare public key from private key file and public key file
# to make sure they match.
k_priv = paramiko.RSAKey.from_private_key_file(priv_file)
orig_pub_key = _get_pub_key(pub_file)
pub_key_data = orig_pub_key.replace("ssh-rsa ", "")
pub_key_data = pub_key_data.split()[0]
k_pub = paramiko.RSAKey(data=base64.b64decode(pub_key_data))
if k_priv == k_pub:
pub_key = orig_pub_key
except:
# Catch and ignore cases where the key files don't exist (and possibly
# other cases). See PYTHON-3036.
pass
finally:
if sys.platform == 'win32':
try:
os.unlink(priv_file)
except OSError:
pass
return pub_key
[docs]def install_remote_pubkey(hostname, user, password, pubkey):
"""
Setup passwordless ssh to the given remote host.
:type hostname: string
:param hostname:
Setup passwordless ssh to this remote host.
:type user: string
:param user:
Specify the user to log in as on the remote machine.
:type password: string
:param password:
Password to use for logging into the remote machine.
:type pubkey: string
:param pubkey:
Public key to cache in 'authorized_keys' of the remote machine.
:raise paramiko.AuthenticationException:
if the authentication failed.
:raise paramiko.SSHException:
if there was an error connecting or establishing an SSH session.
:raise RuntimeError:
if the command to cached public key at remote host fail or
if the command to cache remote host's fingerprint in registry
fail (Windows).
"""
config_cmd = 'mkdir -p ~/.ssh && ' \
'echo "%s" >> ~/.ssh/authorized_keys && ' \
'chmod 600 ~/.ssh/authorized_keys && ' \
'chmod 700 ~/.ssh/' % pubkey
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname, username=user, password=password)
stdin, stdout, stderr = ssh.exec_command(config_cmd)
exit_status = stdout.channel.recv_exit_status()
stdin.flush()
stdin.channel.shutdown_write()
ssh.close()
if exit_status > 0:
raise RuntimeError("Remote command to cache public key failed "
"with exit status - {0}. Error is - {1}".\
format(exit_status, format('\n'.join(stderr.readlines()))))
cache_hostname_plink(user, hostname)
[docs]def cache_hostname_plink(user, hostname):
"""
Caches particular host name. This is intended to be run after we
already have set up public key on remote host.
:type hostname: string
:param hostname: name of host
:type user: string
:param user: username for remote host
"""
# This is required for windows to cache the remote host fingerprint
# in the registry similar to known_hosts list.
if sys.platform == 'win32':
if not hostname_in_registry(hostname):
proc = subprocess.Popen(['plink.exe', '-l', user, hostname, 'exit'],
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE)
stderr = proc.communicate(input=b'y')[1]
if not hostname_in_registry(hostname):
raise RuntimeError("Failed to cache fingerprint for {} in "
"in registry. Error: {}".format(
hostname, stderr))
[docs]def has_plink_ssh_host_key(prefix: str, hostname: str) -> bool:
"""
Check PuTTY registry entries for hostname key.
:param prefix: encryption type prefix (rsa2@22: or ssh-ed25519@22:)
:param hostname: name of machine for connecting
"""
import winreg
try:
with winreg.OpenKey(winreg.HKEY_CURRENT_USER,
r"Software\SimonTatham\PuTTY\SshHostKeys") as aKey:
winreg.QueryValueEx(aKey, prefix + hostname)[0]
except OSError:
return False
return True
[docs]def hostname_in_registry(hostname):
"""
If hostname is in cached registry, returns True. Only
relevant to call on Windows.
:param hostname: name of host referred to in plink
:type hostname: str
:return: bool
"""
if sys.platform != "win32":
return
for keytype in {"rsa2@22:", "ssh-ed25519@22:"}:
if has_plink_ssh_host_key(keytype, hostname):
return True
return False
[docs]def known_hostname(hostname):
"""
Checks if hostname has already been configured to use Passwordless SSH.
:param hostname: name of host
:type hostname: str
:return: bool
"""
if sys.platform == "win32":
return hostname_in_registry(hostname)
else:
# Check if hostname is listed in known_hosts file. Empty str will be
# returned if no occurences of hostname are found. '-H' specified so
# found keys are returned in hashed format.
try:
return bool(
subprocess.check_output(["ssh-keygen", "-H", "-F", hostname]))
except subprocess.CalledProcessError:
return False