Source code for owmeta_core.bundle

from collections import namedtuple, OrderedDict
from itertools import chain
from os import makedirs, rename, scandir, listdir
from os.path import (join as p, exists, relpath, isdir, isfile,
        expanduser, expandvars, realpath)
from struct import pack
import errno
import hashlib
import json
import logging
import re
import shutil
import io

from rdflib import plugin
from rdflib.parser import Parser, create_input_source
from rdflib.term import URIRef
import six
from textwrap import dedent
import transaction
import yaml

from .. import OWMETA_PROFILE_DIR, connect
from ..context import DEFAULT_CONTEXT_KEY, IMPORTS_CONTEXT_KEY, Context
from ..mapper import CLASS_REGISTRY_CONTEXT_KEY, CLASS_REGISTRY_CONTEXT_LIST_KEY, Mapper
from ..context_common import CONTEXT_IMPORTS
from ..data import Data
from ..file_match import match_files
from ..file_lock import lock_file
from ..file_utils import hash_file
from ..graph_serialization import write_canonical_to_file
from ..rdf_utils import BatchAddGraph
from ..utils import FCN, aslist

from .archive import Unarchiver
from .common import (find_bundle_directory, fmt_bundle_directory, BUNDLE_MANIFEST_FILE_NAME,
                     BUNDLE_INDEXED_DB_NAME, validate_manifest, BUNDLE_MANIFEST_VERSION)
from .exceptions import (NotADescriptor, BundleNotFound, NoRemoteAvailable, NoBundleLoader,
                         NotABundlePath, MalformedBundle, NoAcceptableUploaders,
                         FetchTargetIsNotEmpty, TargetIsNotEmpty, UncoveredImports,
                         CircularDependencyDetected)
from .loaders import LOADER_CLASSES, UPLOADER_CLASSES, load_entry_point_loaders

from urllib.parse import quote as urlquote, unquote as urlunquote


# Module-level logger, named after this module
L = logging.getLogger(__name__)

# Both defaults below are sub-directories of `OWMETA_PROFILE_DIR`
DEFAULT_BUNDLES_DIRECTORY = p(OWMETA_PROFILE_DIR, 'bundles')
'''
Default directory for the bundle cache
'''

DEFAULT_REMOTES_DIRECTORY = p(OWMETA_PROFILE_DIR, 'remotes')
'''
Default directory for descriptors of user-level remotes as opposed to project-specific
remotes
'''


class Remote(object):
    '''
    A place where bundles come from and go to
    '''

    def __init__(self, name, accessor_configs=()):
        '''
        Parameters
        ----------
        name : str
            The name of the remote
        accessor_configs : iterable of AccessorConfig
            Configs for how you access the remote
        '''

        self.name = name
        ''' Name of the remote '''

        self.accessor_configs = list(accessor_configs)
        '''
        Configs for how you access the remote.

        One might configure mirrors or replicas for a given bundle repository as multiple
        accessor configs
        '''

        self.file_name = None
        '''
        If read from a file, the remote should have this attribute set to its source
        file's path
        '''

    def add_config(self, accessor_config):
        '''
        Add the given accessor config to this remote

        Parameters
        ----------
        accessor_config : AccessorConfig
            The config to add

        Returns
        -------
        bool
            `True` if the accessor config was added (meaning there's no equivalent one
            already set for this remote). Otherwise, `False`.
        '''
        if accessor_config in self.accessor_configs:
            return False
        self.accessor_configs.append(accessor_config)
        return True

    def generate_loaders(self):
        '''
        Generate the bundle loaders for this remote.

        Loaders are generated from `accessor_configs` and `LOADER_CLASSES` according with
        which type of `.Loader` can load a type of accessor
        '''
        for ac in self.accessor_configs:
            for lc in LOADER_CLASSES:
                if lc.can_load_from(ac):
                    yield lc(ac)

    def generate_uploaders(self):
        '''
        Generate the bundle uploaders for this remote

        Uploaders are generated from `accessor_configs` and `UPLOADER_CLASSES`: one
        uploader for each (config, class) pair where the class can upload to the config
        '''
        for ac in self.accessor_configs:
            for uc in UPLOADER_CLASSES:
                if uc.can_upload_to(ac):
                    yield uc(ac)

    def write(self, out):
        '''
        Serialize the `Remote` and write to `out`

        Parameters
        ----------
        out : :term:`file object`
            Target for writing the remote
        '''
        yaml.dump(self, out)

    @classmethod
    def read(cls, inp):
        '''
        Read a serialized `Remote`

        Parameters
        ----------
        inp : :term:`file object`
            File-like object containing the serialized `Remote`
        '''
        # SECURITY NOTE: `yaml.unsafe_load` can construct arbitrary Python objects, so
        # `inp` must only ever come from a trusted source (e.g., the user's own remotes
        # directory). It is kept because `write` emits Python-object tags.
        res = yaml.unsafe_load(inp)
        assert isinstance(res, cls)
        return res

    def __eq__(self, other):
        # Defer to the other operand (ultimately yielding `False`) rather than raising
        # AttributeError when compared with something that isn't a Remote
        if not isinstance(other, Remote):
            return NotImplemented
        return (self.name == other.name and
                self.accessor_configs == other.accessor_configs)

    def __hash__(self):
        # `accessor_configs` is a list, which is unhashable: hash an immutable snapshot.
        # Note the hash changes if configs are added after the remote is put in a set
        return hash((self.name, tuple(self.accessor_configs)))

    def __str__(self):
        if self.accessor_configs:
            # One accessor per line, with continuation lines indented to match
            accessors = '\n' + '\n'.join(
                '    ' + '\n    '.join(str(acc).split('\n'))
                for acc in self.accessor_configs)
        else:
            accessors = ' <none>'
        return dedent('''\
        {name}
        Accessors:{accessors}''').format(name=self.name, accessors=accessors)

    def __repr__(self):
        return f'{FCN(type(self))}({self.name!r}, {self.accessor_configs!r})'
class DependencyDescriptor(namedtuple('_DependencyDescriptor',
                                      ('id', 'version', 'excludes'))):
    '''
    Immutable record of a bundle dependency with the fields `id`, `version`, and
    `excludes`. Only `id` is required: `version` defaults to `None` and `excludes`
    defaults to an empty tuple
    '''
    __slots__ = ()

    def __new__(cls, id, version=None, excludes=()):
        return super().__new__(cls, id, version, excludes)
class AccessorConfig:
    '''
    Configuration for accessing a `Remote`. `Loaders <Loader>` are added to a remote
    according to which accessors are available
    '''

    def __eq__(self, other):
        ''' Not implemented in this base class: always raises `NotImplementedError` '''
        raise NotImplementedError()

    def __hash__(self):
        ''' Not implemented in this base class: always raises `NotImplementedError` '''
        raise NotImplementedError()
class _DepList(list): def add(self, dd): self.append(dd)
class URLConfig(AccessorConfig):
    '''
    Configuration for accessing a remote with just a URL.

    Note that URLConfigs should be pickle-able since they are written to a YAML file as
    part of the `.Remote` they belong to.
    '''

    def __init__(self, url):
        self.url = url

    def __eq__(self, other):
        # Only another URLConfig with the same URL is considered equal
        if not isinstance(other, URLConfig):
            return False
        return self.url == other.url

    def __hash__(self):
        return hash(self.url)

    def __str__(self):
        return f'{FCN(type(self))}(url={self.url!r})'

    @classmethod
    def register(cls, scheme):
        ''' Record this class in `URL_CONFIG_MAP` as the config for `scheme` '''
        URL_CONFIG_MAP[scheme] = cls

    __repr__ = __str__
