Source code for schrodinger.application.matsci.graph
"""
Module containing methods applied on networkx graph
Copyright Schrodinger, LLC. All rights reserved.
"""
from itertools import combinations
import networkx
BFS_SEARCH = networkx.algorithms.traversal.breadth_first_search
MAX_CHECKS = 10000
[docs]class NetworkLoopError(Exception):
    """Raised when the whole network is infinite"""
    pass 
[docs]def get_sub_graphs(graph):
    """
    Generator of the disconnect graphs in the passed graph.
    :type graph: 'networkx.classes.graph.Graph'
    :param graph: The graph to find subgraphs in
    :rtype: 'networkx.classes.graph.Graph'
    :return: subgraph of the graph
    """
    for gindex in networkx.connected_components(graph):
        yield graph.subgraph(gindex).copy() 
[docs]def get_fused_cycles(graph):
    """
    Return cycles (fused and non-fused) in the graph
    :type graph: 'networkx.classes.graph.Graph'
    :param graph: The graph to find cycles in
    :rtype: list(set)
    :return: list of sets of nodes that make each cycle
    """
    # Get simple cycles
    cycles = list(set(x) for x in networkx.cycle_basis(graph))
    # No cycles found
    if not cycles:
        return cycles
    # Fuse cycles
    while True:
        for cid1, cid2 in combinations(range(len(cycles)), 2):
            if not cycles[cid1].isdisjoint(cycles[cid2]):
                fused_ring = cycles[cid1].union(cycles[cid2])
                new_rings = [
                    cycles[x]
                    for x in range(len(cycles))
                    if x not in [cid1, cid2]
                ]
                new_rings.append(fused_ring)
                cycles = new_rings
                break
        else:
            break
    return cycles 
[docs]def get_sorted_shortest_path(graph, end_nodes, max_fix=False):
    """
    Get the shortest path with lowest indexes. Networkx does not handle
    degeneracy due to cycles properly, so using all_shortest_paths to compare
    each path
    :type graph: 'networkx.classes.graph.Graph'
    :param graph: The graph to calculate the shortest path in
    :param end_nodes: The start and end nodes
    :type end_nodes: list
    :param max_fix: Whether to fix internal degeneracy when maximum number of
        path checks are reached
    :type max_fix: bool
    :returns: The sorted shortest path.
    :rtype: list of node index for shortest ordered path between the end nodes
    """
    all_paths = networkx.all_shortest_paths(graph,
                                            source=end_nodes[0],
                                            target=end_nodes[-1])
    sorted_path = []
    for count, path in enumerate(all_paths):
        if count > MAX_CHECKS:
            # The number of paths increase exponentially with number of rings,
            # at this point its better to fix the degeneracy locally or
            # accept the degeneracy
            if max_fix:
                sorted_path = _fix_ring_path(graph, sorted_path)
            break
        # Update if current path is less than backbone path, if yes update
        # it as backbone path. The less than operator on path (list type) checks
        # for sorting (https://docs.python.org/3/tutorial/datastructures.html
        # #comparing-sequences-and-other-types). Also update current path as
        # backbone path if backbone path is empty
        if not sorted_path or path < sorted_path:
            sorted_path = path
    return sorted_path 
[docs]def break_infinite_segment(graph, end_nodes):
    """
    Break a infinite loop segment to make it finite
    :type graph: 'networkx.classes.graph.Graph'
    :param graph: The graph to make finite
    :param end_nodes: The start and end nodes
    :type end_nodes: list
    :raises NetworkLoopError: If the segment cannot be made finite
    """
    backbone_path = networkx.shortest_path(graph,
                                           source=end_nodes[0],
                                           target=end_nodes[-1])
    # Loop through backbone path edges to find the edge which will break the
    # loop
    for index in range(len(backbone_path) - 1):
        # Remove the edge
        main_ring_node = [backbone_path[index + x] for x in range(2)]
        graph.remove_edge(*main_ring_node)
        # Breaking the edge can cause no change if the edge is part of an
        # internal loop. Check if graph is still an fused loop
        cycles = get_fused_cycles(graph)
        if cycles:
            max_cycle = max([len(x) for x in cycles])
        else:
            max_cycle = 0
        is_fully_fused = max_cycle < len(backbone_path)
        # Breaking the edge does not result into splitting of graph
        did_not_split = len(list(get_sub_graphs(graph))) == 1
        if (is_fully_fused and did_not_split):
            return
        # Reform the edge since it was not the right edge to break
        graph.add_edge(*main_ring_node)
    else:
        # Raise error if the loop cannot be broken
        raise NetworkLoopError() 
