Source code for owmeta_core.rdf_utils

from __future__ import print_function

from rdflib.graph import Graph
from rdflib.term import Literal, URIRef
from rdflib.store import Store

# Directions for traversal across triples.
# `transitive_lookup` compares its `direction` argument against `DOWN` and
# treats any other value as the object-to-subject direction.
UP = 'up'
''' Object to Subject direction for traversal across triples. '''

DOWN = 'down'
''' Subject to Object direction for traversal across triples. '''


def print_graph(g, hide_namespaces=False):
    '''
    Print the N3 serialization of `g` to standard output

    Parameters
    ----------
    g : rdflib.graph.Graph
        The graph to print. Only its ``serialize(format='n3')`` method is used
    hide_namespaces : bool
        If true, lines starting with ``@prefix`` are dropped from the output.
        Optional
    '''
    s = g.serialize(format='n3')
    # Depending on the RDFLib version, `serialize` returns either bytes or
    # str; only decode when we actually got bytes.
    if isinstance(s, bytes):
        s = s.decode("UTF-8")
    if hide_namespaces:
        s = "\n".join(line for line in s.splitlines()
                      if not line.startswith("@prefix"))
    print(s)


def serialize_rdflib_term(x, namespace_manager=None):
    '''
    Return the N3 form of `x`

    This simply calls ``x.n3`` with `namespace_manager` as its only argument
    and returns whatever that call produces.
    '''
    to_n3 = x.n3
    return to_n3(namespace_manager)


def deserialize_rdflib_term(x):
    '''
    Turn an RDFLib `Literal` into a plain Python value

    Anything that is not a `Literal` is handed back untouched. A `Literal` is
    converted with ``toPython``; if that conversion still yields a `Literal`,
    its ``str`` form is returned instead.
    '''
    if not isinstance(x, Literal):
        return x

    converted = x.toPython()
    return str(converted) if isinstance(converted, Literal) else converted


def triple_to_n3(trip, namespace_manager=None):
    '''
    Serialize the terms of a triple to N3, each term followed by one space

    Parameters
    ----------
    trip : iterable of rdflib terms
        The terms to serialize, in order
    namespace_manager : rdflib.namespace.NamespaceManager
        Passed through to `serialize_rdflib_term` for every term. Optional

    Returns
    -------
    str
        The concatenated serializations; note that the trailing space after
        the last term is kept
    '''
    # The previous implementation also collected the namespace prefixes seen
    # in the serialized terms, but never returned or used them, so that
    # bookkeeping has been dropped.
    return ''.join(serialize_rdflib_term(x, namespace_manager) + ' '
                   for x in trip)


def triples_to_bgp(trips, namespace_manager=None, show_namespaces=False):
    '''
    Serialize triples as a sequence of N3 statements, one per line

    Parameters
    ----------
    trips : iterable of triples of rdflib terms
        The triples to serialize
    namespace_manager : rdflib.namespace.NamespaceManager
        Source of prefix bindings. Only consulted when `show_namespaces` is
        true. Optional
    show_namespaces : bool
        If true and a `namespace_manager` is given, terms are abbreviated with
        the manager's prefixes and an ``@prefix`` line is emitted at the top
        for each prefix that is actually used. Otherwise terms are serialized
        without a namespace manager and no ``@prefix`` lines are emitted.
        Optional

    Returns
    -------
    str
        The serialized statements, each terminated by ``".\\n"``
    '''
    # XXX: Collisions could result between the variable names of different
    # objects
    emit_prefixes = (namespace_manager is not None) and show_namespaces
    # Only abbreviate terms when the matching declarations are emitted too, so
    # the output never contains a prefix that is not declared.
    term_manager = namespace_manager if emit_prefixes else None

    used_prefixes = set()
    statements = []
    for trip in trips:
        terms = []
        for term in trip:
            s = serialize_rdflib_term(term, term_manager)
            if emit_prefixes:
                if isinstance(term, URIRef) and not s.startswith('<'):
                    # Abbreviated URI: "prefix:local"
                    used_prefixes.add(s.split(':', 1)[0])
                elif (isinstance(term, Literal) and '^^' in s
                        and not s.endswith('>')):
                    # Abbreviated datatype: '"value"^^prefix:local'. Split
                    # from the right so '^^' inside the value is ignored.
                    used_prefixes.add(s.rsplit('^^', 1)[1].split(':', 1)[0])
            terms.append(s + ' ')
        statements.append(''.join(terms) + ".\n")

    g = ''.join(statements)
    if not emit_prefixes:
        return g

    header = ''.join('@prefix ' + str(prefix) + ': ' + uri.n3() + ' .\n'
                     for prefix, uri
                     in namespace_manager.namespaces()
                     if prefix in used_prefixes)
    return header + g


# Subtracted from the result set of `transitive_lookup` so that `None` is
# never reported as a resource.
_none_singleton_set = frozenset([None])