# Maps a URL scheme to the class registered for it through `URLConfig.register`
URL_CONFIG_MAP = {}
'''
`URLConfigs <URLConfig>` by scheme. Can be populated by pkg_resources entry points
'''
class Descriptor(object):
    '''
    Descriptor for a bundle.

    The descriptor is sufficient to build a distributable bundle directory tree from a
    `~rdflib.graph.ConjunctiveGraph` and a set of files (see `Installer`).
    '''

    def __init__(self, ident, **kwargs):
        '''
        Parameters
        ----------
        ident : str
            The bundle ID
        **kwargs
            Remaining descriptor fields. Handled the same as the entries of the object
            passed to `make`
        '''
        self.id = ident
        self._set(kwargs)

    @classmethod
    def make(cls, obj):
        '''
        Makes a descriptor from the given object.

        Parameters
        ----------
        obj : a `dict-like object <dict>`
            An object with parameters for the Descriptor. Typically a dict

        Returns
        -------
        Descriptor
            The created descriptor
        '''
        res = cls(ident=obj['id'])
        res._set(obj)
        return res

    @classmethod
    def load(cls, descriptor_source):
        '''
        Load a descriptor from a YAML record

        Parameters
        ----------
        descriptor_source : str or :term:`file object`
            The descriptor source. Handled by `yaml.safe_load
            <https://pyyaml.org/wiki/PyYAMLDocumentation#the-yaml-package>`_

        Raises
        ------
        .NotADescriptor
            Thrown when the object loaded from `descriptor_source` isn't a `dict`
        '''
        dat = yaml.safe_load(descriptor_source)
        if not isinstance(dat, dict):
            raise NotADescriptor()
        return cls.make(dat)

    def dump(self, output):
        '''
        Save a descriptor to a file as a YAML record

        Parameters
        ----------
        output : :term:`file object`
            The file to save the descriptor to
        '''
        dct = dict()
        dct['id'] = self.id
        dct['name'] = self.name
        dct['version'] = self.version
        dct['description'] = self.description
        # Regex patterns are written back with the 'rgx:' prefix
        dct['patterns'] = sorted(
            ('rgx:' if isinstance(pat, RegexURIPattern) else '') + pat._pattern
            for pat in self.patterns)
        dct['includes'] = sorted(str(inc.include) for inc in self.includes)
        # XXX: Test this
        dct['includes'].extend({inc: {'empty': True}} for inc in sorted(self.empties))
        # `excludes` is written as a list: `yaml.dump` would otherwise tag a tuple as
        # `!!python/tuple`, which the `yaml.safe_load` used by `load` does not accept
        dct['dependencies'] = [
            dict(d._asdict(),
                 excludes=(list(d.excludes) if isinstance(d.excludes, tuple)
                           else d.excludes))
            for d in self.dependencies]
        if self.files is not None:
            dct['files'] = self.files.to_dict()
        yaml.dump(dct, output)

    def _set(self, obj):
        ''' Set this descriptor's attributes from the dict-like `obj` '''
        self.name = obj.get('name', self.id)
        self.version = obj.get('version', 1)
        self.description = obj.get('description', None)
        self.patterns = set(make_pattern(x) for x in obj.get('patterns', ()))
        self.includes = set(make_include_func(x) for x in obj.get('includes', ()))

        # dict-valued includes look like `{uri: options}`; collect those marked empty
        empties = set()
        for inc in obj.get('includes', ()):
            if not isinstance(inc, dict):
                continue
            # Pop from a copy so that the caller's data isn't emptied as a side-effect
            uri, options = dict(inc).popitem()
            if options.get('empty', False) is True:
                empties.add(uri)
        self.empties = empties

        deps_set = set()
        deps = _DepList()
        for x in obj.get('dependencies', ()):
            if isinstance(x, six.string_types):
                dd = DependencyDescriptor(x)
            elif isinstance(x, dict):
                dd = DependencyDescriptor(**x)
            else:
                dd = DependencyDescriptor(*x)
            # A list (as YAML gives us) would make the descriptor unhashable and break
            # the de-duplication below
            if isinstance(dd.excludes, list):
                dd = dd._replace(excludes=tuple(dd.excludes))
            if dd not in deps_set:
                deps.append(dd)
                deps_set.add(dd)
        self.dependencies = deps
        self.files = FilesDescriptor.make(obj.get('files', None))

    def __str__(self):
        return (FCN(type(self)) + '(ident={},'
                'name={},version={},description={},'
                'patterns={},includes={},'
                'files={},dependencies={})').format(
                    repr(self.id),
                    repr(self.name),
                    repr(self.version),
                    repr(self.description),
                    repr(self.patterns),
                    repr(self.includes),
                    repr(self.files),
                    repr(self.dependencies))
class Bundle:
    '''
    Main entry point for using bundles

    Typical usage is something like this::

        >>> with Bundle('example/bundleId', version=42) as bnd:
        ...     for aDataObject in bnd(DataObject)().load():
        ...         # Do something with `aDataObject`
        ...         print(aDataObject)
        DataObject(<http://example.org/entities#aDataObject>)

    '''

    def __init__(self, ident, bundles_directory=DEFAULT_BUNDLES_DIRECTORY, version=None,
                 conf=None, remotes=None, remotes_directory=DEFAULT_REMOTES_DIRECTORY,
                 transaction_manager=None):
        '''
        .. note::

            Paths, `bundles_directory` and `remotes_directory`, will have symbolic links,
            environment variables, and "~" (for the current user's home directory)
            expanded when the `Bundle` is initialized. To reflect changes to symbolic
            links or home directories, the `bundles_directory` or `remotes_directory`
            attributes must be updated directly or a new instance must be created.

        Parameters
        ----------
        ident : str
            Bundle ID
        bundles_directory : str, optional
            Path to the bundles directory. Defaults to `.DEFAULT_BUNDLES_DIRECTORY`
        version : int, optional
            Bundle version to access. By default, the latest version will be used.
        conf : .Configuration or dict, optional
            Configuration to add to the one created for the bundle automatically. Values
            for the default imports context (`.IMPORTS_CONTEXT_KEY`), the default context
            (`.DEFAULT_CONTEXT_KEY`) and store (``'rdf.store'``, ``'rdf.source'``, and,
            ``'rdf.store_conf'``) will be ignored and overwritten.
        remotes : iterable of Remote or str, optional
            A subset of remotes and additional remotes to fetch from. See `Fetcher.fetch`
        remotes_directory : str, optional
            The directory to load `Remotes <Remote>` from in case a bundle is not in the
            bundle cache. Defaults to `.DEFAULT_REMOTES_DIRECTORY`
        transaction_manager : `transaction.TransactionManager`, optional
            Transaction manager

        Raises
        ------
        ValueError
            Thrown when `ident` is empty or is not a `str`
        '''
        if not ident or not isinstance(ident, str):
            raise ValueError('ident must be a non-empty string')
        self.ident = ident
        if not bundles_directory:
            bundles_directory = DEFAULT_BUNDLES_DIRECTORY
        self.bundles_directory = realpath(expandvars(expanduser(bundles_directory)))
        if not conf:
            conf = {}

        # NOTE: this updates the object the caller passed in
        conf.update({'rdf.source': 'default'})
        self.version = version
        self.remotes = remotes
        # XXX: Look at how we bring in projects remotes directory
        if not remotes_directory:
            remotes_directory = DEFAULT_REMOTES_DIRECTORY
        self.remotes_directory = realpath(expandvars(expanduser(remotes_directory)))

        if transaction_manager is not None:
            self.transaction_manager = transaction_manager
        else:
            self.transaction_manager = BundleTransactionManager(explicit=True)

        self._store_config_builder = \
            BundleDependentStoreConfigBuilder(
                bundles_directory=bundles_directory,
                remotes_directory=remotes_directory,
                remotes=remotes,
                transaction_manager=self.transaction_manager)

        self._bundle_dep_mgr = BundleDependencyManager(
            bundles_directory=self.bundles_directory,
            remotes=self.remotes,
            remotes_directory=self.remotes_directory,
            dependencies=self.dependencies)

        self._given_conf = conf
        self.conf = None
        self._contexts = None

        self.connection = None
        ''' The owmeta_core connection to the bundle's indexed database '''

        self._bundle_context = None
        self._loaded_dependencies = dict()

    @property
    def identifier(self):
        ''' The bundle ID. Same as `ident` '''
        return self.ident

    def resolve(self):
        '''
        Find this bundle's directory, fetching the bundle from the configured remotes if
        it is not found in the bundles directory

        Returns
        -------
        str
            The bundle directory
        '''
        try:
            bundle_directory = self._get_bundle_directory()
        except BundleNotFound:
            bundle_directory = self._fetch_bundle(self.ident, self.version)
        return bundle_directory

    @property
    def manifest_data(self):
        ''' The bundle's manifest, read from the bundle directory on every access '''
        bundle_directory = self.resolve()
        with open(p(bundle_directory, BUNDLE_MANIFEST_FILE_NAME)) as mf:
            return json.load(mf)

    def _get_bundle_directory(self):
        # - look up the bundle in the bundle cache
        # - generate a config based on the current config load the config
        # - make a database from the graphs, if necessary (similar to `owm regendb`). If
        #   delete the existing database if it doesn't match the store config
        return find_bundle_directory(self.bundles_directory, self.ident, self.version)

    def initdb(self):
        '''
        Initialize the bundle's `conf` `~owmeta_core.data.Data` instance

        Does nothing if `conf` has already been initialized
        '''
        if self.conf is not None:
            return

        bundle_directory = self.resolve()
        # Build the configuration in a local so that a failure part-way through doesn't
        # leave a half-initialized `self.conf` that would turn later calls into no-ops
        conf = Data().copy(self._given_conf)
        with open(p(bundle_directory, BUNDLE_MANIFEST_FILE_NAME)) as mf:
            manifest_data = json.load(mf)

        conf[DEFAULT_CONTEXT_KEY] = manifest_data.get(DEFAULT_CONTEXT_KEY)
        conf[IMPORTS_CONTEXT_KEY] = manifest_data.get(IMPORTS_CONTEXT_KEY)
        conf[CLASS_REGISTRY_CONTEXT_KEY] = manifest_data.get(CLASS_REGISTRY_CONTEXT_KEY)

        if CLASS_REGISTRY_CONTEXT_LIST_KEY not in conf:
            # Collect the class registry contexts of all (transitive) dependencies
            crctx_ids = []
            for dep in self._bundle_dep_mgr.load_dependencies_transitive():
                crctx_id = dep.manifest_data.get(CLASS_REGISTRY_CONTEXT_KEY)
                if crctx_id:
                    crctx_ids.append(crctx_id)
            conf[CLASS_REGISTRY_CONTEXT_LIST_KEY] = crctx_ids

        indexed_db_path = p(bundle_directory, BUNDLE_INDEXED_DB_NAME)
        store_name, store_conf = self._store_config_builder.build(
            indexed_db_path,
            manifest_data.get('dependencies', ()),
            bundle_directory)
        conf['rdf.store'] = store_name
        conf['rdf.store_conf'] = store_conf
        self.connection = connect(conf=conf)
        self.conf = conf

    def _fetch_bundle(self, bundle_ident, version):
        remotes_list = list(retrieve_remotes(self.remotes_directory))
        f = Fetcher(self.bundles_directory, remotes_list)
        return f.fetch(bundle_ident, version, self.remotes)

    @property
    def contexts(self):
        '''
        `frozenset` of `str`. Context IDs in this bundle
        '''
        # Since bundles are meant to be immutable, the set of contexts is read from the
        # graphs index once and cached
        if self._contexts is not None:
            return self._contexts
        bundle_directory = self.resolve()
        contexts = list()
        graphs_directory = p(bundle_directory, 'graphs')
        idx_fname = p(graphs_directory, 'index')
        if not exists(idx_fname):
            raise Exception('Cannot find an index at {}'.format(repr(idx_fname)))
        with open(idx_fname, 'rb') as index_file:
            for l in index_file:
                l = l.strip()
                if not l:
                    continue
                # Each entry is two NUL-separated fields, the context ID coming first
                ctx, _ = l.split(b'\x00')
                contexts.append(ctx.decode('UTF-8'))
        self._contexts = frozenset(contexts)
        return self._contexts

    @property
    def rdf(self):
        ''' The ``'rdf.graph'`` entry of `conf`. Initializes the database if needed '''
        self.initdb()
        return self.conf['rdf.graph']

    def __str__(self):
        return f'Bundle({self.ident}' + (')' if self.version is None else f', {self.version})')

    def __enter__(self):
        self.initdb()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Close the database connection. There may be none if `initdb` never completed
        # or if the bundle was already closed
        if self.connection is not None:
            self.connection.disconnect()
        self.connection = None
        self.conf = None

    def dependencies(self):
        ''' Return the dependency entries from the bundle's manifest '''
        return self.manifest_data.get('dependencies', ())

    def load_dependencies_transitive(self):
        '''
        Load dependencies from this bundle transitively

        Yields
        ------
        Bundle
            A direct or indirect dependency of this bundle
        '''
        return self._bundle_dep_mgr.load_dependencies_transitive()

    def load_dependencies(self):
        '''
        Load direct dependencies of this bundle

        Yields
        ------
        Bundle
            A direct dependency of this bundle
        '''
        return self._bundle_dep_mgr.load_dependencies()

    def _lookup_context_bundle(self, context_id):
        '''
        Find the bundle, among this one and its dependencies, which owns `context_id`

        Returns
        -------
        Bundle or None
            The owning bundle, or `None` if no owner was found
        '''
        owner = self._bundle_dep_mgr.lookup_context_bundle(
            self.contexts,
            context_id)
        if owner is self._bundle_dep_mgr:
            # The manager stands in for this bundle
            return self
        # Either a dependency that owns the context or None
        return owner

    def _load_dependency(self, dependencies_item):
        try:
            return self._bundle_dep_mgr._load_dependency(dependencies_item)
        except BundleDependencyConfigIsMalformed as e:
            bundle_directory = self.resolve()
            raise MalformedBundle(bundle_directory, str(e)) from e

    def __call__(self, target):
        '''
        Contextualize `target` with this bundle's context. Targets which are false-y or
        which lack a `contextualize` attribute are returned as they are
        '''
        if not target or not hasattr(target, 'contextualize'):
            return target
        self.initdb()
        if self._bundle_context is None:
            self._bundle_context = _BundleContext(
                None, conf=self.conf, bundle=self).stored
        return target.contextualize(self._bundle_context)
