Source code for bento_meta.mdb.loaders

"""
mdb.loaders: load models into an MDB instance consistently
"""

from tqdm import tqdm
from bento_meta.mdb import WriteableMDB
from bento_meta.util.cypher.entities import (
    N, R, P, T, G, _plain_var
    )
from bento_meta.util.cypher.clauses import (
    Match, Create, Merge, Set, OnMatchSet, OnCreateSet,
    Remove, Statement,
    )


[docs]def load_mdf(mdf, mdb, _commit=None): """Load an MDF object into an MDB instance""" load_model(mdf.model, mdb, _commit)
[docs]def load_model(model, mdb, _commit=None): """Load a model object into an MDB instance.""" if not isinstance(mdb, WriteableMDB): raise RuntimeError("mdb object must be a WriteableMDB") cstmts = load_model_statements(model, _commit) for stmt in tqdm(cstmts): mdb.put_with_statement(str(stmt), stmt.params)
[docs]def load_model_statements(model, _commit=None): """ Create Cypher statements from a model to load it de novo into an MDB instance. :param :class:`mdb.Model` model: Model instance for loading :param str _commit: 'Commit string' for marking entities in DB. If set, this will override _commit attributes already existing on Model entities. """ cStatements = [] cNodes = {} for nd in model.nodes: node = model.nodes[nd] cNode = _cEntity(node, model, _commit) cStatements.append( Statement( Merge(cNode), use_params=True ) ) cNodes[node.handle] = cNode cProps = [] if node.tags: cStatements.extend( _tag_statements(node, cNode, _commit) ) if node.props: cStatements.extend( _prop_statements(node, cNode, model, _commit) ) if node.concept: cStatements.extend( _annotate_statements(node, cNode, _commit) ) # node nodes and node-property nodes now exist # nodes are linked to properties for rl in model.edges: edge = model.edges[rl] cEdge = _cEntity(edge, model, _commit) cSrc = _cEntity(edge.src, model, _commit) cDst = _cEntity(edge.dst, model, _commit) if edge.multiplicity: cEdge._add_props({"multiplicity": edge.multiplicity}) if edge.is_required: cEdge._add_props({"is_required": edge.is_required}) # ensure uniqueness for merge cEdge._add_props({"__u":str(rl)}) cStatements.extend([ Statement( Create(cEdge), use_params=True ), Statement( Match(cEdge, cSrc, cDst), Merge( R(Type="has_src").relate(_plain_var(cEdge), _plain_var(cSrc)) ), Merge( R(Type="has_dst").relate(_plain_var(cEdge), _plain_var(cDst)) ), use_params=True )]) if edge.tags: cStatements.extend( _tag_statements(edge, cEdge, _commit) ) if edge.props: cStatements.extend( _prop_statements(edge, cEdge, model, _commit) ) if edge.concept: cStatements.extend( _annotate_statements(edge, cEdge, _commit) ) cStatements.append( Statement( Match(cEdge), Remove(cEdge, prop="__u"), use_params=True ) ) # edge node and edge-property nodes now exist # now go through all properties that the model object knows about # - these should already have been created or merged, but # - if the property list on the model is missing any properties # - that nodes or edges have on themselves, then this indicates # - a bug/inconsistency - and the property won't receive its # - value_set/term list in the DB in the following code. for pr in [x for x in model.props.values() if x.value_domain == 'value_set']: cValueSet = _cEntity(pr.value_set, model, _commit) cProp = _cEntity(pr, model, _commit) cStatements.extend([ Statement( Merge(cValueSet), use_params=True ), Statement( Match(cProp, cValueSet), Merge(R(Type="has_value_set"). relate(_plain_var(cProp), _plain_var(cValueSet))), use_params=True ) ]) if pr.concept: cStatements.extend( _annotate_statements(pr, cProp, _commit) ) for tm in pr.terms.values(): cTerm = _cEntity(tm, model, _commit) cStatements.extend([ Statement( Merge(cTerm), use_params=True ), Statement( Match(cValueSet, cTerm), Merge(R(Type="has_term").relate(_plain_var(cValueSet), _plain_var(cTerm))), use_params=True ) ]) return cStatements
[docs]def _cEntity(ent, model, _commit): label = type(ent).__name__.lower() cEnt = None # translate labels if label == 'edge': label = 'relationship' if label == 'valueset': label = 'value_set' # special handling if label == 'term': cEnt = N(label=label, props={"value": ent.value}) if ent.origin_name: cEnt._add_props({"origin_name": ent.origin_name}) if ent.origin_id: cEnt._add_props({"origin_id": ent.origin_id}) if ent.origin_version: cEnt._add_props({"origin_version": ent.origin_version}) if ent.origin_definition: cEnt._add_props({"origin_defintion": ent.origin_definition}) if ent.handle: cEnt._add_props({"handle": ent.handle}) elif label == 'value_set': cEnt = N(label='value_set', props={"handle": ent.handle}) if ent.url: cEnt._add_props({"url": ent.url}) elif label == 'concept': cEnt = N(label='concept') elif label == 'tag': cEnt = N(label="tag", props={"key": ent.key, "value": ent.value}) else: cEnt = N(label=label, props={"handle": ent.handle, "model": model.handle}) # all ents if _commit: cEnt._add_props({"_commit": _commit}) else: if ent._commit: cEnt._add_props({"_commit": ent._commit}) if ent.nanoid: cEnt._add_props({"nanoid": ent.nanoid}) if ent.desc: cEnt._add_props({"desc": ent.desc}) return cEnt
[docs]def _tag_statements(ent, cEnt, _commit): stmts = [] cTags = [] if ent.tags: for t in ent.tags.values(): cTag = _cEntity(t, None, _commit) cTags.append(cTag) for ct in cTags: stmts.append( Statement( Match(cEnt), Merge(R(Type="has_tag").relate(_plain_var(cEnt), ct)), use_params=True ) ) return stmts
[docs]def _prop_statements(ent, cEnt, model, _commit): stmts = [] for p in ent.props.values(): cProp = _cEntity(p, model, _commit) cProp._add_props({"value_domain": p.value_domain}) stmts.extend([ Statement( Merge(cProp), use_params=True ), Statement( Match(cEnt, cProp), Merge(R(Type="has_property").relate( _plain_var(cEnt), _plain_var(cProp))), use_params=True ), ]) if p.tags: stmts.extend( _tag_statements(p, cProp, _commit) ) return stmts
[docs]def _annotate_statements(ent, cEnt, _commit): stmts = [] if not ent.concept: return [] cConcept = _cEntity(ent.concept, None, _commit) stmts = [] for tm in ent.concept.terms.values(): cTerm = _cEntity(tm, None, _commit) stmts.extend([ Statement( Merge(cTerm), use_params=True ), Statement( Match(cEnt), Merge(R(Type="has_concept").relate( _plain_var(cEnt), cConcept)), use_params=True ), Statement( Match(R(Type="has_concept").relate( cEnt, cConcept), cTerm), Merge(R(Type="represents").relate( _plain_var(cTerm), _plain_var(cConcept))), use_params=True ), ]) return stmts