# Source code for schrodinger.application.matsci.graph
"""
Module containing functions that operate on networkx graphs.
Copyright Schrodinger, LLC. All rights reserved.
"""
from itertools import combinations
import networkx
# networkx breadth-first-search traversal module; find_backbone calls its
# bfs_successors function through this alias
BFS_SEARCH = networkx.algorithms.traversal.breadth_first_search
# Cap on how many shortest paths get_sorted_shortest_path compares before it
# stops enumerating them
MAX_CHECKS = 10000
class NetworkLoopError(Exception):
    """
    Raised when the whole network is infinite, i.e. it forms a loop that
    could not be reduced to a finite path
    """
def get_sub_graphs(graph):
    """
    Generate the disconnected (connected-component) graphs of the passed
    graph.

    :type graph: 'networkx.classes.graph.Graph'
    :param graph: The graph to find subgraphs in

    :rtype: 'networkx.classes.graph.Graph'
    :return: each connected component of the graph, as an independent copy
    """
    for component_nodes in networkx.connected_components(graph):
        component_view = graph.subgraph(component_nodes)
        # Hand out a copy rather than the subgraph view of the input graph
        yield component_view.copy()
def get_fused_cycles(graph):
    """
    Return the cycles of the graph, with cycles that share at least one node
    merged (fused) into a single set of nodes.

    :type graph: 'networkx.classes.graph.Graph'
    :param graph: The graph to find cycles in

    :rtype: list(set)
    :return: list of sets of nodes that make each (fused or non-fused) cycle
    """
    # Start from the simple cycles of the cycle basis
    cycles = [set(ring) for ring in networkx.cycle_basis(graph)]
    if not cycles:
        return cycles

    # Merge one overlapping pair per pass and rescan, until a full pass finds
    # no two cycles with a node in common
    merged_any = True
    while merged_any:
        merged_any = False
        for first, second in combinations(range(len(cycles)), 2):
            if cycles[first].isdisjoint(cycles[second]):
                continue
            fused_ring = cycles[first] | cycles[second]
            # Keep the untouched cycles in order and put the fused one last
            cycles = [
                ring for position, ring in enumerate(cycles)
                if position not in (first, second)
            ]
            cycles.append(fused_ring)
            merged_any = True
            break
    return cycles
def get_sorted_shortest_path(graph, end_nodes, max_fix=False):
    """
    Get the shortest path between the end nodes that has the lowest indexes.
    Networkx does not pick a particular path when cycles make several
    shortest paths degenerate, so all_shortest_paths is enumerated and the
    paths are compared with each other.

    :type graph: 'networkx.classes.graph.Graph'
    :param graph: The graph to calculate the shortest path in

    :param end_nodes: The start and end nodes (first and last item are used)
    :type end_nodes: list

    :param max_fix: Whether to fix internal degeneracy when maximum number of
        path checks are reached
    :type max_fix: bool

    :returns: The sorted shortest path.
    :rtype: list of node index for shortest ordered path between the end nodes
    """
    source, target = end_nodes[0], end_nodes[-1]
    candidates = networkx.all_shortest_paths(graph,
                                             source=source,
                                             target=target)
    best_path = []
    for checked, candidate in enumerate(candidates):
        if checked > MAX_CHECKS:
            # The number of paths grows exponentially with the number of
            # rings. Past the limit, either resolve the degeneracy ring by
            # ring or accept the best path found so far.
            if max_fix:
                return _fix_ring_path(graph, best_path)
            return best_path
        # Lists compare lexicographically, so "<" selects the path whose node
        # indexes sort lowest. The first candidate is always taken.
        if not best_path or candidate < best_path:
            best_path = candidate
    return best_path
