"""
Functionality for "ark" file format handling.
'sea' stands for Schrodinger Enhanced Ark.
The syntax of the ark format can be found elsewhere (e.g., Desmond
User Manual).
Below is an example, demonstrating the most basic usage of this module.
Say we have a file called 'config.cfg' with the following content::
------start of the content------
fep = {
lambda = "default:12"
i_window = ?
output = {
name = "$JOBNAME$[_replica$REPLICA$].dE"
first = 0.0
interval = 1.2
}
}
temperature = [300.0 310.0 320.0]
------end of the content------
We can parse it with a code like this::
import schrodinger.utils.sea as sea
cfg = sea.Map( open( "config.cfg", "r" ).read() )
In fact, the code does much more than merely parsing the content. But the
important thing here is that at the end we get a 'Map' object that enables
us to easily manipulate the settings from the file, as shown below::
assert( cfg.fep.lambda_ .val == "default:12" )
assert( cfg.fep.i_window .val == None )
assert( cfg.fep.output.first .val == 0.0 )
assert( cfg.fep.output.interval.val == 1.2 )
assert( cfg.fep.output.name.raw_val == "$JOBNAME$[_replica$REPLICA$].dE" )
assert( cfg.temperature[0].val == 300.0 )
assert( cfg.temperature[1].val == 310.0 )
assert( cfg.temperature[2].val == 320.0 )
assert( cfg.temperature .val == [300.0, 310.0, 320.0] )
# Another way to access the data:
assert( cfg["fep"]["lambda" ].val == "default:12" )
assert( cfg["fep"]["i_window"].val == None )
cfg.fep.output.first.val = 1.0
assert( cfg.fep.output.first.val == 1.0 )
cfg.fep.i_window.val = 1
assert( cfg.fep.i_window.val == 1 )
cfg.fep.lambda_.val = "a different string"
assert( cfg.fep.lambda_.val == "a different string" )
cfg.temperature[0].val = 20.0
assert( cfg.temperature[0].val == 20.0 )
# Adds a new key-value pair.
cfg.fep["new_key"] = 1
# Deletes an existing key-value pair.
del cfg.fep.output["interval"]
print( str( cfg ) )
#Result:
#fep = {
# i_window = 1
# lambda = "a different string"
# new_key = 1
# output = {
# name = "$JOBNAME$[_replica$REPLICA$].dE"
# first = 1.0
# }
#}
#temperature = [20.0 310.0 320.0]
Some explanations of the code:
- The ".val" is in fact a python 'property' for reading and mutating the value
of the parameter.
- In the file, we have a parameter named "lambda", but in the code, we access it
with a trailing underscore as 'lambda' is a python keyword.
- The value '?' in the file will correspond to 'None' in python code.
- The ".raw_val" is similar to the ".val". The difference is that the latter
will give a value where the macros are expanded, whereas the former will give
the original string. More information regarding macros is beyond the scope of
this docstring; please refer to the documentation.
- Notice that an expression like 'temperature[0].val' returns the value of the
0-th element of the list, while 'temperature.val' returns a builtin 'list'
object for the entire list.
Copyright Schrodinger, LLC. All rights reserved.
"""
# Contributors: Yujie Wu
import keyword
import re
import weakref
from copy import deepcopy
import schrodinger.infra.ark as dark
from .common import boolean
from .common import debug_print
from .common import is_equal
# Factory that `Sea.__init__` calls to create each object's tag container.
# NOTE(review): `Sea.__init__` stores None instead of a set when this name is
# falsy, which presumably allows tagging to be switched off -- confirm.
Set = set
def _val_filter(val):
    """
    Wraps an arbitrary value 'val' into a new 'Sea' (or derivative) object.

    What is returned depends on the type of 'val':
    - 'Sea' or derivative: a deep copy of 'val'.
    - list or tuple: a 'List' object built by `_list_filter`.
    - anything else: an 'Atom' object constructed from 'val' (the 'Atom'
      constructor rejects dict objects with a ValueError).
    """
    if isinstance(val, Sea):
        return deepcopy(val)
    if isinstance(val, (list, tuple)):
        return _list_filter(val)
    return Atom(val)
def _list_filter(val_list):
    """
    Builds a 'List' object out of the elements of 'val_list'. This is rarely
    called directly; '_val_filter' is the usual entry point.

    :param val_list: A list or tuple whose elements may be of any type. Each
        element is added through `List.append`.
    """
    converted = List()
    for item in val_list:
        converted.append(item)
    return converted
# Module-wide macro table shared by all 'Sea' objects: each key is a macro
# name and each value is what that macro stands for.
_macro_dict = {}


def update_macro_dict(new_macro_dict):
    """
    Merges the entries of 'new_macro_dict' into the module-wide macro table,
    overwriting existing macros of the same name.
    """
    _macro_dict.update(new_macro_dict)


def set_macro_dict(new_macro_dict):
    """
    Replaces the module-wide macro table with 'new_macro_dict'. The given
    object itself is stored (no copy is made).
    """
    global _macro_dict
    _macro_dict = new_macro_dict


def get_macro_dict():
    """
    Returns the module-wide macro table (the live object, not a copy).
    """
    return _macro_dict
def _escape_dollar(s):
"""
This function is needed by the 'expand_macro' function below.
It replaces dollar char ``$`` as in the following cases: ``$$, $[, $]``
In a string with the '\x00' char. The results for these cases will be the
following: \x00\x00, \x00[, \x00] respectively.
"""
s = s.replace("$[", "\x00[", -1)
s = s.replace("$]", "\x00]", -1)
s = s.replace("$$", "\x00\x00", -1)
return s
_STRIP_BRACKET_PATTERN = re.compile(r"\x00\[[^\[]*?\$[^\]]*?\x00\]")
def _strip_bracket(s):
"""
Mutates a given string 's' by following the following rules:
1. If there is a dollar sign '$' within a fragment of string that is
delimited by '\x00[' and '\x00]', the entire fragment including the
'\x00[' and '\x00]' delimiters will be removed.
2. If there is no dollar sign within the fragment, then the fragment will
be retained, and only the delimiters will be removed.
Examples::
# Returns "desmond"
_strip_bracket( "desmond\x00[_$JOBNAME\x00]" )
# Returns "desmond_jobname"
_strip_bracket( "desmond\x00[_jobname\x00]" )
This function is usually used together with the '_escape_dollar' function
in the 'expand_macro' function (see below).
"""
s = _STRIP_BRACKET_PATTERN.sub("", s)
s = s.replace("\x00[", "")
s = s.replace("\x00]", "")
s = s.replace("\x00\x00", "$")
return s
def expand_macro(s, macro_dict):
    """
    Returns the string 's' with its macros replaced by the values found in the
    macro dictionary 'macro_dict'.

    Rules or conventions about macros and expansion:
    - All macros should start with a single '$', followed by capital letters,
      e.g., "$JOBNAME", "$USERNAME".
    - Optional string fragments should be bracketed by '$[' and '$]', e.g.,
      "myjob$[_lambda$LAMBDANO$]", where "_lambda$LAMBDANO" is an optional
      string fragment.
    - An optional fragment is kept, with the brackets '$[' and '$]' stripped
      off, if its macro (here '$LAMBDANO') is defined; otherwise the fragment
      is removed together with its brackets.

    Macros are substituted by plain text replacement, one after another in the
    iteration order of 'macro_dict'; each value is converted with `str`.
    """
    expanded = _escape_dollar(s)
    # Iterate over a snapshot of the dictionary items.
    for macro_name, macro_value in tuple(macro_dict.items()):
        expanded = expanded.replace(macro_name, str(macro_value))
    return _strip_bracket(expanded)
