This module contains :class:`MDB`, with machinery for efficiently
querying a Neo4j instance of a Metamodel Database.
import os
import re
from warnings import warn
from functools import wraps
from neo4j import GraphDatabase
from nanoid import generate as nanoid_generate
from pdb import set_trace
# Decorator functions to produce executed transactions based on an
# underlying query/param function:
[docs]def read_txn(func):
"""Decorates a query function to run a read transaction based on
its query.
Query function should return a tuple (qry_string, param_dict).
Returns list of driver Records."""
def rd(self, *args, **kwargs):
def txn_q(tx):
(qry,parms)=func(self, *args, **kwargs)
result = tx.run(qry,parameters=parms)
return [rec for rec in result]
with self.driver.session() as session:
result = session.read_transaction(txn_q)
return result
return rd
[docs]def read_txn_value(func):
"""Decorates a query function to run a read transaction based on
its query.
Query function should return a tuple (qry_string, param_dict, values_key).
Returns list of values for key specified by query function."""
def rd(self, *args, **kwargs):
def txn_q(tx):
(qry,parms,values_key)=func(self, *args, **kwargs)
result = tx.run(qry,parameters=parms)
return result.value(values_key)
with self.driver.session() as session:
result = session.read_transaction(txn_q)
return result
return rd
[docs]def read_txn_data(func):
"""Decorates a query function to run a read transaction based on
its query.
Query function should return a tuple (qry_string, param_dict).
Returns records as a list of simple dicts."""
def rd(self, *args, **kwargs):
(qry,parms)=func(self, *args, **kwargs)
def txn_q(tx):
result = tx.run(qry,parameters=parms)
return result.data()
with self.driver.session() as session:
result = session.read_transaction(txn_q)
if len(result):
return result
return None
return rd
[docs]class MDB:
def __init__(self, uri=os.environ.get("NEO4J_MDB_URI"),
self.uri = uri
self.user = user
self.password = password
self.driver = None
self.driver = GraphDatabase.driver(self.uri,
auth=(self.user, self.password))
except Exception as e:
warn("MDB not connected: {}".format(e))
self._txfns = {}
""" Create an :class:`MDB` object, with a connection to a Neo4j instance of a metamodel database.
:param bolt_url uri: The Bolt protocol endpoint to the Neo4j instance (default, use the
``NEO4J_MDB_URI`` env variable)
:param str user: Username for Neo4j access (default, use the ``NEO4J_MDB_USER`` env variable)
:param str password: Password for user (default, use the ``NEO4J_MDB_PASS`` env variable)"""
[docs] def close(self):
[docs] def register_txfn(self, name, fn):
"""Register a transaction function
(see https://neo4j.com/docs/api/python-driver/current/api.html#managed-transactions-transaction-functions)
with the class for later use."""
self._txfns[name] = fn
# def run_txfn(self, name, *args, **kwargs):
[docs] @read_txn_value
def get_model_handles(self):
"""Return a simple list of model handles available."""
qry = (
"match (p:node) "
"where not exists(p._to) "
"return distinct p.model"
return (qry, None,"p.model")
[docs] @read_txn_data
def get_model_nodes(self, model=None):
"""Return a list of dicts representing Model nodes."""
qry = (
"match (m:model) {} "
"with m "
"where not exists(m._to) "
"return m"
).format("where m.handle = $model" if model else "")
return(qry, {"model":model} if model else None)
[docs] @read_txn_value
def get_nodes_by_model(self, model=None):
"""Get all nodes for a given model. If :param:model is None,
get all nodes in database.
Returns [ <node> ]."""
qry = (
"match (n:node) {} "
"with n "
"where not exists(n._to) "
"return n"
).format("where n.model = $model" if model else "")
return (qry, {"model": model} if model else None, "n")
[docs] @read_txn_data
def get_model_nodes_edges(self, model):
"""Get all node-relationship-node paths for a given model.
Returns [ path ]"""
qry = (
"match p = (s:node {model: $model})<-[:has_src]-"
" (r:relationship {model: $model})-[:has_dst]->"
" (d:node {model: $model}) "
"where not exists(s._to) and not exists(r._to) and "
"not exists(d._to) "
"return p as path"
return (qry, {"model":model})
[docs] @read_txn_data
def get_node_edges_by_node_id(self, nanoid):
"""Get incoming and outgoing relationship information for a node,
given its nanoid.
Returns [ {id, handle, model, near_type, far_type, rln, far_node} ]."""
qry = (
"match (n:node {nanoid:$nanoid}) "
"where not exists(n._to) "
"with n "
"optional match (n)<-[e1]-(r:relationship)-[e2]->(m:node) "
"where not exists(r._to) and not exists(m._to) "
"return n.nanoid as id, n.handle as handle, n.model as model, "
" type(e1) as near_type, type(e2) as far_type, r as rln, m as far_node"
return (qry, {"nanoid": nanoid})
[docs] @read_txn_data
def get_node_and_props_by_node_id(self, nanoid):
"""Get a node and its properties, given the node nanoid.
Returns [ {id, handle, model, node, props[]} ]."""
qry = (
"match (n:node {nanoid:$nanoid}) "
"where not exists(n._to) "
"with n "
"optional match (n)-[:has_property]->(p) "
"where not exists(p._to) "
"return n.nanoid as id, n.handle as handle, n.model as model, n as node, "
" collect(p) as props"
return (qry, {"nanoid": nanoid})
[docs] @read_txn_data
def get_nodes_and_props_by_model(self, model=None):
"""Get all nodes with associated properties given a model handle. If
model is None, get all nodes with their properties.
Returns [ {id, handle, model, props[]} ]"""
qry = (
"match (n:node)-[:has_property]->(p:property) "
"where not exists(n._to) and not exists(p._to) {} "
"return n.nanoid as id, n.handle as handle, n.model as model, "
" collect(p) as props"
).format("and n.model = $model" if model else "")
return (qry, {"model":model})
[docs] @read_txn_data
def get_prop_node_and_domain_by_prop_id(self, nanoid):
"""Get a property, its node, and its value domain or value set
of terms, given the property nanoid.
Returns [ { id, handle, model, value_domain, prop, node, value_set, terms[] } ]."""
qry = (
"match (p:property {nanoid:$nanoid})<-[:has_property]-(n:node) "
"where not exists(p._to) "
"with p,n "
"optional match (p)-[:has_value_set]->(vs:value_set)-[:has_term]->(t:term) "
"return p.nanoid as id, p.handle as handle, p.model as model, "
"p.value_domain as value_domain, p as prop, n as node, "
" vs as value_set, collect(t) as terms"
return (qry, {"nanoid": nanoid})
[docs] @read_txn_data
def get_valueset_by_id(self, nanoid):
"""Get a valueset with the properties that use it and the terms
that constitute it.
Returns [ {id, handle, url, terms[], props[]} ]"""
qry = (
"match (vs:value_set {nanoid:$nanoid}) "
"with vs "
"match (t)<-[:has_term]-(vs)<-[:has_value_set]-(p:property) "
"where not exists(t._to) and not exists(p._to) "
"return vs.nanoid as id, vs.handle as handle, vs.url as url, "
" collect(t) as terms, collect(p) as props"
return (qry, {"nanoid": nanoid})
[docs] @read_txn_data
def get_valuesets_by_model(self, model=None):
"""Get all valuesets that are used by properties in the given
model (or all valuesets if model is None). Also return list of properties using
each valueset.
Returns [ {value_set, props[]} ]."""
qry = (
"match (vs:value_set)<-[:has_value_set]-(p:property) "
"where not exists(vs._to) and not exists(p._to) {} "
"return vs as value_set, collect(p) as props"
).format("and p.model=$model" if model else "")
return (qry, {"model": model})
[docs] @read_txn_data
def get_term_by_id(self, nanoid):
"""Get a term having the given nanoid, with its origin.
Returns {term, origin}."""
qry = (
"match (t:term {nanoid:$nanoid}) "
"where not exists(t._to) "
"with t, t.origin_name as origin_name "
"optional match (o:origin {name: origin_name}) "
"where not exists(o._to) "
"return t as term, o as origin "
return (qry, {"nanoid": nanoid})
[docs] @read_txn_data
def get_props_and_terms_by_model(self, model=None):
"""Get terms from valuesets associated with properties in a given model
(or all such terms if model is None).
Returns [ {prop, terms[]} ]"""
qry = (
"match (p:property)-[:has_value_set]->(v:value_set)"
"-[:has_term]->(t:term) "
"where not( exists(p._to) or exists(v._to) or exists(t._to)) {} "
"return p as prop, collect(t) as terms"
).format("and p.model = $model" if model else "")
return (qry, {"model": model})
[docs] @read_txn_data
def get_origins(self):
"""Get all origins.
Returns [ <origin> ]"""
qry = (
"match (o:origin) "
"where not exists (o._to) "
"return o"
return (qry, None)
[docs] @read_txn_value
def get_origin_by_id(self, oid):
"""Get an origin by nanoid."""
qry = (
"match (o:origin {nanoid:$oid}) "
"where not exists (o._to) "
"return o "
return (qry, {"oid": oid}, "o")
[docs] @read_txn_data
def get_entities_by_tag(self, key, value=None, model=None):
"""Get all entities, optionally from a given model,
tagged with a given key or key:value pair.
Returns [ {tag_key(str), tag_value(str), entity(str - label), entities[]} ]"""
qry = (
"match (t:tag {{key:$key}}) {} "
"with t "
"match (e)-[:has_tag]->(t) "
"where not exists(e._to) {}"
"return t.key as tag_key, t.value as tag_value, head(labels(e)) as entity, collect(e) as entities"
).format("where t.value = $value" if value else "",
"and e.model = $model " if model else "" )
parms = {"key":key}
if value:
if model:
return (qry, parms)
[docs] @read_txn_data
def get_with_statement(self, qry, parms={}):
"""Run an arbitrary read statement and return data."""
if not isinstance(qry, str):
raise RuntimeError("qry= must be a string")
if not re.match(".*return.*", qry, flags=re.I):
raise RuntimeError("Read statement needs a RETURN clause.")
if not isinstance(parms, dict):
raise RuntimeError("parms= must be a dict")
return (qry, parms)
[docs]def make_nanoid(alphabet="abcdefghijkmnopqrstuvwxyzABCDEFGHJKMNPQRSTUVWXYZ0123456789",
"""Create a random nanoid and return it as a string."""
return nanoid_generate(alphabet=alphabet, size=size)