Source code for owmeta_core.graph_object

from __future__ import print_function
import logging
from itertools import chain

import six

from .ranged_objects import InRange
from .rdf_utils import UP, DOWN
from .rdf_query_modifiers import default_tq_layers

# Module-level logger shared by every querier/tripler below
L = logging.getLogger(__name__)

# Public API of this module
__all__ = [
    "GraphObject",
    "GraphObjectQuerier",
    "GraphObjectChecker",
    "ComponentTripler",
    "IdentifierMissingException",
]

# Shared, immutable "no results" value returned by the queriers
EMPTY_SET = frozenset()


class Variable(int):
    """
    Marker for a query variable used by `GraphObjectQuerier`.

    A plain ``int`` subclass: the querier tells a variable apart from a concrete
    identifier by checking ``isinstance(value, Variable)``.
    """
class _Range(InRange):
    """ `InRange` subclass that `_QueryPreparer` uses to stand in for a range value in a query path """
class GraphObject(object):
    """
    An object which can be included in the object graph.

    An abstract base class.
    """

    def __init__(self, **kwargs):
        super(GraphObject, self).__init__(**kwargs)
        # Properties owned by this object; their values are this object's "children"
        self.properties = []
        # Properties, owned by other objects, that have this object among their values
        self.owner_properties = []

    @property
    def identifier(self):
        """
        Must return an object representing this object or else raise an Exception.
        """
        raise NotImplementedError()

    @property
    def defined(self):
        """
        Returns true if an :meth:`identifier` would return an identifier
        """
        raise NotImplementedError()

    def variable(self):
        """
        Must return a `~owmeta_core.graph_object.Variable` object that identifies
        this `.GraphObject` in queries.

        The variable can be randomly generated when the object is created and
        stored in the object.
        """
        raise NotImplementedError()

    @property
    def idl(self):
        """
        The `identifier` if this object is `defined`, otherwise its query `variable`.
        """
        if self.defined:
            return self.identifier
        return self.variable()

    def __hash__(self):
        raise NotImplementedError()

    def __eq__(self, other):
        """
        Two graph objects are equal if they are the same object or if their `idl`
        values are equal. Anything that is not a `GraphObject` compares unequal.
        """
        if self is other:
            return True
        if isinstance(other, GraphObject):
            return self.idl == other.idl
        # Previously this fell off the end and returned None; give a real bool
        return False

    def __lt__(self, other):
        """
        Orders graph objects by `idl`. Comparisons against non-graph objects fall
        back to ``id()`` so that sorting mixed collections does not raise.
        """
        if isinstance(other, GraphObject):
            return self.idl < other.idl
        return id(self) < id(other)
class GraphObjectChecker(object):
    '''
    Checks that every triple among the defined GraphObjects connected to a query
    object is present in a graph.

    Calling the checker returns `True` when all such triples are found and `False`
    at the first one that is missing.
    '''

    def __init__(self, query_object, graph, sort_first=False):
        '''
        Parameters
        ----------
        query_object : GraphObject
            The object whose connected triples are checked
        graph : object
            Anything supporting ``triple in graph``
        sort_first : bool, optional
            Accepted for compatibility; currently unused (triples are always
            checked in sorted order)
        '''
        self.query_object = query_object
        self.graph = graph

    def __call__(self):
        component_triples = ComponentTripler(self.query_object)
        L.debug('GOC: Checking %s', self.query_object)
        absent = next((triple for triple in sorted(component_triples())
                       if triple not in self.graph),
                      None)
        if absent is None:
            return True
        L.debug('GOC: Failed on %s', absent)
        return False
class GraphObjectValidator(object):
    """
    Placeholder validator that stores a query object and a graph.

    No validation is performed: calling it always returns `True`.
    """

    def __init__(self, query_object, graph):
        self.query_object = query_object
        self.graph = graph

    def __call__(self):
        # Validation is not implemented; every query object is accepted
        return True
