Source code for bento_meta.model

"""
bento_meta.model
================

This module contains :class:`Model`, a class for managing data models housed
in the Bento Metamodel Database. Models are built from `bento_meta.Entity`
subclasses (see :mod:`bento_meta.objects`). A Model can be used with or 
without a Neo4j database connection.

"""
import re
import sys
sys.path.append("..")
from uuid import uuid4
from warnings import warn
import neo4j.graph
from bento_meta.mdb import MDB, make_nanoid
from bento_meta.object_map import ObjectMap
from bento_meta.entity import Entity, ArgError
from bento_meta.objects import (
    Node,
    Property,
    Edge,
    Term,
    ValueSet,
    Concept,
    Predicate,
    Origin,
    Tag,
)
# from pdb import set_trace


[docs]class Model(object): def __init__(self, handle=None, mdb=None): """Model constructor. :param str handle: A string name for the model. Corresponds to the model property in MDB database nodes. :param bento_meta.mdb.MDB mdb: An MDB object containing the db connection (see :class:`bento_meta.mdb.MDB`) """ if not handle: raise ArgError("model requires arg 'handle' set") self.handle = handle self._mdb = None self.nodes = {} self.edges = {} # keys are (edge.handle, src.handle, dst.handle) tuples self.props = {} # keys are ({edge|node}.handle, prop.handle) tuples self.terms = {} # keys are (term.handle, term.origin) tuples self.removed_entities = [] if mdb: self.mdb = mdb
[docs] @classmethod def versioning(cls, on=None): """Get or set versioning state. :param boolean on: True, apply versioning. False, do not. Note: this delegates to :meth:`Entity.versioning`. """ if on is None: return Entity.versioning_on Entity.versioning_on = on return Entity.versioning_on
[docs] @classmethod def set_version_count(cls, ct): """Set the integer version counter. :param int ct: Set version counter to this value. Note: this delegates to :meth:`Entity.set_version_count`. """ Entity.set_version_count(ct)
@property def drv(self): """Neo4j database driver from MDB object""" return self._mdb.driver if self._mdb else None @property def mdb(self): return self._mdb @mdb.setter def mdb(self, value): if isinstance(value, MDB): self._mdb = value for cls in (Node, Property, Edge, Term, ValueSet, Concept, Predicate, Origin, Tag): cls.object_map = ObjectMap(cls=cls, drv=value.driver) elif not value: self._mdb = None for cls in (Node, Property, Edge, Term, ValueSet, Concept, Origin, Tag): cls.object_map = None else: raise ArgError( "mdb= arg must be a bento_meta.mdb.MDB object" )
[docs] def add_node(self, node=None): """Add a :class:`Node` to the model. :param Node node: A :class:`Node` instance, a :class:`neo4j.graph.Node`, or a dict The model attribute of ``node`` is set to `Model.handle` """ if not node: raise ArgError("arg must be Node, dict, or graph.Node") if isinstance(node, (dict, neo4j.graph.Node)): node = Node(node) if not node.model: node.model = self.handle for p in node.props.values(): self.add_prop(node, p) self.nodes[node.handle] = node return node
[docs] def add_edge(self, edge=None): """Add an :class:`Edge` to the model. :param Edge edge: A :class:`Edge` instance, a :class:`neo4j.graph.Node`, or a dict The model attribute of ``edge`` is set to `Model.handle` """ if not edge: raise ArgError("arg must be Edge, dict, or graph.Node") if isinstance(edge, (dict, neo4j.graph.Node)): edge = Edge(edge) if not edge.src or not edge.dst: raise ArgError("edge must have both src and dst set") if not edge.model: edge.model = self.handle if not self.contains(edge.src): warn("Edge source node not yet in model; adding it") self.add_node(edge.src) if not self.contains(edge.dst): warn("Edge destination node not yet in model; adding it") self.add_node(edge.dst) for p in edge.props.values(): self.add_prop(edge, p) self.edges[edge.triplet] = edge return edge
[docs] def add_prop(self, ent, prop=None): """Add a :class:`Property` to the model. :param Node|Edge ent: Attach ``prop`` to this entity :param Property prop: A :class:`Property` instance, a :class: `neo4j.graph.Node`, or a dict :param boolean reuse: If True, reuse existing property with same handle The model attribute of ``prop`` is set to `Model.handle`. Within a model, :class:`Property` entities are unique with respect to their handle (but can be reused). This method will look for an existing property within the model with the given handle, and add an item to Model.props pointing to it if found. """ if not isinstance(ent, (Node, Edge)): raise ArgError("arg 1 must be Node or Edge") if not prop: raise ArgError("arg 2 must be Property, dict, or graph.Node") if isinstance(prop, (dict, neo4j.graph.Node)): prop = Property(prop) if not prop.model: prop.model = self.handle key = [ent.handle] if isinstance(ent, Node) else list(ent.triplet) key.append(prop.handle) ent.props[getattr(prop, type(prop).mapspec()["key"])] = prop if not tuple(key) in self.props: self.props[tuple(key)] = prop return prop
[docs] def annotate(self, ent, term): """ Associate a single :class:`Term` with an :class:`Entity`. This creates a Concept entity if needed and links both the Entity and the Term to the concept, in keeping with the MDB spec. It supports the Term key in MDF. :param Entity ent: :class:`Entity` object to annotate :param Term term: :class:`Term` object to describe the Entity """ if not isinstance(ent, Entity): raise ArgError("arg1 must be Entity") if not isinstance(term, Term): raise ArgError("arg2 must be Term") if not ent.concept: ent.concept = Concept({"nanoid":make_nanoid()}) term_key = term.handle if term.handle else term.value; if (term_key, term.origin_name) in ent.concept.terms: raise ValueError("Concept already represented by a Term with handle or value '{}'" "and origin_name '{}'".format(term_key, term.origin_name)) ent.concept.terms[(term_key, term.origin_name)] = term self.terms[(term_key, term.origin_name)] = term
[docs] def add_terms(self, prop, *terms): """Add a list of :class:`Term` and/or strings to a :class:`Property` with a value domain of ``value_set`` :param Property prop: :class:`Property` to modify :param list terms: A list of :class:`Term` instances and/or str :class:`Term` instances are created for strings; `Term.value` and `Term.handle` is set to the string. """ if not isinstance(prop, Property): raise ArgError("arg1 must be Property") if not re.match("value_set|enum", prop.value_domain): raise AttributeError( "Property value domain is not value_set or enum, can't add terms" ) if not prop.value_set: warn("Creating ValueSet object for Property " + prop.handle) prop.value_set = ValueSet({"prop": prop, "_id": str(uuid4())}) prop.value_set.handle = self.handle + prop.value_set._id[0:8] for t in terms: if isinstance(t, str): warn("Creating Term object for string '{term}'".format(term=t)) t = Term({"handle":t, "value": t}) elif not isinstance(t, Term): raise ArgError("encountered arg that was not a str or Term object") tm_key = t.handle if t.handle else t.value prop.value_set.terms[tm_key] = t self.terms[(tm_key, t.origin_name)] = t
[docs] def rm_node(self, node): """Remove a :class:`Node` from the Model instance. :param Node node: Node to be removed Note: A node can't be removed if it is participating in an edge (i.e., if the node is some edge's src or dst attribute) *Clarify what happens in the Model object, in the database when versioning is off, in the database when versioning is on* """ if not isinstance(node, Node): raise ArgError("arg must be a Node object") if not self.contains(node): warn( "node '{node}' not contained in model '{model}'".format( node=node.handle, model=self.handle ) ) return if self.edges_by_src(node) or self.edges_by_dst(node): raise ValueError( "can't remove node '{node}', it is participating in edges".format( node=node.handle ) ) for p in node.props: try: del self.props[(node.handle, p.handle)] except: pass del self.nodes[node.handle] self.removed_entities.append(node) return node
[docs] def rm_edge(self, edge): """Remove an :class:`Edge` instance from the Model instance. :param Edge edge: Edge to be removed *Clarify what happens in the Model object, in the database when versioning is off, in the database when versioning is on* """ if not isinstance(edge, Edge): raise ArgError("arg must be an Edge object") if not self.contains(edge): warn( "edge '{edge}' not contained in model '{model}'".format( edge=edge.triplet, model=self.handle ) ) return for p in edge.props: try: k = list(edge.triplet) k.append(p.handle) del self.props[tuple(k)] except: pass del self.edges[edge.triplet] edge.src = None edge.dst = None self.removed_entities.append(edge) return edge
[docs] def rm_prop(self, prop): """Remove a :class:`Property` instance from the Model instance. :param Property prop: Property to be removed *Clarify what happens in the Model object, in the database when versioning is off, in the database when versioning is on* """ if not isinstance(prop, Property): raise ArgError("arg must be a Property object") if not self.contains(prop): warn( "prop '{prop}' not contained in model '{model}'".format( prop=prop.handle, model=self.handle ) ) return for okey in prop.belongs: owner = prop.belongs[okey] (i, att, key) = okey getattr(owner, att)[key] == None k = [owner.handle] if isinstance(owner, Node) else list(owner.triplet) k.append(key) del self.props[tuple(k)] self.removed_entities.append(prop) pass
[docs] def rm_term(self, term): """Not implemented.""" if not isinstance(term, Term): raise ArgError("arg must be a Term object") pass
[docs] def assign_edge_end(self, edge=None, end=None, node=None): """Move the src or dst of an :class:`Edge` to a different :class:`Node`. :param Edge edge: Edge to manipulate :param str end: Edge end to change (src|dst) :param Node node: Node to be connected Note: Both ``node`` and ``edge`` must be present in the Model instance (via :meth:`add_node` and :meth:`add_edge`) """ if not isinstance(edge, Edge): raise ArgError("edge= must an Edge object") if not isinstance(node, Node): raise ArgError("node= must a Node object") if end not in ["src", "dst"]: raise ArgError("end= must be one of 'src' or 'dst'") if not self.contains(edge) or not self.contains(node): warn("model must contain both edge and node") return del self.edges[edge.triplet] setattr(edge, end, node) self.edges[edge.triplet] = edge return edge
[docs] def contains(self, ent): """Ask whether an entity is present in the Model instance. :param Entity ent: Entity in question Note: Only works on Nodes, Edges, and Properties """ if not isinstance(ent, Entity): warn("argument is not an Entity subclass") return if isinstance(ent, Node): return ent in set(self.nodes.values()) if isinstance(ent, Edge): return ent in set(self.edges.values()) if isinstance(ent, Property): return ent in set(self.props.values()) if isinstance(ent, Term): return ent in set(self.terms.values()) pass
[docs] def edges_in(self, node): """Get all :class:`Edge` that have a given :class:`Node` as their dst attribute :param Node node: The node :return: list of :class:`Edge` """ if not isinstance(node, Node): raise ArgError("arg must be Node") return [self.edges[i] for i in self.edges if i[2] == node.handle] pass
[docs] def edges_out(self, node): """Get all :class:`Edge` that have a given :class:`Node` as their src attribute :param Node node: The node :return: list of :class:`Edge` """ if not isinstance(node, Node): raise ArgError("arg must be Node") return [self.edges[i] for i in self.edges if i[1] == node.handle] pass
[docs] def edges_by(self, key, item): if key not in ["src", "dst", "type"]: raise ArgError("arg 'key' must be one of src|dst|type") if isinstance(item, Node): idx = 1 if key == "src" else 2 return [self.edges[x] for x in self.edges if x[idx] == item.handle] else: return [self.edges[x] for x in self.edges if x[0] == item]
[docs] def edges_by_src(self, node): """Get all :class:`Edge` that have a given :class:`Node` as their src attribute :param Node node: The node :return: list of :class:`Edge` """ return self.edges_by("src", node)
[docs] def edges_by_dst(self, node): """Get all :class:`Edge` that have a given :class:`Node` as their dst attribute :param Node node: The node :return: list of :class:`Edge` """ return self.edges_by("dst", node)
[docs] def edges_by_type(self, edge_handle): """Get all :class:`Edge` that have a given edge type (i.e., handle) :param str edge_handle: The edge type :return: list of :class:`Edge` """ if not isinstance(edge_handle, str): raise ArgError("arg must be str") return self.edges_by("type", edge_handle)
[docs] def dget(self, refresh=False): """Pull model from MDB into this Model instance, based on its handle Note: is a noop if `Model.mdb` is unset. """ if not self.mdb: return if refresh: ObjectMap.clear_cache() with self.drv.session() as session: result = session.run( "match p = (s:node)<-[:has_src]-(r:relationship)-[:has_dst]->(d:node) " "where s.model=$hndl and r.model=$hndl and d.model=$hndl return p", {"hndl": self.handle}, ) for rec in result: (ns, nr, nd) = rec["p"].nodes ns = Node(ns) nr = Edge(nr) nd = Node(nd) ObjectMap.cache[ns.neoid] = ns ObjectMap.cache[nr.neoid] = nr ObjectMap.cache[nd.neoid] = nd nr.src = ns nr.dst = nd self.nodes[ns.handle] = ns self.nodes[nd.handle] = nd self.edges[nr.triplet] = nr with self.drv.session() as session: result = session.run( "match (n:node)-[:has_property]->(p:property) where n.model=$hndl and p.model=$hndl return id(n), p", {"hndl": self.handle}, ) for rec in result: n = ObjectMap.cache.get(rec["id(n)"]) if n is None: warn( "node with id {nid} not yet retrieved".format(nid=rec["id(n)"]) ) continue p = Property(rec["p"]) ObjectMap.cache[p.neoid] = p self.props[(n.handle, p.handle)] = p n.props[p.handle] = p p.dirty = -1 with self.drv.session() as session: result = session.run( "match (r:relationship)-[:has_property]->(p:property) " "where r.model=$hndl and p.model=$hndl return id(r), p", {"hndl": self.handle}, ) for rec in result: e = ObjectMap.cache.get(rec["id(r)"]) if e is None: warn( "relationship with id {rid} not yet retrieved".format( rid=rec["id(r)"] ) ) continue p = Property(rec["p"]) ObjectMap.cache[p.neoid] = p k = list(e.triplet) k.append(p.handle) self.props[tuple(k)] = p e.props[p.handle] = p p.dirty = -1 return self
[docs] def dput(self): """Push this Model's objects to MDB. Note: is a noop if `Model.mdb` is unset. """ if not self.mdb: return seen = {} def do_(obj): if id(obj) in seen: return seen[id(obj)] = 1 if obj.dirty == 1: obj.dput() atts = [x for x in type(obj).attspec if type(obj).attspec[x] == "object"] for att in atts: ent = getattr(obj, att) if ent: do_(ent) atts = [ x for x in type(obj).attspec if type(obj).attspec[x] == "collection" ] for att in atts: ents = getattr(obj, att) if ents: for ent in ents: do_(ents[ent]) for e in self.removed_entities: # detach with self.drv.session() as session: result = session.run( "match (e)-[r]-() where id(e)=$eid delete r return id(e)", {"eid": e.neoid}, ) for rec in result: pass for e in self.nodes.values(): do_(e) for e in self.edges.values(): do_(e) for e in self.props.values(): do_(e) return