Source code for bento_meta.util.cypher.entities

"""
bento_meta.util.cypher.entities

Representations of cypher nodes, relationships, properties, paths
"""
import re
from pdb import set_trace
from bento_meta.util.cypher.functions import Func
from copy import deepcopy as clone


[docs]def countmaker(max=10000): return (x for x in range(max))
[docs]class Entity(object): """A property graph Entity. Base class.""" def __init__(self): self.As = None ct = None try: ct = next(type(self).count) except StopIteration: type(self)._reset_counter() ct = next(type(self).count) self.var = type(self).__name__[0].lower() + str(ct)
[docs] @classmethod def _reset_counter(cls): cls.count = countmaker()
[docs] def pattern(self): """Render entity as a match pattern.""" pass
[docs] def condition(self): """Render entity as a condition (for WHERE, e.g.).""" pass
[docs] def Return(self): """Render entity as a return value.""" pass
[docs] def _add_props(self, props): if not type(self) == N and not type(self) == R: return False if not props: return True if type(props) == P: props.entity = self self.props[props.handle] = props elif type(props) == list: for p in props: p.entity = self for p in props: self.props[p.handle] = p elif type(props) == dict: for hdl in props: self.props[hdl] = P(handle=hdl, value=props[hdl]) self.props[hdl].entity = self else: raise RuntimeError("Can't interpret props arg '{}'".format(props)) return True
[docs]class N(Entity): """A property graph Node.""" count = countmaker() def __init__(self, label=None, props=None, As=None): super().__init__() self.props = {} self.label = label self._add_props(props) self.As = As
[docs] def relate_to(self, r, n): # always obj --> arg direction return r.relate(self, n)
[docs] def pattern(self): ret = ",".join([p.pattern() for p in self.props.values() if p.pattern()]) if (len(ret)): ret = " {"+ret+"}" if self.label: ret = "({}:{}{})".format(self.var, self.label, ret) else: ret = "({}{})".format(self.var, ret) return ret
[docs] def condition(self): return [p.condition() for p in self.props.values()]
[docs] def Return(self): ret = " as {}".format(self.As) if self.As else "" ret = "{}{}".format(self.var, ret) return ret
[docs]class R(Entity): """A property graph Relationship or edge.""" count = countmaker() def __init__(self, Type=None, props=None, As=None, _dir='_right'): super().__init__() self.props = {} self.Type = Type self._add_props(props) self._join = [] self._dir = _dir self.As = As # n --> m direction
[docs] def relate(self, n, m): return T(n, self, m) if self._dir == '_right' else T(m, self, n)
[docs] def pattern(self): ret = ",".join([p.pattern() for p in self.props.values() if p.pattern()]) if (len(ret)): ret = " {"+ret+"}" if self.Type: ret = "-[{}:{}{}]-".format(self.var, self.Type, ret) else: ret = "-[{}{}]-".format(self.var, ret) return ret
[docs] def condition(self): return [p.condition() for p in self.props.values()]
[docs] def Return(self): ret = " as {}".format(self.As) if self.As else "" ret = "{}{}".format(self.var, ret) return ret
[docs]class VarLenR(R): """Variable length property graph Relationship or edge.""" def __init__(self, min_len: int = -1, max_len: int = -1, Type=None, props=None, As=None, _dir='_right'): super().__init__() self.props = {} self.Type = Type self._add_props(props) self.min_len = min_len self.max_len = max_len
[docs] def pattern(self): ret = ",".join([p.pattern() for p in self.props.values() if p.pattern()]) var_len = "*" if self.min_len < 0: min_len_str = "" else: min_len_str = str(self.min_len) if self.max_len < 0: max_len_str = "" else: max_len_str = str(self.max_len) if min_len_str or max_len_str: var_len += f"{min_len_str}..{max_len_str}" if len(ret): ret = " {"+ret+"}" if self.Type: ret = f"-[:{self.Type}{ret}{var_len}]-" else: ret = f"-[{ret}{var_len}]-" return ret
[docs]class N0(N): """Completely anonymous node ().""" def __init__(self): super().__init__() self.var = None
[docs] def pattern(self): return "()"
[docs] def Return(self): return None
[docs]class R0(R): """Completely anonymous relationship -[]-, i.e. --""" def __init__(self): super().__init__() self.var = None
[docs] def pattern(self): return "--"
[docs] def Return(self): return None
[docs]class P(Entity): """A property graph Property.""" count = countmaker() parameterize = False def __init__(self, handle, value=None, As=None): super().__init__() self.handle = handle self.value = value self.As = As self.entity = None self.param = {self.var: value} if value else None
[docs] def pattern(self): if self.value: if not self.parameterize: if not type(self.value) == str: return "{}:{}".format(self.handle, str(self.value)) elif re.match("^\\s*[$]", self.value): # a parameter return "{}:{}".format(self.handle, self.value) else: return "{}:'{}'".format(self.handle, self.value) else: return "{}:${}".format(self.handle, self.var) else: return None
[docs] def condition(self): if self.value and self.entity: if not self.parameterize: if not type(self.value) == str: return "{}.{} = {}".format(self.entity.var, self.handle, str(self.value)) elif re.match("^\\s*[$]", self.value): # a parameter return "{}.{} = {}".format(self.entity.var, self.handle, self.value) else: return "{}.{} = '{}'".format(self.entity.var, self.handle, self.value) else: return "{}.{} = ${}".format(self.entity.var, self.handle, self.var) else: return None
[docs] def Return(self): ret = " as {}".format(self.As) if self.As else "" if self.entity: return "{}.{}{}".format(self.entity.var, self.handle, ret) else: return None
[docs]class T(Entity): """A property graph Triple; i.e., (n)-[r]->(m).""" count = countmaker() def __init__(self, n, r, m): super().__init__() self._from = n self._to = m self._edge = r
[docs] def nodes(self): return [self._from, self._to]
[docs] def edge(self): return self._edge
[docs] def edges(self): return [self.edge()]
[docs] def pattern(self): return self._from.pattern()+self._edge.pattern()+">"+self._to.pattern()
[docs] def condition(self): return self.pattern()
[docs] def Return(self): return [x.Return() for x in (self._from, self._edge, self._to) if x.var]
[docs]class NoDirT(T): """A directionless property graph Triple; i.e., (n)-[r]-(m).""" def __init__(self, *args): super().__init__(*args)
[docs] def pattern(self): return self._from.pattern()+self._edge.pattern()+self._to.pattern()
[docs] def nodes(self): return [self._from, self._to]
[docs] def edge(self): return self._edge
[docs] def edges(self): return [self.edge()]
[docs]class G(Entity): """A property graph Path. Defined as an ordered set of partially overlapping triples.""" count = countmaker() def __init__(self, *args): super().__init__() self._pattern = None self.triples = [] args = list(args) self._create_path(args)
[docs] def _create_path(self, args): numargs = len(args) scr = [] while args: ent = args.pop(0) if len(scr) == 0: if isinstance(ent, N): scr.append(ent) elif isinstance(ent, R): if self.triples: scr.append(ent) else: raise RuntimeError( "Entity '{}' is not valid at arg position {}." .format(ent, numargs-len(args)) ) elif isinstance(ent, (T, G)): if not self._append(ent): raise RuntimeError( "Adjacent triples/paths do not overlap, " "at arg position {}.".format(numargs-len(args)) ) scr = [] # reset parse else: raise RuntimeError( "Entity '{}' is not valid at arg position {}." .format(ent, numargs-len(args)) ) elif len(scr) == 1: if (isinstance(scr[0], N) and isinstance(ent, R)): scr.append(ent) elif isinstance(scr[0], R): if isinstance(ent, N): scr.append(ent) elif isinstance(ent, (T, G)) and scr[0]._join: # may be able to link self to ent with _join hints r = scr[0] n1 = [x for x in self.triples[-1].nodes() if x.label == r._join[0]] n2 = [x for x in ent.nodes() if x.label == r._join[1]] if n1 and n2: t = r.relate(n1[0], n2[0]) if not self._append(t): raise RuntimeError("WTF error; this should not fail") if not self._append(ent): raise RuntimeError( "Found right-hand node but could not append path" ) scr = [] # success else: raise RuntimeError( "Can't find endpoint nodes specified in _join for " "relationship at arg position {}".format(numargs-len(args)-1) ) else: raise RuntimeError( "Ends of relationship are ambiguous; relationship " "at arg position {} needs _join hints".format(numargs-len(args)-1) ) else: raise RuntimeError( "Entity '{}' is not valid at arg position {}." .format(ent, numargs-len(args)) ) pass elif len(scr) == 2: if isinstance(scr[0], N): success = None if isinstance(ent, N): success = self._append(scr[1].relate(scr[0], ent)) elif isinstance(ent, T): success = self._append(scr[1].relate(scr[0], ent._from)) if success: success = self._append(ent) elif isinstance(ent, G): if len(ent.triples) > 1: raise RuntimeError( "Can't create start triple, " "to-node is ambiguous, " "at arg position {}.".format(numargs-len(args)) ) success = self._append( scr[1].relate(scr[0], ent.triples[0]._from)) else: raise RuntimeError( "Entity '{}' is not valid at arg position {}." .format(ent, numargs-len(args)) ) if not success: raise RuntimeError( "Resulting adjacent triples/paths do not overlap, " "at arg position {}." .format(numargs-len(args)) ) elif isinstance(scr[0], R): if not self._append(scr[0].relate(self.triples[-1]._to, scr[1])): raise RuntimeError( "Resulting adjacent triples/paths do not overlap, " "at arg position {}." .format(numargs-len(args)) ) scr = [] else: raise RuntimeError("Shouldn't happen.") if scr: if len(scr) == 2 and isinstance( scr[0], R) and isinstance(scr[1], N): if len(self.triples) > 1: raise RuntimeError( "Can't create end triple, from-node is ambiguous, " "at arg position {}.".format(numargs-len(args)) ) if not self._append( scr[0].relate(self.triples[-1]._to, scr[1])): raise RuntimeError( "Resulting adjacent triples/paths do not overlap, " "at arg position {}." .format(numargs-len(args)) ) else: raise RuntimeError("Args do not define a complete Path.")
[docs] def _append(self, ent): def _overlap(s, t): if s == t: return 0b000 if isinstance(s, G): s = s.triples[-1] if isinstance(t, G): t = t.triples[0] if s._from == t._from or ( s._from.var and s._from.var == t._from.var): return 0b001 if s._from == t._to or ( s._from.var and s._from.var == t._to.var): return 0b010 if s._to == t._from or ( s._to.var and s._to.var == t._from.var): return 0b011 if s._to == t._to or ( s._to.var and s._to.var == t._to.var): return 0b100 return -1 if not isinstance(ent, (T, G)): raise RuntimeError("Can't append a {} to a Path." .format(type(ent).__name__)) if self.triples: ovr = _overlap(self.triples[-1], ent) if ovr == 0: return True # same object, skip if ovr < 0: return False # Adjacent triples/paths do not overlap if isinstance(ent, T): self.triples.append(ent) elif isinstance(ent, G): self.triples.extend(ent.triples) return True
[docs] def nodes(self): ret = set() for t in self.triples: ret.add(t._from) ret.add(t._to) return list(ret)
[docs] def edges(self): ret = set() for t in self.triples: ret.add(t._edge) return list(ret)
[docs] def pattern(self): def jn(trps, acc, ret): if not trps: ret.append(acc) return ret trp = trps.pop(0) if not acc: return jn(trps, [trp._from, trp._edge, ">", trp._to], ret) else: if trp._from == acc[0]: acc[0:0] = [trp._to, "<", trp._edge] elif trp._from == acc[-1]: acc.extend([trp._edge, ">", trp._to]) elif trp._to == acc[0]: acc[0:0] = [trp._from, trp._edge, ">"] elif trp._to == acc[-1]: acc.extend(["<", trp._edge, trp._from]) else: ret.append(acc) acc = [trp._from, trp._edge, ">", trp._to] return jn(trps, acc, ret) pass if not self._pattern: trps = clone(self.triples) grps = jn(trps, [], []) pats = [] for grp in grps: pats.append( "".join([x.pattern() if not isinstance(x, str) else x for x in grp]) ) self._pattern = ", ".join(pats) return self._pattern
[docs] def condition(self): return self.pattern()
[docs] def Return(self): s = () for t in self.triples: s.add(t._from, t._to) return ", ".join([_return(x) for x in s if x.var])
# modifiers
[docs]def _as(ent, alias): """Return copy of ent with As alias set.""" if isinstance(ent, T): return ent ret = clone(ent) ret.As = alias return ret
[docs]def _plain(ent): """Return entity without properties.""" ret = None if isinstance(ent, (N, R)): ret = clone(ent) ret.props = {} elif isinstance(ent, P): ret = clone(ent) ret.value = None elif isinstance(ent, T): ret = clone(ent) ret._from.props = {} ret._to.props = {} ret._edge.props = {} else: return ent return ret
[docs]def _anon(ent): """Return entity without variable name.""" ret = clone(ent) ret.var = "" return ret
[docs]def _var(ent): """Return entity without label or type.""" ret = None if isinstance(ent, (N, R)): ret = clone(ent) if hasattr(ret, "label"): ret.label = None if hasattr(ret, "Type"): ret.Type = None elif isinstance(ent, T): ret = clone(ent) ret._from.label = None ret._to.label = None # leave the edge type alone in a triple... else: return ent return ret
[docs]def _plain_var(ent): """Return entity with only the variable, no label or properties""" return _plain(_var(ent))
# rendering contexts
[docs]def _pattern(ent): if isinstance(ent, (str, Func)): return str(ent) return ent.pattern()
[docs]def _condition(ent): if isinstance(ent, (str, Func)): return str(ent) return ent.condition()
[docs]def _return(ent): if isinstance(ent, (str, Func)): return str(ent) return ent.Return()