"""
This module provides Canvas fingerprint-related functionality needed for
combinatorial similarity and diversity applications.
Copyright Schrodinger LLC, All Rights Reserved.
"""
import csv
import heapq
import os
import zipfile
from schrodinger.infra import canvas
def combine_fingerprints(subjob_names, outfile):
    """
    Combines fingerprints from subjobs to the indicated output file.

    For every subjob name, the file ``<subjob_name>.fp`` is read and each of
    its rows (fingerprint, title and extra column values) is appended to the
    output file. The fingerprint type information of the output file is
    taken from the first subjob's file.

    :param subjob_names: Subjob names.
    :type subjob_names: list(str)
    :param outfile: Output fingerprint file.
    :type outfile: str
    :raises ValueError: If subjob_names is empty
    """
    if not subjob_names:
        # Without at least one input there is no type info to create the
        # output file from; fail with a clear message instead of IndexError.
        raise ValueError("At least one subjob name is required to combine "
                         "fingerprints")
    fp_type_info = canvas.ChmFPIn32(f"{subjob_names[0]}.fp").getTypeInfo()
    fpout = canvas.ChmCustomOut32(fp_type_info, True)
    fpout.open(outfile)
    try:
        for subjob_name in subjob_names:
            fpin = canvas.ChmFPIn32(f"{subjob_name}.fp")
            # Column names are per input file, so re-read them for each one.
            prop_names = list(fpin.getExtraColumnNames())
            while fpin.hasNext():
                fp, title, prop_values = fpin.nextExtra()
                extra_data = dict(zip(prop_names, prop_values))
                fpout.write(fp, title, extra_data)
    finally:
        # Always release the output file, even if a subjob file is missing
        # or unreadable part-way through.
        fpout.close()
def create_reactant_fp_file(reactants_file, fp_file):
    """
    Creates a Canvas dendritic fingerprint file for a set of reactants in a
    .csv file or a .pfx zip archive.

    The first row of the input is treated as a header and skipped. In every
    remaining row, the first column is used as the SMILES and the second
    column as the title. Blank rows are ignored. For a .pfx archive the rows
    are read from its ``structures.csv`` member.

    :param reactants_file: Input .csv file with SMILES and titles,
            or .pfx archive
    :type reactants_file: str
    :param fp_file: Output fingerprint file. Will contain titles as the
            structure ids and SMILES as the lone extra data column.
    :type fp_file: str
    :return: The number of fingerprints written
    :rtype: int
    :raises FileNotFoundError: If reactants_file cannot be found
    :raises canvas.ChmException: If a Canvas-related error occurs
    """
    # Read the input first so that a missing/unreadable reactants file is
    # reported before any output resources are created.
    if reactants_file.endswith(".pfx"):
        with zipfile.ZipFile(reactants_file) as zfile:
            with zfile.open("structures.csv") as fh:
                rows = list(csv.reader(str(fh.read(), 'utf-8').splitlines()))
    else:
        # newline="" is what the csv module expects for file objects.
        with open(reactants_file, newline="") as fh:
            rows = list(csv.reader(fh))
    # Drop the header (tolerating a completely empty input) and skip blank
    # lines, which csv.reader reports as empty lists.
    rows = [row for row in rows[1:] if row]

    fp_generator = canvas.ChmDendriticOut32()
    fp_generator.open(fp_file)
    try:
        for row in rows:
            mol = canvas.ChmMol.fromSMILES(row[0])
            extra_data = [("SMILES", row[0])]
            fp_generator.writeNameAndProps(mol, row[1], extra_data)
    finally:
        # Close the output even if a SMILES fails to parse.
        fp_generator.close()
    return len(rows)
def get_reactant_combo_sim(query_fp, reactant_fp_lists, reactant_combo):
    """
    Returns the Tanimoto similarity between the provided query fingerprint
    and the logical OR fingerprint of a particular reactant combination.

    The i-th entry of reactant_combo selects one fingerprint from the i-th
    list in reactant_fp_lists; the selected fingerprints are OR-ed together
    and the result is compared to the query.

    :param query_fp: Query fingerprint
    :type query_fp: canvas.ChmSparseBitset
    :param reactant_fp_lists: Lists of reactant fingerprints
    :type reactant_fp_lists: list(list(canvas.ChmSparseBitset))
    :param reactant_combo: A list of 0-based positions, one per list in
            reactant_fp_lists, which define the combination of reactants
    :type reactant_combo: list(int)
    :return: Tanimoto similarity
    :rtype: float
    """
    # Start from an empty bitset and accumulate the union of chosen reactants.
    union_fp = canvas.ChmSparseBitset()
    for list_index, fp_index in enumerate(reactant_combo):
        chosen_fp = reactant_fp_lists[list_index][fp_index]
        union_fp = union_fp | chosen_fp
    similarity = query_fp.simTanimoto(union_fp)
    return similarity
