Source code for bento_meta.mdb.mdb_tools.mdb_tools

"""
ToolsMDB: subclass of 'WriteableMDB' to support interactions with the MDB.

EntityValidator: validates that entities have required attributes.
"""

import csv
import logging
from importlib.util import find_spec
from logging.config import fileConfig
from pathlib import Path
from subprocess import check_call
from sys import executable
from typing import Dict, Iterable, List, Optional, Set, Tuple, Type, Union

from bento_meta.entity import Entity
from bento_meta.mdb import make_nanoid, read_txn_data, read_txn_value
from bento_meta.mdb.writeable import WriteableMDB, write_txn
from bento_meta.objects import (
    Concept,
    Edge,
    Node,
    Predicate,
    Property,
    Tag,
    Term,
    ValueSet,
)

from minicypher.statement import Statement
from minicypher.clauses import (
    As,
    Collect,
    Match,
    Merge,
    DetachDelete,
    OptionalMatch,
    Return,
    With,
)

from minicypher.entities import N0, R0, G, N, P, R, T, _plain_var
from minicypher.functions import count

# logging stuff
log_ini_path = Path(__file__).parents[2].joinpath("logs/log.ini")
log_file_path = Path(__file__).parents[2].joinpath(f"logs/{__name__}.log")
fileConfig(log_ini_path, defaults={"logfilename": log_file_path.as_posix()})
logger = logging.getLogger(__name__)