class Sea:
    """
    This is the base class of the 'Atom', 'List', and 'Map' classes. A 'Sea'
    object manages three pieces of information:

    tag: As the name suggests, tags allow users of the 'Sea' object to label
         the object with strings and later on to extract or print those with
         certain tags.
    parent: Pointer (weak reference) to the parent of this 'Sea' object.
    pmode: The printing mode, "hier" or "path"; reported as "hier" when unset.

    Operations to manipulate these data are defined in this base class.
    """

    @staticmethod
    def _normalize_tag(tag):
        """
        User can supply either a single string, or a list, or a set of strings
        as tags. This function normalizes them to the set type.

        :return: Always a new 'set' object.
        """
        if isinstance(tag, str):
            tag = [tag]
        return set(tag)

    @staticmethod
    def _gen_sea(ark, parent):
        """
        Creates a new 'Map' or 'List' or 'Atom' object from the given data 'ark'
        and sets the parent of this new object to the given 'parent', and then
        returns this new object.

        :param ark: The data object to wrap. It must be of a builtin type. A
            dict yields a 'Map'; a list or tuple yields a 'List'; anything else
            yields an 'Atom'. If the first element of a list is one of the
            directives "!int!", "!real!", "!str!", "!bool!", the remaining
            elements are converted to atoms of that type; if any of them cannot
            be converted, the whole list (directive included) is wrapped
            element by element instead.
        :param parent: The parent of the new object. It can be None.
        """
        if isinstance(ark, dict):
            return Map(ark, parent)
        if not isinstance(ark, (list, tuple)):
            return Atom(ark, parent=parent)

        v = List(parent=parent)
        if len(ark) == 0:
            return v

        atomtype_directives = {
            "!int!": int,
            "!real!": float,
            "!str!": str,
            "!bool!": bool,
        }
        try:
            atomtype = atomtype_directives.get(ark[0])
        except TypeError:
            # Unhashable first element (e.g. a dict or a list): no directive.
            atomtype = None

        typed_atoms = None
        if atomtype is not None:
            # Convert everything BEFORE touching `v`, so that a failure in the
            # middle cannot leave half of the typed elements behind.
            try:
                typed_atoms = [Hydrogen(e, atomtype, parent) for e in ark[1:]]
            except (TypeError, ValueError, OverflowError):
                typed_atoms = None

        if typed_atoms is not None:
            for atom in typed_atoms:
                v.quick_append(atom)
        else:
            for e in ark:
                v.quick_append(Sea._gen_sea(e, v))
        return v

    def __init__(self, parent=None):
        """
        Creates and initializes a 'Sea' object.

        :param parent: This parameter provides the parent of this 'Sea' object.
            It can be None. Only a weak reference to it is kept.
        """
        # `__parent` is assigned first so that the attribute exists before
        # anything else is set. The original note here warns that a subclass
        # may overload `__setattr__` and that recursion must be avoided.
        self.__parent = None
        # The tag container is None when the module-level `Set` is falsy.
        self.__tag = Set() if (Set) else None
        self.__pmode = None  # None | "hier" | "path"
        self.__parent = None if (parent is None) else weakref.ref(parent)

    def __getstate__(self):
        """
        This is needed for pickling. The returned object is the `raw_val` of
        this object, which subclasses provide.
        """
        return self.raw_val

    def __setstate__(self, ark):
        """
        Restores the 'Sea' object from an object of the builtin type by
        generating a new object from 'ark' and adopting its attributes.
        """
        v = Sea._gen_sea(ark, None)
        self.__dict__.update(v.__dict__)

    def __deepcopy__(self, memo=None):
        """
        Subclasses must implement deep copying; the base class cannot.
        """
        raise NotImplementedError()

    def __str__(self):
        """
        Subclasses of the 'Sea' class must implement this function if conversion
        of the subclass's instances to strings is needed.
        """
        raise NotImplementedError(
            "`__str__` method not implemented for sea.%s class" %
            self.__class__.__name__)

    def __ne__(self, rhs):
        """
        Returns True if the value of this object does not equal that of 'rhs'.
        """
        return not self.__eq__(rhs)

    def __eq__(self, rhs):
        """
        Returns True if the value of this object equals that of 'rhs'.
        Subclasses of the 'Sea' class must implement this function.
        """
        raise NotImplementedError(
            "`__eq__` method not implemented for sea.%s class" %
            self.__class__.__name__)

    def __hash__(self):
        # Identity based hash.
        return hash(id(self))

    def __get_sval(self):
        """
        Used by the 'sval' property, which is to get an 'Sea' object of the
        value that is represented by this 'Sea' object. Normally this function
        just returns itself. But when this 'Sea' object is referring to another
        'Sea' object, the latter should be returned -- this functionality is
        left to subclasses to implement if it makes sense for that subclass.
        """
        return self

    sval = property(fget=__get_sval,
                    doc="Readonly. Returns the current `Sea` object.")

    def apply(self, op):
        """
        Recursively applies the operation as given by 'op' to all 'Sea'
        subobjects of this 'Sea' object.

        The base implementation has no subobjects and does nothing; container
        subclasses override it.
        """

    def parent(self):
        """
        Returns the parent of this 'Sea' object or None if it does not have a
        parent (or if the weakly referenced parent no longer exists).
        """
        return None if (self.__parent is None) else self.__parent()

    def set_parent(self, parent):
        """
        Sets the parent of this 'Sea' object to the given 'parent' and, when a
        parent is given, adopts the parent's printing mode.

        :return: This object.
        """
        self.__parent = None if (parent is None) else weakref.ref(parent)
        if (self.__parent):
            self.set_pmode(self.__parent().pmode())
        return self

    def tag(self):
        """
        Returns the tags of this object (a 'set' object, or None when tagging
        is disabled). The internal container itself is returned, not a copy.
        """
        return self.__tag

    def has_tag(self, tag):
        """
        Returns True if we already tagged this object with the given 'tag'.

        :param tag: The given 'tag' can be a string, or a list of strings, or a
            'set' of strings. All of them must be present for a True result.
        :return: False when tagging is disabled for this object.
        """
        if (self.__tag is None):
            return False
        return self._normalize_tag(tag).issubset(self.__tag)

    def _add_tag_impl(self, tag, propagate):
        """
        Adds the already normalized 'tag' set to this object and, if
        'propagate' is true, to all subobjects.
        """
        if (self.__tag is not None):
            self.__tag |= tag
            if (propagate):
                self.apply(lambda v: v._add_tag_impl(tag, propagate))

    def add_tag(self, tag, propagate=True):
        """
        Tags this object with another string(s).

        :param tag: The given 'tag' can be a string, or a list of strings, or a
            'set' of strings.
        :param propagate: If True, the function will propagate the operation to
            all 'Sea' subobjects.
        """
        if (self.__tag is not None):
            self._add_tag_impl(self._normalize_tag(tag), propagate)

    def _remove_tag_impl(self, tag, propagate):
        """
        Removes the already normalized 'tag' set from this object and, if
        'propagate' is true, from all subobjects.
        """
        if (self.__tag is not None):
            self.__tag -= tag
            if (propagate):
                self.apply(lambda v: v._remove_tag_impl(tag, propagate))

    def remove_tag(self, tag, propagate=True):
        """
        Removes a tag.

        :param tag: The given 'tag' can be a string, or a list of strings, or a
            'set' of strings.
        :param propagate: If True, the function will propagate the operation to
            all 'Sea' subobjects.
        """
        if (self.__tag is not None):
            self._remove_tag_impl(self._normalize_tag(tag), propagate)

    def _reset_tag_impl(self, tag, propagate):
        """
        Replaces the tags of this object with the already normalized 'tag' set
        and, if 'propagate' is true, does the same for all subobjects.
        """
        if (self.__tag is not None):
            # Every object gets its OWN copy. Storing the very same set in
            # all subobjects would make a later in-place `add_tag` or
            # `remove_tag` on one of them silently change all the others.
            self.__tag = set(tag)
            if (propagate):
                self.apply(lambda v: v._reset_tag_impl(tag, propagate))

    def reset_tag(self, tag, propagate=True):
        """
        Resets the tag of this object to the given 'tag'.

        :param tag: The given 'tag' can be a string, or a list of strings, or a
            'set' of strings.
        :param propagate: If True, the function will propagate the operation to
            all 'Sea' subobjects.
        """
        if (self.__tag is not None):
            self._reset_tag_impl(self._normalize_tag(tag), propagate)

    def clear_all_tag(self, propagate=True):
        """
        Removes all tags.

        :param propagate: If True, the function will propagate the operation to
            all 'Sea' subobjects.
        """
        if (self.__tag is not None):
            self.__tag = set()
            if (propagate):
                self.apply(lambda v: v.clear_all_tag(propagate))

    def pmode(self):
        """
        Returns the printing mode; "hier" if none has been set.
        """
        return self.__pmode if (self.__pmode) else "hier"

    def set_pmode(self, pmode, propagate=True):
        """
        Resets the printing mode of this object to the given 'pmode'.

        :param propagate: If True, the function will propagate the operation to
            all 'Sea' subobjects.
        """
        self.__pmode = pmode
        if (propagate):
            self.apply(lambda v: v.set_pmode(pmode, propagate))

    def _dump_impl(self, tag):
        """
        Subclasses must implement the actual dumping.
        """
        raise NotImplementedError(
            "Subclass of 'Sea' does not implement the '_dump_impl' method: %s" %
            type(self))

    def dump(self, tag=set()):  # noqa: M511
        """
        Converts this 'Sea' object into a string that looks ugly (yet syntactically correct).
        This method is 50%-900% faster than the __str__ method.

        :param tag: A string, or a list or set of strings. It is normalized to
            a new set before use, so the default object is never modified.
        """
        return self._dump_impl(self._normalize_tag(tag))
