Source code for owmeta_core.rdf_query_modifiers

from itertools import chain
import logging

import rdflib as R
from rdflib.namespace import RDFS, RDF

from .utils import FCN
from .rdf_utils import UP, DOWN, transitive_subjects
from .ranged_objects import InRange


L = logging.getLogger(__name__)


class ZeroOrMore(object):
    def __init__(self, identifier, predicate, index, direction=DOWN):
        self.identifier = identifier
        self.predicate = predicate
        self.direction = direction
        self.direction = direction
        self.index = index

    def __repr__(self):
        return "{}({}, {}, {})".format(FCN(type(self)),
                                       repr(self.identifier),
                                       repr(self.predicate),
                                       repr(self.direction))


class SubClassModifier(ZeroOrMore):

    def __init__(self, rdf_type):
        super().__init__(rdf_type, R.RDFS.subClassOf, 2, UP)

    def __repr__(self):
        return FCN(type(self)) + '(' + repr(self.identifier) + ')'


class SubPropertyOfModifier(ZeroOrMore):

    def __init__(self, rdf_property):
        super().__init__(rdf_property, R.RDFS.subPropertyOf, 1, direction=UP)

    def __repr__(self):
        return FCN(type(self)) + '(' + repr(self.identifier) + ')'


[docs]def rdfs_subclassof_zom_creator(target_type): ''' Creates a function used by `ZeroOrMoreTQLayer` to determine if a query needs to be augmented to retrieve sub-classes of a *given* RDF type ''' def helper(triple): if target_type == triple[2] is not None and triple[1] == R.RDF.type: return SubClassModifier(triple[2]) return helper
[docs]def rdfs_subpropertyof_zom(super_property): ''' Argument to `ZeroOrMoreTQLayer`. Adds sub-properties of the given property to triple queries ''' def helper(triple): if triple[1] == super_property: return SubPropertyOfModifier(super_property) return helper
[docs]def rdfs_subclassof_zom(triple): ''' Argument to `ZeroOrMoreTQLayer`. Adds sub-classes to triple queries for an rdf:type ''' if triple[2] is not None and triple[1] == R.RDF.type: return SubClassModifier(triple[2])
def rdfs_subclassof_subclassof_zom_creator(rdf_type): def helper(triple): ''' Argument to `ZeroOrMoreTQLayer`. Adds sub-classes to triple queries for an rdfs:subClassOf ''' if (triple[1] == R.RDFS.subClassOf and triple[2] is not None and (rdf_type == triple[2] or (isinstance(triple[2], list) and rdf_type in triple[2]))): return SubClassModifier(triple[2]) return helper
[docs]class TQLayer(object): ''' Triple Query Layer. Wraps a graph or another `TQLayer` to do something to the `triples` and `triples_choices` query or the result of the query. ''' _NADA = object() def __init__(self, nxt=None): ''' Parameters ---------- nxt : TQLayer or rdflib.graph.Graph The "next" or "lower" layer that this layer modifies ''' self.next = nxt def triples(self, qt, context=None): return self.next.triples(qt) def triples_choices(self, qt, context=None): return self.next.triples_choices(qt) def __contains__(self, qt): ''' This should be overridden -- the default implementation just asks the next layer if it contains the triple. ''' return qt in self.next def __getattr__(self, attr): ''' By default, if this layer doesn't have the given attribute, then the attribute will be looked up on the next layer. ''' res = getattr(super(TQLayer, self), attr, TQLayer._NADA) if res is TQLayer._NADA: return getattr(self.next, attr) def __repr__(self): return FCN(type(self)) + '(' + repr(self.next) + ')' def __str__(self): return FCN(type(self)) + '(' + str(self.next) + ')'
[docs]class TerminalTQLayer(object): ''' A TQLayer that has no "next". May be useful to create a layer that stands in place of a `~rdflib.graph.Graph`. ''' @property def next(self): raise AttributeError(str(type(self)) + ' has no next layer') @next.setter def next(self, val): raise AttributeError(str(type(self)) + ' has no next layer') def triples(self, qt, context=None): raise NotImplementedError() def triples_choices(self, qt, context=None): raise NotImplementedError()
[docs]class RangeTQLayer(TQLayer): ''' A layer that understands ranges in the object position of a triple. If the next layer has the `supports_range_queries` attribute set to `True`, then the range is passed down as-is ''' def triples(self, query_triple, context=None): if isinstance(query_triple[2], InRange): in_range = query_triple[2] if in_range.defined: if getattr(self.next, 'supports_range_queries', False): return self.next.triples(query_triple, context) else: qt = (query_triple[0], query_triple[1], None) return set(x for x in self.next.triples(qt, context) if in_range(x[2])) else: qt = (query_triple[0], query_triple[1], None) return self.next.triples(qt, context) else: return self.next.triples(query_triple, context) def triples_choices(self, query_triple, context=None): if isinstance(query_triple[2], InRange): in_range = query_triple[2] qt = (query_triple[0], query_triple[1], None) if in_range.defined: # XXX: Assuming triples_choices does not also support range # queries. return set(x for x in self.next.triples_choices(qt, context) if in_range(x[2])) else: return self.next.triples_choices(qt, context) else: return self.next.triples_choices(query_triple, context)
class ZeroOrMoreTQLayer(TQLayer): def __init__(self, transformer, *args): ''' Parameters ---------- transformer : `callable` Takes a triple and returns an object describing the relationship or `None`. If an object is returned it must have `predicate`, `identifier`, `direction`, and `index` attributes. - `identifier` is the identifier to start from - `predicate` is the predicate to traverse - `direction` is the direction of traversal: Either `~owmeta_core.rdf_utils.DOWN` for subject -> object or `~owmeta_core.rdf_utils.UP` for object -> subject - `index` is the index in the triple for which a closure should be looked up *args : other arguments Go to `TQLayer` init ''' super(ZeroOrMoreTQLayer, self).__init__(*args) self._tf = transformer def triples(self, query_triple, context=None): match = self._tf(query_triple) if not match: return self.next.triples(query_triple, context) qx = list(query_triple) if match.identifier is not None: matches = list(transitive_subjects(self.next, match.identifier, match.predicate, context, match.direction)) qx[match.index] = matches L.debug('augmented query %s', qx) results = self.next.triples_choices(tuple(qx), context) return self._zom_result_helper(results, match, context, set(matches)) def triples_choices(self, query_triple, context=None): match = self._tf(query_triple) if not match: L.debug('No match %s', query_triple) return self.next.triples_choices(query_triple, context) qx = list(query_triple) iters = [] # XXX: We should, maybe, apply some stats or heuristics here to determine which list to iterate over. if isinstance(match.identifier, list): assert isinstance(qx[match.index], list), ('If there is more than one' ' matching identifier, the list in the query triple must be in the same' ' position') matches = set(qx[match.index]) for match_id in match.identifier: matches |= set(transitive_subjects(self.next, match_id, match.predicate, context, match.direction, matches)) qx[match.index] = list(matches) iters.append(self.next.triples_choices(tuple(qx), context)) elif match.identifier is not None: matches = set(transitive_subjects(self.next, match.identifier, match.predicate, context, match.direction)) for sub in matches: qx[match.index] = sub iters.append(self.next.triples_choices(tuple(qx), context)) else: matches = None iters.append(self.next.triples_choices(query_triple, context)) return self._zom_result_helper(chain(*iters), match, context, matches) def _zom_result_helper(self, results, match, context, limit): zomses = dict() direction = DOWN if match.direction is UP else DOWN predicate = match.predicate index = match.index L.debug('ZeroOrMoreTQLayer: start %r direction %s', match, direction) # The results from the original query are augmented here to "entail" results in # the "reverse" direction that are implied by the "forward" direction. For # instance, if I request everything with a type that's rdfs:Resource, I'll get all # type statements we have subclass relationships for with the modified query, but # I'll be missing the inferred types. We rectify that below # for tr in results: L.debug('ZeroOrMoreTQLayer: match %r result %r', match, tr) zoms = zomses.get(tr[index]) if zoms is None: zoms = set(transitive_subjects(self.next, tr[index], predicate, context, direction)) if limit: zoms = zoms & limit zomses[tr[index]] = zoms for z in zoms: yield tuple(x if x is not tr[index] else z for x in tr) def __contains__(self, query_triple): try: next(self.triples(query_triple)) return True except StopIteration: return False
[docs]class ContainerMembershipIsMemberTQLayer(TQLayer): ''' Adds a triple into the results for rdfs:subPropertyOf(rdfs:member) relationships for all known ContainerMembershipProperty instances ''' def triples(self, query_triple, context=None): iters = [self.next.triples(query_triple, context)] if query_triple[1] == RDFS.subPropertyOf and query_triple[2] == RDFS.member: iters.append((t[0], RDFS.subPropertyOf, RDFS.member) for t in self.next.triples((None, RDF.type, RDFS.ContainerMembershipProperty), context)) return chain(*iters) def triples_choices(self, query_triple, context=None): iters = [self.next.triples_choices(query_triple, context)] if (query_triple[1] == RDFS.subPropertyOf and (query_triple[2] == RDFS.member or (isinstance(query_triple[2], list) and RDFS.member in query_triple[2]))): iters.append((t[0], RDFS.subPropertyOf, RDFS.member) for t in self.next.triples((None, RDF.type, RDFS.ContainerMembershipProperty), context)) return chain(*iters)
_default_tq_layers_list = [ RangeTQLayer, ] def default_tq_layers(base): res = base for layer in reversed(_default_tq_layers_list): res = layer(res) return res