"""
Interface to the active Maestro Project.
Projects may be accessed while within
Maestro or without Maestro from a python script that
imports this module. Usually when running from within Maestro
the Project object would be returned via maestro.project_table_get()
For a description of basic concepts that clarify the relationship between the
Workspace and Maestro Projects as well as when and how changes in one affect
the other, please see the Basic Concepts section in the Python Module Overview
located at schrodinger.com/pythonapi.
Adding a copy of a structure to a project can be done using ::
pt = maestro.project_table_get()
pt.importStructure(ct)
All row access is with respect to the order of
rows in the project (which can differ from the order of rows
in Maestro's Project Table since the table's rows can be sorted
and rearranged).
Column and Row indices start at 1.
There are 3 ways to refer to rows:
1. Internal project index in mmproj, they are effectively indices into an array.
These values are hidden from the user; avoid using them in scripts.
2. Project Table row number, as it appears in the left-most column of the PT in
Maestro. This number can change when rows are deleted or the table is sorted.
3. Entry ID, which is unique for each row and never changes. These values are
ints, but are often represented as strings, e.g. "1". Scratch entries will
have non-int entry IDs, but are not stored in the project table.
These are used in Maestro commands, ASLs, etc, and are visible to the user.
Iterating over all rows in the project is done using an expression of the form::
pt=maestro.project_table_get()
for row in pt.all_rows:
# Now do something with row
Iterating over selected rows in the project is done using an expression of
the form::
pt=maestro.project_table_get()
for row in pt.selected_rows:
# Now do something with row
Accessing each row of the project table returns a ProjectRow object. Common
operations on ProjectRow objects are to get (or set) a structure associated
with that row::
st = row.getStructure()
# and then update
row.setStructure(st)
Also common is accessing the project data. This can be done either by the
name of the project table column or the dataname (the latter being as it
appears in Maestro format files)::
energy = row.property['r_mmod_Relative_Potential_Energy-MMFF94']
New columns and new properties for existing entries can be added to the
project in a similar way. Note that the refreshTable() function is usually
required to be called to show the results of additions to the project table.
A project should be open by at most one process at any given time.
It is an error to open a project currently opened by another process.
Copyright Schrodinger LLC, All rights reserved.
"""
import os
import shutil
import tempfile
import time
from collections.abc import MutableMapping
from contextlib import contextmanager
from schrodinger import get_maestro
from schrodinger import structure
from schrodinger.infra import mm
from schrodinger.infra import mmbitset
from schrodinger.infra import mmobject
from schrodinger.infra import mmproj
from schrodinger.infra import mmsurf
from schrodinger.infra import projectmodel
from schrodinger.project import surface as project_surface
from schrodinger.structutils import analyze
from schrodinger.ui import maestro_ui
ASCENDING = 1
DESCENDING = -1
# EV 52287 Windows: Property Merge dumps error messages to terminal
# EV 51784 gen_smiles.py and gen_smarts.py displaying python errors (NameError)
# These all were due to using incorrect module hierarchy to invoke
# project_table_update(). The invocation must match the import
maestro = get_maestro()
_in_maestro = bool(maestro)
# Try to import the maestro module. If it fails, simply assume that
# this module is being used outside of Maestro. If it succeeds, then there
# are additional actions we perform for certain project module tasks (mostly
# to make sure that the user can see the results).
_module_name = "project"
INVALID_PROJECT_HANDLE = -1
# Constants for select actions
ADD, REPLACE, INVERT, ALL, NONE = list(range(5))
# Constants for ProjectRow.in_workspace:
NOT_IN_WORKSPACE, IN_WORKSPACE, LOCKED_IN_WORKSPACE = list(range(3))
###########################################################################
# Options that can be enabled or disabled on a per Project instance basis
# Use the enable/disable method in the Project class.
###########################################################################
AUTOMATIC_CACHE_FREEING = list(range(1))
[docs]class EmptyIterator:
"""
Empty iterator. Useful if you have some object that returns
an iterator in most cases but in some cases does not. In the "not"
case you can return an empty iterator and the calling code can still use
a "for i in iterator" form as it will be a no-op.
"""
[docs] def __init__(self):
pass
[docs] def __len__(self):
"""
Needed to allow "if empty_iterator" to evaluate to False.
Halts the iteration. Can also be used to check to see
if the iterator exists or not.
"""
return 0
def __iter__(self):
"""
Make it possible to iterate without first having
to check to see if there is an iterator on which to iterate.
"""
return self
def __next__(self):
"""
We never have anything to iterate over. So simply stop iterating.
"""
raise StopIteration()
class _ProjectRowProperty(MutableMapping):
"""
Dictionary-like container of entry properties. These can be accessed
via the property name as it appears in the maestro file.
Property names must be m2io data names, which are in the format
'<type>_<author>_<property_name>', where '<type>' is a data type prefix,
'<author>' is a source specification, and '<property_name>' is the
actual name of the data.
The data type prefix can specified as 's' for string, 'i' for integer,
'r' for real and 'b' for boolean. The author specification should be
'user' for user created properties. The property name can have embedded
underscores.
"""
def __init__(self, row):
"""
Create an instance of the entry property container.
:param row: Entry row.
:type row: `ProjectRow`
"""
self._project_handle = row._project_handle
self._entry_index = row._entry_index
self._row = row
def _getPropertyType(self, prop):
"""
Given a property (data name), return the property type.
:raises KeyError if property does not exist.
:raises ValueError if property "user" name is passed in.
"""
try:
i = mmproj.mmproj_project_get_property_index(
self._project_handle, prop)
except mm.MmException:
raise KeyError("Could not find property '%s'" % prop)
# Since mmproj_project_get_property_index() will happily accept
# "user" names, check for them:
data_name = mmproj.mmproj_property_get_data_name(
self._project_handle, i)
if data_name != prop:
raise ValueError("User property names not supported as keys: '%s'" %
prop)
# Get column's property type
try:
data_type = \
mmproj.mmproj_property_get_data_type(self._project_handle,
i)
except mm.MmException:
raise RuntimeError("Could not get type for property '%s'" % prop)
return data_type
def __getitem__(self, prop):
"""
Return the given property value.
:param prop: Key object for property dict, must be a string starting
with s, r, i, or b.
:type prop: str
:raises KeyError: If the property is missing.
"""
# Will raise KeyError if property is not present:
data_type = self._getPropertyType(str(prop))
# Get the value of the correct type. If we cannot, raise KeyError
try:
if data_type == mm.M2IO_REAL_TYPE:
return mmproj.mmproj_index_entry_get_real_data(
self._project_handle, self._entry_index, prop)
elif data_type == mm.M2IO_BOOLEAN_TYPE:
val = \
mmproj.mmproj_index_entry_get_boolean_data(self._project_handle,
self._entry_index,
prop)
return bool(val) # return True or False
elif data_type == mm.M2IO_INT_TYPE:
return mmproj.mmproj_index_entry_get_integer_data(
self._project_handle, self._entry_index, prop)
elif data_type == mm.M2IO_STRING_TYPE:
return mmproj.mmproj_index_entry_get_string_data(
self._project_handle, self._entry_index, prop)
except mm.MmException:
raise KeyError("Could not retrieve property '%s'" % prop)
def __setitem__(self, prop, value):
"""
Set entry property value.
"""
try:
data_type = self._getPropertyType(prop)
except KeyError:
# The property does not exist. Assume the property name
# is a valid m2io data name and try to create it.
# Check whether the property name has invalid characters
# Note this doesn't mean it's necessarily a valid
# m2io data name
if mmproj.mmproj_entity_name_is_invalid("property", prop, False):
raise ValueError("The name \'%s\' is not a valid data name\n%s\n%s" % \
(prop,
"Form is [i,b,s,r]_<author>_<name>.",
"For example: i_herc_my_property"))
# If got here, property name is valid, will add a new property.
data_type = mm.m2io_get_type_by_name(prop)
# Set the value of the correct type
if data_type == mm.M2IO_REAL_TYPE:
mmproj.mmproj_index_entry_set_real_data(self._project_handle,
self._entry_index, prop,
value)
elif data_type == mm.M2IO_BOOLEAN_TYPE:
# Pass in as an integer
value = 1 if value else 0
mmproj.mmproj_index_entry_set_boolean_data(self._project_handle,
self._entry_index, prop,
value)
elif data_type == mm.M2IO_INT_TYPE:
mmproj.mmproj_index_entry_set_integer_data(self._project_handle,
self._entry_index, prop,
value)
elif data_type == mm.M2IO_STRING_TYPE:
mmproj.mmproj_index_entry_set_string_data(self._project_handle,
self._entry_index, prop,
value)
else:
raise ValueError('Unknown data_type: %s' % data_type)
def __delitem__(self, prop):
"""
Delete the given entry property.
"""
# A way to check for existence of the property, and to verify that
# it is specified as a "data" name. Can raise KeyError or ValueError:
self._getPropertyType(prop)
try:
mmproj.mmproj_index_entry_delete_property(self._project_handle,
self._entry_index, prop)
except mm.MmException:
raise RuntimeError("Could not delete property '%s'" % prop)
self._row._pt.update()
def keys(self):
"""
Return a list of data names of all properties available in this entry.
"""
# TODO an optimized mmproj API for this would be helpful
entry_props = []
for prop in self._row._pt.getPropertyNames():
try:
self[prop]
except KeyError:
# If this property is present in the PT, but is missing for
# this specific entry, skip it.
continue
else:
entry_props.append(prop)
return entry_props
def __len__(self):
return len(self.keys())
def __iter__(self):
return iter(self.keys())
[docs]class ProjectRow(object): # Must descend from object for properties to work
"""
ProjectRow allows access to the structure and properties of an entry
in the project. It is an access mechanism and not a completely
self-contained object - project row objects should be treated as
transient, they may become invalid by subsequent project operations.
This class represents a project entry.
Each row is linked to one entry, but row != entry
There are 3 ways to identify a row:
* ProjectRow.index: Internal project order, entry in project - NOT sorted
and NOT same as entry_id.
* ProjectRow.row_number: Table row position - changes when table is
sorted
* ProjectRow.entry_id: ID of the entry represented by this row, never
changes.
Normally it won't be necessary to create an explicit ProjectRow object,
one will be returned by indexing into a ProjectTable object
"""
# ***********************************************************************
# Constructor
# ***********************************************************************
[docs] def __init__(self, pt, row_index):
"""
Construct a ProjectRow for the given Project instance
based on an entry index.
"""
self._pt = pt
self._project_handle = pt.handle
self._entry_index = row_index
self._property = None
# ***********************************************************************
#
# ***********************************************************************
def __repr__(self):
"""
Return string representation
"""
return "ProjectRow: Project handle %d, row %d" % \
(self._project_handle, self._entry_index)
def __eq__(self, other):
"""
Test ProjectRow equality by project handle and entry index.
"""
try:
if (self._project_handle == other._project_handle) and \
(self._entry_index == other._entry_index):
return True
else:
return False
except AttributeError:
return False
def __ne__(self, other):
"""
Test ProjectRow inequality by project handle and entry index.
"""
return not self.__eq__(other)
# ***********************************************************************
#
# ***********************************************************************
[docs] def getStructure(self, props=True, copy=True, workspace_sync=True):
"""
:return: The entry's structure
:rtype: `structure.Structure`
:param props: Whether the associated PT properties are included in the
returned structure (default).
:type props: bool
:param copy: Whether to return a copy of the PT structure (default). If
set to False, returns the original CT for the entry. Such use should
in general be avoided, except as an optimization.
NOTE: With copy=False, when the returned CT is modified, the changes
are instantly propagated to the PT, but not the Workspace, and
changes to properties do not propagate. Unless it's certain that
properties didn't change, and that the structure is not included in
the Workspace, any changes to it should be followed up by a call to
setStructure().
:param workspace_sync: If this entry is included in Workspace, sync
the Workspace with Project Table before retreiving entry's
structure. As an optimization, when getStructure() is called
in a loop, call maestro.project_table_synchronize(), then
call getStructure() with workspace_sync=False for each entry.
WARNING: The current default (copy=True) is to make a duplicate
of the entry's structure. These will be marked for garbage
collection once they go out of scope in the calling code, but if,
for example, you are in a loop your memory usage will grow until
the loop is exited (and it may even take a while for it to drop
since garbage collection is not necessarily immediate). This can
cause large memory usage if, for example, you are iterating over
a large number entries. In some cases you may want to
explicitly delete the returned Structure. For example,
in a loop iterating over a large number of entries you may want to
delete the Structure while still in the loop (after you're
done processing the Structure) to prevent memory bloat.
"""
if _in_maestro and workspace_sync and self.in_workspace:
# EV 57529 - Make sure any outstanding Workspace changes
# have a chance to be synched with the Project. Otherwise,
# outstanding changes to the Workspace structure will be lost.
# Note: whether the project is actually modified depends on
# the user's Maestro projectsync preference. So it's possible
# the Workspace changes won't be placed back into
# the project. See docstring for project_table_synchronize().
maestro.project_table_synchronize()
if props:
ct = mmproj.mmproj_index_entry_get_ct_and_prop(
self._project_handle, self._entry_index)
else:
ct = mmproj.mmproj_index_entry_get_ct(self._project_handle,
self._entry_index)
# Make a copy to return a Python-managed Structure that the
# user will be in total control of Ev:59662
if copy:
ct_copy = mm.mmct_ct_duplicate(ct)
# Free up the memory used by the CT:
mmproj.mmproj_index_entry_free_ct_and_prop(self._project_handle,
self._entry_index, ct)
return structure.Structure(ct_copy)
else: # Return a reference to the ORIGINAL ct:
# Store so that we can free up the memory later
self._pt._cached_entries.add(self._entry_index)
struct = structure.Structure(ct)
struct._cpp_structure.releaseOwnership()
return struct
# ***********************************************************************
#
# ***********************************************************************
def _getStructure(self):
"""
See docstring for structure property
"""
import warnings
msg = "ProjectRow.structure API is deprecated; use ProjectRow.getStructure() instead."
warnings.warn(msg, DeprecationWarning, stacklevel=2)
if _in_maestro:
# EV 57529 - Make sure any outstanding Workspace changes
# have a chance to be synched with the Project. Otherwise,
# outstanding changes to the Workspace structure will be lost.
# Note: whether the project is actually modified depends on
# the user's Maestro projectsync preference. So it's possible
# the Workspace changes won't be placed back into
# the project. See docstring for project_table_synchronize().
maestro.project_table_synchronize()
ct = mmproj.mmproj_index_entry_get_ct(self._project_handle,
self._entry_index)
# Store so that we can free up the memory later
self._pt._cached_entries.add(self._entry_index)
struct = structure.Structure(ct)
struct._cpp_structure.releaseOwnership()
return struct
# ***********************************************************************
#
# ***********************************************************************
[docs] def setStructure(self, struct, props=True, copy=True, sync_workspace=True):
"""
Set the structure of the entry to the specified structure.
If the entry is included in the Workspace, the Workspace CT will be
updated accordingly.
:param struct: Set the entry to this Structure object
:type struct: schrodinger.structure.Structure
:param copy: If True, a copy of the Structure (CT) is made and that
copy is used to set the entry. If False, the original Structure,
struct, is placed into the project table. Doing this hands off
control of struct and you should no longer use struct.
:type copy: bool
:param props: If True, update properties in the entry. If False,
properties are ignored.
:type props: bool
:param sync_workspace: Whether to update the maestro workspace
:type sync_workspace: bool
"""
# Note - related to 36858 make a copy of the CT by default because the
# entry will have complete control over this CT:
if copy:
struct = mm.mmct_ct_duplicate(struct)
if props:
# Update structure and properties:
mmproj.mmproj_index_entry_set_ct_and_prop(self._project_handle,
self._entry_index, struct)
else:
# Update structure only:
mmproj.mmproj_index_entry_set_ct(self._project_handle,
self._entry_index, struct,
0) # <- Ev:57721
# Update the Workspace if this entry is included:
if _in_maestro and self.in_workspace and sync_workspace:
maestro.command('synchronizeworkspacefromproject entry "%s"' %
self.entry_id)
# Store so that we can free up the memory later
# FIXME we need to do this only if copy is False, right?
self._pt._cached_entries.add(self._entry_index)
# ***********************************************************************
#
# ***********************************************************************
def _setStructure(self, struct, copy=True):
"""
See docstring for structure property
"""
import warnings
msg = "ProjectRow.structure API is deprecated; use ProjectRow.setStructure() instead."
warnings.warn(msg, DeprecationWarning, stacklevel=2)
# Note - related to 36858 make a copy of the CT by default because the
# entry will have complete control over this CT:
if copy:
struct = mm.mmct_ct_duplicate(struct)
mmproj.mmproj_index_entry_set_ct(self._project_handle,
self._entry_index, struct,
0) # <- Ev:57721
# Store so that we can free up the memory later
self._pt._cached_entries.add(self._entry_index)
#####################################################################
# Treat 'structure' as a Python property
#####################################################################
structure = property(_getStructure,
_setStructure,
doc="""
This attribute is deprecated. Please use ProjectRow.getStructure() and
ProjectRow.setStructure() instead.
""")
# ***********************************************************************
#
# ***********************************************************************
def _userNameToDataName(self, prop):
"""
Given a property name (either user name or data name) return the
data name. If the property is not present, return the input.
Used to implement deperecated API for accessing properties by
user names, and will be removed in the future.
"""
try:
i = mmproj.mmproj_project_get_property_index(
self._project_handle, prop)
except mm.MmException:
return prop
return mmproj.mmproj_property_get_data_name(self._project_handle, i)
def __getitem__(self, prop):
"""
Deprecated, instead use:
value = ProjectRow.property[prop]
"""
data_name = self._userNameToDataName(prop)
return self.property.get(data_name)
# ***********************************************************************
#
# ***********************************************************************
def __setitem__(self, prop, value):
"""
Deprecated, instead use:
ProjectRow.property[prop] = value
"""
data_name = self._userNameToDataName(prop)
try:
self.property[data_name] = value
except RuntimeError as err:
# This original API ignores errors
Project._printDebug("ProjectRow::__setitem__ - %s" % err)
# ***********************************************************************
#
# ***********************************************************************
def __delitem__(self, prop):
"""
Deprecated, instead use:
del ProjectRow.property[prop]
"""
data_name = self._userNameToDataName(prop)
try:
del self.property[data_name]
except (KeyError, ValueError) as err:
# This original API ignores errors
Project._printDebug("ProjectRow::__delitem__ - %s" % err)
[docs] def inWorkspace(self):
"""
Obsolete. Use ProjectRow.in_workspace property instead.
"""
import warnings
msg = "ProjectRow.inWorkspace() is obslete. Use the in_workspace property instead."
warnings.warn(msg, DeprecationWarning, stacklevel=2)
return self.in_workspace
[docs] def includeOnly(self, scroll_to_row=False):
"""
Include this entry in the workspace and exclude all other entries.
:param scroll_to_row: If True, scroll to the included row in case it's out of view.
:type scroll_to_row: bool
"""
if _in_maestro:
cmd = 'entrywsincludeonly entry "%s"' % self.entry_id
maestro.command(cmd)
if scroll_to_row:
maestro_ui.MaestroHub.instance().scrollToEntryID.emit(
int(self.entry_id))
return
# We are not in maestro
# First remove all the included entries
# Do this in reverse order as the array shrinks
num_included_entries = \
mmproj.mmproj_project_get_included_entry_total(self._project_handle)
for i in range(num_included_entries, 0, -1):
entry = \
mmproj.mmproj_project_get_included_entry(self._project_handle,
i)
ws_state = \
mmproj.mmproj_index_entry_get_workspace_state(self._project_handle,
entry)
if ws_state != mmproj.MMPROJ_ENTRY_LOCKED_IN_WORKSPACE:
mmproj.mmproj_index_entry_set_workspace_state(
self._project_handle, entry,
mmproj.MMPROJ_ENTRY_NOT_IN_WORKSPACE)
# Include just this one entry
entry_index = \
mmproj.mmproj_project_entry_id_to_index(self._project_handle,
int(self.entry_id))
mmproj.mmproj_index_entry_set_workspace_state(
self._project_handle, entry_index, mmproj.MMPROJ_ENTRY_IN_WORKSPACE)
[docs] def selectOnly(self):
"""
Select this entry and de-select all other entries in the Project Table.
"""
self._pt.selectRows(REPLACE, entry_ids=[self.entry_id])
[docs] def delete(self):
"""
Delete this row/entry from the project.
"""
self._pt._deleteRowByIndex(self._entry_index)
# Methods for getting and setting the title of the entry:
# Added for consistency with Structure objects
def _getTitle(self):
return self.property['s_m_title']
def _setTitle(self, title):
self.property['s_m_title'] = title
title = property(_getTitle, _setTitle, doc="The title of the entry")
# Internal project index of the row (not visible to the user). Will change
# when rows above it are deleted, but not when table is re-sorted:
def _getProjectIndex(self):
"""
The index property is not the same as Project Table row.
This property indicates what order entries were added to the project.
This can change when an entry in the project is deleted.
"""
return self._entry_index
index = property(
_getProjectIndex,
doc="Internal Project index of the row. "
"This is different from Project Table row number or from entry ID.")
# Get the project table (visible) row number of this entry; will change if
# rows are deleted or table is re-sorted:
@property
def row_number(self):
"""
This is the Project Table row number, as it appears to the user in
Maestro. It is different from the internal row index.
"""
return mmproj.mmproj_table_get_entry_row(self._project_handle, 1,
self._entry_index)
# Get the entry ID string (it is different from the entry index); it
# does not change even if rows are deleted or table is re-sorted:
def _getEntryID(self):
return self.property[mm.M2IO_DATA_CT_ENTRY_ID] # 's_m_entry_id'
entry_id = property(_getEntryID, doc="Entry ID of the row")
# Get EntryGroup object for the group of this entry:
def _getGroup(self):
groupi = mmproj.mmproj_table_index_entry_get_group(
self._project_handle, 1, self._entry_index)
if groupi:
return EntryGroup(self._pt, groupi)
else:
return None
group = property(_getGroup, doc="EntryGroup for the row")
def _getEntryInWorkspace(self):
# FIXME need to fix the SWIG wrapper
inws = mmproj.mmproj_index_entry_get_workspace_state(
self._project_handle, self._entry_index)
# Possible returns:
# MMPROJ_ENTRY_NOT_IN_WORKSPACE (0)
# MMPROJ_ENTRY_IN_WORKSPACE (1)
# MMPROJ_ENTRY_LOCKED_IN_WORKSPACE (2)
return inws
def _setEntryInWorkspace(self, include):
# In Maestro we want to use the commands as it has to ensure
# other state in Maestro is properly kept in synch.
if _in_maestro:
if include == LOCKED_IN_WORKSPACE:
cmd = 'entrywsincludelock entry "%s"' % self.entry_id
elif include:
# True or IN_WORKSPACE
cmd = 'entrywsinclude entry "%s"' % self.entry_id
else:
# False or NOT_IN_WORKSPACE
cmd = 'entrywsexclude entry "%s"' % self.entry_id
maestro.command(cmd)
else: # Not in Maestro
mmproj.mmproj_index_entry_set_workspace_state(
self._project_handle, self._entry_index, include)
doc = "Inclusion state of the entry (NOT_IN_WORKSPACE/IN_WORKSPACE/LOCKED_IN_WORKSPACE)"
doc += "\nWARNING: This property should NOT be treated as a boolean."
in_workspace = property(_getEntryInWorkspace, _setEntryInWorkspace, doc=doc)
def _getEntryIsSelected(self):
issel = mmproj.mmproj_index_entry_get_select_state(
self._project_handle, self._entry_index)
return bool(issel) # More Pythonic to return True or False
def _setEntryIsSelected(self, select):
if select:
mmproj_select = mmproj.MMPROJ_ENTRY_SELECTED
else:
mmproj_select = mmproj.MMPROJ_ENTRY_NOT_SELECTED
try:
mmproj.mmproj_index_entry_set_select_state(self._project_handle,
self._entry_index,
mmproj_select)
except mm.MmException:
print("%s.is_select: Could not select row: %d" % \
(_module_name, self._entry_index))
self._pt.update()
is_selected = property(_getEntryIsSelected,
_setEntryIsSelected,
doc="Whether the entry is selected")
def _getEntryIsReadOnly(self):
read_only = mmproj.mmproj_index_entry_get_read_only(
self._project_handle, self._entry_index)
return bool(read_only)
def _setEntryIsReadOnly(self, is_read_only):
if is_read_only:
mmproj.mmproj_index_entry_set_read_only(
self._project_handle, self._entry_index,
mmproj.MMPROJ_ENTRY_READ_ONLY)
else:
mmproj.mmproj_index_entry_set_read_only(
self._project_handle, self._entry_index,
mmproj.MMPROJ_ENTRY_NOT_READ_ONLY)
return
read_only = property(_getEntryIsReadOnly,
_setEntryIsReadOnly,
doc="Whether the entry is read only or not")
def _getEntryIsDeletable(self):
deletable = mmproj.mmproj_index_entry_get_deletable(
self._project_handle, self._entry_index)
return bool(deletable)
def _setEntryIsDeletable(self, deletable):
if deletable:
mmproj.mmproj_index_entry_set_deletable(
self._project_handle, self._entry_index,
mmproj.MMPROJ_ENTRY_DELETABLE)
else:
mmproj.mmproj_index_entry_set_deletable(
self._project_handle, self._entry_index,
mmproj.MMPROJ_ENTRY_NOT_DELETABLE)
return
deletable = property(_getEntryIsDeletable,
_setEntryIsDeletable,
doc="Whether the entry is deletable or not")
#**********************************************************************
def _getCMSStructureReader(self):
"""
If there is no associated cms file, then we return an empty
iterator. This allows you to always just write a loop::
for i in cms_structure_reader:
and avoid checking if cms_structure_reader exists. If it
does not, the loop will do nothing.
If there is an associated cms file, then we return a
StructureReader that allows one to retrieve the data by iterating.
"""
cms_file = self.cms_file
if not cms_file:
return EmptyIterator()
sreader = structure.StructureReader(cms_file)
return (sreader)
cms_structure_reader = property(
_getCMSStructureReader,
doc=
"Return StructureReader for associated CMS file or EmptyIterator if there is no associated file"
)
def _getCMSFile(self):
"""
Returns the file instead of the reader. This is more convenient to use
than the reader in some cases.
If there is no associated cms file, then we return None.
"""
# If there is no cms property, then return None
cms_name = self.property.get('s_m_original_cms_file')
if cms_name is None:
return None
# Build up the path to the file.
# WARNING: Applications should avoid using
# mmproj_index_entry_get_additional_data_dir as the underlying
# location could change for any release. Our own python
# modules should exercise caution when using this function for
# the same reason.
additional_dir = \
mmproj.mmproj_index_entry_get_additional_data_dir(self._project_handle,
self.index)
return additional_dir + '/' + cms_name
cms_file = property(
_getCMSFile,
doc="Return associated CMS file or None if there is no associated file")
def _getSurfaces(self):
"""
Returns an iterator to the available surfaces
"""
return _EntrySurfaceIterator(self._pt, self.entry_id)
surfaces = property(
_getSurfaces,
doc="Return an interator to the surface objects available for this entry"
)
[docs] def newMolecularSurface(self, *args, **kwargs):
"""
Create a new molecular surface for this row
:param name: The name of the surface. Note that project rows require
all surfaces to be named uniquely. See `overwrite`.
:type name: str
:param asl: If given, the surface will only be created for atoms in the
structure that match the provided ASL. Note that only one of `asl` and
`atoms` may be given. If neither are given, then the surface will be
created for all atoms in the structure.
:type asl: str or NoneType
:param atoms: An optional list of atom numbers. If given, the surface
will only be created for the specified atoms. Note that only one of
`asl` and `atoms` may be given. If neither are given, then the
surface will be created for all atoms in the structure.
:type atoms: list or NoneType
:param resolution: The resolution of the surface, generally between
0 and 1. Smaller numbers lead to a more highly detailed surface.
:type resolution: float
:param probe_radius: The radius of the rolling sphere used to calculate
the surface. Defaults to 1.4 if `mol_surf_type` is
`surface.MolSurfType.Molecular` or `surface.MolSurfType.Extended`.
May not be given if `mol_surf_type` is `surface.MolSurfType.vdw`.
:type probe_radius: float
:param vdw_scaling: If given, all atomic radii will be scaled by the
provided value before the surface is calculated.
:type vdw_scaling: float
:param mol_surf_type: The type of surface to create.
:type mol_surf_type: `surface.MolSurfType`
:param overwrite: What to do if the new surface has the same name as an
existing surface for this project row. If True, the existing surface
will be overwritten. In False, a ValueError will be raised.
:type overwrite: bool
:return: The new surface
:rtype: `project_surface.Surface`
"""
return project_surface.ProjectSurface.newMolecularSurface(
self._pt, self, *args, **kwargs)
@property
def surface(self):
"""
A dictionary of all surfaces for this row. Keys are surface names and
values are `project_surface.Surface` objects.
:type: `project_surface.SurfaceDict`
"""
return project_surface.SurfaceDict(self._pt, self)
[docs] def moveToGroup(self, group_name):
"""
Move this entry to group. If group does not exist it will be
created.
:param group_name: Name of group to which to move this entry. If such
group doesn't exist, it will be created. Note, this is different
from the user-visible group title.
:type group_name: str
"""
# FIXME: Factor out duplication with Project.moveRowsToGroup().
# See PANEL-11205
# if group does not exist, create it first
group_exist = mmproj.mmproj_table_does_group_exist(
self._project_handle, 1, group_name)
if self._pt.isCurrentMaestroProject():
if not group_exist:
cmd = 'entrygroupcreate "%s" entry "%s"' % \
(group_name, self.entry_id)
else:
cmd = 'entrymovetogroup "%s" entry "%s"' % \
(group_name, self.entry_id)
maestro.command(cmd)
else:
if not group_exist:
title = group_name
mmproj.mmproj_table_add_entry_group(self._project_handle, 1,
group_name, title, False)
group_index = mmproj.mmproj_table_get_entry_group_index(
self._project_handle, 1, group_name)
mmproj.mmproj_table_index_entry_move_and_set_group(
self._project_handle, self._entry_index, group_index)
self._pt.update()
[docs] def ungroup(self):
"""
Remove this entry from its current group.
"""
if self._pt.isCurrentMaestroProject():
cmd = 'ungroupentries entry "%s"' % self.entry_id
maestro.command(cmd)
else:
size = \
mmproj.mmproj_table_get_row_total(self._project_handle, 1)
bs = mmbitset.Bitset(size=size)
entry_index = \
mmproj.mmproj_project_entry_id_to_index(self._project_handle,
int(self.entry_id))
mm.mmbs_on(bs, entry_index)
mmproj.mmproj_table_ungroup_entries(self._project_handle, 1, bs)
self._pt.update()
@property
def property(self):
"""
Dictionary-like container of entry properties. Keys are strings
of the form `type_family_name` as described in `structure.PropertyName`
documentation.
"""
if self._property is None:
self._property = _ProjectRowProperty(self)
return self._property
# ***************************************************************************
# ***************************************************************************
class _AllRowsIterator:
""" Iterator for all rows in the project or group """
def __init__(self, pt, group=None):
"""
:param pt: Project table instance
:type pt: `Project`
:param group: Group (if iterating over entries in a group)
:type group: `EntryGroup` or None (default)
"""
self._pt = pt
self._project_handle = pt.handle
self._group = group
def __iter__(self):
# We may need to use this to get only the visible table rows:
# num_rows = mmproj.mmproj_table_get_row_total(self._project_handle, table)
# But there should be a way the get thr ROW NUMBERS for those rows
# as opposed to just the number of visible table rows.
if self._group:
# We are iterating over rows in a group
for project_index in self._group._getAllEntryIndices():
if self._pt._auto_free_cache:
self._pt.freeCachedEntries()
yield ProjectRow(self._pt, project_index)
else:
# For each row in the table (position by table order):
num_rows = mmproj.mmproj_project_get_entry_total(
self._project_handle)
for row_position in range(1, num_rows + 1):
# Convert row position to project entry index:
project_index = mmproj.mmproj_table_get_row_entry_index(
self._project_handle, 1, row_position)
if self._pt._auto_free_cache:
self._pt.freeCachedEntries()
yield ProjectRow(self._pt, project_index)
def __len__(self):
""" Returns the total number of rows """
if self._group:
return mmproj.mmproj_table_get_group_entry_total(
self._project_handle, 1, self._group._group_index)
else:
return mmproj.mmproj_project_get_entry_total(self._project_handle)
class _SelectedRowsIterator:
""" Iterator for selected rows of the project or group"""
def __init__(self, pt, group=None):
"""
:param pt: Project table instance
:type pt: `Project`
:param group: Group (if iterating over entries in a group)
:type group: `EntryGroup` or None (default)
"""
self._pt = pt
self._project_handle = pt.handle
self._group = group
def __iter__(self):
# This always gets the selection from the project table order
# (not directly from the entry order on disk) and then maps the
# project table row to the project's entry index. Project table
# order can differ from the project order (which is what is on
# the disk) and that's why we do the mapping
# bitset of selected row indecies:
sel_bs = mmproj.mmproj_table_get_selected_rows(self._project_handle, 1)
if self._group:
group_entry_indices = self._group._getAllEntryIndices()
for sel_position in mmbitset.Bitset(sel_bs, manage_handle=False):
# sel_position is the table row index
# Convert row index into a project index:
project_index = mmproj.mmproj_table_get_row_entry_index(self._project_handle,\
1, sel_position)
if self._group:
if project_index in group_entry_indices:
if self._pt._auto_free_cache:
self._pt.freeCachedEntries()
yield ProjectRow(self._pt, project_index)
else:
if self._pt._auto_free_cache:
self._pt.freeCachedEntries()
yield ProjectRow(self._pt, project_index)
def __len__(self):
""" Returns the number of selected rows """
sel_bs = mmproj.mmproj_table_get_selected_rows(self._project_handle, 1)
if self._group:
group_entry_indices = self._group._getAllEntryIndices()
total = 0
for sel_position in mmbitset.Bitset(sel_bs, manage_handle=False):
project_index = mmproj.mmproj_table_get_row_entry_index(
self._project_handle,\
1, sel_position)
if project_index in group_entry_indices:
total += 1
return total
else:
return mm.mmbs_get_count(sel_bs)
# ***************************************************************************
# ***************************************************************************
class _IncludedRowsIterator:
"""
Iterator for included rows of the project or group
Order of iteration should be treated as "random". That is
the returned order is not necessarily the order in which entries were
included into the Workspace, nor is it necessarily the order
in which they appear in the Project Table.
"""
def __init__(self, pt, group=None):
"""
:param pt: Project table instance
:type pt: `Project`
:param group: Group (if iterating over entries in a group)
:type group: `EntryGroup` or None (default)
"""
self._pt = pt
self._project_handle = pt.handle
self._group = group
def __iter__(self):
total = \
mmproj.mmproj_project_get_included_entry_total(self._project_handle)
if self._group:
group_entry_indices = self._group._getAllEntryIndices()
# Do in reverse order in case this is being
# used to exclude entries. Probably bad idea to do
# this with the iterator anyway, but just in case
# someone tries. See mmproj.h for details.
for i in range(total, 0, -1):
entry_index = \
mmproj.mmproj_project_get_included_entry(self._project_handle, i)
if self._group:
if entry_index in group_entry_indices:
if self._pt._auto_free_cache:
self._pt.freeCachedEntries()
yield ProjectRow(self._pt, entry_index)
else:
if self._pt._auto_free_cache:
self._pt.freeCachedEntries()
yield ProjectRow(self._pt, entry_index)
def __len__(self):
"""Returns the number of included entries (rows)"""
if self._group:
group_entry_indices = self._group._getAllEntryIndices()
total = 0
tot = \
mmproj.mmproj_project_get_included_entry_total(self._project_handle)
for i in range(tot, 0, -1):
entry_index = \
mmproj.mmproj_project_get_included_entry(self._project_handle, i)
if entry_index in group_entry_indices:
total += 1
else:
total = \
mmproj.mmproj_project_get_included_entry_total(self._project_handle)
return total
# ***************************************************************************
# ***************************************************************************
[docs]class EntryGroup(object):
"""
A class which represents an entry group in the project table. Entry
groups are returned from the Project.group property.
The entry group itself has a number of properties:
collapsed - set or get the entry collapse/expand state
title - set or get the entry group title (What the user sees)
name - set or get the entry group name (Hidden unique name)
The following iterators are available which are very similar to
those available from the Project class but only operate on the
entries in the group:
all_rows - an iterator for all the rows in the group
selected_rows - an iterator for selected rows in the group
included_rows - an iterator for included rows in the group
"""
[docs] def __init__(self, pt, group_index):
self._project_handle = pt.handle
self._pt = pt
self._group_index = group_index
project = projectmodel.schrodinger.MM_Project(self._pt)
self._rowmodel = project.getRowModel()
def __index__(self):
return self._group_index
def _getName(self):
"""
Return the unique ID (name) of this group. NOTE: This is is different
from the group title (which is displayed in Maestro).
"""
return mmproj.mmproj_table_get_entry_group_name(self._project_handle, 1,
self._group_index)
def _setName(self, gname):
"""
Change the unique ID (name) of this group. NOTE: This does not update
the user-visible group title.
"""
mmproj.mmproj_table_set_entry_group_name(self._project_handle, 1,
self._group_index, gname)
self._pt.update()
return
# Treat unique 'name' as a Python property
# FIXME: Do we really want to make this property so accessible?
name = property(
_getName, _setName, None,
"Get and set the group name (NOTE: this is different from user-visible group title)"
)
def _getParent(self):
"""
Return the name of parent group, empty if group is under root.
"""
_, parent_group_id = self._rowmodel.getParentGroupId(self._getName())
return parent_group_id
[docs] def getParentGroup(self):
"""
Return the parent EntryGroup, or None if this group is top-level.
:return: Parent entry, or None
:rtype: EntryGroup or None
"""
parent_gid = self._getParent()
if parent_gid == "":
return None
return self._pt.groups[parent_gid]
def __str__(self):
# FIXME: Should this really return the name as opposed to title?
return self._getName()
def _getTitle(self):
"""
Return the title of this group. This is the user-visible "name" of the
group.
"""
return mmproj.mmproj_table_get_entry_group_title(
self._project_handle, 1, self._group_index)
def _setTitle(self, gtitle):
"""
Set the title of this group. This is the user-visible "name" of the
group.
"""
mmproj.mmproj_table_set_entry_group_title(self._project_handle, 1,
self._group_index, gtitle)
self._pt.update()
return
# Treat 'title' as a Python property
title = property(
_getTitle, _setTitle, None,
"Get and set the title of this group, as displayed in the PT.")
def _getCollapsed(self):
"""
Return the collapsed state of this group
"""
return mmproj.mmproj_table_index_is_entry_group_collapsed(
self._project_handle, 1, self._group_index)
def _setCollapsed(self, collapsed):
"""
Set the collapsed state of this group
"""
mmproj.mmproj_table_index_set_entry_group_collapsed(
self._project_handle, 1, self._group_index, collapsed)
self._pt.update()
return
# Treat 'collapsed' as a Python property
collapsed = property(_getCollapsed, _setCollapsed, None,
"Get and set the collapsed state of this group")
# all_rows - dynamic list of all rows in this project (table order)
def _getAllRowsIterator(self):
return _AllRowsIterator(self._pt, self)
all_rows = property(
_getAllRowsIterator,
doc=
"Iterator for all rows in the group using the visible project table order"
)
def _getAllEntryIndices(self):
"""
Return all entry indices of the group, it also includes entries of
children groups.
"""
return self._rowmodel.getGroupAllEntryIndices(self._getName())
# selected_rows - dynamic list of selected rows (table order)
def _getSelectedRowsIterator(self):
return _SelectedRowsIterator(self._pt, self)
selected_rows = property(
_getSelectedRowsIterator,
doc=
"Iterator for the selected rows in the group usig the visible project table order"
)
def _getIncludedRowsIterator(self):
"""
Private method. The property should be used to access the iterator.
"""
return _IncludedRowsIterator(self._pt, self)
# The property
included_rows = property(
_getIncludedRowsIterator,
doc=
"Iterator for all included rows in the group. Order should be treated as random"
)
class _EntryGroupIterator:
"""
Iterator for entry groups in the project
Order of iteration should be treated as "random". That is
the returned order is not necessarily necessarily the order
in which they appear in the Project Table.
"""
def __init__(self, pt):
self._project_handle = pt.handle
self._pt = pt
def __iter__(self):
total = mmproj.mmproj_table_get_group_total(self._project_handle, 1)
for i in range(1, total + 1):
yield EntryGroup(self._pt, i)
def __len__(self):
"""Returns the number of groups """
total = mmproj.mmproj_table_get_group_total(self._project_handle, 1)
return total
def __getitem__(self, group_id):
"""
Return the EntryGroup object for the group with the specified group
ID (name).
:param group_id:
:type group_id: str
:return: Group object
:rtype: EntryGroup
"""
for group in self:
if group.name == group_id:
return group
raise KeyError('No group with ID "%s" found' % group_id)
############################################################################
[docs]class EntrySurface(object):
"""
A class for accessing the surfaces associated with a given entry. This
class will usually only be created from the EntrySurfaceIterator
"""
[docs] def __init__(self, pt, surface_name, surface_handle):
self._project_handle = pt.handle
self.name = surface_name
self.surface_handle = surface_handle
def _getSurfaceIncluded(self):
"""
A private method that returns a boolean to indicate if the surface
is included in the Workspace.
"""
return mmsurf.mmsurf_get_visibility(self.surface_handle)
# The property for whether the surface is included:
included = property(_getSurfaceIncluded,
doc="Whether the surface is included in the Workspace.")
class _EntrySurfaceIterator:
"""
An iterator for the surfaces associated with a given entry.
"""
def __init__(self, pt, entry_id):
self._pt_handle = pt.handle
self._pt = pt
self._entry_id = entry_id
def __iter__(self):
surf_name = None
if not mmproj.mmproj_entry_get_has_vis(self._pt_handle, self._entry_id):
return
while (1):
if surf_name is None:
try:
surf_name = mmproj.mmproj_entry_get_first_surface_name(
self._pt_handle, self._entry_id)
except mm.MmException:
# If you delete the last surface in the entry, the has_vis
# flag does not get toggled
return
else:
try:
surf_name = mmproj.mmproj_entry_get_next_surface_name(
self._pt_handle, self._entry_id, surf_name)
except:
return
surf_handle = mmproj.mmproj_entry_get_surface(
self._pt_handle, self._entry_id, surf_name)
yield EntrySurface(self._pt, surf_name, surf_handle)
##############################################################################
##############################################################################
##############################################################################
[docs]class Project(mmobject.MmObject):
"""
Class to handle Maestro Projects. This is largely a wrapper to
the underlying C library which stores all the state information.
The Project class allows selection of rows in the project via
various interfaces.
A Maestro project may be accessed from within a Maestro session
which has it open via maestro.project_table_get().
A Maestro project may alternatively be accessed without Maestro running
by specifying a the name of the project when creating a project object.
See the doc string for the module for more details.
"""
# ***************
# Class variables
# ***************
# This dict will keep track of the number of objects for each handle, so
# we know when to call mmproj_proj_close. This allows someone to use
# foo = Project(1) as a way of gaining access to a garbage
# collected project handle. This is all handled in the
# MmObject base class.
# Required by MmObject base class
_instances = {}
# Controls whether debugging output is generated or not
# On by default. Class level, not instance.
_debug = False
# ***********************************************************************
# Static method. Enable or disable debug output
# ***********************************************************************
[docs] def setDebug(state):
"""
Enable or disable debug output. Use False or True.
Sets this for the class, i.e. affects all instances,
current and future.
"""
# Set at the class level, not for a given instance
Project._debug = state
setDebug = staticmethod(setDebug)
# ***********************************************************************
# Static printing method. Print out debug if enabled
# *******************************************************************
def _printDebug(*args):
"""
Print debugging output if enabled
"""
if Project._debug:
print(args)
_printDebug = staticmethod(_printDebug)
# ***********************************************************************
#
# ***********************************************************************
[docs] def enable(self, option=None):
"""
Enable the specified option. Currently only
AUTOMATIC_CACHE_FREEING is available. This will
cause any cached entries to be freed automatically
after each iteration when using the all_rows iterator.
Other iterators do not allow automatic cache
freeing to be enabled.
"""
if (option == AUTOMATIC_CACHE_FREEING):
self._auto_free_cache = True
return
# ***********************************************************************
#
# ***********************************************************************
[docs] def disable(self, option=None):
"""
Disable the specified option. Currently only
AUTOMATIC_CACHE_FREEING is available. See enable() for
details.
"""
if (option == AUTOMATIC_CACHE_FREEING):
self._auto_free_cache = False
return
# ***********************************************************************
# Static initialization method. Used to initialize various mmlibs
# ***********************************************************************
[docs] def initialize(error_handler=None):
"""
Initialize necessary mmlibs (which will also implicitly
initialize all the dependent mmlibs)
"""
if error_handler is None:
error_handler = mm.error_handler
Project._printDebug("initialize(): Initializing mmlibs")
mmproj.mmesl_initialize(error_handler)
mmproj.mmproj_initialize(error_handler)
initialize = staticmethod(initialize)
# ***********************************************************************
# Static termination method. Used to terminate mmlibs libraries
# ***********************************************************************
[docs] def terminate():
"""
Terminate various mmlibs (which also implicitly terminates
all the libraries they are dependent upon)
"""
mmproj.mmesl_terminate()
mmproj.mmproj_terminate()
terminate = staticmethod(terminate)
# ***********************************************************************
# Constructor
# ***********************************************************************
[docs] def __init__(self,
project_name="",
project_handle=INVALID_PROJECT_HANDLE,
manage=True,
show_cleanup_dialog_on_close=False):
"""
Construct a Project instance either by opening a project file or using
a handle to an already opened project.
:param project_name: The name of the project to open
:type project_name: str
:param project_handle: The handle of an already open project
:note: Either project_name or project_handle must be passed in, but not
both
:param manage: Whether to perform garbage collection and close project
when the project is delete or goes out of scope.
:type manage: bool
:param show_cleanup_dialog_on_close: Whether to block the process and
show a clean up dialog when closing the project
:type show_cleanup_dialog_on_close: bool
:note: If show_cleanup_dialog_on_close is False, project_cleanup is run
in process
When a non-managed instance goes out of scope or is deleted the project
is left open. This is desirable, for example, when you construct a
python Project instance using the project handle of Maestro's currently
opened project. This allows Maestro to continue working with the opened
project after the python Project instance is gone. Otherwise, the
project would be closed and Maestro left in an invalid state, thinking
that the project was still open.
"""
Project._printDebug("In __init__()")
self._show_cleanup_dialog_on_close = show_cleanup_dialog_on_close
if not project_name and project_handle == INVALID_PROJECT_HANDLE:
raise ProjectException("Need to specify a project name or a handle")
if project_name and project_handle != INVALID_PROJECT_HANDLE:
raise ProjectException(
"Cannot specify both project name and handle")
if project_name:
Project._printDebug("__init__(): opening via name")
manage = True
# Open up the project and get a project handle
Project._printDebug("__init()__: project name exists... :%s:" % \
project_name)
if not os.path.exists(project_name):
raise ProjectException("Project \'%s\' does not exist" %
project_name)
if project_name.endswith("zip") or project_name.endswith("ZIP"):
raise ArchivedProjectException(
"Zipped projects must be unzipped first. "
"See schrodinger.project.unzip_project()")
# Have to initialize here because this is before
# the base class initialization.
try:
mmproj.mmproj_initialize(mm.error_handler)
except mm.MmException:
pass
#Record fact that extra mmproj termination will be needed.
self._extra_terminate = True
if not mmproj.mmproj_is_project_dir(project_name):
raise InvalidProjectException("\'%s\' is not a valid (.prj) project" % \
project_name)
# See if we can read this version of entry data.
# In a future version we should check workflow data
# but we'll wait until there is more easily accessible
# support in mmproj for this.
current_version = mmproj.mmproj_get_current_entry_version()
project_version = mmproj.mmproj_get_entry_version(project_name)
if current_version != project_version:
raise InvalidProjectVersion(
"Project \'%s\' cannot be opened. %s %s %s %s %s %s %s" %\
(project_name, "It uses version",
project_version, "\nbut version", current_version,
"is expected. You may be able to run",
"\n$SCHRODINGER/utilities/project_convert",
"to convert it to the new format."))
if mmproj.mmproj_project_is_locked(project_name):
raise LockedProjectException("Project %s is locked" %
project_name)
try:
tmp_handle = mmproj.mmproj_project_open(project_name)
except mm.MmException as err:
msg = "Project '%s' cannot be opened: %s" % (project_name,
str(err))
raise ProjectException(msg)
Project._printDebug("Successfully opened project")
# Call base class initalization
super(Project, self).__init__(tmp_handle, manage, mm.error_handler)
elif int(project_handle) != INVALID_PROJECT_HANDLE:
Project._printDebug("__init__(): using already opened handle")
manage = False
# Nothing to do as it's already been initialized and
# we have a handle
# Call base class initalization
super(Project, self).__init__(project_handle, manage,
mm.error_handler)
if not mmproj.mmproj_project_in_use(self.handle):
print("There is no active project for the handle %d." % \
self.handle)
return
# By default update the project after each change (if in Maestro):
self.manual_update = False
# Entry cache is empty from the python perspective
self._cached_entries = set()
self._auto_free_cache = True
self._already_closed = False
# Create instance of MM_Project
self.project_model = projectmodel.schrodinger.MM_Project(self.handle)
self.project_name = self.project_model.getPath()
# ***********************************************************************
# Private deletion method. Required by MmObject base class
# ***********************************************************************
def _delete(self):
"""
A function to delete this object. Required for MmObject
interface.
"""
if self._already_closed:
# Don't re-close the project if closeImmediately() has already been
# called
pass
elif self._show_cleanup_dialog_on_close:
# Clean up in a separate process and show a cleanup dialog
mmproj.mmproj_project_close(self.handle)
else:
# Clean up in process (blocking) and don't show a clean up dialog.
mmproj.mmproj_project_close_with_cleanup(self.handle)
if self._extra_terminate:
mmproj.mmproj_terminate()
[docs] def close(self):
"""
Close the project immediately. Call this method on projects after
being done using them. Current maestro project can not be closed.
Note that the project cleanup will occur in-process; cleanup can be
delayed until the process exits (or Maestro is closed) by relying on
garbage collector to close the project instead of explicitly calling
the close() method - but then the project can not be used anywhere
again until that happens.
"""
if self.isCurrentMaestroProject():
raise ProjectException(
"A project can not be closed while it's in "
"use by Maestro. Use projectclose maestro command instead.")
if mmproj.mmproj_project_in_use(self.handle):
mmproj.mmproj_project_close_with_cleanup(self.handle)
self._already_closed = True
# ***********************************************************************
# Full string representation
# ***********************************************************************
def __repr__(self):
"""
Return full string representation
Warning: you cannot create another project instance
for a project which is currently open. That is, a project
can only be opened by a single process at a time.
"""
return "Project (%d)" % self.handle
# ***********************************************************************
# Human readable string
# ***********************************************************************
def __str__(self):
"""Return string representation"""
return "Project Name (%s), handle (%d)" % \
(self.project_name, self.handle)
# ***********************************************************************
# MMlibs mmproj handle
# ***********************************************************************
def __index__(self):
"""
Return mmproj handle. Can be used with mmproj functions.
"""
return self.handle
[docs] def __len__(self):
"""
Return the number of entries (rows) in the project
"""
num_entries = mmproj.mmproj_project_get_entry_total(self.handle)
return num_entries
# ***********************************************************************
# Allow retrieval of a an entry
# ***********************************************************************
[docs] def getRow(self, entry_id):
"""
Retrieve a ProjectRow instance based on the entry ID.
:type entry_id: int or str
:param entry_id: is the entry ID of this project's entry. This
is the internally assigned identifier that is invariant for the
life of the project. The ID does not change even if other
entries are deleted or the entries (rows) in the Project
are sorted. Entry ID can be passed in as a string also (e.g. "1").
:rtype: ProjectRow
:return: ProjectRow if entry ID is valid
Otherwise, returns None.
"""
try:
entry_id2 = int(entry_id)
except TypeError:
raise TypeError("Entry IDs must be an integer or castable to one, "
"but %s is a %s" % (entry_id, type(entry_id)))
except ValueError:
raise ValueError("Entry IDs must be in integer form, "
"'%s' cannot be converted to an int" % entry_id)
try:
# Just let mmproj determine if it is a valid id
row = mmproj.mmproj_project_entry_id_to_index(
self.handle, entry_id2)
except mm.MmException:
# Now row with this entry ID found in the project.
return None
return ProjectRow(self, row)
# ***********************************************************************
# Provide sequence key style access
# ***********************************************************************
def __getitem__(self, entry):
"""
Return project row object, ProjectRow, for the given entry ID.
Same as Project.getRow() except that it raises KeyError instead of
returning None if the row does not exist.
The entry ID ('entry') can be specified as either an int or a str, and
is used to look up the corresponding index into the project. Older
versions used to allow look up by entry name and treated an
'entry' argument of type string as an entry name. This is no
longer valid and will generate an exception - the unique
identifiers in projects are now entry IDs not entry names.
If you find your memory usage becoming excessive,
you may want to use freeCachedEntries(). See that
method's docstring for more information.
"""
row = self.getRow(entry)
if row is None:
raise KeyError("No row with such entry ID found")
return row
[docs] def __contains__(self, entry):
"""
Determine if the project contains the specified entry ID
:type entry_id: int or str
:param entry_id: The entry ID to check the project for.
:rtype: bool
:return: True if the specified entry ID is present. False otherwise.
"""
try:
row = self.getRow(entry)
except (TypeError, ValueError):
return False
return row is not None
def __delitem__(self, entry_id):
"""
Delete the project row for the given entry ID.
Obsolete; use Project.deleteRow(entry_id) instead.
"""
import warnings
msg = "del Project[entry_id] API is deprecated; use Project.deleteRow() instead."
warnings.warn(msg, DeprecationWarning, stacklevel=2)
self.deleteRow(entry_id)
[docs] def deleteRow(self, entry_id):
"""
Delete the row with the given entry ID from this project.
"""
try:
entry = int(entry_id)
except ValueError:
raise ValueError("Entry IDs must be in integer form, "
"'%s' cannot be converted to an int" % entry_id)
try:
# Just let mmproj determine if it is a valid id
index = mmproj.mmproj_project_entry_id_to_index(self.handle, entry)
except mm.MmException:
raise KeyError("No row with such entry ID found")
self._deleteRowByIndex(index)
def _deleteRowByIndex(self, index):
"""
Delete the row with the given index from the Project, and if it was
the last entry in its group, remove that group as well.
"""
mmproj.mmproj_project_delete_index_entry(self.handle, index)
# The HPT can't handle empty groups
self.project_model.getRowModel().deleteEmptyGroups()
[docs] def update(self):
"""
If running in Maestro, update the project after a change as the
user wants to see their changes in the PT.
Does not do anything if the manual_update attribute is set to True.
"""
if _in_maestro and not self.manual_update:
maestro.project_table_update()
[docs] def findRowsMatching(self, prop, cmp_op, value):
"""
Returns a list of ProjectRow entries for all rows matching the given
criteria.
cmp_op should be "<", "<=", ">", ">=", "==", "!="
"""
if cmp_op not in ("<", "<=", ">", ">=", "==", "!="):
raise ValueError("Invalid operator: %s;" % cmp_op +
' Allowed values: <, <=, >, >=, ==, !=.')
data_type = mm.m2io_get_type_by_name(prop)
# Set the value of the correct type
if data_type == mm.M2IO_REAL_TYPE:
assert type(value) == float
func = mmproj.mmproj_property_real_get_matching_entries
elif data_type == mm.M2IO_BOOLEAN_TYPE:
assert type(value) == bool
value = 1 if value else 0
raise NotImplementedError("Entry matching by boolean properties "
"is not supported")
# There appears to be an issue where the function
# mmproj_property_bool_get_matching_entries() is not producing
# the right results.
elif data_type == mm.M2IO_INT_TYPE:
assert type(value) == int
func = mmproj.mmproj_property_int_get_matching_entries
elif data_type == mm.M2IO_STRING_TYPE:
assert type(value) == str
if '"' not in value:
value = '"%s"' % value
elif "'" not in value:
value = "'%s'" % value
else:
raise ValueError("value contains both single and double quotes")
func = mmproj.mmproj_property_string_get_matching_entries
bs_handle = func(self.handle, prop, value, cmp_op)
bs = mmbitset.Bitset(bs_handle)
return [ProjectRow(self, entry_index) for entry_index in bs]
[docs] def getSelectedRowTotal(self):
"""
Return the total number of selected rows
"""
# 1 is the only table we have in Maestro (at the moment)
sel_bs = mmproj.mmproj_table_get_selected_rows(self.handle, 1)
return mm.mmbs_get_count(sel_bs)
# ***********************************************************************
# Select rows in the project.
# ***********************************************************************
[docs] def selectRows(self, select_mode=REPLACE, *args, **keywords):
"""
Select rows in the project. Valid modes of selection are:
* project.ADD - add rows to existing PT selection.
* project.REPLACE - replace current selection.
* project.INVERT - invert the PT selection.
* project.ALL - select all rows in the PT.
* project.NONE - deselect all rows in the PT.
Examples::
pt.selectRows(project.REPLACE, entry_ids=[1, 2, 3])
pt.selectRows(project.ALL)
pt.selectRows(project.NONE)
pt.selectRows(entry_ids=[1, 2, 3])
pt.selectRows(ADD, esl="entry_re entry*")
:param select_mode: Selection mode.
:param entry_ids: List of entry IDs for the rows to select.
:type entry_ids: list(int) or list(str)
:param esl: This is an ESL definition.
:type esl: str
:param rows: Project indices for the rows to select (deprecated).
Values refer to the values of the ProjectRow.index property.
:param rows: list(int)
:param function: Callback for determining whether a row should be
selected or not (deprecated).
:type function: callable
"""
if (select_mode == ADD):
Project._printDebug("ADD")
self._selectAdd(*args, **keywords)
elif (select_mode == REPLACE):
Project._printDebug("REPLACE")
self._selectNone()
self._selectAdd(*args, **keywords)
elif (select_mode == NONE):
Project._printDebug("NONE")
self._selectNone()
elif (select_mode == ALL):
Project._printDebug("ALL")
self._selectAll()
elif (select_mode == INVERT):
Project._printDebug("INVERT")
self._selectInvert()
else:
raise ValueError("Unrecognized select_mode value: %s" % select_mode)
# If this script is being run from within Maestro, then
# user really wants to operate on the PT. So we need to
# let them see their changes.
self.update()
# ***********************************************************************
# Private method to add to the selection
# ***********************************************************************
def _selectAdd(self, *args, **keywords):
"""
Private method to add specified rows to the already
selected rows
"""
for option, value in keywords.items():
if option == "entry_ids":
Project._printDebug("Select by entry IDs")
for entry_id in value:
try:
entry_id = int(entry_id)
except TypeError:
raise TypeError("Entry IDs must be an integer or "
"castable to one, but %s is a %s" %
(entry_id, type(entry_id)))
try:
entry_index = mmproj.mmproj_project_entry_id_to_index(
self.handle, entry_id)
except mm.MmException:
raise ValueError("No row with entry ID: %i" % entry_id)
try:
mmproj.mmproj_index_entry_set_select_state(
self.handle, entry_index,
mmproj.MMPROJ_ENTRY_SELECTED)
except mm.MmException:
print("%s.selectRows(): Could not select row %d %s" \
% (_module_name, entry_index, "by entry ID"))
elif option == "rows":
# Add each row to the selection if within range
Project._printDebug("by rows")
import warnings
msg = "Project.selectRows(): The rows option is deprecated."
warnings.warn(msg, DeprecationWarning, stacklevel=3)
try:
num_entries = mmproj.mmproj_project_get_entry_total(
self.handle)
except mm.MmException:
print("%s.selectRows(): Could not get entry total" % \
_module_name)
return
for r in value:
if r > num_entries:
print("Warning: Row %d exceeds table end which is %d. %s" % \
(r, num_entries, "Skipping."))
elif r < 1:
print("Warning: Rows start at 1. Ignoring %d" % r)
else:
try:
mmproj.mmproj_index_entry_set_select_state(
self.handle, r, mmproj.MMPROJ_ENTRY_SELECTED)
except mm.MmException:
print("%s.selectRows(): %s %d %s" % \
_module_name, \
"Could not select row ", r, "by esl")
elif option == "esl":
# Add rows matching ESL
bs = \
mmproj.mmesl_get_matching_entries(value, self.handle)
# TODO: Need to add to mmbs.i
# Make sure it's a valid handle
# mm.mmbs_in_use(bs)
for e in mmbitset.Bitset(bs):
try:
mmproj.mmproj_index_entry_set_select_state(
self.handle, e, mmproj.MMPROJ_ENTRY_SELECTED)
except mm.MmException:
print("%s.selectRows(): Could not select row %d" %\
_module_name, e)
return
else:
raise ValueError("Unrecognized option: %s" % option)
# ***********************************************************************
# Private method to unselect all entry rows
# ***********************************************************************
def _selectNone(self):
"""
Private method to unselect all entry rows
The public API for de-selecting all rows is:
Project.selectRows(NONE)
"""
num_entries = mmproj.mmproj_project_get_entry_total(self.handle)
# Ranges start at 0. Project rows at 1. Add 1 when using.
for r in range(1, num_entries + 1):
mmproj.mmproj_index_entry_set_select_state(
self.handle, r, mmproj.MMPROJ_ENTRY_NOT_SELECTED)
# ***********************************************************************
# Private method to select all rows
# ***********************************************************************
def _selectAll(self):
"""
Private method to select all rows
The public API for selecting all rows is:
Project.selectRows(ALL)
"""
num_entries = mmproj.mmproj_project_get_entry_total(self.handle)
# Ranges start at 0. Project's at 1. Add 1 when using.
for r in range(1, num_entries + 1):
mmproj.mmproj_index_entry_set_select_state(
self.handle, r, mmproj.MMPROJ_ENTRY_SELECTED)
# ***********************************************************************
# Private method to invert to the selection
# ***********************************************************************
def _selectInvert(self):
"""
Private method to invert all rows in the project
The public API for inverting row selection is:
Project.selectRows(INVERT)
"""
num_entries = mmproj.mmproj_project_get_entry_total(self.handle)
# Ranges start at 0. Project's at 1. Add 1 when using.
for r in range(1, num_entries + 1):
try:
selstate = mmproj.mmproj_index_entry_get_select_state(
self.handle, r)
except mm.MmException:
raise Exception("Could not get selection state fora %d" % r)
return
if selstate == mmproj.MMPROJ_ENTRY_SELECTED:
mmproj.mmproj_index_entry_set_select_state(
self.handle, r, mmproj.MMPROJ_ENTRY_NOT_SELECTED)
else:
mmproj.mmproj_index_entry_set_select_state(
self.handle, r, mmproj.MMPROJ_ENTRY_SELECTED)
[docs] def includeRows(self, entry_ids, exclude_others=True, autofit=True):
"""
Include rows with the given entry IDs in the Workspace, while
optionally excluding all other entries. If entry_ids list is empty
and exclude_others is True, all PT entries will be excluded.
:param entry_ids: List of Entry IDs to include.
:type entry_ids: list of ints or str.
:param exclude_others: Whether to also exclude previously included entries.
:type exclude_others: bool
:param autofit: Whether to fit WS towards included entries
:type autofit: bool
"""
if _in_maestro:
eid_str = ','.join(map(str, entry_ids))
if exclude_others:
if entry_ids:
if autofit:
maestro.command(
'entrywsincludeonly entry {0}'.format(eid_str))
else:
# Exclude, then re-include to avoid entrywsincludeonly's autofititing
# TODO: MAE-43603 - Do this in one command
maestro.command('beginundoblock Only include entries')
maestro.command('entrywsexclude all')
maestro.command(
f'entrywsinclude skipautofit=yes entry {eid_str}')
maestro.command('endundoblock')
else:
maestro.command('entrywsexclude all')
else:
if entry_ids:
skip_autofit = 'no' if autofit else 'yes'
maestro.command(
f'entrywsinclude skipautofit={skip_autofit} entry {eid_str}'
)
return
# Outside of Maestro
indices_to_include = {
mmproj.mmproj_project_entry_id_to_index(self.handle, int(eid))
for eid in entry_ids
}
if exclude_others:
num_included_entries = \
mmproj.mmproj_project_get_included_entry_total(self.handle)
# Do this in reverse order as the array shrinks
for i in range(num_included_entries, 0, -1):
entry_index = mmproj.mmproj_project_get_included_entry(
self.handle, i)
# Do not exlclude entries that are in the input include list:
if entry_index in indices_to_include:
indices_to_include.remove(entry_index)
continue
ws_state = mmproj.mmproj_index_entry_get_workspace_state(
self.handle, entry_index)
if ws_state != mmproj.MMPROJ_ENTRY_LOCKED_IN_WORKSPACE:
mmproj.mmproj_index_entry_set_workspace_state(
self.handle, entry_index,
mmproj.MMPROJ_ENTRY_NOT_IN_WORKSPACE)
# Now include all entries that are not already included:
for entry_index in indices_to_include:
mmproj.mmproj_index_entry_set_workspace_state(
self.handle, entry_index, mmproj.MMPROJ_ENTRY_IN_WORKSPACE)
[docs] def moveRowsToGroup(self, entry_ids, group_name):
"""
Move a list of project entries into a group.
:param entry_ids: a list of project entry IDs
:type entry_ids: list(str)
:param group_name: The unique ID of a group; if a group with this
name doesn't exist, it will be created. Note, this is different
from the user-visible group title.
:type group_name: str
"""
# TODO: Consider requiring that the group exist already - that way
# the behavior is more explicit. Project.createNewGroup() can be
# used to create the group if it doesn't exist.
if not entry_ids:
return
group_exists = mmproj.mmproj_table_does_group_exist(
self.handle, 1, group_name)
if self.isCurrentMaestroProject():
eid_str = ', '.join(entry_ids)
if group_exists:
mae_command = 'entrymovetogroup'
else:
mae_command = 'entrygroupcreate'
cmd = '{0} "{1}" entry_id "{2}"'.format(mae_command, group_name,
eid_str)
maestro.command(cmd)
else:
if not group_exists:
title = group_name
mmproj.mmproj_table_add_entry_group(self.handle, 1, group_name,
title, False)
group_index = mmproj.mmproj_table_get_entry_group_index(
self.handle, 1, group_name)
entry_bs = mmbitset.Bitset(size=len(self.all_rows))
for entry_id in entry_ids:
row = self.getRow(entry_id)
entry_bs.set(row.index)
mmproj.mmproj_table_index_entries_set_group(self.handle, 1,
entry_bs, group_index)
self.update()
# ***********************************************************************
#
# ***********************************************************************
def __iter__(self):
"""
Return an iterator object
Allows iteration over the selected entries.
To iterate over all entries use:
\'for row in xrange(1, len(Project)+1)\'
OBSOLETE. Use Project.selected_rows instead.
If you find your memory usage becoming excessive,
you may want to use freeCachedEntries(). See that
method's docstring for more information.
"""
import warnings
msg = "'The form 'sel_entry in pt' is deprecated. Use 'sel_entry in pt.selected_rows' instead"
warnings.warn(msg, DeprecationWarning, stacklevel=2)
# This always gets the selection from the project table order
# (not directly from the entry order on disk) and then maps the
# project table row to the project's entry index. Project table
# order can differ from the project order (which is what is on
# the disk) and that's why we do the mapping
sel_bs = mmproj.mmproj_table_get_selected_rows(self.handle, 1)
for sel_position in mmbitset.Bitset(sel_bs, manage_handle=False):
# Convert row index into a project index:
project_index = \
mmproj.mmproj_table_get_row_entry_index(self.handle, 1, \
sel_position)
yield project_index
# ***********************************************************************
#
# ***********************************************************************
[docs] def refreshTable(self):
"""
Refresh the project table
This is only usable from within Maestro.
"""
if _in_maestro:
self.update()
else:
print("Warning: Project.refreshTable() only works when run ")
print("from within Maestro")
# ***********************************************************************
#
# ***********************************************************************
[docs] def getPropertyNames(self):
"""
Return a list of the data names of usable properties in this
project instance, including properties which were hidden.
There are some additional properties, like whether an entry is
selected or included, which are not returned by this function.
"""
hidden_properties = [
'b_m_entry_is_selected', 'b_m_entry_has_vis',
'b_m_entry_in_workspace', 'b_m_entry_has_hypothesis',
'b_m_entry_has_vibration'
]
ret_list = []
num_prop = mmproj.mmproj_project_get_property_total(self.handle)
for i in range(1, num_prop + 1):
try:
pname = mmproj.mmproj_property_get_data_name(self.handle, i)
if pname not in hidden_properties:
ret_list.append(pname)
except mm.MmException:
pass # Should we do something here??
return ret_list
[docs] def getVisiblePropertyNames(self):
"""
Return a list of the data names of visible properties in this project
instance (hidden properties are not included).
:rtype: list(str)
:return: list of names of the property columns that are currently
displayed in the Project Table
"""
table = 1
tot_col = mmproj.mmproj_table_get_column_total(self.handle, table)
ret_list = []
for col_index in range(1, tot_col + 1):
is_subset = mmproj.mmproj_table_index_column_get_subset(
self.handle, table, col_index)
if is_subset:
dname = mmproj.mmproj_table_get_column_data_name(
self.handle, table, col_index)
ret_list.append(dname)
return ret_list
[docs] def getPropertyNamesForSelectedRows(self):
"""
Return a set of data names of properties (including hidden ones)
that are common to all selected entries in this project.
:return: List of property data names
:rtype: list of str
"""
# TODO there should be a way to get properties only - PANEL-15258
sts = (row.getStructure(workspace_sync=False)
for row in self.selected_rows)
return analyze.find_common_properties(sts)
# ***********************************************************************
#
# ***********************************************************************
def _getShortName(self):
"""
Get the name of the project (without the path)
"""
pname = mmproj.mmproj_project_get_path(self.handle)
pname_stripped = os.path.basename(pname)
return pname_stripped
# Treat 'shortname' as a Python property
shortname = property(_getShortName, None, None,
"Get the short project name (without the path)")
# ***********************************************************************
# fullname - full project path
# ***********************************************************************
def _getFullName(self):
"""
Get the full name of the project (which includes the whole path)
"""
pname = mmproj.mmproj_project_get_path(self.handle)
return pname
fullname = property(
_getFullName, None, None,
"Get the full name of the project (including the path)")
# ***********************************************************************
# additional_data - full project path to additional data
# ***********************************************************************
[docs] def getAdditionalDataDir(self):
"""
Get the additional data directory of the project
"""
return os.path.join(self.fullname, ".mmproj-admin", "additional_data")
# ***********************************************************************
# all_rows - dynamic list of all rows in this project (table order)
# ***********************************************************************
def _getAllRowsIterator(self):
return _AllRowsIterator(self)
all_rows = property(
_getAllRowsIterator,
doc=
"Iterator for all rows in the project using the visible project table order"
)
# ***********************************************************************
# selected_rows - dynamic list of selected rows (table order)
# ***********************************************************************
def _getSelectedRowsIterator(self):
return _SelectedRowsIterator(self)
selected_rows = property(
_getSelectedRowsIterator,
doc=
"Iterator for the selected rows using the visible project table order")
# ***********************************************************************
# included_rows - dynamic list of included rows (undefined order)
# ***********************************************************************
def _getIncludedRowsIterator(self):
return _IncludedRowsIterator(self)
included_rows = property(
_getIncludedRowsIterator,
doc=
"Iterator for all included rows. No specific return order for the rows."
)
# ***********************************************************************
# groups:
# ***********************************************************************
def _getGroups(self):
return _EntryGroupIterator(self)
groups = property(_getGroups,
None,
None,
doc="Get the entry groups in the project")
[docs] def createNewGroup(self, title, parent_gid=None, collapsed=False):
"""
Create a new group with the given title. The EntryGroup object for
the new group will be returned. Group name/ID will be auto-generated.
:param title: User-visible title for the new group.
:type title: string
:param parent_gid: (Optional) Group ID/name of the parent group. By
default the new group will be created at top level.
:type parent_gid: str
:param collapsed: Whether new group should be collapsed or not.
:type collapsed: bool
:return: New group object
:rtype: EntryGroup
"""
# Automatically generate a new unique group ID
i = 1
while True:
gid = str(i)
if not mmproj.mmproj_table_does_group_exist(self.handle, 1, gid):
break
i += 1
assert i < 99999 # just in case
if parent_gid is None:
parent_gid = ''
model = self.project_model.getRowModel()
status = model.addEntryGroup(gid, title, collapsed, parent_gid)
assert status == 0
return self.groups[gid]
# ***********************************************************************
# Last added entry
# ***********************************************************************
def _getLastAddedEntry(self):
num_entries = mmproj.mmproj_project_get_entry_total(self.handle)
return ProjectRow(self, num_entries)
last_added_entry = property(
_getLastAddedEntry,
None,
None,
doc=
"Return a ProjectRow instance for the last entry added to the project")
#*********************************************************************
#
#*********************************************************************
[docs] def freeCachedEntries(self):
"""
Frees all entries that are cached.
Things like ProjectRow.getStructure(), ProjectRow.structure,
ProjectRow.setStructure() will cause entries to be cached.
Unless the cache is freed memory usage will grow and
can become quite large (if, for example, you are iterating
over and retrieving a large number of structures).
Note that on some operating systems any memory already allocated
within a process is not returned to the system when it is freed.
Instead it is simply marked as available to the process and
only gets returned to the system when the process exits.
Given this you may not see your memory usage decrease
after calling this method. However, by calling this
method at carefully chosen points you can minimize your memory
footprint.
If you free the cache, then the next time the above-mentioned
methods/properties are invoked they will get and cache the data
by fetching it from disk.
"""
for entry_index in self._cached_entries:
# Should have been cached. So getting it again should be
# a no-op.
ct = mmproj.mmproj_index_entry_get_ct(self.handle, entry_index)
mmproj.mmproj_index_entry_free_ct_and_prop(self.handle, entry_index,
ct)
self._cached_entries.clear()
[docs] def importStructure(self,
st,
name=None,
wsreplace=False,
copy=True,
add_group=False):
"""
Create a new entry in the Project Table from structure <st> and return
the ProjectRow object for the new entry.
In rare cases (when your Structure is managed by C code and use of
it is no longer needed in python) using copy=False can give better
performance. Doing so will make the Structure invalid for further
use in your script.
:type st: `structure.Structure`
:param st: Structure to add to the Project Table.
:type name: str
:param name: Entry name to give to the new entry. By default, the value
of the s_m_entry_name property will be used.
:type wsreplace: bool
:param wsreplace: whether to replace the Workspace with new entry.
WARNING: if wsreplace is True, then any scratch entries in the
Workspace will be disposed of without warning, which is not how
Maestro would usually behave.
:type copy: bool
:param copy: If set to False, will insert the actual input CT into the
PT instead of copying it first.
:type add_group: bool
:param add_group: Whether to create new group(s) based on the
s_m_subgroup_title and move the new entry to it/them.
:rtype: ProjectRow
:return: Return ProjectRow object for the new entry.
"""
if copy:
ct_handle = mm.mmct_ct_duplicate(st.handle)
else: # copy set to False AND is C-managed
ct_handle = st.handle
if name is None:
name = st.property.get('s_m_entry_name', '')
entry_index = mmproj.mmproj_project_add_entry(self.handle, name.strip(),
ct_handle)
# EV 96284 Project.importStructure() fails to include SEQRES block.
# Freeing the ct in mmproj will force everything to be read from
# disk the next time the entry is requested. In this case the
# desired m_PDB_SEQRES block will be returned but in the unrequested
# data handle.
mmproj.mmproj_index_entry_free_ct_and_prop(self.handle, entry_index,
ct_handle)
self.updateCrystalPropertiesIfRequired(entry_index)
# Entry is imported without Maestro command, so we have to explicitly
# initiate project table update.
self.update()
row = ProjectRow(self, entry_index)
if wsreplace:
# Will update the PT partially.
row.includeOnly()
if add_group:
group_titles = st.property.get('s_m_subgroup_title')
if group_titles is None:
return
group = None
parent_group_id = None
for group_title in group_titles.split('->'):
group = self.createNewGroup(group_title, parent_group_id)
parent_group_id = group.name
# Move entry to the most inner group:
row.moveToGroup(group.name)
return row
[docs] def updateCrystalPropertiesIfRequired(self, entry_index):
# Attempt to derive crystal properties from desmond
# properties if exists - MAE-34164.
# self.project_model = projectmodel.schrodinger.MM_Project(self)
if projectmodel.mm_convert_and_set_desmond_to_crystal_props(
self.project_model, entry_index):
# Delete the properties in project entry.
projectmodel.mm_entry_delete_crystal_properties(
self.project_model, entry_index)
[docs] def importStructureFile(self,
filename,
wsreplace=True,
creategroups="multiple",
format='any',
wsinclude='',
ptselect=True):
"""
Imports all structures from the specified file into the Project Table.
:param filename: File to import.
:type filename: str
:param wsreplace: Whether to replace the Workspace with the first
structure in the specified file (default True).
:type wsreplace: bool
:param creategroups: Which of the imported structures are to be
grouped. Valid values are "multiple", "all", and "none".
:type creategroups: str
:param format: Format of the file. Default is to derive from extension.
:type format: str
:param wsinclude: Whether to include all entries in the workspace, or
first entry or all entries. Valid values are 'none', 'all',
'first', or empty string (honor maestro preference).
:type wsinclude: str
:param bool ptselect: Whether to select imported entries or preserve
originally selected entries in the PT
This method is only available when used from within Maestro.
"""
# TODO: Expose preservegroups option.
if not _in_maestro:
print("Warning: Project.importStructureFile() only works when run ")
print("from within Maestro")
return
previous_format = maestro.get_command_option("entryimport", "format")
previous_wsreplace = \
maestro.get_command_option("entryimport",
"wsreplace")
previous_all = maestro.get_command_option("entryimport", "all")
previous_cg = maestro.get_command_option("entryimport", "creategroups")
previous_wsinclude = maestro.get_command_option("entryimport",
"wsinclude")
# entryimport doesn't have an option to preserve selected, so do it manually
if not ptselect:
previous_selected_rows = [r.entry_id for r in self.selected_rows]
cmd = 'entryimport format=%s' % format
cmd += '\nentryimport creategroups=%s' % creategroups
cmd += '\nentryimport all=true'
if wsreplace:
cmd += '\nentryimport wsreplace=true'
else:
cmd += '\nentryimport wsreplace=false'
if wsinclude:
cmd += '\nentryimport wsinclude=%s' % wsinclude
cmd += '\nentryimport "%s"' % filename
# Reset options to previous values
cmd += '\nentryimport format=%s wsreplace=%s all=%s creategroups=%s ' \
'wsinclude=%s' % \
(previous_format, previous_wsreplace, previous_all, previous_cg,
previous_wsinclude)
maestro.command(cmd)
if not ptselect:
self.selectRows(REPLACE, entry_ids=previous_selected_rows)
[docs] def exportSelectedEntries(self, filename):
"""
Export the selected entries to given file.
:param filename: File to write structures to.
:type filename: str
"""
if len(self.selected_rows) == 0:
raise RuntimeError("No entries are selected in the Project Table.")
# TODO: grouping information is not preserved. See PYTHON-3122
if maestro:
# if not running in unit tests
maestro.project_table_synchronize()
st_iterator = (row.getStructure(workspace_sync=False)
for row in self.selected_rows)
structure.write_cts(st_iterator, filename)
# ***********************************************************************
#
# ***********************************************************************
[docs] def getPropertyPrecision(self, property_name):
"""
Return the precision of the property.
:param property_name: is the m2io data name (the long name)
of the property for which you
want the precision
:type property_name: string
:return: precision of the property as an integer
Throws a ValueError if the property name isn't valid.
"""
try:
column = mmproj.mmproj_table_get_property_column(
self.handle, 1, property_name)
except mm.MmException:
raise ValueError(
"Could not get precision - unknown property \'%s\'" %
property_name)
# Only reals have precision
index = mmproj.mmproj_project_get_property_index(
self.handle, property_name)
data_type = mmproj.mmproj_property_get_data_type(self.handle, index)
if data_type != mm.M2IO_REAL_TYPE:
raise TypeError("Property must be of type real. %s is not a real"\
% property_name)
precision = mmproj.mmproj_table_get_column_display_precision(
self.handle, 1, column)
return precision
[docs] def isColumnFixed(self, column):
"""
Return whether the column is in fixed area of the Project Table.
:type column: int
:param column: Project column
:rtype: bool
:return: Whether the column is in the fixed area.
"""
_, result = self.project_model.isColumnFixed(column)
return result
[docs] def getFixedColumnsCount(self, in_subset=True):
"""
Return number of columns in fixed area. This does not include
always fixed columns (Row, Included, Stars, 2D Structure and Title).
:type in_subset: bool
:param column: Whether to return fixed columns count only for
the columns in subset (True by default)
:rtype: int
:return: Number of columns in the fixed area.
"""
_, result = self.project_model.getFixedColumnsCount(in_subset)
return result
[docs] def sortEntryGroups(self,
sort_only_selected_groups,
parent_group_id,
sort_groups_using,
sort_fields_list=None,
sort_group_fields_list=None,
is_alphanumeric_sort=True,
is_ascending_sort=True):
"""
Sorts the groups in project table based on the sort_groups_using which
has MM_OPTION_TABLE_SORT_GROUP_OPTION values.
:type sort_only_selected_groups: bool
:param sort_only_selected_groups: if true then only groups with
selection will be sorted,
otherwise all groups will be sorted.
:type parent_group_id: str
:param parent_group_id: parent of groups to be sorted.
:type sort_groups_using: int
:param sort_groups_using: option value of
MM_OPTION_TABLE_SORT_GROUP_OPTION based on
which groups will be sorted.
:type sort_fields_list: list of tuples
:param sort_fields_list: list of tuples having property name and sort
order (ASCENDING OR DESCENDING), together making
sort fields of entries.
e.g.[("Entry Name",ASCENDING),('Stars',DESCENDING)]
:type sort_group_fields_list: list of tuples
:param sort_group_fields_list: list of tuples having property name and sort
order (ASCENDING OR DESCENDING), together making
sort fields of groups.
e.g.[("Entry Name",ASCENDING),('Stars',DESCENDING)]
:type is_alphanumeric_sort: bool
:param is_alphanumeric_sort: whether strings should be sorted by
treating sequences of digits as single
numbers or string values will be compared
using strcmp for standard sorting.
:type is_ascending_sort: bool
:param is_ascending_sort: whether sort values in ascending or
descending order.It is not required if groups
are sorted based on given fields entry values
as then sort fields will have their own sort
order.
"""
return self.project_model.sortHPTGroups(
sort_only_selected_groups, parent_group_id, sort_groups_using,
convertToMMSortFieldsList(sort_fields_list),
convertToMMSortFieldsList(sort_group_fields_list),
is_alphanumeric_sort, is_ascending_sort)
[docs] def sortEntries(self,
sort_selected_entries,
sort_fields_list,
blank_cell_sort=projectmodel.BlankCellSort.BOTTOM,
is_alphanumeric_sort=True):
"""
Sorts HPT entries based on given table_sort_fields
:type sort_selected_entries: bool
:param sort_selected_entries: if true then only selected entries will be
sorted,otherwise all entries will be sorted.
:type sort_fields_list: list of tuples
:param sort_fields_list: list of tuples having property name and sort
order (ASCENDING OR DESCENDING), together making sort fields of
entries.e.g.[("Entry Name",ASCENDING),('Stars',DESCENDING)]
:type blank_cell_sort: enum projectmodel.BlankCellSort
:param blank_cell_sort: value of enum projectmodel.BlankCellSort that
tells how blank cells should be sorted.
BlankCellSort.TOP - sort so that blank cells are at top,
BlankCellSort.BOTTOM - sort such that blank cells are at bottom,
BlankCellSort.LOWEST - sort as if blank cells contain
lowest possible value
BlankCellSort.HIGHEST - sort as if blank cells contain highest
possible value.
:type is_alphanumeric_sort: bool
:param is_alphanumeric_sort: whether strings should be sorted by
treating sequences of digits as single numbers or string values
will be compared using strcmp for standard sorting.
"""
return self.project_model.sortEntries(
sort_selected_entries, convertToMMSortFieldsList(sort_fields_list),
blank_cell_sort, is_alphanumeric_sort)
[docs] def isCurrentMaestroProject(self):
"""
Return True if this project is the current project for this
Maestro session. Returns False if running outside of Maestro.
"""
if not _in_maestro:
return False
return (self.handle == maestro.project_get())
[docs]@contextmanager
def open_mmzip(prjzip, mmzip_mode, prjname=None):
"""
Initializes the mmzip module and opens a file in mmzip. Checks for errors.
:param prjzip: path to zipped project (or path to where the new .prjzip should go)
:type prjzip: str
:param mmzip_mode: mode for mmzip (MMZIP_WRITE or MMZIP_READ)
:param prjname: path to prj. Optional, because unzip_project does not
have a path to a project yet.
:type prjname: str
:return: handle to mmzip proj
"""
# Copies prjzip into prjname (for unzip_project)
if prjname is None:
prjname = (os.path.abspath(prjzip))
mm.mmzip_initialize(mm.error_handler)
if not mmproj.mmproj_is_project(prjname):
raise ProjectException("%s is not a valid project" % prjname)
# Get handle to zip file
try:
mmzip_handle = mm.mmzip_open(prjzip, mmzip_mode)
except mm.MmException:
mm.mmzip_terminate()
raise ProjectException("Failed to open/create zipped project %s" %
prjzip)
yield mmzip_handle
mm.mmzip_close(mmzip_handle)
mm.mmzip_terminate()
[docs]def unzip_project(prjzip, newdir=None):
"""
Unzip a prjzip. newdir specifies directory into which the new project
will be created. It will be created with the same name as the old project.
:param prjzip: path to zipped project
:type prjzip: str
:param newdir: destination directory of new unzipped project. This does not
include the name of the prjdir. If None, unzip into temporary directory under
SCHRODINGER_TMP.
:type newdir: str
:rtype: str
:return: path of unzipped project
"""
if newdir is None:
user_tmp = mm.mmfile_get_schrodinger_temp()
newdir = tempfile.mkdtemp(prefix='tproj', dir=user_tmp)
with open_mmzip(prjzip, mm.MMZIP_READ) as mmzip_handle:
archive_name = None
# Loop over each file in the zip and extract it
for file_no in range(mm.mmzip_num_files(mmzip_handle)):
name = mm.mmzip_file_name(mmzip_handle, file_no)
if archive_name is None:
# The first "file" is the top level directory of the
# project.
archive_name = name
mm.mmzip_extract_file_to_dir(mmzip_handle, name, newdir)
projdir = os.path.join(newdir, archive_name)
return projdir
[docs]def zip_project(prjname, newdir):
"""
Zip a .prj file. newdir specifies directory into which the zipped file will go.
It will be created with the same name as the input project.
:param prjname: path to project
:type prjname: str
:param newdir: destination directory for zipped project.
:type newdir: str
:return: path of zipped project
:rtype: str
"""
# Normalize project path name to remove any trailing path separators.
# Otherwise os.path.split call below will not work correctly.
prjname = os.path.normpath(prjname)
#The add_directory_recursively call takes in the entire file path of a file, starting at
#the user's home directory, if the full path is given. Because of this, it would zip a small
#section of the home directory and all the folders that lead up to the desired files instead
#of just the files themselves. E.g. instead of testpt2.prj being zipped, the folder in the
#archive would be /Users/yourname/Documents/Schrodinger/testpt2.prj. To avoid this, we CD
#to the directory of the project and pass in just the project itself, to avoid passing a
#file path.
old_cwd = os.getcwd()
direc, prj = os.path.split(prjname)
if direc:
os.chdir(direc)
# Strip the name of the project file and add zip
prjzip = prj + "zip"
# Add the zip file to the intended path
projdir = os.path.join(newdir, prjzip)
if not os.path.exists(newdir):
raise Exception("Chosen directory %s does not exist." % newdir)
with open_mmzip(projdir, mm.MMZIP_WRITE, prjname) as mmzip_handle:
# Add project directory to newly created archive
mm.mmzip_add_directory_recursively(mmzip_handle, prj)
#CD back to old directory
os.chdir(old_cwd)
return projdir
[docs]def open_project(projdir, widget=None, autounlock=False, askunlock=True):
"""
Open a project file, unlocking it if it is locked and the users chooses to.
open_project is a convenience function for constructing a Project()
that provides information and options to users when projects are locked.
If projdir points to a zip archive of a project, we unzip it into a temp
directory before opening it.
Note that this routine interacts with the user if the project is locked.
When closing a project that was extracted from a zip file, the project
should first be closed, and then the temp directory deleted. Closing the
project causes it to delete some of its own subdirectories, and this will
create an error if the temp directory has already been deleted. First close
the project and then use delete_temp_project_directory to do this safely.
:type projdir: str
:param projdir: The path to a project
:type widget: QWidget or None
:param widget: The parent widget that any informational dialogs should be
modal to. Use None if running without a GUI - any messages/input
requests will go to the terminal. Note that using None when running
under a PyQt gui will cause any questions to be asked in the terminal
and "QCoreApplication::exec: The event loop is already running" to
print to the terminal if the user is asked whether to unlock a file or
not. The code runs correctly, however.
:type autounlock: bool
:param autounlock: If True, any project will be automatically unlocked and
opened if it is locked, if False (default), the state of askunlock
determines behavior.
Caution - use of autounlock=True can leave the project in an uncertain
state if it is in use elsewhere, and should only be used if it is essential
not to interact with the user.
:type askunlock: bool
:param askunlock: This parameter is overridden if autounlock is True.
If askunlock is True (and autounlock is False), will ask the user if a
locked project should be unlocked. If False, locked projects will not
be opened.
:rtype: schrodinger.project.Project, str, str
:return: tuple of (project, path to project, path to temp directory). If
project could not be opened, project will be None. If supplied
project was a zip archive, path to temporary directory that was
created to hold project.
Note that a Project object of a valid but empty project evaluates to False,
so the way to check if a valid project was returned is to check::
if project is not None:
pass
"""
handle = None
tempdir = None
if projdir.endswith('zip'):
projdir = unzip_project(projdir)
tempdir = os.path.dirname(projdir)
if mmproj.mmproj_project_is_locked(projdir):
# The project is locked.
if not autounlock and not askunlock:
return handle, projdir, tempdir
if autounlock:
mmproj.mmproj_project_force_unlock(projdir)
else:
# The bulk of this except statement is
# just building a sensible message for the user to let them know
# who/what has locked this project so they can decide whether to
# override the lock or not.
lock_file = mm.m2io_open_file(
os.path.join(projdir, '.mmproj-admin/lock'), mm.M2IO_READ)
# Move down the file to the data block of interest
mm.m2io_goto_block(lock_file, 'f_m_mmproj_lock_file', 1)
# Grab the list of data labels so we can verify the existance of
# the ones we are interested in.
labels = mm.m2io_get_data_names(lock_file, mm.M2IO_ALL_TYPES)
def get_s_data(astring):
# Gets string data associated with the label astring
if astring in labels:
try:
val = mm.m2io_get_string(lock_file, [astring])[0]
if val == 'localhost.localdomain':
val = 'the local host'
return val
except (mm.MmException, IndexError):
return ""
else:
return ""
def get_i_data(astring):
# Gets integer data associated with the label astring
if astring in labels:
try:
return str(mm.m2io_get_int(lock_file, [astring])[0])
except (mm.MmException, IndexError):
return ""
else:
return ""
# Display a warning message to the user and ask if it is OK to
# unlock the project
lockinfo = "".join([
'This project is currently locked by user ',
get_s_data('s_m_username'), ' with user ID ',
get_i_data('i_m_uid'), '.\nThe locking process has PID ',
get_i_data('i_m_pid'), ' and is running on', ' ',
get_s_data('s_m_hostname'), '.'
])
mm.m2io_close_file(lock_file)
lock_message = "".join([
'\nIf this project is not really in use',
' you may safely remove the lock.\n\n',
'Do you want to remove the lock?'
])
if widget is not None:
# Only import PyQt-dependent modules if we are already under a
# gui (widget != None)
import schrodinger.ui.qt.appframework as appframework
doit = appframework.question('\n'.join([lockinfo,
lock_message]),
button1='Yes',
button2='No',
title='Locked Project',
parent=widget)
else:
prompt = '(yes/no)'
answer = input('\n'.join([lockinfo, lock_message, prompt]))
doit = answer.lower() == 'yes' or answer.lower() == 'y'
if doit:
# Unlock the project if the user says it is OK to
mmproj.mmproj_project_force_unlock(projdir)
else:
return handle, projdir, tempdir
try:
# Open the project
handle = Project(project_name=projdir)
except Exception as msg:
if widget is not None:
# Only import PyQt-dependent modules if we are already under a
# gui (widget != None). We only need to point the user to the
# terminal if we are running a gui.
from schrodinger.Qt.QtWidgets import QMessageBox
QMessageBox.warning(
widget, 'Warning',
str(msg) + '\nPlease check the terminal for more information')
return handle, projdir, tempdir
[docs]@contextmanager
def temp_unzip_project(project_filename):
"""
Yields a Project instance which is suitable for read-only modifications.
This will open a Project in a temporary directory and delete the temporary
directory when finished.
:param project_filename: name of a prjzip file
:type project_filename: str
"""
unzipped_project = unzip_project(project_filename)
try:
proj = Project(project_name=unzipped_project)
yield proj
finally:
proj.close()
shutil.rmtree(unzipped_project)
[docs]def delete_temp_project_directory(projdir, tempdir, tries=10, force=False):
"""
Called AFTER closing a project to safely delete the temp directory it
resides in.
The project needs access to one of its subdirectories for a few ticks after
it closes, so this routine waits until that directory disappears before
deleting the entire directory tree. Otherwise, exceptions will result.
Note that after tries * 0.1 seconds, the routine will exit without removing
the directory to avoid an infinite wait for a project that isn't closing
properly. The default yields a maximum wait of 1 second.
:type projdir: str
:param projdir: path to the project directory
:type tempdir: str
:param tempdir: path to the temp directory to be removed (this is normally
the parent directory of the project directory
:type tries: int
:param tries: The number of times to check if the project has finished
closing. Once tries attempts have been made, the routine exists
without removing the temp directory unless force=True
:type force: bool
:param force: If True, then the directory is removed even if the project
has not yet closed properly - this can lead to exceptions being printed
to the terminal, though no harm is actually done. If False (defualt),
then the temp directory is left intact if the project hasn't closed
after tries checks.
"""
count = 0
check_dir = os.path.join(projdir, '.mmproj-cleanup')
while os.path.exists(check_dir) and count < tries:
time.sleep(0.1)
count = count + 1
if count < tries or force:
shutil.rmtree(tempdir)
[docs]def convertToMMSortFieldsList(fields_list):
"""
Convert given tuples list to MM_SortField list.
:type fields_list: list of tuples
:param fields_list: list of tuples having property name and sort order.
e.g. [("Entry ID",ASCENDING),('Stars',DESCENDING)]
"""
mmsort_fields = []
if fields_list is not None:
for prop_name, sort_order in fields_list:
mmsort_fields.append(
projectmodel.MM_SortField(prop_name, sort_order == ASCENDING))
return mmsort_fields
ProjectException = projectmodel.ProjectException
[docs]class LockedProjectException(ProjectException):
pass
[docs]class InvalidProjectException(ProjectException):
pass
[docs]class ArchivedProjectException(ProjectException):
pass
[docs]class InvalidProjectVersion(ProjectException):
pass
# *** Other methods/actions/features to possibly add:
# - export selected rows PYTHON-3122
# - delete property column
#EOF