# -*- coding: utf-8 -*-
"""
Classes used for LiveDesign import GUIs.
Copyright Schrödinger, LLC. All rights reserved.
"""
from pathlib import Path
from schrodinger import get_maestro
from schrodinger import project
from schrodinger import structure
from schrodinger.infra import mm
from schrodinger.Qt import QtGui
from schrodinger.structutils import analyze
from schrodinger.ui.qt.appframework2 import tasks
from schrodinger.utils import fileutils
from . import import_controller
from . import ld_base_classes
from . import ld_utils
from . import login
# Handle returned by get_maestro(). The code in this module treats a falsy
# value as "not connected to Maestro" and writes a MAE file instead of
# importing into the project table.
maestro = get_maestro()
# Names re-exported from ld_base_classes. The LR_* values are used below as
# keys into a task's settings dictionary.
LR_ID = ld_base_classes.LR_ID
LR_COLUMNS = ld_base_classes.LR_COLUMNS
LR_ROWS = ld_base_classes.LR_ROWS
LR_STRUCTS = ld_base_classes.LR_STRUCTS
# Passed to QtGui.QColor when showing a warning status message.
WARNING_COLOR = ld_base_classes.WARNING_COLOR
# task state: settings key under which runMain() stores the downloaded
# structures so that postProcess() can pick them up.
IMPORTED_STS = 'imported_sts'
class ImportThreadRunner88(ld_base_classes.ImportExportThreadRunner):
    """
    Import specified data from a LiveReport using the new `LDClient` import API.
    """

    process_type = 'import'

    def runMain(self, task):
        """
        Download the requested live report columns as a Maestro file string,
        convert it to structures and store the structures whose titles were
        selected by the user in the task settings under `IMPORTED_STS`.

        :param task: the task being run; its settings supply the client,
            live report ID, columns and selected structures
        :return: `tasks.Status.ERROR` if the download raised a handled
            error, otherwise `tasks.Status.DONE`
        """
        curr_name = task.getName()
        settings = task.settings()
        self.ld_client = settings[login.CLIENT]
        lr_id = settings[LR_ID]
        lr_columns = settings[LR_COLUMNS]
        col_ids = [col.id for col in lr_columns]

        msg = f'{curr_name}: Importing from LiveDesign server.'
        self.status(msg, timeout=0)

        try:
            with self.errorHandler():
                file_str = self.ld_client.export_to_maestro(lr_id, col_ids)
        except ld_base_classes.HandledError as err:
            self.setErrorStatus(curr_name, str(err))
            return tasks.Status.ERROR

        imported_sts = self.getStructuresFromString(file_str)
        # Entity IDs are cached on every downloaded structure before the
        # list is narrowed down to the user's selection.
        entity_id_map = import_controller.get_st_entity_id_map(
            self.ld_client, imported_sts, lr_id)
        import_controller.cache_entity_IDs(imported_sts, entity_id_map)

        # Keep only the structures whose title matches a selected structure
        selected_structs = settings[LR_STRUCTS]
        selected_titles = {struct.title for struct in selected_structs}
        imported_sts = [
            struct for struct in imported_sts if struct.title in selected_titles
        ]
        settings[IMPORTED_STS] = imported_sts

        msg = (f'{curr_name}: {len(imported_sts)} structures downloaded from'
               ' LiveDesign.')
        self.status(msg, timeout=0)
        return tasks.Status.DONE

    def postProcess(self, task):
        """
        If a task has successfully finished, perform synchronous post
        processing. If connected to Maestro, import the structures into the
        project table. Otherwise, write out to a MAE file.

        :param task: the finished task
        :type task: tasks.AbstractTaskWrapper
        """
        if task.status() != tasks.Status.DONE:
            return
        imported_sts = task.settings()[IMPORTED_STS]
        if not imported_sts:
            msg = 'No structures found on specified live report.'
            self.info(msg)
            return

        # Apply structure-related hash to ligands to be used in the event of a
        # round trip scenario. PANEL-13713
        for st in imported_sts:
            ligs = analyze.find_ligands(st)
            # Only hash structures that consist of exactly one ligand and
            # nothing else.
            if len(ligs) == 1 and ligs[0].st.atom_total == st.atom_total:
                ld_utils.apply_round_trip_hash(st)

        if maestro:
            group_name = import_structures_to_maestro(imported_sts)
            # Set group title
            pt = maestro.project_table_get()
            for group in pt.groups:
                if group.title == group_name:
                    group.title = f'ld_import_{task.getName()}'
                    break
            msg = 'Structures have been imported into Maestro.'
        else:
            outfile = fileutils.tempfilename('ld_import_', '.mae')
            with structure.StructureWriter(outfile) as writer:
                writer.extend(imported_sts)
            msg = f'Structures have been saved to file {outfile}'
        self.info(msg)

    def getStructuresFromString(self, file_str):
        """
        Gets structures from the given string representation

        :param file_str: string representation of one or more structures,
            either already decoded or as UTF-8 encoded bytes
        :type file_str: bytes | str | None
        :return: the structures read from the string; empty if `file_str` is
            empty or None
        :rtype: list(structure.Structure)
        """
        sts = []
        if file_str:
            # str(obj, encoding=...) raises TypeError for an object that is
            # already a str, so only decode bytes-like input.
            if not isinstance(file_str, str):
                file_str = str(file_str, encoding='utf-8')
            with structure.StructureReader.fromString(file_str) as reader:
                sts = list(reader)
        return sts
