import copy
from collections import defaultdict
from requests.exceptions import HTTPError
from schrodinger.application.livedesign import ld_export
from schrodinger.application.livedesign import login
from schrodinger.application.livedesign.live_report_widget import LiveReportType
from schrodinger.test import mock_ld_models
USERNAME = 'username'
DEFAULT_USER = 'Sam'
MockLiveReport = mock_ld_models.MockLiveReport
PROJ_IDS = mock_ld_models.PROJ_IDS
PROJ_NAME_ID_MAP = {f'project{proj_id}': proj_id for proj_id in PROJ_IDS}
PROJ_NAME_ID_MAP[login.GLOBAL_PROJECT_NAME] = login.GLOBAL_PROJECT_ID
DEFAULT_PROJ_ID = mock_ld_models.DEFAULT_PROJ_ID
FAKE_HOST = 'https://unit_test-8-5.dev.bb.schrodinger.com'
DEFAULT_ADMINS = ['admin', 'demo']
DEFAULT_USER_NAMES = DEFAULT_ADMINS + [DEFAULT_USER, 'alex']
DEFAULT_LR_TITLE = 'title'
DEFAULT_LR_OWNER = DEFAULT_USER_NAMES[2]
DEFAULT_LR_TAG = '112358'
TEST_LR_ID_1 = '1001'
TEST_LR_ID_2 = '1002'
LR_DEFAULT_VALUES = MockLiveReport(title=DEFAULT_LR_TITLE,
owner=DEFAULT_LR_OWNER,
tags=[DEFAULT_LR_TAG],
id_='10000')
LR_NO_TITLE = MockLiveReport(owner=DEFAULT_LR_OWNER,
tags=[DEFAULT_LR_TAG],
id_='10001')
LR_NO_OWNER = MockLiveReport(title='No Owner',
tags=[DEFAULT_LR_TAG],
id_='10002')
LR_NO_DIRECTORY = MockLiveReport(title='No directory',
owner=DEFAULT_LR_OWNER,
id_='10003')
LR_TEST_1 = MockLiveReport(title='Test report 1',
owner=DEFAULT_LR_OWNER,
id_=TEST_LR_ID_1)
LR_TEST_2 = MockLiveReport(title='Test report 2',
owner=DEFAULT_LR_OWNER,
id_=TEST_LR_ID_2)
LR_TYPE_DEVICE = MockLiveReport(title='DeviceLiveReport',
owner=DEFAULT_LR_OWNER,
id_='10004',
lr_type=LiveReportType.DEVICE)
STANDARD_LRS = [
LR_DEFAULT_VALUES, LR_NO_TITLE, LR_NO_OWNER, LR_NO_DIRECTORY, LR_TEST_1,
LR_TEST_2, LR_TYPE_DEVICE
]
# Attachment constants
KEY_FILE_TYPE = 'file_type'
KEY_FILE_NAME = 'file_name'
KEY_ID = 'id'
KEY_PROJECT_ID = 'project_id'
FT_IMAGE = 'IMAGE'
FT_ATTACHMENT = 'ATTACHMENT'
FT_3D = 'THREE_D'
[docs]class MockLDClient:
[docs] def __init__(self,
*args,
username=DEFAULT_USER,
password=None,
refresh_token=None,
version_string='8.1.0-SNAPSHOT',
host=FAKE_HOST,
compatibility_mode=login.LD_VERSION_COMPATIBILITY_MODE):
self._username = username
self._password = password
self._refresh_token = refresh_token
self._host = host
self._version_string = version_string
self._compatibility_mode = compatibility_mode
self._mode = str(login.LDMode.DRUG_DISCOVERY)
self._user_data = [{USERNAME: name} for name in DEFAULT_USER_NAMES]
self._task_result = None
self._project_name_id_map = dict(PROJ_NAME_ID_MAP)
self._attach_settings = []
self._models = set()
self._protocols = set(get_standard_protocols())
self.is_connected = True
# Populate projects
self._project_map = {}
self._admins = list(DEFAULT_ADMINS)
for project_id in PROJ_IDS:
name = 'project' + project_id
mock_project = mock_ld_models.MockLDProject(name, project_id)
self._project_map[name] = mock_project
# Populate live report list
self._live_reports = [copy.copy(lr) for lr in STANDARD_LRS]
# Make some owner values lowercase to test that sorting is case-
# insensitive
for idx, owner in enumerate(['matthew', 'Mark', 'Luke', 'John']):
title = f'livereport_{idx}'
lr = MockLiveReport(title=title,
owner=owner,
tags=[str(idx)],
id_=idx)
self._live_reports.append(lr)
# Create folders corresponding to live reports
self._folders = set()
folder_ids = {lr.tags[0] for lr in self._live_reports if lr.tags}
for folder_id in folder_ids:
folder_name = 'folder_' + folder_id
folder = mock_ld_models.MockFolder(name=folder_name, id_=folder_id)
self._folders.add(folder)
self._id_column_map = {}
self._lr_column_map = defaultdict(list)
self._folder_tree_data = []
[docs] def live_report(self, lr_id):
"""
Return the live report model associated with the specified live report
ID, if possible.
:raises HTTPError: if no live report on the server has the specified
live report ID
:param lr_id: a live report ID (string representation of a nonnegative
integer)
:type lr_id: str
:return: a live report, if possible
:rtype: mock_ld_models.MockLiveReport
"""
for lr in self._live_reports:
if lr.id == lr_id:
return lr
url = f'{self._host}/livedesign/api/live_reports/{lr_id}?_type=json'
raise HTTPError(f'400 Client Error: Bad Request for url: {url}')
[docs] def live_reports(self, project_ids=None):
if project_ids is None:
return self._live_reports
return [r for r in self._live_reports if r.project_id in project_ids]
[docs] def projects(self):
"""
Return a list of projects on this server that the user has permission to
edit.
"""
projects = []
user_is_admin = self._username in self._admins
for project in self._project_map.values():
if user_is_admin or project.id != login.GLOBAL_PROJECT_ID:
projects.append(project)
return projects
[docs] def list_folders(self, project_ids=None):
if project_ids is None:
return self._folders
return {f for f in self._folders if f.project_id in project_ids}
[docs] def setFolderTreeData(self, data):
"""
Set the value returned by `get_folder_tree_data()`.
This is a test method that does not correspond to a `LDClient` method.
"""
self._folder_tree_data = data
[docs] def export_to_maestro(self, lr_id):
return None
[docs] def get_folder_tree_data(self, project_id, toplevel_folder):
return self._folder_tree_data
[docs] def get_subfolders(self, folder_name, data):
"""
Return specified subfolders.
This method closely reproduces the behavior of the corresponding
`LDClient` method.
"""
if data['name'] != folder_name:
raise ValueError(f'Data name field does not match {folder_name}')
return {child['name']: child for child in data['children']}
[docs] def about(self):
return {login.VERSION_NUMBER: self.version_string}
[docs] def get_project_id_by_name(self, project_name):
"""
:param project_name: the name of a project on the LD server
:type project_name: str
:return: the project ID associated with the specified project; if none
is found, return an empty list (LDIDEAS-2910)
:rtype: int or list
"""
return self._project_name_id_map.get(project_name, [])
[docs] def get_protocol_id_by_name(self, protocol_name):
"""
:param protocol_name: the name of a protocol on the LD server
:type protocol_name: str
:return: the protocol ID associated with the specified protocol; if
none is found, return an empty list (LDIDEAS-2910)
:rtype: int or list
"""
for prot in self._protocols:
if prot.name == protocol_name:
return prot.id
return []
@property
def live_report_ids(self):
return {lr.id for lr in self._live_reports}
[docs] def create_live_report(self, live_report):
msg = 'Can only create live reports using valid live report instance.'
assert isinstance(live_report, MockLiveReport), msg
msg = 'Can only create live report if a valid project is specified.'
assert isinstance(live_report.project_id, (int, str)), msg
new_int_id = 1
while new_int_id in self.live_report_ids:
new_int_id += 1
live_report.id = str(new_int_id)
self._live_reports.append(live_report)
return live_report
[docs] def execute_live_report(self, live_report_id, params=None, max_tries=30):
return {'rows': {}, 'columns': {}}
[docs] def column_descriptors(self, live_report_id, column_id=None):
return []
@property
def mode(self):
return self._mode
[docs] def setMode(self, mode):
"""
Set the mode for this LD client instance.
This method is for testing purposes only, and has no analog in
`LDClient`.
:param mode: the new mode for this instance; must be one of the values
defined in `login.LDMode` or the string equivalent of same
:type mode: login.LDMode or str
"""
self._mode = str(mode)
[docs] def addProject(self, project_name, project_id=None):
"""
Add a project to the dictionary of projects stored on this client.
This method is for testing purposes only, and has no analog in
`LDClient`.
:raises ValueError: if a non-unique name or ID is supplied
:param project_name: the name of the new project
:type project_name: str
:param project_id: optionally, the ID of the new project; if none is
supplied, a unique ID will be generated automatically
:type project_id: int or NoneType
"""
if project_name in self._project_name_id_map.keys():
raise ValueError('The supplied project name is not unique.')
if project_id in self._project_name_id_map.values():
raise ValueError('The supplied project ID is not unique.')
if project_id is None:
int_ids = [int(val) for val in self._project_name_id_map.values()]
project_id = str(max(int_ids, default=0) + 1)
self._project_name_id_map[project_name] = project_id
[docs] def addProtocol(self, protocol):
"""
Add a protocol to the dictionary of protocols stored on this client.
This method is for testing purposes only, and has no analog in
`LDClient`.
:raises ValueError: if a protocol with a non-unique ID is supplied
:param protocol: a protocol
:type protocol: mock_ld_models.Model
"""
protocol_ids = {prot.id for prot in self._protocols}
if protocol.id in protocol_ids:
msg = f'The supplied protocol ID "{protocol.id}" is not unique.'
raise ValueError(msg)
self._protocols.add(protocol)
@property
def version_string(self):
return self._version_string
@version_string.setter
def version_string(self, version_string):
self._version_string = version_string
[docs] def list_users(self):
"""
:return: a list of dictionaries containing data about users registered
to the server
:rtype: list(dict)
"""
return list(self._user_data)
[docs] def add_columns(self, live_report_id, column_ids):
"""
Add the supplied list of columns to the specified live report.
:raises TypeError: if the supplied live report ID is not a string
:raises ValueError: if a column hasn't been created on this server yet
:param live_report_id: a live report ID
:type live_report_id: str
:param column_ids: a list of column IDs corresponding to columns that
should be added to the live report
:type column_ids: list(int)
"""
if not isinstance(live_report_id, str):
raise TypeError(f'Expected str, got "{live_report_id}"')
add_columns = []
for column_id in column_ids:
column = self._id_column_map.get(column_id)
if column is None:
msg = (f'No column found corresponding to ID "{column_id}".')
raise ValueError(msg)
else:
add_columns.append(column)
self._lr_column_map[live_report_id] += add_columns
return {}
[docs] def wait_and_get_result_url(self, task_id, timeout=1200):
"""
Wait on a running asynchronous task and return relevent data when it
completes.
:raises Exception: when timeout elapses before task completes (for the
purposes of this mock, an exception will be raised if
`task_id='timeout'`)
:raises ldclient.models.AsyncServiceTaskFailedError: when the status
from a completed task is not 'finished' (for the purposes of this
mock, an exception will be raised if `task_id='error'`)
:param task_id: ID of an asynchronous task
:type task_id: str
:param timeout: time (in seconds) to wait before raising an exception
:type timeout: int
:return: the path associated with the asynchronous task
:rtype: str
"""
if task_id == 'timeout':
raise Exception('Timed out while waiting for export to finish.')
if task_id == 'error':
msg = (f'An error has occurred during the export process: Async'
f' task {task_id}: Status = failed')
raise mock_ld_models.AsyncServiceTaskFailedError(msg)
return f'/api/import/async_task/{task_id}/result'
[docs] def get_task_result(self, result_url):
"""
Given the URL for a completed task, return a data dictionary containing
information about that task.
:param result_url: the path associated with the completed task; should
be the output from `wait_and_get_result_url()`
:type result_url: str
:return: a data dictionary from the completed task
:rtype: dict(str, object)
"""
if self._task_result is None:
return {
ld_export.LIVE_REPORT_URL: result_url,
ld_export.IMPORT_RESPONSE: []
}
return self._task_result
[docs] def setTaskResult(self, result):
"""
Set the value that will be returned by `get_task_result()`.
This is a debugging method that has no equivalent in `LDClient`.
:param result: the result dictionary to return from the method
`get_task_result()`. Setting this value to `None` reverts the
`get_task_result()` return value to a minimal default dictionary.
:type result: dict[str, object] or NoneType
"""
self._task_result = result
[docs] def getLRColumns(self, live_report_id):
"""
Return all columns that have been added to the specified live report.
:param live_report_id: a live report ID
:type live_report_id: str
:return: the columns associated with the specified live report
:rtype: list(mock_ld_models.MockFFC)
"""
return self._lr_column_map[live_report_id]
[docs] def get_or_create_attachment(self,
attachment_file_name,
file_type,
project_ids,
file_obj=None,
alternate_id=None,
remote_file_name=None):
"""
Find or create an attachment, then return a data dictionary.
"""
attach_map = {
KEY_ID: alternate_id or self._getUniqueAttachmentID(),
KEY_FILE_TYPE: file_type,
KEY_PROJECT_ID: project_ids,
KEY_FILE_NAME: remote_file_name or attachment_file_name
}
if attach_map not in self._attach_settings:
self._attach_settings.append(attach_map)
return attach_map
def _getUniqueAttachmentID(self):
"""
Return an attachment ID that is not currenlty used for any attachments.
This method is for testing purposes only, and has no analog in
`LDClient`.
:return: a unique attachment ID
:rtype: str
"""
attach_ids = {at_map[KEY_ID] for at_map in self._attach_settings}
for idx in range(len(attach_ids) + 1):
attach_id = str(idx)
if attach_id not in attach_ids:
return attach_id
[docs] def create_model(self, model):
"""
Mimic the behavior of `LDClient.create_model()` by simply adding the
supplied model to the set of known models.
If no model ID is defined, assign one.
:raise ValueError: if the supplied model has a non-unique ID
:param model: a model
:type model: mock_ld_models.Model
"""
model_ids = {m.id for m in self._models}
if model.id is None:
for id_idx in range(len(model_ids)):
new_id = str(id_idx)
if new_id not in model_ids:
model.id = new_id
break
if model.id in model_ids:
msg = f'The supplied model ID "{model.id}" is not unique.'
raise ValueError(msg)
self._models.add(model)
return model
[docs] def models(self):
"""
Convenience method to return all stored models.
This method is for testing purposes only, and has no analog in
`LDClient`.
:return: models stored on this object
:rtype: list[mock_ld_models.Model]
"""
return list(self._models)
[docs] def attachmentSettings(self):
"""
Convenience method to return all stored attachment settings maps.
This method is for testing purposes only, and has no analog in
`LDClient`.
:return: all attachment settings maps stored on this object
:rtype: list[dict[str, str]]
"""
return list(self._attach_settings)
[docs] def get_protocols_by_project_id(self, project_ids=None):
"""
Return a list of protocols associated with the supplied list of project
IDs. If none is provided, return all protocols.
"""
matching_protocols = []
project_ids = set(project_ids) if project_ids is not None else None
for protocol in self._protocols:
prot_proj_ids = set(protocol.project_ids)
if project_ids is None or bool(prot_proj_ids & project_ids):
matching_protocols.append(protocol)
return matching_protocols
[docs] def ping(self):
"""
:return: whether this fake `LDClient` instance is connected to a fake
LiveDesign server
:rtype: bool
"""
return self.is_connected
[docs]def get_standard_protocols():
"""
:return: a list of standard LD protocol mocks
:rtype: list[mock_ld_models.Model]
"""
project_ids = [login.GLOBAL_PROJECT_ID]
# Create mock Glide Docking Protocols
tvar_args = [
('READ_ONLY', 'FILE', 'pml_script'),
('READ_ONLY', 'FILE', 'Glide Runner Script'),
('ABSTRACT', 'SDF_FILE', 'SDF-FILE'),
('ABSTRACT', 'FILE', 'Grid File (.zip)'),
('ABSTRACT', 'FILE', 'Input File (.in)')
] # yapf: disable
template_vars = [
mock_ld_models.ModelTemplateVar(*args) for args in tvar_args
]
protocol_dock = mock_ld_models.Model('Glide Docking Protocol',
False,
False,
'admin',
'folder',
None,
project_ids,
template_vars,
description='',
id='0')
template_vars = [
mock_ld_models.ModelTemplateVar(*args) for args in tvar_args
]
protocol_dock_wlig = mock_ld_models.Model(
'Glide Docking Protocol with Reference Ligand',
False,
False,
'admin',
'folder',
None,
project_ids,
template_vars,
description='',
id='1')
# Create mock Realtime 3D protocol (for LD v8.9+)
template_vars = [
mock_ld_models.ModelTemplateVar('ABSTRACT', 'FILE', 'grid_file'),
mock_ld_models.ModelTemplateVar('ABSTRACT', 'FILE', 'reference'),
mock_ld_models.ModelTemplateVar('DEFAULT', 'FILE', 'input'),
mock_ld_models.ModelTemplateVar('READ_ONLY', 'FILE', 'script')
] # yapf: disable
protocol_3d = mock_ld_models.Model('Realtime 3D Protocol Without Overlay',
False,
False,
'admin',
'folder',
None,
project_ids,
template_vars,
description='',
id='2')
return [protocol_dock, protocol_dock_wlig, protocol_3d]