Source code for bento_meta.object_map

"""
bento_meta.object_map
=====================

This module contains :class:`ObjectMap`, a class which provides the
machinery for mapping bento_meta objects to a Bento Metamodel Database
in Neo4j. Mostly not for human consumption. The ObjectMap:

* interprets the attribute specification (attspec) and map
  specification (mapspec) associated with :class:`Entity` subclasses
* provides the :meth:`get` and :meth:`put` methods to subclasses, that
  enable them to get and put themselves to the database
* generates appropriate
 `Cypher <https://neo4j.com/docs/cypher-manual/current/>`
 queries to do gets and puts
 One ObjectMap instance should be generated for each Entity subclass (see, e.g.,
 :class:`bento_meta.model.Model`)
"""

from __future__ import annotations

import re
import sys

sys.path.append("..")
from typing import Any, ClassVar, cast
from warnings import warn

from neo4j import BoltDriver, Driver, Neo4jDriver, Transaction
from typing_extensions import LiteralString

from bento_meta.entity import ArgError, CollValue, Entity
from bento_meta.objects import (
    Concept,
    Edge,
    Node,
    Origin,
    Predicate,
    Property,
    Tag,
    Term,
    ValueSet,
)


[docs] class ObjectMap: """ Machinery for mapping bento_meta objects to a Bento Metamodel Database in Neo4j. Mostly not for human consumption. """ cache: ClassVar[dict] = {} def __init__( self, *, cls: type[Entity] | None = None, drv: Driver | None = None, ) -> None: """ Initialize the ObjectMap. Args: cls: The class to map. drv: The Neo4j driver. """ if not cls: msg = "arg cls= is required" raise ArgError(msg) self.cls = cls if drv: if isinstance(drv, (Neo4jDriver, BoltDriver)): self.drv = drv else: msg = ( "drv= arg must be Neo4jDriver or BoltDriver " "(returned from GraphDatabase.driver())" ) raise ArgError(msg) self.maps = {}
[docs] @classmethod def clear_cache(cls) -> None: """Clear the cache.""" cls.cache = {}
[docs] @classmethod def cls_by_label(cls, lbl: str) -> type[Entity] | None: """Get the class by label.""" if not hasattr(cls, "_clsxlbl"): cls._clsxlbl = {} for o in (Node, Edge, Property, ValueSet, Term, Concept, Origin, Tag): cls._clsxlbl[o.mapspec()["label"]] = o return cls._clsxlbl.get(lbl)
[docs] @classmethod def keys_by_cls_and_reln( cls, qcls: type[Entity], reln: str, ) -> tuple[str, str | None] | None: """Get the keys by class and relationship.""" if not hasattr(cls, "_keysxcls"): cls._keysxcls = {} for o in ( Node, Edge, Property, ValueSet, Term, Concept, Predicate, Origin, Tag, ): for oatt in [x for x in o.attspec if o.attspec[x] == "object"]: r = o.mapspec()["relationship"][oatt]["rel"] r = re.match("[:<>]*([a-zA-Z_]+)[:<>]*", r).group(1) cls._keysxcls[(o.__name__, r)] = (oatt, None) for catt in [x for x in o.attspec if o.attspec[x] == "collection"]: r = o.mapspec()["relationship"][catt]["rel"] r = re.match("[:<>]*([a-zA-Z_]+)[:<>]*", r).group(1) cls._keysxcls[(o.__name__, r)] = (catt, o.mapspec()["key"]) return cls._keysxcls.get((qcls.__name__, reln))
[docs] @classmethod def _quote_val( cls, value: str | float | None, *, single: bool | None = None, ) -> str | float | None: # double quote unless single is set """Quote the value unless single is set.""" if value is None: return None if isinstance(value, (int, float)): return value # no quote if single: return f"'{value}'" # quote return f'"{value}"' # quote
[docs] def get_by_id( self, obj: Entity, id: str, *, refresh: bool = False, ) -> Entity | None: """Get an entity given an id attribute value (not the Neo4j id).""" neoid = None if self.drv is None: msg = "get_by_id() requires Neo4j driver instance" raise ArgError(msg) with self.drv.session() as session: result = session.run(cast("LiteralString", self.get_by_id_q()), {"id": id}) rec = ( result.single() ) # should be unique - this call will warn if there are more than one if rec is not None: neoid = rec["id(n)"] if neoid is not None: obj.neoid = neoid return self.get(obj, refresh=True) return None
[docs] def get_by_node_nanoid( self, obj: Entity, nanoid: str, *, refresh: bool = False, ) -> Entity | None: """PROTOTYPE: Get an entity given an id attribute value (not the Neo4j id).""" neo4jid = None if not self.drv: msg = "get_by_id() requires Neo4j driver instance" raise ArgError(msg) with self.drv.session() as session: result = session.run( cast("LiteralString", self.get_by_node_nanoid_q()), {"nanoid": nanoid}, ) rec = ( result.single() ) # should be unique - this call will warn if there are more than one if rec is not None: neo4jid = rec["id(n)"] if neo4jid is None: obj.neoid = neo4jid return self.get(obj, refresh=True) return None
[docs] def get(self, obj: Entity, *, refresh: bool = False) -> Entity: """Get the data for an object instance from the db and load the instance with it.""" if not self.drv: msg = "get() requires Neo4j driver instance" raise ArgError(msg) if refresh: pass elif (obj.neoid in ObjectMap.cache) and (ObjectMap.cache[obj.neoid].dirty >= 0): return obj with self.drv.session() as session: result = session.run(cast("LiteralString", self.get_q(obj))) rec = result.single() if not rec: msg = f"object with id {obj.neoid} not found in db" raise RuntimeError(msg) if obj.neoid not in ObjectMap.cache: ObjectMap.cache[obj.neoid] = obj with self.drv.session() as session: for att in self.cls.mapspec()["relationship"]: result = session.run(cast("LiteralString", self.get_attr_q(obj, att))) values = {} first_val = None for rec in result: o = ObjectMap.cache.get(rec["a"].id) if o: if not first_val: first_val = o values[getattr(o, type(o).mapspec()["key"])] = o else: c = None for lbl in rec["a"].labels: c = ObjectMap.cls_by_label(lbl) if c: break if not c: msg = ( f"node labels {rec['a'].labels} " "have no associated class in the object model" ) raise RuntimeError(msg) o = c(rec["a"]) o.dirty = -1 ObjectMap.cache[o.neoid] = o if not first_val: first_val = o values[getattr(o, type(o).mapspec()["key"])] = o if self.cls.attspec[att] == "object" and len(values) > 1: warn( ( f"expected one node for attribute {att} on class " f"{self.cls.__name__}, but got {len(values)}; using first one" ), stacklevel=2, ) if self.cls.attspec[att] == "object": setattr(obj, att, first_val) elif self.cls.attspec[att] == "collection": setattr(obj, att, values) else: msg = ( f"attribute '{att}' has unknown attribute type " f"'{self.cls.attspec[att]}'" ) raise RuntimeError(msg) obj.clear_removed_entities() obj.dirty = 0 return obj
[docs] def put(self, obj: Entity) -> Entity: """Put the object instance's attributes to the mapped data node in the database.""" if not self.drv: msg = "put() requires Neo4j driver instance" raise ArgError(msg) with self.drv.session() as session: result = None with session.begin_transaction() as tx: for qry in self.put_q(obj): result = tx.run(cast("LiteralString", qry)) if result is None: msg = "no result from put_q" raise RuntimeError(msg) obj.neoid = result.single().value("id(n)") if obj.neoid is None: msg = ( "no neo4j id retrived on put for obj " f"'{getattr(obj, self.cls.mapspec()['key'])}'" ) raise RuntimeError(msg) for att in self.cls.mapspec()["relationship"]: values = getattr(obj, att) if not values: continue if isinstance(values, CollValue): items = values.values() else: items = [values] for val in items: if val.neoid is not None: continue # put val as a node for qry in ObjectMap(cls=type(val), drv=self.drv).put_q(val): result = tx.run(cast("LiteralString", qry)) val.neoid = result.single().value("id(n)") if val.neoid is None: msg = ( "no neo4j id retrived on put for obj " f"'{val[type(val).mapspec()['key']]}'" ) raise RuntimeError(msg) val.dirty = 1 ObjectMap.cache[val.neoid] = val for qry in self.put_attr_q(obj, att, values): tx.run(cast("LiteralString", qry)) # drop removed entities here while obj.removed_entities: ent = obj.removed_entities.pop() self.drop(obj, *ent, tx) ObjectMap.cache[obj.neoid] = obj obj.dirty = 0 return obj
[docs] def rm(self, obj: Entity, *, force: bool | int = False) -> Any | None: """'Delete' the object's mapped node from the database.""" if not self.drv: msg = "rm() requires Neo4j driver instance" raise ArgError(msg) if obj.neoid is None: msg = "object must be mapped (i.e., obj.neoid must be set)" raise ArgError(msg) with self.drv.session() as session: result = session.run(cast("LiteralString", self.rm_q(obj, detach=force))) s = result.single() if s is None: warn("rm() - corresponding db node not found", stacklevel=2) else: return s.value() return None
[docs] def add(self, obj: Entity, att: str, tgt: Entity) -> Any: """ Create a link between an object instance and a target object in the database. This represents adding an object-valued attribute to the object. Args: obj: The object instance to add attribute to. att: The attribute name. tgt: The target entity to link. Returns: The Neo4j ID of the target, or None if not found. """ if not self.drv: msg = "add() requires Neo4j driver instance" raise ArgError(msg) with self.drv.session() as session: for qry in self.put_attr_q(obj, att, tgt): result = session.run(cast("LiteralString", qry)) tgt_id = result.single().value() if tgt_id is None: warn("add() - corresponding db node not found", stacklevel=2) return tgt_id
[docs] def drop( self, obj: Entity, att: str, tgt: Entity, tx: Transaction | None = None, ) -> Any: """ Remove an existing link between an object instance and a target object in the database. This represents dropping an object-valued attribute from the object. Args: obj: The object instance to remove attribute from. att: The attribute name. tgt: The target entity to unlink. tx: Optional transaction to use for the operation. Returns: The result value, or None if not found. """ if not self.drv: msg = "rm() requires Neo4j driver instance" raise ArgError(msg) # if the tgt is not in the database, then dropping it is a no-op: if not tgt.neoid: return None if tx: result = None for qry in self.rm_attr_q(obj, att, tgt): result = tx.run(cast("LiteralString", qry)) s = result.single() if s is None: warn("drop() - corresponding target db node not found", stacklevel=2) else: return s.value() else: with self.drv.session() as session: result = None for qry in self.rm_attr_q(obj, att, tgt): result = session.run(cast("LiteralString", qry)) s = result.single() if s is None: warn( "drop() - corresponding target db node not found", stacklevel=2, ) else: return s.value() return None
[docs] def get_owners( self, obj: Entity, ) -> list[tuple[Entity, tuple[str, str | None] | None]]: """Get the nodes which are linked to the object instance (the owners of the object).""" if not self.drv: msg = "get_owners() requires Neo4j driver instance" raise ArgError(msg) ret = [] with self.drv.session() as session: result = session.run(cast("LiteralString", self.get_owners_q(obj))) for rec in result: if rec["reln"][0] == "_": # skip _prev, _next, and convenience links break ocls = None for lbl in rec["a"].labels: ocls = self.cls_by_label(lbl) if ocls: break assert ocls o = ocls(rec["a"]) # creating object but not putting in the cache - why? keys = self.keys_by_cls_and_reln(type(o), rec["reln"]) # obj.belongs[(id(o),*keys)] = o # not setting the belongs on the obj - why? ret.append((o, keys)) return ret
[docs] def get_q(self, obj: Entity) -> str: """Get the query for an object.""" if not isinstance(obj, self.cls): msg = f"arg1 must be object of class {self.cls.__name__}" raise ArgError(msg) if obj.neoid is None: msg = "object must be mapped (i.e., obj.neoid must be set)" raise ArgError(msg) return ( f"MATCH (n:{self.cls.mapspec()['label']}) " f"WHERE id(n)={obj.neoid} RETURN n,id(n)" )
[docs] def get_by_id_q(self) -> str: """Get the query for an entity by its Neo4j id.""" return ( f"MATCH (n:{self.cls.mapspec()['label']}) " "WHERE id(n)=$id and n._to IS NULL RETURN id(n)" )
[docs] def get_by_node_nanoid_q(self) -> str: """PROTOTYPE: Get the query for an entity given its nanoid.""" return "MATCH (n:node) WHERE n.nanoid=$nanoid and n._to is NULL RETURN id(n)"
[docs] def get_attr_q(self, obj: Entity, att: str) -> str: """Get the query for an attribute of an object.""" if not isinstance(obj, self.cls): msg = f"arg1 must be object of class {self.cls.__name__}" raise ArgError(msg) if obj.neoid is None: msg = "object must be mapped (i.e., obj.neoid must be set)" raise ArgError(msg) label = self.cls.mapspec()["label"] if att in self.cls.mapspec()["property"]: pr = self.cls.mapspec()["property"][att] return f"MATCH (n:{label}) WHERE id(n)={obj.neoid} RETURN n.{pr}" if att in self.cls.mapspec()["relationship"]: spec = self.cls.mapspec()["relationship"][att] end_cls = spec["end_cls"] if isinstance(end_cls, str): end_cls = {end_cls} end_lbls = [eval(x).mapspec()["label"] for x in end_cls] rel = re.sub("^([^:]?)(:[a-zA-Z0-9_]+)(.*)$", r"\1-[\2]-\3", spec["rel"]) if len(end_lbls) == 1: qry = ( f"MATCH (n:{label}){rel}(a:{end_lbls[0]}) " f"WHERE id(n)={obj.neoid} RETURN a" ) if self.cls.attspec[att] == "object": qry += " LIMIT 1" return qry # multiple end classes possible cond = " OR ".join([f"'{lbl}' IN labels(a)" for lbl in end_lbls]) return ( f"MATCH (n:{label}){rel}(a) WHERE id(n)={obj.neoid} AND ({cond}) " "RETURN a" ) msg = f"'{att}' is not a registered attribute for class '{self.cls.__name__}'" raise ArgError(msg)
[docs] def get_owners_q(self, obj: Entity) -> str: """Get the query for the owners of an object.""" if not isinstance(obj, self.cls): msg = f"arg1 must be object of class {self.cls.__name__}" raise ArgError(msg) if obj.neoid is None: msg = "object must be mapped (i.e., obj.neoid must be set)" raise ArgError(msg) label = self.cls.mapspec()["label"] return ( f"MATCH (n:{label})<-[r]-(a) WHERE id(n)={obj.neoid} " "RETURN TYPE(r) as reln, a" )
[docs] def put_q(self, obj: Entity) -> list[str]: """Get the query for putting an object.""" if not isinstance(obj, self.cls): msg = f"arg1 must be object of class {self.cls.__name__}" raise ArgError(msg) props = {} null_props = [] for pr in self.cls.mapspec()["property"]: if getattr(obj, pr) is None: null_props.append(self.cls.mapspec()["property"][pr]) else: props[self.cls.mapspec()["property"][pr]] = getattr(obj, pr) stmts = [] if obj.neoid is not None: set_clause = "SET " + ",".join( [f"n.{pr}={ObjectMap._quote_val(props[pr])}" for pr in props], ) stmts.append( f"MATCH (n:{self.cls.mapspec()['label']}) WHERE id(n)={obj.neoid} " f"{set_clause} RETURN n,id(n)", ) stmts.extend( [ ( f"MATCH (n:{self.cls.mapspec()['label']}) WHERE id(n)=" f"{obj.neoid} REMOVE n.{pr} RETURN n,id(n)" ) for pr in null_props ], ) return stmts spec = ",".join([f"{pr}:{ObjectMap._quote_val(props[pr])}" for pr in props]) return [ f"CREATE (n:{self.cls.mapspec()['label']} {{{spec}}}) RETURN n,id(n)", ]
[docs] def put_attr_q( self, obj: Entity, att: str, values: Entity | list[Entity] | CollValue, ) -> str | list[str]: """Get the query for putting an attribute of an object.""" if not isinstance(obj, self.cls): msg = f"arg1 must be object of class {self.cls.__name__}" raise ArgError(msg) if obj.neoid is None: msg = "object must be mapped (i.e., obj.neoid must be set)" raise ArgError(msg) if not isinstance(values, (Entity, list, CollValue)): msg = "'values' must be a list of values suitable for the attribute" raise ArgError(msg) if isinstance(values, CollValue): values = values.values() elif isinstance(values, Entity): values = [values] if att in self.cls.mapspec()["property"]: return ( f"MATCH (n:{self.cls.mapspec()['label']}) WHERE id(n)={obj.neoid} " f"SET {self.cls.mapspec()['property'][att]}=" f"{ObjectMap._quote_val(values[0])} RETURN id(n)" ) if att in self.cls.mapspec()["relationship"]: if not self._check_values_list(att, values): msg = ( "'values' must be a list of mapped Entity objects of " f"the appropriate subclass for attribute '{att}'", ) raise ArgError(msg) stmts = [] spec = self.cls.mapspec()["relationship"][att] end_cls = spec["end_cls"] if isinstance(end_cls, str): end_cls = {end_cls} end_lbls = [eval(x).mapspec()["label"] for x in end_cls] rel = re.sub("^([^:]?)(:[a-zA-Z0-9_]+)(.*)$", r"\1-[\2]-\3", spec["rel"]) cond = " OR ".join([f"'{lbl}' IN labels(a)" for lbl in end_lbls]) for avalue in values: if len(end_lbls) == 1: stmts.append( f"MATCH (n:{self.cls.mapspec()['label']}),(a:{end_lbls[0]}) " f"WHERE id(n)={obj.neoid} AND id(a)={avalue.neoid} " f"MERGE (n){rel}(a) RETURN id(a)", ) else: stmts.append( f"MATCH (n:{self.cls.mapspec()['label']}),(a) " f"WHERE id(n)={obj.neoid} AND id(a)={avalue.neoid} AND " f"({cond}) MERGE (n){rel}(a) RETURN id(a)", ) return stmts msg = f"'{att}' is not a registered attribute for class '{self.cls.__name__}'" raise ArgError(msg)
[docs] def rm_q(self, obj: Entity, *, detach: bool = False) -> str: """Get the query for removing an object.""" if not isinstance(obj, self.cls): msg = f"arg1 must be object of class {self.cls.__name__}" raise ArgError(msg) if obj.neoid is None: msg = "object must be mapped (i.e., obj.neoid must be set)" raise ArgError(msg) dlt = "DETACH DELETE n" if detach else "DELETE n" qry = f"MATCH (n:{self.cls.mapspec()['label']}) WHERE id(n)={obj.neoid} " return qry + dlt
[docs] def rm_attr_q( self, obj: Entity, att: str, values: list[Entity] | None = None, ) -> str | list[str]: """Get the query for removing an attribute of an object.""" if not isinstance(obj, self.cls): msg = f"arg1 must be object of class {self.cls.__name__}" raise ArgError(msg) if obj.neoid is None: msg = "object must be mapped (i.e., obj.neoid must be set)" raise ArgError(msg) if values and not isinstance(values, list): values = [values] if att in self.cls.mapspec()["property"]: return ( f"MATCH (n:{self.cls.mapspec()['label']}) " f"WHERE id(n)={obj.neoid} REMOVE n.{att} RETURN id(n)" ) if att in self.cls.mapspec()["relationship"]: many = self.cls.attspec[att] == "collection" spec = self.cls.mapspec()["relationship"][att] end_cls = spec["end_cls"] if isinstance(end_cls, str): end_cls = {end_cls} end_lbls = [eval(x).mapspec()["label"] for x in end_cls] cond = " OR ".join([f"'{lbl}' IN labels(a)" for lbl in end_lbls]) rel = re.sub("^([^:]?)(:[a-zA-Z0-9_]+)(.*)$", r"\1-[r\2]-\3", spec["rel"]) if values and values[0] == ":all": if len(end_lbls) == 1: return ( f"MATCH (n:{self.cls.mapspec()['label']}){rel}(a:{end_lbls[0]})" f" WHERE id(n)={obj.neoid} DELETE r RETURN id(n),id(a)" ) return ( f"MATCH (n:{self.cls.mapspec()['label']}){rel}(a) " f"WHERE id(n)={obj.neoid} AND ({cond}) DELETE r RETURN id(n)" ) stmts = [] if not self._check_values_list(att, values): msg = ( "'values' must be a list of mapped Entity objects of the " f"appropriate subclass for attribute '{att}'", ) raise ArgError(msg) for val in values: qry = "" if len(end_lbls) == 1: qry = ( f"MATCH (n:{self.cls.mapspec()['label']}){rel}(a:{end_lbls[0]})" f" WHERE id(n)={obj.neoid} AND id(a)={val.neoid} " f"DELETE r RETURN id(n),id(a)" ) else: qry = ( f"MATCH (n:{self.cls.mapspec()['label']}){rel}(a) " f"WHERE id(n)={obj.neoid} AND id(a)={val.neoid} AND ({cond}) " f"DELETE r RETURN id(n),id(a)" ) stmts.append(qry) return stmts msg = f"'{att}' is not a registered attribute for class '{self.cls.__name__}'" raise ArgError(msg)
[docs] def _check_values_list(self, att: str, values: list[Entity] | CollValue) -> bool: """Check if the values are a list of mapped Entity objects of the appropriate subclass for an attribute.""" v = values if isinstance(values, CollValue): v = values.values() chk = [x.neoid is None for x in v] if True in chk: return False end_cls = self.cls.mapspec()["relationship"][att]["end_cls"] if isinstance(end_cls, str): end_cls = {end_cls} cls_set = tuple([eval(x) for x in end_cls]) print(f"{cls_set=}") print(f"{v=}") chk = [isinstance(x, cls_set) for x in v] return True in chk