Source code for owmeta_core.command

'''
This module defines the root of a high-level interface for owmeta_core, referred to as
"OWM" (for the `main class <OWM>` in the interface), "owm" (for the command line that
wraps the interface), or "the command interface" in the documentation. Additional
"sub-commands" may be defined which provide additional functionality.

If there is a suitable method in the high-level interface, it should generally be
preferred to the lower-level interfaces for stability.
'''
from __future__ import print_function, absolute_import
import sys
from contextlib import contextmanager, nullcontext
import os
from os.path import (exists,
        abspath,
        join as pth_join,
        dirname,
        isabs,
        relpath,
        realpath,
        expanduser,
        expandvars)

from os import makedirs, mkdir, unlink, scandir

import shutil
import json
import logging
from collections import namedtuple
from textwrap import dedent
from tempfile import TemporaryDirectory
import uuid
import atexit
import warnings

from pkg_resources import iter_entry_points, DistributionNotFound
import rdflib
from rdflib.term import URIRef, Identifier
from rdflib.graph import DATASET_DEFAULT_GRAPH_ID

from .command_util import (IVar, SubCommand, GeneratorWithData, GenericUserError,
                           DEFAULT_OWM_DIR)
from . import connect, OWMETA_PROFILE_DIR
from .bundle import (BundleDependentStoreConfigBuilder, BundleDependencyManager,
                     retrieve_remotes)
from .commands.bundle import OWMBundle
from .context import (Context, DEFAULT_CONTEXT_KEY, IMPORTS_CONTEXT_KEY)
from .context_common import CONTEXT_IMPORTS
from .capable_configurable import CAPABILITY_PROVIDERS_KEY
from .capabilities import FilePathProvider
from .data import (NAMESPACE_MANAGER_KEY,
                   NAMESPACE_MANAGER_STORE_KEY,
                   NAMESPACE_MANAGER_STORE_CONF_KEY,
                   TRANSACTION_MANAGER_KEY,
                   _Dataset)
from .dataobject import (DataObject, RDFSClass, RegistryEntry, PythonClassDescription,
                         PIPInstall, PythonPackage, PythonModule, Module, ClassDescription,
                         ModuleAccessor)
from .dataobject_property import ObjectProperty, UnionProperty
from .datasource_loader import DataSourceDirLoader, LoadFailed
from .graph_serialization import write_canonical_to_file, gen_ctx_fname
from .mapper import Mapper, CLASS_REGISTRY_CONTEXT_KEY, CLASS_REGISTRY_CONTEXT_LIST_KEY
from .capability_providers import (TransactionalDataSourceDirProvider,
                                   SimpleCacheDirectoryProvider,
                                   WorkingDirectoryProvider,
                                   SimpleTemporaryDirectoryProvider)
from .utils import FCN, retrieve_provider, PROVIDER_PATH_RE
from .rdf_utils import ContextSubsetStore, BatchAddGraph


L = logging.getLogger(__name__)

DEFAULT_SAVE_CALLABLE_NAME = 'owm_data'
'''
Default name for the provider in the arguments to `OWM.save`
'''

DSDL_GROUP = 'owmeta_core.datasource_dir_loader'

DSD_DIRKEY = 'owmeta_core.command.OWMDirDataSourceDirLoader'
'''
Key used for data source directory loader and file path provider
'''

DEFAULT_NS_MANAGER_STORE = 'FileStorageZODB'


class OWMSource(object):
    ''' Commands for working with DataSource objects '''

    def __init__(self, parent):
        # The command object this sub-command hangs off of. Connections, contexts and
        # user messaging are all reached through it.
        self._parent = parent

    def list(self, context=None, kind=None, full=False):
        """
        List known sources

        Parameters
        ----------
        kind : str
            Only list sources of this kind
        context : str
            The context to query for sources
        full : bool
            Whether to (attempt to) shorten the source URIs by using the namespace manager
        """
        from .datasource import DataSource

        # NOTE(review): `full` is accepted but not consulted here. The nested formatting
        # helpers that read it were never passed to `wrap_data_object_result` and have
        # been removed as dead code.
        def generator():
            if context is not None:
                ctx = self._parent._make_ctx(context)
            else:
                ctx = self._parent._default_ctx
            kind_uri = self._parent._den3(kind or DataSource.rdf_type)

            dst = ctx.stored(ctx.stored.resolve_class(kind_uri))
            if dst is None:
                raise GenericUserError('Could not resolve a Python class for ' + str(kind))

            for ds in dst.query().load():
                yield ds

        # The connection is opened eagerly and deliberately not closed here: the
        # returned generator is consumed after this method returns
        self._parent.connect(expect_cleanup=True)
        return wrap_data_object_result(generator())

    def derivs(self, data_source):
        '''
        List data sources derived from the one given

        Parameters
        ----------
        data_source : str
            The ID of the data source to find derivatives of
        '''
        from owmeta_core.datasource import DataSource

        def generator():
            # Connect lazily so the connection is held for as long as the results are
            # being iterated
            with self._parent.connect():
                uri = self._parent._den3(data_source)
                ctx = self._parent._default_ctx.stored
                source = ctx(DataSource)(ident=uri)
                for deriv in self._derivs(ctx, source):
                    yield deriv

        return GeneratorWithData(generator(),
                                 text_format=self._format_derivation,
                                 header=("Source", "Derived"),
                                 columns=(lambda x: x[0], lambda x: x[1]))

    @staticmethod
    def _format_derivation(dat):
        ''' Render one ``(source, derived)`` pair as a single line of text '''
        source, derived = dat
        # A separator is required: both identifiers are URIs and would otherwise run
        # together into one unreadable token
        return '{} -> {}'.format(source.identifier, derived.identifier)

    def _derivs(self, ctx, source, _ancestors=frozenset()):
        '''
        Collect ``(source, derived)`` pairs for every data source transitively derived
        from `source`.

        `_ancestors` holds the identifiers on the current derivation path. It is only
        used to stop descending when the stored data contains a derivation cycle.
        '''
        from owmeta_core.datasource import DataSource
        derived = ctx(DataSource).query()
        derived.source(source)
        lineage = _ancestors | {source.identifier}
        res = []
        for x in derived.load():
            res.append((source, x))
            if x.identifier in lineage:
                # Already on this path: report the edge, but do not recurse forever
                continue
            res += self._derivs(ctx, x, lineage)
        return res

    def show(self, *data_source):
        '''
        Show the given data sources

        Parameters
        ----------
        *data_source : str
            The ID of the data source to show
        '''
        from owmeta_core.datasource import DataSource

        with self._parent.connect():
            for ds in data_source:
                uri = self._parent._den3(ds)
                for x in self._parent._default_ctx.stored(DataSource)(ident=uri).load():
                    self._parent.message(x.format_str(stored=True))

    def list_kinds(self, full=False):
        """
        List kinds of DataSources available in the current context.

        Note that *only* DataSource types which are reachable from the current context
        will be listed. So if, for instance, you have just saved some types (e.g., with
        `owm save`) but have not added an import of the contexts for those types, you may
        not see any results from this command.

        Parameters
        ----------
        full : bool
            Whether to (attempt to) shorten the source URIs by using the namespace manager
        """
        from .datasource import DataSource
        from .rdf_query_modifiers import (ZeroOrMoreTQLayer,
                                          rdfs_subclassof_subclassof_zom_creator)
        with self._parent.connect():
            ctx = self._parent._default_ctx
            rdfto = ctx.stored(DataSource.rdf_type_object)
            sc = ctx.stored(RDFSClass)()
            sc.rdfs_subclassof_property(rdfto)
            nm = self._parent.namespace_manager
            # Query through a layer built from the sub-class-of matcher for
            # DataSource's RDF type
            zom_matcher = rdfs_subclassof_subclassof_zom_creator(DataSource.rdf_type)
            g = ZeroOrMoreTQLayer(zom_matcher, ctx.stored.rdf_graph())
            for x in sc.load(graph=g):
                if full:
                    yield x.identifier
                else:
                    yield nm.normalizeUri(x.identifier)

    def rm(self, *data_source):
        '''
        Remove a `DataSource`

        Parameters
        ----------
        *data_source : str
            ID of the source to remove
        '''
        from .datasource import DataSource
        with self._parent.connect(), self._parent.transaction_manager:
            for ds in data_source:
                uri = self._parent._den3(ds)
                ctx = self._parent._default_ctx.stored
                for x in ctx(DataSource).query(ident=uri).load():
                    # Retract the transformations attached to the source before the
                    # source itself
                    for trans in x.transformation.get():
                        ctx(trans).retract()
                    ctx(x).retract()
class OWMTranslator(object):
    ''' Data source translator commands '''

    def __init__(self, parent):
        self._parent = parent

    def list(self, context=None, full=False):
        '''
        List translators

        Parameters
        ----------
        context : str
            The root context to search
        full : bool
            Whether to (attempt to) shorten the source URIs by using the namespace manager
        '''
        from owmeta_core.datasource import DataTranslator

        def id_fmt(trans):
            # The namespace manager is looked up before `full` is checked, as in the
            # original ordering
            nm = self._parent.namespace_manager
            return str(trans.identifier) if full else nm.normalizeUri(trans.identifier)

        def generator():
            with self._parent.connect():
                root = (self._parent._default_ctx if context is None
                        else self._parent._make_ctx(context))
                yield from root.stored(DataTranslator).query().load()

        return GeneratorWithData(generator(),
                                 header=('ID',),
                                 columns=(id_fmt,),
                                 text_format=id_fmt)

    def show(self, translator):
        '''
        Show a translator

        Parameters
        ----------
        translator : str
            The translator to show
        '''
        from owmeta_core.datasource import DataTranslator
        with self._parent.connect():
            translator_uri = self._parent._den3(translator)
            query = self._parent._default_ctx.stored(DataTranslator)(ident=translator_uri)
            for found in query.load():
                self._parent.message(found)
                # NOTE(review): reconstructed as "stop after the first match"; the
                # collapsed source does not show whether this return sat inside the loop
                return

    def create(self, translator_type):
        '''
        Creates an instance of the given translator class and adds it to the graph

        Parameters
        ----------
        translator_type : str
            RDF type for the translator class
        '''
        with self._parent.connect():
            ctx = self._parent._default_ctx
            type_uri = self._parent._den3(translator_type)
            cls = ctx.stored.resolve_class(type_uri)
            if not cls:
                raise GenericUserError(f'Unable to find the class for {translator_type}')

            with self._parent.transaction_manager:
                instance = ctx(cls)()
                # Import the class's definition context into the default context, then
                # save the context and its imports
                ctx.add_import(cls.definition_context)
                ctx.save()
                ctx.save_imports(transitive=False)
                return instance.identifier

    def list_kinds(self, full=False):
        """
        List kinds of DataTranslators

        Note that *only* DataTranslator types which are reachable from the current
        context will be listed. So if, for instance, you have just saved some types
        (e.g., with `owm save`) but have not added an import of the contexts for those
        types, you may not see any results from this command.

        Parameters
        ----------
        full : bool
            Whether to (attempt to) shorten the translator URIs by using the namespace
            manager
        """
        from .datasource import DataTranslator
        from .rdf_query_modifiers import (ZeroOrMoreTQLayer,
                                          rdfs_subclassof_subclassof_zom_creator)
        with self._parent.connect():
            ctx = self._parent._default_ctx
            base_type = ctx.stored(DataTranslator.rdf_type_object)
            subclass_query = ctx.stored(RDFSClass)()
            subclass_query.rdfs_subclassof_property(base_type)
            nm = self._parent.namespace_manager
            matcher = rdfs_subclassof_subclassof_zom_creator(DataTranslator.rdf_type)
            layered_graph = ZeroOrMoreTQLayer(matcher, ctx.stored.rdf_graph())
            for kind in subclass_query.load(graph=layered_graph):
                yield kind.identifier if full else nm.normalizeUri(kind.identifier)

    def rm(self, *translator):
        '''
        Remove a `DataTranslator`

        Parameters
        ----------
        *translator : str
            ID of the translator to remove
        '''
        from .datasource import DataTranslator
        with self._parent.connect(), self._parent.transaction_manager:
            for translator_id in translator:
                uri = self._parent._den3(translator_id)
                stored_ctx = self._parent._default_ctx.stored
                for match in stored_ctx(DataTranslator).query(ident=uri).load():
                    stored_ctx(match).retract()
class OWMTypes(object):
    ''' Commands for dealing with Python classes and RDF types '''

    def __init__(self, parent):
        self._parent = parent

    def rm(self, *type):
        '''
        Removes info about the given types, like ``rdfs:subClassOf`` statements, and
        removes the corresponding registry entries as well

        Parameters
        ----------
        *type : str
            Types to remove
        '''
        with self._parent.connect() as conn, conn.transaction_manager:
            for type_id in type:
                type_uri = self._parent._den3(type_id)

                # First retract the RDFS class object for the type from the default
                # context...
                stored_ctx = self._parent._default_ctx.stored
                class_object = stored_ctx.stored(RDFSClass)(ident=type_uri)
                stored_ctx(class_object).retract()

                # ...then every class registry entry that points at the type
                registry_ctx = conn.mapper.class_registry_context
                entry_query = registry_ctx.stored(RegistryEntry).query()
                entry_query.rdf_class(type_uri)
                for entry in entry_query.load():
                    registry_ctx.stored(entry).retract()