class GraphObjectQuerier(object):
    """
    Performs queries for objects in the given graph.

    The querier queries for objects at the center of a star graph. In SPARQL, the
    query has the form::

        SELECT ?x WHERE {
            ?x <p1> ?o1 .
            ?o1 <p2> ?o2 .
             ...
            ?on <pn> <a> .

            ?x <q1> ?n1 .
            ?n1 <q2> ?n2 .
             ...
            ?nn <qn> <b> .
        }

    It is allowed that ``<px> == <py>`` for ``x != y``.

    Queries such as::

        SELECT ?x WHERE {
            ?x <p1> ?o1 .
             ...
            ?on <pn> ?y .
        }

    or::

        SELECT ?x WHERE {
            ?x <p1> ?o1 .
             ...
            ?on <pn> ?x .
        }

    or::

        SELECT ?x WHERE {
            ?x ?z ?o .
        }

    or::

        SELECT ?x WHERE {
            ?x ?z <a> .
        }

    are not supported and will be ignored without error.
    """

    def __init__(self, q, graph, hop_scorer=None):
        """
        Call the GraphObjectQuerier object to perform the query.

        Parameters
        ----------
        q : :class:`GraphObject`
            The object which is queried on
        graph : :class:`object`
            The graph from which the objects are queried. Must implement a method
            :meth:`triples` that takes a triple pattern, ``t``, and returns a set of
            triples matching that pattern. In the pattern, ``t[i] = None``
            (0 <= i <= 2) indicates that the i'th position can take any value.

            The ``graph`` object can optionally implement the 'range query'
            'interface': the graph must have a property ``supports_range_queries``
            equal to `True`, and :meth:`triples` must accept an
            `~owmeta_core.ranged_objects.InRange` object in the object position of
            the query triple, but only for literals.

            The graph is wrapped with ``default_tq_layers`` before use, and the
            querier also calls ``triples_choices`` on the wrapped graph.
            NOTE(review): whether ``triples_choices`` is supplied by the wrapper or
            must be implemented by ``graph`` itself is not visible here -- confirm.
        hop_scorer : callable
            Returns a score for a hop (a four-tuple, (subject, predicate, object,
            target)) indicating how selective the query would be for that hop, with
            lower numbers being more selective. In general the score should only take
            the given hop into account -- it should not take previously given hops
            into account when calculating a score.
        """
        self.query_object = q
        L.debug('GOQ graph %s', graph)
        self.graph = default_tq_layers(graph)
        # NOTE: `results` and `triples_cache` are not read anywhere in this class
        self.results = dict()
        self.triples_cache = dict()
        self.hop_scorer = hop_scorer

    def do_query(self):
        """
        Run the query and return the set of matching identifiers.

        The result may be the shared immutable `EMPTY_SET`; do not mutate it.
        """
        L.debug('do_query: Graph %s', self.graph)
        if self.query_object.defined:
            # Nothing to search for: the answer is the object itself, provided all
            # of its component triples are present in the backing graph
            L.debug('do_query: Query object %s is already defined', self.query_object)
            gv = GraphObjectChecker(self.query_object, self.graph)
            if gv():
                return set([self.query_object.identifier])
            else:
                L.debug('do_query: Query graph does not align with the backing graph')
                return EMPTY_SET

        # Collect every chain of hops from the query object out to a defined object
        # (or range), then fold the chains into a prefix tree keyed by hop
        qp = _QueryPreparer(self.query_object)
        paths = qp()
        if len(paths) == 0:
            return EMPTY_SET
        h = self.merge_paths(paths)
        if L.isEnabledFor(logging.DEBUG):
            L.debug('do_query: merge_paths_result:\n%s', self._format_merged(h))
        return self.query_path_resolver(h)

    def merge_paths(self, l):
        """
        Combines a list of lists into a multi-level table with the elements of the
        lists as the keys. For given::

            [[a, b, c], [a, b, d], [a, e, d]]

        merge_paths returns::

            {a: {b: {c: {}, d: {}}, e: {d: {}}}}
        """
        res = dict()
        if L.isEnabledFor(logging.DEBUG):
            L.debug("merge_paths: path %s", _format_paths(l))
        # Group the tails of each list by their first element...
        for x in l:
            if len(x) > 0:
                tmp = res.get(x[0], [])
                tmp.append(x[1:])
                res[x[0]] = tmp

        # ...then merge each group of tails recursively
        for x in res:
            res[x] = self.merge_paths(res[x])

        return res

    def _format_merged(self, merge, depth=0):
        """
        Render a table produced by `merge_paths` as an indented, human-readable
        tree (four spaces per level) for debug logging.
        """
        sio = six.StringIO()
        for triple, remainder in merge.items():
            # Show the predicate and whichever end of the hop is known
            idx = triple.index(None)
            other_idx = 0 if (idx == 2) else 2
            print((depth * 4 * ' ') + str(triple[1]) + '->' + str(triple[other_idx]), file=sio)
            print(self._format_merged(remainder, depth+1), file=sio, end='')
        return sio.getvalue()

    def query_path_resolver(self, path_table):
        """
        Resolve one level of a `merge_paths` table.

        Each hop (key) is resolved, in ascending `score` order, into a set of
        candidate values for its ``None`` position; the candidate sets from all
        hops are then intersected. Returns that intersection, or `EMPTY_SET` when
        the table is empty.
        """
        join_args = []
        goal = None
        for hop in sorted(path_table.keys(), key=self.score):
            L.debug("query_path_resolver: hop %s", hop)
            # hop[3] is the node the hop's None position stands for
            goal = hop[3]
            self._qpr_helper(path_table[hop], hop, join_args)

        if len(join_args) == 1:
            return join_args[0]
        elif len(join_args) > 0:
            L.debug("Joining %s args on %s", len(join_args), goal)
            # Intersect starting from the smallest set; the first set is updated in
            # place (each set in join_args is freshly built by _qpr_helper)
            join_args = sorted(join_args, key=len)
            res = join_args[0]
            res.intersection_update(*join_args[1:])
            L.debug("Joined %s(sizes=%s) args on %s. Result size = %s",
                    len(join_args), [len(s) for s in join_args], goal, len(res))
            return res
        else:
            return EMPTY_SET

    def _qpr_helper(self, sub, search_triple, join_args):
        """
        Resolve a single hop and append its result set to ``join_args``.

        Parameters
        ----------
        sub : dict
            The sub-table of hops hanging off of ``search_triple``
        search_triple : tuple
            The hop: ``(subject, predicate, object, target)`` with ``None`` in the
            position being solved for
        join_args : list of set
            Result sets of hops already resolved at this level; appended to
        """
        seen = set()
        try:
            L.debug("_qpr_helper: sub %s, search_triple %s", sub, search_triple)
            idx = search_triple.index(None)
            other_idx = 0 if (idx == 2) else 2
            qx = None
            if isinstance(search_triple[other_idx], Variable):
                # The far end is itself unknown: resolve it from the sub-table first,
                # then look for triples whose far end is one of those results
                sub_results = list(self.query_path_resolver(sub))
                L.debug("_qpr_helper: sub_results %s", sub_results)
                if idx == 2:
                    qx = (sub_results, search_triple[1], None)
                else:
                    qx = (None, search_triple[1], sub_results)

                if sub_results:
                    trips = self.triples_choices(qx)
                else:
                    trips = iter(())
            else:
                # join_args is assumed to be sorted such that the most selective query
                # was executed first, so we should be able to profitably call
                # triples_choices to reduce the size of our branch.
                # NOTE(review): hops are ordered by `score`, not by result size, so
                # this only holds to the extent the scorer reflects selectivity.
                L.debug("_qpr_helper: joining...")
                if join_args:
                    # We use the last-added join_arg. It should be the smallest at
                    # this point. It constrains the None position of this hop.
                    last_join = join_args[-1]
                    if last_join:
                        tl = (list(last_join),)
                        if idx == 2:
                            qx = search_triple[:2] + tl
                        else:
                            qx = tl + search_triple[1:3]
                        trips = self.triples_choices(qx)
                    else:
                        # triples_choices treats [] as wildcard, but for us it's a
                        # 'match nothing', so...
                        trips = iter(())
                else:
                    qx = search_triple[:3]
                    trips = self.triples(qx)
            # Keep only the values found in the position being solved for
            seen = set(y[idx] for y in trips)
            if L.isEnabledFor(logging.DEBUG):
                L.debug("_qpr_helper: Done with search_triple %s -> qx %s with %d seen",
                        search_triple, qx, len(seen))
        finally:
            # A result set is appended for this hop even if an exception propagates
            join_args.append(seen)

    def score(self, hop):
        """ Score ``hop`` with ``hop_scorer``; every hop scores 0 when none was given """
        if self.hop_scorer is not None:
            return self.hop_scorer(hop)
        return 0

    def triples_choices(self, query_triple):
        """ Delegate to the wrapped graph's ``triples_choices`` """
        return self.graph.triples_choices(query_triple)

    def triples(self, query_triple):
        """ Delegate to the wrapped graph's ``triples`` """
        return self.graph.triples(query_triple)

    def __call__(self):
        return self.do_query()
