# Source code for bento_meta.mdb.mdb_tools.mdb_tools

"""
ToolsMDB: subclass of 'WriteableMDB' to support interactions with the MDB.

EntityValidator: validates that entities have required attributes.
"""

from __future__ import annotations

import csv
import logging
from collections.abc import Iterable
from importlib.util import find_spec
from logging.config import fileConfig
from pathlib import Path
from subprocess import check_call
from sys import executable
from typing import TYPE_CHECKING, ClassVar

from minicypher.clauses import (
    As,
    Collect,
    DetachDelete,
    Match,
    Merge,
    OptionalMatch,
    Return,
    With,
)
from minicypher.entities import N0, R0, G, N, P, R, T, _plain_var
from minicypher.functions import count
from minicypher.statement import Statement

from bento_meta.entity import Entity
from bento_meta.mdb import make_nanoid, read_txn_data, read_txn_value
from bento_meta.mdb.writeable import WriteableMDB, write_txn
from bento_meta.objects import (
    Concept,
    Edge,
    Node,
    Predicate,
    Property,
    Tag,
    Term,
    ValueSet,
)

if TYPE_CHECKING:
    from neo4j import Record

# logging stuff
# Logging config lives two directories up from this module
# (<package root>/logs/log.ini); the log file is named after this
# module's dotted path.
log_ini_path = Path(__file__).parents[2].joinpath("logs/log.ini")
log_file_path = Path(__file__).parents[2].joinpath(f"logs/{__name__}.log")
# fileConfig runs at import time; the "logfilename" default is interpolated
# into the handler arguments declared in log.ini.
fileConfig(log_ini_path, defaults={"logfilename": log_file_path.as_posix()})
logger = logging.getLogger(__name__)


class ToolsMDB(WriteableMDB):
    """:class:`bento_meta.mdb.writeable.WriteableMDB` subclass with mdb-tools."""

    def __init__(self, uri: str | None, user: str | None, password: str | None) -> None:
        """Initialize a :class:`ToolsMDB` object."""
        # Connection handling is delegated entirely to WriteableMDB.
        super().__init__(uri=uri, user=user, password=password)
# NOTE: these four exception classes are nested inside ToolsMDB in the
# original layout -- callers raise/catch them as self.EntityNotUniqueError etc.
class EntityNotUniqueError(Exception):
    """Entity's attributes identify more than 1 property graph node in an MDB."""

class EntityNotFoundError(Exception):
    """Entity's attributes fail to identify a property graph node in an MDB."""

class PatternNotUniqueError(Exception):
    """Match pattern's attributes identify more than 1 triple in an MDB."""

class PatternNotFoundError(Exception):
    """Match pattern's attributes fail to identify a triple in an MDB."""
[docs] @read_txn_value # type: ignore[reportArgumentType] def _get_entity_count(self, entity: Entity) -> list[Record]: """ Return count of given entity found in an MDB. - If count = 0, entity with given attributes not found in MDB. - If count = 1, entity with given attributes is unique in MDB. - If count > 1, more attributes needed to uniquely id entity in MDB. Args: entity: The entity to count. Returns: List with count value. """ ent = N(label=entity.get_label(), props=entity.get_attr_dict()) stmt = Statement( Match(ent), Return(count(ent)), As("entity_count"), use_params=True, ) qry = str(stmt) parms = stmt.params return (qry, parms, "entity_count") # type: ignore[reportReturnType]
[docs] @read_txn_value # type: ignore[reportArgumentType] def _get_pattern_count(self, pattern: T | G) -> list[Record]: """ Return count of given match pattern (triple or path) found in an MDB. Property graph triple: (n)-[r]->(m) Path: set of overlapping triples - If count = 0, pattern with given attributes not found in MDB. - If count = 1, pattern with given attributes is unique in MDB. - If count > 1, more attributes needed to uniquely id pattern in MDB. Args: pattern: The pattern (triple or path) to count. Returns: List with count value. """ stmt = Statement( Match(pattern), Return(count("*")), As("pattern_count"), use_params=True, ) qry = str(stmt) parms = stmt.params return (qry, parms, "pattern_count") # type: ignore[reportReturnType]
def validate_entity_unique(self, entity: Entity) -> None:
    """
    Validate that the given entity occurs once (& only once) in an MDB.

    Note: doesn't validate the entity itself because not all entity
    attributes are necessarily required to locate an entity in the MDB.
    (e.g. handle and model OR nanoid alone can identify a node)

    Args:
        entity: The entity to validate.

    Raises:
        EntityNotUniqueError: If entity attributes match multiple property
            graph nodes in the MDB.
        EntityNotFoundError: If entity attributes don't match any in the MDB.
    """
    count_in_mdb = int(self._get_entity_count(entity)[0])
    # Exactly one match is the only acceptable outcome.
    if count_in_mdb == 1:
        return
    if count_in_mdb > 1:
        logger.error(str(self.EntityNotUniqueError))
        raise self.EntityNotUniqueError
    logger.error(str(self.EntityNotFoundError))
    raise self.EntityNotFoundError