class ImportThreadRunner(ld_base_classes.ImportExportThreadRunner):
    """
    Perform the actual import in a thread to not block Maestro.
    """

    process_type = 'import'

    def runMain(self, task):
        """
        See tasks.BaseFunctionRunner for documentation.

        Downloads the tabular data for the live report, narrows it to the
        structures selected by the user, downloads the 3D columns and stores
        the resulting structure map in the task settings under
        `IMPORTED_STS`.

        :param task: the task being run
        :return: `tasks.Status.ERROR` if a download raised a handled error,
            `tasks.Status.FAILED` if the live report returned no structures,
            otherwise `tasks.Status.DONE`
        """
        curr_name = task.getName()
        settings = task.settings()
        self.ld_client = settings[login.CLIENT]
        lr_id = settings[LR_ID]
        lr_columns = settings[LR_COLUMNS]
        lr_rows = settings[LR_ROWS]

        msg = f'{curr_name}: Importing from LiveDesign server.'
        self.status(msg, timeout=0)

        controller = import_controller.ImportController(self.ld_client)

        # get tabular data
        try:
            with self.errorHandler():
                col_ids = [col.id for col in lr_columns]
                tabular_data_sts = controller.importTabularData(lr_id, col_ids)
        except ld_base_classes.HandledError as err:
            self.setErrorStatus(curr_name, str(err))
            return tasks.Status.ERROR

        if not tabular_data_sts:
            status_msg = 'No structures were found in live report.'
            self.status(status_msg,
                        timeout=0,
                        color=QtGui.QColor(WARNING_COLOR))
            # TODO add live report url and tech support contact?
            self.error(status_msg)
            return tasks.Status.FAILED

        # Keep only the structures whose title matches a selected structure
        selected_structs = settings[LR_STRUCTS]
        selected_titles = {struct.title for struct in selected_structs}
        tabular_data_sts = [
            struct for struct in tabular_data_sts
            if struct.title in selected_titles
        ]

        # get 3d data if needed otherwise copies the 2D Data
        def callback(col_name):
            self.status(
                f'{curr_name}: Downloading 3D Data from column "{col_name}"',
                timeout=0)

        try:
            with self.errorHandler():
                imported_st_map = controller.import3DColumns(
                    lr_id, lr_columns, tabular_data_sts, lr_rows, callback)
        except ld_base_classes.HandledError as err:
            self.setErrorStatus(curr_name, str(err))
            return tasks.Status.ERROR

        # get attachments
        # TODO download image/file attachments.

        # count structures
        num_proteins = num_ligands = 0
        for _pose_id_st_map, protein_st_pose_id_map in imported_st_map.values():
            # 'None' key proteins don't count. Counting the non-None keys
            # stays correct even when a column has no 'None' entry.
            num_proteins += sum(1 for protein_st in protein_st_pose_id_map
                                if protein_st is not None)
            # FIXME: should check pose_id_st_map not protein_st_pose_id_map
            # after refactoring
            num_ligands += sum(map(len, protein_st_pose_id_map.values()))
        self.status(
            f'{curr_name}: {num_proteins} proteins and {num_ligands} ligands '
            'downloaded from LiveDesign',
            timeout=0)

        # pass on structures for post-processing
        settings[IMPORTED_STS] = imported_st_map
        return tasks.Status.DONE

    def postProcess(self, task):
        """
        If a task has successfully finished, perform synchronous post
        processing. If connected to maestro, import the structures into the
        project table. Otherwise, write out to a MAE file.

        :param task: the finished task
        :type task: tasks.AbstractTaskWrapper
        """
        if task.status() != tasks.Status.DONE:
            return
        imported_st_map = task.settings()[IMPORTED_STS]

        # Apply structure-related hash to be used in the event of a round
        # trip scenario. PANEL-13713
        for pose_map, _ in imported_st_map.values():
            for st in pose_map.values():
                ld_utils.apply_round_trip_hash(st)

        lr_is_filtered = self._liveReportIsFiltered()
        if maestro:
            project_table_import(imported_st_map, lr_is_filtered)
        else:
            outfile = fileutils.tempfilename('ld_import_', '.mae')
            save_to_disk(imported_st_map, lr_is_filtered, outfile)
            self.info(f"Structures have been saved to file {outfile}")

    def _liveReportIsFiltered(self):
        """
        Query the live report metadata through `self.ld_client`, which is
        assigned in `runMain`.

        :return: whether the live report has column filters in effect
        :rtype: bool
        """
        lr_id = self.settings()[LR_ID]
        metadata = self.ld_client.live_report_results_metadata(lr_id)
        return metadata['stats']['filtered_rows'] > 0