class Atom(Sea):
    """
    This class represents the "atomic" parameters in a config file. Atomic
    parameters do not contain further sub-elements. For example,
    'force.type' is an atomic parameter, whereas 'force' is not because it
    has sub-elements like 'type', 'gibbs', etc.

    Public attributes:

    - validate - None by default. This attribute can be set to be a
      callable object that assesses whether a given value is legal or not
      for this 'Atom' object. The callable object should be able to
      take one argument -- the value subject to its assessment and returns
      either True (if the value is OK) or False (if the value is illegal).
      NOTE(review): nothing in this class sets or reads `validate`; confirm
      where it is used.
    """

    WILDCARD_PATTERN = re.compile(r'\*\ *\.')

    # For detecting circular references. Shared by all atoms; holds the atoms
    # visited by the dereferencing that is currently in progress.
    __refmemo = []

    @staticmethod
    def num_wildcard(s):
        """
        This function is used to tell about strings like "*.*.keyword". It
        returns a tuple object. The first element is the number of wildcards
        "*", the second element is the word at the end after the '.' symbol.
        For example: num_wildcard( "*.*.keyword" ) will yield (2, "keyword").
        If 's' contains no wildcard, (0, s) is returned.
        """
        match = Atom.WILDCARD_PATTERN.search(s)
        if (match is None):
            return 0, s
        end = match.end()
        if (end < len(s)):
            more, word = Atom.num_wildcard(s[end:])
            return 1 + more, word
        return 1, ""

    @staticmethod
    def guess_value_type(s):
        """
        Guesses the type of the object that 's' represents. A tuple will be
        returned. The first element is the object of the guessed type, the
        second element is the type.

        If 's' is a bool, an integer or a floating number (numpy integers and
        floats included), it is returned as a python value together with its
        type; bool values are reported with the `boolean` type.

        If 's' is a str or bytes object, conversions are tried in this order:
        int, float, `boolean`, and finally str.

        If 's' is None, (None, None) will be returned.

        :raise ValueError: If 's' is of any other type.
        """
        import numpy as np

        if (s is None):
            return (None, None)
        # `bool` must be tested before `int` because bool is a subclass of int.
        if (isinstance(s, bool)):
            return (s, boolean)
        if (isinstance(s, (int, np.integer))):
            return (int(s), int)
        if (isinstance(s, (float, np.floating))):
            return (float(s), float)
        if (not isinstance(s, (bytes, str))):
            raise ValueError("cannot convert '%s' to `sea.Atom`" %
                             str(s.__class__))

        # Extracts the value from the string `s` and determines the value type.
        try:
            return (int(s), int)
        except ValueError:
            pass
        try:
            return (float(s), float)
        except ValueError:
            pass
        if isinstance(s, bytes):
            s = s.decode()
        try:
            return (boolean(s), boolean)
        except ValueError:
            return (str(s), str)

    def __init__(self, s=None, type=None, parent=None):
        """
        Constructs a 'Atom' object based on the value 's'.

        :param s: Can be a floating number, a boolean value, an integer number,
            or a string. If 's' is a string, the function will try to guess the
            type of object that the string represents and convert the string to
            the guessed type. 's' cannot be a dict, list, or tuple object.
        :param type: Supplies a type, instead of using the guessed one. If it is
            None, the guessed type will be used.
        :param parent: Specifies the parent of this 'Atom' object.
        :raise ValueError: If 's' is a dict, list, or tuple.
        """
        Sea.__init__(self, parent)
        for container in (dict, list, tuple):
            if (isinstance(s, container)):
                raise ValueError(
                    "cannot construct an `sea.Atom` object from a %s object" %
                    container.__name__)
        if (type is None):
            self._val, self._type = Atom.guess_value_type(s)
        else:
            self._val = None if (s is None) else type(s)
            self._type = boolean if (type is bool) else type

    def __str__(self):
        """
        Converts the value to a string.

        None is written as "?", and `boolean` values as "true" or "false".
        A string value consisting only of letters, digits, '_' and '-' is
        returned as is; any other string is returned flanked with double-quotes
        (i.e., "<value>") with embedded double-quotes and backslashes escaped.
        """
        val = self._val
        if (val is None):
            return "?"
        if (self._type is boolean):
            return "true" if (val) else "false"
        if (self._type is not str):
            return str(val)
        # `fullmatch` is used on purpose: with `match` and a trailing '$' a
        # value ending in a newline would pass and be written unquoted.
        if (re.fullmatch("[A-Za-z0-9_-]+", val)):
            return val
        escaped = "".join(("\\" + ch) if (ch in ('"', '\\')) else ch
                          for ch in val)
        return '"' + escaped + '"'

    def __eq__(self, rhs):
        """
        Returns True if the value of this object equals that of 'rhs'.

        If the type of this object is float, the `is_equal` function will be
        used for comparison.
        If 'rhs' is not an 'Atom' object, False will be returned.
        """
        if (not isinstance(rhs, Atom)):
            return False
        val = self._val
        va_ = rhs._val
        if (val is None and va_ is None):
            result = True
        elif (val is None or va_ is None):
            result = False
        elif (self._type is float):
            result = is_equal(val, va_)
        else:
            result = (val == va_)
        if (not result):
            debug_print("atom comparison: '{}' '{}'".format(
                str(val),
                str(va_),
            ))
        return result

    def __deepcopy__(self, memo=None):
        """
        Returns a copy of this atom with the same value, type, tags and
        printing mode. The copy has no parent.
        """
        if (memo is None):
            memo = {}
        newobj = self.__new__(self.__class__)
        memo[id(self)] = newobj
        newobj._type = self._type
        newobj._val = self._val
        newobj._Sea__tag = deepcopy(self._Sea__tag)
        newobj._Sea__pmode = self._Sea__pmode
        newobj._Sea__parent = None
        return newobj

    def __set_val(self, val):
        """
        Sets the value.

        :param val: None, or any object that can be converted to the internal
            type of this atom. If the atom has no type yet, the type is guessed
            from 'val'.
        :raise ValueError: If 'val' cannot be converted.
        """
        if (val is None):
            self._val = val
        elif (self._type is None):
            self._val, self._type = Atom.guess_value_type(val)
        else:
            try:
                self._val = self._type(val)
            except ValueError as err:
                raise ValueError("Invalid value") from err

    def __get_raw_val(self):
        """
        Returns the raw value.
        """
        return self._val

    def __get_sval(self):
        """
        Used by the `sval` property, which is to get an `Sea` object of the
        value that is represented by this `Sea` object. If the value is a
        reference (a string starting with '@' after macro expansion), the
        `sval` of the referenced `Sea` object will be returned; otherwise, this
        object will be returned.

        A reference of the form "@.key" or "@key" is looked up in the topmost
        ancestor; "@*.key" starts at the parent and goes up one further level
        per wildcard.

        :raise ValueError: If the reference is invalid or circular.
        """
        # A general comment on `__refmemo`: We use it as a memo to keep track
        # of references that this call is going through during the dereferencing
        # process. This is because the value pointed to by the current ref
        # could itself be another reference, and so on. The memo is cleared
        # whenever this function exits, normally or not.
        try:
            raw = self._val
            if (raw is None or self._type is not str):
                return self
            ref = expand_macro(raw, get_macro_dict()).strip()
            if (not ref.startswith('@')):
                return self

            # Compare by identity: `in` would use `Atom.__eq__`, which
            # compares values, so two different atoms that merely hold the
            # same reference text would be mistaken for a cycle.
            if (any(seen is self for seen in Atom.__refmemo)):
                s = "Circular reference:\n "
                for e in Atom.__refmemo:
                    s += ("%s => " % _pathname(e))
                s += _pathname(self)
                raise ValueError(s)
            Atom.__refmemo.append(self)

            ref = ref[1:].strip()
            if (ref == ""):
                # A lone '@' names nothing.
                raise ValueError("Invalid reference: @")

            if (ref[0] == '*'):
                num_parent, key = Atom.num_wildcard(ref)
                root = self.parent()
                for _ in range(num_parent):
                    if (root is None):
                        raise ValueError("Invalid reference: %s" % ref)
                    root = root.parent()
            else:
                key = ref[1:] if (ref[0] == '.') else ref
                # Climb to the topmost ancestor.
                root = self
                parent = root.parent()
                while (parent is not None):
                    root = parent
                    parent = root.parent()

            if (isinstance(root, Map) and key in root):
                return root.get_value(key).sval
            raise ValueError("Invalid reference: %s" % ref)
        finally:
            Atom.__refmemo = []

    def __get_bval(self):
        """
        Returns a new `Sea` object, which has all macros expanded and
        references dereferenced.
        """
        sval = self.sval
        if (sval is not self):
            return sval.bval
        newobj = Atom(self.val)
        newobj.reset_tag(self.tag())
        return newobj

    def __get_dval(self):
        """
        Returns a new `Sea` object, which has all references dereferenced,
        but macros are preserved.
        """
        sval = self.sval
        if (sval is not self):
            return sval.dval
        newobj = Atom(self.raw_val)
        newobj.reset_tag(self.tag())
        return newobj

    def __get_val(self):
        """
        Returns the value.

        If the raw value is a string, the returned value will have all
        macros (if any) expanded.
        If the value is a reference, the actual referenced value will be
        returned.
        """
        sval = self.sval
        if (sval is not self):
            return sval.val
        val = self._val
        if (val is not None and self._type is str):
            val = expand_macro(val, get_macro_dict())
        return val

    raw_val = property(fget=__get_raw_val,
                       fset=__set_val,
                       doc="Readwrite. When read, this returns the raw value.")
    sval = property(fget=__get_sval,
                    doc="Readonly. Returns the dereferenced `Sea` object.")
    bval = property(fget=__get_bval,
        doc="Readonly. Returns a new `Atom` object, which has all macros expanded and references " \
        "dereferenced.")
    dval = property(
        fget=__get_dval,
        doc="Readonly. Returns a new `Atom` object with dereferenced value.")
    val = property(fget=__get_val,
                   fset=__set_val,
                   doc="Readwrite. When read, this returns the current value.")

    def update(self, val, tag=set()):  # noqa: M511
        """
        Updates the value with 'val'.

        If 'val' is a `Atom`, then this `Atom` object will be altered to be
        the same as 'val'. So the type of the value of this object can be
        altered by the `update` function.
        If 'val' is not a `Atom`, then this function will behave exactly the
        same as setting the value via the 'val' property.

        :param val: An `Atom`, None, or any object that can be converted to the
            internal type of the value.
        :param tag: Add the tag to this object.
        :raise ValueError: If 'val' cannot be converted.
        """
        if (isinstance(val, Atom)):
            self._val = val._val
            self._type = val._type
        else:
            self.__set_val(val)
        self.add_tag(tag)

    def _dump_impl(self, tag):
        """
        Returns the same string as `__str__`; 'tag' is not used.
        """
        return self.__str__()