def validate_entities_unique(self, entities: Iterable[Entity]) -> None:
    """Apply :meth:`validate_entity_unique` to each entity in *entities*."""
    for ent in entities:
        self.validate_entity_unique(ent)
def validate_pattern_unique(self, pattern: T | G) -> None:
    """
    Validate that the given match pattern occurs once (& only once) in an MDB.

    Args:
        pattern: The pattern to validate.

    Raises:
        PatternNotUniqueError: If pattern attributes match multiple triples in
            the MDB.
        PatternNotFoundError: If pattern attributes don't match any in the MDB.
    """
    pattern_count = int(self._get_pattern_count(pattern)[0])
    if pattern_count > 1:
        # Build the message once; previously the same f-string was
        # constructed twice (once via logger.error(str(Exc(...))), once
        # for the raise). str(Exception(msg)) == msg, so the logged text
        # is unchanged.
        msg = f"Pattern: {pattern.pattern()} not unique."
        logger.error(msg)
        raise self.PatternNotUniqueError(msg)
    if pattern_count < 1:
        msg = f"Pattern: {pattern.pattern()} not found."
        logger.error(msg)
        raise self.PatternNotFoundError(msg)
[docs] @write_txn # type: ignore[reportArgumentType] def remove_entity_from_mdb(self, entity: Entity) -> list[Record]: """ Remove given Entity node from the database. Accepts the following bento-meta Entities: Concept, Node, Predicate, Property, Edge, Term Args: entity: The entity to remove. Returns: List of Records from the transaction. """ self.validate_entity_unique(entity) ent_label = entity.get_label() ent_attrs = entity.get_attr_dict() ent = N(label=ent_label, props=ent_attrs) stmt = Statement(Match(ent), DetachDelete(ent._var), use_params=True) qry = str(stmt) parms = stmt.params logger.info("Removing %s entity with with properties: %s", ent_label, ent_attrs) return (qry, parms) # type: ignore[reportReturnType]
[docs] @write_txn # type: ignore[reportArgumentType] def add_entity_to_mdb( self, entity: Entity, _commit: str | None = None, ) -> list[Record]: """ Add given Entity node to MDB instance. Args: entity: The entity to add. _commit: Optional commit string to tag the entity. Returns: List of Records from the transaction. """ EntityValidator.validate_entity(entity) EntityValidator.validate_entity_has_attribute(entity, "nanoid") if _commit: entity._commit = _commit ent_label = entity.get_label() ent_attrs = entity.get_attr_dict() ent = N(label=ent_label, props=ent_attrs) # type: ignore[arg-type] if isinstance(entity, Edge): src = N(label="node", props=entity.src.get_attr_dict()) dst = N(label="node", props=entity.dst.get_attr_dict()) src_rel = R(Type="has_src") dst_rel = R(Type="has_dst") src_trip = src_rel.relate(_plain_var(ent), _plain_var(src)) dst_trip = dst_rel.relate(_plain_var(ent), _plain_var(dst)) stmt = Statement( Merge(ent), Merge(src), Merge(dst), Merge(src_trip), Merge(dst_trip), use_params=True, ) else: stmt = Statement(Merge(ent), use_params=True) qry = str(stmt) parms = stmt.params logger.info( "Merging new %s node with properties: %s into the MDB", ent_label, ent_attrs, ) return (qry, parms) # type: ignore[reportReturnType]
[docs] @read_txn_value # type: ignore[reportArgumentType] def get_concept_nanoids_linked_to_entity( self, entity: Entity, mapping_source: str | None = None, ) -> list[Record]: """ Return list of concept nanoids linked to given entity. The concept(s) have "represents" or "has_concept" relationships tagged with the given mapping source. If no mapping source is provided, return all. Args: entity: The entity to get concepts for. mapping_source: Optional mapping source to filter by. Returns: List of concept nanoids. """ self.validate_entity_unique(entity) ent = N(label=entity.get_label(), props=entity.get_attr_dict()) concept = N(label="concept") if isinstance(entity, Term): rel = R(Type="represents") else: rel = R(Type="has_concept") ent_trip = rel.relate(ent, concept) if mapping_source is not None: tag = N( label="tag", props={"key": "mapping_source", "value": mapping_source}, ) tag_trip = T(concept, R(Type="has_tag"), tag) path = G(ent_trip, tag_trip) else: path = ent_trip stmt = Statement( Match(path), Return(f"{concept._var}.nanoid"), As("concept_nanoids"), use_params=True, ) qry = str(stmt) parms = stmt.params logger.debug("%s; %s", qry, parms) return (qry, parms, "concept_nanoids") # type: ignore[reportReturnType]
[docs] @write_txn # type: ignore[reportArgumentType] def add_relationship_to_mdb( self, relationship_type: str, src_entity: Entity, dst_entity: Entity, _commit: str = "", ) -> list[Record]: """ Add relationship between given entities in MDB. Args: relationship_type: The type of relationship to create. src_entity: Source entity for the relationship. dst_entity: Destination entity for the relationship. _commit: Optional commit string to tag the relationship. Returns: List of Records from the transaction. """ self.validate_entities_unique([src_entity, dst_entity]) rel = R(Type=relationship_type) src = N(label=src_entity.get_label(), props=src_entity.get_attr_dict()) dst = N(label=dst_entity.get_label(), props=dst_entity.get_attr_dict()) trip = rel.relate(src, dst) plain_trip = rel.relate(_plain_var(src), _plain_var(dst)) try: # check for existance shouldn't include _commit? self.validate_pattern_unique(trip) except self.PatternNotFoundError: # if triple doesn't already exist, add _commit? this means that # if triple exists w/ different _commit, merge shouldn't add anything? rel.props["_commit"] = P(handle="_commit", value=_commit) stmt = Statement(Match(src, dst), Merge(plain_trip), use_params=True) qry = str(stmt) parms = stmt.params logger.info(stmt) logger.info( "Adding %s relationship between src %s with properties: %s to dst %s " "with properties: %s", relationship_type, src.label, src_entity.get_attr_dict(), dst.label, dst_entity.get_attr_dict(), ) return (qry, parms) # type: ignore[reportReturnType]
[docs] @read_txn_value # type: ignore[reportArgumentType] def get_entity_nanoid(self, entity: Entity) -> list[Record]: """Take a unique entity in the MDB and return its nanoid.""" self.validate_entity_unique(entity) ent = N(label=entity.get_label(), props=entity.get_attr_dict()) if isinstance(entity, Edge): src = N(label="node", props=entity.src.get_attr_dict()) dst = N(label="node", props=entity.dst.get_attr_dict()) src_rel = R(Type="has_src") dst_rel = R(Type="has_dst") src_trip = src_rel.relate(ent, src) dst_trip = dst_rel.relate(ent, dst) path = G(src_trip, dst_trip) match_clause = Match(path) else: match_clause = Match(ent) stmt = Statement( match_clause, Return(f"{ent._var}.nanoid"), As("entity_nanoid"), use_params=True, ) qry = str(stmt) parms = stmt.params return (qry, parms, "entity_nanoid") # type: ignore[reportReturnType]
def get_or_make_entity_nanoid(self, entity: Entity) -> str:
    """Obtain existing entity's nanoid or create one for new entity."""
    try:
        existing = self.get_entity_nanoid(entity)
    except self.EntityNotFoundError:
        # Entity is new to the MDB -- mint a fresh nanoid for it.
        return make_nanoid()
    return existing[0]