def import_structures_to_maestro(sts):
    """
    Write the given structures to a temporary MAE file and import that file
    into the Maestro project table in a single step.

    :param sts: Structures to import
    :type sts: list(structure.Structure)
    :return: Name of group that structures were imported to
    :rtype: str
    """
    project_table = maestro.project_table_get()
    with fileutils.tempfilename('ld_import_', '.mae') as mae_path:
        with structure.StructureWriter(mae_path) as mae_writer:
            mae_writer.extend(sts)
        # The writer is closed before the import so the file is complete.
        project_table.importStructureFile(mae_path)
        # The stem of the temporary file is what callers use to identify
        # the group created by the import.
        imported_group = Path(mae_path).stem
    return imported_group
def save_to_disk(imported_st_map, lr_is_filtered, outfile):
    """
    Save the imported structures to disk.

    Each protein and its ligand poses are tagged with subgroup title and ID
    properties before being written. Ligands without a protein are tagged
    with the column name only.

    :param imported_st_map: The imported structures, expressed as an ordered
        dictionary by column name of pairs of ordered dictionaries
        containing the pose_id: ligand and protein: pose_ids mappings.
    :type imported_st_map: {str:
        ({int: structure.Structure}, {structure.Structure: list(int)})}
    :param lr_is_filtered: whether the live report that is exporting these
        structures is filtered in some way
    :type lr_is_filtered: bool
    :param outfile: the name of the output file to create
    :type outfile: str
    :raise ValueError: if a pose ID has no matching ligand structure and the
        live report is not filtered
    """
    # this could be expanded to preserve grouping in saved files, but it's
    # only for standalone testing
    group_sep = mm.M2IO_SUBGROUP_SEPARATOR
    unique_subgroup_ids = set()
    with structure.StructureWriter(outfile) as writer:
        for col_name, col_dicts in imported_st_map.items():
            pose_id_st_map, protein_st_pose_id_map = col_dicts
            col_name = col_name.replace(' ', '_')
            for protein_st, pose_ids in protein_st_pose_id_map.items():
                if protein_st is None:
                    # Reset for every protein-less entry so these ligands
                    # never inherit the subgroup of a previously written
                    # protein. col names are unique, so we don't need to
                    # worry about unique id
                    subgroup_title = subgroup_id = col_name
                else:
                    subgroup_title = col_name + group_sep + protein_st.title
                    subgroup_id = get_next_subgroup_id(subgroup_title,
                                                       unique_subgroup_ids)
                    # Record the ID; otherwise proteins sharing a title
                    # would all receive the same subgroup ID.
                    unique_subgroup_ids.add(subgroup_id)
                    set_group(protein_st, subgroup_title, subgroup_id)
                    writer.append(protein_st)
                for pose_id in pose_ids:
                    ligand_st = pose_id_st_map.get(pose_id)
                    if ligand_st is None:
                        if lr_is_filtered:
                            # This may happen due to a bug (SS-28352) in
                            # `LDClient.pose_search()` when some poses are
                            # filtered from the live report (PANEL-15003),
                            # so if the LR is filtered, skip the pose ID
                            # silently
                            continue
                        msg = f'Pose ID "{pose_id}" is not recognized.'
                        raise ValueError(msg)
                    set_group(ligand_st, subgroup_title, subgroup_id)
                    writer.append(ligand_st)