def rank_reactants(fp_file, query_fp, max_reactants, alpha=1.0, beta=0.0):
    """
    Reads a file of reactant fingerprints, scores them against a query
    fingerprint, and returns titles, SMILES, reactant indices and bitsets
    sorted by decreasing score and capped at max_reactants. Scores are
    computed as the Tversky similarity between reactant and query:

    ON(R & Q) / (ON(R & Q) + alpha * ON(R - Q) + beta * ON(Q - R))

    where:

    ON(R & Q) = Number of 'on' bits shared by reactant and query
    ON(R - Q) = Number of 'on' bits that are unique to reactant
    ON(Q - R) = Number of 'on' bits that are unique to query

    A reactant whose denominator evaluates to zero (for example an empty
    fingerprint) is given a score of 0.0. The SMILES of each reactant is
    taken from the first extra data column of the fingerprint file.

    :param fp_file: Input file of reactant fingerprints
    :type fp_file: str
    :param query_fp: Query fingerprint for scoring reactants
    :type query_fp: canvas.ChmSparseBitset
    :param max_reactants: Cap on the number of rows in each returned list
    :type max_reactants: int
    :param alpha: Reactant weight
    :type alpha: float
    :param beta: Query weight (use 1.0 for Tanimoto similarity)
    :type beta: float
    :return: Sequences of titles, SMILES, zero-based indices and bitsets.
            All four are empty if no reactants were retained.
    :rtype: list(str), list(str), list(int), list(canvas.ChmSparseBitset)
    :raises FileNotFoundError: If fp_file cannot be found
    :raises canvas.ChmException: If a Canvas-related error occurs
    """
    if not os.path.isfile(fp_file):
        raise FileNotFoundError(f'Fingerprint file"{fp_file}" not found')
    # Loop invariants: the query bit count and the weight of the shared bits
    # in the expanded denominator
    #   alpha * ON(R) + beta * ON(Q) + (1 - alpha - beta) * ON(R & Q).
    count_query = query_fp.count()
    common_weight = 1 - alpha - beta
    # Min-heap holding the best max_reactants rows seen so far.
    heap_rows = []
    fpin = canvas.ChmFPIn32(fp_file)
    i = 0
    while fpin.hasNext():
        reactant_fp, title, props = fpin.nextExtra()
        count_reactant = reactant_fp.count()
        count_common = reactant_fp.countCommonOn(query_fp)
        denom = (alpha * count_reactant + beta * count_query +
                 common_weight * count_common)
        # Guard against division by zero for degenerate fingerprints/weights.
        score = count_common / denom if denom else 0.0
        # Note that reactant_fp is positioned last so that its nonexistent
        # '<' operator will never be needed to break a tie in the score
        # (the unique index i always settles the comparison first).
        row = [score, title, props[0], i, reactant_fp]
        if len(heap_rows) == max_reactants:
            # Heap is full: push the new row and drop the current worst.
            heapq.heappushpop(heap_rows, row)
        else:
            heapq.heappush(heap_rows, row)
        i += 1
    if not heap_rows:
        # Keep the four-sequence shape so callers can always unpack.
        return [(), (), (), ()]
    # Transpose the sorted rows into columns and drop the score column.
    return list(zip(*sorted(heap_rows, reverse=True)))[1:]
def read_reactant_fps(fp_file):
    """
    Loads every row of a reactant fingerprint file and returns the bitsets,
    titles and SMILES as three parallel lists. The SMILES of each row is
    taken from its first extra data column.

    :param fp_file: Input file of reactant fingerprints
    :type fp_file: str
    :return: Lists of bitsets, titles and SMILES
    :rtype: list(canvas.ChmSparseBitset), list(str), list(str)
    :raises FileNotFoundError: If fp_file cannot be found
    :raises canvas.ChmException: If a Canvas-related error occurs
    """
    file_exists = os.path.isfile(fp_file)
    if not file_exists:
        raise FileNotFoundError(f'Fingerprint file"{fp_file}" not found')

    reader = canvas.ChmFPIn32(fp_file)
    # Gather (bitset, title, smiles) records, then split them into columns.
    records = []
    while reader.hasNext():
        fingerprint, name, extras = reader.nextExtra()
        records.append((fingerprint, name, extras[0]))

    bitsets = [record[0] for record in records]
    titles = [record[1] for record in records]
    smiles = [record[2] for record in records]
    return bitsets, titles, smiles
def smiles_to_fingerprint(smiles):
    """
    Generates the dendritic fingerprint of a single SMILES string by
    delegating to smiles_to_fingerprints with a one-element list.

    :param smiles: SMILES string
    :type smiles: str
    :return: Bitset that represents the fingerprint
    :rtype: canvas.ChmSparseBitset
    :raises RuntimeError: If a Canvas-related error occurs
    """
    fingerprints = smiles_to_fingerprints([smiles])
    return fingerprints[0]
def smiles_to_fingerprints(smiles_list):
    """
    Returns dendritic fingerprints for a list of SMILES strings.

    The fingerprints are returned in the same order as the input SMILES. An
    empty input yields an empty list without creating a fingerprint
    generator.

    :param smiles_list: List of SMILES strings
    :type smiles_list: list(str)
    :return: Fingerprints for the provided SMILES strings
    :rtype: list(canvas.ChmSparseBitset)
    :raises RuntimeError: If a Canvas-related error occurs
    """
    if not smiles_list:
        return []
    try:
        fp_generator = canvas.ChmDendriticOut32()
        return [
            fp_generator.generate(canvas.ChmMol.fromSMILES(smiles))
            for smiles in smiles_list
        ]
    except canvas.ChmException as err:
        # Chain explicitly so the original Canvas error stays attached as
        # the cause of the RuntimeError callers see.
        raise RuntimeError(str(err)) from err