@read_txn_value  # type: ignore[reportArgumentType]
def get_term_nanoids(
    self,
    concept: Concept,
    mapping_source: str = "",
) -> list[Record]:
    """
    Return list of term nanoids representing given concept.

    Args:
        concept: The concept whose representing terms are wanted.
        mapping_source: Optional mapping source to filter the concept's tag by.

    Returns:
        List of term nanoids.
    """
    self.validate_entity_unique(concept)
    ent = N(label="concept", props=concept.get_attr_dict())
    term = N(label="term")
    ent_trip = T(term, R(Type="represents"), ent)
    if mapping_source:
        tag = N(
            label="tag",
            props={"key": "mapping_source", "value": mapping_source},
        )
        # BUG FIX: the tag triple previously anchored on `concept` (the
        # bento-meta Concept entity) rather than `ent` (the minicypher node
        # built from it), so the tag pattern did not share the concept node
        # with ent_trip. The parallel get_predicate_nanoids uses `ent`.
        tag_trip = T(ent, R(Type="has_tag"), tag)
        path = G(ent_trip, tag_trip)
    else:
        path = ent_trip
    stmt = Statement(
        Match(path),
        Return(f"{term._var}.nanoid"),
        As("term_nanoids"),
        use_params=True,
    )
    qry = str(stmt)
    parms = stmt.params
    return (qry, parms, "term_nanoids")  # type: ignore[reportReturnType]