def project_table_import(imported_st_map, lr_is_filtered):
    """
    Import structures into the project table, grouping them first by column
    name, then by protein.

    :param imported_st_map: The imported structures, expressed as an ordered
        dictionary by column name of pairs of ordered dictionaries containing
        the pose_id: ligand and protein: pose_ids mappings.
    :type imported_st_map: {str:
        ({int: structure.Structure}, {structure.Structure: list(int)})}
    :param lr_is_filtered: whether the live report that is exporting these
        structures is filtered in some way
    :type lr_is_filtered: bool
    :raise ValueError: if a pose ID has no matching ligand structure and the
        live report is not filtered
    """
    project_table = maestro.project_table_get()
    imported_entry_ids = []
    for column_name, column_maps in imported_st_map.items():
        ligands_by_pose_id, pose_ids_by_protein = column_maps
        column_entry_ids = []
        for protein_st, pose_ids in pose_ids_by_protein.items():
            has_protein = protein_st is not None
            protein_entry_ids = []
            protein_group_name = 'No Protein'
            if has_protein:
                protein_row = project_table.importStructure(protein_st)
                protein_entry_ids.append(protein_row.entry_id)
                protein_group_name = protein_st.title
            for pose_id in pose_ids:
                ligand_st = ligands_by_pose_id.get(pose_id)
                if ligand_st is None:
                    if lr_is_filtered:
                        # This may happen due to a bug (SS-28352) in
                        # `LDClient.pose_search()` when some poses are
                        # filtered from the live report (PANEL-15003), so if
                        # the LR is filtered, skip the pose ID silently
                        continue
                    raise ValueError(f'Pose ID "{pose_id}" is not recognized.')
                ligand_row = project_table.importStructure(ligand_st)
                protein_entry_ids.append(ligand_row.entry_id)
            # None protein is for all poses that don't have a
            # protein - which may be zero, which we cannot select.
            # we also don't group these poses.
            if has_protein:
                maestro_create_group(protein_entry_ids, protein_group_name,
                                     project_table)
            column_entry_ids.extend(protein_entry_ids)
        # a column could be empty
        if not column_entry_ids:
            continue
        maestro_create_group(column_entry_ids, column_name, project_table)
        imported_entry_ids.extend(column_entry_ids)
    # change selection to all imported structures, to distinguish from
    # pre-existing structures
    project_table_select_only(imported_entry_ids, project_table)
def project_table_select_only(entry_ids, pt):
    """
    Replace the project table selection with the given entries. Shorthand
    for pt.selectRows with select_mode REPLACE.

    Nothing is done when `entry_ids` is empty, so the existing selection is
    left untouched in that case.
    """
    if not entry_ids:
        return
    pt.selectRows(project.REPLACE, entry_ids=entry_ids)
def maestro_create_group(entry_ids, name, pt):
    """
    Select the given entries and create a group with the specified name from
    the selection. If groups are included in the selection this will create
    nested groups.
    """
    project_table_select_only(entry_ids, pt)
    create_group_cmd = f'entrygroupcreatewithselected "{name}"'
    maestro.command(create_group_cmd)
def set_group(st, group_title, group_id):
    """
    Set the group for a structure by setting the M2IO_DATA_SUBGROUP_TITLE and
    M2IO_DATA_SUBGROUPID properties.
    """
    group_properties = (
        (mm.M2IO_DATA_SUBGROUP_TITLE, group_title),
        (mm.M2IO_DATA_SUBGROUPID, group_id),
    )
    for prop_name, prop_value in group_properties:
        st.property[prop_name] = prop_value
def get_next_subgroup_id(subgroup_id_base, unique_subgroup_ids):
    """
    Get the next unique subgroup id by appending an index to a given base name.

    The base name itself is returned when it is not already taken; otherwise
    the suffixes 0, 1, 2, ... are tried in turn. `unique_subgroup_ids` is
    only read, never modified, so callers must record the returned ID
    themselves.
    """
    candidate = subgroup_id_base
    suffix = 0
    while True:
        if candidate not in unique_subgroup_ids:
            return candidate
        candidate = subgroup_id_base + str(suffix)
        suffix += 1