class BundleDependencyManager(object):
    '''
    Finds the bundle in which a context is defined.

    For a given bundle graph, there is *one* Bundle that "owns" a given context.
    Although multiple bundles may provide that context, the one closest to the root of
    the graph which provides some statements in that context is called the owner. Note
    that this does not mean that bundles on which the owner depends may not also be
    queried; however, the exact behavior is up to the component that uses this component.
    '''

    def __init__(self, dependencies, **common_bundle_arguments):
        '''
        Parameters
        ----------
        dependencies : function
            Function that returns a sequence of dependency descriptors
        **common_bundle_arguments
            Keyword arguments passed to every `Bundle` created for a dependency
        '''
        self._loaded_dependencies = dict()
        self._common_bundle_arguments = common_bundle_arguments
        self.dependencies = dependencies

    def load_dependencies_transitive(self):
        '''
        Load dependencies from this bundle transitively.

        Any given version of a bundle will be yielded at most once regardless of how many
        times that version of the bundle appears in the dependency graph.

        Dependencies will be yielded in topological sort order, so every Bundle is
        yielded before any of the dependencies it declares.

        Yields
        ------
        Bundle
            A direct or indirect dependency of this bundle

        Raises
        ------
        .CircularDependencyDetected
            Thrown when the dependency graph contains a cycle
        '''
        border = {None: self}
        # Maps (ident, version) of a dependency to the keys of the bundles that declare
        # it. The root is not recorded as a dependant
        dependants = OrderedDict()
        seen = dict()
        while border:
            # allegedly, OrderedDict isn't optimized for iteration speed, but better to
            # signal that order is important in how the border is iterated over.
            new_border = OrderedDict()
            for bnd in border.values():
                bkey = None if bnd is self else (bnd.ident, bnd.version)
                for d_bnd in bnd.load_dependencies():
                    key = (d_bnd.ident, d_bnd.version)
                    this_dependants = dependants.setdefault(key, set())
                    # The root's key (None) is never discarded below, so recording it
                    # (e.g., for a dependency the root lists twice) would be mistaken
                    # for a cycle
                    if bkey is not None:
                        this_dependants.add(bkey)
                    if key in seen:
                        continue
                    seen[key] = d_bnd
                    new_border[key] = d_bnd
            border = new_border

        while dependants:
            # Pick the first bundle with no remaining dependants
            ready = next((key for key, this_dependants in dependants.items()
                          if not this_dependants), None)
            if ready is None:
                break
            yield seen[ready]
            del dependants[ready]
            for this_dependants in dependants.values():
                this_dependants.discard(ready)

        if dependants:
            # Handle the case that we didn't deplete the adjacency list, implying that we
            # have a cycle
            links = ''.join(f' {dependant} -> {dependency}'
                            for dependency, this_dependants in dependants.items()
                            for dependant in this_dependants)
            raise CircularDependencyDetected(
                f'Circular dependency detected. Links:{links}')

    def lookup_context_bundle(self, contexts, context_id):
        '''
        Find the owner of the given context

        Parameters
        ----------
        contexts : container of str
            IDs of the contexts of the bundle this manager serves
        context_id : str or None
            ID of the context to look up

        Returns
        -------
        object or None
            This manager if `context_id` is `None` or is among `contexts`; otherwise,
            the first match reported by a dependency that doesn't exclude the context,
            or `None` if there is no such match
        '''
        if context_id is None or str(context_id) in contexts:
            return self
        for d in self.dependencies():
            d_excludes = frozenset(d.get('excludes', ()))
            # Compare by string as well, consistent with the check against `contexts`
            if context_id in d_excludes or str(context_id) in d_excludes:
                continue
            d_bnd = self._load_dependency(d)
            match = d_bnd._lookup_context_bundle(context_id)
            if match:
                return match
        return None

    def _load_dependencies(self):
        for d in self.dependencies():
            yield self._load_dependency(d)

    load_dependencies = _load_dependencies

    def _load_dependency(self, dependencies_item):
        '''
        Get the `Bundle` for a dependency entry, creating it at most once for each ID and
        version pair

        Raises
        ------
        BundleDependencyConfigIsMalformed
            Thrown when the entry is missing its ID or version
        '''
        d_id = dependencies_item.get('id')
        if not d_id:
            raise BundleDependencyConfigIsMalformed('Dependency entry is missing an identifier')

        d_version = dependencies_item.get('version')
        if not d_version:
            raise BundleDependencyConfigIsMalformed(f'Dependency entry for {d_id} is'
                    ' missing a version number')
        bundle = self._loaded_dependencies.get((d_id, d_version))
        if not bundle:
            bundle = Bundle(d_id, version=d_version, **self._common_bundle_arguments)
            self._loaded_dependencies[(d_id, d_version)] = bundle
        return bundle
class BundleDependencyConfigIsMalformed(Exception):
    '''
    Raised when an entry in a bundle's list of dependencies is missing its identifier or
    its version number
    '''