[docs] @read_txn_value # type: ignore[reportArgumentType] def get_predicate_nanoids( self, concept: Concept, mapping_source: str = "", ) -> list[Record]: """Return list of predicate nanoids with relationship to given concept.""" self.validate_entity_unique(concept) ent = N(label="concept", props=concept.get_attr_dict()) predicate = N(label="predicate") ent_trip = T(predicate, R0(), ent) if mapping_source: tag = N( label="tag", props={"key": "mapping_source", "value": mapping_source}, ) tag_trip = T(ent, R(Type="has_tag"), tag) path = G(ent_trip, tag_trip) else: path = ent_trip stmt = Statement( Match(path), Return(f"{predicate._var}.nanoid"), As("predicate_nanoids"), use_params=True, ) qry = str(stmt) parms = stmt.params return (qry, parms, "predicate_nanoids") # type: ignore[reportReturnType]
[docs] @read_txn_value # type: ignore[reportArgumentType] def get_relationship_between_entities( self, src_entity: Entity, dst_entity: Entity, ) -> list[Record]: """ Return relationship type between given entities. Relationship pattern: (src)-[:rel_type]->(dst). """ self.validate_entities_unique([src_entity, dst_entity]) trip = T( N(label=src_entity.get_label(), props=src_entity.get_attr_dict()), R(), N(label=dst_entity.get_label(), props=dst_entity.get_attr_dict()), ) stmt = Statement( Match(trip), Return(f"TYPE({trip._edge._var})"), As("relationship_type"), use_params=True, ) qry = str(stmt) parms = stmt.params return (qry, parms, "relationship_type") # type: ignore[reportReturnType]
def merge_two_concepts(
    self,
    concept_1: Concept,
    concept_2: Concept,
    mapping_source: str = "",
    _commit: str = "",
) -> None:
    """
    Combine two synonymous Concepts into a single Concept.

    This function takes two synonymous Concepts as bento-meta objects and
    merges them into a single Concept along with any connected Terms and
    Predicates.

    Args:
        concept_1: First concept (will remain after merge).
        concept_2: Second concept (will be deleted after merge).
        mapping_source: Source of the mapping relationship.
        _commit: Optional commit string to tag relationships.
    """
    self.validate_entities_unique([concept_1, concept_2])
    # get list of terms connected to concept 2
    c2_term_nanoids = self.get_term_nanoids(concept_2, mapping_source)
    c2_terms: list[Term] = [Term({"nanoid": nanoid}) for nanoid in c2_term_nanoids]
    # get list of predicates connected to concept 2, remembering each
    # predicate's original relationship type so it can be recreated on c1
    c2_predicate_nanoids = self.get_predicate_nanoids(concept_2, mapping_source)
    c2_predicates_with_rel = []
    for nanoid in c2_predicate_nanoids:
        predicate = Predicate({"nanoid": nanoid})
        predicate_rel = self.get_relationship_between_entities(
            src_entity=predicate,
            dst_entity=concept_2,
        )[0]
        c2_predicates_with_rel.append((predicate, predicate_rel))
    # delete concept 2 (gathered above before its relationships vanish)
    self.remove_entity_from_mdb(concept_2)
    # connect terms from deleted (c2) to remaining concept (c1)
    for term in c2_terms:
        self.add_relationship_to_mdb(
            relationship_type="represents",
            src_entity=term,
            dst_entity=concept_1,
            _commit=_commit,
        )
    # connect predicates from deleted (c2) to remaining concept (c1)
    for predicate, rel in c2_predicates_with_rel:
        self.add_relationship_to_mdb(
            relationship_type=rel,
            src_entity=predicate,
            dst_entity=concept_1,
            _commit=_commit,
        )
