Source code for schrodinger.application.livedesign.ld_import

# -*- coding: utf-8 -*-
"""
Classes used for LiveDesign import GUIs.

Copyright Schrödinger, LLC. All rights reserved.
"""
from pathlib import Path

from schrodinger import get_maestro
from schrodinger import project
from schrodinger import structure
from schrodinger.infra import mm
from schrodinger.Qt import QtGui
from schrodinger.structutils import analyze
from schrodinger.ui.qt.appframework2 import tasks
from schrodinger.utils import fileutils

from . import import_controller
from . import ld_base_classes
from . import ld_utils
from . import login

maestro = get_maestro()
LR_ID = ld_base_classes.LR_ID
LR_COLUMNS = ld_base_classes.LR_COLUMNS
LR_ROWS = ld_base_classes.LR_ROWS
LR_STRUCTS = ld_base_classes.LR_STRUCTS
WARNING_COLOR = ld_base_classes.WARNING_COLOR

# task state
IMPORTED_STS = 'imported_sts'


[docs]class ImportThreadRunner88(ld_base_classes.ImportExportThreadRunner): """ Import specified data from a LiveReport using the new `LDClient` import API. """ process_type = 'import'
[docs] def runMain(self, task): curr_name = task.getName() settings = task.settings() self.ld_client = settings[login.CLIENT] lr_id = settings[LR_ID] lr_columns = settings[LR_COLUMNS] col_ids = [col.id for col in lr_columns] msg = f'{curr_name}: Importing from LiveDesign server.' self.status(msg, timeout=0) try: with self.errorHandler(): file_str = self.ld_client.export_to_maestro(lr_id, col_ids) except ld_base_classes.HandledError as err: self.setErrorStatus(curr_name, str(err)) return tasks.Status.ERROR imported_sts = self.getStructuresFromString(file_str) entity_id_map = import_controller.get_st_entity_id_map( self.ld_client, imported_sts, lr_id) import_controller.cache_entity_IDs(imported_sts, entity_id_map) selected_structs = settings[LR_STRUCTS] selected_titles = set(struct.title for struct in selected_structs) imported_sts = [ struct for struct in imported_sts if struct.title in selected_titles ] settings[IMPORTED_STS] = imported_sts msg = (f'{curr_name}: {len(imported_sts)} structures downloaded from' ' LiveDesign.') self.status(msg, timeout=0) return tasks.Status.DONE
[docs] def postProcess(self, task): """ If a task has successfully finished, perform synchronous post processing. If connected to Maestro, import the structures into the project table. Otherwise, write out to a MAE file. :param task: the finished task :type task: tasks.AbstractTaskWrapper """ if task.status() != tasks.Status.DONE: return imported_sts = task.settings()[IMPORTED_STS] if not imported_sts: msg = 'No structures found on specified live report.' self.info(msg) return # Apply structure-related hash to ligands to be used in the event of a # round trip scenario. PANEL-13713 for st in imported_sts: ligs = analyze.find_ligands(st) if len(ligs) == 1 and ligs[0].st.atom_total == st.atom_total: ld_utils.apply_round_trip_hash(st) if maestro: group_name = import_structures_to_maestro(imported_sts) # Set group title pt = maestro.project_table_get() for group in pt.groups: if group.title == group_name: group.title = f'ld_import_{task.getName()}' break msg = 'Structures have been imported into Maestro.' else: outfile = fileutils.tempfilename('ld_import_', '.mae') with structure.StructureWriter(outfile) as writer: writer.extend(imported_sts) msg = f'Structures have been saved to file {outfile}' self.info(msg)
[docs] def getStructuresFromString(self, file_str): """ Gets structures from the given string representation :param file_str: string representation of a structure :type file_str: str | None :return: list(structure.Structure) """ sts = [] if file_str: file_str = str(file_str, encoding='utf-8') with structure.StructureReader.fromString(file_str) as reader: sts = list(reader) return sts
[docs]class ImportThreadRunner(ld_base_classes.ImportExportThreadRunner): """ Perform the actual import in a thread to not block Maestro. """ process_type = 'import'
[docs] def runMain(self, task): """ See tasks.BaseFunctionRunner for documentation. """ curr_name = task.getName() settings = task.settings() self.ld_client = settings[login.CLIENT] lr_id = settings[LR_ID] lr_columns = settings[LR_COLUMNS] lr_rows = settings[LR_ROWS] msg = f'{curr_name}: Importing from LiveDesign server.' self.status(msg, timeout=0) controller = import_controller.ImportController(self.ld_client) # get tabular data try: with self.errorHandler(): col_ids = [col.id for col in lr_columns] tabular_data_sts = controller.importTabularData(lr_id, col_ids) except ld_base_classes.HandledError as err: self.setErrorStatus(curr_name, str(err)) return tasks.Status.ERROR if not tabular_data_sts: status_msg = 'No structures were found in live report.' self.status(status_msg, timeout=0, color=QtGui.QColor(WARNING_COLOR)) # TODO add live report url and tech support contact? error_msg = status_msg + '' self.error(error_msg) return tasks.Status.FAILED selected_structs = settings[LR_STRUCTS] selected_titles = set(struct.title for struct in selected_structs) tabular_data_sts = [ struct for struct in tabular_data_sts if struct.title in selected_titles ] # get 3d data if needed otherwise copies the 2D Data def callback(col_name): self.status('{}: Downloading 3D Data from column ' '"{}"'.format(curr_name, col_name), timeout=0) try: with self.errorHandler(): imported_st_map = controller.import3DColumns( lr_id, lr_columns, tabular_data_sts, lr_rows, callback) except ld_base_classes.HandledError as err: self.setErrorStatus(curr_name, str(err)) return tasks.Status.ERROR # get attachments # TODO download image/file attachments. # count structures num_proteins = num_ligands = 0 for groupname, group_sts in imported_st_map.items(): pose_id_st_map, protein_st_pose_id_map = group_sts num_proteins += len( protein_st_pose_id_map) - 1 # 'None' key proteins don't count # FIXME: should check pose_id_st_map not protein_st_pose_id_map # after refactoring num_ligands += sum(map(len, list(protein_st_pose_id_map.values()))) self.status('{}: {} proteins and {} ligands downloaded from LiveDesign' ''.format(curr_name, num_proteins, num_ligands), timeout=0) # pass on structures for post-processing task.settings()[IMPORTED_STS] = imported_st_map return tasks.Status.DONE
[docs] def postProcess(self, task): """ If a task has successfully finished, perform synchronous post processing. If connected to maestro, import the structures into the project table. Otherwise, write out to a MAE file. :param task: the finished task :type task: tasks.AbstractTaskWrapper """ if task.status() == tasks.Status.DONE: settings = task.settings() imported_st_map = settings[IMPORTED_STS] # Apply structure-related hash to be used in the event of a round # trip scenario. PANEL-13713 for pose_map, _ in imported_st_map.values(): for st in pose_map.values(): ld_utils.apply_round_trip_hash(st) lr_is_filtered = self._liveReportIsFiltered() if maestro: project_table_import(imported_st_map, lr_is_filtered) else: outfile = fileutils.tempfilename('ld_import_', '.mae') save_to_disk(imported_st_map, lr_is_filtered, outfile) self.info(f"Structures have been saved to file {outfile}")
def _liveReportIsFiltered(self): """ :return: whether the live report has column filters in effect :rtype: bool """ lr_id = self.settings()[LR_ID] metadata = self.ld_client.live_report_results_metadata(lr_id) return metadata['stats']['filtered_rows'] > 0
[docs]def import_structures_to_maestro(sts): """ Imports structures into maestro all at once by way of a file :param sts: Structures to import :type sts: list(structure.Structure) :return: Name of group that structures were imported to :rtype: str """ pt = maestro.project_table_get() with fileutils.tempfilename('ld_import_', '.mae') as outfile_path: # Save outfile path name as this is how we identify the group created group_name = Path(outfile_path).stem with structure.StructureWriter(outfile_path) as writer: writer.extend(sts) pt.importStructureFile(outfile_path) return group_name
[docs]def save_to_disk(imported_st_map, lr_is_filtered, outfile): """ Save the imported structures to disk. :param imported_st_map: The imported structures, expressed as an ordered dictionary by column name of pairs or ordered dictionaries containing the pose_id: ligand and protein: pose_id mappings. :type imported_st_map: {str: ({int: structure.Structure}, {structure.Structure: list(int)})} :param lr_is_filtered: whether the live report that is exporting these structures is filtered in some way :type lr_is_filtered: bool :param outfile: the name of the output file to create :type outfile: str """ # this could be expanded to preserve grouping in saved files, but it's # only for standalone testing group_sep = mm.M2IO_SUBGROUP_SEPARATOR unique_subgroup_ids = set() with structure.StructureWriter(outfile) as writer: for col_name, col_dicts in imported_st_map.items(): pose_id_st_map, protein_st_pose_id_map = col_dicts col_name = col_name.replace(' ', '_') # col names are unique, so we don't need to worry about unique id subgroup_title = subgroup_id = col_name for protein_st, pose_ids in protein_st_pose_id_map.items(): if protein_st is not None: subgroup_title = col_name + group_sep + protein_st.title subgroup_id = get_next_subgroup_id(subgroup_title, unique_subgroup_ids) set_group(protein_st, subgroup_title, subgroup_id) writer.append(protein_st) for pose_id in pose_ids: ligand_st = pose_id_st_map.get(pose_id) if lr_is_filtered and ligand_st is None: # This may happen due to a bug (SS-28352) in # `LDClient.pose_search()` when some poses are filtered # from the live report (PANEL-15003), so if the LR is # filtered, skip the pose ID silently continue elif ligand_st is None: msg = f'Pose ID "{pose_id}" is not recognized.' raise ValueError(msg) set_group(ligand_st, subgroup_title, subgroup_id) writer.append(ligand_st)
[docs]def project_table_import(imported_st_map, lr_is_filtered): """ Import structures into the project table, grouping them first by column name, then by protein. :param imported_st_map: The imported structures, expressed as an ordered dictionary by column name of pairs or ordered dictionaries containing the pose_id: ligand and protein: pose_id mappings. :type imported_st_map: {str: ({int: structure.Structure}, {structure.Structure: int})} :param lr_is_filtered: whether the live report that is exporting these structures is filtered in some way :type lr_is_filtered: bool """ pt = maestro.project_table_get() all_imported_eids = [] for groupname, group_sts in imported_st_map.items(): pose_id_st_map, protein_st_pose_id_map = group_sts col_group_eids = [] for protein_st, pose_ids in protein_st_pose_id_map.items(): prot_group_eids = [] subgroupname = 'No Protein' if protein_st is not None: prot_row = pt.importStructure(protein_st) prot_group_eids.append(prot_row.entry_id) subgroupname = protein_st.title for pose_id in pose_ids: ligand_st = pose_id_st_map.get(pose_id) if ligand_st is None and lr_is_filtered: # This may happen due to a bug (SS-28352) in # `LDClient.pose_search()` when some poses are filtered # from the live report (PANEL-15003), so if the LR is # filtered, skip the pose ID silently continue elif ligand_st is None: msg = f'Pose ID "{pose_id}" is not recognized.' raise ValueError(msg) lig_row = pt.importStructure(ligand_st) prot_group_eids.append(lig_row.entry_id) # None protein is for all poses that don't have a # protein - which may be zero, which we cannot select. # we also don't group these poses. if protein_st is not None: maestro_create_group(prot_group_eids, subgroupname, pt) col_group_eids.extend(prot_group_eids) # a column could be empty if col_group_eids: maestro_create_group(col_group_eids, groupname, pt) all_imported_eids.extend(col_group_eids) # change selection to all imported structures, to distinguish from # pre-existing structures project_table_select_only(all_imported_eids, pt)
[docs]def project_table_select_only(entry_ids, pt): """ shorthand for pt.selectRows with select_mode REPLACE """ if entry_ids: pt.selectRows(project.REPLACE, entry_ids=entry_ids)
[docs]def maestro_create_group(entry_ids, name, pt): """ Creates a group from the current selection with the name specified. If groups are included in the selection this will create nested groups. """ project_table_select_only(entry_ids, pt) maestro.command('entrygroupcreatewithselected "{}"'.format(name))
[docs]def set_group(st, group_title, group_id): """ Set the group for a structure by setting the M2IO_DATA_SUBGROUP_TITLE and M2IO_DATA_SUBGROUP_ID properties. """ st.property[mm.M2IO_DATA_SUBGROUP_TITLE] = group_title st.property[mm.M2IO_DATA_SUBGROUPID] = group_id
[docs]def get_next_subgroup_id(subgroup_id_base, unique_subgroup_ids): """ Get the next unique subgroup id by appending an index to a given base name. """ subgroup_id = subgroup_id_base n = 0 while subgroup_id in unique_subgroup_ids: subgroup_id = subgroup_id_base + str(n) n += 1 return subgroup_id