class OWMNamespace(object):
    ''' RDF namespace commands '''

    def __init__(self, parent):
        self._parent = parent

    def bind(self, prefix, uri):
        '''
        Bind a prefix to a namespace URI

        Parameters
        ----------
        prefix : str
            Prefix to bind to a namespace URI
        uri : str
            Namespace URI to bind to a prefix
        '''
        with self._parent.connect(), self._parent.transaction_manager:
            self._parent.namespace_manager.bind(prefix, uri)

    def list(self):
        '''
        List namespace prefixes and URIs in the project
        '''
        def namespaces():
            # The rows are produced lazily, so the connection has to be held by the
            # generator itself. Returning a generator expression from inside a `with`
            # block would leave the block before the first row is read. This is the
            # same pattern the other listing commands in this module use.
            with self._parent.connect() as conn:
                nm = conn.conf[NAMESPACE_MANAGER_KEY]
                for prefix, uri in nm.namespaces():
                    yield dict(prefix=prefix, uri=uri)

        return GeneratorWithData(namespaces(),
                                 header=('Prefix', 'URI'),
                                 columns=(lambda r: r['prefix'],
                                          lambda r: r['uri']))
class _ProgressMock(object):
    '''
    Do-nothing stand-in for a progress reporter: any attribute lookup that is not
    otherwise satisfied, and any call, produces another `_ProgressMock`
    '''

    def __getattr__(self, name):
        # Only reached for attributes that normal lookup did not find
        return self.__class__()

    def __call__(self, *args, **kwargs):
        # Arguments are accepted and ignored
        return self.__class__()
class OWMConfig(object):
    '''
    Config file commands.

    Without any sub-command, prints the configuration parameters
    '''
    # Adjacent string literals are used instead of a backslash continuation inside the
    # literal so that source indentation does not end up inside the help text
    user = IVar(value_type=bool,
                default_value=False,
                doc='If set, configs are only for the user; otherwise, they'
                    ' would be committed to the repository')

    def __init__(self, parent):
        self._parent = parent
        # When set to a callable, `__call__` runs it instead of printing the config
        self._next = None

    def __setattr__(self, t, v):
        super(OWMConfig, self).__setattr__(t, v)

    def __call__(self):
        owm = self._parent
        if self._next is not None:
            try:
                return self._next()
            finally:
                # Register the project config file with the repository whether or not
                # the delegated call succeeded
                owm.repository().add([owm.config_file])
        else:
            fname = self._get_config_file()
            with open(fname, 'r') as f:
                return json.load(f)

    @IVar.property('user.conf', value_type=str)
    def user_config_file(self):
        ''' The user config file name '''
        # Relative names are resolved against the owmeta project directory
        if isabs(self._user_config_file):
            return self._user_config_file
        return pth_join(self._parent.owmdir, self._user_config_file)

    @user_config_file.setter
    def user_config_file(self, val):
        self._user_config_file = val

    def _get_config_file(self):
        '''
        Return the path of the config file selected by `user`, creating the file first
        if it does not exist yet.

        Raises `OWMDirMissingException` when the project directory is missing.
        '''
        if not exists(self._parent.owmdir):
            raise OWMDirMissingException(self._parent.owmdir)

        if self.user:
            res = self.user_config_file
        else:
            res = self._parent.config_file

        if not exists(res):
            if self.user:
                self._init_user_config_file()
            else:
                self._parent._init_config_file()
        return res

    def _init_user_config_file(self):
        ''' Create the user config file holding an empty configuration '''
        with open(self.user_config_file, 'w') as f:
            write_config({}, f)

    def get(self, key):
        '''
        Read a config value

        Parameters
        ----------
        key : str
            The configuration key
        '''
        fname = self._get_config_file()
        with open(fname, 'r') as f:
            ob = json.load(f)
            # Missing keys yield None rather than an error
            return ob.get(key)

    def set(self, key, value):
        '''
        Set a config value

        Parameters
        ----------
        key : str
            The configuration key
        value : str
            The value to set
        '''
        fname = self._get_config_file()
        with open(fname, 'r+') as f:
            ob = json.load(f)
            f.seek(0)
            # Values that parse as JSON are stored as the parsed value; anything else
            # is stored verbatim as a string
            try:
                json_value = json.loads(value)
            except ValueError:
                json_value = value
            ob[key] = json_value
            write_config(ob, f)
            # The file is rewritten in place from offset 0: cut off whatever is left of
            # the old, possibly longer, content. Harmless if `write_config` already
            # truncates.
            f.truncate()

    def delete(self, key):
        '''
        Deletes a config value

        Parameters
        ----------
        key : str
            The configuration key
        '''
        fname = self._get_config_file()
        with open(fname, 'r+') as f:
            ob = json.load(f)
            f.seek(0)
            # NOTE(review): a key that is not present raises KeyError here
            del ob[key]
            write_config(ob, f)
            # See `set`: make sure no bytes of the previous content survive
            f.truncate()
# Single shared no-op reporter handed out by `default_progress_reporter`
_PROGRESS_MOCK = _ProgressMock()


@contextmanager
def default_progress_reporter(*args, **kwargs):
    '''
    Context manager that ignores its arguments and provides the shared no-op progress
    reporter
    '''
    yield _PROGRESS_MOCK


# Editor candidates: absolute paths first, then the same programs by bare name
POSSIBLE_EDITORS = (
    ['/usr/bin/' + _name for _name in ('vi', 'vim', 'nano')]
    + ['vim', 'vi', 'nano']
)
class OWMContexts(object):
    ''' Commands for working with contexts '''

    def __init__(self, parent):
        self._parent = parent

    def serialize(self, context=None, destination=None, format='nquads',
                  include_imports=False, whole_graph=False):
        '''
        Serialize the current default context or the one provided

        Parameters
        ----------
        context : str
            The context to save
        destination : file or str
            A file-like object to write the file to or a file name. If not provided,
            messages the result.
        format : str
            Serialization format (ex, 'n3', 'nquads')
        include_imports : bool
            If true, then include contexts imported by the provided context in the
            result. The default is not to include imported contexts.
        whole_graph : bool
            Serialize all contexts from all graphs (this probably isn't what you want)
        '''
        retstr = False
        if destination is None:
            # The standard library buffer is sufficient; no compatibility shim needed
            from io import BytesIO
            retstr = True
            destination = BytesIO()

        with self._parent.connect():
            if whole_graph:
                if context is not None:
                    raise GenericUserError('Serializing the whole graph precludes selecting a'
                                           ' single context')
                self._parent.rdf.serialize(destination, format=format)
            else:
                if context is None:
                    ctx = self._parent._default_ctx
                else:
                    ctx = Context(ident=self._parent._den3(context),
                                  conf=self._parent._conf())

                if include_imports:
                    ctx.stored.rdf_graph().serialize(destination, format=format)
                else:
                    ctx.own_stored.rdf_graph().serialize(destination, format=format)

        if retstr:
            # Nothing to write to was given: hand the serialization to the user
            self._parent.message(destination.getvalue().decode(encoding='utf-8'))

    def edit(self, context=None, format=None, editor=None, list_formats=False):
        '''
        Edit a provided context or the current default context.

        The file name of the serialization will be passed as the sole argument to the
        editor. If the editor argument is not provided, will use the EDITOR environment
        variable. If EDITOR is also not defined, will try a few known editors until one
        is found. The editor must write back to the file.

        Parameters
        ----------
        context : str
            The context to edit
        format : str
            Serialization format (ex, 'n3', 'nquads'). Default 'n3'
        editor : str
            The program which will be used to edit the context serialization.
        list_formats : bool
            List the formats available for editing (I.O.W., formats that we can both
            read and write)
        '''
        import re
        from rdflib.plugin import plugins
        from rdflib.serializer import Serializer
        from rdflib.parser import Parser

        # Only formats with both a serializer and a parser can be round-tripped
        serializers = set(x.name for x in plugins(kind=Serializer))
        parsers = set(x.name for x in plugins(kind=Parser))
        formats = serializers & parsers

        if list_formats:
            return formats

        if not format:
            format = 'n3'

        if format not in formats:
            raise GenericUserError("Unsupported format: " + format)

        from subprocess import call
        # NOTE(review): unlike `serialize`, the context ID is used as given here (it is
        # not passed through `_den3`)
        if context is None:
            ctx = self._parent._default_ctx
            ctxid = self._parent._conf(DEFAULT_CONTEXT_KEY)
        else:
            ctx = Context(ident=context, conf=self._parent._conf())
            ctxid = context

        if not editor:
            editor = self._get_editor_command()

        with self._parent._tempdir(prefix='owm-context-edit.') as d:
            from rdflib import plugin
            from rdflib.parser import Parser, create_input_source
            parser = plugin.get(format, Parser)()
            fname = pth_join(d, 'data')

            need_edit = True
            load_original = True
            # XXX This is so rotten
            with self._parent.connect():
                while need_edit:
                    # We need this loop to be all the way outside of the connection because
                    # we'll need to roll-back the connection since the parser may have already
                    # modified the graph which is our record of the original contents. We
                    # *could* just save the file, but it's safer to just roll-back.
                    need_edit = False
                    try:
                        with self._parent.transaction_manager:
                            if load_original:
                                with open(fname, mode='wb') as destination:
                                    # For canonical graphs, we would need to sort the triples first,
                                    # but it's not needed here -- the user probably doesn't care one
                                    # way or the other
                                    ctx.own_stored.rdf_graph().serialize(destination, format=format)
                                load_original = False
                            call([editor, fname])
                            with open(fname, mode='rb') as source:
                                g = self._parent.own_rdf.get_context(ctxid)
                                L.debug("Removing all triples...")
                                g.remove((None, None, None))
                                L.debug("Removed all triples")
                                try:
                                    L.debug("Parsing...")
                                    parser.parse(create_input_source(source), g)
                                except Exception as e:
                                    # There are some specific parsing errors, but we try to be lenient
                                    # here and allow anything to be retried
                                    if not self._parent.non_interactive:
                                        self._parent.message(f"Error parsing RDF: {e}")
                                        response = self._parent.prompt('Try again? Yes: (M)odified, (O)riginal; (N)o: ')
                                        if re.match(r'[nN]', response):
                                            # We've already sent the message, so we don't really
                                            # need to throw the exception below, but just so that
                                            # exception propagates the same way for interactive
                                            # and non-interactive we do anyway
                                            pass
                                        elif re.match(r'[mMyY]', response):
                                            need_edit = True
                                            raise
                                        elif re.match(r'[oO]', response):
                                            need_edit = True
                                            load_original = True
                                            L.debug("raising...")
                                            raise
                                    raise GenericUserError(f"Error parsing RDF: {e}")
                    except Exception:
                        # The exception leaves the transaction block before landing
                        # here; loop again only if the user asked for another attempt
                        if need_edit:
                            continue
                        raise

    def _get_editor_command(self):
        '''
        Find the editor to run: the EDITOR environment variable when it is set to a
        non-blank value, otherwise the first of `POSSIBLE_EDITORS` that resolves to an
        executable.

        Raises `GenericUserError` when no editor can be found.
        '''
        # EDITOR is optional: a missing variable must fall through to the known-editor
        # search rather than raise KeyError
        editor = os.environ.get('EDITOR', '').strip()
        if editor:
            return editor

        for candidate in POSSIBLE_EDITORS:
            found = shutil.which(candidate)
            if found:
                return found

        # Falling out of the loop means nothing usable was found. Never return an
        # unverified candidate.
        raise GenericUserError("No known editor could be found")

    def list(self, include_dependencies=False, include_default=False):
        '''
        List the set of contexts in the graph

        Parameters
        ----------
        include_dependencies : bool
            If set, then contexts from dependencies will be included
        include_default : bool
            If set, then include the default graph in the results as well
        '''
        with self._parent.connect():
            # `rdf` also covers dependencies; `own_rdf` is just the project's own data
            graph = self._parent.rdf if include_dependencies else self._parent.own_rdf
            for c in graph.contexts():
                is_default = c.identifier == DATASET_DEFAULT_GRAPH_ID
                if not is_default or include_default:
                    yield c.identifier

    def list_changed(self):
        '''
        Return the set of contexts which differ from the serialization on disk
        '''
        return self._parent._changed_contexts_set()

    def list_imports(self, context):
        '''
        List the contexts that the given context imports

        Parameters
        ----------
        context : str
            The context to list imports for
        '''
        with self._parent.connect():
            ctx = self._parent._make_ctx(context).stored
            for c in ctx.imports:
                yield c.identifier

    def list_importers(self, context):
        '''
        List the contexts that import the given context

        Parameters
        ----------
        context : str
            The context to list importers for
        '''
        # NOTE(review): unlike `rm_import`, this does not open a connection itself;
        # confirm whether the project context connects on demand
        imports_ctxid = self._parent.imports_context()
        imports_ctx = self._parent._context(Context)(imports_ctxid).stored
        g = imports_ctx.rdf_graph()
        # The subject of each matching imports statement is an importer
        for t in g.triples((None, CONTEXT_IMPORTS, URIRef(context))):
            yield t[0]

    def add_import(self, importer, imported):
        '''
        Add an import to the imports graph

        Parameters
        ----------
        importer : str
            The importing context
        imported : list str
            The imported context
        '''
        importer_ctx = self._parent._context(Context)(importer)

        with self._parent.connect(), self._parent.transaction_manager:
            for imp in imported:
                importer_ctx.add_import(Context(imp))
            importer_ctx.save_imports()

    def rm_import(self, importer, imported):
        '''
        Remove an import statement

        Parameters
        ----------
        importer : str
            The importing context
        imported : list of str
            An imported context
        '''
        with self._parent.connect():
            imports_ctxid = self._parent.imports_context()
            imports_ctx = self._parent._context(Context)(imports_ctxid).stored
            with self._parent.transaction_manager:
                for imp in imported:
                    imports_ctx.rdf_graph().remove((URIRef(importer),
                                                    CONTEXT_IMPORTS,
                                                    URIRef(imp)))

    def bundle(self, context):
        '''
        Show the closest bundle that defines this context

        Parameters
        ----------
        context : str
            The context to lookup
        '''
        context = self._parent._den3(context)
        with self._parent.connect():
            dep_mgr = self._parent._bundle_dep_mgr
            # Contexts may be graph objects or plain identifiers; normalize to strings
            contexts = set(str(getattr(c, 'identifier', c))
                           for c in self._parent.own_rdf.contexts())
            target_bundle = dep_mgr.lookup_context_bundle(contexts, str(context))
            # When the lookup answers with the dependency manager itself, report no
            # bundle
            if target_bundle is dep_mgr:
                return None
            return target_bundle

    def rm(self, *context):
        '''
        Remove a context

        Parameters
        ----------
        *context : str
            Context to remove
        '''
        with self._parent.connect():
            graph = self._parent.own_rdf
            with self._parent.transaction_manager:
                for c in context:
                    c = self._parent._den3(c)
                    graph.remove_graph(c)