class BundleDependentStoreConfigBuilder(object):
    '''
    Builds an RDFLib store configuration that depends on bundles.

    The process of building the store configuration requires traversing the graph of
    dependencies so that duplicate dependencies in the graph can be omitted. To support
    this process, this builder will fetch bundles as needed to resolve transitive
    dependencies
    '''

    def __init__(self, bundles_directory=None, remotes_directory=None, remotes=None,
                 read_only=True, transaction_manager=None):
        '''
        Parameters
        ----------
        bundles_directory : str, optional
            Path to the bundles directory. Defaults to `.DEFAULT_BUNDLES_DIRECTORY`
        remotes_directory : str, optional
            Path to the directory remotes are retrieved from when a bundle must be
            fetched. Defaults to `.DEFAULT_REMOTES_DIRECTORY`
        remotes : iterable of Remote or str, optional
            Passed on to `Fetcher.fetch` when a bundle must be fetched
        read_only : bool, optional
            Whether the top-level store config is read-only
        transaction_manager : optional
            Included in the config of the top-level store and of each dependency's store
        '''
        if not bundles_directory:
            bundles_directory = DEFAULT_BUNDLES_DIRECTORY
        self.bundles_directory = realpath(expandvars(expanduser(bundles_directory)))

        if not remotes_directory:
            remotes_directory = DEFAULT_REMOTES_DIRECTORY
        self.remotes_directory = realpath(expandvars(expanduser(remotes_directory)))

        self.remotes = remotes
        self.read_only = read_only
        self.transaction_manager = transaction_manager

    def build(self, indexed_db_path, dependencies, bundle_directory=None):
        '''
        Builds the store configuration

        Parameters
        ----------
        indexed_db_path : str
            Path to the indexed database of the store that depends on the listed
            dependencies
        dependencies : list of dict
            List of dependencies' info, each entry including at least keys for 'id' and
            'version'
        bundle_directory : str, optional
            Path to the bundle directory for the dependent store, if the dependent store
            is a bundle. Used for information in an exceptional path, but not otherwise
            used

        Returns
        -------
        str
            The type of the store. This is the name used to look up the RDFLib store
            plugin
        object
            The configuration for the store. This is the object that will be passed to
            `rdflib.store.Store.open` to configure the store.
        '''
        return 'agg', self._construct_store_config(indexed_db_path, dependencies,
                bundle_directory=bundle_directory,
                read_only=self.read_only)

    __call__ = build

    def _construct_store_config(self, indexed_db_path, dependencies,
            current_view_desc=None, view_descs=None, bundle_directory=None,
            read_only=True):
        '''
        Parameters
        ----------
        indexed_db_path : str
            File path to the dependency database
        dependencies : list of dict
            List of dependencies' info, each entry including at least keys for 'id' and
            'version'
        current_view_desc : tuple, (_BDVD, (str, int)), optional
            Description of the current view determined by the path through dependant
            bundles
        view_descs : set of tuple, optional
            Describe the "views" on bundles within the dependency tree. The resulting
            aggregate store config have one subordinate config for each view.
        bundle_directory : str, optional
            The directory of the bundle the config is being built from. Will be determined
            by using `bundles_directory`, the bundle ID, and version for dependencies
        read_only : boolean, optional
            Whether the *top-level* config, which may not be a bundle, is read-only (the
            bundle dependencies are always read-only)
        '''
        if view_descs is None:
            view_descs = set()
        if current_view_desc is None:
            current_view_desc = _BDVD()

        dependency_configs = self._gather_dependency_configs(
                dependencies, current_view_desc, view_descs, bundle_directory)
        fs_store_config = dict(url=indexed_db_path, read_only=read_only,
                transaction_manager=self.transaction_manager)
        return [
            ('FileStorageZODB', fs_store_config)
        ] + dependency_configs

    @aslist
    def _gather_dependency_configs(self, dependencies, current_view_desc, view_descs,
            bundle_directory=None):
        for dd in dependencies:
            dep_view_desc = current_view_desc.merge_excludes(dd.get('excludes', ()))
            dep_ident = dd.get('id')
            dep_version = dd.get('version')

            if not dep_ident:
                if bundle_directory:
                    raise MalformedBundle(bundle_directory,
                            'bundle dependency descriptor is lacking an identifier')
                else:
                    raise ValueError('bundle dependency descriptor is lacking an identifier')

            view_key = (dep_view_desc, (dep_ident, dep_version))
            if view_key in view_descs:
                # This view of the bundle already has a config: skip only this entry
                # rather than dropping all of the remaining dependencies
                continue
            view_descs.add(view_key)

            # Kept apart from `bundle_directory`, which names the *dependent* bundle and
            # is still needed for error messages about later entries
            dep_directory, manifest_data = self._load_dependency_manifest(
                    dep_ident, dep_version)

            # We don't want to include items in the configuration that aren't specified by
            # the dependency descriptor. Also, all of the optionals have defaults that
            # BundleDependencyStore handles itself, so we don't want to impose them here.
            addl_dep_confs = {k: v for k, v in dd.items()
                    if k in ('excludes',) and v}

            yield ('owmeta_core_bds', dict(type='agg',
                conf=self._construct_store_config(
                    p(dep_directory, BUNDLE_INDEXED_DB_NAME),
                    manifest_data.get('dependencies', ()),
                    dep_view_desc, view_descs, dep_directory),
                **addl_dep_confs))

    def _load_dependency_manifest(self, dep_ident, dep_version):
        '''
        Find a dependency's bundle directory and read its manifest, fetching the bundle
        once if it can't be found at first. If it still can't be loaded after fetching,
        the error from the second attempt is raised

        Returns
        -------
        tuple of (str, dict)
            The dependency's bundle directory and its manifest data
        '''
        for attempt in range(2):
            try:
                dep_directory = find_bundle_directory(self.bundles_directory,
                        dep_ident, dep_version)
                with open(p(dep_directory, BUNDLE_MANIFEST_FILE_NAME)) as mf:
                    return dep_directory, json.load(mf)
            except (BundleNotFound, FileNotFoundError):
                if attempt > 0:
                    raise
                self._fetch_bundle(dep_ident, dep_version)

    def _fetch_bundle(self, bundle_ident, version):
        remotes_list = list(retrieve_remotes(self.remotes_directory))
        f = Fetcher(self.bundles_directory, remotes_list)
        return f.fetch(bundle_ident, version, self.remotes)
class _BDVD(namedtuple('_BDVD', ('excludes',))):
    '''
    Bundle Dependency View Data (BDVD)

    Holds data we use in traversing bundle dependencies to describe the "view" on the
    bundle
    '''
    __slots__ = ()

    def __new__(cls, *args, excludes=(), **kwargs):
        return super(_BDVD, cls).__new__(cls, *args, excludes=excludes, **kwargs)

    def merge_excludes(self, excludes):
        '''
        Return a copy with the members of `excludes` that aren't already excluded
        appended to the current excludes
        '''
        return self._replace(excludes=self.excludes +
                tuple(e for e in excludes if e not in self.excludes))


class _BundleContext(Context):
    '''
    `Context` for a bundle.
    '''

    def __init__(self, *args, bundle, **kwargs):
        super().__init__(*args, **kwargs)
        self.bundle = bundle
        self._mapper = None

    @property
    def mapper(self):
        ''' The `_BundleMapper` for the bundle, created on first access '''
        if self._mapper is None:
            self._mapper = _BundleMapper(bundle=self.bundle)
        return self._mapper


class _BundleMapper(Mapper):
    '''
    `Mapper` which also looks for classes through the dependencies of the bundle that
    owns the context in question
    '''

    def __init__(self, bundle):
        # A bundle that hasn't been opened has `conf` set to None rather than lacking
        # the attribute, so both cases must be checked
        bundle_conf = getattr(bundle, 'conf', None)
        if bundle_conf is None:
            raise Exception('Bundle connection has not been established.'
                    ' Call `initdb` or use the bundle in a context manager')

        super().__init__(name=f'{bundle.ident}' +
                (f'@{bundle.version}' if bundle.version else ''),
                conf=bundle_conf)
        self.bundle = bundle
        self._resolved_classes = dict()

    def resolve_class(self, rdf_type, context):
        '''
        Resolve the class for `rdf_type` in `context`: first from results of earlier
        calls, then with the base `Mapper`, and lastly through the dependencies of the
        bundle owning `context`. Returns `None` if the class can't be resolved
        '''
        cache_key = (rdf_type, context.identifier)
        prev_resolved_class = self._resolved_classes.get(cache_key)
        if prev_resolved_class:
            return prev_resolved_class

        own_resolved_class = super().resolve_class(rdf_type, context)

        if own_resolved_class:
            self._resolved_classes[cache_key] = own_resolved_class
            return own_resolved_class

        target_id = context.identifier
        target_bundle = self.bundle._lookup_context_bundle(target_id)
        if target_bundle is None:
            # No bundle owns the context, so there are no dependencies to search
            return None

        deps = target_bundle.load_dependencies_transitive()
        for bnd in deps:
            crctx_id = bnd.manifest_data.get(CLASS_REGISTRY_CONTEXT_KEY, None)
            if not crctx_id:
                continue
            with bnd:
                resolved_class = bnd.connection.mapper.resolve_class(rdf_type, context)
                if resolved_class:
                    self._resolved_classes[cache_key] = resolved_class
                    return resolved_class
        return None


class _RemoteHandlerMixin(object):
    '''
    Utility mixin for handling remotes

    The mixed-in class must have a `remotes` attribute which is a list of `Remote`
    '''

    def __init__(self, load_entry_points=True, **kwargs):
        '''
        Parameters
        ----------
        load_entry_points : bool, optional
            If `False`, then entry points will not be loaded
        '''
        super(_RemoteHandlerMixin, self).__init__(**kwargs)
        self.load_entry_points = load_entry_points

    def _get_remotes(self, remotes):
        '''
        Get remotes

        Parameters
        ----------
        remotes : iterable of Remote or str
            A subset of names of remotes to act on and additional remotes to act on

        Yields
        ------
        Remote
            The `Remote` objects given in `remotes` followed by the configured remotes
            named in `remotes`. All configured remotes if `remotes` is empty

        Raises
        ------
        .NoRemoteAvailable
            Thrown, when the result is iterated over, if there's no remote to yield
        '''
        if self.load_entry_points:
            load_entry_point_loaders()
        instance_remotes = []
        additional_remotes = []
        if remotes:
            configured_remotes = {r.name: r for r in self.remotes}
            for r in remotes:
                if isinstance(r, six.text_type):
                    configured = configured_remotes.get(r)
                    if configured is None:
                        # Yielding None here would break whatever uses the remote
                        L.warning('No configured remote is named %r', r)
                        continue
                    instance_remotes.append(configured)
                elif isinstance(r, Remote):
                    additional_remotes.append(r)
        else:
            instance_remotes = self.remotes
        has_remote = False
        for rem in chain(additional_remotes, instance_remotes):
            has_remote = True
            yield rem

        if not has_remote:
            raise NoRemoteAvailable(f'No valid remote from {remotes} among {self.remotes}'
                                    f' for {self}')