[docs] @read_txn_data # type: ignore[reportArgumentType] def _get_all_terms(self) -> list[Record]: """Return list of all terms in an MDB.""" term = N(label="term") stmt = Statement(Match(term), Return(term._var)) qry = str(stmt) parms = {} logger.debug(qry) return (qry, parms) # type: ignore[reportReturnType]
def get_potential_term_synonyms(
    self,
    term: Term,
    threshhold: float = 0.8,
) -> list[dict]:
    """
    Return list of dicts representing potential synonymous Term nodes.

    Args:
        term: The term to find synonyms for.
        threshhold: Similarity threshold (0-1) for considering terms synonymous.

    Returns:
        List of dicts with value, origin_name, nanoid, similarity,
        valid_synonym, sorted by descending similarity.
    """
    self.validate_entity_unique(term)
    nlp = _get_nlp_model()
    all_terms_result = self._get_all_terms()
    # Each record holds a single term node; take its attribute dict.
    all_terms = [list(item.values())[0] for item in all_terms_result]
    # PERF: parse the input term once -- it is loop-invariant (previously
    # re-parsed by the NLP pipeline for every candidate term).
    term_doc = nlp(term.value)
    synonyms = []
    for term_attr_dict in all_terms:
        # calculate similarity between each Term and input Term
        candidate_doc = nlp(term_attr_dict["value"])
        similarity_score = term_doc.similarity(candidate_doc)
        # if similarity threshold met, add to list of potential synonyms
        if similarity_score >= threshhold:
            synonyms.append(
                {
                    "value": term_attr_dict["value"],
                    "origin_name": term_attr_dict["origin_name"],
                    "nanoid": term_attr_dict["nanoid"],
                    "similarity": similarity_score,
                    "valid_synonym": 0,  # mark 1 if synonym when uploading later
                },
            )
    return sorted(synonyms, key=lambda d: d["similarity"], reverse=True)
def potential_synonyms_to_csv(
    self,
    input_data: list[dict],
    output_path: str,
) -> None:
    """
    Write CSV with list of synonymous Terms as dicts to given output path.

    Args:
        input_data: List of dicts representing potential synonyms. The keys
            of the first dict become the CSV header.
        output_path: Path to write CSV file to.

    Raises:
        ValueError: If input_data is empty (no header can be derived).
    """
    if not input_data:
        # Previously an empty list crashed with an opaque IndexError on
        # input_data[0]; fail with an explicit message instead.
        msg = "input_data must contain at least one dict to derive CSV fieldnames"
        raise ValueError(msg)
    with Path(output_path).open("w", encoding="utf8", newline="") as output_file:
        dict_writer = csv.DictWriter(output_file, fieldnames=input_data[0].keys())
        dict_writer.writeheader()
        dict_writer.writerows(input_data)
[docs] @read_txn_data # type: ignore[reportArgumentType] def get_property_synonyms_direct( self, entity: Property, mapping_source: str = "", ) -> list[Record]: """ Return list of properties linked by concept to given property. Args: entity: The property to get synonyms for. mapping_source: Optional mapping source to filter by. Returns: List of Records with property synonyms. """ self.validate_entity_unique(entity) ent = N(label="property", props=entity.get_attr_dict()) prop = N(label="property") concept = N(label="concept") ent_trip_1 = T(ent, R(Type="has_concept"), concept) ent_trip_2 = T(prop, R(Type="has_concept"), concept) if mapping_source: tag = N( label="tag", props={"key": "mapping_source", "value": mapping_source}, ) tag_trip = T(concept, R(Type="has_tag"), tag) path = G(ent_trip_1, ent_trip_2, tag_trip) else: path = G(ent_trip_1, ent_trip_2) stmt = Statement( Match(path), With(f"{Collect(_plain_var(prop).pattern())}"), As("synonyms"), Return("synonyms"), use_params=True, ) qry = str(stmt) parms = stmt.params return (qry, parms) # type: ignore[reportReturnType]
def _get_property_synonyms_direct_as_list(self, entity: Property) -> list[Property]:
    """
    Convert results of read_txn_data-wrapped function with one Property to a list.

    Args:
        entity: The property to get synonyms for.

    Returns:
        List of bento_meta.objects.Property entities.
    """
    data = self.get_property_synonyms_direct(entity)
    synonyms: list[Property] = []
    # The first record carries a "synonyms" column of node attribute dicts.
    for attrs in data[0]["synonyms"]:
        synonyms.append(Property(attrs))
    return synonyms