class OWMRegistryModuleAccessDeclare:
    ''' Commands for module access declarations '''

    def __init__(self, parent):
        self._parent = parent
        # Named handles on the chain of enclosing commands: the module-access command,
        # the registry command above it, and the root command above that
        self._module_access = self._parent
        self._registry = self._parent._parent
        self._owm = self._parent._parent._parent

    def python_pip(self, package_name, package_version=None, index=None, module_names=None,
                   module_id=None):
        '''
        Declare access with a Python pip package

        The given module should already have been defined in the class registry. This
        may be achieved by the "owm save" command.

        Parameters
        ----------
        package_name : str
            Name of the package
        package_version : str
            Version of the package. If not provided, will attempt to find the active
            version in package metadata
        index : str
            The index to get the package from. Optional
        module_names : list of str
            Name of the module. If not provided, will attempt to find the modules from
            package metadata. Multiple module names can be provided
        module_id : str
            URI identifier of the module. Cannot be specified along with `module_name`
        '''
        # We don't allow or expect arbitrary requirements specifications here for a couple of
        # reasons:
        #
        # 1. we want to create PythonPackage statements
        # 2. only a '==' specification is acceptable since we're meant to record exactly
        #    which version the bundle was tested with.
        #
        # We also don't include platform information like which version of Python and
        # which operating system. Our concept of usage implies that the user of a given
        # data set sees which packages they need to download, they'll look at the
        # documentation for installation instructions for the packages when needed.
        #
        # More generally, platform information should be added to artifacts of
        # computations to indicate how to redo the computation. Moreover, a given
        # module access description could "diffract" into different platform and OS
        # specifications depending on which context they are employed in. In other words,
        # we don't have sufficient information to meaningfully add platform info here.
        dist = None

        def get_dist():
            try:
                from importlib.metadata import distribution
            except ImportError:
                try:
                    from importlib_metadata import distribution
                except ImportError:
                    raise GenericUserError(
                            'Package name and package version must be defined.'
                            ' They cannot be looked up in this version of Python')
            dist = None
            try:
                dist = distribution(package_name)
            except Exception:
                L.debug('Caught exception in retrieving Distribution for %s',
                        package_name, exc_info=True)

            if dist is None:
                raise GenericUserError(
                        f'Did not find the package "{package_name}"')
            return dist

        if not package_version:
            dist = get_dist()
            package_version = dist.version

        # NOTE(review): reconstructed at function level; the collapsed source does not
        # show whether this message was nested under the version lookup above
        self._owm.message('Declaring accessors for any modules of'
                          f' {package_name}=={package_version}')

        if not (module_names or module_id):
            if dist is None:
                dist = get_dist()
            module_names = self._discover_module_names(dist)

        if module_names:
            names_to_query = module_names
        elif module_id:
            # Only an identifier was given: run one query constrained by the identifier
            # alone instead of trying to iterate over a missing list of names
            names_to_query = (None,)
        else:
            # Nothing to look for. Querying with neither constraint would match every
            # registered module.
            names_to_query = ()

        with self._owm.connect() as conn, self._owm.transaction_manager:
            crctx = conn.mapper.class_registry_context
            for module_name in names_to_query:
                # TODO: Use property alternatives when that works
                pymod_q = crctx.stored(PythonModule).query(ident=module_id)
                if module_name is not None:
                    pymod_q.name(module_name)
                for pymod in pymod_q.load():
                    package = crctx(PythonPackage)(
                            name=package_name,
                            version=package_version)
                    crctx(pymod).package(package)
                    pip_install = crctx(PIPInstall)(
                            package=package,
                            index_url=index)
                    crctx(pymod).accessor(pip_install)
                    self._owm.message(f'Adding {package} to {pymod} accessed by {pip_install}')
            crctx.save()

    @staticmethod
    def _discover_module_names(dist):
        '''
        Return the set of importable module names provided by a distribution, based on
        its ``top_level.txt`` metadata. `dist` only needs a ``read_text`` method.
        '''
        from importlib import import_module
        from pkgutil import walk_packages

        names = set()
        for pkg in (dist.read_text('top_level.txt') or '').split():
            mod = import_module(pkg)
            # The top-level module is itself a module of the distribution
            names.add(pkg)
            # Single-file modules have no `__path__` and nothing beneath them to walk
            search_path = getattr(mod, '__path__', None)
            if search_path is None:
                continue
            for m in walk_packages(search_path, pkg + '.'):
                names.add(m.name)
        return names
class OWMRegistryModuleAccessShow:
    ''' Show module accessor description '''

    def __init__(self, parent):
        self._parent = parent
        # Named handles on the chain of enclosing commands, up to the root command
        self._module_access = self._parent
        self._registry = self._parent._parent
        self._owm = self._parent._parent._parent

    def __call__(self, module_accessor):
        '''
        Parameters
        ----------
        module_accessor : str
            Module accessor to show accessors for
        '''
        with self._owm.connect() as conn:
            ma_id = self._owm._den3(module_accessor)
            # Look in every class registry context and report each one that has it
            for ctx in conn.mapper.class_registry_context_list:
                ma = ctx(ModuleAccessor)(ident=ma_id).load_one()
                if ma:
                    # Go through the root command's message hook, like the other
                    # commands do, so that output can be redirected by whoever
                    # embeds the command
                    self._owm.message(ma.help_str())
class OWMRegistryModuleAccess:
    ''' Commands for manipulating software module access in the class registry '''

    declare = SubCommand(OWMRegistryModuleAccessDeclare)
    show = SubCommand(OWMRegistryModuleAccessShow)

    def __init__(self, parent):
        self._parent = parent
        self._registry = self._parent
        self._owm = self._parent._parent

    def list(self, registry_entry=None):
        '''
        List module accessors

        Parameters
        ----------
        registry_entry : str
            Registry entry ID. Optional

        Returns
        -------
        sequence of `ModuleAccessor`
        '''
        def accessors(connection):
            entry_id = registry_entry and self._owm._den3(registry_entry)

            # Take the entry from the first registry context that has one; give up
            # quietly when none of them does
            for registry_ctx in connection.mapper.class_registry_context_list:
                entry = registry_ctx(RegistryEntry)(ident=entry_id).load_one()
                if entry is not None:
                    break
            else:
                return

            # Chain the queries entry -> class description -> module, in the context
            # where the entry was found, then read the module's accessors
            description_q = registry_ctx(ClassDescription).query()
            module_q = registry_ctx(Module).query()
            entry.class_description(description_q)
            description_q.module(module_q)
            yield from module_q.accessor.get()

        connection = self._owm.connect(expect_cleanup=True)
        return wrap_data_object_result(accessors(connection))
class OWMRegistry(object):
    '''
    Commands for dealing with the class registry, a mapping of RDF types to constructs
    in programming languages

    Although it is called the "*class* registry", the registry can map RDF types to
    constructs other than classes in the target programming language, particularly in
    languages that don't have classes (e.g., C) or where the use of classes is not
    preferred in that language.
    '''

    module_access = SubCommand(OWMRegistryModuleAccess)

    def __init__(self, parent):
        self._parent = parent

    def list(self, module=None, rdf_type=None, class_name=None):
        '''
        List registered classes

        Parameters
        ----------
        module : str
            If provided, limits the registry entries returned to those that have the
            given module name. Optional.
        rdf_type : str
            If provided, limits the registry entries returned to those that have the
            given RDF type. Optional.
        class_name : str
            If provided, limits the registry entries returned to those that have the
            given class name. Optional.
        '''
        def registry_entries():
            with self._parent.connect() as conn:
                nm = conn.conf[NAMESPACE_MANAGER_KEY]
                for entry in conn.mapper.load_registry_entries():
                    ident = entry.identifier
                    cd = entry.class_description()
                    re_rdf_type = entry.rdf_class()
                    # Only Python class descriptions are listed
                    if not isinstance(cd, PythonClassDescription):
                        continue
                    module_do = cd.module()
                    if nm:
                        ident = nm.normalizeUri(ident)

                    # Reset for every entry: without a default, an entry whose module
                    # has no name would reuse the previous entry's module name, or hit
                    # an unbound local on the first iteration
                    module_name = None
                    if hasattr(module_do, 'name'):
                        module_name = module_do.name()

                    re_class_name = cd.name()

                    # Filters compare against the un-shortened values
                    if module is not None and module != module_name:
                        continue

                    if rdf_type is not None and rdf_type != str(re_rdf_type):
                        continue

                    if class_name is not None and class_name != str(re_class_name):
                        continue

                    if nm:
                        re_rdf_type = nm.normalizeUri(re_rdf_type)

                    res = dict(id=ident,
                               rdf_type=re_rdf_type,
                               class_name=re_class_name,
                               module_name=module_name)

                    if hasattr(module_do, 'package'):
                        package = module_do.package()
                        if package:
                            if nm:
                                pkgid = nm.normalizeUri(package.identifier)
                            else:
                                pkgid = package.identifier
                            res['package'] = dict(id=pkgid,
                                                  name=package.name(),
                                                  version=package.version())
                    yield res

        def fmt_text(entry, format=None):
            if format == 'pretty':
                pkg_id = entry.get('package') and entry['package']['id']
                # NOTE(review): the line layout of this template is reconstructed; the
                # collapsed source only preserves its words
                return dedent('''\
                {id}:
                    RDF Type: {rdf_type}
                    Module Name: {module_name}
                    Class Name: {class_name}
                    Package: {pkg_id}\n''').format(pkg_id=pkg_id, **entry)
            else:
                return entry['id']

        return GeneratorWithData(registry_entries(),
                                 header=('ID', 'RDF Type', 'Class Name', 'Module Name',
                                         'Package', 'Package Name', 'Package Version'),
                                 columns=(lambda r: r['id'],
                                          lambda r: r['rdf_type'],
                                          lambda r: r['class_name'],
                                          lambda r: r['module_name'],
                                          lambda r: r.get('package') and r['package']['id'],
                                          lambda r: r.get('package') and r['package']['name'],
                                          lambda r: r.get('package') and r['package']['version'],),
                                 default_columns=('ID', 'RDF Type', 'Class Name',
                                                  'Module Name', 'Package'),
                                 text_format=fmt_text)

    def show(self, *registry_entry):
        '''
        Show registry entries

        Parameters
        ----------
        *registry_entry : str
            Registry entry to show
        '''
        # NOTE(review): there is no implementation here; calling this currently does
        # nothing and returns None

    def rm(self, *registry_entry):
        '''
        Remove a registry entry

        Parameters
        ----------
        *registry_entry : str
            Registry entry to remove
        '''
        # Connect first, then open the transaction, and do both once for all entries.
        # This is the ordering the other removal commands in this module use, and it
        # means the transaction finishes while the connection is still open.
        with self._parent.connect() as conn, self._parent.transaction_manager:
            crctx = conn.mapper.class_registry_context
            for entry_id in registry_entry:
                uri = self._parent._den3(entry_id)
                for x in crctx(RegistryEntry).query(ident=uri).load():
                    crctx.stored(x).retract()