class Fetcher(_RemoteHandlerMixin):
    '''
    Fetches bundles from `Remotes <Remote>`

    A fetcher takes a list of remotes, a bundle ID, and, optionally, a version number and
    downloads the bundle to a local directory. `Deployer` is, functionally, the dual of
    this class.
    '''

    def __init__(self, bundles_root, remotes, transaction_manager=None, **kwargs):
        '''
        Parameters
        ----------
        bundles_root : str
            The root directory of the bundle cache
        remotes : list of Remote or str
            List of pre-configured remotes used in calls to `fetch`
        transaction_manager : `transaction.TransactionManager`
            Transaction manager to use when populating the indexed database after
            fetching
        '''
        super(Fetcher, self).__init__(**kwargs)
        self.bundles_root = bundles_root
        self.remotes = remotes
        # For debugging: in case you need to do something with the transaction manager
        # used when populating the indexed database
        self._transaction_manager = (transaction_manager or
                transaction.TransactionManager())

    def __call__(self, *args, **kwargs):
        '''
        Calls `fetch` with the given arguments
        '''
        return self.fetch(*args, **kwargs)

    def fetch(self, bundle_id, bundle_version=None, remotes=None, progress_reporter=None,
            triples_progress_reporter=None):
        '''
        Retrieve a bundle by name from a remote and put it in the local bundle cache.

        Each loader generated from the remotes will be tried in succession until one
        downloads the bundle. Dependencies listed in the downloaded bundle's manifest
        which aren't in the cache are fetched as well.

        Parameters
        ----------
        bundle_id : str
            The id of the bundle to retrieve
        bundle_version : int, optional
            The version of the bundle to retrieve. If not provided, attempt to fetch the
            latest version available
        remotes : iterable of Remote or str
            A subset of remotes and additional remotes to fetch from. If an entry in the
            iterable is a string, then it will be looked for amongst the remotes passed in
            initially.
        progress_reporter : `tqdm.tqdm <https://tqdm.github.io/>`_-like object, optional
            Receives updates of progress in fetching and installing locally
        triples_progress_reporter : `tqdm.tqdm <https://tqdm.github.io/>`_-like object, optional
            Receives updates of progress for adding triples for an individual graph

        Returns
        -------
        str
            returns the directory where the bundle has been placed

        Raises
        ------
        .exceptions.NoBundleLoader
            Thrown when none of the loaders are able to download the bundle
        .FetchTargetIsNotEmpty
            Thrown when the requested bundle is already in the cache
        '''
        if remotes is not None:
            remotes = list(remotes)
        given_bundle_version = bundle_version
        loaders_list = list(
            self._get_bundle_loaders(bundle_id, given_bundle_version, remotes))

        if bundle_version is None:
            bundle_version = self._find_latest_remote_bundle_versions(bundle_id, loaders_list)

        bdir = fmt_bundle_directory(self.bundles_root, bundle_id, bundle_version)
        self._assert_target_is_empty(bdir)

        for loader in loaders_list:
            try:
                loader.base_directory = bdir
                loader(bundle_id, bundle_version)
                with open(p(bdir, BUNDLE_MANIFEST_FILE_NAME)) as mf:
                    manifest_data = json.load(mf)
                for dd in manifest_data.get('dependencies', ()):
                    try:
                        find_bundle_directory(self.bundles_root, dd['id'], dd.get('version'))
                    except BundleNotFound:
                        self.fetch(dd['id'], dd.get('version'), remotes=remotes)
                dat = self._post_fetch_dest_conf(bdir)
                try:
                    build_indexed_database(dat['rdf.graph'], bdir,
                            self._transaction_manager,
                            progress_reporter, triples_progress_reporter)
                finally:
                    # Release the database even when building it fails
                    dat.close()
                return bdir
            except Exception:
                L.warning('Failed to load bundle %s with %s', bundle_id, loader, exc_info=True)
                # The loader may have failed before creating the directory. Don't let
                # that keep us from trying the remaining loaders
                if exists(bdir):
                    shutil.rmtree(bdir)

        # Every loader failed (or there were none)
        raise NoBundleLoader(bundle_id, given_bundle_version)

    def _post_fetch_dest_conf(self, bundle_directory):
        '''
        Create and initialize the `Data` configuration for the fetched bundle's indexed
        database. The caller is responsible for closing the returned object

        Raises
        ------
        Exception
            Thrown when the database file doesn't exist after initialization
        '''
        db_file_location = p(bundle_directory, BUNDLE_INDEXED_DB_NAME)
        store_conf = dict(url=db_file_location, transaction_manager=self._transaction_manager)
        res = Data().copy({
            'rdf.source': 'default',
            'rdf.store': 'FileStorageZODB',
            'rdf.store_conf': store_conf
        })
        res.init()
        if not exists(db_file_location):
            raise Exception('Could not create the database file at ' + db_file_location)
        return res

    def _find_latest_remote_bundle_versions(self, bundle_id, loaders_list):
        '''
        Find the greatest version of the bundle reported by any of the loaders

        Raises
        ------
        .BundleNotFound
            Thrown when no loader reports a version greater than zero
        '''
        latest_bundle_version = 0
        for loader in loaders_list:
            try:
                versions = loader.bundle_versions(bundle_id)
            except NotImplementedError:
                L.debug('Loader %s does not support listing available versions of %s',
                        loader, bundle_id)
                continue

            if not versions:
                L.warning('Loader %s does not have any versions of the bundle %s', loader, bundle_id)
                continue

            latest_bundle_version = max(latest_bundle_version, max(versions))
        if latest_bundle_version <= 0:
            raise BundleNotFound(bundle_id, 'No versions of the requested bundle found from any remotes')
        return latest_bundle_version

    def _assert_target_is_empty(self, bdir):
        '''
        Raise `.FetchTargetIsNotEmpty` if `bdir` has any entries. A directory that
        doesn't exist is treated as empty
        '''
        try:
            # The context manager closes the directory iterator promptly
            with scandir(bdir) as entries:
                target_empty = next(entries, None) is None
        except FileNotFoundError:
            return
        if not target_empty:
            raise FetchTargetIsNotEmpty(bdir)

    def _get_bundle_loaders(self, bundle_id, bundle_version, remotes):
        '''
        Generate the loaders, from the selected remotes, which can load the given bundle

        Raises
        ------
        .exceptions.NoBundleLoader
            Thrown, when the result is iterated over, if no remotes are available
        '''
        try:
            # `_get_remotes` is lazy and only raises once it's iterated over, so it has to
            # be consumed inside of the `try`
            retrieved_remotes = list(self._get_remotes(remotes))
        except NoRemoteAvailable as e:
            raise NoBundleLoader(bundle_id, bundle_version,
                    'Could not get any remotes to generate loaders from') from e

        for rem in retrieved_remotes:
            for loader in rem.generate_loaders():
                if loader.can_load(bundle_id, bundle_version):
                    yield loader

    def __str__(self):
        return f'{self.__class__.__name__}(bundles_root={self.bundles_root}, remotes={self.remotes})'
class Deployer(_RemoteHandlerMixin):
    '''
    Uploads bundles to `Remotes <Remote>`.

    Given a bundle directory tree or a bundle archive, a deployer picks the remotes
    and the matching uploaders from the available options and hands the bundle to
    each of them. The `Uploaders <Uploader>` perform the actual upload. Functionally,
    this class is the dual of `Fetcher`.
    '''

    def __init__(self, remotes=(), **kwargs):
        super().__init__(**kwargs)
        self.remotes = remotes

    def __call__(self, *args, **kwargs):
        return self.deploy(*args, **kwargs)

    def deploy(self, bundle_path, remotes=None):
        '''
        Deploy a bundle to *all* remotes that are configured to accept uploads

        Parameters
        ----------
        bundle_path : str
            Path to a bundle directory tree or archive
        remotes : iterable of Remote or str
            A subset of remotes to deploy to and additional remotes to deploy to

        Raises
        ------
        .NoAcceptableUploaders
            Thrown when none of the selected uploaders could upload the bundle
        '''
        if not exists(bundle_path):
            raise NotABundlePath(bundle_path, 'the file does not exist')

        # Validate before touching any remote
        manifest = self._extract_manifest_data_from_bundle_path(bundle_path)
        validate_manifest(bundle_path, manifest)

        upload_count = 0
        for upload in self._get_bundle_uploaders(bundle_path, remotes=remotes):
            upload(bundle_path)
            upload_count += 1

        if upload_count == 0:
            raise NoAcceptableUploaders(bundle_path)

    def _extract_manifest_data_from_bundle_path(self, bundle_path):
        ''' Read the manifest from either a bundle directory or a bundle archive '''
        if isdir(bundle_path):
            return self._get_directory_manifest_data(bundle_path)
        if isfile(bundle_path):
            return self._get_archive_manifest_data(bundle_path)
        raise NotABundlePath(bundle_path, 'path does not point to a file or directory')

    def _get_bundle_uploaders(self, bundle_directory, remotes=None):
        ''' Generate the uploaders that report being able to upload the bundle '''
        for remote in self._get_remotes(remotes):
            yield from (uploader for uploader in remote.generate_uploaders()
                        if uploader.can_upload(bundle_directory))

    def _get_directory_manifest_data(self, bundle_path):
        ''' Load the manifest JSON out of a bundle directory '''
        try:
            with open(p(bundle_path, BUNDLE_MANIFEST_FILE_NAME)) as manifest_file:
                return json.load(manifest_file)
        except (OSError, IOError) as err:
            # Only a missing manifest or a directory in its place is reported as a
            # malformed bundle. Any other I/O error is passed through untouched
            reason = {
                errno.ENOENT: 'no bundle manifest found',
                errno.EISDIR: 'manifest is not a regular file',
            }.get(err.errno)
            if reason is None:
                raise
            raise MalformedBundle(bundle_path, reason)
        except json.decoder.JSONDecodeError:
            raise MalformedBundle(bundle_path, 'manifest is malformed: expected a JSON file')

    def _get_archive_manifest_data(self, bundle_path):
        ''' Load the manifest JSON out of a bundle archive '''
        with Unarchiver().to_tarfile(bundle_path) as archive:
            try:
                member = archive.extractfile(BUNDLE_MANIFEST_FILE_NAME)
                if member is None:
                    raise MalformedBundle(bundle_path, 'manifest is not a regular file')
                with member as manifest_file:
                    return json.load(manifest_file)
            except KeyError:
                raise MalformedBundle(bundle_path, 'no bundle manifest found')
            except json.decoder.JSONDecodeError:
                raise MalformedBundle(bundle_path, 'manifest is malformed: expected a JSON file')