def get_property_synonyms_all(self, entity: Property) -> list[Property]:
    """
    Return list of properties linked by concept to given property or its synonyms.

    Chains through synonyms of given property (and so on) breadth-first.

    Args:
        entity: The property to get all synonyms for.

    Returns:
        List of Property entities that are synonyms.
    """
    self.validate_entity_unique(entity)
    collected: list[Property] = []
    seen_nanoids = {entity.nanoid}
    # Cursor-indexed frontier doubles as the BFS queue; cursor marks the
    # next unexpanded element.
    frontier = [entity]
    cursor = 0
    while cursor < len(frontier):
        prop = frontier[cursor]
        cursor += 1
        for syn in self._get_property_synonyms_direct_as_list(prop):
            if syn.nanoid in seen_nanoids:
                continue
            seen_nanoids.add(syn.nanoid)
            frontier.append(syn)
            collected.append(syn)
    return collected
[docs] @read_txn_data # type: ignore[reportArgumentType] def _get_property_parents_data(self, entity: Property) -> list[Record]: """ Get list of nodes/edges connected to given property. Parents are Nodes or Edges via the "has_property" relationship. Args: entity: The property to get parents for. Returns: List of Records with nodes and edges. """ self.validate_entity_unique(entity) p_attrs = entity.get_attr_dict() child_prop = N(label="property", props=p_attrs) parent_node = N(label="node") parent_edge = N(label="relationship") rel = R(Type="has_property", _dir="_left") trip1 = rel.relate(child_prop, N0()) trip2 = rel.relate(_plain_var(child_prop), parent_node) trip3 = rel.relate(_plain_var(child_prop), parent_edge) stmt = Statement( Match(trip1), OptionalMatch(trip2), OptionalMatch(trip3), With(), Collect(_plain_var(parent_node).pattern()), As("nodes,"), Collect(_plain_var(parent_edge).pattern()), As("edges"), Return("nodes, edges"), use_params=True, ) qry = str(stmt) parms = stmt.params return (qry, parms) # type: ignore[reportReturnType]
def get_property_parents(self, entity: Property) -> list[Node | Edge]:
    """
    Get parents of a property as a list of bento_meta Nodes or Edges.

    Args:
        entity: The property to get parents for.

    Returns:
        List of Node or Edge entities (nodes first, then edges).
    """
    self.validate_entity_unique(entity)
    record = self._get_property_parents_data(entity)[0]
    parents: list[Node | Edge] = [Node(attrs) for attrs in record["nodes"]]
    parents.extend(Edge(attrs) for attrs in record["edges"])
    return parents
def add_tag_to_mdb_entity(self, tag: Tag, entity: Entity) -> None:
    """
    Add a tag to an existing entity in an MDB.

    The tag node is merged first, then linked to the entity with a
    "has_tag" relationship.

    Args:
        tag: The tag to add.
        entity: The entity to tag.
    """
    # Entity must already exist (and be unambiguous) before tagging.
    self.validate_entity_unique(entity)
    self.add_entity_to_mdb(tag)
    self.add_relationship_to_mdb(
        relationship_type="has_tag",
        src_entity=entity,
        dst_entity=tag,
    )