class OWM:
    """
    High-level commands for working with owmeta data
    """

    # Options declared with `IVar`; the `doc` strings describe each option
    graph_accessor_finder = IVar(doc='Finds an RDFLib graph from the given URL')

    repository_provider = IVar(doc='The provider of the repository logic'
                                   ' (cloning, initializing, committing, checkouts)')

    non_interactive = IVar(value_type=bool,
                           doc='If this option is provided, then interactive prompts are not allowed')

    context = IVar(doc='Context to use instead of the default context. Commands that'
                       ' work with other contexts (e.g., `owm contexts rm-import`) will continue'
                       ' to use those other contexts unless otherwise indicated')

    # N.B.: Sub-commands are created on-demand when you access the attribute,
    # hence they do not, in any way, store attributes set on them. You must
    # save the instance of the subcommand to a variable in order to make
    # multiple statements including that sub-command
    config = SubCommand(OWMConfig)

    source = SubCommand(OWMSource)

    translator = SubCommand(OWMTranslator)

    namespace = SubCommand(OWMNamespace)

    contexts = SubCommand(OWMContexts)

    type = SubCommand(OWMTypes)

    bundle = SubCommand(OWMBundle)

    registry = SubCommand(OWMRegistry)

    def __init__(self, owmdir=None, non_interactive=False):
        '''
        Attributes
        ----------
        cleanup_manager : `atexit`-like
            An object to which functions can be `registered <atexit.register>` and
            `unregistered <atexit.unregister>`. To handle cleaning up connections that
            were not closed more directly (e.g., by calling `~OWM.disconnect`)
        progress_reporter : `tqdm`-like
            A callable that presents some kind of progress to a user. Interface is a
            subset of the `tqdm.tqdm` object: the reporter must accept ``unit``,
            ``miniters``, ``file``, and ``leave`` options, although what it does with
            those is unspecified. Additionally, for reporting progress on cloning a
            project, an `optional interface <.git_repo.GitRepoProvider.clone>` is
            required.
        '''
        # Put the docstring here so it doesn't show up in the CLI output, but does show up
        # in Sphinx docs
        self.progress_reporter = default_progress_reporter
        # Output hook: forwards its arguments to `print`
        self.message = lambda *args, **kwargs: print(*args, **kwargs)

        def prompt(*args, **kwargs):
            # Read a line with `input`, then print an empty line before returning it
            res = input(*args, **kwargs)
            print()
            return res

        self.prompt = prompt
        self._data_source_directories = None
        self._changed_contexts = None
        self._owm_connection = None
        # The connection objects handed out by `connect`
        self._connections = set()
        self._context_change_tracker = None

        # Only override the IVar defaults when a truthy value was passed in
        if owmdir:
            self.owmdir = owmdir

        if non_interactive:
            self.non_interactive = non_interactive
        self._bundle_dep_mgr = None
        self._context = _ProjectContext(owm=self)
        self._cached_default_context = None
        self.cleanup_manager = atexit

    def __str__(self):
        return f'{self.__class__.__name__}({self.owmdir})'

    @IVar.property(OWMETA_PROFILE_DIR)
    def userdir(self):
        '''
        Root directory for user-specific configuration
        '''
        # The stored value is expanded ("~", environment variables) and resolved on
        # every read
        return realpath(expandvars(expanduser(self._userdir)))

    @userdir.setter
    def userdir(self, val):
        self._userdir = val

    @IVar.property('.')
    def basedir(self):
        '''
        The base directory. owmdir is resolved against this base
        '''
        return self._basedir

    @basedir.setter
    def basedir(self, val):
        # Unlike `userdir`, the expansion happens once, when the value is set
        self._basedir = realpath(expandvars(expanduser(val)))

    @IVar.property(DEFAULT_OWM_DIR)
    def owmdir(self):
        '''
        The base directory for owmeta files. The repository provider's files also go under here
        '''
        # Absolute values are returned as-is; anything else is joined onto `basedir`
        if isabs(self._owmdir):
            res = self._owmdir
        else:
            res = pth_join(self.basedir, self._owmdir)
        return res

    @owmdir.setter
    def owmdir(self, val):
        self._owmdir = val

    @IVar.property('owm.conf', value_type=str)
    def config_file(self):
        '''
        The config file name
        '''
        # Absolute values are returned as-is; anything else is joined onto `owmdir`
        if isabs(self._config_file):
            return self._config_file
        return pth_join(self.owmdir, self._config_file)

    @config_file.setter
    def config_file(self, val):
        self._config_file = val

    @IVar.property('worm.db')
    def store_name(self):
        '''
        The file name of the database store
        '''
        # Absolute values are returned as-is; anything else is joined onto `owmdir`
        if isabs(self._store_name):
            return self._store_name
        return pth_join(self.owmdir, self._store_name)

    @store_name.setter
    def store_name(self, val):
        self._store_name = val

    @IVar.property('nm.db')
    def namespace_manager_store_name(self):
        '''
        The file name of the namespace database store
        '''
        # Absolute values are returned as-is; anything else is joined onto `owmdir`
        if isabs(self._nm_store_name):
            return self._nm_store_name
        return pth_join(self.owmdir, self._nm_store_name)

    @namespace_manager_store_name.setter
    def namespace_manager_store_name(self, val):
        self._nm_store_name = val

    @IVar.property('temp')
    def temporary_directory(self):
        '''
        The base temporary directory for any operations that need one
        '''
        # Absolute values are returned as-is; anything else is joined onto `owmdir`
        if isabs(self._temporary_directory):
            return self._temporary_directory
        return pth_join(self.owmdir, self._temporary_directory)

    @temporary_directory.setter
    def temporary_directory(self, val):
        self._temporary_directory = val

    def _ensure_owmdir(self):
        # Create the owmeta directory (and any missing parents) if it is absent
        if not exists(self.owmdir):
            makedirs(self.owmdir)
def save(self, module, provider=None, context=None):
    '''
    Save the data in the given context

    Saves the "mapped" classes declared in a module and saves the objects declared by
    the "provider" (see the argument's description)

    Parameters
    ----------
    module : str
        Name of the module housing the provider
    provider : str
        Name of the provider, a callable that accepts a context object and adds
        statements to it. Can be a "dotted" name indicating attribute accesses.
        Default is `DEFAULT_SAVE_CALLABLE_NAME`
    context : str
        The target context. The default context is used
    '''
    import importlib as IM
    from functools import wraps
    with self.connect() as conn:
        conf = self._conf()

        # Make modules in the current working directory importable for the duration
        # of the save; the entry is removed again in the `finally` below, but only if
        # it was added here
        added_cwd = False
        cwd = os.getcwd()
        if cwd not in sys.path:
            sys.path.append(cwd)
            added_cwd = True

        try:
            mod = IM.import_module(module)
            # Remember whether the caller named a provider: a missing *default*
            # provider is tolerated below, a missing *named* one is not
            provider_not_set = provider is None
            if not provider:
                provider = DEFAULT_SAVE_CALLABLE_NAME

            if not context:
                ctx = conn(_OWMSaveContext)(self._default_ctx, mod)
            else:
                ctx = conn(_OWMSaveContext)(Context(ident=context, conf=conf), mod)
            # Resolve the dotted provider name by walking attributes from the module
            attr_chain = provider.split('.')
            prov = mod
            for x in attr_chain:
                try:
                    prov = getattr(prov, x)
                except AttributeError:
                    # A module that only declares mapped classes needs no provider,
                    # so substitute a no-op when the default name was not found
                    if provider_not_set and getattr(mod, '__yarom_mapped_classes__', None):
                        def prov(*args, **kwargs):
                            pass
                        break
                    raise
            ns = OWMSaveNamespace(context=ctx)

            mapped_classes = getattr(mod, '__yarom_mapped_classes__', None)
            if mapped_classes:
                # It's a module with class definitions -- take each of the mapped
                # classes and add their contexts so they're saved properly...
                orig_prov = prov
                mapper = self._owm_connection.mapper

                @wraps(prov)
                def save_classes(ns):
                    ns.include_context(mapper.class_registry_context)
                    # Note that we don't call `mapper.save` here. Rather, we declare
                    # the class registry entries and use the OWMSaveNamespace.save
                    # below
                    mapper.process_module(module, mod)
                    mapper.declare_python_class_registry_entry(*mapped_classes)
                    for mapped_class in mapped_classes:
                        ns.include_context(mapped_class.definition_context)
                        # N.B.: We don't add an import of the class to the current
                        # context because there aren't necessarily any statements that
                        # use the class. An import should be added when a statement
                        # using the class is added to the importing context.
                    for mapped_class in mapped_classes:
                        if hasattr(mapped_class, 'rdf_namespace'):
                            # Binding a prefix is best-effort: a failure is logged
                            # and the save continues
                            try:
                                ns.namespace_manager.bind(
                                        mapped_class.__name__,
                                        mapped_class.rdf_namespace,
                                        override=True, replace=True)
                            except Exception:
                                L.warning('Failed to bind RDF namespace for %s to %s',
                                        mapped_class.__name__,
                                        mapped_class.rdf_namespace, exc_info=True)
                    # Finally run the provider that was resolved above
                    orig_prov(ns)
                prov = save_classes

            # Run the provider and write the namespace's statements to the project
            # graph within one transaction
            with self.transaction_manager:
                prov(ns)
                ns.save(graph=conf['rdf.graph'])
            return ns.created_contexts()
        finally:
            if added_cwd:
                sys.path.remove(cwd)
def retract(self, subject, property, object):
    '''
    Remove one or more statements

    Parameters
    ----------
    subject : str
        The object which you want to say something about. optional
    property : str
        The type of statement to make. optional
    object : str
        The other object you want to say something about. optional
    '''
    with self.connect() as conn, conn.transaction_manager:
        target = conn.rdf.get_context(self._default_ctx.identifier)
        # The literal string 'ANY' in a position becomes `None` in the pattern;
        # every other value is resolved with `_den3`
        pattern = tuple(None if term == 'ANY' else self._den3(term)
                        for term in (subject, property, object))
        target.remove(pattern)
def say(self, subject, property, object):
    '''
    Make a statement

    Parameters
    ----------
    subject : str
        The object which you want to say something about
    property : str
        The type of statement to make
    object : str
        The other object you want to say something about
    '''
    with self.connect() as conn, conn.transaction_manager:
        target = conn.rdf.get_context(self._default_ctx.identifier)
        # Each term is resolved with `_den3` before the triple is added
        statement = tuple(self._den3(term) for term in (subject, property, object))
        target.add(statement)
def set_default_context(self, context, user=False):
    '''
    Set current default context for the repository

    Parameters
    ----------
    context : str
        The context to set
    user : bool
        If set, set the context only for the current user. Has no effect for
        retrieving the context
    '''
    # Keep a single sub-command instance so that `user` is still set when `set`
    # is called on it
    config_cmd = self.config
    config_cmd.user = user
    config_cmd.set(DEFAULT_CONTEXT_KEY, context)
def get_default_context(self):
    '''
    Read the current target context for the repository
    '''
    project_conf = self._conf()
    return project_conf.get(DEFAULT_CONTEXT_KEY)
def imports_context(self, context=None, user=False):
    '''
    Read or set current target imports context for the repository

    Parameters
    ----------
    context : str
        The context to set
    user : bool
        If set, set the context only for the current user. Has no effect for
        retrieving the context
    '''
    # Without a context this is a read of the configured value
    if context is None:
        return self._conf().get(IMPORTS_CONTEXT_KEY)

    # Keep a single sub-command instance so that `user` is still set when `set`
    # is called on it
    config_cmd = self.config
    config_cmd.user = user
    config_cmd.set(IMPORTS_CONTEXT_KEY, context)
def init(self, update_existing_config=False, default_context_id=None):
    """
    Makes a new graph store.

    The configuration file will be created if it does not exist. If it
    *does* exist, the location of the database store will, by default, not
    be changed in that file

    If not provided, some values will be prompted for, unless batch (non-interactive)
    mode is enabled. If batch mode is enabled, either an error will be returned or a
    default value will be used for missing options. Values which are required either
    in a prompt or as options are indicated as "Required" below.

    Parameters
    ----------
    update_existing_config : bool
        If True, updates the existing config file to point to the given
        file for the store configuration
    default_context_id : str
        URI for the default context. Required
    """
    # Determined before entering the `try` so that the error handler can always
    # consult it: if this assignment lived inside the `try`, a failure before it
    # ran would surface as an UnboundLocalError from the handler instead of the
    # real error
    reinit = exists(self.owmdir)
    try:
        self._ensure_owmdir()
        if not exists(self.config_file):
            self._init_config_file(default_context_id=default_context_id)
        elif update_existing_config:
            with open(self.config_file, 'r+') as f:
                conf = json.load(f)
                conf['rdf.store_conf'] = pth_join('$OWM',
                        relpath(abspath(self.store_name), abspath(self.owmdir)))
                # Rewind so the updated configuration replaces the old contents
                f.seek(0)
                write_config(conf, f)

        # Open and immediately release a connection with the new configuration
        self.connect().disconnect()

        self._init_repository(reinit)
        if reinit:
            self.message('Reinitialized owmeta-core project at %s' % abspath(self.owmdir))
        else:
            self.message('Initialized owmeta-core project at %s' % abspath(self.owmdir))
    except BaseException:
        # Only remove the directory if this call is the one that created it
        if not reinit:
            self._ensure_no_owmdir()
        raise
def _ensure_no_owmdir(self):
    # Remove the owmeta directory and everything under it, if it exists
    if exists(self.owmdir):
        shutil.rmtree(self.owmdir)

def _init_config_file(self, default_context_id=None):
    '''
    Write a new project config file based on the packaged default config

    The configuration is assembled and validated completely *before* the target
    file is opened for writing, so a failure (e.g., a missing default context ID)
    does not leave an empty config file behind.

    Parameters
    ----------
    default_context_id : str
        URI for the default context. If not given and prompts are allowed, the
        user is asked for it

    Raises
    ------
    GenericUserError
        If no default context ID was given or entered
    '''
    with open(self._default_config_file_name(), 'r') as f:
        default = json.load(f)

    owmdir = abspath(self.owmdir)
    # Store locations are recorded relative to the project directory via "$OWM"
    default['rdf.store_conf'] = pth_join('$OWM',
            relpath(abspath(self.store_name), owmdir))

    default[NAMESPACE_MANAGER_STORE_KEY] = DEFAULT_NS_MANAGER_STORE
    default[NAMESPACE_MANAGER_STORE_CONF_KEY] = pth_join('$OWM',
            relpath(abspath(self.namespace_manager_store_name), owmdir))

    if not default_context_id and not self.non_interactive:
        default_context_id = self.prompt(dedent('''\
        The default context is where statements are placed by default. The URI
        for this context should use a domain name that you control.

        Please provide the URI of the default context: '''))

    default_context_id = default_context_id and str(default_context_id).strip()
    if not default_context_id:
        raise GenericUserError("A default context ID is required")

    default[DEFAULT_CONTEXT_KEY] = str(default_context_id).strip()
    # The imports and class registry contexts get freshly generated UUID URNs
    default[IMPORTS_CONTEXT_KEY] = str(uuid.uuid4().urn).strip()
    default[CLASS_REGISTRY_CONTEXT_KEY] = str(uuid.uuid4().urn).strip()

    # Only now touch the target file: everything above may raise
    with open(self.config_file, 'w') as of:
        write_config(default, of)

def repository(self):
    '''
    Return the repository provider, pointing its ``base`` at the owmeta directory
    when that directory exists
    '''
    repo = self.repository_provider
    if exists(self.owmdir):
        repo.base = self.owmdir
    return repo