def _fix_ring_path(graph, backbone_path):
    """
    Backbone path can have degenerate results if cycle nodes are part of it.
    This method fixes the issue by selecting the path with lowest indexes
    :type graph: 'networkx.classes.graph.Graph'
    :param graph: The graph to fix the ring path in
    :type backbone_path: list
    :param backbone_path: list of nodes in the longest path in the graph, with
        no preference in case of nodes that are path of the cycle
    :rtype: list
    :return: list of nodes in the longest path in the graph. Between the two end
        node, the node with lower index is considered as the first element of
        the list and the other as the last. If cycles nodes (rings) are part of
        the path then among the degenerate paths, path containing lower indexes
        (sorted) is selected.
    """
    cycles = get_fused_cycles(graph)
    if not cycles:
        return backbone_path
    max_cycle = max([len(x) for x in cycles])
    # Largest cycle will be larger than the backbone only when the whole
    # segment is a ring (infinite chain)
    if max_cycle > len(backbone_path):
        raise NetworkLoopError()
    # Create cycle associations for faster checks
    node_to_cid = {}
    for cid, cycle in enumerate(cycles, 1):
        for cycle_node in cycle:
            node_to_cid[cycle_node] = cid
    # Generate new sorted backbone path from current backbone path
    sorted_path = []
    ring_path = []
    last_ring = None
    for node in backbone_path:
        current_ring = node_to_cid.get(node, None)
        if not last_ring and not current_ring:
            # Add to the sorted path when last atom and current atom is linear
            sorted_path.append(node)
        elif last_ring == current_ring or (not last_ring and current_ring):
            # Add to ring path when last atom and current atom are part of
            # same ring or when a new ring start
            ring_path.append(node)
        elif last_ring != current_ring and ring_path:
            # When last atom and current atom are not of same ring, find the
            # local sorted path for the ring path and add to the sorted path
            ring_path = get_sorted_shortest_path(graph,
                                                 [ring_path[0], ring_path[-1]])
            sorted_path.extend(ring_path)
            # Reset ring path and add the current node to the sorted path
            ring_path = []
            sorted_path.append(node)
            if current_ring:
                # Two rings are connected to each other but are not fused
                last_ring = current_ring
        else:
            # This should never happen, but good to have a check for it
            raise ValueError(f'Node {node} does not meet any conditions.')
        last_ring = current_ring
    return sorted_path
[docs]def find_backbone(graph, prefer_indexes=None, max_fix=True):
    """
    Find the shortest path between atoms that are furthest apart
    :type graph: 'networkx.classes.graph.Graph'
    :param graph: The graph to find longest path in
    :param set prefer_indexes: A list of preferred atom indexes to choose for
        the head and tail of the backbone. *For paths of equal lengths*, the
        chosen path will start and end with atoms in the prefer_indexes set if
        one is available. Can be used, for instance, to prefer backbones that
        start and end with hydrogen atoms.
    :param max_fix: Whether to fix internal degeneracy when maximum number of
        path checks are reached
    :type max_fix: bool
    :rtype: list
    :return: list of nodes in the longest path in the graph. Between the two end
        node, the node with lower index is considered as the first element of
        the list and the other as the last. In case of degeneracy due to cycles
        nodes in the path, the shortest path containing lowest indexes is
        selected.
    """
    # Get all nodes
    nodes = sorted(graph.nodes())
    # Return empty path if backbone does not have atleast 2 atoms
    if len(nodes) < 2:
        return []
    # First select lowest index node in the graph and find the
    # farthest node from it using bfs. This will be one of the end nodes
    # then find the other end node by finding the farthest node from the
    # first end node using bfs. The method is described in
    # "https://stackoverflow.com/questions/20010472/proof-of-correctness-
    # algorithm-for-diameter-of-a-tree-in-graph-theory"
    source_node = nodes[0]
    end_nodes = []
    for _ in range(2):
        # Transverse the graph using breadth first search, and get the ordered
        # dict where the key is the source node and item is the list of next
        # nodes
        bfs_nodes_dict = BFS_SEARCH.bfs_successors(graph, source=source_node)
        # Convert the bfs nodes dict to list to get the last key source node
        # and its values as end_targets
        end_source, end_targets = list(bfs_nodes_dict)[-1]
        # Get the last node, with largest atom index
        if prefer_indexes:
            # If any of the end_targets is preferred, remove all non-preferred
            # targets
            preferred = prefer_indexes.intersection(end_targets)
            if preferred:
                end_targets = list(preferred)
        source_node = sorted(end_targets)[-1]
        end_nodes.append(source_node)
    # Sort end atoms indexes to make backbone path always go from
    # lower to higher node
    end_nodes.sort()
    try:
        backbone_path = get_sorted_shortest_path(graph, end_nodes, max_fix)
    except NetworkLoopError:
        break_infinite_segment(graph, end_nodes)
        backbone_path = find_backbone(graph)
    return backbone_path 
[docs]def find_segments(graph):
    """
    Find the segments in the input graph. Segments are path containing nodes
    where no node appears in a segment twice. Longest segments are always
    created first and then recursively smaller ones.
    :type graph: 'networkx.classes.graph.Graph'
    :param graph: The graph to find segments in
    :rtype: list(list)
    :return: list of list of nodes. Each sublist is a list of ordered nodes,
        starting from lower index of the two end nodes, forming the longest
        path graph not containing the nodes already considered.
    """
    segments = []
    # Loop over all disconnected graphs
    for sub_graph in get_sub_graphs(graph):
        # Search for segment only if graph contains more than 1 node
        if len(set(sub_graph.nodes())) > 1:
            # Find the backbone for the graph, do not add to segment if no
            # backbone is found
            backbone = find_backbone(sub_graph)
            # Check if backbone was found and continue to next graph it was not
            if not backbone:
                continue
            # Add the backbone to segments
            segments.append(backbone)
            # Create a copy of graph with backbone removed
            new_graph = sub_graph.copy()
            for backbone_node in backbone:
                new_graph.remove_node(backbone_node)
            # Find child segments of the backbone removed graph recursively
            child_segments = find_segments(new_graph)
            for child_chain in child_segments:
                # Check if there were any segments (backbones) in the child
                if child_chain:
                    segments.append(child_chain)
    return segments