Source code for owmeta_core.commands.bundle

'''
Bundle commands
'''
from __future__ import print_function
import hashlib
import logging
import shutil
from os.path import join as p, abspath, relpath, isdir, isfile
from os import makedirs, unlink, getcwd, rename
from urllib.parse import urlparse

import yaml

from ..context import DEFAULT_CONTEXT_KEY, IMPORTS_CONTEXT_KEY
from ..mapper import CLASS_REGISTRY_CONTEXT_KEY
from ..command_util import GenericUserError, GeneratorWithData, SubCommand, IVar
from ..bundle import (Descriptor,
                      Installer,
                      URLConfig,
                      Remote,
                      Deployer,
                      Fetcher,
                      Cache,
                      retrieve_remotes,
                      find_bundle_directory,
                      NoBundleLoader as _NoBundleLoader,
                      URL_CONFIG_MAP)
from ..bundle.exceptions import (InstallFailed, TargetIsNotEmpty, UncoveredImports,
                                 FetchTargetIsNotEmpty)
from ..bundle.archive import Unarchiver, Archiver


L = logging.getLogger(__name__)


class _OWMBundleRemoteAddUpdate(object):
    def __init__(self, parent):
        self._parent = parent
        self._owm_bundle_remote = self._parent
        self._owm_bundle = self._owm_bundle_remote._owm_bundle
        self._owm = self._owm_bundle._parent
        self._remote = None
        self._url_config = None

        # _next is provided by cli_command_wrapper. It indicates the continuation for
        # running the remaining sub-commands
        self._next = None

    def _remote_fname(self, name):
        return self._owm_bundle_remote._remote_fname(name)

    def _write_remote(self):
        fname = self._remote_fname(self._remote.name)
        backup_exists = False
        try:
            rename(fname, fname + '.bkp')
            backup_exists = True
        except FileNotFoundError:
            pass

        try:
            with open(fname, 'w') as out:
                self._remote.write(out)
        except Exception:
            if backup_exists:
                rename(fname + '.bkp', fname)
            raise
        finally:
            try:
                unlink(fname + '.bkp')
            except FileNotFoundError:
                pass
        return self._remote

    def _remote_exists(self, name):
        return isfile(self._remote_fname(name))

    def _read_remote(self, name):
        self._remote = self._owm_bundle_remote._read_remote(name)