def _init_repository(self, reinit):
    # Nothing to do without a repository provider
    if self.repository_provider is None:
        return
    self.repository_provider.init(base=self.owmdir)
    # A first-time init also records the config file in an initial commit
    if not reinit:
        self.repository_provider.add([relpath(self.config_file, self.owmdir)])
        self.repository_provider.commit('Initial commit')

def _den3(self, s):
    '''
    Resolve `s` with `_den3_safe`, falling back to treating `s` itself as a URI
    '''
    r = self._den3_safe(s)
    if r is None:
        r = URIRef(s)
    return r

def _den3_safe(self, s):
    '''
    Resolve ``<uri>`` or ``prefix:name`` strings to a `URIRef`

    Returns `s` unchanged when it is falsy and `None` when `s` has neither form or
    its prefix is not bound in the namespace manager
    '''
    if not s:
        return s
    from rdflib.namespace import is_ncname
    nm = self.namespace_manager
    if s.startswith('<') and s.endswith('>'):
        return URIRef(s.strip(u'<>'))
    parts = s.split(':')
    if len(parts) > 1 and is_ncname(parts[1]):
        # Use the first bound namespace whose prefix matches
        for pref, ns in nm.namespaces():
            if pref == parts[0]:
                return URIRef(ns + parts[1])
    return None
def fetch_graph(self, url):
    """
    Fetch a graph

    Parameters
    ----------
    url : str
        URL for the graph
    """
    accessor = self._obtain_graph_accessor(url)
    if accessor:
        # The accessor is a callable that produces the graph
        return accessor()
    raise UnreadableGraphException('Could not read the graph at {}'.format(url))
def add_graph(self, url=None, context=None, include_imports=True):
    """
    Fetch a graph and add it to the local store.

    Parameters
    ----------
    url : str
        The URL of the graph to fetch
    context : rdflib.term.URIRef
        If provided, only this context and, optionally, its imported graphs will be added.
    include_imports : bool
        If True, imports of the named context will be included. Has no effect if context is None.
    """
    with self.connect(), self.transaction_manager:
        fetched = self.fetch_graph(url)
        local_graph = self._conf('rdf.graph')
        # Wildcard subject/predicate/object, restricted to `context` when given
        quad_pattern = (None, None, None, context)
        local_graph.addN(fetched.quads(quad_pattern))
def _obtain_graph_accessor(self, url): if self.graph_accessor_finder is None: raise Exception('No graph_accessor_finder has been configured') return self.graph_accessor_finder(url)
def connect(self, read_only=False, expect_cleanup=False):
    '''
    Create a connection to the project database.

    Most commands will create their own connections where needed, but for multiple
    commands you'll want to create one connection at the start. Multiple calls to this
    method can be made without calling `disconnect` on the resulting connection
    object, but only if `read_only` has the same value for all calls.

    Read-only connections can only be made with the default stores: if you have
    configured your own store and you want the connection to be read-only, you must
    change the configuration to make it read-only before calling `connect`.

    Parameters
    ----------
    read_only : bool
        if True, the resulting connection will be read-only
    expect_cleanup : bool
        if False, a warning will be issued if the `cleanup_manager` has to disconnect
        the connection

    Returns
    -------
    ProjectConnection
        Usable as a `context manager <contextmanager.__enter__>`
    '''
    # The underlying connection is created on first use and shared afterwards
    if self._owm_connection is None:
        store_conf = self._init_store(read_only=read_only)
        self._owm_connection = connect(conf=store_conf, mapper=self._context.mapper)

    project_conn = ProjectConnection(self,
                                     self._owm_connection,
                                     self._connections,
                                     expect_cleanup=expect_cleanup)
    self._connections.add(project_conn)
    L.debug("CONNECTED %s (%s open connections)", project_conn, len(self._connections))
    return project_conn
@property
def connected(self):
    # True while at least one connection object is being tracked
    return len(self._connections) > 0

def _conf(self, *args, read_only=False):
    '''
    Load, or return the cached, project configuration

    The configuration is (re)built when nothing is cached or when `config_file` no
    longer matches the file the cached configuration was built from. Also available
    under the name `_init_store`.

    Parameters
    ----------
    *args
        If given, passed to ``get`` on the configuration and the result is returned
        instead of the whole configuration
    read_only : bool
        Passed on to the namespace manager store configuration and to the
        bundle-dependent store configuration builder

    Raises
    ------
    NoConfigFileError
        If the project config file does not exist
    GenericUserError
        If the store or namespace manager store configuration is missing or not
        usable
    '''
    from owmeta_core.data import Data
    dat = getattr(self, '_dat', None)
    if not dat or self._dat_file != self.config_file:
        if not exists(self.config_file):
            raise NoConfigFileError(self.config_file)

        with open(self.config_file) as repo_config:
            rc = json.load(repo_config)
        # The user config file is optional
        if not exists(self.config.user_config_file):
            uc = {}
        else:
            with open(self.config.user_config_file) as user_config:
                uc = json.load(user_config)

        # Pre-process the user-config to resolve variables based on the user
        # config-file location
        uc['configure.file_location'] = self.config.user_config_file
        udat = Data.process_config(uc, variables={'OWM': self.owmdir})

        # Entries from the user config replace same-named project config entries
        rc.update(udat.items())
        rc['configure.file_location'] = self.config_file
        dat = Data.process_config(rc, variables={'OWM': self.owmdir})
        dat['owm.directory'] = self.owmdir
        store_conf = dat.get('rdf.store_conf', None)
        if not store_conf:
            raise GenericUserError('rdf.store_conf is not defined in either of the OWM'
            ' configuration files at ' + self.config_file + ' or ' +
            self.config.user_config_file + ' OWM repository may have been initialized'
            ' incorrectly')
        if (isinstance(store_conf, str) and
                isabs(store_conf) and
                not abspath(store_conf).startswith(abspath(self.owmdir))):
            raise GenericUserError('rdf.store_conf must specify a path inside of ' +
                    self.owmdir + ' but instead it is ' + store_conf)
        # If `store_conf` is a dict, we just assume the person who set up the configs
        # knew what they were doing, so no additional checks...

        if NAMESPACE_MANAGER_STORE_KEY in dat:
            ns_store = dat[NAMESPACE_MANAGER_STORE_KEY]
            if ns_store != DEFAULT_NS_MANAGER_STORE:
                # We don't know how to add a transaction manager to anything other
                # than our default
                raise GenericUserError('Unable to add `transaction manager` for'
                        f' namespace manager store, "{ns_store}". Only'
                        f' {DEFAULT_NS_MANAGER_STORE} is supported.')
            try:
                ns_store_conf = dat[NAMESPACE_MANAGER_STORE_CONF_KEY]
            except KeyError as e:
                raise GenericUserError('A separate namespace manager store was'
                        ' declared, but the configuration for the store,'
                        f' "{NAMESPACE_MANAGER_STORE_CONF_KEY}", is missing') from e

            # Normalize the store configuration to a dict that carries the
            # transaction manager
            if isinstance(ns_store_conf, str):
                ns_store_conf = dict(url=ns_store_conf,
                        transaction_manager=dat[TRANSACTION_MANAGER_KEY])
            elif isinstance(ns_store_conf, dict):
                ns_store_conf['transaction_manager'] = dat[TRANSACTION_MANAGER_KEY]
            else:
                raise GenericUserError('Unable to configure namespace manager store'
                        f' transaction manager with "{NAMESPACE_MANAGER_STORE_CONF_KEY}":'
                        f' {ns_store_conf!r}')

            # We were asked to open read-only, we only know how to tell our default store
            # how to be read-only, so we check for that
            if read_only:
                ns_store_conf['read_only'] = True
            dat[NAMESPACE_MANAGER_STORE_CONF_KEY] = ns_store_conf

        deps = dat.get('dependencies', None)
        if deps:
            bundles_directory = self.bundle._bundles_directory()
            remotes_directory = self.bundle._user_remotes_directory()
            project_remotes = list(retrieve_remotes(self.bundle._project_remotes_directory()))
            # XXX: Look at how we bring in projects remotes directory
            cfg_builder = BundleDependentStoreConfigBuilder(bundles_directory=bundles_directory,
                    remotes_directory=remotes_directory,
                    remotes=project_remotes,
                    read_only=read_only,
                    transaction_manager=dat[TRANSACTION_MANAGER_KEY])
            # Replace the plain store configuration with the one produced by the
            # builder for the project store plus its dependencies
            store_name, store_conf = cfg_builder.build(store_conf, deps)
            dat['rdf.source'] = 'default'
            dat['rdf.store'] = store_name
            dat['rdf.store_conf'] = store_conf
            self._bundle_dep_mgr = BundleDependencyManager(bundles_directory=bundles_directory,
                    remotes_directory=remotes_directory,
                    remotes=project_remotes,
                    dependencies=lambda: deps)

            # Unless configured explicitly, collect the class registry context IDs
            # found in the manifests of the (transitive) dependencies
            if CLASS_REGISTRY_CONTEXT_LIST_KEY not in dat:
                crctx_ids = []
                for dep in self._bundle_dep_mgr.load_dependencies_transitive():
                    crctx_id = dep.manifest_data.get(CLASS_REGISTRY_CONTEXT_KEY)
                    if crctx_id:
                        crctx_ids.append(crctx_id)
                dat[CLASS_REGISTRY_CONTEXT_LIST_KEY] = crctx_ids
        self._dat_file = self.config_file
        self._dat = dat
        # Putting these after setting _dat to avoid a recursive loop with
        # self.transaction_manager
        providers = dat.get(CAPABILITY_PROVIDERS_KEY, [])
        providers.extend(self._cap_provs())
        dat[CAPABILITY_PROVIDERS_KEY] = providers

    if args:
        return dat.get(*args)
    return dat

# `connect` calls this name to obtain the configuration for a new connection
_init_store = _conf
def disconnect(self):
    '''
    Destroy a connection to the project database

    Should not be called if there is no active connection
    '''
    if self._owm_connection is None:
        raise AlreadyDisconnected(self)

    if len(self._connections) == 0:
        # Nobody is using the shared connection any more: close it and drop the
        # cached configuration along with it
        L.debug("DISCONNECTING %s", self._owm_connection)
        self._owm_connection.disconnect()
        self._dat = None
        self._owm_connection = None
    else:
        # Connection objects are still outstanding, so only warn
        warnings.warn('Attempted to close OWM connection prematurely:'
                      f' still have {len(self._connections)} connection(s) ',
                      ResourceWarning,
                      stacklevel=2)
def clone(self, url=None, update_existing_config=False, branch=None):
    """Clone a data store

    Parameters
    ----------
    url : str
        URL of the data store to clone
    update_existing_config : bool
        If True, updates the existing config file to point to the given
        file for the store configuration
    branch : str
        Branch to checkout after cloning

    Raises
    ------
    FileExistsError
        If the owmeta directory already exists. In that case the existing directory
        is left untouched
    """
    # Create the directory *outside* of the cleanup handler: if it already exists
    # it belongs to someone else and must not be removed. Keeping this call inside
    # the `try` together with an `except FileExistsError: raise` would also skip
    # cleanup for a FileExistsError raised by any later step.
    makedirs(self.owmdir)
    try:
        self.message('Cloning...', file=sys.stderr)
        with self.progress_reporter(file=sys.stderr, unit=' objects', miniters=0) as progress:
            self.repository_provider.clone(url, base=self.owmdir,
                    progress=progress, branch=branch)
        if not exists(self.config_file):
            self._init_config_file()
        self._init_store()
        self.message('Deserializing...', file=sys.stderr)
        with self.connect():
            self._regenerate_database()
        self.message('Done!', file=sys.stderr)
    except BaseException:
        # This call created the directory, so a failed clone removes it again
        self._ensure_no_owmdir()
        raise
def git(self, *args):
    '''
    Runs git commands in the ".owm" directory

    Parameters
    ----------
    *args
        arguments to git

    Raises
    ------
    GenericUserError
        If the ".owm" directory does not exist
    '''
    from subprocess import Popen, PIPE

    workdir = self.owmdir
    if not exists(workdir):
        raise GenericUserError('Cannot find ".owm" directory')

    # Run git with `cwd` set for the child process only, rather than changing (and
    # later restoring) the working directory of this whole process
    with Popen(['git'] + list(args), stdout=PIPE, cwd=workdir) as p:
        self.message(p.stdout.read().decode('utf-8', 'ignore'))
def regendb(self):
    '''
    Regenerates the indexed database from graph serializations.

    Note that any uncommitted contents in the indexed database will be deleted.
    '''
    from glob import glob

    # Remove everything whose name starts with either store name, first for the
    # main store and then for the namespace manager store
    for store_base in (self.store_name, self.namespace_manager_store_name):
        for path in glob(store_base + '*'):
            self.message('unlink', path)
            # Decide up-front whether this is a directory instead of relying on
            # `unlink` raising IsADirectoryError, which only some platforms do.
            # Symbolic links are always unlinked rather than followed.
            if os.path.isdir(path) and not os.path.islink(path):
                shutil.rmtree(path)
            else:
                unlink(path)

    with self.connect():
        self._regenerate_database()