class Cache(object):
    '''
    Cache of bundles
    '''

    def __init__(self, bundles_directory):
        '''
        Parameters
        ----------
        bundles_directory : str
            The directory where bundles are stored
        '''
        self.bundles_directory = bundles_directory

    def list(self):
        '''
        Returns a generator of summary bundle info

        Each item is the manifest data of one bundle version found under the bundles
        directory. Within a bundle, versions are produced from greatest to least.
        Version directories whose manifest is missing, malformed, or inconsistent
        with the directory names are skipped.
        '''
        try:
            bundle_directories = scandir(self.bundles_directory)
        except (OSError, IOError) as e:
            if e.errno == errno.ENOENT:
                return
            raise

        # The `with` releases the directory handle even if the consumer abandons the
        # generator part-way through
        with bundle_directories:
            for bundle_directory in bundle_directories:
                if not bundle_directory.is_dir():
                    continue

                # Ignore deletes out from under us
                try:
                    version_directories = scandir(bundle_directory.path)
                except (OSError, IOError) as e:
                    if e.errno == errno.ENOENT:
                        continue
                    raise

                with version_directories:
                    ordered = sorted(version_directories,
                                     key=self._version_sort_key,
                                     reverse=True)

                for version_directory in ordered:
                    if not version_directory.is_dir():
                        continue
                    manifest_data = self._read_matching_manifest(bundle_directory,
                                                                 version_directory)
                    if manifest_data is not None:
                        yield manifest_data

    @staticmethod
    def _version_sort_key(entry):
        ''' Sort key for version directories: non-integer names sort as +infinity '''
        try:
            return int(entry.name)
        except ValueError:
            return float('+inf')

    def _read_matching_manifest(self, bundle_directory, version_directory):
        '''
        Read the manifest in `version_directory`, returning `None` when it is missing,
        malformed, or does not agree with the names of the containing directories
        '''
        manifest_fname = p(version_directory.path, BUNDLE_MANIFEST_FILE_NAME)
        try:
            with open(manifest_fname) as mf:
                manifest_data = json.load(mf)
        except json.decoder.JSONDecodeError:
            L.warning("Bundle manifest at %s is malformed", manifest_fname)
            return None
        except (OSError, IOError) as e:
            if e.errno != errno.ENOENT:
                raise
            return None

        if not isinstance(manifest_data, dict):
            L.warning("Bundle manifest at %s is malformed", manifest_fname)
            return None

        bd_id = urlunquote(bundle_directory.name)
        try:
            bd_version = int(version_directory.name)
        except ValueError:
            # A directory that isn't named for an integer version cannot match any
            # manifest. Report it like any other mismatch rather than letting the
            # ValueError abort the whole listing
            bd_version = None

        if (bd_version is None or
                bd_id != manifest_data.get('id') or
                bd_version != manifest_data.get('version')):
            L.warning('Bundle manifest at %s does not match bundle'
                      ' directory', manifest_fname)
            return None
        return manifest_data
def retrieve_remote_by_name(remotes_dir, name, **kwargs):
    '''
    Return the first remote from `retrieve_remotes` whose ``name`` equals `name`, or
    `None` if there is no such remote. Extra keyword arguments are passed through to
    `retrieve_remotes`.
    '''
    candidates = retrieve_remotes(remotes_dir, **kwargs)
    return next((remote for remote in candidates if remote.name == name), None)
def retrieve_remotes(remotes_dir, load_entry_points=True):
    '''
    Retrieve remotes from a project directory or user remotes directory

    Files in `remotes_dir` whose names end in ``.remote`` are read with `Remote.read`.
    Files that cannot be read are logged and skipped. Nothing is generated if
    `remotes_dir` does not exist.

    Parameters
    ----------
    remotes_dir : str
        path to the directory containing the remote descriptor files
    load_entry_points : bool, optional
        if `True`, then `~.loaders.load_entry_point_loaders` is called before any
        remote is read
    '''
    if not exists(remotes_dir):
        return

    if load_entry_points:
        load_entry_point_loaders()

    for r in listdir(remotes_dir):
        if not r.endswith('.remote'):
            continue

        fname = p(remotes_dir, r)
        try:
            with open(fname) as inp:
                rem = Remote.read(inp)
            rem.file_name = fname
        except Exception:
            L.warning('Unable to read remote %s', r, exc_info=True)
            continue

        # Yield outside of the `with` and `try` so the file is not held open while
        # the consumer works, and so exceptions thrown into the generator by the
        # consumer are not mistaken for read failures
        yield rem