def transitive_lookup(graph, start, predicate, context=None, direction=DOWN, seen=None):
    '''
    Do a transitive lookup over an `rdflib.graph.Graph` or `rdflib.store.Store`

    In other words, finds all resources which relate to `start` through zero or
    more `predicate` relationships. `start` itself will be included in the
    return value.

    Loops in the input `graph` will not cause non-termination.

    Parameters
    ----------
    graph : rdflib.graph.Graph or rdflib.store.Store
        The graph to query
    start : rdflib.term.Identifier
        The resource in the graph to start from
    predicate : rdflib.term.URIRef
        The predicate relating terms in the closure
    context : rdflib.graph.Graph or rdflib.term.URIRef
        The context in which the query should run. Optional
    direction : DOWN or UP
        The direction in which to traverse. Any value not equal to `DOWN` is
        treated as `UP`
    seen : set of rdflib.term.Identifier
        A set of terms which have already been "seen" by the algorithm. Useful
        for repeated calls to `transitive_lookup`. When a set is passed (even
        an empty one) it is updated in place and is also the return value.
        Note: if the `start` is in `seen`, queries from `start` will still be
        done, but any items in the result of *those* queries will not be
        queried for if in `seen`. Optional

    Returns
    -------
    set of rdflib.term.Identifier
        resources in the transitive closure of `predicate` from `start`
    '''
    # Reuse the caller's set even when it is still empty, so that a set shared
    # across repeated calls accumulates from the first call on. Other truthy
    # values are used as given; anything else gets a fresh set.
    if isinstance(seen, set) or seen:
        res = seen
    else:
        res = set()

    border = set([start])
    while border:
        # Compare by value: `direction` may be an equal string that is not the
        # very same object as the module constant.
        if direction == DOWN:
            qx = (list(border), predicate, None)
            idx = 2
        else:
            qx = (None, predicate, list(border))
            idx = 0

        # Mark the current border as visited before querying so that a term
        # pointing back into the border is not queued a second time.
        res |= border

        new_border = set()
        for t in graph.triples_choices(qx, context=context):
            # A Store yields (triple, contexts) pairs; a Graph yields bare
            # triples.
            if isinstance(t[0], tuple):
                o = t[0][idx]
            else:
                o = t[idx]

            if o not in res:
                new_border.add(o)
        border = new_border

    res -= _none_singleton_set
    return res
class BatchAddGraph(object):
    '''
    Wrapper around graph that turns calls to 'add' into calls to 'addN'

    Triples passed to `add` are queued as quads (the triple plus the wrapped
    graph) and written with ``addN`` once `batchsize` of them have been
    queued, and again when the wrapper is used as a context manager and the
    ``with`` block exits without an exception.

    Wrappers created through `get_context` share the queue and the counter of
    the wrapper they were created from, and every flush goes through the
    ``addN`` of the top-most wrapper's graph. This keeps quads queued through
    any of the wrappers from being left behind in a stale list.

    Parameters
    ----------
    graph : object
        The wrapped graph. Must provide ``addN``; ``get_context`` is needed
        only if `get_context` is called on the wrapper
    batchsize : int
        Number of queued quads after which the queue is flushed. Ignored when
        `_parent` is given. Optional
    _parent : BatchAddGraph
        The wrapper whose queue, counter and batch size are shared. Set by
        `get_context`; not meant to be passed by callers
    '''

    def __init__(self, graph, batchsize=1000, _parent=None, *args, **kwargs):
        self.graph = graph
        self.g = (graph,)
        if _parent:
            self._parent = _parent
            self.batchsize = _parent.batchsize
        else:
            self._parent = None
            self.batchsize = batchsize
            self.reset()

    def reset(self):
        ''' Discard all queued quads and set the counter back to zero '''
        self.batch = []
        self.count = 0

    @property
    def batch(self):
        ''' The list of queued quads, shared with the parent if there is one '''
        if self._parent:
            return self._parent.batch
        else:
            return self._batch

    @batch.setter
    def batch(self, value):
        if self._parent:
            self._parent.batch = value
        else:
            self._batch = value

    @property
    def count(self):
        ''' Number of triples added, shared with the parent if there is one '''
        if self._parent:
            return self._parent.count
        else:
            return self._count

    @count.setter
    def count(self, value):
        if self._parent:
            self._parent.count = value
        else:
            self._count = value

    def _flush(self):
        ''' Write the queued quads through the top-most graph and empty the queue '''
        if self._parent:
            self._parent._flush()
        else:
            self.graph.addN(self._batch)
            # Rebind rather than clear in place, in case the graph kept a
            # reference to the list it was handed.
            self._batch = []

    def add(self, triple):
        ''' Queue `triple` for the wrapped graph, flushing first if a batch is full '''
        if self.count > 0 and self.count % self.batchsize == 0:
            self._flush()
        self.count += 1
        self.batch.append(triple + self.g)

    def get_context(self, ctx):
        ''' Return a wrapper for ``graph.get_context(ctx)`` that shares this queue '''
        return BatchAddGraph(self.graph.get_context(ctx), _parent=self)

    def __enter__(self):
        # Only the top-most wrapper owns the queue. Entering a child must not
        # throw away quads its parent has already queued.
        if not self._parent:
            self.reset()
        return self

    def __exit__(self, *exc):
        # Leave the queue unwritten if the block raised.
        if exc[0] is None:
            self._flush()