def _regenerate_database(self): with self.progress_reporter(unit=' ctx', file=sys.stderr) as ctx_prog, \ self.progress_reporter(unit=' triples', file=sys.stderr, leave=False) as trip_prog, \ self.transaction_manager: self._load_all_graphs(ctx_prog, trip_prog) def _load_all_graphs(self, progress, trip_prog): from rdflib import plugin from rdflib.parser import Parser, create_input_source idx_fname = pth_join(self.owmdir, 'graphs', 'index') triples_read = 0 if exists(idx_fname): dest = self.rdf with open(idx_fname) as index_file: cnt = 0 for l in index_file: cnt += 1 index_file.seek(0) progress.total = cnt bag = BatchAddGraph(dest, batchsize=10000) for l in index_file: fname, ctx = l.strip().split(' ', 1) parser = plugin.get('nt', Parser)() graph_fname = pth_join(self.owmdir, 'graphs', fname) with open(graph_fname, 'rb') as f, bag.get_context(ctx) as g: parser.parse(create_input_source(f), g) progress.update(1) trip_prog.update(bag.count - triples_read) triples_read = g.count progress.write('Finalizing writes to database...') progress.write('Loaded {:,} triples'.format(triples_read)) ns_fname = pth_join(self.owmdir, 'namespaces') try: ns_file = open(ns_fname, 'r') except FileNotFoundError: L.debug('No namespaces file to load at %s', ns_fname) else: with ns_file: for l in ns_file: l = l.rstrip() if not l: continue prefix, uri = l.split(' ', 1) self.namespace_manager.bind(prefix, URIRef(uri)) def _graphs_index(self): idx_fname = pth_join(self.owmdir, 'graphs', 'index') if exists(idx_fname): with open(idx_fname) as index_file: for ent in self._graphs_index0(index_file): yield ent @property def _context_fnames(self): if not hasattr(self, '_cfn'): self._read_graphs_index() return self._cfn @property def _fname_contexts(self): if not hasattr(self, '_fnc'): self._read_graphs_index() return self._fnc def _read_graphs_index(self): ctx_index = dict() fname_index = dict() for fname, ctx in self._graphs_index(): ctx_index[ctx] = pth_join(self.owmdir, 'graphs', fname) 
fname_index[fname] = ctx self._cfn = ctx_index self._fnc = fname_index def _read_graphs_index0(self, index_file): ctx_index = dict() fname_index = dict() for fname, ctx in self._graphs_index0(index_file): ctx_index[ctx] = fname fname_index[fname] = ctx return ctx_index, fname_index def _graphs_index0(self, index_file): for l in index_file.readlines(): l = l.strip() if not isinstance(l, str): l_str = l.decode('UTF-8') else: l_str = l yield l_str.split(' ', 1)
def translate(self, translator, output_key=None, output_identifier=None,
              data_sources=(), named_data_sources=None):
    """
    Do a translation with the named translator and inputs

    Parameters
    ----------
    translator : str
        Translator identifier
    output_key : str
        Output key. Used for generating the output's identifier. Exclusive with output_identifier
    output_identifier : str
        Output identifier. Exclusive with output_key
    data_sources : list of str
        Input data sources
    named_data_sources : dict
        Named input data sources
    """
    # NOTE(review): `output_key` and `output_identifier` are accepted but not
    # referenced anywhere in this method -- confirm whether that is intended
    with self.connect():
        from .datasource import transform, DataTransformer, DataSource
        source_objs = []
        srcctx = self._default_ctx.stored
        # Resolve every positional source; a source that cannot be loaded is an error
        for s in data_sources:
            src_obj = next(srcctx(DataSource)(ident=self._den3(s)).load(), None)
            if src_obj is None:
                raise GenericUserError(f'No source for "{s}"')
            source_objs.append(src_obj)

        # Same for the named sources, keeping their names
        named_data_source_objs = dict()
        if named_data_sources is not None:
            for key, ds in named_data_sources.items():
                src_obj = next(srcctx(DataSource)(ident=self._den3(ds)).load(), None)
                if src_obj is None:
                    raise GenericUserError(f'No source for "{ds}", named {key}')
                named_data_source_objs[key] = src_obj

        # NOTE(review): `transformer_obj` is only assigned when `translator` is a
        # `str` -- confirm that no caller passes anything else
        if isinstance(translator, str):
            transformer_id = self._den3(translator)
            transformer_obj = srcctx(DataTransformer)(ident=self._den3(transformer_id)).load_one()
            if transformer_obj is None:
                raise GenericUserError(f'No transformer for {translator}')
        transformer_obj = self._default_ctx(transformer_obj)

        try:
            with self.transaction_manager:
                # Point stdout and stderr at the null device while the transformer
                # runs; the `finally` restores them whatever happens
                old_stdout = sys.stdout
                old_stderr = sys.stderr
                try:
                    old_stdout.flush()
                    old_stderr.flush()
                    with open(os.devnull, 'w') as nullout:
                        sys.stdout = nullout
                        sys.stderr = nullout
                        output = transform(transformer_obj, data_sources=source_objs,
                                           named_data_sources=named_data_source_objs)
                    self._default_ctx.save()
                finally:
                    sys.stdout = old_stdout
                    sys.stderr = old_stderr
            # A second connection is handed to the result wrapper through
            # `connmgr`, together with that connection's transaction manager
            conn2 = self.connect()

            @contextmanager
            def connmgr():
                with conn2, conn2.transaction_manager:
                    yield conn2

            return wrap_data_object_result(conn2(self._default_ctx).stored(output),
                                           connection_ctx_mgr=connmgr())
        except Exception as e:
            raise GenericUserError(f'Unable to complete translation: {e}') from e
@contextmanager def _tempdir(self, *args, **kwargs): td = self.temporary_directory if not exists(td): makedirs(td) kwargs['dir'] = td with TemporaryDirectory(*args, **kwargs) as d: yield d @property def _dsd(self): self._load_data_source_directories() return self._data_source_directories def _load_data_source_directories(self): if not self._data_source_directories: # The DSD holds mappings to data sources we've loaded before. In general, this # allows the individual loaders to not worry about checking if they have # loaded something before. # XXX persist the dict loaders = [OWMDirDataSourceDirLoader()] for entry_point in iter_entry_points(group=DSDL_GROUP): try: loaders.append(entry_point.load()()) except DistributionNotFound: L.debug('Not adding DataSource directory loader %s due to failure in' ' package resources resolution', entry_point, exc_info=True) dsd = _DSD(dict(), pth_join(self.owmdir, 'data_source_data'), loaders) try: dindex = open(pth_join(self.owmdir, 'data_source_directories')) for ds_id, dname in (x.strip().split(' ', 1) for x in dindex): dsd.put(ds_id, dname) except OSError: pass self._data_source_directories = dsd def _cap_provs(self): return [DataSourceDirectoryProvider(self._dsd), WorkingDirectoryProvider(), TransactionalDataSourceDirProvider(pth_join(self.owmdir, 'ds_files'), self.transaction_manager), SimpleCacheDirectoryProvider(pth_join(self.owmdir, 'cache')), SimpleTemporaryDirectoryProvider(self.temporary_directory)] @property def _default_ctx(self): context = None if self.context: context = self.context else: conf = self._conf() try: context = conf[DEFAULT_CONTEXT_KEY] except KeyError: raise ConfigMissingException(DEFAULT_CONTEXT_KEY) if self._cached_default_context is not None: cached_id = self._cached_default_context.identifier current_id = URIRef(context) if current_id == cached_id: return self._cached_default_context self._cached_default_context = self._make_ctx(context) return self._cached_default_context default_context = 
_default_ctx def _make_ctx(self, ctxid=None): return Context.contextualize(self._context)(ident=ctxid) def _package_path(self): """ Get the package path """ from pkgutil import get_loader return dirname(get_loader('owmeta_core').get_filename()) def _default_config_file_name(self): return pth_join(self._package_path(), 'default.conf')
def list_contexts(self):
    '''
    List contexts
    '''
    # Hand on whatever the `contexts` sub-command's `list` produces
    yield from self.contexts.list()
@property
def rdf(self):
    # The project graph, as held by the configuration
    graph = self._conf('rdf.graph')
    return graph

@property
def namespace_manager(self):
    manager = self._conf(NAMESPACE_MANAGER_KEY)
    return manager

@property
def transaction_manager(self):
    '''
    The `transaction.TransactionManager` for the current connection
    '''
    manager = self._conf(TRANSACTION_MANAGER_KEY)
    return manager

@property
def own_rdf(self):
    # Without dependencies this is simply the configured graph
    if not self._conf('dependencies', None):
        return self._conf('rdf.graph')

    # With dependencies, wrap the first of the graph store's `stores` in a
    # dataset that shares the project's namespace manager
    dataset = _Dataset(self.rdf.store.stores[0], default_union=True)
    dataset.namespace_manager = self.namespace_manager
    return dataset
def commit(self, message, skip_serialization=False):
    '''
    Write the graph and configuration changes to the local repository

    Parameters
    ----------
    message : str
        commit message
    skip_serialization : bool
        If set, then skip graph serialization. Useful if you have manually changed the
        graph serialization or just want to commit changes to project configuration
    '''
    repo = self.repository()
    serialize = not skip_serialization
    if serialize:
        with self.connect():
            try:
                self._serialize_graphs()
            except DirtyProjectRepository:
                # Reported as a user-level error
                raise GenericUserError(
                        'The project repository has uncommitted changes.'
                        ' Undo the changes or commit them (e.g., by'
                        ' re-running this command with --serialize-graphs)')
    # TODO: Consider allowing some plugin system to allow other configuration to add
    # files to the repo.
    repo.commit(message)
def _changed_contexts_set(self):
    # XXX: This method used to try to determine if a context had been updated since
    # the corresponding file had changed, but it was really unreliable.
    # Currently, every context listed in the graphs index is reported
    gf_index = {URIRef(y): x for x, y in self._graphs_index()}
    return set(gf_index.keys())

def _serialize_graphs(self, ignore_change_cache=False):
    '''
    Write the contexts of the project's own graph to files under "graphs", write the
    "namespaces" file and the graphs index, and add the written files to (and remove
    files of contexts that no longer exist from) the repository

    Parameters
    ----------
    ignore_change_cache : bool
        If True, every context is written, rather than only the ones in the
        changed-contexts set or lacking a file
    '''
    g = self.own_rdf
    repo = self.repository()

    graphs_base = pth_join(self.owmdir, 'graphs')
    namespaces_fname = pth_join(self.owmdir, 'namespaces')

    changed = self._changed_contexts_set()

    # Discard uncommitted changes to the serializations before rewriting them
    if repo.is_dirty(path=graphs_base):
        repo.reset(graphs_base)

    if not exists(graphs_base):
        mkdir(graphs_base)

    files = []
    ctx_data = []
    # Start with every context known from the index; the ones still present in the
    # graph are removed below, leaving only the deleted ones
    deleted_contexts = dict(self._context_fnames)
    with self.transaction_manager:
        for context in g.contexts():
            if not context:
                continue
            ident = context.identifier

            if not ignore_change_cache:
                ctx_changed = ident in changed
            else:
                ctx_changed = True

            sfname = self._context_fnames.get(str(ident))
            if not sfname:
                # We have to generate a name with a fixed length for the contexts
                # since the URIs could be longer than the file system allows
                fname = gen_ctx_fname(ident, graphs_base)
            else:
                fname = sfname

            # If there's a context in the graph, but we don't even have a file, then it is changed.
            # This can happen if we get out of sync with what's on disk.
            if not ctx_changed and not exists(fname):
                ctx_changed = True

            if ctx_changed:
                # N.B. We *overwrite* changes to the serialized graphs -- the source of truth is what's in the
                # RDFLib graph unless we regenerate the database
                write_canonical_to_file(context, fname)
            ctx_data.append((relpath(fname, graphs_base), ident))
            files.append(fname)
            deleted_contexts.pop(str(ident), None)

    # One "<prefix> <uri>" line per bound namespace
    with open(namespaces_fname, 'w') as f:
        for pre, uri in self.namespace_manager.namespaces():
            f.write(f'{pre} {uri}\n')
    files.append(namespaces_fname)

    # The index is only rewritten when at least one context was seen
    if ctx_data:
        index_fname = pth_join(graphs_base, 'index')
        with open(index_fname, 'w') as index_file:
            for l in sorted(ctx_data):
                print(*l, file=index_file, end='\n')
        files.append(index_fname)

    # Contexts that were in the index but are no longer in the graph: remove their
    # files from the repository and from disk
    if deleted_contexts:
        repo.remove(relpath(f, self.owmdir) for f in deleted_contexts.values())
        for f in deleted_contexts.values():
            unlink(f)

    if files:
        repo.add([relpath(f, self.owmdir) for f in files])
def diff(self, color=False):
    """
    Show differences between what's in the working context set and what's in the
    serializations

    Parameters
    ----------
    color : bool
        If set, then ANSI color escape codes will be incorporated into diff output.
        Default is to output without color.
    """
    try:
        self._diff_helper(color)
    finally:
        # Reset the graphs directory. It should represent the committed graph always
        project_repo = self.repository()
        graphs_dirty = project_repo.is_dirty(path='graphs')
        if graphs_dirty:
            project_repo.reset('graphs')