def break_infinite_segment(graph, end_nodes):
    """
    Break an infinite loop segment to make it finite.

    The edges of a shortest path between the end nodes are removed one at a
    time. The first removal that keeps the graph connected and leaves no
    fused cycle as large as that path is kept, and the function returns. The
    graph is modified in place: on success exactly one edge has been removed.
    Edges that are tried and rejected are restored with their attributes.

    :type graph: 'networkx.classes.graph.Graph'
    :param graph: The graph to make finite (modified in place)

    :param end_nodes: The start and end nodes (first and last item are used)
    :type end_nodes: list

    :raises NetworkLoopError: If the segment cannot be made finite
    """
    backbone_path = networkx.shortest_path(graph,
                                           source=end_nodes[0],
                                           target=end_nodes[-1])
    path_length = len(backbone_path)

    # Try each consecutive edge of the backbone path as the one to break
    for start, stop in zip(backbone_path, backbone_path[1:]):
        # Remember the edge data so a rejected edge can be restored intact.
        # NOTE(review): assumes a simple Graph as documented; for a
        # multigraph get_edge_data is keyed by edge key - confirm if needed.
        edge_attrs = dict(graph.get_edge_data(start, stop))
        graph.remove_edge(start, stop)

        # Removing the edge changes nothing useful if it belongs to an
        # internal loop, so check the largest fused cycle that is left
        largest_cycle = max(
            (len(cycle) for cycle in get_fused_cycles(graph)), default=0)
        loop_is_broken = largest_cycle < path_length
        # The removal must not split the graph into several pieces
        still_connected = networkx.number_connected_components(graph) == 1
        if loop_is_broken and still_connected:
            return

        # Not the right edge to break: put it back, attributes included
        graph.add_edges_from([(start, stop, edge_attrs)])

    # No edge of the backbone path could break the loop
    raise NetworkLoopError()
def _fix_ring_path(graph, backbone_path):
    """
    Backbone path can have degenerate results if cycle nodes are part of it.
    This method fixes the issue by selecting, for every stretch of the path
    that runs through one (fused) cycle, the shortest path with the lowest
    indexes.

    :type graph: 'networkx.classes.graph.Graph'
    :param graph: The graph to fix the ring path in

    :type backbone_path: list
    :param backbone_path: list of nodes in the longest path in the graph, with
        no preference in case of nodes that are part of a cycle

    :raises NetworkLoopError: If the largest fused cycle has more nodes than
        the backbone path, i.e. the whole segment is a ring

    :rtype: list
    :return: list of nodes of the path with the same end nodes as the passed
        backbone path. If cycle nodes (rings) are part of the path then among
        the degenerate paths through each ring, the path containing lower
        indexes (sorted) is selected. The passed list itself is returned when
        the graph has no cycles.
    """
    cycles = get_fused_cycles(graph)
    if not cycles:
        return backbone_path

    # Largest cycle will be larger than the backbone only when the whole
    # segment is a ring (infinite chain)
    if max(len(cycle) for cycle in cycles) > len(backbone_path):
        raise NetworkLoopError()

    # Map each ring node to the id of its fused cycle for fast lookups
    node_to_cid = {}
    for cid, cycle in enumerate(cycles, 1):
        for cycle_node in cycle:
            node_to_cid[cycle_node] = cid

    sorted_path = []
    ring_run = []
    last_ring = None
    for node in backbone_path:
        current_ring = node_to_cid.get(node)
        if current_ring is not None and current_ring == last_ring:
            # Still inside the same ring: keep collecting the run
            ring_run.append(node)
        else:
            # The ring changed or ended: resolve the run collected so far
            # (a no-op when the previous node was linear)
            sorted_path.extend(_sort_ring_run(graph, ring_run))
            if current_ring is None:
                ring_run = []
                sorted_path.append(node)
            else:
                # Entering a ring, either from a linear node or directly
                # from another non-fused ring. The entry node belongs to the
                # new run so that its degeneracy is resolved as well.
                ring_run = [node]
        last_ring = current_ring

    # The backbone may end inside a ring, so flush the trailing run too
    sorted_path.extend(_sort_ring_run(graph, ring_run))
    return sorted_path


def _sort_ring_run(graph, ring_run):
    """
    Return the lowest-index shortest path spanning a run of same-ring nodes.

    :type graph: 'networkx.classes.graph.Graph'
    :param graph: The graph the run belongs to

    :type ring_run: list
    :param ring_run: consecutive backbone nodes that are in the same cycle

    :rtype: list
    :return: the sorted shortest path between the first and last node of the
        run, or the run itself when it is too short to be degenerate
    """
    # Zero, one or two (adjacent) nodes leave no alternative path to pick
    if len(ring_run) < 3:
        return ring_run
    return get_sorted_shortest_path(graph, [ring_run[0], ring_run[-1]])
