import re
from minicypher.clauses import Match, Return
from minicypher.entities import (
G,
N,
P,
R,
_As,
)
from minicypher.functions import (
And,
Not,
Or,
count,
exists,
group,
labels,
)
from minicypher.statement import Statement
avail_funcs = {x.__name__: x for x in (count, exists, labels, group, And, Or, Not)}
[docs]
class _engine:
paths = None
def __init__(self, use_params=True):
self.use_params = use_params
self.error = None
self.statement = None
self.params = None
self.key = ""
self.path_id = None
[docs]
@classmethod
def set_paths(cls, paths):
cls.paths = paths
[docs]
def parse(self, toks):
toks = toks
pth = self.paths
return self._walk(None, toks, pth)
[docs]
def _process_node(self, block):
ret = None
if isinstance(block, str):
if block == "_var":
ret = N()
else:
ret = N(label=block)
elif isinstance(block, dict):
if not block["_label"]:
self.error = {
"description": "_node block requires _label key",
"block": block,
}
return False
ret = N(label=block["_label"])
if block.get("_prop"):
ret._add_props(self._process_prop(block["_prop"]))
if block.get("_edge") and block.get("_node"):
n = self._process_node(block["_node"])
e = self._process_edge(block["_edge"])
ret = G(ret, e, n)
else:
self.error = {
"description": "Can't process _node block",
"block": block,
}
return False
return ret
[docs]
def _process_prop(self, block, value=None):
ret = None
if isinstance(block, str):
ret = P(handle=block, value=value)
elif isinstance(block, dict) and block.get("_handle") and block.get("_value"):
ret = P(handle=block["_handle"], value=block["_value"])
else:
self.error = {
"description": "Can't process _prop block",
"block": block,
}
return False
return ret
[docs]
def _process_edge(self, block):
ret = None
if isinstance(block, str):
ret = R(Type=block)
elif isinstance(block, dict):
ret = R(Type=block["_type"])
if block.get("_dir"):
ret._dir = block.get("_dir")
if block.get("_join"):
# when an edge connects two complex paths
# define the nodes on the incoming path and new path
# to link with this edge
ret._join = block["_join"]
else:
self.error = {
"description": "Can't process _edge block",
"block": block,
}
return False
return ret
[docs]
def _process_func(self, block):
ret = {}
if isinstance(block, str):
as_ = re.split("@", block)
ret["_func"] = as_[0]
if len(as_) > 1:
as_ = as_[1]
else:
as_ = None
if ret["_func"] in avail_funcs:
# the Func subclass:
ret["_func"] = avail_funcs[ret["_func"]]
ret["_func_as"] = as_
else:
self.error = {
"description": "Sorry, no cypher function '{}' is currently defined".format(
ret["_func"],
),
"block": block,
}
return False # ERR no such function available
else:
self.error = {
"description": "Block type other than simple string not yet handled for _func",
"block": block,
}
return False
return ret
[docs]
def _create_statement(self, ent, pad):
match_clause = Match(ent)
ret_clause = None
a = []
if isinstance(pad["_return"], str):
if pad["_return"] == "_items":
if isinstance(ent, (N, R)):
ret_clause = Return(ent)
else:
ret_clause = Return(*ent.nodes())
else:
if (isinstance(ent, N) and ent.label == pad["_return"] and ent.var) or (
isinstance(ent, R) and ent.Type == pad["_return"] and ent.var
):
a.append(ent)
else:
a.extend(
[x for x in ent.nodes() if x.label == pad["_return"] and x.var],
)
if not a:
self.error = {
"description": "No named node to return with label '{}'".format(
pad["_return"],
),
"ent": ent,
"pad": pad,
}
return False
ret_clause = Return(*a)
elif isinstance(pad["_return"], dict):
retblock = pad["_return"]
if retblock.get("_path_id"):
self.path_id = retblock["_path_id"]
if retblock.get("_nodes"):
# assume is list-valued
if not isinstance(retblock["_nodes"], list):
self.error = {
"description": "_nodes key must point to a list",
"ent": ent,
"pad": pad,
}
return False
labels = {
x[0]: x[1]
for x in [re.split("@", y) + [None] for y in retblock["_nodes"]]
}
if "*" in labels:
a.append("*")
elif isinstance(ent, N) and ent.var:
if ent.label in labels:
if labels[ent.label]:
a.append(_As(ent, labels[ent.label]))
else:
a.append(ent)
if "_var" in labels and ent == pad["_var"]:
if labels["_var"]:
a.append(_As(ent, labels["_var"]))
else:
a.append(ent)
else:
for n in ent.nodes():
if n.label in labels and n.var:
if labels[n.label]:
a.append(_As(n, labels[n.label]))
else:
a.append(n)
if "_var" in labels and n == pad["_var"]:
if labels["_var"]:
a.append(_As(n, labels["_var"]))
else:
a.append(n)
if retblock.get("_edges"):
if not isinstance(retblock["_edges"], list):
self.error = {
"description": "_edges key must point to a list",
"ent": ent,
"pad": pad,
}
return False
types = {
x[0]: x[1]
for x in [re.split("@", y) + [None] for y in retblock["_edges"]]
}
if isinstance(ent, R) and ent.var:
if ent.Type in types:
if types[ent.Type]:
_As(ent, types[ent.Type])
else:
a.append(ent)
if "_var" in types and ent == pad["_var"]:
if types["_var"]:
a.append(_As(ent, types["_var"]))
else:
a.append(ent)
else:
for e in ent.edges():
if e.Type in types and e.var:
if types[e.Type]:
a.append(_As(e, types[e.Type]))
else:
a.append(e)
if not a:
self.error = {
"description": "No named nodes or edges matching the path _return spec",
"ent": ent,
"pad": pad,
}
return False
if retblock.get("_func"):
f = self._process_func(retblock["_func"])
if not f:
return False # bad _func spec
a = [f["_func"](x) for x in a]
if f.get("_func_as"):
a = [_As(x, f["_func_as"]) for x in a]
ret_clause = Return(*a)
else:
self.error = {
"description": "_return specification not str or dict",
"ent": ent,
"pad": pad,
}
return False
self.statement = Statement(match_clause, ret_clause, use_params=self.use_params)
self.params = self.statement.params
return True
[docs]
def _walk(self, ent, toks, pth):
if not toks or not pth:
self.error = {
"description": "_walk: Either toks or pth is empty",
}
return False
tok = toks[0]
pad = {}
parm = None
if tok in pth:
# plain token - shouldn't start with _
self.key = "/".join([self.key, tok]) if self.key else tok
pth = pth[tok]
elif any([x.startswith("$") for x in pth]):
# parameter
parm = [x for x in pth if x.startswith("$")][0]
pth = pth[parm]
# load pad
pad["_prop"] = P(handle=parm[1:], value=tok)
# note that the cache key will be interpreted as a regexp
self.key = (
"/".join([self.key, "([a-zA-Z0-9_]+)"])
if self.key
else "([a-zA-Z0-9_]+)"
)
else:
self.error = {
"description": f"Token '{tok}' not on valid path",
"token": tok,
}
return False
# collect/create items req by block in the pad, and then
# execute operations on these items in standard order below.
for opn in [x for x in pth if x.startswith("_")]:
# operations in block
if opn == "_node":
if parm: # processing a parameter
pad["_node"] = pth["_node"]
else:
pad["_node"] = self._process_node(pth["_node"])
if pth["_node"] == "_var":
pad["_var"] = pad["_node"]
elif opn == "_edge":
if parm: # processing a parameter
pad["_edge"] = pth["_edge"]
else:
pad["_edge"] = self._process_edge(pth["_edge"])
if pth["_edge"] == "_var":
pad["_var"] == pad["_edge"]
elif opn == "_prop":
# WARN or ERR if pad.get('_prop') is True
# - a parm was already handled in that case
pad["_prop"] = self._process_prop(pth["_prop"], value=tok)
elif opn == "_return":
if len(toks) == 1: # we're on the last token
# pad['_return'] not empty means we're finished
pad["_return"] = pth["_return"]
else: # more toks to go...
pass # so ignore it
elif opn == "_func":
_func = self._process_func(pth["_func"])
if not _func:
return False # bad _func spec
for k in _func:
pad[k] = _func[k]
# pad ready for operations
new_ent = None
if pad.get("_prop"): # add props to incoming entity
if isinstance(ent, (N, R)):
ent._add_props(pad["_prop"])
elif not pad.get("_node") and not pad.get("_edge"):
self.error = {
"description": "Both _edge and _node must be defined here",
"token": tok,
}
return False # ERR, if incoming is path, need to specify the node which gets the property (by label)
else:
if pad.get("_node"):
n = [x for x in ent.nodes() if x.label == pad["_node"]]
if not n:
self.error = {
"description": "Node specified by _node is not present",
"token": tok,
}
return False # specified node can't be found in ent
n[0]._add_props(pad["_prop"])
pad["_node"] = None
pad["_prop"] = None
if pad.get("_edge"):
e = [x for x in ent.edges() if x.Type == pad["_edge"]]
if not e:
self.error = {
"description": "Edge specified by _edge is not present",
"token": tok,
}
return False # specified edge not found in ent
e[0]._add_props(pad["_prop"])
pad["_edge"] = None
pad["_prop"] = None
if pad.get("_node"): # new entity
new_ent = pad["_node"]
if pad.get("_edge"):
if not ent: # no input entity
self.error = {
"description": "No incoming entity to apply _edge to here",
"token": tok,
}
return False # ERR
if not new_ent: # no new entity
self.error = {
"description": "No new entity to link to",
"token": tok,
}
return False # different ERR
if isinstance(ent, N) and isinstance(new_ent, N):
# simple relationship
new_ent = pad["_edge"].relate(ent, new_ent)
else:
new_ent = G(ent, pad["_edge"], new_ent)
if pad.get("_return"): # we made it
if self._create_statement(new_ent or ent, pad):
return True
return False
if len(toks) == 1:
self.error = {
"description": "Reached end of path, but found no _return spec",
"token": tok,
}
return False # ERR reached end of toks, no _return found
return self._walk(new_ent or ent, toks[1:], pth)