class Hydrogen(Atom):
    """
    An 'Atom' whose type is given explicitly, so no type guessing is done.
    """

    def __init__(self, s, type, parent=None):
        """
        Just a slightly faster way to construct a 'Atom' object.

        :param s: The value; it is converted with `type(s)`.
        :param type: The type of the value. As in `Atom.__init__`, `bool` is
            recorded as `boolean`, so that such an atom is printed and updated
            like any other boolean atom.
        :param parent: Specifies the parent of this object.
        """
        Sea.__init__(self, parent)
        self._val = type(s)
        self._type = boolean if (type is bool) else type
[docs]class List(Sea, list):
"""
This class represents the "list" parameters in a config file.
This class' behavior is very similar to that of the buildin `list` class.
"""
[docs] def __init__(self, val=[], parent=None): # noqa: M511
"""
Constructs a `List` object from the given 'val'.
:param val: Must be either a list or tuple. The elements must be of
either buildin types or `Sea`.
:param parent: Specifies the parent of this `List` object.
"""
Sea.__init__(self, parent)
if (self.parent()):
self.set_pmode(self.parent().pmode())
self.val = val
def __reduce__(self):
"""
DESMOND-10233: Patch for python 3.8.
In the new version of python, list.__reduce__() has
a special optimization which will extend pickled values
on restore instead of calling `__getstate__`.
This mechanism is not compatible with how the Sea parser
works. Instead, fall back to `Sea.__reduce__()` here
so the pickled List can be loaded.
"""
return super().__reduce__()
def __getitem__(self, i):
if isinstance(i, slice):
return List(list.__getitem__(self, i), self.parent())
return list.__getitem__(self, i)
def __getslice__(self, i, j):
"""
Returns a new list composed of elements deep-copied from this list from
i-th through the j-th element (but not including the j-th element).
The parent of the new list remains the same as this list.
DEPRECATION: This has been deprecated since Python version 2.0, and does
NOT work any more in Python version 3.x. The replacement is the
`__getitem__` method accepting a `slice` object.
"""
new_list = List(list.__getslice__(self, i, j), self.parent())
return new_list
def __add__(self, rhs):
"""
Returns a new list composed of elements deep-copied from this list and
the 'rhs' list.
The parent of the new list remains the same as this list.
:param rhs: The 'rhs' can be a buildin list or tuple object. In this
case, a `Sea` object will be made out of 'rhs' and then
concatenated to the copy of this list.
"""
return deepcopy(self).extend(rhs)
def __iadd__(self, rhs):
"""
Updates this list with the given 'rhs'.
:param rhs: The 'rhs' can be a buildin list or tuple object. In this
case, a `Sea` object will be made out of 'rhs' and then
concatenated to this list.
"""
return self.extend(rhs)
def __setitem__(self, index, val):
"""
Resets the 'index'-th element with the given value 'val'.
:param val: Can be either a 'Sea' object or a buildin type object. In
the latter case, a 'Sea' object will be made out of 'val' and then
be used to set the 'index'-th element.
"""
if (not isinstance(val, Sea)):
raise ValueError("val is not a sea.Sea object")
list.__setitem__(self, index, val)
self[index].set_parent(self)
def __str__(
self,
ind="",
tag=set(), # noqa: M511
pre="",
is_list_elem=False,
outdent=""):
"""
Converts this list to a string in the ark syntax and returns the string.
:param ind: Indentation of the converted string.
:param tag: Converts only the elements with the given tag.
"""
s = "["
if (is_atom_list(self)):
for e in self:
s += str(e) + " "
s += "]"
else:
ind2 = ind + " "
i = 0
if (is_list_elem):
i = 1
e = self[0]
if (isinstance(e, Atom)):
s += str(e)
elif (isinstance(e, Map)):
s += "{{{}{}}}\n".format(
e.__str__(ind2, tag, pre, True),
ind,
)
elif (isinstance(e, List)):
if (is_atom_list(e)):
s += "%s" % e.__str__(ind2, tag, pre, True, ind)
else:
s += "%s\n" % e.__str__(ind2, tag, pre, True, ind)
n = len(self)
while (i < n):
e = self[i]
i += 1
if (isinstance(e, Atom)):
s += "\n{}{}".format(
ind,
str(e),
)
elif (isinstance(e, Map)):
s += "\n{}{{{}{}}}\n".format(
ind,
e.__str__(ind2, tag, pre, True),
ind,
)
elif (isinstance(e, List)):
if (is_atom_list(e)):
s += "\n{}{}".format(
ind,
e.__str__(ind2, tag, pre, True, ind),
)
else:
s += "\n{}{}\n".format(
ind,
e.__str__(ind2, tag, pre, True, ind),
)
s += ("%s]" % outdent) if (s[-1] == "\n") else ("\n%s]" % outdent)
return s
def __eq__(self, rhs):
"""
Returns True if the value of this object equals that of 'rhs'.
"""
if (not isinstance(rhs, list)):
return False
if (self is not rhs):
len1 = len(self)
len2 = len(rhs)
if (len1 != len2):
debug_print("list comparison: list lengthes: %d %d" % (
len1,
len2,
))
return False
for i, e1, e2 in zip(list(range(len1)), self, rhs):
if (e1 != e2):
debug_print("list comparison: element %d changed" % i)
return False
return True
[docs] def __contains__(self, item):
return list.__contains__(self, _val_filter(item))
def __deepcopy__(self, memo={}): # noqa: M511
"""
"""
newobj = self.__new__(self.__class__)
memo[id(self)] = newobj
newobj._Sea__tag = deepcopy(self._Sea__tag)
newobj._Sea__pmode = self._Sea__pmode
newobj._Sea__parent = None
for e in self:
newobj.quick_append(deepcopy(e, memo))
return newobj
def __set_val(self, val):
"""
Sets the value, which must be a list or tuple.
"""
if (isinstance(val, tuple) or isinstance(val, list)):
list.__init__(self)
for e in val:
self.append(e)
else:
raise ValueError("value is not a list or tuple")
def __get_raw_val(self):
"""
Returns the raw value.
"""
val = []
for e in self:
val.append(e.raw_val)
return val
def __get_bval(self):
"""
Returns a new `Sea` object, which has all macros expanded and references dereferenced.
"""
newobj = List()
for e in self:
newobj.append(e.bval)
_asea_copy(self, newobj)
return newobj
def __get_dval(self):
"""
Returns a new `Sea` object with dereferenced values.
"""
newobj = List()
for e in self:
newobj.append(e.dval)
_asea_copy(self, newobj)
return newobj
def __get_val(self):
"""
Returns the value with macro's expended and references dereferenced if the raw value is a string.
"""
val = []
for e in self:
val.append(e.val)
return val
raw_val = property(
fget=__get_raw_val,
fset=__set_val,
doc="Readwrite. When read, this returns the current value.")
bval = property(fget=__get_bval,
doc="Readonly. Returns a new `List` object, which has all macros expanded and references " \
"dereferenced.")
dval = property(
fget=__get_bval,
doc="Readonly. Returns a new `List` object dereferenced values.")
val = property(fget=__get_val,
fset=__set_val,
doc="Readwrite. When read, this returns the current value.")
[docs] def append(self, val):
"""
Appends the given value 'val' to this list.
"""
list.append(self, _val_filter(val))
self[-1].set_parent(self)
[docs] def quick_append(self, val):
"""
Appends the given value 'val' to this list.
"""
list.append(self, val)
self[-1].set_parent(self)
[docs] def extend(self, val_list):
"""
Extends this list with the given list or tuple 'val_list'.
"""
n = len(val_list)
list.extend(self, _val_filter(val_list))
for e in range(-n, 0):
self[e].set_parent(self)
return self
[docs] def insert(self, index, val):
"""
Inserts the given value 'val' to the 'index'-th position in the list.
"""
list.insert(self, index, _val_filter(val))
self[index].set_parent(self)
[docs] def pop(self, index=-1):
"""
Removes the 'index'-th element from the list and returns the removed
element.
The parent of the returned element will be set to None.
"""
e = list.pop(self, index)
e.set_parent(None)
return e
[docs] def index(self, obj, **kwargs):
return list.index(self, _val_filter(obj), **kwargs)
[docs] def apply(self, op):
"""
Recursively applies the operation as given by 'op' to all 'Sea'
subobjects of this 'Sea' object.
"""
for e in self:
op(e)
[docs] def update(self, ark=None, tag=set()): # noqa: M511
"""
Updates this list with 'ark'.
:param ark: If 'ark' is None, no effects. If the first element has a
string value "!append!", the remaining elements in 'ark' will be
appended to this list. If the first element has a string value
"!removed!", the remaining elements in 'ark' will be removed from
this list. Otherwise, this list will be completely reset to 'ark'.
:param tag: Resets the tag of this list to the given 'tag'.
"""
if (ark is None):
return self
# Ensures that an execution list is immutable.
if (self and isinstance(self[0], Atom) and self[0].val in [
"!append!",
"!remove!",
]):
return self
if (not isinstance(ark, list)):
raise ValueError("type of the `ark` argument is not list")
ark = ark if (isinstance(ark, Sea)) else _val_filter(ark)
if (ark == []):
list.__init__(self, [])
else:
e0 = ark[0]
if (isinstance(e0, Atom) and isinstance(e0.val, str)):
e0 = e0.val.lower()
if (e0 == "!append!"):
self.extend(ark[1:])
elif (e0 == "!remove!"):
index = set()
for a in ark[1:]:
for i, e in enumerate(self):
if (e == a):
index.add(i)
index = list(index)
index.sort(reverse=True)
for i in index:
del self[i]
else:
list.__init__(self, deepcopy(ark))
for e in self:
e.set_parent(self)
else:
list.__init__(self, deepcopy(ark))
for e in self:
e.set_parent(self)
self.reset_tag(tag)
def _set_value_helper(self, key_index_list, value, tag):
    """
    Follows 'key_index_list' starting from this list and stores 'value' at
    the location it designates.

    :param key_index_list: Remaining path; its first item indexes this list.
    :param value: The object to store; its parent is set to this list when
        this list is the final container.
    :param tag: Tag added (without propagation) to this list afterwards.
    :raises IndexError: If an intermediate element does not exist.
    :raises KeyError: If an intermediate element is an `Atom`.
    """
    head, rest = key_index_list[0], key_index_list[1:]
    if rest == []:
        value.set_parent(self)
        self[head] = value
    else:
        try:
            child = self[head]
        except IndexError:
            raise IndexError(
                "cannot assign value to a non-existing element in list")
        if isinstance(child, Atom):
            # An atom has nothing underneath it to descend into.
            raise KeyError()
        child._set_value_helper(rest, value, tag)
    self.add_tag(tag, propagate=False)