def _diff_helper(self, color):
    '''
    Serialize the current graphs and write a unified diff of them against the HEAD
    commit to standard output.

    Files whose contents cannot be decoded as UTF-8 are reported with a short
    size/checksum summary instead of a line-by-line diff.

    Parameters
    ----------
    color : bool
        If set, output is passed through `_colorize_diff`

    Raises
    ------
    GenericUserError
        If the graphs cannot be serialized
    '''
    from difflib import unified_diff
    from os.path import basename

    r = self.repository()
    try:
        with self.connect():
            self._serialize_graphs(ignore_change_cache=False)
    except Exception as e:
        L.exception("Could not serialize graphs")
        raise GenericUserError("Could not serialize graphs") from e

    head_commit = r.repo().head.commit
    # TODO: Determine if this path should actually be platform-dependent
    try:
        old_index = head_commit.tree.join(pth_join('graphs', 'index'))
    except KeyError:
        old_index = None

    if old_index:
        # OStream.stream isn't documented (most things in GitDB aren't), but it is,
        # technically, public interface.
        old_index_file = old_index.data_stream.stream
        _, old_fnc = self._read_graphs_index0(old_index_file)
    else:
        old_fnc = dict()

    new_index_filename = pth_join(self.owmdir, 'graphs', 'index')
    try:
        with open(new_index_filename, 'r') as new_index_file:
            _, new_fnc = self._read_graphs_index0(new_index_file)
    except FileNotFoundError:
        new_fnc = dict()

    def size_and_sha(blob, data):
        # There may be no blob for a side (e.g., the data was read from the working
        # tree), so fall back to what can be computed from the data itself
        if blob:
            return blob.size, blob.hexsha
        return len(b'\n'.join(data)), 'unknown'

    for d in head_commit.diff(None):
        # Bound up-front so the fallback below can never hit an unbound name
        a_blob = b_blob = None
        try:
            a_blob = d.a_blob
            if a_blob:
                adata = a_blob.data_stream.read().split(b'\n')
            else:
                adata = []
        except Exception as e:
            print('No "a" data: {}'.format(e), file=sys.stderr)
            adata = []

        try:
            b_blob = d.b_blob
            if b_blob:
                bdata = b_blob.data_stream.read().split(b'\n')
            else:
                with open(pth_join(r.repo().working_dir, d.b_path), 'rb') as f:
                    bdata = f.read().split(b'\n')
        except Exception as e:
            print('No "b" data: {}'.format(e), file=sys.stderr)
            bdata = []

        afname = basename(d.a_path)
        bfname = basename(d.b_path)

        # Prefer the context identifier recorded in the index over the file name
        fromfile = old_fnc.get(afname, afname) if adata else '/dev/null'
        tofile = new_fnc.get(bfname, bfname) if bdata else '/dev/null'

        try:
            diff = unified_diff([x.decode('utf-8') + '\n' for x in adata],
                                [x.decode('utf-8') + '\n' for x in bdata],
                                fromfile='a ' + fromfile,
                                tofile='b ' + tofile,
                                lineterm='\n')
            if color:
                diff = self._colorize_diff(diff)
            sys.stdout.writelines(diff)
        except Exception:
            # Most likely the content is not UTF-8 text, so just summarize it
            if adata and not bdata:
                sys.stdout.write('Deleted ' + fromfile + '\n')
            elif bdata and not adata:
                # `fromfile` is '/dev/null' here: the created file is `tofile`
                sys.stdout.write('Created ' + tofile + '\n')
            else:
                asize, asha = size_and_sha(a_blob, adata)
                bsize, bsha = size_and_sha(b_blob, bdata)
                summary = dedent('''\
                    --- a {fromfile}
                    --- Size: {asize}
                    --- Shasum: {asha}
                    +++ b {tofile}
                    +++ Size: {bsize}
                    +++ Shasum: {bsha}''').format(fromfile=fromfile,
                                                  asize=asize,
                                                  asha=asha,
                                                  tofile=tofile,
                                                  bsize=bsize,
                                                  bsha=bsha)
                # `_colorize_diff` expects an iterable of lines, not one string
                summary_lines = [ln + '\n' for ln in summary.split('\n')]
                if color:
                    summary_lines = self._colorize_diff(summary_lines)
                sys.stdout.writelines(summary_lines)


def _colorize_diff(self, lines):
    '''
    Add ANSI colors to the lines of a unified diff.

    Parameters
    ----------
    lines : iterable of str
        Lines of diff output. Trailing whitespace on each line is stripped and
        replaced with `os.linesep`

    Yields
    ------
    str
        The colorized line
    '''
    from termcolor import colored
    import re
    hunk_line_numbers_pattern = re.compile(r'^@@[0-9 +,-]+@@')
    for line in lines:
        line = line.rstrip()
        # File headers must be tested before the single '+'/'-' prefixes
        if line.startswith(('+++', '---')):
            line = colored(line, attrs=['bold'])
        elif hunk_line_numbers_pattern.match(line):
            line = colored(line, 'cyan')
        elif line.startswith('+'):
            line = colored(line, 'green')
        elif line.startswith('-'):
            line = colored(line, 'red')
        yield line + os.linesep
def declare(self, python_type, attributes=(), id=None):
    '''
    Create a new data object or update an existing one

    Parameters
    ----------
    python_type : str
        The path to the Python type for the object. Formatted like
        "full.module.path:ClassName"
    attributes : iterable of (str, str)
        ``(property, value)`` pairs to set on the object before saving. The
        property is either the name of an attribute of the object or, if it
        matches ``PROVIDER_PATH_RE``, the path to a property class to attach to
        the object
    id : str
        The identifier for the object

    Raises
    ------
    GenericUserError
        If `python_type` or a property class cannot be found, if the object has
        no property with a given name, or if an object referenced by a value
        cannot be loaded
    '''
    try:
        cls = retrieve_provider(python_type)
    except (AttributeError, ModuleNotFoundError) as e:
        raise GenericUserError(f'No class found for {python_type}') from e

    with self.connect() as conn, self.transaction_manager:
        dctx = self._default_ctx
        dctx.add_import(cls.definition_context)
        ob = dctx(cls)(ident=self._den3(id))
        for prop, val in attributes:
            if PROVIDER_PATH_RE.match(prop):
                try:
                    prop_cls = retrieve_provider(prop)
                except (AttributeError, ModuleNotFoundError) as e:
                    raise GenericUserError(f'No class found for {prop}') from e
                prop_obj = ob.attach_property(prop_cls)
            else:
                try:
                    prop_obj = getattr(ob, prop)
                except AttributeError as e:
                    raise GenericUserError(f'No property named {prop}') from e

            if isinstance(prop_obj, (ObjectProperty, UnionProperty)):
                # Reset for every property so a type resolved for an earlier
                # property can never leak into this one
                value_type = None
                if prop_obj.value_rdf_type is not None:
                    value_type = conn.mapper.resolve_class(
                            prop_obj.value_rdf_type, dctx)
                if value_type is None:
                    value_type = DataObject

                # The check has to be made on the property object: `prop` is only
                # the name given by the user
                if isinstance(prop_obj, UnionProperty):
                    val_ident = self._den3_safe(val)
                else:
                    val_ident = self._den3(val)

                if val_ident is not None:
                    # Replace the identifier with the first stored object found
                    for val in dctx.stored(value_type)(ident=val_ident).load():
                        break
                    else:  # no break
                        msg = ('Unable to find an object with the'
                               f' ID {val_ident!r} of type {value_type!r}')
                        raise GenericUserError(msg)
            self.message(f"setting {ob!r} {prop_obj!r} {val!r}")
            prop_obj(val)
        dctx.save()
        dctx.save_imports()
class ProjectConnection(object):
    '''
    Connection to the project database
    '''

    def __init__(self, owm, connection, connections, *, expect_cleanup=True):
        self.owm = owm
        self.connection = connection
        self._context = owm._context
        self._connections = connections
        manager = owm.cleanup_manager
        if manager is not None:
            # If the manager ends up running this, `disconnect` is told whether that
            # was expected
            manager.register(self.disconnect, _unexpected=not expect_cleanup)
        self._connected = True

    @property
    def mapper(self):
        # XXX: Maybe refactor this...
        return self._context.mapper

    def __del__(self):
        if not self._connected:
            return
        warnings.warn('OWM connection deleted without being disconnected',
                      ResourceWarning, source=self)

    def __getattr__(self, attr):
        # Anything not defined here is looked up on the wrapped connection
        return getattr(self.connection, attr)

    def __call__(self, o):
        return self._context(o)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.disconnect()

    def disconnect(self, _unexpected=False):
        '''
        Remove this connection from the set of open connections, disconnecting the
        `OWM` instance as well when this was the last open connection.
        '''
        if _unexpected:
            warnings.warn('Unexpected cleanup by resource manager',
                          ResourceWarning, source=self)
        try:
            self._connections.remove(self)
            remaining = len(self._connections)
            L.debug("DISCONNECTED %s (%s open connections)", self, remaining)
            if remaining == 0:
                self.owm.disconnect()
            self._connected = False
        finally:
            manager = self.owm.cleanup_manager
            if manager is not None:
                manager.unregister(self.disconnect)

    def __str__(self):
        return f'{self.__class__.__name__}({self.owm}, {self.connection})'

    @contextmanager
    def transaction(self):
        '''
        Context manager that executes the enclosed code in a transaction and then
        closes the connection. Provides the connection for binding with ``as``.
        '''
        with self, self.transaction_manager:
            yield self
class _ProjectContext(Context):
    '''
    `Context` for a project.
    '''
    def __init__(self, *args, owm, **kwargs):
        super().__init__(*args, **kwargs)
        self.owm = owm
        self._mapper = None

    @property
    def conf(self):
        return self.owm._conf()

    @property
    def mapper(self):
        # Created on first use and then kept for the life of this context
        if self._mapper is None:
            self._mapper = _ProjectMapper(owm=self.owm)
        return self._mapper

    def imports_graph(self):
        return rdflib.ConjunctiveGraph(
                store=_ProjectImportStore(self.owm, store=self.rdf.store))


class _ProjectImportStore(ContextSubsetStore):
    '''
    Store restricted to the imports context of the project and the imports contexts
    declared by the project's transitive bundle dependencies.
    '''
    def __init__(self, owm, **kwargs):
        super().__init__(**kwargs)
        self.owm = owm

    def init_contexts(self):
        res = {URIRef(self.owm.imports_context())}
        dep_mgr = self.owm._bundle_dep_mgr
        if dep_mgr is not None:
            for bnd in dep_mgr.load_dependencies_transitive():
                imports_ctx = bnd.manifest_data.get(IMPORTS_CONTEXT_KEY)
                if imports_ctx is not None:
                    res.add(URIRef(imports_ctx))
        return res

    def __str__(self):
        return f'{type(self).__name__}({self.owm})'


class _ProjectMapper(Mapper):
    '''
    `Mapper` for a project which falls back to the mappers of bundle dependencies
    when it cannot resolve a class itself. Successful resolutions are cached.
    '''
    def __init__(self, owm):
        owm_conf = owm._conf()
        super().__init__(name=f'{owm.owmdir}', conf=owm_conf)
        self.owm = owm
        self._resolved_classes = dict()

    def resolve_class(self, rdf_type, context):
        cache_key = (rdf_type, context.identifier)
        prev_resolved_class = self._resolved_classes.get(cache_key)
        if prev_resolved_class:
            return prev_resolved_class

        own_resolved_class = super().resolve_class(rdf_type, context)

        if own_resolved_class:
            self._resolved_classes[cache_key] = own_resolved_class
            return own_resolved_class

        target_id = context.identifier
        dep_mgr = self.owm._bundle_dep_mgr
        if dep_mgr:
            contexts = set(str(getattr(c, 'identifier', c))
                           for c in self.owm.own_rdf.contexts())
            target_bundle = dep_mgr.lookup_context_bundle(contexts, target_id)
            if target_bundle is None:
                # No particular bundle was found for the context, so search
                # everything the project depends on
                target_bundle = dep_mgr
            deps = target_bundle.load_dependencies_transitive()
            for bnd in deps:
                if not bnd.manifest_data.get(CLASS_REGISTRY_CONTEXT_KEY, None):
                    continue
                with bnd:
                    resolved_class = bnd.connection.mapper.resolve_class(rdf_type, context)
                    if resolved_class:
                        self._resolved_classes[cache_key] = resolved_class
                        return resolved_class
        return None


class _OWMSaveContext(Context):
    '''
    Wrapper around a "backing" context which records, for each statement added,
    whether the statement refers to objects from contexts that have not been
    imported or that have no identifier. Everything else is delegated to the
    backing context.
    '''

    def __init__(self, backer, user_module=None):
        self._user_mod = user_module
        self._backer = backer  #: Backing context
        self._imported_ctx_ids = set([self._backer.identifier])
        self._unvalidated_statements = []

    def add_import(self, ctx):
        self._imported_ctx_ids.add(ctx.identifier)
        # Remove unvalidated statements which had this new context as the one they
        # are missing. Failures of any other kind (e.g., a null context) are not
        # resolved by an import and have to be kept.
        self._unvalidated_statements = [
                p for p in self._unvalidated_statements
                if not (isinstance(p.validation_record, UnimportedContextRecord)
                        and p.validation_record.context == ctx.identifier)]
        return self._backer.add_import(ctx)

    def add_statement(self, stmt):
        from inspect import getouterframes, currentframe

        stmt_tuple = (stmt.subject, stmt.property, stmt.object)

        def gen():
            for i, x in enumerate(stmt_tuple):
                if (x.context is not None and x.context.identifier is not None
                        and x.context.identifier not in self._imported_ctx_ids):
                    yield UnimportedContextRecord(self._backer.identifier,
                                                  x.context.identifier, i, stmt)

            for i, x in enumerate(stmt_tuple):
                if (x.context is not None and x.context.identifier is None):
                    yield NullContextRecord(i, stmt)

        for record in gen():
            # The stack is recorded so the failure can be reported against the
            # user's code later
            self._unvalidated_statements.append(
                    SaveValidationFailureRecord(self._user_mod,
                                                getouterframes(currentframe()),
                                                record))
        return self._backer.add_statement(stmt)

    def __getattr__(self, name):
        return getattr(self._backer, name)

    def save_context(self, *args, **kwargs):
        return self._backer.save_context(*args, **kwargs)

    def save_imports(self, *args, **kwargs):
        return self._backer.save_imports(*args, **kwargs)


def write_config(ob, f):
    '''
    Write `ob` to the file object `f` as indented JSON with sorted keys, followed by
    a newline, and truncate the file at that point.
    '''
    json.dump(ob, f, sort_keys=True, indent=4, separators=(',', ': '))
    f.write('\n')
    # Drop anything left over from previous, longer contents
    f.truncate()