def _format_paths(paths): sio = six.StringIO() for path in paths: for triple in path: idx = triple.index(None) other_idx = 0 if (idx == 2) else 2 direction = '' if idx == 2 else '^' print('->' + str(triple[1]) + direction + '->' + str(triple[other_idx]), file=sio, end='') print(file=sio) return sio.getvalue()
class ComponentTripler(object):
    """
    Collects the triples connected to a starting object through objects that have
    an identifier.

    No backing graph is consulted: the walk follows only the properties attached
    to the objects themselves, both "up" (``owner_properties``) and "down"
    (``properties``).
    """

    def __init__(self, start, traverse_undefined=False, generator=False):
        self.start = start
        # ids of the nodes already expanded
        self.seen = set()
        self.generator = generator
        self.traverse_undefined = traverse_undefined

    def g(self, current_node, i=0):
        # Each node is expanded at most once, tracked by object identity
        if self.see_node(current_node):
            return
        if not (self.traverse_undefined or current_node.defined):
            return
        for triple in self.recurse_upwards(current_node, i):
            yield triple
        for triple in self.recurse_downwards(current_node, i):
            yield triple

    def recurse_upwards(self, current_node, depth):
        for owning_prop in current_node.owner_properties:
            for triple in self.recurse(owning_prop.owner, owning_prop, current_node, UP, depth):
                yield triple

    def recurse_downwards(self, current_node, depth):
        for owned_prop in current_node.properties:
            for value in owned_prop.values:
                for triple in self.recurse(current_node, owned_prop, value, DOWN, depth):
                    yield triple

    def recurse(self, lhs, via, rhs, direction, depth):
        # Going up, the next node is the property owner; going down, the value
        next_node = lhs if direction is UP else rhs
        if not (self.traverse_undefined or next_node.defined):
            return
        yield (lhs.idl, via.link, rhs.idl)
        for triple in self.g(next_node, depth + 1):
            yield triple

    def see_node(self, node):
        """ Mark ``node`` as seen; return whether it had already been seen """
        key = id(node)
        already_seen = key in self.seen
        if not already_seen:
            self.seen.add(key)
        return already_seen

    def __call__(self):
        triples = self.g(self.start)
        return triples if self.generator else set(triples)