def _del_key_helper(self, key_index_list):
    """
    Follows 'key_index_list' starting from this list and deletes the element
    it designates.

    :raises IndexError: If an intermediate element does not exist.
    :raises KeyError: If an intermediate element is an `Atom`.
    """
    head, rest = key_index_list[0], key_index_list[1:]
    if rest == []:
        del self[head]
        return
    try:
        child = self[head]
    except IndexError:
        raise IndexError("cannot delete non-existing element in list")
    if isinstance(child, Atom):
        raise KeyError("cannot delete non-existing key")
    child._del_key_helper(rest)
def _dump_impl(self, tag):
    """
    Returns the compact dump of this list: "[" followed by the dump of each
    element terminated by a newline, followed by "]". The dump of a `Map`
    element is wrapped in braces.
    """
    pieces = []
    for elem in self:
        text = elem._dump_impl(tag)
        if isinstance(elem, Map):
            text = "{" + text + "}"
        pieces.append(text + "\n")
    return "[" + "".join(pieces) + "]"
class Key(str):
    """
    String type used for the keys of the `Map` class.

    The string value of a `Key` is safe to use as an attribute name: when the
    original text is a python keyword, an underscore is appended. The original
    text stays available through 'orig_key()'.
    """

    def __new__(cls, key):
        """
        Creates the `Key` string for 'key'.

        'key' may be another `Key` (its original text is reused) or any object
        (converted with `str`). If the text is a python keyword, an
        underscore '_' is appended to it.
        """
        text = key.orig_key() if isinstance(key, Key) else str(key)
        suffix = "_" if keyword.iskeyword(text) else ""
        return str.__new__(cls, text + suffix)

    def __init__(self, key):
        """
        Records the original (unmodified) text of 'key'.
        """
        self._orig_key = key.orig_key() if isinstance(key, Key) else str(key)

    def orig_key(self):
        """
        Returns the original string, without the keyword-escaping underscore.
        """
        return self._orig_key
class Map(Sea):
    """
    This class represents the "map" parameters in a config file.
    This class' behavior is very similar to that of the builtin `dict` class.
    """
    # Matches "key[index]": group 1 is the key text, group 2 the signed index.
    INDEX_PATTERN = re.compile(r'([^\[]*)\[ *([+-]*[1234567890]*) *\]')
    # Matches a bare "[index]": group 1 is the signed index. The sign is part
    # of the captured group (as in INDEX_PATTERN) so that a negative index
    # after the first one, e.g. "key[0][-1]", keeps its sign.
    INDEX_PATTERN2 = re.compile(r'\[ *([+-]*[1234567890]*) *\]')
@staticmethod
def _get_key_index(s):
    """
    Splits a "key[index]..." string into its key and indices.

    There is exactly one key but there may be several indices (e.g.,
    "key[index1][index2]"). The result is a list whose first element is the
    key (a string) and whose remaining elements are the indices (integers);
    for example "key[1][2]" gives ["key", 1, 2]. A string without any index
    is returned unchanged as a single-element list.
    """
    match = Map.INDEX_PATTERN.search(s)
    if match is None:
        return [s]
    parsed = [match.group(1).strip(), int(match.group(2))]
    # Whatever follows the first "[index]" can only hold further indices.
    tail = s[match.end():]
    if tail:
        parsed.extend(Map._get_index(tail))
    return parsed
@staticmethod
def _get_index(s):
    """
    Extracts the list indices from a string of the form "[index]".

    The string may hold several indices (e.g., "[index1][index2]"). The
    result is the list of indices as integers, in order of appearance; for
    example "[1][2]" gives [1, 2]. An empty list is returned when the string
    contains no index.
    This function is used by the '_get_key_index' function.
    """
    return [int(m.group(1)) for m in Map.INDEX_PATTERN2.finditer(s)]
