Source code for owmeta_core.rdf_query_util

import logging

import rdflib

from .graph_object import GraphObjectQuerier
from .rdf_query_modifiers import (ZeroOrMoreTQLayer,
                                  rdfs_subclassof_zom_creator)

L = logging.getLogger(__name__)


def goq_hop_scorer(hop):
    if hop[1] == rdflib.RDF.type:
        return 1
    return 0


[docs]def load_base(graph, idents, target_type, context, resolver): ''' Loads a set of objects from an RDF graph given their identifiers Parameters ---------- graph : rdflib.graph.Graph The graph to query from idents : list of rdflib.term.URIRef A list of identifiers to convert into objects target_type : rdflib.term.URIRef URI of the target type. Any result will be a sub-class of this type context : object Limits the scope of the query to statements within or entailed by this context. Notionally, it's a owmeta_core.context.Context instance resolver : .rdf_type_resolver.RDFTypeResolver Handles some of the mappings ''' L.debug("load_base: graph %s target_type %s context %s resolver %s", graph, target_type, context, resolver) if not idents: return grouped_types = dict() # We don't use a subclassof ZOM layer for this query since we are going to get the # "most specific" type, which will have to be one declared explicitly L.debug("querying %s types in %s", idents, graph) idents = list(idents) ids_missing_types = set(idents) for ident, _, rdf_type in graph.triples_choices((idents, rdflib.RDF['type'], None)): t = grouped_types.get(ident, None) if t is None: ids_missing_types.remove(ident) grouped_types[ident] = set([rdf_type]) else: t.add(rdf_type) if ids_missing_types: raise MissingRDFTypeException('Could not recover a type declaration for' f' {ids_missing_types}') for ident, types in grouped_types.items(): the_type = resolver.type_resolver(graph, types, base=target_type) if the_type is None: raise MissingRDFTypeException( f'The type resolver could not recover a type for {ident}' f' from {types} constrained to {target_type}') yield resolver.id2ob(ident, the_type, context)
[docs]def load_terms(graph, start, target_type): ''' Loads a set of terms based on the object graph starting from `start` Parameters ---------- graph : rdflib.graph.Graph The graph to query from start : .graph_object.GraphObject The graph object to start the query from target_type : rdflib.term.URIRef URI of the target type. Any result will be a sub-class of this type ''' L.debug("load: start %s, target_type %s, graph %s", start, target_type, graph.store) graph = ZeroOrMoreTQLayer(rdfs_subclassof_zom_creator(target_type), graph) return GraphObjectQuerier(start, graph, hop_scorer=goq_hop_scorer)()
[docs]def load(graph, start, target_type, *args): ''' Loads a set of objects based on the graph starting from `start` Parameters ---------- graph : rdflib.graph.Graph The graph to query from start : .graph_object.GraphObject The graph object to start the query from target_type : rdflib.term.URIRef URI of the target type. Any result will be a sub-class of this type ''' idents = load_terms(graph, start, target_type) return load_base(graph, idents, target_type, *args)
[docs]def get_most_specific_rdf_type(graph, types, base=None): ''' Find the RDF type that isn't a sub-class of any other, constrained to be a sub-class of `base` if that is provided. Parameters ---------- graph : rdflib.graph.Graph The graph to query rdfs:subClassOf relationships types : list of rdflib.term.URIRef The types to query base : rdflib.term.URIRef The "base" type See Also -------- RDFTypeResolver ''' if len(types) == 1 and (not base or (base,) == tuple(types)): return tuple(types)[0] if not types and base: return base most_specific_types = _gmsrt_helper(graph, types, base) if len(most_specific_types) == 1: return most_specific_types.pop() else: L.warning(('No most-specific type could be determined among %s' ' constrained to subclasses of %r'), types, base) return None
def _gmsrt_helper(graph, start, base=None): # Finds the most specific (furthest to the left in the chain of subClassOf # relationships), then confirms that the resulting set is a sub-class of the base if # one is given res = set(start) border = set(start) subclasses = {s: {s} for s in start} hit = True while len(res) > 1: new_border = set() itr = graph.triples_choices((list(border), rdflib.RDFS.subClassOf, None)) hit = False for item in itr: if isinstance(item[0], tuple): # If we retrieved from a rdflib.store.Store instead of a Graph, then we # have to get the first element of the pair `item` to get the actual triple triple = item[0] else: triple = item subj, _, obj = triple if obj != subj: obj_subclasses = subclasses.get(obj, None) if obj_subclasses is None: subclasses[obj] = obj_subclasses = set() obj_subclasses |= subclasses[subj] res.discard(obj) hit = True new_border.add(obj) if not hit: break border = new_border if base is not None: if not hit: # If hit is False, then we've stopped because no more super-classes were found, so # base *has* to be in here or else none of our types are any good res &= subclasses.get(base, set()) else: # We exited because we eliminated all of the other types (or only had one to # begin with), but keep going to make sure we have the base. seen = set(border) while border: if base in border: break border = {o for _, _, o in graph.triples_choices((list(border), rdflib.RDFS.subClassOf, None)) if o not in seen} seen |= border else: # no break # We never found the base class in super-classes of our result set, so we # have to drop the result set res = set() return res
[docs]def oid(identifier_or_rdf_type, rdf_type, context, base_type=None): """ Create an object from its rdf type Parameters ---------- identifier_or_rdf_type : rdflib.term.URIRef If `rdf_type` is provided, then this value is used as the identifier for the newly created object. Otherwise, this value will be the :attr:`rdf_type` of the object used to determine the Python type and the object's identifier will be randomly generated. rdf_type : rdflib.term.URIRef If provided, this will be the :attr:`rdf_type` of the newly created object. context : Context, optional The context to resolve a class from base_type : type The base type Returns ------- The newly created object """ identifier = identifier_or_rdf_type if rdf_type is None: rdf_type = identifier_or_rdf_type identifier = None cls = None if context is not None: cls = context.resolve_class(rdf_type) if cls is None and context is not None: for types in _superclass_iter(context.rdf_graph(), rdf_type): for typ in types: cls = context.resolve_class(typ) if cls is not None: break if cls is not None: break if cls is None: cls = base_type # if its our class name, then make our own object # if there's a part after that, that's the property name if context is not None: cls = context(cls) if identifier is not None: o = cls.query(ident=identifier, no_type_decl=True) else: o = cls.query() return o
def _superclass_iter(graph, start): ''' Generate up the super-classes of this type ''' border = set([start]) seen = set([start]) while True: new_border = set() for t in graph.triples_choices((list(border), rdflib.RDFS.subClassOf, None)): if t[2] not in seen: new_border.add(t[2]) seen.add(t[2]) if border == new_border: break if not new_border: break yield new_border border = new_border
[docs]class MissingRDFTypeException(Exception): ''' Raised when we were looking for an RDF type couldn't find one '''