class _QueryPathElement(tuple): def __new__(cls): return tuple.__new__(cls, ([], [])) @property def subpaths(self): return self[0] @subpaths.setter def subpaths(self, toset): del self[0][:] self[0].extend(toset) @property def path(self): return self[1] class _QueryPreparer(object): def __init__(self, start): self.seen = list() self.stack = list() self.paths = list() self.start = start self.variables = dict() self.vcount = 0 # TODO: Refactor. The return values are not actually # used for anything def gather_paths_along_properties( self, current_node, property_list, direction): L.debug("gpap: current_node %s", current_node) ret = [] is_good = False for this_property in property_list: L.debug("this_property is %s", this_property) if direction is UP: others = [this_property.owner] else: others = this_property.values for other in others: other_id = other.idl if isinstance(other, InRange): other_id = _Range(other.min_value, other.max_value) elif not other.defined: other_id = self.var(other_id) if direction is UP: self.stack.append((other_id, this_property.link, None, current_node)) else: self.stack.append((None, this_property.link, other_id, current_node)) L.debug("gpap: preparing %s from %s", other, this_property) subpath = self.prepare(other) if len(self.stack) > 0: self.stack.pop() if subpath[0]: is_good = True subpath[1].path.insert( 0, (current_node.idl, this_property, other.idl)) ret.insert(0, subpath[1]) L.debug("gpap: exiting %s", "good" if is_good else "bad") return is_good, ret def var(self, v): if v in self.variables: return self.variables[v] else: var = Variable(self.vcount) self.variables[v] = var self.vcount += 1 return var def prepare(self, current_node): L.debug("prepare: current_node %s", repr(current_node)) if current_node.defined or isinstance(current_node, InRange): if len(self.stack) > 0: self.paths.append(list(self.stack)) return True, _QueryPathElement() else: if current_node in self.seen: return False, _QueryPathElement() else: 
self.seen.append(current_node) owner_parts = self.gather_paths_along_properties( current_node, current_node.owner_properties, UP) owned_parts = self.gather_paths_along_properties( current_node, current_node.properties, DOWN) self.seen.pop() subpaths = owner_parts[1] + owner_parts[1] if len(subpaths) == 1: ret = subpaths[0] else: ret = _QueryPathElement() ret.subpaths = subpaths return (owner_parts[0] or owned_parts[0], ret) def __call__(self): x = self.prepare(self.start) L.debug("self.prepare() result:" + str(x)) L.debug("_QueryPreparer paths:" + str(_format_paths(self.paths))) return self.paths
class DescendantTripler(object):
    """
    Gets triples that the object points to, optionally transitively.
    """

    def __init__(self, start, graph=None, transitive=True):
        """
        Parameters
        ----------
        start : GraphObject
            the node to start from
        graph : rdflib.graph.Graph, optional
            if given, the graph to draw descendants from. Otherwise the object
            graph is used
        transitive : bool, optional
            if true (the default), descendants of descendants are followed too;
            otherwise only triples whose subject is `start` are collected
        """
        self.seen = set()
        self.seen_edges = set()
        self.start = start
        self.graph = graph
        self.results = list()
        # NOTE: misspelled attribute name kept for backwards compatibility
        self.transitve = transitive

    def g(self, current_node):
        """ Append the outgoing triples of ``current_node`` to ``results``, recursing if transitive """
        if current_node in self.seen:
            return
        else:
            self.seen.add(current_node)

        if not current_node.defined:
            return

        if self.graph is not None:
            for triple in self.graph.triples((current_node.idl, None, None)):
                self.results.append(triple)
                if self.transitve:
                    # Objects drawn from the graph are bare identifiers; wrap them
                    # so they can be tracked in `seen` like graph objects
                    self.g(_DTWrapper(triple[2]))
        else:
            for e in current_node.properties:
                # Each property object is expanded only once
                if id(e) not in self.seen_edges:
                    self.seen_edges.add(id(e))
                    for val in e.values:
                        if val.defined:
                            self.results.append((current_node.idl, e.link, val.idl))
                            if self.transitve:
                                self.g(val)

    def __call__(self):
        start = self.start
        if self.graph is not None and start.defined:
            # Nodes reached through the graph are _DTWrapper instances, which never
            # compare equal to the original start object. Wrap the start the same
            # way so that a cycle leading back to it is recognized as already seen
            # instead of emitting its triples a second time.
            start = _DTWrapper(start.idl)
        self.g(start)
        return self.results