@staticmethod
def _parse_composite_key(key):
    """
    Breaks a composite key such as "keyword1.keyword2[1][2].keyword3" into
    its components and returns them as a flat list, e.g.,
    ["keyword1", "keyword2", 1, 2, "keyword3"].

    The key is split at each '.', and every piece is stripped of surrounding
    whitespace before being parsed by '_get_key_index'.
    """
    parts = []
    for piece in key.split('.'):
        parts += Map._get_key_index(piece.strip())
    return parts
def __init__(
        self,
        ark=dark.fromString("{}"),  # noqa: M511
        parent=None):
    """
    Builds a 'Map' object from 'ark', which can be one of the following:

    - dict  The 'Map' object is built to mirror the dict object.
    - Map   The keys of 'ark' are read and a 'Sea' object is generated for
            each associated value.
    - str   The string (or utf-8 encoded bytes) is parsed; enclosing braces
            are added when the text does not start with "{".
    - list  The elements must be of the above types. The 'Map' object is
            built from the first element and then updated, in order, with the
            remaining ones.

    If 'ark' is not provided, an empty 'Map' is constructed.

    :param parent: Optional parent of this 'Map' object.
    """
    Sea.__init__(self, parent)
    followups = []
    if isinstance(ark, list):
        followups = ark[1:]
        ark = ark[0]
    if isinstance(ark, bytes):
        ark = ark.decode('utf-8')
    if isinstance(ark, str):
        text = ark.strip()
        if not text.startswith("{"):
            text = "{" + text + "\n}"
        ark = dark.fromString(text)
    # Store the entries directly in the instance dict, bypassing __setattr__.
    for name in list(ark):
        self.__dict__[Key(name)] = Sea._gen_sea(ark[name], self)
    if followups != []:
        self.update(followups)
def __setattr__(self, key, value):
    """
    Sets the attribute 'key' to 'value'.

    - 'key' can be a str object or a 'Key' object.
    - 'value' can be a 'Sea' object or any other object (including None).
    - A 'Sea' value is stored under a 'Key', unless the attribute name starts
      with "_Sea__", in which case a plain str name is used. When stored
      under a 'Key', the parent of 'value' is reset to this 'Map' object.
    - If 'key' already exists and currently holds a 'Sea' object, a non-'Sea'
      'value' is rejected unless it is None.

    :raises ValueError: On an attempt to replace a 'Sea' value by a non-'Sea',
        non-None value.
    """
    if isinstance(value, Sea):
        key = str(key) if key[:6] == "_Sea__" else Key(key)
    elif key in self.__dict__:
        current = self.__dict__[key]
        # 'value' is known not to be a `Sea` object on this branch.
        if value is not None and isinstance(current, Sea):
            raise ValueError(
                "cannot reassign attribute '%s' to a non-`sea.Sea` object" %
                key)
    self.__dict__[key] = value
    if isinstance(key, Key) and isinstance(value, Sea):
        value.set_parent(self)
def __str__(
        self,
        ind="",
        tag=set(),  # noqa: M511
        pre="",
        is_list_elem=False):
    """
    Returns the string representation of this 'Map' object.

    Keys are written in sorted order. How each value is written depends on
    its own print mode ('pmode()'): "hier" writes nested blocks, "path"
    writes path-style keys carrying the prefix 'pre'.

    :param ind: The indentation of the produced text.
    :param tag: Only the keys with the given tags are converted. Passed on to
        'key_value'.
    :param pre: The prefix put in front of keys written in "path" mode.
    :param is_list_elem: If True, the first written line gets no indentation.
    :raises ValueError: If a 'Map' value has an unknown print mode.
    """
    out = ""
    inner_ind = ind + " "
    first = True
    for k, v in self.key_value(tag, should_sort=True):
        lead = "" if (first and is_list_elem) else ind
        first = False
        name = k.orig_key()
        if isinstance(v, Map):
            mode = v.pmode()
            if mode == "hier":
                body = v.__str__(inner_ind, tag)
                out += f"{lead}{name} = {{\n{body}{ind}}}\n"
            elif mode == "path":
                out += v.__str__(ind, tag, pre + name + ".", is_list_elem)
            else:
                raise ValueError("Unrecoginized value of pmode: %s" % mode)
        elif isinstance(v, Atom):
            mode = v.pmode()
            if mode == "hier":
                out += f"{lead}{name} = {str(v)}\n"
            elif mode == "path":
                out += f"{lead}{pre}{name} = {str(v)}\n"
        elif isinstance(v, List):
            mode = v.pmode()
            if mode == "hier":
                body = v.__str__(inner_ind, tag, outdent=ind)
                out += f"{lead}{name} = {body}\n"
            elif mode == "path":
                # This branch always uses 'ind', even for a first list element.
                body = v.__str__(inner_ind, tag)
                out += f"{ind}{pre}{name} = {body}\n"
    return out
def __eq__(self, rhs):
    """
    Returns True if 'rhs' is a 'Map' object with the same keys and equal
    values as this one, False otherwise.

    A value that is a 'Map' or an 'Atom' must have exactly the same class on
    both sides; a 'List' value must face a 'List' value. Each detected
    difference is reported through 'debug_print' before False is returned.
    """
    if not isinstance(rhs, Map):
        return False
    if self is rhs:
        return True
    for k, v in self.key_value():
        strict = isinstance(v, (Map, Atom))
        if not strict and not isinstance(v, List):
            continue
        if strict:
            debug_print(f"kv: {k} {v} {self.raw_val} {rhs.raw_val}")
        try:
            rv = rhs[k]
        except KeyError:
            debug_print("map comparison: key '%s' was lost" % k)
            return False
        if strict:
            if rv.__class__ != v.__class__:
                debug_print(
                    "map comparison: key '{}'s type changed: {} vs {}".format(
                        k,
                        rv.__class__,
                        v.__class__,
                    ))
                return False
            if v != rv:
                debug_print("map comparison: key '%s's value changed" % k)
                return False
        elif not isinstance(rv, List):
            debug_print("map comparison: key '%s's type changed: %s vs %s" %
                        (k, rv.__class__, v.__class__))
            return False
        elif v != rv:
            debug_print("map comparison: key '%s' changed" % k)
            return False
    # Everything on this side matched; check that 'rhs' has no extra key.
    for k, _ in rhs.key_value():
        if k not in self:
            debug_print("map comparison: key '%s' present only in the rhs" %
                        k)
            return False
    return True
def __hash__(self):
    """
    Returns a hash based on the identity of this object (not on its content).
    """
    identity = id(self)
    return hash(identity)
def __iter__(self):
    """
    Iterator supporting the 'for ... in ...' syntax. The elements yielded are
    the keys of this 'Map' object, as returned by 'keys()'.
    """
    for k in self.keys():
        yield k
def __getitem__(self, key):
    """
    Returns the 'Sea' value associated with 'key'.

    - 'key' should be a string or a 'Key' object.
    - A 'KeyError' is raised if 'key' does not exist in this 'Map' object, or
      if the attribute of that name is not a 'Sea' object.
    """
    try:
        found = self.__dict__[Key(key)]
        if not isinstance(found, Sea):
            raise KeyError
    except KeyError:
        raise KeyError("key '%s' not found" % key)
    else:
        return found
def __setitem__(self, key, value):
    """
    Associates 'value' with 'key'.

    - 'key' should be a string or a 'Key' object. A key that did not exist
      before is added to this 'Map' object.
    - 'value' is first processed by the '_val_filter' function; the object
      returned by '_val_filter' is what actually gets stored, and its parent
      is set to this 'Map' object. Any object acceptable to '_val_filter' can
      therefore be used here.
    """
    sea_value = _val_filter(value)
    sea_value.set_parent(self)
    self.__dict__[Key(key)] = sea_value
def __delitem__(self, key):
    """
    Removes 'key' and its associated value from this 'Map' object.

    :raises KeyError: If 'key' does not exist, or if the attribute of that
        name is not a 'Sea' object.
    """
    name = Key(key)
    if not isinstance(self.__dict__[name], Sea):
        raise KeyError("key '%s' not in the `sea.Map` object" % name)
    del self.__dict__[name]
def __deepcopy__(self, memo=None):
    """
    Returns a deep copy of this 'Map' object. The parent of the copy is None.

    :param memo: The memo dictionary supplied by `copy.deepcopy`. A fresh
        dictionary is created when it is omitted, so that direct calls do not
        share (and pollute) one memo across unrelated copies.
    """
    if memo is None:
        memo = {}
    newobj = self.__new__(self.__class__)
    # Register the copy before copying the attributes so that references back
    # to this object resolve to the copy.
    memo[id(self)] = newobj
    newobj._Sea__parent = None
    newobj._Sea__pmode = self._Sea__pmode
    newobj_dict = newobj.__dict__
    for k, v in list(self.__dict__.items()):
        new_v = deepcopy(v, memo)
        if isinstance(new_v, Sea):
            new_v.set_parent(newobj)
        newobj_dict[k] = new_v
    # The loop above also copied the parent attribute; detach the copy again.
    newobj._Sea__parent = None
    return newobj