class InvalidGraphException(GenericUserError):
    '''
    Raised when formatting errors prevent a graph from being translated
    '''
class UnreadableGraphException(GenericUserError):
    '''
    Raised when a graph cannot be read -- for instance, because it is missing or
    because the active user lacks the permissions to read it
    '''
class NoConfigFileError(GenericUserError):
    '''
    Raised when a project config file (e.g., '.owm/owm.conf') cannot be found
    '''

    def __init__(self, config_file_path):
        message = 'Cannot find config file at "%s"' % config_file_path
        super().__init__(message)
class OWMDirMissingException(GenericUserError):
    '''
    Raised when the .owm directory is required but cannot be found
    '''
class SaveValidationFailureRecord(namedtuple('_SaveValidationFailureRecord',
                                             ['user_module',
                                              'stack',
                                              'validation_record'])):
    '''
    Record of a validation failure in `OWM.save`
    '''

    def filtered_stack(self):
        '''
        Return the frames of `stack` that come after the first frame from this
        module, cut off at the index of the last frame from the user's module.

        Each frame is expected to be laid out like `inspect.FrameInfo`, with the
        file name at index 1.
        '''
        umfile = getattr(self.user_module, '__file__', None)
        if umfile and umfile.endswith('pyc'):
            umfile = umfile[:-3] + 'py'
        ourfile = __file__

        if ourfile.endswith('pyc'):
            ourfile = ourfile[:-3] + 'py'

        started = False
        last_user_frame = 0
        kept = []
        for i, frame in enumerate(self.stack):
            fname = frame[1]
            if umfile and fname.startswith(umfile):
                last_user_frame = i
            if started:
                kept.append(frame)
            elif fname.startswith(ourfile):
                # Collection starts with the frame *after* the first of ours
                started = True
        return kept[:last_user_frame]

    def __str__(self):
        from traceback import format_list
        # The code context (index 4) is None when the source for a frame is not
        # available, so treat that the same as having no lines
        entries = [frame[1:4] + (''.join(frame[4] or ()).strip(),)
                   for frame in reversed(self.filtered_stack())]
        stack = format_list(entries)
        fmt = '{}\n Traceback (most recent call last, outer owmeta_core frames omitted):\n {}'
        res = fmt.format(self.validation_record,
                         '\n '.join(''.join(s for s in stack if s).split('\n')))
        return res.strip()
class _DSD(object):
    '''
    Mapping from data sources to directories. Data sources without an entry are
    passed to the given loaders, and what a loader returns is remembered.
    '''

    def __init__(self, ds_dict, base_directory, loaders):
        self._dsdict = ds_dict
        self.base_directory = base_directory
        self._loaders = self._init_loaders(loaders)

    def __str__(self):
        return '{}({})'.format(FCN(type(self)), self._dsdict)

    def __getitem__(self, data_source):
        dsid = str(data_source.identifier)
        try:
            return self._dsdict[dsid]
        except KeyError:
            res = self._load_data_source(data_source)
            if res:
                self._dsdict[dsid] = res
                return res
            # No loader produced anything: let the KeyError propagate
            raise

    def put(self, data_source_ident, directory):
        self._dsdict[str(data_source_ident)] = directory

    def _init_loaders(self, loaders):
        res = []
        for loader in loaders:
            # Each loader gets its own directory under the base directory
            nd = pth_join(self.base_directory, loader.directory_key)
            if not exists(nd):
                makedirs(nd)
            loader.base_directory = nd
            res.append(loader)
        return res

    def _load_data_source(self, data_source):
        # Returns None when no loader can load the data source
        for loader in self._loaders:
            if loader.can_load(data_source):
                return loader(data_source)


class DataSourceDirectoryProvider(FilePathProvider):
    '''
    Provides the directory that a `_DSD` holds for an object, if there is one
    '''

    def __init__(self, dsd):
        self._dsd = dsd

    def provides_to(self, ob, cap):
        try:
            path = self._dsd[ob]
        except KeyError:
            return None

        return _DSDP(path)


class _DSDP(FilePathProvider):
    ''' `FilePathProvider` for a fixed path '''

    def __init__(self, path):
        self._path = path

    def file_path(self):
        return self._path


class OWMDirDataSourceDirLoader(DataSourceDirLoader):
    '''
    Loads data source directories listed in an ``index`` file in the loader's base
    directory. Each line of the index is a data source identifier and a directory
    name separated by a single space.
    '''

    def __init__(self, *args, **kwargs):
        super(OWMDirDataSourceDirLoader, self).__init__(*args, directory_key=DSD_DIRKEY, **kwargs)
        self._index = dict()

    @property
    def _idx_fname(self):
        if self.base_directory:
            return pth_join(self.base_directory, 'index')
        return None

    def _load_index(self):
        with scandir(self.base_directory) as dirents:
            dentdict = {de.name: de for de in dirents}
        with open(self._idx_fname) as f:
            for line in f:
                dsid, dname = line.strip().split(' ')
                if self._index_dir_entry_is_bad(dname, dentdict.get(dname)):
                    continue

                self._index[dsid] = dname

    def _index_dir_entry_is_bad(self, dname, de):
        # N.B. No exception is being handled when this is called, so no exception
        # info is passed to the logger
        if not de:
            msg = "There is no directory entry for {} in {}"
            L.warning(msg.format(dname, self.base_directory))
            return True

        if not de.is_dir():
            msg = "The directory entry for {} in {} is not a directory"
            L.warning(msg.format(dname, self.base_directory))
            return True

        return False

    def _ensure_index_loaded(self):
        if not self._index:
            self._load_index()

    def can_load(self, data_source):
        try:
            self._ensure_index_loaded()
        except (OSError, IOError) as e:
            # If the index file just happens not to be here since the repo doesn't
            # have any data source directories, then we just can't load the data
            # source's data, but for any other kind of error, something more exotic
            # could be the cause, so let the caller handle it
            #
            if e.errno == 2:  # FileNotFound
                return False
            raise
        return str(data_source.identifier) in self._index

    def load(self, data_source):
        try:
            self._ensure_index_loaded()
        except Exception as e:
            raise LoadFailed(data_source, self, "Failed to load the index: " + str(e)) from e

        try:
            return self._index[str(data_source.identifier)]
        except KeyError as e:
            raise LoadFailed(data_source, self, 'The given identifier is not in the index') from e


class OWMSaveNamespace(object):
    '''
    Tracks the contexts created or included while saving so that they can be
    validated and saved together with the main context.
    '''

    def __init__(self, context):
        self.context = context
        self._created_ctxs = set()
        self._external_contexts = set()

    @property
    def namespace_manager(self):
        return self.context.conf[NAMESPACE_MANAGER_KEY]

    def new_context(self, ctx_id):
        '''
        Create a context with the given identifier and register it for saving

        Parameters
        ----------
        ctx_id : str
            The identifier for the new context
        '''
        # Get the type of our context contextualized *with* our context
        ctx_type = self.context(type(self.context))

        # Make the "backing" context for the result we return
        new_ctx = self.context(Context)(ident=ctx_id, conf=self.context.conf)

        # Make the "wrapper" context and pass through the user's module for validation
        res = ctx_type(new_ctx, user_module=self.context._user_mod)

        # Finally, add the context
        self._created_ctxs.add(res)
        return res

    def include_context(self, ctx):
        '''
        Include the given externally-created context for saving.

        If the context is being made within the save function, then you can use
        new_context instead.
        '''
        self._external_contexts.add(ctx)

    def created_contexts(self):
        ''' Yield the contexts made with `new_context` and then the main context '''
        for ctx in self._created_ctxs:
            yield ctx
        yield self.context

    def validate(self):
        '''
        Raise `StatementValidationError` if the main context or any context made
        with `new_context` has recorded validation failures
        '''
        unvalidated = []
        for c in self._created_ctxs:
            unvalidated += c._unvalidated_statements
        unvalidated += self.context._unvalidated_statements
        if unvalidated:
            raise StatementValidationError(unvalidated)

    def save(self, *args, **kwargs):
        '''
        Validate, then save the created, included, and main contexts along with
        their imports. Arguments are passed through to ``save_context`` and
        ``save_imports``. Returns the result of saving the main context.
        '''
        # TODO: (openworm/owmeta#374) look at automatically importing contexts based
        # on UnimportedContextRecords among SaveValidationFailureRecords
        self.validate()
        for c in self._created_ctxs:
            c.save_context(*args, **kwargs)
            c.save_imports(*args, **kwargs)
        for c in self._external_contexts:
            c.save_context(*args, **kwargs)
            self.context(c).save_imports(*args, **kwargs)
        self.context.save_imports(*args, **kwargs)
        return self.context.save_context(*args, **kwargs)
class NullContextRecord(namedtuple('_NullContextRecord', ['node_index', 'statement'])):
    '''
    Recorded when an object we're saving has a context whose identifier is `None`
    '''

    def __str__(self):
        from .rdf_utils import triple_to_n3
        triple = self.statement.to_triple()
        node_n3 = triple[self.node_index].n3()
        template = 'Context identifier is `None` for {} of statement "{}"'
        return template.format(node_n3, triple_to_n3(triple))
class UnimportedContextRecord(namedtuple('_UnimportedContextRecord',
                                         ['importer',
                                          'context',
                                          'node_index',
                                          'statement'])):
    '''
    Recorded when a statement refers to an object whose context has not been
    imported by the context the statement is added to in the callback passed to
    `OWM.save`. For example, with a callback like this::

        def owm_data(ns):
            ctxA = ns.new_context('http://example.org/just-pizza-stuff')
            ctxB = ns.new_context('http://example.org/stuff-sam-likes')
            sam = ctxB(Person)('sam')
            pizza = ctxA(Thing)('pizza')
            sam.likes(pizza)

    this record would be generated because ``ctxB`` does not declare an import for
    ``ctxA``
    '''

    def __str__(self):
        from .rdf_utils import triple_to_n3
        triple = self.statement.to_triple()
        missing_n3 = self.context.n3()
        importer_n3 = self.importer.n3()
        node_n3 = triple[self.node_index].n3()
        template = 'Missing import of context {} from {} for {} of statement "{}"'
        return template.format(missing_n3, importer_n3, node_n3, triple_to_n3(triple))
class StatementValidationError(GenericUserError):
    '''
    Raised when a set of statements fails to validate
    '''

    def __init__(self, statements):
        count = len(statements)
        details = '\n'.join(str(x) for x in statements)
        super().__init__('{} invalid statements were found:\n{}'.format(count, details))
        # Kept so that callers can inspect the individual failures
        self.statements = statements
class ConfigMissingException(GenericUserError):
    '''
    Raised when a configuration key is missing
    '''

    def __init__(self, key):
        message = 'Missing "%s" in configuration' % key
        super().__init__(message)
        self.key = key
def wrap_data_object_result(result, props=None, namespace_manager=None, shorten_urls=False,
                            connection_ctx_mgr=None):
    '''
    Wrap a `DataObject` or an iterable of them in a `GeneratorWithData` with an
    "ID" column followed by one column for each property.

    Parameters
    ----------
    result : DataObject or iterable of DataObject
        The object(s) to wrap
    props : iterable of str, optional
        Link names of the properties to make columns for. If not given, the link
        names are collected from the properties of the object(s) in `result`
    namespace_manager : optional
        Used to shorten identifiers when `shorten_urls` is set
    shorten_urls : bool
        If set (and `namespace_manager` is given), identifiers in the "ID" column
        are passed through ``namespace_manager.normalizeUri``
    connection_ctx_mgr : context manager, optional
        Entered for as long as the objects are being iterated over
    '''
    if connection_ctx_mgr is None:
        connection_ctx_mgr = nullcontext()

    def format_id(r):
        if not shorten_urls or not namespace_manager:
            return r.identifier
        return namespace_manager.normalizeUri(r.identifier)

    def format_value(propname):
        def f(r):
            prop = getattr(r, propname, None)
            if prop is None:
                return ""
            val_strs = set()
            for v in prop.get():
                if isinstance(v, DataObject):
                    val_strs.add(v.identifier)
                elif isinstance(v, Identifier):
                    val_strs.add(v)
                else:
                    val_strs.add(repr(v))
            return ' '.join(val_strs)
        return f

    def guarded(items):
        # Hold the connection context for as long as the items are being consumed
        with connection_ctx_mgr:
            for item in items:
                yield item

    # N.B. `props` used to be reset to None at this point, which made the argument
    # ineffective
    if isinstance(result, DataObject):
        iterable = guarded((result,))
        if props is None:
            props = tuple(x.link_name for x in result.properties)
    elif props is None:
        # The objects have to be examined to find the properties, so the result
        # is consumed here and replayed from the list
        do_list = list(result)
        found = set()
        for r in do_list:
            found |= set(x.link_name for x in r.properties)
            found |= set(x.link_name for x in type(r)._property_classes.values())
        props = tuple(sorted(found))
        iterable = guarded(do_list)
    else:
        iterable = guarded(result)

    header = ('ID',) + tuple(props)
    columns = (format_id,) + tuple(format_value(propname) for propname in props)

    return GeneratorWithData(iterable,
                             default_columns=('ID',),
                             header=header,
                             text_format=format_id,
                             columns=columns)
class AlreadyDisconnected(Exception):
    '''
    Raised when a disconnect is requested on an OWM that is already disconnected
    '''

    def __init__(self, owm):
        message = f'Already disconnected {owm}'
        super().__init__(message)
class DirtyProjectRepository(Exception):
    '''
    Raised when a commit is about to be made while the project repository has
    changes to the graphs, so that re-serializing the indexed database over the
    graphs would not be safe.
    '''