class Installer(object):
    '''
    Installs a bundle locally
    '''

    def __init__(self, source_directory, bundles_directory, graph,
                 imports_ctx=None, default_ctx=None, class_registry_ctx=None,
                 installer_id=None, remotes=(), remotes_directory=None):
        '''
        Parameters
        ----------
        source_directory : str
            Directory where files come from. All files for a bundle must be below this
            directory
        bundles_directory : str
            Directory where the bundles files go. Usually this is the bundle cache
            directory
        graph : rdflib.graph.ConjunctiveGraph
            The graph from which we source contexts for this bundle
        default_ctx : str, optional
            The ID of the default context -- the target of a query when not otherwise
            specified.
        imports_ctx : str, optional
            The ID of the imports context this installer should use. Imports
            relationships are selected from this graph according to the included
            contexts.
        class_registry_ctx : str, optional
            The ID of the class registry context this installer should use. Class
            registry entries are retrieved from this graph.
        installer_id : str, optional
            Name of this installer for purposes of mutual exclusion
        remotes : iterable of Remote, optional
            Remotes to be used for retrieving dependencies when needed during
            installation. If not provided, the remotes will be collected from
            `remotes_directory`
        remotes_directory : str, optional
            The directory to load `Remotes <Remote>` from in case a bundle is not in
            the bundle cache. Defaults to `.DEFAULT_REMOTES_DIRECTORY`
        '''
        self.context_hash = hashlib.sha224
        self.file_hash = hashlib.sha224
        self.source_directory = source_directory
        self.bundles_directory = bundles_directory
        self.graph = graph
        self.installer_id = installer_id
        self.imports_ctx = imports_ctx
        self.default_ctx = default_ctx
        self.class_registry_ctx = class_registry_ctx
        self.remotes = list(remotes)
        self.remotes_directory = remotes_directory

        # For debugging: in case you need to do something with the transaction manager
        # used when populating the indexed database
        self._transaction_manager = transaction.TransactionManager()

    def install(self, descriptor, progress_reporter=None):
        '''
        Given a descriptor, install a bundle

        Parameters
        ----------
        descriptor : Descriptor
            The descriptor for the bundle
        progress_reporter : `tqdm.tqdm <https://tqdm.github.io/>`_-like object, optional
            Used for reporting progress during installation

        Returns
        -------
        str
            The directory where the bundle is installed

        Raises
        ------
        .TargetIsNotEmpty
            Thrown when the target directory for installation is not empty.
        '''
        # Create the staging directory in the base directory to reduce the chance of
        # moving across file systems
        staging_directory = fmt_bundle_directory(self.bundles_directory, descriptor.id,
                                                 descriptor.version)
        # Only "already exists" is tolerated here. Any other failure to create the
        # directory is reported at its source
        makedirs(staging_directory, exist_ok=True)

        with scandir(staging_directory) as entries:
            target_empty = next(entries, None) is None
        if not target_empty:
            raise TargetIsNotEmpty(staging_directory)

        with lock_file(p(staging_directory, '.lock'), unique_key=self.installer_id):
            try:
                self._install(descriptor, staging_directory,
                              progress_reporter=progress_reporter)
                return staging_directory
            except Exception:
                self._cleanup_failed_install(staging_directory)
                raise

    def _cleanup_failed_install(self, staging_directory):
        '''
        Remove the 'graphs' and 'files' directories of a failed install.

        This is best-effort: it runs while an install error is being handled, so a
        failure to clean up must not replace that error.
        '''
        for subdirectory in ('graphs', 'files'):
            target = p(staging_directory, subdirectory)
            try:
                shutil.rmtree(target)
            except FileNotFoundError:
                # The install failed before this directory was created
                pass
            except OSError:
                L.warning('Could not remove %s while cleaning up a failed install',
                          target, exc_info=True)

    def _install(self, descriptor, staging_directory, progress_reporter=None):
        ''' Run the installation steps, in order, into `staging_directory` '''
        graphs_directory, files_directory = self._set_up_directories(staging_directory)
        self._write_file_hashes(descriptor, files_directory)
        self._write_context_data(descriptor, graphs_directory)
        self._generate_bundle_class_registry_ctx(descriptor, graphs_directory)
        self._generate_bundle_imports_ctx(descriptor, graphs_directory)
        self._write_manifest(descriptor, staging_directory)
        self._initdb(staging_directory)
        self._build_indexed_database(staging_directory, progress_reporter)

    def _set_up_directories(self, staging_directory):
        '''
        Create the 'graphs' and 'files' directories, if needed, and return their paths
        '''
        graphs_directory = p(staging_directory, 'graphs')
        files_directory = p(staging_directory, 'files')
        # Create each directory independently so that a pre-existing 'graphs'
        # directory does not prevent 'files' from being created
        makedirs(graphs_directory, exist_ok=True)
        makedirs(files_directory, exist_ok=True)
        return graphs_directory, files_directory

    def _write_file_hashes(self, descriptor, files_directory):
        '''
        Copy the files selected by the descriptor into `files_directory` and record a
        hash line for each in the 'hashes' file there
        '''
        with open(p(files_directory, 'hashes'), 'wb') as hash_out:
            for fname in _select_files(descriptor, self.source_directory):
                hsh = self.file_hash()
                source_fname = p(self.source_directory, fname)
                hash_file(hsh, source_fname)
                self._write_hash_line(hash_out, fname.encode('UTF-8'), hsh)
                shutil.copy2(source_fname, p(files_directory, fname))

    def _write_context_data(self, descriptor, graphs_directory):
        '''
        Write the graphs of the contexts selected by the descriptor and check that
        everything they import is included in the bundle or in a dependency

        Raises
        ------
        UncoveredImports
            Thrown when some imported contexts are neither included nor provided by a
            dependency
        '''
        contexts = _select_contexts(descriptor, self.graph)
        imports_ctxg = None
        if self.imports_ctx:
            imports_ctxg = self.graph.get_context(self.imports_ctx)

        included_context_ids = set(self._write_graphs(graphs_directory, *contexts))
        included_context_ids.update(URIRef(c) for c in descriptor.empties)

        # Compute imported contexts
        imported_contexts = set()
        if imports_ctxg is not None:
            for ctxid in included_context_ids:
                for t in imports_ctxg.triples((ctxid, CONTEXT_IMPORTS, None)):
                    imported_contexts.add(t[2])

        uncovered_contexts = imported_contexts - included_context_ids
        if self.class_registry_ctx:
            uncovered_contexts.discard(URIRef(self.class_registry_ctx))
        uncovered_contexts = self._cover_with_dependencies(uncovered_contexts, descriptor)
        if uncovered_contexts:
            raise UncoveredImports(uncovered_contexts)

    def _write_manifest(self, descriptor, staging_directory):
        '''
        Build the manifest data, keep it on ``self.manifest_data``, and write it as
        JSON to the manifest file in `staging_directory`
        '''
        manifest_data = {}
        if self.default_ctx:
            manifest_data[DEFAULT_CONTEXT_KEY] = self.default_ctx

        if self.imports_ctx:
            # If an imports context was specified, then we'll need to generate an
            # imports context with the appropriate imports. We don't use the source
            # imports context ID for the bundle's imports context because the bundle
            # imports that we actually need are a subset of the total set of imports
            manifest_data[IMPORTS_CONTEXT_KEY] = fmt_bundle_imports_ctx_id(
                descriptor.id, descriptor.version)

        if self.class_registry_ctx:
            manifest_data[CLASS_REGISTRY_CONTEXT_KEY] = fmt_bundle_class_registry_ctx_id(
                descriptor.id, descriptor.version)

        manifest_data['id'] = descriptor.id
        manifest_data['version'] = descriptor.version
        manifest_data['manifest_version'] = BUNDLE_MANIFEST_VERSION

        mf_deps = []
        for dd in descriptor.dependencies:
            bnd = self._dd_to_bundle(dd)
            # Fetch the dependency if necessary and get the version of the latest from
            # the bundle manifest. Usually, the bundle will already be on the system
            # since it *should have* been used for testing.
            #
            # (It's probably possible to do something like just grabbing the bundle
            # manifest data in the case there is not a local copy of the bundle, but
            # that should be unusual enough that it's probably not justified
            # considering the overhead of having an alternative to fetching that
            # bundle loaders might be expected to support.)
            dd_version = bnd.manifest_data['version']
            mf_deps.append({'version': dd_version,
                            'id': dd.id,
                            'excludes': dd.excludes})
        manifest_data['dependencies'] = mf_deps

        self.manifest_data = manifest_data
        with open(p(staging_directory, BUNDLE_MANIFEST_FILE_NAME), 'w') as mf:
            json.dump(manifest_data, mf, separators=(',', ':'))

    def _generate_bundle_imports_ctx(self, descriptor, graphs_directory):
        '''
        Write the bundle's generated imports context: the import statements, taken
        from the source imports context, for the contexts in the bundle
        '''
        if not self.imports_ctx:
            return
        imports_ctxg = self.graph.get_context(self.imports_ctx)

        # select all of the imports for all of the contexts in the bundle and serialize
        contexts = []
        idx_fname = p(graphs_directory, 'index')
        # The index is written as UTF-8 encoded bytes by `_write_index_line`, so it
        # must not be decoded with a locale-dependent default
        with open(idx_fname, encoding='utf-8') as index_file:
            for l in index_file:
                ctx, _ = l.strip().split('\x00')
                contexts.append(URIRef(ctx))

        for c in descriptor.empties:
            contexts.append(URIRef(c))

        if self.class_registry_ctx:
            contexts.append(URIRef(self.class_registry_ctx))

        ctxgraph = imports_ctxg.triples_choices((contexts, CONTEXT_IMPORTS, None))

        if self.class_registry_ctx:
            # Statements about the source class registry context are rewritten to be
            # about the bundle's generated class registry context
            cr_ctxid = URIRef(fmt_bundle_class_registry_ctx_id(descriptor.id,
                                                               descriptor.version))
            old_ctxgraph = ctxgraph

            def replace_cr_ctxid():
                src_cr_ctxid = URIRef(self.class_registry_ctx)
                for t in old_ctxgraph:
                    if t[0] == src_cr_ctxid:
                        yield (cr_ctxid, t[1], t[2])
                    elif t[2] == src_cr_ctxid:
                        yield (t[0], t[1], cr_ctxid)
                    else:
                        yield t
            ctxgraph = replace_cr_ctxid()

        ctxid = fmt_bundle_imports_ctx_id(descriptor.id, descriptor.version)
        self._write_graph(graphs_directory, ctxid, ctxgraph)

    def _generate_bundle_class_registry_ctx(self, descriptor, graphs_directory):
        '''
        Write the source class registry context under the bundle's generated class
        registry context ID
        '''
        if not self.class_registry_ctx:
            return
        ctx_id = fmt_bundle_class_registry_ctx_id(descriptor.id, descriptor.version)
        class_registry_ctxg = self.graph.get_context(self.class_registry_ctx)
        self._write_graph(graphs_directory, ctx_id, class_registry_ctxg)

    def _write_graph(self, graphs_directory, ctxid, ctxgraph):
        ''' Write a single graph. See `_write_graphs` '''
        # `_write_graphs` is a generator, so it has to be driven to completion
        for _ in self._write_graphs(graphs_directory, (ctxid, ctxgraph)):
            pass

    def _write_graphs(self, graphs_directory, *graphs_sequence):
        '''
        Write each ``(context ID, graph)`` pair to a file in `graphs_directory`,
        appending a line for each to the 'hashes' and 'index' files there. Generates
        the context ID of each graph after it is written.
        '''
        with open(p(graphs_directory, 'hashes'), 'ab') as hash_out,\
                open(p(graphs_directory, 'index'), 'ab') as index_out:
            for ctxid, ctxgraph in graphs_sequence:
                ctxidb = ctxid.encode('UTF-8')
                gbname, hsh = self._write_graph_to_file(ctxgraph, graphs_directory)
                self._write_hash_line(hash_out, ctxidb, hsh)
                self._write_index_line(index_out, ctxidb, gbname)
                yield ctxid
            hash_out.flush()
            index_out.flush()

    def _write_graph_to_file(self, ctxgraph, graphs_directory):
        '''
        Serialize `ctxgraph` to a file named for the hash of its serialization.
        Returns the base name of the file and the hash object.
        '''
        hsh = self.context_hash()
        # The final name depends on the hash of the content, so write to a temporary
        # name first and rename afterwards
        temp_fname = p(graphs_directory, 'graph.tmp')
        write_canonical_to_file(ctxgraph, temp_fname)
        hash_file(hsh, temp_fname)
        gbname = hsh.hexdigest() + '.nt'
        ctx_file_name = p(graphs_directory, gbname)
        rename(temp_fname, ctx_file_name)
        return gbname, hsh

    def _write_hash_line(self, hash_out, key, hsh):
        ''' Write ``key NUL <digest size byte> <digest> LF`` to `hash_out` '''
        hash_out.write(key + b'\x00' + pack('B', hsh.digest_size) + hsh.digest() + b'\n')

    def _write_index_line(self, index_out, ctxidb, gbname):
        ''' Write ``<context ID> NUL <graph file name> LF`` to `index_out` '''
        index_out.write(ctxidb + b'\x00' + gbname.encode('UTF-8') + b'\n')

    def _initdb(self, staging_directory):
        '''
        Create the indexed database file in `staging_directory`, keeping the
        configuration for it on ``self.conf``

        Raises
        ------
        Exception
            Thrown when the database file does not exist after initialization
        '''
        db_file_location = p(staging_directory, BUNDLE_INDEXED_DB_NAME)
        self.conf = Data().copy({
            'rdf.source': 'default',
            'rdf.store': 'FileStorageZODB',
            'rdf.store_conf': dict(transaction_manager=self._transaction_manager,
                                   url=db_file_location),
        })
        # Create the database file and initialize some needed data structures
        self.conf.init()
        if not exists(db_file_location):
            raise Exception('Could not create the database file at ' + db_file_location)

    def _build_indexed_database(self, staging_directory, progress=None):
        ''' Populate the indexed database and close ``self.conf`` when done '''
        try:
            dest = self.conf['rdf.graph']
            build_indexed_database(dest, staging_directory, self._transaction_manager,
                                   progress, None)
        finally:
            self.conf.close()

    def _dd_to_bundle(self, dependency_descriptor):
        ''' Make a `Bundle` for the given dependency descriptor '''
        return Bundle(dependency_descriptor.id,
                      version=dependency_descriptor.version,
                      bundles_directory=self.bundles_directory,
                      remotes=self.remotes,
                      remotes_directory=self.remotes_directory)

    def _cover_with_dependencies(self, uncovered_contexts, descriptor):
        '''
        Remove from `uncovered_contexts` the contexts provided by the descriptor's
        dependencies. The set is modified in place and returned.
        '''
        # XXX: Will also need to check for the contexts having a given ID being
        # consistent with each other across dependencies
        for d in descriptor.dependencies:
            bnd = self._dd_to_bundle(d)
            for c in bnd.contexts:
                uncovered_contexts.discard(URIRef(c))
                if not uncovered_contexts:
                    break
            if not uncovered_contexts:
                break
        return uncovered_contexts