transitive_subjects = transitive_lookup
''' Alias to `transitive_lookup` '''


class ContextSubsetStore(Store):
    '''
    Store that exposes only a subset of the contexts of another store

    Queries are forwarded to the wrapped store and each result is narrowed to
    the contexts returned by `init_contexts`. Subclasses must implement
    `init_contexts`; the result is fetched on first use and, once it is not
    `None`, kept for the life of the instance.

    Parameters
    ----------
    store : rdflib.store.Store
        The store to wrap
    **kwargs
        Passed on to `rdflib.store.Store.__init__`
    '''

    # Returns triples imported by the given context
    context_aware = True

    def __init__(self, store, **kwargs):
        super(ContextSubsetStore, self).__init__(**kwargs)
        self.__store = store
        self.__context_ids = None

    def init_contexts(self):
        '''
        Return the identifiers of the contexts this store should expose

        The result is used with ``&``, ``in``, ``len`` and iteration, so it
        presumably needs to be a set (or frozenset) of context identifiers.
        '''
        raise NotImplementedError

    def __init_contexts(self):
        # Fetch the context identifiers lazily, on first use.
        if self.__context_ids is None:
            self.__context_ids = self.init_contexts()

    def __visible_contexts(self, contexts):
        '''
        Reduce `contexts` (graphs or identifiers) to the identifiers this
        store exposes. If the configured set of contexts is empty, every
        identifier is let through.
        '''
        ids = set(getattr(c, 'identifier', c) for c in contexts)
        if self.__context_ids:
            return self.__context_ids & ids
        return ids

    def triples(self, pattern, context=None):
        self.__init_contexts()

        ctx = self._determine_context(context)
        if ctx is _BAD_CONTEXT:
            return

        # NOTE(review): an earlier comment here described querying each
        # selected context in series when the contexts are small relative to
        # the whole store; that optimization is not implemented.
        for t in self.__store.triples(pattern, ctx):
            inter = self.__visible_contexts(t[1])
            if inter:
                yield t[0], inter

    def remove(self, pattern, context=None):
        self.__init_contexts()

        ctx = self._determine_context(context)
        if ctx is _BAD_CONTEXT:
            return

        # Collect every match, with its contexts already resolved, before
        # removing anything: removing from the wrapped store while its
        # `triples` generator is still running would mutate what is being
        # iterated.
        matches = [(t[0], self.__visible_contexts(t[1]))
                   for t in self.__store.triples(pattern, ctx)]
        for triple, inter in matches:
            for context_id in inter:
                self.__store.remove((triple[0], triple[1], triple[2]), context_id)

    def triples_choices(self, pattern, context=None):
        self.__init_contexts()

        ctx = self._determine_context(context)
        if ctx is _BAD_CONTEXT:
            return

        for t in self.__store.triples_choices(pattern, ctx):
            inter = self.__visible_contexts(t[1])
            if inter:
                yield t[0], inter

    def _determine_context(self, context):
        '''
        Translate `context` into what is passed to the wrapped store

        Returns `_BAD_CONTEXT` if `context` names a context this store does
        not expose, `None` if no single context is selected, and otherwise a
        `Graph` for the selected context identifier.
        '''
        # This is a method that has to contend with RDFLib's abiding confusion
        # over whether Stores should return Graphs. This is stupid, because of
        # course they shouldn't, but RDFLib acts like they should...and so
        # here we are
        context_id = getattr(context, 'identifier', context)
        if context_id is not None and context_id not in self.__context_ids:
            return _BAD_CONTEXT

        if len(self.__context_ids) == 1 and context_id is None:
            # Micro-benchmarked this with timeit: it's faster than tuple(s)[0]
            # and next(iter(s),None)
            for context_id in self.__context_ids:
                break

        if context_id is None:
            return None

        # We shouldn't be querying from this store, but we pass in ourselves
        # as the store to prevent RDFLib from making a new memory store
        return Graph(identifier=context_id, store=self)

    def contexts(self, triple=None):
        if triple is not None:
            for x in self.triples(triple):
                for c in x[1]:
                    yield getattr(c, 'identifier', c)
        else:
            self.__init_contexts()
            for c in self.__context_ids:
                yield c

    def namespace(self, prefix):
        return self.__store.namespace(prefix)

    def prefix(self, uri):
        return self.__store.prefix(uri)

    def bind(self, prefix, namespace, override=True):
        return self.__store.bind(prefix, namespace, override=override)

    def namespaces(self):
        for x in self.__store.namespaces():
            yield x

    def __str__(self):
        return f'{type(self).__name__}(store={self.__store})'


# Sentinel returned by `ContextSubsetStore._determine_context` when the
# requested context is not one the store exposes.
_BAD_CONTEXT = object()