[docs]class OWMBundleRemoteAdd(_OWMBundleRemoteAddUpdate): ''' Add a remote and, optionally, an accessor to that remote. Remotes contain zero or more "accessor configurations" which describe how to upload to and download from a remote. Sub-commands allow for specifying additional parameters specific to a type of accessor. ''' def __call__(self, name, url): ''' Parameters ---------- name : str Name of the remote url : str URL for the remote ''' # Initial parse of the URL to get the scheme and the URLConfig that belongs to so # we can get more options acs = () if url: urldata = urlparse(url) url_config_class = URL_CONFIG_MAP.get(urldata.scheme, URLConfig) self._url_config = url_config_class(url) acs = (self._url_config,) self._read_remote(name) if self._remote: if self._url_config and not self._remote.add_config(self._url_config): raise GenericUserError('There is already an equivalent config for this remote.' ' Use "update" to modify it') else: self._remote = Remote(name, accessor_configs=acs) if self._owm_bundle_remote.user: base = self._owm.userdir else: base = self._owm.owmdir remotes_dir = p(base, 'remotes') if not isdir(remotes_dir): try: makedirs(remotes_dir) except Exception: L.warning('Could not crerate directory for storage of remote configurations', exc_info=True) raise GenericUserError('Could not create directory for storage of remote configurations') if self._next: return self._next() return self._write_remote()
[docs]class OWMBundleRemoteUpdate(_OWMBundleRemoteAddUpdate): ''' Update a remote accessor Remotes contain zero or more "accessor configurations" which describe how to upload to and download from a remote. Sub-commands allow for specifying additional parameters specific to a type of accessor. ''' def __call__(self, name, url): ''' Parameters ---------- name : str Name of the remote url : str URL for the remote. If there is an accessor with this URL, then that accessor will be updated according to parameters specified in sub-commands ''' self._read_remote(name) if not self._remote: raise GenericUserError(f'There is no remote named "{name}"') for ac in self._remote.accessor_configs: if isinstance(ac, URLConfig) and ac.url == url: self._url_config = ac break else: # no break raise GenericUserError(f'There is no accessor config for "{url}" in the' f' remote named "{name}"') if self._next: return self._next() return self._remote
[docs]class OWMBundleRemote(object): ''' Commands for dealing with bundle remotes ''' user = IVar(value_type=bool, doc='If this option is provided, then remotes in the user profile directory' ' are used rather than those in the project directory.') add = SubCommand(OWMBundleRemoteAdd) update = SubCommand(OWMBundleRemoteUpdate) def __init__(self, parent): self._owm_bundle = parent self._owm = self._owm_bundle._parent def _remote_fname(self, name): if self.user: base = self._owm.userdir else: base = self._owm.owmdir return p(base, 'remotes', hashlib.sha224(name.encode('UTF-8')).hexdigest() + '.remote') def _read_remote(self, name): try: with open(self._remote_fname(name)) as f: return Remote.read(f) except FileNotFoundError: pass except Exception: raise GenericUserError(f'Unable to read remote {name}')
[docs] def list(self): ''' List remotes ''' return GeneratorWithData(self._retrieve_remotes(), text_format=lambda r: r.name, columns=(lambda r: r.name, lambda r: r.file_name), header=("Name", "File Name",))
[docs] def show(self, name): ''' Show details about a remote Parameters ---------- name : str Name of the remote ''' return self._read_remote(name)
[docs] def remove(self, name): ''' Remove the remote Parameters ---------- name : str Name of the remote ''' remote_fname = self._remote_fname(name) try: unlink(remote_fname) except FileNotFoundError: self._owm.message(f'Remote {name} not found at {remote_fname}')
def _retrieve_remotes(self): if self.user: base = self._owm.userdir else: base = self._owm.owmdir return retrieve_remotes(p(base, 'remotes'))
[docs]class OWMBundleCache(object): ''' Bundle cache commands ''' def __init__(self, parent): self._parent = parent
[docs] def list(self): ''' List bundles in the cache ''' bundles_directory = self._parent._bundles_directory() cache = Cache(bundles_directory) return GeneratorWithData(cache.list(), text_format=lambda nd: "{id}{name}@{version}{description}".format( id=nd['id'], name=('(%s)' % nd['name'] if nd.get('name') else ''), version=nd.get('version'), description=(' - %s' % (nd.get('description') or nd.get('error'))) if nd.get('description') or nd.get('error') else ''), columns=(lambda nd: nd['id'], lambda nd: nd['version'], lambda nd: nd.get('name'), lambda nd: nd.get('description'), lambda nd: nd.get('error', '')), default_columns=('ID', 'Version'), header=("ID", "Version", "Name", "Description", "Error"))
[docs]class OWMBundle(object): ''' Bundle commands ''' remote = SubCommand(OWMBundleRemote) cache = SubCommand(OWMBundleCache) def __init__(self, parent): self._parent = parent self._loaders = []
[docs] def fetch(self, bundle_id, bundle_version=None, bundles_directory=None): ''' Retrieve a bundle by id from a remote and put it in the local bundle index and cache Parameters ---------- bundle_id : str The id of the bundle to retrieve. bundle_version : int The version of the bundle to retrieve. optional bundles_directory : str Root directory of the bundles cache. optional: uses the default bundle cache in the user's home directory if not provided ''' if bundles_directory: root_path = abspath(bundles_directory) else: root_path = self._bundles_directory() f = Fetcher(root_path, list(self._retrieve_remotes())) try: f.fetch(bundle_id, bundle_version) except _NoBundleLoader as e: raise NoBundleLoader(e.bundle_id, e.bundle_version) from e except FetchTargetIsNotEmpty as e: raise GenericUserError('There is already content in the bundle cache directory' f' for this bundle at {e.directory}')
def _bundles_directory(self): return p(self._parent.userdir, 'bundles') def _user_remotes_directory(self): return p(self._parent.userdir, 'remotes') def _project_remotes_directory(self): return p(self._parent.owmdir, 'remotes')
[docs] def load(self, input_file_name): ''' Load a bundle from a file and register it into the project Parameters ---------- input_file_name : str The source file of the bundle ''' Unarchiver(bundles_directory=self._bundles_directory()).unpack(input_file_name)
[docs] def save(self, bundle_id, output, bundle_version=None): ''' Write an installed bundle to a file Writing the bundle to a file means writing the bundle manifest, constituent graphs, and attached files to an archive. The bundle can be in the local bundle repository, a remote, or registered in the project. Parameters ---------- bundle_id : str The bundle to save output : str The target file bundle_version : int Version of the bundle to write. optional: defaults to the latest installed bundle ''' return Archiver(getcwd(), bundles_directory=self._bundles_directory()).pack( bundle_id=bundle_id, version=bundle_version, target_file_name=output)
[docs] def install(self, bundle): ''' Install the bundle to the local bundle repository for use across projects on the same machine Parameters ---------- bundle : str ID of the bundle to install or path to the bundle descriptor ''' descriptor_fname = self._get_bundle_descr_fname(bundle) if not descriptor_fname: descriptor_fname = bundle not_known_id = descriptor_fname == bundle try: descr = self._load_descriptor(descriptor_fname) except (OSError, IOError) as e: # XXX: Avoiding specialized exception types for Python 2 compat if e.errno == 2: # FileNotFound raise GenericUserError('Could not find bundle descriptor with {} {}'.format( 'file name' if not_known_id else 'ID', bundle )) if e.errno == 21: # IsADirectoryError raise GenericUserError('A bundle descriptor is a file, but we were given' ' a directory for {}'.format(bundle)) raise GenericUserError('Error recovering bundle descriptor with {} {}'.format( 'file name' if not_known_id else 'ID', bundle )) if not descr: raise GenericUserError('Could not find bundle with id {}'.format(bundle)) with self._parent.connect(): imports_ctx = self._parent._conf(IMPORTS_CONTEXT_KEY, None) default_ctx = self._parent._conf(DEFAULT_CONTEXT_KEY, None) class_registry_ctx = self._parent._conf(CLASS_REGISTRY_CONTEXT_KEY, None) bi = Installer(self._parent.basedir, self._bundles_directory(), self._parent.rdf, imports_ctx=imports_ctx, default_ctx=default_ctx, class_registry_ctx=class_registry_ctx) return self._install_helper(bi, descr)
def _install_helper(self, bi, descr): try: return bi.install(descr) except UncoveredImports as ui: raise GenericUserError('{}:\n{}'.format(ui, '\n'.join('- %s' % uri for uri in ui.imports))) except TargetIsNotEmpty as tine: if self._parent.non_interactive: raise GenericUserError(str(tine)) answer = self._parent.prompt('The target directory, "%s", is not empty. Would you' ' like to delete the contents and continue installation? [yes/no] ' % tine.directory) if str(answer).lower() == 'yes': shutil.rmtree(tine.directory) return self._install_helper(bi, descr) except InstallFailed as ife: raise GenericUserError('Installation failed: {}'.format(ife))
[docs] def register(self, descriptor): ''' Register a bundle within the project Registering a bundle adds it to project configuration and records where the descriptor file is within the project's working tree. If the descriptor file moves it must be re-registered at the new location. Parameters ---------- descriptor : str Descriptor file for the bundle ''' descriptor = abspath(descriptor) descr = self._load_descriptor(descriptor) self._register_bundle(descr, descriptor)
def _load_descriptor(self, fname): with open(fname, 'r') as f: return self._parse_descriptor(f) def _parse_descriptor(self, fh): return Descriptor.make(yaml.full_load(fh)) def _register_bundle(self, descr, file_name): try: with open(p(self._parent.owmdir, 'bundles'), 'r') as f: lines = f.readlines() except OSError: lines = [] with open(p(self._parent.owmdir, 'bundles'), 'w') as f: for line in lines: line = line.strip() if not line: continue idx_id, fn = line.split(' ', 1) if idx_id == descr.id: continue if fn == file_name: continue print(line, file=f) print('{descr.id} {file_name}\n'.format(**vars()), file=f) def _get_bundle_descr_fname(self, bundle_id): try: fh = open(p(self._parent.owmdir, 'bundles'), 'r') except FileNotFoundError: return None else: with fh as f: for line in f: line = line.strip() if not line: continue idx_id, fn = line.split(' ', 1) if bundle_id == idx_id: return fn
[docs] def deregister(self, bundle_id): ''' Remove a bundle from the project Parameters ---------- bundle_id : str The id of the bundle to deregister ''' try: with open(p(self._parent.owmdir, 'bundles'), 'r') as f: lines = f.readlines() except OSError: lines = [] with open(p(self._parent.owmdir, 'bundles'), 'w') as f: for line in lines: line = line.strip() if not line: continue idx_id, fn = line.split(' ', 1) if idx_id == bundle_id: continue print(line, file=f)
[docs] def deploy(self, bundle_id, version=None, remotes=None): ''' Deploys a bundle to a remote. The target remotes come from project and user settings or, if provided, the `remotes` parameter Parameters ---------- bundle_id : str ID of the bundle to deploy version : int Version of the bundle to deploy. optional. remotes : str Names of the remotes to deploy to. optional. ''' bundles_directory = self._bundles_directory() bundle_path = find_bundle_directory(bundles_directory, bundle_id) Deployer(remotes=self._retrieve_remotes()).deploy(bundle_path, remotes=remotes)
[docs] def checkout(self, bundle_id): ''' Switch to the named bundle Parameters ---------- bundle_id : str ID of the bundle to switch to '''
[docs] def list(self): ''' List registered bundles in the current project. To list bundles within the local repo or a remote repo, use the `cache list` sub-command. ''' def helper(): with open(p(self._parent.owmdir, 'bundles'), 'r') as index_fh: for line in index_fh: if not line.strip(): continue bundle_id, file_name = line.strip().split(' ', 1) try: with open(file_name, 'r') as bundle_fh: descr = self._parse_descriptor(bundle_fh) yield {'name': bundle_id, 'description': descr.description or ""} except (IOError, OSError): # This is at debug level since the error should be expressed well # enough by the response, but we still want to show it eventually L.debug("Cannot read bundle descriptor at" " '{}'".format(file_name), exc_info=True) yield {'name': bundle_id, 'error': "ERROR: Cannot read bundle descriptor at '{}'".format( relpath(file_name) )} return GeneratorWithData(helper(), text_format=lambda nd: "{name} - {description}".format(name=nd['name'], description=(nd.get('description') or nd.get('error'))), columns=(lambda nd: nd['name'], lambda nd: nd.get('description'), lambda nd: nd.get('error')), header=("Name", "Description", "Error"))
def _retrieve_remotes(self): rem = self.remote if not isdir(self._parent.owmdir): rem.user = True return rem._retrieve_remotes()
[docs]class NoBundleLoader(GenericUserError): ''' Thrown when a loader can't be found for a bundle ''' def __init__(self, bundle_id, bundle_version=None): super(NoBundleLoader, self).__init__( 'No loader could be found for "%s"%s' % (bundle_id, (' at version ' + str(bundle_version)) if bundle_version is not None else ''))
[docs]class BundleNotFound(GenericUserError): ''' Thrown when a bundle cannot be found with the requested ID and version ''' def __init__(self, bundle_id, bundle_version=None): super(BundleNotFound, self).__init__( 'The requested bundle could not be loaded "%s"%s' % (bundle_id, (' at version ' + bundle_version) if bundle_version is not None else ''))