def fmt_bundle_imports_ctx_id(id, version):
    ''' Format the ID of the generated imports context for a bundle '''
    return fmt_bundle_ctx_id(kind='generated_imports_ctx', id=id, version=version)


def fmt_bundle_class_registry_ctx_id(id, version):
    ''' Format the ID of the generated class registry context for a bundle '''
    return fmt_bundle_ctx_id(kind='generated_class_registry_ctx', id=id, version=version)


def fmt_bundle_class_registry_ctx_list_id(id, version):
    ''' Format the ID of the generated class registry context list for a bundle '''
    return fmt_bundle_ctx_id(kind='generated_class_registry_ctx_list', id=id, version=version)


def fmt_bundle_ctx_id(kind, id, version):
    '''
    Format the ID of a generated context of the given kind for a bundle. The bundle
    ID is URL-quoted before being placed in the query string.
    '''
    quoted_id = urlquote(id)
    return f'http://data.openworm.org/bundle/{kind}?bundle_id={quoted_id}&bundle_version={version}'
class FilesDescriptor(object):
    '''
    Descriptor for files
    '''

    def __init__(self):
        self.patterns = set()
        self.includes = set()

    @classmethod
    def make(cls, obj):
        '''
        Make a `FilesDescriptor` from a dict with optional 'patterns' and 'includes'
        entries. Returns `None` if `obj` is empty or `None`.
        '''
        if not obj:
            return None
        res = cls()
        res.patterns = set(obj.get('patterns', ()))
        res.includes = set(obj.get('includes', ()))
        return res

    def to_dict(self):
        '''
        Return a dict of the same form accepted by `make`. Empty sets are omitted and
        the entries are sorted so that the output is deterministic.
        '''
        # The descriptor's own sets are read here. Reading them from the result dict,
        # as before, raised AttributeError on every call
        res = {}
        if self.patterns:
            res['patterns'] = sorted(self.patterns)
        if self.includes:
            res['includes'] = sorted(self.includes)
        return res
def make_pattern(s):
    '''
    Make a URI pattern from a string: a `RegexURIPattern` if the string starts with
    'rgx:' (the prefix is dropped) and a `GlobURIPattern` otherwise
    '''
    if s.startswith('rgx:'):
        return RegexURIPattern(s[4:])
    return GlobURIPattern(s)


def make_include_func(s):
    '''
    Make a `URIIncludeFunc` from a context "includes" entry, which is either the URI
    as a `str` or a `dict` whose single key is the URI

    Raises
    ------
    ValueError
        Thrown when the entry is neither a `str` nor a `dict`, or is a `dict` that
        does not have exactly one key
    '''
    if isinstance(s, str):
        return URIIncludeFunc(s)
    if isinstance(s, dict):
        uri = None
        for k in s.keys():
            if uri is not None:
                raise ValueError('Context "includes" entry must have one key--the URI of'
                                 f' the context to include. Extra key is "{k}"')
            uri = k
        if uri is None:
            # An empty dict (or a null key) would otherwise fail with an unhelpful
            # AttributeError inside URIIncludeFunc
            raise ValueError('Context "includes" entry must have one key--the URI of'
                             ' the context to include. No URI was given')
        return URIIncludeFunc(uri)
    raise ValueError('Context "includes" entry must be a str or a dict')


class URIIncludeFunc(object):
    '''
    Predicate that is true for exactly one URI (ignoring surrounding whitespace)
    '''

    def __init__(self, include):
        self.include = URIRef(include.strip())

    def __eq__(self, other):
        # Defined alongside __hash__ so that equal includes de-duplicate in sets
        if not isinstance(other, URIIncludeFunc):
            return NotImplemented
        return self.include == other.include

    def __hash__(self):
        return hash(self.include)

    def __call__(self, uri):
        return URIRef(uri.strip()) == self.include

    def __str__(self):
        return '{}({})'.format(FCN(type(self)), repr(self.include))

    __repr__ = __str__


class URIPattern(object):
    '''
    Base class for predicates over URIs. The base implementation matches nothing.
    '''

    def __init__(self, pattern):
        self._pattern = pattern

    def __eq__(self, other):
        # Defined alongside __hash__ so that equal patterns de-duplicate in sets
        if type(other) is not type(self):
            return NotImplemented
        return self._pattern == other._pattern

    def __hash__(self):
        return hash(self._pattern)

    def __call__(self, uri):
        return False

    def __str__(self):
        return '{}({})'.format(FCN(type(self)), self._pattern)


class RegexURIPattern(URIPattern):
    '''
    Matches URIs against a regular expression, anchored at the start of the URI
    '''

    def __init__(self, pattern):
        super(RegexURIPattern, self).__init__(re.compile(pattern))

    def __call__(self, uri):
        return self._pattern.match(str(uri)) is not None


class GlobURIPattern(RegexURIPattern):
    '''
    Matches URIs against a glob-like pattern, which is translated to a regular
    expression by replacing ``*`` with ``.*``, ``?`` with ``.?``, and ``[!`` with
    ``[^``
    '''

    def __init__(self, pattern):
        # NOTE(review): other characters are passed through to the regular expression
        # as-is, so regex metacharacters in the pattern (e.g., '.') keep their regex
        # meaning -- confirm that this is intended
        replacements = (
            ('*', '.*'),
            ('?', '.?'),
            ('[!', '[^'),
        )
        for a, b in replacements:
            pattern = pattern.replace(a, b)
        # RegexURIPattern compiles the pattern, so the string is passed as-is
        super(GlobURIPattern, self).__init__(pattern)


def _select_files(descriptor, directory):
    '''
    Generate the names, relative to `directory`, of the files selected by the
    descriptor's file includes and patterns

    Raises
    ------
    Exception
        Thrown when an explicitly included file does not exist
    '''
    fdescr = descriptor.files
    if not fdescr:
        return
    for f in fdescr.includes:
        if not exists(p(directory, f)):
            raise Exception('Included file in bundle does not exist', f)
        yield f

    for f in fdescr.patterns:
        for match in match_files(directory, p(directory, f)):
            yield relpath(match, directory)


def _select_contexts(descriptor, graph):
    '''
    Generate ``(identifier, context)`` for each context in `graph` that is selected
    by any of the descriptor's includes or patterns. Each selected context is
    generated once, even if more than one include or pattern selects it.
    '''
    for context in graph.contexts():
        ctx = context.identifier
        # Includes are checked first and a match there short-circuits the patterns.
        # Checking the patterns regardless, as before, generated a context twice
        # when both an include and a pattern matched it
        if (any(inc(ctx) for inc in descriptor.includes) or
                any(pat(ctx) for pat in descriptor.patterns)):
            yield ctx, context
def build_indexed_database(dest, bundle_directory, transaction_manager, progress=None, trip_prog=None):
    '''
    Build the indexed database from a bundle directory

    The file 'graphs/index' under `bundle_directory` lists, one per line, a context ID
    and the name of the N-Triples file under 'graphs' with that context's statements.
    Each listed file is parsed into the corresponding context of `dest`, all within a
    single transaction.

    Parameters
    ----------
    dest : rdflib.graph.ConjunctiveGraph
        The graph to add the statements to
    bundle_directory : str
        The bundle directory
    transaction_manager : transaction.TransactionManager
        Used as a context manager around all of the additions to `dest`
    progress : `tqdm.tqdm <https://tqdm.github.io/>`_-like object, optional
        Updated once for each graph loaded
    trip_prog : `tqdm.tqdm <https://tqdm.github.io/>`_-like object, optional
        Updated with counts of triples loaded
    '''
    idx_fname = p(bundle_directory, 'graphs', 'index')
    # This code was copied from OWM._load_all_graphs, but we don't have a specific
    # reason for projects and bundles to have the same format, so keeping the logic
    # separate
    triples_read = 0
    # The index is written as UTF-8 encoded bytes at install time, so it must not be
    # decoded with a locale-dependent default
    with open(idx_fname, encoding='utf-8') as index_file:
        # First pass just counts the graphs so progress can report a total
        cnt = sum(1 for _ in index_file)
        index_file.seek(0)
        if progress is not None:
            progress.total = cnt

        parser_class = plugin.get('nt', Parser)
        with transaction_manager:
            bag = BatchAddGraph(dest, batchsize=10000)
            for l in index_file:
                ctx, fname = l.strip().split('\x00')
                parser = parser_class()
                graph_fname = p(bundle_directory, 'graphs', fname)
                with open(graph_fname, 'rb') as f, bag.get_context(ctx) as g:
                    parser.parse(create_input_source(f), g)

                if progress is not None:
                    progress.update(1)
                if trip_prog is not None:
                    trip_prog.update(bag.count - triples_read)
                # NOTE(review): the delta above is computed from `bag.count`, but the
                # running total is taken from `g.count` -- confirm that these are the
                # same counter
                triples_read = g.count
            if progress is not None:
                progress.write('Finalizing writes to database...')
    if progress is not None:
        progress.write('Loaded {:,} triples'.format(triples_read))
class BundleTransactionManager(transaction.TransactionManager):
    '''
    A `transaction.TransactionManager` with no added behavior. It exists only as a
    marker, so that the transaction manager in use can be identified when debugging.
    '''