[docs] class ToolsMDB(WriteableMDB): """Adds mdb-tools to WriteableMDB""" def __init__(self, uri, user, password): WriteableMDB.__init__(self, uri, user, password)
[docs] class EntityNotUniqueError(Exception): """Raised when an entity's attributes identify more than 1 property graph node in an MDB"""
[docs] class EntityNotFoundError(Exception): """Raised when an entity's attributes fail to identify a property graph node in an MDB"""
[docs] class PatternNotUniqueError(Exception): """ Raised when a match pattern's attributes identify more than 1 property graph triple or set of overlapping triples in an MDB """
[docs] class PatternNotFoundError(Exception): """ Raised when a match pattern's attributes fail to identify a property graph triple or set of overlapping triples in an MDB """
[docs] @read_txn_value def _get_entity_count(self, entity: Entity): """ Returns count of given entity found in an MDB. If count = 0, entity with given attributes not found in MDB. If count = 1, entity with given attributes is unique in MDB If count > 1, more attributes needed to uniquely id entity in MDB. """ ent = N(label=entity.get_label(), props=entity.get_attr_dict()) stmt = Statement( Match(ent), Return(count(ent)), As("entity_count"), use_params=True, ) qry = str(stmt) parms = stmt.params return (qry, parms, "entity_count")
[docs] @read_txn_value def _get_pattern_count(self, pattern: Union[T, G]): """ Returns count of given match pattern, which could be a triple like: (n)-[r]->(m), or a set of overlapping triples (Path) found in MDB. If count = 0, pattern with given attributes not found in MDB. If count = 1, pattern with given attributes is unique in MDB If count > 1, more attributes needed to uniquely id pattern in MDB. """ stmt = Statement( Match(pattern), Return(count("*")), As("pattern_count"), use_params=True, ) qry = str(stmt) parms = stmt.params return (qry, parms, "pattern_count")
[docs] def validate_entity_unique(self, entity: Entity) -> None: """ Validates that the given entity occurs once (& only once) in an MDB Raises EntityNotUniqueError if entity attributes match multiple property graph nodes in the MDB. Raises EntityNotFoundError if entity attributes don't match any in the MDB. Note: doesn't validate the entity itself because not all entity attibutes necessarily required to locate an entity in the MDB. (e.g. handle and model OR nanoid alone can identify a node) """ ent_count = int(self._get_entity_count(entity)[0]) if ent_count > 1: logger.error(str(self.EntityNotUniqueError)) raise self.EntityNotUniqueError if ent_count < 1: logger.error(str(self.EntityNotFoundError)) raise self.EntityNotFoundError
[docs] def validate_entities_unique(self, entities: Iterable[Entity]) -> None: """Runs self.validate_entity_unique() over multiple entities""" for entity in entities: self.validate_entity_unique(entity)
[docs] def validate_pattern_unique(self, pattern: Union[T, G]) -> None: """ Validates that the given match pattern occurs once (& only once) in an MDB Raises PatternNotUniqueError if entity attributes match multiple property graph nodes in the MDB. Raises PatternNotFoundError if entity attributes don't match any in the MDB. """ pattern_count = int(self._get_pattern_count(pattern)[0]) if pattern_count > 1: logger.error( str( self.PatternNotUniqueError( f"Pattern: {pattern.pattern()} not unique.", ), ), ) raise self.PatternNotUniqueError( f"Pattern: {pattern.pattern()} not unique.", ) if pattern_count < 1: logger.error( str( self.PatternNotFoundError( f"Pattern: {pattern.pattern()} not found.", ), ), ) raise self.PatternNotFoundError(f"Pattern: {pattern.pattern()} not found.")
[docs] @write_txn def remove_entity_from_mdb(self, entity: Entity): """ Remove given Entity node from the database. Accepts the following bento-meta Entities: Concept, Node, Predicate, Property, Edge, Term """ self.validate_entity_unique(entity) ent_label = entity.get_label() ent_attrs = entity.get_attr_dict() ent = N(label=ent_label, props=ent_attrs) stmt = Statement(Match(ent), DetachDelete(ent._var), use_params=True) qry = str(stmt) parms = stmt.params logging.info(f"Removing {ent_label} entity with with properties: {ent_attrs}") return (qry, parms)
[docs] @write_txn def add_entity_to_mdb( self, entity: Entity, _commit=None, ): """Adds given Entity node to MDB instance""" EntityValidator.validate_entity(entity) EntityValidator.validate_entity_has_attribute(entity, "nanoid") if _commit: entity._commit = _commit ent_label = entity.get_label() ent_attrs = entity.get_attr_dict() ent = N(label=ent_label, props=ent_attrs) if isinstance(entity, Edge): src = N(label="node", props=entity.src.get_attr_dict()) dst = N(label="node", props=entity.dst.get_attr_dict()) src_rel = R(Type="has_src") dst_rel = R(Type="has_dst") src_trip = src_rel.relate(_plain_var(ent), _plain_var(src)) dst_trip = dst_rel.relate(_plain_var(ent), _plain_var(dst)) stmt = Statement( Merge(ent), Merge(src), Merge(dst), Merge(src_trip), Merge(dst_trip), use_params=True, ) else: stmt = Statement(Merge(ent), use_params=True) qry = str(stmt) parms = stmt.params logging.info( f"Merging new {ent_label} node with properties: {ent_attrs} into the MDB", ) return (qry, parms)
[docs] @read_txn_value def get_concept_nanoids_linked_to_entity( self, entity: Entity, mapping_source: Optional[str] = None, ): """ Returns list of concept nanoids linked to given entity by "represents" or "has_concept" relationships tagged with the given mapping source. """ self.validate_entity_unique(entity) ent = N(label=entity.get_label(), props=entity.get_attr_dict()) concept = N(label="concept") if isinstance(entity, Term): rel = R(Type="represents") else: rel = R(Type="has_concept") ent_trip = rel.relate(ent, concept) if mapping_source is not None: tag = N( label="tag", props={"key": "mapping_source", "value": mapping_source}, ) tag_trip = T(concept, R(Type="has_tag"), tag) path = G(ent_trip, tag_trip) else: path = ent_trip stmt = Statement( Match(path), Return(f"{concept._var}.nanoid"), As("concept_nanoids"), use_params=True, ) qry = str(stmt) parms = stmt.params logging.debug(f"{qry=}; {parms=}") return (qry, parms, "concept_nanoids")
[docs] @write_txn def add_relationship_to_mdb( self, relationship_type: str, src_entity: Entity, dst_entity: Entity, _commit: str = "", ): """ Adds relationship between given entities in MDB. """ self.validate_entities_unique([src_entity, dst_entity]) rel = R(Type=relationship_type) src = N(label=src_entity.get_label(), props=src_entity.get_attr_dict()) dst = N(label=dst_entity.get_label(), props=dst_entity.get_attr_dict()) trip = rel.relate(src, dst) plain_trip = rel.relate(_plain_var(src), _plain_var(dst)) try: # check for existance shouldn't include _commit? self.validate_pattern_unique(trip) except self.PatternNotFoundError: # if triple doesn't already exist, add _commit? this means that # if triple exists w/ different _commit, merge shouldn't add anything? rel.props["_commit"] = P(handle="_commit", value=_commit) stmt = Statement(Match(src, dst), Merge(plain_trip), use_params=True) qry = str(stmt) parms = stmt.params print(stmt) logging.info( f"Adding {relationship_type} relationship between src {src.label} with " f"properties: {src_entity.get_attr_dict()} to dst {dst.label} " f"with properties: {dst_entity.get_attr_dict()}", ) return (qry, parms)
[docs] @read_txn_value def get_entity_nanoid(self, entity: Entity): """ Takes a unique entity in the MDB and returns its nanoid. """ self.validate_entity_unique(entity) ent = N(label=entity.get_label(), props=entity.get_attr_dict()) if isinstance(entity, Edge): src = N(label="node", props=entity.src.get_attr_dict()) dst = N(label="node", props=entity.dst.get_attr_dict()) src_rel = R(Type="has_src") dst_rel = R(Type="has_dst") src_trip = src_rel.relate(ent, src) dst_trip = dst_rel.relate(ent, dst) path = G(src_trip, dst_trip) match_clause = Match(path) else: match_clause = Match(ent) stmt = Statement( match_clause, Return(f"{ent._var}.nanoid"), As("entity_nanoid"), use_params=True, ) qry = str(stmt) parms = stmt.params return (qry, parms, "entity_nanoid")
[docs] def get_or_make_entity_nanoid(self, entity: Entity) -> str: """Obtains existing entity's nanoid or creates one for new entity.""" try: return self.get_entity_nanoid(entity)[0] except self.EntityNotFoundError: nanoid = make_nanoid() return nanoid
[docs] @read_txn_value def get_term_nanoids(self, concept: Concept, mapping_source: str = ""): """Returns list of term nanoids representing given concept""" self.validate_entity_unique(concept) ent = N(label="concept", props=concept.get_attr_dict()) term = N(label="term") ent_trip = T(term, R(Type="represents"), ent) if mapping_source: tag_trip = T( concept, R(Type="has_tag"), N( label="tag", props={"key": "mapping_source", "value": mapping_source}, ), ) path = G(ent_trip, tag_trip) else: path = ent_trip stmt = Statement( Match(path), Return(f"{term._var}.nanoid"), As("term_nanoids"), use_params=True, ) qry = str(stmt) parms = stmt.params return (qry, parms, "term_nanoids")
[docs] @read_txn_value def get_predicate_nanoids(self, concept: Concept, mapping_source: str = ""): """Returns list of predicate nanoids with relationship to given concept""" self.validate_entity_unique(concept) ent = N(label="concept", props=concept.get_attr_dict()) predicate = N(label="predicate") ent_trip = T(predicate, R0(), ent) if mapping_source: tag = N( label="tag", props={"key": "mapping_source", "value": mapping_source}, ) tag_trip = T(ent, R(Type="has_tag"), tag) path = G(ent_trip, tag_trip) else: path = ent_trip stmt = Statement( Match(path), Return(f"{predicate._var}.nanoid"), As("predicate_nanoids"), use_params=True, ) qry = str(stmt) parms = stmt.params return (qry, parms, "predicate_nanoids")
[docs] @read_txn_value def get_relationship_between_entities(self, src_entity: Entity, dst_entity: Entity): """Returns relationship type between given entities with (src)-[:rel_type]->(dst)""" self.validate_entities_unique([src_entity, dst_entity]) trip = T( N(label=src_entity.get_label(), props=src_entity.get_attr_dict()), R(), N(label=dst_entity.get_label(), props=dst_entity.get_attr_dict()), ) stmt = Statement( Match(trip), Return(f"TYPE({trip._edge._var})"), As("relationship_type"), use_params=True, ) qry = str(stmt) parms = stmt.params return (qry, parms, "relationship_type")
[docs] def merge_two_concepts( self, concept_1: Concept, concept_2: Concept, mapping_source: str = "", _commit="", ) -> None: """ Combine two synonymous Concepts into a single Concept. This function takes two synonymous Concept as bento-meta objects and merges them into a single Concept along with any connected Terms and Predicates. """ self.validate_entities_unique([concept_1, concept_2]) # get list of terms connected to concept 2 c2_term_nanoids = self.get_term_nanoids(concept_2, mapping_source) c2_terms: List[Term] = [] for nanoid in c2_term_nanoids: c2_terms.append(Term({"nanoid": nanoid})) # get list of predicates connected to concept 2 c2_predicate_nanoids = self.get_predicate_nanoids(concept_2, mapping_source) c2_predicates_with_rel = [] for nanoid in c2_predicate_nanoids: predicate = Predicate({"nanoid": nanoid}) predicate_rel = self.get_relationship_between_entities( src_entity=predicate, dst_entity=concept_2, )[0] c2_predicates_with_rel.append((predicate, predicate_rel)) # delete concept 2 self.remove_entity_from_mdb(concept_2) # connect terms from deleted (c2) to remaining concept (c1) for term in c2_terms: self.add_relationship_to_mdb( relationship_type="represents", src_entity=term, dst_entity=concept_1, _commit=_commit, ) # connect predicates from deleted (c2) to remaining concept (c1) for predicate, rel in c2_predicates_with_rel: self.add_relationship_to_mdb( relationship_type=rel, src_entity=predicate, dst_entity=concept_1, _commit=_commit, )
[docs] @read_txn_data def _get_all_terms(self): """Returns list of all terms in an MDB.""" term = N(label="term") stmt = Statement(Match(term), Return(term._var)) qry = str(stmt) parms = {} print(qry) return (qry, parms)
[docs] def get_potential_term_synonyms( self, term: Term, threshhold: float = 0.8, ) -> List[dict]: """ Returns list of dicts representing potential Term nodes synonymous to given Term in an MDB """ self.validate_entity_unique(term) nlp = _get_nlp_model() all_terms_result = self._get_all_terms() all_terms = [list(item.values())[0] for item in all_terms_result] # get likely synonyms synonyms = [] for term_attr_dict in all_terms: # calculate similarity between each Term and input Term term_1 = nlp(term.value) term_2 = nlp(term_attr_dict["value"]) similarity_score = term_1.similarity(term_2) # if similarity threshold met, add to list of potential synonyms if similarity_score >= threshhold: synonym = { "value": term_attr_dict["value"], "origin_name": term_attr_dict["origin_name"], "nanoid": term_attr_dict["nanoid"], "similarity": similarity_score, "valid_synonym": 0, # mark 1 if synonym when uploading later } synonyms.append(synonym) synonyms_sorted = sorted(synonyms, key=lambda d: d["similarity"], reverse=True) return synonyms_sorted
[docs] def potential_synonyms_to_csv( self, input_data: List[dict], output_path: str, ) -> None: """Given a list of synonymous Terms as dicts, outputs to CSV file at given output path""" with open(output_path, "w", encoding="utf8", newline="") as output_file: dict_writer = csv.DictWriter(output_file, fieldnames=input_data[0].keys()) dict_writer.writeheader() dict_writer.writerows(input_data)
[docs] @read_txn_data def get_property_synonyms_direct(self, entity: Property, mapping_source: str = ""): """ Returns list of properties linked by concept to given property """ self.validate_entity_unique(entity) ent = N(label="property", props=entity.get_attr_dict()) prop = N(label="property") concept = N(label="concept") ent_trip_1 = T(ent, R(Type="has_concept"), concept) ent_trip_2 = T(prop, R(Type="has_concept"), concept) if mapping_source: tag = N( label="tag", props={"key": "mapping_source", "value": mapping_source}, ) tag_trip = T(concept, R(Type="has_tag"), tag) path = G(ent_trip_1, ent_trip_2, tag_trip) else: path = G(ent_trip_1, ent_trip_2) stmt = Statement( Match(path), With(f"{Collect(_plain_var(prop).pattern())}"), As("synonyms"), Return("synonyms"), use_params=True, ) qry = str(stmt) parms = stmt.params return (qry, parms)
[docs] def _get_property_synonyms_direct_as_list(self, entity: Property) -> List[Property]: """ Converts results of read_txn_data-wrapped function with one item to a simple list of bento_meta.objects.Property entities """ data = self.get_property_synonyms_direct(entity) return [Property(s) for s in data[0]["synonyms"]]
[docs] def get_property_synonyms_all(self, entity: Property) -> List[Property]: """ Returns list of properties linked by concept to given property or to synonym of given property (and so on) """ self.validate_entity_unique(entity) all_synonyms = [] queue = [entity] visited = {entity.nanoid} while queue: current = queue.pop(0) direct_synonyms = self._get_property_synonyms_direct_as_list(current) for synonym in direct_synonyms: if synonym.nanoid not in visited: visited.add(synonym.nanoid) queue.append(synonym) all_synonyms.append(synonym) return all_synonyms
[docs] @read_txn_data def _get_property_parents_data(self, entity: Property): """ Get list of nodes/edges connected to given property via the "has_property" relationship """ self.validate_entity_unique(entity) p_attrs = entity.get_attr_dict() child_prop = N(label="property", props=p_attrs) parent_node = N(label="node") parent_edge = N(label="relationship") rel = R(Type="has_property", _dir="_left") trip1 = rel.relate(child_prop, N0()) trip2 = rel.relate(_plain_var(child_prop), parent_node) trip3 = rel.relate(_plain_var(child_prop), parent_edge) stmt = Statement( Match(trip1), OptionalMatch(trip2), OptionalMatch(trip3), With(), Collect(_plain_var(parent_node).pattern()), As("nodes,"), Collect(_plain_var(parent_edge).pattern()), As("edges"), Return("nodes, edges"), use_params=True, ) qry = str(stmt) parms = stmt.params return (qry, parms)
[docs] def get_property_parents(self, entity: Property) -> List[Union[Node, Edge]]: """ Returns results of _get_property_parents_data as a list of bento_meta Nodes or Edges """ self.validate_entity_unique(entity) data = self._get_property_parents_data(entity) node_parents = [Node(p) for p in data[0]["nodes"]] edge_parents = [Edge(p) for p in data[0]["edges"]] return node_parents + edge_parents
[docs] def add_tag_to_mdb_entity(self, tag: Tag, entity: Entity) -> None: """Adds a tag to an existing entity in an MDB.""" self.validate_entity_unique(entity) self.add_entity_to_mdb(tag) self.add_relationship_to_mdb( relationship_type="has_tag", src_entity=entity, dst_entity=tag, )
[docs] class EntityValidator: """Entity validator that validate entities have all required attributes""" required_attrs_by_entity_type: Dict[Type[Entity], List[str]] = { Node: ["handle", "model"], Edge: ["handle", "model", "src", "dst"], Property: ["handle", "model"], Term: ["origin_name", "value"], # add? "origin_id", "origin_version" Concept: ["nanoid"], Predicate: ["handle", "subject", "object"], Tag: ["key", "value"], ValueSet: ["handle"], } valid_attrs: Dict[Tuple[Type[Entity], str], Set[str]] = { (Predicate, "handle"): { "exactMatch", "closeMatch", "broader", "narrower", "related", }, }
[docs] class MissingAttributeError(Exception): """Raised when an entity doesn't have the attributes required for unique identification"""
[docs] class InvalidAttributeError(Exception): """Raised when an entity attribute is invalid"""
[docs] @staticmethod def validate_entity_has_attribute(entity: Entity, attr_name: str) -> None: """Validates the presence of an entity's attribute""" if not getattr(entity, attr_name): raise EntityValidator.MissingAttributeError( f"{entity.__class__.__name__} needs a {attr_name} attribute", )
[docs] @staticmethod def _validate_entity_attribute(entity_type: Type[Entity], attr_name: str) -> None: """Checks that an entity attribute is in a set of valid attributes""" valid_attrs = EntityValidator.valid_attrs.get((entity_type, attr_name)) if valid_attrs and getattr(entity_type, attr_name) not in valid_attrs: raise EntityValidator.InvalidAttributeError( f"{entity_type.__name__} {attr_name} must be one of: " f"{', '.join(list(valid_attrs))}", )
[docs] @staticmethod def validate_entity(entity: Entity) -> None: """ Checks if entity has all attributes required by MDB for its type, and that all of those attributes are valid themselves if they are entities or if they have a fixed set of possible values. Verifies that bento-meta entity has all necesssary attributes before it is added to an MDB instance. If looking for a unique identifier for the entity (i.e. nanoid), ensures entity has all the required attributes for unique identification. """ entity_type = type(entity) required_attrs = EntityValidator.required_attrs_by_entity_type.get(entity_type) if not required_attrs: raise ValueError(f"Entity type {entity_type.__name__} not supported") for attr_name in required_attrs: try: # validate attr that is an entity (e.g. Edge.src) if isinstance(getattr(entity, attr_name), Entity): EntityValidator.validate_entity(getattr(entity, attr_name)) # check entity has attr if required EntityValidator.validate_entity_has_attribute(entity, attr_name) # check attr with fixed set of possible values is valid EntityValidator._validate_entity_attribute(entity_type, attr_name) except ( EntityValidator.MissingAttributeError, EntityValidator.InvalidAttributeError, ) as error: logger.error(str(error)) raise
[docs] def _get_nlp_model(): """Installs and imports spacy & scispacy nlp model if any not already installed""" model_name = "en_ner_bionlp13cg_md" model_url = "https://s3-us-west-2.amazonaws.com/ai2-s2-scispacy/releases/v0.5.1/en_ner_bionlp13cg_md-0.5.1.tar.gz" try: if not find_spec("spacy"): # ensure spacy is installed check_call([executable, "-m", "pip", "install", "spacy"]) import spacy if not find_spec(model_name): # ensure model is installed check_call([executable, "-m", "pip", "install", model_url]) nlp = spacy.load(model_url) return nlp except Exception as error: logger.error(str(error)) raise