class _DTWrapper(): """ Used by DescendantTripler to wrap identifiers in GraphObjects """ defined = True __slots__ = ['idl'] def __init__(self, ident): self.idl = ident def __hash__(self): return hash(self.idl) def __eq__(self, other): if type(other) == type(self): return (other is self) or (other.idl == self.idl) else: return False
class LegendFinder(object):
    """
    Gets a list of the objects which can not be deleted freely from the transitive
    closure.

    Every object reachable from the start through owned properties is given its
    total count of incoming references, reduced by one for each reference found
    during the walk. Objects left with a positive count are still referenced from
    elsewhere. Essentially, this is the 'mark' phase of the "mark-and-sweep"
    garbage collection algorithm.

    "Heroes get remembered, but legends never die."
    """

    def __init__(self, start, graph=None):
        # object -> incoming references not yet accounted for by the walk
        self.talked_about = dict()
        self.seen = set()
        self.start = start
        self.graph = graph

    def legends(self, o, depth=0):
        if o in self.seen:
            return
        self.seen.add(o)
        for prop in o.properties:
            for value in prop.values:
                if not (value != self.start):
                    continue
                # This reference is inside the closure, so discount it
                self.talked_about[value] = self.count(value) - 1
                self.legends(value, depth + 1)

    def count(self, o):
        """ Remaining reference count for ``o``, computed on first sight """
        if o in self.talked_about:
            return self.talked_about[o]
        if self.graph is not None:
            return sum(1 for _ in self.graph.triples((None, None, o.idl)))
        return sum(1 for prop in o.owner_properties if prop.owner.defined)

    def __call__(self):
        self.legends(self.start)
        return set(obj for obj, remaining in self.talked_about.items() if remaining > 0)