class EntityValidator:
    """Class to validate that bento-meta entities have all required attributes."""

    # Attributes an entity must carry before it can be added to the MDB,
    # keyed by bento-meta entity type.
    required_attrs_by_entity_type: ClassVar[dict[type[Entity], list[str]]] = {
        Node: ["handle", "model"],
        Edge: ["handle", "model", "src", "dst"],
        Property: ["handle", "model"],
        Term: ["origin_name", "value"],  # add? "origin_id", "origin_version"
        Concept: ["nanoid"],
        Predicate: ["handle", "subject", "object"],
        Tag: ["key", "value"],
        ValueSet: ["handle"],
    }
    # Closed value sets for attributes that only accept specific strings.
    valid_attrs: ClassVar[dict[tuple[type[Entity], str], set[str]]] = {
        (Predicate, "handle"): {
            "exactMatch",
            "closeMatch",
            "broader",
            "narrower",
            "related",
        }
    }

    class MissingAttributeError(Exception):
        """An entity doesn't have the attributes required for unique identification."""

    class InvalidAttributeError(Exception):
        """An entity attribute is invalid."""

    @staticmethod
    def validate_entity_has_attribute(entity: Entity, attr_name: str) -> None:
        """Validate the presence of an entity's attribute."""
        # Truthiness check: None and empty string both count as "missing".
        if not getattr(entity, attr_name):
            msg = f"{entity.__class__.__name__} needs a {attr_name} attribute"
            raise EntityValidator.MissingAttributeError(msg)

    @staticmethod
    def _validate_entity_attribute(entity_type: type[Entity], attr_name: str) -> None:
        """Check that an entity attribute is in a set of valid attributes."""
        valid_attrs = EntityValidator.valid_attrs.get((entity_type, attr_name))
        # NOTE(review): this reads the attribute off the *type*
        # (getattr(entity_type, attr_name)), not the entity instance, so it
        # appears to check a class-level value rather than the entity's own
        # attribute -- confirm intent against bento_meta.Entity's attribute
        # machinery before relying on this check.
        if valid_attrs and getattr(entity_type, attr_name) not in valid_attrs:
            msg = (
                f"{entity_type.__name__} {attr_name} must be one of: "
                f"{', '.join(list(valid_attrs))}"
            )
            raise EntityValidator.InvalidAttributeError(msg)

    @staticmethod
    def validate_entity(entity: Entity) -> None:
        """
        Check if entity has all attributes required by MDB for its type.

        Also checks validates all of those attributes are valid themselves if
        they are entities or if they have a fixed set of possible values.

        If looking for a unique identifier for the entity (i.e. nanoid),
        ensures entity has all the required attributes for unique identification.

        Args:
            entity: The entity to validate.

        Raises:
            ValueError: If entity type not supported.
            MissingAttributeError: If required attribute missing.
            InvalidAttributeError: If attribute value invalid.
        """
        entity_type = type(entity)
        required_attrs = EntityValidator.required_attrs_by_entity_type.get(entity_type)
        if not required_attrs:
            msg = f"Entity type {entity_type.__name__} not supported"
            raise ValueError(msg)
        try:
            for attr_name in required_attrs:
                # validate attr that is an entity (e.g. Edge.src) recursively
                if isinstance(getattr(entity, attr_name), Entity):
                    EntityValidator.validate_entity(getattr(entity, attr_name))
                # check entity has attr if required
                EntityValidator.validate_entity_has_attribute(entity, attr_name)
                # check attr with fixed set of possible values is valid
                EntityValidator._validate_entity_attribute(entity_type, attr_name)
        except (
            EntityValidator.MissingAttributeError,
            EntityValidator.InvalidAttributeError,
        ):
            # Log with traceback, then let callers decide how to handle it.
            logger.exception("Error validating entity")
            raise
def _get_nlp_model():
    """
    Install and import spacy & scispacy nlp model if any not already installed.

    Returns:
        The loaded spaCy language pipeline for the scispacy model.

    Raises:
        Exception: Propagates any installation/loading failure after logging.
    """
    model_name = "en_ner_bionlp13cg_md"
    model_url = (
        "https://s3-us-west-2.amazonaws.com/ai2-s2-scispacy/releases/v0.5.1/"
        "en_ner_bionlp13cg_md-0.5.1.tar.gz"
    )
    try:
        if not find_spec("spacy"):  # ensure spacy is installed
            check_call([executable, "-m", "pip", "install", "spacy"])
        import spacy

        if not find_spec(model_name):  # ensure model is installed
            # The URL is a pip target only -- installs the model package.
            check_call([executable, "-m", "pip", "install", model_url])
        # BUG FIX: previously spacy.load(model_url) -- spacy.load() takes an
        # installed package name or local path, not a remote URL, and the
        # find_spec check above already keys on model_name.
        return spacy.load(model_name)
    except Exception:
        logger.exception("Error getting NLP model")
        raise