"""
bento_meta.object_map
=====================
This module contains :class:`ObjectMap`, a class which provides the
machinery for mapping bento_meta objects to a Bento Metamodel Database
in Neo4j. Mostly not for human consumption. The ObjectMap:
* interprets the attribute specification (attspec) and map
specification (mapspec) associated with :class:`Entity` subclasses
* provides the :meth:`get` and :meth:`put` methods to subclasses, that
enable them to get and put themselves to the database
* generates appropriate `Cypher <https://neo4j.com/docs/cypher-manual/current/>`_ queries to do gets and puts
One ObjectMap instance should be generated for each Entity subclass (see, e.g.,
:class:`bento_meta.model.Model`)
"""
import re
import sys
sys.path.append("..")
from neo4j import BoltDriver, Neo4jDriver
from bento_meta.entity import *
from bento_meta.objects import *
# from pdb import set_trace
[docs]class ObjectMap(object):
"""This module contains :class:`ObjectMap`, a class which provides the
machinery for mapping bento_meta objects to a Bento Metamodel Database
in Neo4j. Mostly not for human consumption."""
cache = {}
def __init__(self, *, cls=None, drv=None):
if not cls:
raise ArgError("arg cls= is required")
self.cls = cls
if drv:
if isinstance(drv, (Neo4jDriver, BoltDriver)):
self.drv = drv
else:
raise ArgError(
"drv= arg must be Neo4jDriver or BoltDriver (returned from GraphDatabase.driver())"
)
self.maps = {}
[docs] @classmethod
def clear_cache(cls):
cls.cache = {}
[docs] @classmethod
def cls_by_label(cls, lbl):
if not hasattr(cls, "_clsxlbl"):
cls._clsxlbl = {}
for o in (Node, Edge, Property, ValueSet, Term, Concept, Origin, Tag):
cls._clsxlbl[o.mapspec()["label"]] = o
return cls._clsxlbl.get(lbl)
[docs] @classmethod
def keys_by_cls_and_reln(cls, qcls, reln):
if not hasattr(cls, "_keysxcls"):
cls._keysxcls = {}
for o in (Node, Edge, Property, ValueSet, Term, Concept, Predicate, Origin, Tag):
for oatt in [x for x in o.attspec if o.attspec[x] == "object"]:
r = o.mapspec()["relationship"][oatt]["rel"]
r = re.match("[:<>]*([a-zA-Z_]+)[:<>]*", r).group(1)
cls._keysxcls[(o.__name__, r)] = (oatt, None)
for catt in [x for x in o.attspec if o.attspec[x] == "collection"]:
r = o.mapspec()["relationship"][catt]["rel"]
r = re.match("[:<>]*([a-zA-Z_]+)[:<>]*", r).group(1)
cls._keysxcls[(o.__name__, r)] = (catt, o.mapspec()["key"])
return cls._keysxcls.get((qcls.__name__, reln))
[docs] @classmethod
def _quote_val(cls, value, single=None): # double quote unless single is set
if value is None:
return
if isinstance(value, (int, float)):
return value # no quote
else:
if single:
return "'{val}'".format(val=value) # quote
else:
return '"{val}"'.format(val=value) # quote
[docs] def get_by_id(self, obj, id, refresh=False):
"""Get an entity given an id attribute value (not the Neo4j id)"""
neoid = None
if self.drv is None:
raise ArgError("get_by_id() requires Neo4j driver instance")
with self.drv.session() as session:
result = session.run(self.get_by_id_q(), {"id": id})
rec = (
result.single()
) # should be unique - this call will warn if there are more than one
if rec is not None:
neoid = rec["id(n)"]
if neoid is not None:
obj.neoid = neoid
return self.get(obj, refresh=True)
else:
return
[docs] def get_by_node_nanoid(self, obj, nanoid, refresh=False):
"""PROTOTYPE
Get an entity given an id attribute value (not the Neo4j id)
"""
neo4jid = None
if not self.drv:
raise ArgError("get_by_id() requires Neo4j driver instance")
with self.drv.session() as session:
result = session.run(self.get_by_node_nanoid_q(), {"nanoid": nanoid})
rec = (
result.single()
) # should be unique - this call will warn if there are more than one
if rec is not None:
neo4jid = rec["id(n)"]
if neo4jid is None:
obj.neoid = neo4jid
return self.get(obj, refresh=True)
else:
return
[docs] def get(self, obj, refresh=False):
"""Get the data for an object instance from the db and load the instance with it"""
if not self.drv:
raise ArgError("get() requires Neo4j driver instance")
if refresh:
pass
elif (obj.neoid in ObjectMap.cache) and (ObjectMap.cache[obj.neoid].dirty >= 0):
return obj
with self.drv.session() as session:
result = session.run(self.get_q(obj))
rec = result.single()
if not rec:
raise RuntimeError(
"object with id {neoid} not found in db".format(neoid=obj.neoid)
)
if obj.neoid not in ObjectMap.cache:
ObjectMap.cache[obj.neoid] = obj
with self.drv.session() as session:
for att in self.cls.mapspec()["relationship"]:
result = session.run(self.get_attr_q(obj, att))
values = {}
first_val = None
for rec in result:
o = ObjectMap.cache.get(rec["a"].id)
if o:
if not first_val:
first_val = o
values[getattr(o, type(o).mapspec()["key"])] = o
else:
c = None
for l in rec["a"].labels:
c = ObjectMap.cls_by_label(l)
if c:
break
if not c:
raise RuntimeError(
"node labels {lbls} have no associated class in the object model".format(
lbls=rec["a"].labels
)
)
o = c(rec["a"])
o.dirty = -1
ObjectMap.cache[o.neoid] = o
if not first_val:
first_val = o
values[getattr(o, type(o).mapspec()["key"])] = o
if self.cls.attspec[att] == "object" and len(values) > 1:
warn(
"expected one node for attribute {att} on class {cls}, but got {n}; using first one".format(
att=att, cls=self.cls.__name__, n=len(values)
)
)
if self.cls.attspec[att] == "object":
setattr(obj, att, first_val)
elif self.cls.attspec[att] == "collection":
setattr(obj, att, values)
else:
raise RuntimeError(
"attribute '{att}' has unknown attribute type '{atype}'".format(
att=att, atype=self.cls.attspec[att]
)
)
obj.clear_removed_entities()
obj.dirty = 0
return obj
[docs] def put(self, obj):
"""Put the object instance's attributes to the mapped data node in the database"""
if not self.drv:
raise ArgError("put() requires Neo4j driver instance")
pass
with self.drv.session() as session:
result = None
with session.begin_transaction() as tx:
for qry in self.put_q(obj):
result = tx.run(qry)
obj.neoid = result.single().value("id(n)")
if obj.neoid is None:
raise RuntimeError(
"no neo4j id retrived on put for obj '{name}'".format(
name=getattr(obj, self.cls.mapspec()["key"])
)
)
for att in self.cls.mapspec()["relationship"]:
values = getattr(obj, att)
if not values:
continue
if isinstance(values, CollValue):
items = values.values()
else:
items = [values]
for val in items:
if val.neoid is not None:
continue
# put val as a node
for qry in ObjectMap(cls=type(val), drv=self.drv).put_q(val):
result = tx.run(qry)
val.neoid = result.single().value("id(n)")
if val.neoid is None:
raise RuntimeError(
"no neo4j id retrived on put for obj '{name}'".format(
name=val[type(val).mapspec()["key"]]
)
)
val.dirty = 1
ObjectMap.cache[val.neoid] = val
for qry in self.put_attr_q(obj, att, values):
tx.run(qry)
# drop removed entities here
while obj.removed_entities:
ent = obj.removed_entities.pop()
self.drop(obj, *ent, tx)
ObjectMap.cache[obj.neoid] = obj
obj.dirty = 0
return obj
[docs] def rm(self, obj, force=False):
"""'Delete' the object's mapped node from the database"""
if not self.drv:
raise ArgError("rm() requires Neo4j driver instance")
if obj.neoid is None:
raise ArgError("object must be mapped (i.e., obj.neoid must be set)")
with self.drv.session() as session:
result = session.run(self.rm_q(obj, force))
s = result.single()
if s is None:
warn("rm() - corresponding db node not found")
else:
return s.value()
[docs] def add(self, obj, att, tgt):
"""Create a link between an object instance and a target object in the database.
This represents adding an object-valued attribute to the object.
"""
if not self.drv:
raise ArgError("add() requires Neo4j driver instance")
with self.drv.session() as session:
for qry in self.put_attr_q(obj, att, tgt):
result = session.run(qry)
tgt_id = result.single().value()
if tgt_id is None:
warn("add() - corresponding db node not found")
return tgt_id
[docs] def drop(self, obj, att, tgt, tx=None):
"""Remove an existing link between an object instance and a target object in the database.
This represents dropping an object-valued attribute from the object.
"""
if not self.drv:
raise ArgError("rm() requires Neo4j driver instance")
# if the tgt is not in the database, then dropping it is a no-op:
if not tgt.neoid:
return
if tx:
result = None
for qry in self.rm_attr_q(obj, att, tgt):
result = tx.run(qry)
s = result.single()
if s is None:
warn("drop() - corresponding target db node not found")
else:
return s.value()
else:
with self.drv.session() as session:
result = None
for qry in self.rm_attr_q(obj, att, tgt):
result = session.run(qry)
s = result.single()
if s is None:
warn("drop() - corresponding target db node not found")
else:
return s.value()
[docs] def get_owners(self, obj):
"""Get the nodes which are linked to the object instance (the owners of the object)"""
if not self.drv:
raise ArgError("get_owners() requires Neo4j driver instance")
ret = []
with self.drv.session() as session:
result = session.run(self.get_owners_q(obj))
for rec in result:
if rec["reln"][0] == "_": # skip _prev, _next, and convenience links
break
ocls = None
for lbl in rec["a"].labels:
ocls = self.cls_by_label(lbl)
if ocls:
break
assert ocls
o = ocls(rec["a"])
# creating object but not putting in the cache - why?
keys = self.keys_by_cls_and_reln(type(o), rec["reln"])
# obj.belongs[(id(o),*keys)] = o
# not setting the belongs on the obj - why?
ret.append((o, keys))
return ret
[docs] def get_q(self, obj):
if not isinstance(obj, self.cls):
raise ArgError(
"arg1 must be object of class {cls}".format(cls=self.cls.__name__)
)
if obj.neoid is None:
raise ArgError("object must be mapped (i.e., obj.neoid must be set)")
return "MATCH (n:{lbl}) WHERE id(n)={neoid} RETURN n,id(n)".format(
lbl=self.cls.mapspec()["label"], neoid=obj.neoid
)
[docs] def get_by_id_q(self):
return "MATCH (n:{lbl}) WHERE id(n)=$id and n._to IS NULL RETURN id(n)".format(
lbl=self.cls.mapspec()["label"]
)
[docs] def get_by_node_nanoid_q(self):
"""PROTOTYPE"""
return "MATCH (n:node) WHERE n.nanoid=$nanoid and n._to is NULL RETURN id(n)"
[docs] def get_attr_q(self, obj, att):
if not isinstance(obj, self.cls):
raise ArgError(
"arg1 must be object of class {cls}".format(cls=self.cls.__name__)
)
if obj.neoid is None:
return ""
label = self.cls.mapspec()["label"]
if att in self.cls.mapspec()["property"]:
pr = self.cls.mapspec()["property"][att]
return "MATCH (n:{lbl}) WHERE id(n)={neoid} RETURN n.{pr}".format(
lbl=label, neoid=obj.neoid, pr=pr
)
elif att in self.cls.mapspec()["relationship"]:
spec = self.cls.mapspec()["relationship"][att]
end_cls = spec["end_cls"]
if isinstance(end_cls, str):
end_cls = {end_cls}
end_lbls = [eval(x).mapspec()["label"] for x in end_cls]
rel = re.sub("^([^:]?)(:[a-zA-Z0-9_]+)(.*)$", r"\1-[\2]-\3", spec["rel"])
if len(end_lbls) == 1:
qry = "MATCH (n:{llbl}){rel}(a:{rlbl}) WHERE id(n)={neoid} RETURN a".format(
neoid=obj.neoid, llbl=label, rel=rel, rlbl=end_lbls[0]
)
if self.cls.attspec[att] == "object":
qry += " LIMIT 1"
return qry
else: # multiple end classes possible
cond = []
for l in end_lbls:
cond.append("'{lbl}' IN labels(a)".format(lbl=l))
cond = " OR ".join(cond)
return "MATCH (n:{lbl}){rel}(a) WHERE id(n)={neoid} AND ({cond}) RETURN a".format(
lbl=label, rel=rel, neoid=obj.neoid, cond=cond
)
else:
raise ArgError(
"'{att}' is not a registered attribute for class '{cls}'".format(
att=att, cls=self.cls.__name__
)
)
[docs] def get_owners_q(self, obj):
if not isinstance(obj, self.cls):
raise ArgError(
"arg1 must be object of class {cls}".format(cls=self.cls.__name__)
)
if obj.neoid is None:
raise ArgError("object must be mapped (i.e., obj.neoid must be set)")
label = self.cls.mapspec()["label"]
return "MATCH (n:{lbl})<-[r]-(a) WHERE id(n)={neoid} RETURN TYPE(r) as reln, a".format(
neoid=obj.neoid, lbl=label
)
[docs] def put_q(self, obj):
if not isinstance(obj, self.cls):
raise ArgError(
"arg1 must be object of class {cls}".format(cls=self.cls.__name__)
)
props = {}
null_props = []
for pr in self.cls.mapspec()["property"]:
if getattr(obj, pr) is None:
null_props.append(self.cls.mapspec()["property"][pr])
else:
props[self.cls.mapspec()["property"][pr]] = getattr(obj, pr)
stmts = []
if obj.neoid is not None:
set_clause = []
for pr in props:
set_clause.append(
"n.{pr}={val}".format(pr=pr, val=ObjectMap._quote_val(props[pr]))
)
set_clause = "SET " + ",".join(set_clause)
stmts.append(
"MATCH (n:{lbl}) WHERE id(n)={neoid} {set_clause} RETURN n,id(n)".format(
lbl=self.cls.mapspec()["label"],
neoid=obj.neoid,
set_clause=set_clause,
)
)
for pr in null_props:
stmts.append(
"MATCH (n:{lbl}) WHERE id(n)={neoid} REMOVE n.{pr} RETURN n,id(n)".format(
lbl=self.cls.mapspec()["label"], neoid=obj.neoid, pr=pr
)
)
return stmts
else:
spec = []
for pr in props:
spec.append(
"{pr}:{val}".format(pr=pr, val=ObjectMap._quote_val(props[pr]))
)
spec = ",".join(spec)
return [
"CREATE (n:%s {%s}) RETURN n,id(n)"
% (self.cls.mapspec()["label"], spec)
]
[docs] def put_attr_q(self, obj, att, values):
if not isinstance(obj, self.cls):
raise ArgError(
"arg1 must be object of class {cls}".format(cls=self.cls.__name__)
)
if obj.neoid is None:
raise ArgError("object must be mapped (i.e., obj.neoid must be set)")
if not isinstance(values, (Entity, list, CollValue)):
raise ArgError(
"'values' must be a list of values suitable for the attribute"
)
if isinstance(values, CollValue):
values = values.values()
elif isinstance(values, Entity):
values = [values]
if att in self.cls.mapspec()["property"]:
return "MATCH (n:{lbl}) WHERE id(n)={neoid} SET {pr}={val} RETURN id(n)".format(
lbl=self.cls.mapspec()["label"],
neoid=obj.neoid,
pr=self.cls.mapspec()["property"][att],
val=ObjectMap._quote_val(values[0]),
)
elif att in self.cls.mapspec()["relationship"]:
if not self._check_values_list(att, values):
raise ArgError(
"'values' must be a list of mapped Entity objects of "
"the appropriate subclass for attribute '{att}'".format(
att=att
)
)
stmts = []
spec = self.cls.mapspec()["relationship"][att]
end_cls = spec["end_cls"]
if isinstance(end_cls, str):
end_cls = {end_cls}
end_lbls = [eval(x).mapspec()["label"] for x in end_cls]
rel = re.sub("^([^:]?)(:[a-zA-Z0-9_]+)(.*)$", r"\1-[\2]-\3", spec["rel"])
cond = []
for l in end_lbls:
cond.append("'{lbl}' IN labels(a)".format(lbl=l))
cond = " OR ".join(cond)
for avalue in values:
if len(end_lbls) == 1:
stmts.append(
"MATCH (n:{lbl}),(a:{albl}) WHERE id(n)={neoid} AND id(a)={aneoid} "
"MERGE (n){rel}(a) RETURN id(a)".format(
lbl=self.cls.mapspec()["label"],
albl=end_lbls[0],
neoid=obj.neoid,
aneoid=avalue.neoid,
rel=rel,
)
)
else:
stmts.append(
"MATCH (n:{lbl}),(a) WHERE id(n)={neoid} AND id(a)={aneoid} AND ({cond}) "
"MERGE (n){rel}(a) RETURN id(a)".format(
lbl=self.cls.mapspec()["label"],
cond=cond,
neoid=obj.neoid,
aneoid=avalue.neoid,
rel=rel,
)
)
return stmts
else:
raise ArgError(
"'{att}' is not a registered attribute for class '{cls}'".format(
att=att, cls=self.cls.__name__
)
)
[docs] def rm_q(self, obj, detach=False):
if not isinstance(obj, self.cls):
raise ArgError(
"arg1 must be object of class {cls}".format(cls=self.cls.__name__)
)
if obj.neoid is None:
raise ArgError("object must be mapped (i.e., obj.neoid must be set)")
dlt = "DETACH DELETE n" if detach else "DELETE n"
qry = "MATCH (n:{lbl}) WHERE id(n)={neoid} ".format(
lbl=self.cls.mapspec()["label"], neoid=obj.neoid
)
return qry + dlt
pass
[docs] def rm_attr_q(self, obj, att, values=None):
if not isinstance(obj, self.cls):
raise ArgError(
"arg1 must be object of class {cls}".format(cls=self.cls.__name__)
)
if obj.neoid is None:
raise ArgError("object must be mapped (i.e., obj.neoid must be set)")
if values and not isinstance(values, list):
values = [values]
if att in self.cls.mapspec()["property"]:
return "MATCH (n:{lbl}) WHERE id(n)={neoid} REMOVE n.{att} RETURN id(n)".format(
lbl=self.cls.mapspec()["label"], neoid=obj.neoid, att=att
)
elif att in self.cls.mapspec()["relationship"]:
many = self.cls.attspec[att] == "collection"
spec = self.cls.mapspec()["relationship"][att]
end_cls = spec["end_cls"]
if isinstance(end_cls, str):
end_cls = {end_cls}
end_lbls = [eval(x).mapspec()["label"] for x in end_cls]
cond = []
for l in end_lbls:
cond.append("'{lbl}' IN labels(a)".format(lbl=l))
cond = " OR ".join(cond)
rel = re.sub("^([^:]?)(:[a-zA-Z0-9_]+)(.*)$", r"\1-[r\2]-\3", spec["rel"])
if values[0] == ":all":
if len(end_lbls) == 1:
return "MATCH (n:{lbl}){rel}(a:{albl}) WHERE id(n)={neoid} DELETE r RETURN id(n),id(a)".format(
lbl=self.cls.mapspec()["label"],
albl=end_lbls[0],
rel=rel,
neoid=obj.neoid,
)
else:
return "MATCH (n:{lbl}){rel}(a) WHERE id(n)={neoid} AND ({cond}) DELETE r RETURN id(n)".format(
lbl=self.cls.mapspec()["label"],
cond=cond,
neoid=obj.neoid,
rel=rel,
)
else:
stmts = []
if not self._check_values_list(att, values):
raise ArgError(
"'values' must be a list of mapped Entity objects of the "
"appropriate subclass for attribute '{att}'".format(att=att)
)
for val in values:
qry = ""
if len(end_lbls) == 1:
qry = "MATCH (n:{lbl}){rel}(a:{albl}) WHERE id(n)={neoid} AND id(a)={aneoid} " \
"DELETE r RETURN id(n),id(a)".format(
lbl=self.cls.mapspec()["label"],
albl=end_lbls[0],
neoid=obj.neoid,
aneoid=val.neoid,
rel=rel)
else:
qry = "MATCH (n:{lbl}){rel}(a) WHERE id(n)={neoid} AND id(a)={aneoid} AND ({cond}) " \
"DELETE r RETURN id(n),id(a)".format(
lbl=self.cls.mapspec()["label"],
neoid=obj.neoid,
aneoid=val.neoid,
cond=cond,
rel=rel)
stmts.append(qry)
return stmts
else:
raise ArgError(
"'{att}' is not a registered attribute for class '{cls}'".format(
att=att, cls=self.cls.__name__
)
)
[docs] def _check_values_list(self, att, values):
v = values
if isinstance(values, CollValue):
v = values.values()
chk = [x.neoid is None for x in v]
if True in chk:
return False
end_cls = self.cls.mapspec()["relationship"][att]["end_cls"]
if isinstance(end_cls, str):
end_cls = {end_cls}
cls_set = tuple([eval(x) for x in end_cls])
chk = [isinstance(x, cls_set) for x in v]
if True in chk:
return True
return False