class HeroTripler(object):
    """
    Collects the outgoing triples of a starting object and of every object
    reachable from it through owned properties without passing through a "legend"
    (an object still referenced from elsewhere; see `LegendFinder`).
    """

    def __init__(self, start, graph=None, legends=None):
        self.seen = set()
        self.start = start
        self.heroslist = set()
        self.results = set()
        self.graph = graph
        self.legends = LegendFinder(self.start, graph)() if legends is None else legends

    def isLegend(self, o):
        return o in self.legends

    def isHero(self, o):
        return o in self.heroslist

    def heros(self, o, depth=0):
        if o in self.seen:
            return
        self.seen.add(o)
        for prop in o.properties:
            for value in prop.values:
                if self.isLegend(value):
                    # Legends survive, so neither they nor what hangs off them
                    # are collected
                    continue
                self.heros(value, depth + 1)
                self.hero(value)

    def hero(self, o):
        """ Record the outgoing triples of ``o`` once """
        if self.isHero(o):
            return
        if self.graph is not None:
            self.results.update(self.graph.triples((o.idl, None, None)))
        else:
            for prop in o.properties:
                for val in prop.values:
                    if val.defined:
                        self.results.add((o.idl, prop.link, val.idl))
        self.heroslist.add(o)

    def __call__(self):
        self.heros(self.start)
        self.hero(self.start)
        return self.results


class ReferenceTripler(object):
    """
    Collects the triples in which an object appears: as subject (outgoing) or as
    object (incoming).

    With a graph, both directions are read from the graph. Otherwise they come from
    the object's own ``properties`` (defined values only) and ``owner_properties``
    (defined owners only).
    """

    def __init__(self, start, graph=None):
        self.seen = set()
        # (direction, id(property)) pairs already expanded
        self.seen_edges = set()
        self.start = start
        self.results = set()
        self.graph = graph

    def refs(self, o):
        if self.graph is not None:
            incoming = self.graph.triples((None, None, o.idl))
            outgoing = self.graph.triples((o.idl, None, None))
            self.results.update(chain(incoming, outgoing))
            return

        for prop in o.properties:
            edge_key = (DOWN, id(prop))
            if edge_key in self.seen_edges:
                continue
            self.seen_edges.add(edge_key)
            for val in prop.values:
                if val.defined:
                    self.results.add((o.idl, prop.link, val.idl))

        for prop in o.owner_properties:
            edge_key = (UP, id(prop))
            if edge_key in self.seen_edges:
                continue
            self.seen_edges.add(edge_key)
            if prop.owner.defined:
                self.results.add((prop.owner.idl, prop.link, o.idl))

    def __call__(self):
        self.refs(self.start)
        return self.results
class IdentifierMissingException(Exception):
    """
    Raised when an identifier ought to be available for an object but there is
    none.
    """

    def __init__(self, dataObject="[unspecified object]", *args, **kwargs):
        message = "An identifier should be provided for {}".format(str(dataObject))
        super(IdentifierMissingException, self).__init__(message, *args, **kwargs)