def __set_val(self, val):
    """
    Setter of the 'val' and 'raw_val' properties: replaces the whole content
    of this 'Map' object with the content of 'val'.

    :param val: Must be either a dict or a 'Map' object, otherwise a
        'ValueError' exception is raised.
    """
    if not isinstance(val, (dict, Map)):
        raise ValueError("value is not a dict or `sea.Map`")
    replacement = _val_filter(val)
    # Drop every existing key first, then take over the new entries.
    for old_key in self.keys():
        del self[old_key]
    for k, v in replacement.key_value():
        v.set_parent(self)
        self[k] = v
def __get_raw_val(self):
    """
    Getter of the 'raw_val' property.

    Returns a dict that maps the original text of each key to the 'raw_val'
    of the associated value.
    """
    return {k.orig_key(): v.raw_val for k, v in self.key_value()}
def __get_bval(self):
    """
    Getter of the 'bval' property.

    Returns a new 'Map' object holding the 'bval' of every value of this
    'Map' object. The remaining attributes are carried over by '_asea_copy'.
    """
    result = Map()
    for k, v in self.key_value():
        result[k] = v.bval
    _asea_copy(self, result)
    return result
def __get_dval(self):
    """
    Getter of the 'dval' property.

    Returns a new 'Map' object holding the 'dval' of every value of this
    'Map' object. The remaining attributes are carried over by '_asea_copy'.
    """
    result = Map()
    for k, v in self.key_value():
        result[k] = v.dval
    _asea_copy(self, result)
    return result
def __get_val(self):
    """
    Getter of the 'val' property.

    Returns a dict that maps the original text of each key to the 'val' of
    the associated value.
    """
    return {k.orig_key(): v.val for k, v in self.key_value()}
# Properties exposing the content of the map in its different forms.
# 'raw_val' and 'val' are writable and share the same setter.
raw_val = property(
    fget=__get_raw_val,
    fset=__set_val,
    doc="Readwrite. When read, this returns the current raw value "
    "(references and macros kept as is).")
bval = property(
    fget=__get_bval,
    doc="Readonly. Returns a new `Map` object, which has all macros "
    "expanded and references dereferenced.")
dval = property(
    fget=__get_dval,
    doc="Readonly. Returns a new `Map` object with dereferenced values.")
val = property(
    fget=__get_val,
    fset=__set_val,
    doc="Readwrite. When read, this returns the current value (macros will "
    "be expanded, and references will be dereferenced.")
def keys(self, tag=set()):  # noqa: M511
    """
    Returns, in a list, references of all keys whose value has the given
    'tag'. Each element of the returned list is of the 'Key' type.
    """
    return [
        k for k, v in self.__dict__.items()
        if isinstance(k, Key) and isinstance(v, Sea) and v.has_tag(tag)
    ]
def values(self, tag=set()):  # noqa: M511
    """
    Returns, in a list, references of all values that have the given 'tag'.
    Each element of the returned list is of the 'Sea' type.
    """
    return [
        v for k, v in self.__dict__.items()
        if isinstance(k, Key) and isinstance(v, Sea) and v.has_tag(tag)
    ]
def key_value(
        self,
        tag=set(),  # noqa: M511
        should_sort=False):
    """
    Returns the keys and associated values in a list of 2-tuples. The first
    element of each tuple is a reference of the key, the second element is a
    reference of the value. Only values that have the given 'tag' are
    included.

    :param should_sort: If True, the returned list is sorted (the tuples are
        compared, so the order is driven by the key).
    """
    pairs = [(k, v)
             for k, v in self.__dict__.items()
             if isinstance(k, Key) and isinstance(v, Sea) and v.has_tag(tag)]
    if should_sort:
        pairs.sort()
    return pairs
def clone(self, orig):
    """
    Lets this 'Map' object become a deep copy of the 'orig' 'Map' object.

    Every attribute of 'orig' except its parent is deep-copied into this
    object, and attributes that 'orig' does not have are removed. Copied
    'Sea' values stored under a 'Key' get this object as their parent.
    """
    source = orig.__dict__
    target = self.__dict__
    for name in source:
        if name == "_Sea__parent":
            # The parent of this object is left as it is.
            continue
        copied = deepcopy(source[name])
        if isinstance(name, Key) and isinstance(copied, Sea):
            copied.set_parent(self)
        target[name] = copied
    stale = [name for name in target if name not in source]
    for name in stale:
        del target[name]
def apply(self, op):
    """
    Calls the operation 'op' once on each value returned by 'values()'.

    Only these direct values are visited by this method; it does not descend
    into nested objects itself.
    """
    for child in self.values():
        op(child)
def update(self, ark=None, file=None, tag=set()):  # noqa: M511
    """
    Updates this 'Map' object with the given 'ark' or with the given 'file',
    and returns this 'Map' object.

    :param file: If not None, the name of a file whose content is read and
        used in place of 'ark' (the 'ark' parameter is then ignored).
    :param ark: A string, bytes, a dict, or a 'Map' object; or a list of such
        objects, applied one after the other. None means no update.
    :param tag: The tag passed on to the updated values and added to this
        'Map' object.
    :raises TypeError: If 'ark' is of an unsupported class.
    """
    if file is not None:
        with open(file) as file_reader:
            ark = file_reader.read()
    if ark is None:
        return self
    if isinstance(ark, (bytes, str, dict)):
        ark = Map(ark)
    elif not isinstance(ark, Map):
        if isinstance(ark, list):
            for item in ark:
                self.update(item, tag=tag)
            return self
        raise TypeError("Unsupported class of the `ark` argument: %s" %
                        ark.__class__)
    for k, v in ark.key_value():
        if k in self and v.__class__ == self[k].__class__:
            # Same kind of value on both sides: merge into the existing one.
            self[k].update(v, tag=tag)
        else:
            self[k] = v
            self[k].reset_tag(tag)
    self.add_tag(tag, propagate=False)
    return self
def has_key(self, key):
    """
    Returns True if this 'Map' object has the 'key', False otherwise. This is
    the method form of the 'in' operator.
    """
    return key in self
def __contains__(self, key):
    """
    Returns True if this 'Map' object has the 'key'. Returns False if
    otherwise.

    :param key: May be a composite key (i.e., the pathway notation), e.g.,
        "key[1].key2". None is never contained.
    """
    if key is None:
        return False
    kl = Map._parse_composite_key(key)
    v = self
    for k in kl:
        try:
            v = v[k]
        except Exception:
            # Any lookup failure along the path means the key is absent.
            # `Exception` (rather than a bare `except`) lets KeyboardInterrupt
            # and SystemExit propagate.
            return False
    return True
def get_value(self, key):
    """
    Returns the value of the given 'key'.

    :param key: The 'key' can be a composite key (i.e., the pathway
        notation), such as, e.g., "key[1][2].key2.key3[2]". Each component is
        looked up in turn, starting from this 'Map' object.
    """
    node = self
    for step in Map._parse_composite_key(key):
        node = node[step]
    return node
def _set_value_helper(self, key_index_list, value, tag):
    """
    This is the actual implementation of the 'set_value' function.

    When the final key is reached and 'value' is a non-empty `List` whose
    first element is the string "!append!" or "!remove!" (case-insensitive),
    the remaining elements are appended to, or removed from, the list
    currently stored under that key instead of replacing it.

    Missing intermediate keys are created as empty 'Map' objects.

    :param key_index_list: A list returned by the 'Map._parse_composite_key'
        function.
    :param value: Same as the 'value' parameter in the 'set_value' function.
    :param tag: Same as the 'tag' parameter in the 'set_value' function.
    :raises KeyError: If the path requires indexing into a list that does not
        exist.
    """
    k0 = key_index_list[0]
    k_ = key_index_list[1:]
    if k_ == []:
        # An empty list carries no directive and is stored as is.
        if isinstance(value, List) and len(value) > 0:
            e0 = value[0]
            if isinstance(e0, Atom) and isinstance(e0.val, str):
                directive = e0.val.lower()
                if directive == "!append!":
                    value = self[k0] + value[1:]
                elif directive == "!remove!":
                    # Match against the elements of the stored list (not
                    # against the keys of this map).
                    target = self[k0]
                    doomed = {
                        i for a in value[1:]
                        for i, e in enumerate(target) if e == a
                    }
                    # Delete from the back so remaining indices stay valid.
                    for i in sorted(doomed, reverse=True):
                        del target[i]
                    value = target
        value.set_parent(self)
        value.add_tag(tag, propagate=True)
        self[k0] = value
    else:
        try:
            v = self[k0]
            if isinstance(v, Atom):
                raise KeyError()
            v._set_value_helper(k_, value, tag)
        except KeyError:
            # The path does not exist yet: it can be created only if it
            # consists of map keys, since lists are never created implicitly.
            for e in k_:
                if isinstance(e, int):
                    raise KeyError(
                        "cannot assign value to a non-existing list")
            self[k0] = Map("", self)
            self[k0]._set_value_helper(k_, value, tag)
    self.add_tag(tag, propagate=False)
