"""
bento_meta.mdb
==============
This module contains :class:`MDB`, with machinery for efficiently
querying a Neo4j instance of a Metamodel Database.
The constructor queries the database for registered models.
The attribute models : Dict contains model handles (names) as keys,
and a list of version strings as values.
The attribute latest_version : Dict contains model handles as keys
and the version string tagged "is_latest" as values.
"""
from __future__ import annotations
import os
import re
from functools import wraps
from typing import (
TYPE_CHECKING,
Any,
Concatenate,
ParamSpec,
TypeVar,
cast,
)
from warnings import warn
from nanoid import generate as nanoid_generate
from neo4j import Driver, GraphDatabase, ManagedTransaction, Record
from typing_extensions import LiteralString
if TYPE_CHECKING:
from collections.abc import Callable
# Type variables for proper decorator typing
P = ParamSpec("P")
T = TypeVar("T")
# Decorator functions to produce executed transactions based on an
# underlying query/param function:
[docs]
def read_txn(
    func: Callable[Concatenate[MDB, P], tuple[str, dict[str, Any] | None]],
) -> Callable[Concatenate[MDB, P], list[Record]]:
    """
    Decorate a query function to run a read transaction based on its query.

    Query function should return a tuple (qry_string, param_dict).
    The query function is called exactly once per invocation, before the
    session is opened; only the statement execution happens inside the
    transaction function handed to ``session.execute_read``.

    Args:
        func: The query function to decorate.

    Returns:
        Decorated function that returns list of driver Records.
    """

    @wraps(func)
    def rd(self: MDB, *args: P.args, **kwargs: P.kwargs) -> list[Record]:
        # Build the statement outside the transaction function: that function
        # may be invoked more than once by ``execute_read``, and query
        # construction should neither be repeated nor fail inside a session.
        (qry, parms) = func(self, *args, **kwargs)

        def txn_q(tx: ManagedTransaction) -> list[Record]:
            result = tx.run(cast("LiteralString", qry), parameters=parms)
            # Materialize the records before the transaction function returns.
            return list(result)

        with self.driver.session() as session:
            return session.execute_read(txn_q)

    return rd
[docs]
def read_txn_value(
    func: Callable[Concatenate[MDB, P], tuple[str, dict[str, Any] | None, str]],
) -> Callable[Concatenate[MDB, P], list[Any]]:
    """
    Decorate a query function to run a read transaction based on its query.

    Query function should return a tuple (qry_string, param_dict, values_key).
    The query function is called exactly once per invocation, before the
    session is opened; only the statement execution happens inside the
    transaction function handed to ``session.execute_read``.

    Args:
        func: The query function to decorate.

    Returns:
        Decorated function that returns list of values for key specified by query function.
    """

    @wraps(func)
    def rd(self: MDB, *args: P.args, **kwargs: P.kwargs) -> list[Any]:
        # Build the statement outside the transaction function: that function
        # may be invoked more than once by ``execute_read``, and query
        # construction should neither be repeated nor fail inside a session.
        (qry, parms, values_key) = func(self, *args, **kwargs)

        def txn_q(tx: ManagedTransaction) -> list[Any]:
            result = tx.run(cast("LiteralString", qry), parameters=parms)
            # Keep only the column named by the query function.
            return result.value(values_key)

        with self.driver.session() as session:
            return session.execute_read(txn_q)

    return rd