def find_backbone(graph, prefer_indexes=None, max_fix=True):
    """
    Find the shortest path between atoms that are furthest apart.

    If the path search reports an infinite loop (NetworkLoopError), one edge
    of the passed graph is removed in place by break_infinite_segment and the
    search is repeated with the same options.

    :type graph: 'networkx.classes.graph.Graph'
    :param graph: The graph to find longest path in. find_segments passes a
        connected graph. NOTE(review): a graph whose lowest node has no
        neighbors presumably fails on the empty BFS result - confirm.

    :param prefer_indexes: Preferred atom indexes to choose for the head and
        tail of the backbone (a set, or any other iterable of indexes). *For
        paths of equal lengths*, the chosen path will start and end with
        atoms in prefer_indexes if one is available. Can be used, for
        instance, to prefer backbones that start and end with hydrogen atoms.
    :type prefer_indexes: set

    :param max_fix: Whether to fix internal degeneracy when maximum number of
        path checks are reached
    :type max_fix: bool

    :rtype: list
    :return: list of nodes in the longest path in the graph. Between the two end
        node, the node with lower index is considered as the first element of
        the list and the other as the last. In case of degeneracy due to cycles
        nodes in the path, the shortest path containing lowest indexes is
        selected. An empty list is returned for graphs with fewer than 2
        nodes.
    """
    nodes = sorted(graph.nodes())
    # Return empty path if backbone does not have at least 2 atoms
    if len(nodes) < 2:
        return []

    # Accept any iterable of preferred indexes, not only a set
    preferred_pool = set(prefer_indexes) if prefer_indexes else None

    # First select lowest index node in the graph and find the
    # farthest node from it using bfs. This will be one of the end nodes
    # then find the other end node by finding the farthest node from the
    # first end node using bfs. The method is described in
    # "https://stackoverflow.com/questions/20010472/proof-of-correctness-
    # algorithm-for-diameter-of-a-tree-in-graph-theory"
    source_node = nodes[0]
    end_nodes = []
    for _ in range(2):
        # bfs_successors yields (node, successors) pairs in breadth first
        # order. The successors of the last pair are used as the candidate
        # end nodes.
        bfs_pairs = BFS_SEARCH.bfs_successors(graph, source=source_node)
        _, end_targets = list(bfs_pairs)[-1]
        if preferred_pool:
            # If any of the end_targets is preferred, drop all non-preferred
            # targets
            preferred = preferred_pool.intersection(end_targets)
            if preferred:
                end_targets = preferred
        # Among the candidates take the one with the largest index
        source_node = max(end_targets)
        end_nodes.append(source_node)

    # Sort end atoms indexes to make backbone path always go from
    # lower to higher node
    end_nodes.sort()
    try:
        return get_sorted_shortest_path(graph, end_nodes, max_fix)
    except NetworkLoopError:
        # The segment is a closed loop: break one edge and search again,
        # keeping the caller's preferences
        break_infinite_segment(graph, end_nodes)
        return find_backbone(graph,
                             prefer_indexes=prefer_indexes,
                             max_fix=max_fix)
def find_segments(graph):
    """
    Find the segments in the input graph. Segments are paths of nodes in
    which no node appears twice, and no node is shared between segments. The
    longest segment (backbone) of each connected part is created first, then
    smaller ones are found recursively in what is left after removing it.

    :type graph: 'networkx.classes.graph.Graph'
    :param graph: The graph to find segments in

    :rtype: list(list)
    :return: list of list of nodes. Each sublist is a list of ordered nodes,
        starting from lower index of the two end nodes, forming the longest
        path graph not containing the nodes already considered.
    """
    segments = []
    for component in get_sub_graphs(graph):
        # A single node cannot form a segment
        if component.number_of_nodes() < 2:
            continue
        backbone = find_backbone(component)
        # Nothing to record for this component when no backbone was found
        if not backbone:
            continue
        segments.append(backbone)

        # Take the backbone out of a copy of the component and collect the
        # segments of the remaining pieces recursively
        remainder = component.copy()
        remainder.remove_nodes_from(backbone)
        segments.extend(
            child_chain for child_chain in find_segments(remainder)
            if child_chain)
    return segments