def set_value(self, key, value, tag=set()):  # noqa: M511
    """
    Associates the given value with the given key. Unlike the __setitem__
    operator, this function accepts composite keys and lets us tag the value.

    :param key: The 'key' can be a composite key (i.e., the pathway
        notation), e.g., "key[1].key2[0].key3".
    :param value: Any object acceptable to the '_val_filter' function.
    :param tag: If specified, the value of 'tag' is used to tag the 'value'.
    """
    path = Map._parse_composite_key(key)
    self._set_value_helper(path, _val_filter(value), tag)
def _set_value_fast_helper(self, key_index_list, value, tag):
    """
    Implementation of the 'set_value_fast' function.

    When this 'Map' object is the final container, 'value' is stored directly
    in the instance dict and its parent is set to this object. Otherwise the
    work is handed to the '_set_value_helper' of the intermediate object,
    creating missing intermediate keys as empty 'Map' objects.

    :raises KeyError: If the path requires indexing into a list that does not
        exist.
    """
    head, rest = key_index_list[0], key_index_list[1:]
    if rest == []:
        self.__dict__[Key(head)] = value
        value.set_parent(self)
    else:
        try:
            child = self[head]
            if isinstance(child, Atom):
                raise KeyError()
            child._set_value_helper(rest, value, tag)
        except KeyError:
            if any(isinstance(step, int) for step in rest):
                raise KeyError("cannot assign value to a non-existing list")
            self[head] = Map("", self)
            self[head]._set_value_helper(rest, value, tag)
    self.add_tag(tag, propagate=False)
def set_value_fast(self, key, value, tag=set()):  # noqa: M511
    """
    Similar to the `set_value` method. The difference is that a `value` that
    is already a `Sea` object is not passed through '_val_filter' here: it is
    handed as is to the fast helper, so the original `Sea` object `value`
    might be mutated as necessary.
    This function is much faster than `set_value`.
    """
    path = Map._parse_composite_key(key)
    sea_value = value if isinstance(value, Sea) else _val_filter(value)
    self._set_value_fast_helper(path, sea_value, tag)
def _del_key_helper(self, key_index_list):
    """
    Implementation of the 'del_key' function: follows 'key_index_list'
    starting from this 'Map' object and deletes the entry it designates.

    :raises KeyError: If a key on the path does not exist or an intermediate
        value is an `Atom`.
    """
    head, rest = key_index_list[0], key_index_list[1:]
    if rest == []:
        del self.__dict__[Key(head)]
        return
    child = self[head]
    if isinstance(child, Atom):
        raise KeyError()
    child._del_key_helper(rest)
def del_key(self, key: str):
    """
    Deletes the given key from this map.

    :param key: The 'key' can be a composite key in the pathway notation,
        e.g., "key[1].key2[0].key3".
    """
    self._del_key_helper(Map._parse_composite_key(key))
def _dump_impl(self, tag):
    """
    Returns the compact dump of this map: one "key=value" line per key that
    has the given 'tag'. The dump of a nested `Map` value is wrapped in
    braces.
    """
    chunks = []
    for k, v in self.key_value(tag):
        if isinstance(v, Map):
            opener, closer = "={", "}\n"
        else:
            opener, closer = "=", "\n"
        chunks.append(k.orig_key() + opener + v._dump_impl(tag) + closer)
    return "".join(chunks)
def get_val(my_macro_dict, sea_object):
    """
    Returns the value(s) of 'sea_object' evaluated with the given macro
    dictionary.

    :param my_macro_dict: The macro dictionary installed while the values are
        being read.
    :param sea_object: A single `Sea` object, or a sequence of `Sea` objects.
    :return: The 'val' of 'sea_object', or the list of the 'val' of each of
        its elements.

    The macro dictionary that was in effect before the call is restored
    afterwards, even if reading a value raises an exception.
    """
    orig_macro_dict = get_macro_dict()
    set_macro_dict(my_macro_dict)
    try:
        if isinstance(sea_object, Sea):
            return sea_object.val
        return [e.val for e in sea_object]
    finally:
        set_macro_dict(orig_macro_dict)
def _asea_copy(src, des):
    """
    Deep-copies into 'des' the attributes of 'src' that are not parameters,
    i.e., every entry of the instance dict of 'src' that is not a `Key`
    associated with a `Sea` value. The parent and the pmode of 'des' are left
    unchanged.
    """
    skipped = ("_Sea__parent", "_Sea__pmode")
    for name, item in src.__dict__.items():
        is_param = isinstance(name, Key) and isinstance(item, Sea)
        if not is_param and name not in skipped:
            des.__dict__[name] = deepcopy(item)
def _pathname(x):
    """
    Returns the pathname of the object 'x', built by walking up its chain of
    parents. Each step below a `List` contributes "[index]", each step below
    any other container contributes ".key". An object without parent has an
    empty pathname.
    """
    # Collect x and all its ancestors, then put the topmost one first.
    chain = [x]
    ancestor = x.parent()
    while ancestor is not None:
        chain.append(ancestor)
        ancestor = ancestor.parent()
    chain.reverse()
    path = ""
    holder = chain[0]
    for node in chain[1:]:
        if isinstance(holder, List):
            for i in range(len(holder)):
                if node is holder[i]:
                    path += "[%d]" % i
        else:
            for k, v in holder.key_value():
                if node is v:
                    path += ".%s" % k
        holder = node
    return path
def diff(x, reference):
    """
    Returns the difference between 'x' and 'reference'. Both must be 'Map'
    objects.

    The difference is a 4-tuple of 'Map' objects:

    1. the changed values, as found in 'x';
    2. the same changed values, as found in 'reference';
    3. the keys present in 'x' but not in 'reference';
    4. the keys present in 'reference' but not in 'x'.

    Nested 'Map' values are compared recursively. 'List' values are compared
    as a whole: they count as changed when their lengths or any pair of
    elements differ. Values of different kinds always count as changed.
    """
    empty = Map()
    changed, referred, added, lost = Map(), Map(), Map(), Map()
    x_items = x.key_value()
    ref_items = reference.key_value()
    ref_by_key = dict(ref_items)
    x_by_key = dict(x_items)
    for k, v in x_items:
        if k not in ref_by_key:
            added[k] = v
            continue
        r = ref_by_key[k]
        if isinstance(v, Map) and isinstance(r, Map):
            sub_changed, sub_referred, sub_added, sub_lost = diff(v, r)
            # Only non-empty sub-results are reported.
            if sub_changed != empty:
                changed[k] = sub_changed
            if sub_referred != empty:
                referred[k] = sub_referred
            if sub_added != empty:
                added[k] = sub_added
            if sub_lost != empty:
                lost[k] = sub_lost
        elif isinstance(v, Atom) and isinstance(r, Atom):
            if v != r:
                changed[k] = v
                referred[k] = r
        elif isinstance(v, List) and isinstance(r, List):
            if len(v) != len(r) or any(e != er for e, er in zip(v, r)):
                changed[k] = v
                referred[k] = r
        else:
            changed[k] = v
            referred[k] = r
    for k, v in ref_items:
        if k not in x_by_key:
            lost[k] = v
    return changed, referred, added, lost
def sea_filter(x, tag=set()):  # noqa: M511
    """
    Extracts from the 'Map' object 'x' the subset of keys that has the tag,
    and returns a new 'Map' object containing the extracted keys. The new
    object is built from the text returned by 'x.dump(tag=tag)'.
    """
    dumped = x.dump(tag=tag)
    return Map(dumped)
def is_atom_list(a):
    """
    This function returns:
    True - if 'a' is a List object and all of its elements are instances of
           the 'Atom' class,
    True - if 'a' is a List object and is empty,
    False - if otherwise.
    """
    return isinstance(a, List) and all(isinstance(e, Atom) for e in a)