[docs]
def read_txn_data(
    func: Callable[Concatenate[MDB, P], tuple[str, dict[str, Any] | None]],
) -> Callable[Concatenate[MDB, P], list[dict[str, Any]] | None]:
    """
    Wrap a query-building function so that calling it runs a read transaction.

    The wrapped function must return a tuple (qry_string, param_dict). The
    statement is executed through ``session.execute_read`` and the result is
    converted with ``result.data()``.

    Args:
        func: The query function to decorate.

    Returns:
        Decorated function that returns the records as a list of simple dicts,
        or None when the query produced no rows.
    """

    @wraps(func)
    def run_query(
        self: MDB,
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> list[dict[str, Any]] | None:
        statement, parameters = func(self, *args, **kwargs)

        def fetch_rows(tx: ManagedTransaction) -> list[dict[str, Any]]:
            return tx.run(
                cast("LiteralString", statement),
                parameters=parameters,
            ).data()

        with self.driver.session() as session:
            rows = session.execute_read(fetch_rows)
            # An empty result is reported as None rather than [].
            return rows if len(rows) else None

    return run_query
[docs]
class MDB:
"""A class representing a Metamodel Database."""
def __init__(
    self,
    uri: str | None = None,
    user: str | None = None,
    password: str | None = None,
) -> None:
    """
    Create an MDB object, with a connection to a Neo4j instance of a metamodel database.

    After the driver is created, the Model nodes are queried once and cached
    in ``self.models`` (handle -> list of versions) and
    ``self.latest_version`` (handle -> version flagged ``is_latest``).
    Failures are reported with ``warnings.warn`` instead of being raised.

    Args:
        uri: The Bolt protocol endpoint to the Neo4j instance.
            Defaults to NEO4J_MDB_URI env variable.
        user: Username for Neo4j access.
            Defaults to NEO4J_MDB_USER env variable.
        password: Password for user.
            Defaults to NEO4J_MDB_PASS env variable.
    """
    self.uri = uri or os.environ.get("NEO4J_MDB_URI")
    self.user = user or os.environ.get("NEO4J_MDB_USER")
    self.password = password or os.environ.get("NEO4J_MDB_PASS")
    self.driver: Driver | None = None
    self.models: dict[str, list[str]] = {}
    self.latest_version: dict[str, str | None] = {}
    # Set up front so the registry exists on every exit path below.
    self._txfns: dict[str, Callable] = {}
    try:
        self.driver = GraphDatabase.driver(
            self.uri,
            auth=(self.user, self.password),
        )
    except Exception as e:
        warn(f"MDB not connected: {e}", stacklevel=2)
        # Without a driver the model query below could only fail with an
        # unrelated AttributeError and a misleading second warning.
        return
    try:
        # query DB and cache the models and their versions in the MDB object
        info = self.get_model_info()
        if not info:
            msg = "No Model nodes found"
            raise RuntimeError(msg)
        for m in info:
            handle = m["handle"]
            # Optional properties are read with .get(): indexing a node that
            # lacks them raises KeyError, which would abort the whole caching
            # pass and make the "unversioned" fallback below unreachable.
            version = m.get("version")
            self.models.setdefault(handle, []).append(version)
            if m.get("is_latest") and not self.latest_version.get(handle):
                self.latest_version[handle] = version
        for hdl, versions in self.models.items():
            if not self.latest_version.get(hdl):
                if len(versions) == 1:  # only one version
                    self.latest_version[hdl] = versions[0] or "unversioned"
                else:
                    # Several versions and none flagged is_latest.
                    self.latest_version[hdl] = None
    except Exception as e:
        # Deliberately best-effort: warn instead of raising.
        warn(f"Database doesn't look like an MDB: {e}", stacklevel=2)
[docs]
def close(self) -> None:
    """Close the driver connection, if a driver is set."""
    driver = self.driver
    if not driver:
        # No driver to close.
        return
    driver.close()
[docs]
def register_txfn(self, name: str, fn: Callable) -> None:
    """
    Store a transaction function on this object under a name, for later use.

    An existing entry with the same name is replaced.
    See https://neo4j.com/docs/api/python-driver/current/api.html#managed-transactions-transaction-functions

    Args:
        name: Name to register the function under.
        fn: The transaction function to register.
    """
    registry = self._txfns
    registry[name] = fn
# def run_txfn(self, name, *args, **kwargs):
[docs]
@read_txn_value
def get_model_info(self) -> tuple[str, None, str]:
    """
    Get models, versions, and latest versions from MDB Model nodes.

    Builds the statement matching every ``model`` node; the ``read_txn_value``
    decorator runs it and returns the values of the ``m`` column.
    """
    statement = "match (m:model) return m"
    values_key = "m"
    return (statement, None, values_key)
[docs]
def get_model_handles(self) -> list[str]:
    """
    Return a simple list of the model handles available.

    The handles are the keys of the ``models`` cache; no database query is
    issued by this method.

    Returns:
        List of model handle strings.
    """
    return [*self.models]
[docs]
def get_model_versions(self, model: str) -> list[str] | None:
"""
Get list of version strings present in database for a given model.
Args:
model: Model handle to get versions for.
Returns:
List of version strings, or None if model not found.
"""
if self.models.get(model):
return self.models[model]
return None
[docs]
def get_latest_version(self, model: str) -> str | None:
"""
Get the version string from Model node marked is_latest:True for a model handle.
Args:
model: Model handle to get latest version for.
Returns:
Version string, or None if model not found.
"""
if self.models.get(model):
return self.latest_version[model]
return None
[docs]
@read_txn_data  # type: ignore[reportArgumentType]
def get_model_nodes(
    self,
    model: str | None = None,
) -> list[dict[str, Any]] | None:
    """
    Return a list of dicts representing Model nodes, across all versions.

    Args:
        model: Optional model handle to filter by.

    Returns:
        List of dicts representing Model nodes, or None if not found.
    """
    if model:
        handle_filter = "where m.handle = $model"
        bindings = {"model": model}
    else:
        # No handle given: match every Model node, no parameters needed.
        handle_filter = ""
        bindings = None
    statement = f"match (m:model) {handle_filter} return m"
    return (statement, bindings)  # type: ignore[reportReturnType]
[docs]
@read_txn_value  # type: ignore[reportArgumentType]
def get_nodes_by_model(
    self,
    model: str | None = None,
    version: str | None = None,
) -> list[dict[str, Any]] | None:
    """
    Get all nodes for a given model.

    Args:
        model: Model handle to get nodes for. If None, get all nodes in database.
        version: Version to filter by. If None, get nodes from model version marked
            is_latest:true. If '*', get nodes from all model versions.

    Returns:
        List of node dicts.
    """
    bindings: dict[str, Any] = {}
    where = ""
    if model:
        bindings["model"] = model
        latest = self.get_latest_version(model)
        if version == "*" or latest == "unversioned":
            # All versions requested, or the model carries no version at all.
            where = "where n.model = $model"
        else:
            where = "where n.model = $model and n.version = $version"
            bindings["version"] = latest if version is None else version
    statement = f"match (n:node) {where} return n"
    return (statement, bindings, "n")  # type: ignore[reportReturnType]
[docs]
@read_txn_data  # type: ignore[reportArgumentType]
def get_model_nodes_edges(
    self,
    model: str,
    version: str | None = None,
) -> list[dict[str, Any]] | None:
    """
    Get all node-relationship-node paths for a given model and version.

    Args:
        model: Model handle to get paths for.
        version: Version to filter by. If None, use version marked is_latest:true
            for model. If '*', retrieve from all versions.

    Returns:
        List of path dicts (one ``path`` key per row), or None if no rows.
    """
    latest = self.get_latest_version(model)
    bindings: dict[str, Any] = {"model": model}
    if version == "*" or latest == "unversioned":
        # All versions requested, or the model carries no version at all.
        where = "where s.model = $model and r.model = $model and d.model = $model "
    else:
        where = "".join(
            f"{alias}.model = $model and {alias}.version = $version {glue}"
            for alias, glue in (("s", "and "), ("r", "and "), ("d", ""))
        )
        where = "where " + where
        bindings["version"] = latest if version is None else version
    pattern = "match p = (s:node)<-[:has_src]-(r:relationship)-[:has_dst]->(d:node)"
    statement = pattern + where + " " + "return p as path"
    return (statement, bindings)  # type: ignore[reportReturnType]
[docs]
@read_txn_data  # type: ignore[reportArgumentType]
def get_node_edges_by_node_id(self, nanoid: str) -> list[dict[str, Any]] | None:
    """
    Get incoming and outgoing relationship information for a node from its nanoid.

    Args:
        nanoid: The nanoid of the node to get edges for.

    Returns:
        List of dicts with id, handle, model, version, near_type, far_type, rln,
        far_node, or None if no node matches (``read_txn_data`` maps an empty
        result to None). ``rln`` and ``far_node`` are graph entities, not strings.
    """
    qry = (
        "match (n:node {nanoid:$nanoid}) "
        "with n "
        # optional match: a node without relationships still yields one row
        "optional match (n)<-[e1]-(r:relationship)-[e2]->(m:node) "
        "return n.nanoid as id, n.handle as handle, n.model as model, "
        " n.version as version, "
        " type(e1) as near_type, type(e2) as far_type, r as rln, m as far_node"
    )
    return (qry, {"nanoid": nanoid})  # type: ignore[reportReturnType]
[docs]
@read_txn_data  # type: ignore[reportArgumentType]
def get_node_and_props_by_node_id(self, nanoid: str) -> list[dict[str, Any]] | None:
    """
    Fetch a node together with its properties, looked up by the node nanoid.

    Args:
        nanoid: The nanoid of the node to get.

    Returns:
        List with dict containing id, handle, model, version, node, props[],
        or None if no node matches.
    """
    clauses = [
        "match (n:node {nanoid:$nanoid}) ",
        "with n ",
        # optional match keeps nodes that have no properties
        "optional match (n)-[:has_property]->(p) ",
        "return n.nanoid as id, n.handle as handle, n.model as model, ",
        " n.version as version, n as node, ",
        " collect(p) as props",
    ]
    bindings = {"nanoid": nanoid}
    return ("".join(clauses), bindings)  # type: ignore[reportReturnType]
[docs]
@read_txn_data  # type: ignore[reportArgumentType]
def get_nodes_and_props_by_model(
    self,
    model: str | None = None,
    version: str | None = None,
) -> list[dict[str, Any]] | None:
    """
    Get all nodes with associated properties given a model handle.

    Args:
        model: Model handle to get nodes for. If None, get all nodes with their properties.
        version: Version to filter by. If None, get nodes and props from model version
            marked is_latest:true. If '*', get nodes and props from all model versions.

    Returns:
        List of dicts with id, handle, model, version, props[], or None if no rows.
    """
    bindings: dict[str, Any] = {}
    where = ""
    if model:
        bindings["model"] = model
        latest = self.get_latest_version(model)
        if version == "*" or latest == "unversioned":
            # All versions requested, or the model carries no version at all.
            where = "where n.model = $model and p.model = $model "
        else:
            where = (
                "where n.model = $model and n.version = $version and "
                "p.model = $model and p.version = $version "
            )
            bindings["version"] = latest if version is None else version
    statement = "".join(
        [
            "match (n:node)-[:has_property]->(p:property) ",
            where,
            " ",
            "return n.nanoid as id, n.handle as handle, n.model as model, ",
            "n.version as version, collect(p) as props",
        ],
    )
    return (statement, bindings)  # type: ignore[reportReturnType]
[docs]
@read_txn_data  # type: ignore[reportArgumentType]
def get_prop_node_and_domain_by_prop_id(
    self,
    nanoid: str,
) -> list[dict[str, Any]] | None:
    """
    Fetch a property by nanoid with its node and its value domain or value set terms.

    Args:
        nanoid: The nanoid of the property to get.

    Returns:
        List with dict containing id, handle, model, version, value_domain, prop,
        node, value_set, terms[], or None if no property matches.
    """
    clauses = [
        "match (p:property {nanoid:$nanoid})<-[:has_property]-(n:node) ",
        "with p,n ",
        # optional match keeps properties that have no value set
        "optional match (p)-[:has_value_set]->(vs:value_set)-[:has_term]->(t:term) ",
        "return p.nanoid as id, p.handle as handle, p.model as model, ",
        "p.version as version, ",
        "p.value_domain as value_domain, p as prop, n as node, ",
        " vs as value_set, collect(t) as terms",
    ]
    bindings = {"nanoid": nanoid}
    return ("".join(clauses), bindings)  # type: ignore[reportReturnType]
[docs]
@read_txn_data  # type: ignore[reportArgumentType]
def get_valueset_by_id(self, nanoid: str) -> list[dict[str, Any]] | None:
    """
    Fetch a valueset by nanoid, with the terms in it and the properties using it.

    Args:
        nanoid: The nanoid of the valueset to get.

    Returns:
        List with dict containing id, handle, url, terms[], props[],
        or None if the query yields no rows.
    """
    clauses = [
        "match (vs:value_set {nanoid:$nanoid})-[:has_term]->(t) ",
        # collect terms first so the second match does not multiply rows
        "with vs, collect(t) as terms ",
        "match (vs)<-[:has_value_set]-(p:property) ",
        "return vs.nanoid as id, vs.handle as handle, vs.url as url, ",
        " terms, collect(p) as props",
    ]
    bindings = {"nanoid": nanoid}
    return ("".join(clauses), bindings)  # type: ignore[reportReturnType]
[docs]
@read_txn_data  # type: ignore[reportArgumentType]
def get_valuesets_by_model(
    self,
    model: str | None = None,
    version: str | None = None,
) -> list[dict[str, Any]] | None:
    """
    Get all valuesets that are used by properties in the given model and version.

    Gets all valuesets if model is None. Each row also carries the list of
    properties using that valueset.

    Args:
        model: Model handle to get valuesets for. If None, get all valuesets.
        version: Version to filter by. If None, get value sets associated with
            latest model version. If '*', get those associated with all versions.

    Returns:
        List of dicts with value_set, props[], or None if no rows.
    """
    bindings: dict[str, Any] = {}
    where = ""
    if model:
        bindings["model"] = model
        latest = self.get_latest_version(model)
        if version == "*" or latest == "unversioned":
            # All versions requested, or the model carries no version at all.
            where = "where p.model = $model"
        else:
            where = "where p.model = $model and p.version = $version"
            bindings["version"] = latest if version is None else version
    statement = "".join(
        [
            "match (vs:value_set)<-[:has_value_set]-(p:property) ",
            where,
            " ",
            "return vs as value_set, collect(p) as props",
        ],
    )
    return (statement, bindings)  # type: ignore[reportReturnType]
[docs]
@read_txn_data  # type: ignore[reportArgumentType]
def get_term_by_id(self, nanoid: str) -> list[dict[str, Any]] | None:
    """
    Fetch the term having the given nanoid, together with its origin.

    The origin is looked up by matching the term's ``origin_name`` against
    origin ``name``.

    Args:
        nanoid: The nanoid of the term to get.

    Returns:
        List of dicts with term, origin, or None if no term matches.
    """
    clauses = [
        "match (t:term {nanoid:$nanoid}) ",
        "with t, t.origin_name as origin_name ",
        # optional match keeps terms whose origin is not found
        "optional match (o:origin {name: origin_name}) ",
        "return t as term, o as origin ",
    ]
    bindings = {"nanoid": nanoid}
    return ("".join(clauses), bindings)  # type: ignore[reportReturnType]
[docs]
@read_txn_data  # type: ignore[reportArgumentType]
def get_props_and_terms_by_model(
    self,
    model: str | None = None,
    version: str | None = None,
) -> list[dict[str, Any]] | None:
    """
    Get terms from valuesets associated with properties in a model and version.

    Gets all terms if model is None.

    Args:
        model: Model handle to get props and terms for. If None, get all terms.
        version: Version to filter by. If None, get props and terms from the
            latest model version. If '*', get those from all versions.

    Returns:
        List of dicts with prop, terms[], or None if no rows.
    """
    bindings: dict[str, Any] = {}
    where = ""
    if model:
        bindings["model"] = model
        latest = self.get_latest_version(model)
        if version == "*" or latest == "unversioned":
            # All versions requested, or the model carries no version at all.
            where = "where p.model = $model"
        else:
            where = "where p.model = $model and p.version = $version"
            bindings["version"] = latest if version is None else version
    statement = "".join(
        [
            "match (p:property)-[:has_value_set]->(v:value_set)",
            "-[:has_term]->(t:term) ",
            where,
            " ",
            "return p as prop, collect(t) as terms",
        ],
    )
    return (statement, bindings)  # type: ignore[reportReturnType]
[docs]
@read_txn_data  # type: ignore[reportArgumentType]
def get_origins(self) -> list[dict[str, Any]] | None:
    """
    Get all origins.

    Returns:
        List of origin dicts (one ``o`` key per row), or None if there are none.
    """
    statement = "match (o:origin) return o"
    bindings = None  # the statement takes no parameters
    return (statement, bindings)  # type: ignore[reportReturnType]
[docs]
@read_txn_value  # type: ignore[reportArgumentType]
def get_origin_by_id(self, oid: str) -> list[dict[str, Any]] | None:
    """
    Get an origin by nanoid.

    Only origins without a ``_to`` property are matched.

    Args:
        oid: The nanoid of the origin to get.

    Returns:
        List of the matching origin nodes (values of the ``o`` column).
    """
    # "o._to is null" is the portable spelling of "not exists(o._to)"; the
    # exists(<property>) form is rejected by Neo4j 5.
    qry = "match (o:origin {nanoid:$oid}) where o._to is null return o "
    return (qry, {"oid": oid}, "o")  # type: ignore[reportReturnType]
[docs]
@read_txn_data  # type: ignore[reportArgumentType]
def get_entities_by_tag(
    self,
    key: str,
    value: str | None = None,
) -> list[dict[str, Any]] | None:
    """
    Get all entities tagged with a given key or key:value pair.

    Args:
        key: Tag key to filter by.
        value: Optional tag value to filter by.

    Returns:
        List of dicts with tag_key, tag_value, entities[], or None if no rows.
    """
    bindings = {"key": key}
    if value is None:
        tag_filter = "where t.key = $key "
    else:
        # Narrow to one key:value pair.
        tag_filter = "where t.key = $key and t.value = $value "
        bindings["value"] = value
    statement = "".join(
        [
            "match (t:tag) ",
            tag_filter,
            " ",
            "with t ",
            "match (e)-[:has_tag]->(t) ",
            "return t.key as tag_key, t.value as tag_value, collect(e) as entities",
        ],
    )
    return (statement, bindings)  # type: ignore[reportReturnType]
[docs]
@read_txn_data  # type: ignore[reportArgumentType]
def get_with_statement(
    self,
    qry: str,
    parms: dict[str, Any] | None = None,
) -> list[dict[str, Any]] | None:
    """
    Run an arbitrary read statement and return data.

    Args:
        qry: The Cypher read statement; must contain a RETURN clause.
            Multi-line statements are accepted.
        parms: Optional dict of statement parameters.

    Returns:
        List of record dicts, or None if the statement yields no rows.

    Raises:
        TypeError: If qry is not a string or parms is not a dict.
        RuntimeError: If qry contains no RETURN.
    """
    if parms is None:
        parms = {}
    if not isinstance(qry, str):
        msg = "qry= must be a string"
        raise TypeError(msg)
    # re.search instead of re.match(".*return.*"): "." does not match a
    # newline, so the old check rejected statements whose RETURN was not on
    # the first line.
    if not re.search("return", qry, flags=re.IGNORECASE):
        msg = "Read statement needs a RETURN clause."
        raise RuntimeError(msg)
    if not isinstance(parms, dict):
        msg = "parms= must be a dict"
        raise TypeError(msg)
    return (qry, parms)  # type: ignore[reportReturnType]
[docs]
def make_nanoid(
    alphabet: str = "abcdefghijkmnopqrstuvwxyzABCDEFGHJKMNPQRSTUVWXYZ0123456789",
    size: int = 6,
) -> str:
    """
    Generate a random nanoid string.

    Args:
        alphabet: Characters to draw from. The default omits the letters
            l, I, L and O.
        size: Number of characters in the generated id.

    Returns:
        The generated nanoid.
    """
    new_id = nanoid_generate(alphabet=alphabet, size=size)
    return new_id