Source code for owmeta_core.agg_store

from rdflib import plugin
from rdflib.store import Store, NO_STORE


[docs]class AggregateStore(Store): ''' A read-only aggregate of RDFLib `stores <rdflib.store.Store>` ''' context_aware = True ''' Specified by RDFLib. Required to be `True` for `~rdflib.graph.ConjunctiveGraph` stores. Aggregated stores MUST be context-aware. This is enforced by :meth:`open`. ''' graph_aware = True ''' Specified by RDFLib. Required to be `True` for `~rdflib.graph.Dataset` stores. The first store must be graph-aware. This is enforced by :meth:`open`. ''' # Unlike "awareness" attributes, checking for support of range queries is handled # after the store is open, so we don't care if we need to change it to `True` later. supports_range_queries = False def __init__(self, configuration=None, identifier=None, graph_aware=None): super(AggregateStore, self).__init__(configuration, identifier) self.__stores = [] self.__bound_ns = dict() self.__bound_pref = dict() if graph_aware is not None: self.graph_aware = graph_aware @property def stores(self): return list(self.__stores) # -- Store methods -- #
[docs] def open(self, configuration, create=True): ''' Creates and opens all of the stores specified in the configuration Also checks for all aggregated stores to be `context_aware` ''' if not isinstance(configuration, (tuple, list)): return NO_STORE self.__stores = [] for store_key, store_conf in configuration: store = plugin.get(store_key, Store)() store.open(store_conf) self.__stores.append(store) assert self.__stores, 'At least one store configuration must be provided' assert all(x.context_aware for x in self.__stores), ('All aggregated stores must be' ' context_aware') assert self.__stores[0].graph_aware, 'The first store must be graph_aware' self.supports_range_queries = all(getattr(x, 'supports_range_queries', False) for x in self.__stores)
def triples(self, triple, context=None): for store in self.__stores: for trip in store.triples(triple, context=context): yield trip def triples_choices(self, triple, context=None): for store in self.__stores: for trip in store.triples_choices(triple, context=context): yield trip def __len__(self, context=None): # rdflib specifies a context argument for __len__, but how do you even pass that # argument to len? return sum(len(store) for store in self.__stores) def contexts(self, triple=None): seen = set() for store in self.__stores: for ctx in store.contexts(triple): if ctx in seen: continue seen.add(ctx) yield ctx def prefix(self, namespace): prefix = self.__bound_pref.get(namespace) if prefix is None: for store in self.__stores: aprefix = store.prefix(namespace) if aprefix and prefix and aprefix != prefix: msg = 'multiple prefixes ({},{}) for namespace {}'.format(prefix, aprefix, namespace) raise AggregatedStoresConflict(msg) prefix = aprefix return prefix def namespace(self, prefix): namespace = self.__bound_ns.get(prefix) if namespace is None: for store in self.__stores: anamespace = store.namespace(prefix) if anamespace and namespace and anamespace != namespace: msg = 'multiple namespaces ({},{}) for prefix {}'.format(namespace, anamespace, prefix) raise AggregatedStoresConflict(msg) namespace = anamespace return namespace def namespaces(self): for ns in self.__bound_ns.items(): yield ns for store in self.__stores: for ns in store.namespaces(): yield ns def bind(self, prefix, namespace, override=True): if not override and prefix in self.__bound_ns.get: return self.__bound_ns[prefix] = namespace self.__bound_pref[namespace] = prefix def close(self, *args, **kwargs): for store in self.__stores: store.close(*args, **kwargs) def gc(self): for store in self.__stores: store.gc() def add(self, *args, **kwargs): return self.__stores[0].add(*args, **kwargs) def addN(self, *args, **kwargs): return self.__stores[0].addN(*args, **kwargs) def remove(self, *args, **kwargs): self.__stores[0].remove(*args, **kwargs) def add_graph(self, *args, **kwargs): if not self.graph_aware: super().add_graph(*args, **kwargs) else: self.__stores[0].add_graph(*args, **kwargs) def remove_graph(self, *args, **kwargs): if not self.graph_aware: super().remove_graph(*args, **kwargs) else: self.__stores[0].remove_graph(*args, **kwargs) def create(self, *args, **kwargs): raise UnsupportedAggregateOperation def destroy(self, *args, **kwargs): raise UnsupportedAggregateOperation def rollback(self, *args, **kwargs): return self.__stores[0].rollback(*args, **kwargs) def commit(self, *args, **kwargs): return self.__stores[0].commit(*args, **kwargs) def __str__(self): return '%s(%s)' % (type(self).__name__, ', '.join(str(s) for s in self.__stores))
[docs]class UnsupportedAggregateOperation(Exception): ''' Thrown for operations which modify a graph and hence are inappropriate for `AggregateStore` '''
class AggregatedStoresConflict(Exception): pass