Source code for owmeta_core.bundle.loaders.http

import http.client
import io
import logging
import os
from os.path import join as p, expanduser
import ssl
from urllib.parse import quote as urlquote, urlparse
import hashlib
import json
import pickle

from cachecontrol import CacheControl
from cachecontrol.caches.file_cache import FileCache
import requests
from requests.adapters import HTTPAdapter
from urllib3.util import Retry

from ...command_util import GenericUserError
from ...utils import FCN, retrieve_provider

from .. import URLConfig
from ..archive import ensure_archive, Unarchiver
from ..common import BUNDLE_ARCHIVE_MIME_TYPE

from . import LoadFailed, Loader, Uploader


# Module-level logger, named after this module so that log output can be
# filtered per-module by the application's logging configuration.
L = logging.getLogger(__name__)


class HTTPURLConfig(URLConfig):
    '''
    HTTP URL configuration
    '''

    def __init__(self, *args, session_file_name=None, session_provider=None,
                 cache_dir=None, mem_cache=False, **kwargs):
        '''
        Parameters
        ----------
        *args
            Passed on to URLConfig
        session_file_name : str, optional
            Session file name
        session_provider : str, optional
            Provider path for a callable that returns a session
        cache_dir : str, optional
            HTTP cache directory. Supersedes `mem_cache`
        mem_cache : bool, optional
            Whether to use an in-memory cache. Superseded by `cache_dir`
        **kwargs
            Passed on to URLConfig
        '''
        super(HTTPURLConfig, self).__init__(*args, **kwargs)
        self.cache_dir = cache_dir
        self.session_file_name = session_file_name
        self.session_provider = session_provider
        self.mem_cache = bool(mem_cache)
        self._session = None

    @property
    def session(self):
        '''
        A `requests.Session`

        This will be loaded from `.session_file_name` if a value is set for that.
        Otherwise, the session will either be obtained from the `.session_provider` or
        a default session will be created; in either case, any response caching
        configuration will be applied.
        '''
        if self._session is None:
            if self.session_file_name:
                try:
                    with open(expanduser(self.session_file_name), 'rb') as session_file:
                        # SECURITY: unpickling executes arbitrary code embedded in the
                        # file. The session file must only ever come from a trusted
                        # location that this user controls.
                        self._session = pickle.load(session_file)
                except FileNotFoundError:
                    # No session has been saved yet: fall through and make a new one
                    pass
            if self._session is None:
                self.init_session()
        return self._session

    def init_session(self):
        '''
        Initialize the HTTP session. Typically you won't call this, but will just access
        `.session`
        '''
        self._session = self._make_new_session()

    def _make_new_session(self):
        '''
        Create a session (from `.session_provider` if set, otherwise a plain
        `requests.Session`), mount a retrying adapter for ``http://`` and ``https://``,
        and wrap the session with response caching if caching is configured.

        Returns
        -------
        requests.Session
            The new session. Wrapped by `CacheControl` when `.cache_dir` or
            `.mem_cache` is set
        '''
        if self.session_provider:
            session = self._provide_session()
        else:
            session = requests.Session()

        # HTTP statuses that trigger a retry:
        #
        # 403 is sometimes returned from Google Drive specifically for a temporary
        #     condition, apparently rate limiting
        # 429 too many requests: standard rate limiting status
        # 500 general server error...presumably the server operator can fix it quickly
        #     or failover to a working server
        # 503 service not available: maybe the server is still coming up. Makes sense
        #     to wait
        # 504 gateway timeout: maybe the upstream is so overloaded that it can't
        #     respond. We'll wait a bit and try again.
        #
        # Backoff: the choice of "backoff_factor" is arbitrary
        retries = Retry(backoff_factor=.0707,
                        status_forcelist=[403, 429, 500, 503, 504])
        adapter = HTTPAdapter(max_retries=retries)
        session.mount('http://', adapter)
        session.mount('https://', adapter)

        # A cache directory takes precedence over the in-memory cache
        if self.cache_dir:
            http_cache = FileCache(self.cache_dir)
            return CacheControl(session, cache=http_cache)
        elif self.mem_cache:
            return CacheControl(session)
        else:
            return session

    def _provide_session(self):
        '''
        Resolve `.session_provider` to a callable and call it to obtain a session
        '''
        return retrieve_provider(self.session_provider)()

    def save_session(self):
        '''
        Persist the current session to `.session_file_name`, if one is configured.

        The session is pickled to a temporary file next to the target and then moved
        into place, so a failure while pickling does not clobber an existing session
        file. Nothing is written when no session has been initialized, so that a
        previously saved session is not overwritten with an empty value.
        '''
        session_file_name = getattr(self, 'session_file_name', None)
        if not session_file_name or self._session is None:
            return
        sfname = expanduser(session_file_name)
        tmp_name = sfname + '.tmp'
        with open(tmp_name, 'wb') as session_file:
            pickle.dump(self._session, session_file)
        # os.replace overwrites an existing destination on every platform, whereas
        # os.rename raises on Windows when the session file already exists
        os.replace(tmp_name, sfname)

    def __getstate__(self):
        state = self.__dict__.copy()
        # We're storing the session in a separate file (if at all), so we don't
        # persist it with the HTTPURLConfig
        del state['_session']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # The session is re-created (or re-loaded) lazily on first access
        self._session = None
class HTTPSURLConfig(HTTPURLConfig):
    '''
    HTTPS URL configuration
    '''

    def __init__(self, *args, ssl_context_provider=None, ssl_context=None, **kwargs):
        '''
        Parameters
        ----------
        *args
            Passed on to HTTPURLConfig
        ssl_context_provider : str
            Path to a callable that provides a `ssl.SSLContext`. See `https_remote`
        ssl_context : ssl.SSLContext
            The SSL/TLS context to use for uploading with this accessor
        **kwargs
            Passed on to HTTPURLConfig
        '''
        super().__init__(*args, **kwargs)
        self.ssl_context_provider = ssl_context_provider
        self._ssl_context = ssl_context

    def init_ssl_context(self):
        '''
        Obtain the SSL context from `.ssl_context_provider` unless a context is
        already set or no provider is configured.

        Raises
        ------
        HTTPSURLError
            If the provider cannot be looked up, raises an exception, or returns
            something that is not an `ssl.SSLContext`
        '''
        # Nothing to do if we already have a context or have no way to make one
        if self._ssl_context is not None or not self.ssl_context_provider:
            return

        provider = self._lookup_ssl_context_provider()
        try:
            context = provider()
        except Exception as e:
            raise HTTPSURLError('Error from SSL context provider'
                                f' "{self.ssl_context_provider}": {e}')

        if not isinstance(context, ssl.SSLContext):
            raise HTTPSURLError('Provider returned something other than an'
                                f' ssl.SSLContext: {context}')
        self._ssl_context = context

    def _lookup_ssl_context_provider(self):
        '''
        Resolve `.ssl_context_provider` to a callable, translating lookup failures
        into `HTTPSURLError`
        '''
        try:
            provider = retrieve_provider(self.ssl_context_provider)
        except ValueError:
            raise HTTPSURLError('Format of the provider path is incorrect')
        except AttributeError:
            raise HTTPSURLError(f'"{self.ssl_context_provider}" does not point to an'
                                ' SSL context provider')
        return provider

    @property
    def ssl_context(self):
        '''
        The SSL context, initialized from the provider on access if necessary
        '''
        self.init_ssl_context()
        return self._ssl_context

    def __getstate__(self):
        # The SSL context is not persisted with the config
        state = super().__getstate__()
        state.pop('_ssl_context')
        return state

    def __setstate__(self, state):
        super().__setstate__(state)
        self._ssl_context = None

    def __str__(self):
        provider_line = ''
        if self.ssl_context_provider:
            provider_line = f'\n SSL Context Provider: {self.ssl_context_provider}'
        return f'{self.url}{provider_line}'
# Associate each URL scheme with the config class that handles it
HTTPSURLConfig.register('https')
HTTPURLConfig.register('http')


class HTTPSURLError(Exception):
    '''
    Raised for problems with an `HTTPSURLConfig`, such as a bad SSL context provider
    '''
class HTTPBundleLoader(Loader):
    '''
    Loads bundles from HTTP(S) resources listed in an index file
    '''

    def __init__(self, index_url, cachedir=None, hash_preference=('sha224',), **kwargs):
        '''
        Parameters
        ----------
        index_url : str or owmeta_core.bundle.URLConfig
            URL for the index file pointing to the bundle archives
        cachedir : str, optional
            Directory where the index and any downloaded bundle archive should be
            cached. If provided, the index and bundle archive is cached in the given
            directory. If not provided, the index will be cached in memory and the
            bundle will not be cached.
        hash_preference : tuple of str
            Preference ordering of hashes to use for checking integrity of files. If
            none match in the preference ordering, then the first hash listed for the
            bundle that `hashlib` supports is used. A single hash name may be given
            as a plain string.
        **kwargs
            Passed on to `.Loader`

        Raises
        ------
        TypeError
            If `index_url` is neither a string nor a `URLConfig`
        ValueError
            If a name in `hash_preference` is not available in `hashlib`
        '''
        super(HTTPBundleLoader, self).__init__(**kwargs)

        if isinstance(index_url, str):
            self.index_url = index_url
            self._url_config = None
        elif isinstance(index_url, HTTPURLConfig):
            self.index_url = index_url.url
            self._url_config = index_url
        elif isinstance(index_url, URLConfig):
            self.index_url = index_url.url
            self._url_config = None
        else:
            raise TypeError('Expecting a string or URLConfig. Received %s' %
                            type(index_url))

        # Normalize to a tuple. A lone string is treated as one hash name rather
        # than being iterated character by character, and None/empty means "no
        # preference".
        if isinstance(hash_preference, str):
            hash_preference = (hash_preference,)
        else:
            hash_preference = tuple(hash_preference) if hash_preference else ()

        for hash_name in hash_preference:
            if hash_name not in hashlib.algorithms_available:
                raise ValueError(f'"{hash_name}" is not available in hashlib on this system')

        self.hash_preference = hash_preference
        self.cachedir = cachedir
        # Use the session from the URL config when it offers one
        self._session = getattr(index_url, 'session', None) or requests.Session()
        self._index = None

    def __repr__(self):
        return '{}({})'.format(FCN(type(self)), repr(self.index_url))

    def _setup_index(self):
        '''
        Download and parse the index, if that has not been done already.

        Raises
        ------
        IndexLoadFailed
            If the response status is not 200, the body is not valid JSON, or the
            JSON document is not an object
        '''
        if self._index is not None:
            return
        response = self._session.get(self.index_url)
        if response.status_code != 200:
            raise IndexLoadFailed(response)
        try:
            index = response.json()
        except ValueError as e:
            # JSON decode errors are ValueError subclasses regardless of which JSON
            # implementation `requests` happens to be using
            raise IndexLoadFailed(response) from e
        if not isinstance(index, dict):
            # Everything else in this class looks bundles up by ID in a mapping
            raise IndexLoadFailed(response)
        self._index = index

    @classmethod
    def can_load_from(cls, ac):
        '''
        Returns `True` for ``http://`` or ``https://`` `URLConfigs <URLConfig>`

        Parameters
        ----------
        ac : AccessorConfig
            The config which we may be able to load from
        '''
        return (isinstance(ac, URLConfig) and
                (ac.url.startswith('https://') or
                    ac.url.startswith('http://')))

    def _save_session(self):
        '''
        Best-effort save of the HTTP session through the URL config, if there is one.
        Failures are logged and otherwise ignored.
        '''
        if not self._url_config:
            return
        try:
            self._url_config.save_session()
        except Exception:
            L.warning('Error while attempting to save session', exc_info=True)

    def can_load(self, bundle_id, bundle_version=None):
        '''
        Check the index for an entry for the bundle.

        - If a version is given and the index has an entry for the bundle at that
          version and that entry gives a URL for the bundle, then we return `True`.

        - If no version is given and the index has an entry for the bundle at any
          version and that entry gives a URL for the bundle, then we return `True`.

        - Otherwise, we return `False`

        Parameters
        ----------
        bundle_id : str
            ID of the bundle to look for
        bundle_version : int, optional
            Version number of the bundle to look for. If not provided, then any
            version is deemed acceptable

        Returns
        -------
        bool
            `True` if the bundle can be loaded; otherwise, `False`
        '''
        try:
            self._setup_index()
            self._save_session()
        except IndexLoadFailed:
            L.warning('Failed to set up the index for %s', self,
                      exc_info=L.isEnabledFor(logging.DEBUG))
            return False

        binfo = self._index.get(bundle_id)
        # Missing entry, or an entry that isn't a version -> info mapping
        if not binfo or not isinstance(binfo, dict):
            return False

        if bundle_version is None:
            # Any version with a usable URL will do
            for binfo_version, versioned_binfo in binfo.items():
                try:
                    int(binfo_version)
                except ValueError:
                    L.warning("Got unexpected non-version-number key '%s' in bundle index info",
                              binfo_version)
                    continue
                try:
                    binfo_url = versioned_binfo.get('url')
                except AttributeError:
                    L.warning("Got unexpected bundle info for version '%s' in bundle index info",
                              binfo_version)
                    continue
                if self._bundle_url_is_ok(binfo_url):
                    return True
            return False

        versioned_binfo = binfo.get(str(bundle_version))
        try:
            binfo_url = versioned_binfo.get('url')
        except AttributeError:
            L.warning("Got unexpected bundle info for version '%s' in bundle index info",
                      bundle_version)
            return False
        return self._bundle_url_is_ok(binfo_url)

    def _bundle_url_is_ok(self, bundle_url):
        '''
        Return `True` if `bundle_url` parses as an ``http`` or ``https`` URL with a
        network location; otherwise, `False`
        '''
        try:
            parsed_url = urlparse(bundle_url)
        except Exception:
            L.warning("Failed while parsing bundle URL '%s'", bundle_url)
            return False
        if parsed_url.scheme in ('http', 'https') and parsed_url.netloc:
            return True
        return False

    @staticmethod
    def _version_numbers(binfo):
        '''
        Return the integer version numbers among the keys of `binfo`, logging a
        warning for each key that is not a version number
        '''
        versions = []
        for key in binfo:
            try:
                versions.append(int(key))
            except ValueError:
                L.warning("Got unexpected non-version-number key '%s' in bundle index info", key)
        return versions

    def bundle_versions(self, bundle_id):
        '''
        List the versions of the bundle that the index has entries for.

        Parameters
        ----------
        bundle_id : str
            ID of the bundle to look for

        Returns
        -------
        list of int
            Version numbers found in the index. Empty if the bundle is not listed
        '''
        self._setup_index()
        binfo = self._index.get(bundle_id)
        if not binfo or not isinstance(binfo, dict):
            return []
        return self._version_numbers(binfo)

    def load(self, bundle_id, bundle_version=None):
        '''
        Load the bundle and then save the HTTP session, whether or not the load
        succeeded.

        Parameters
        ----------
        bundle_id : str
            ID of the bundle to load
        bundle_version : int, optional
            Version of the bundle to load. Defaults to the highest version in the
            index
        '''
        try:
            self._load(bundle_id, bundle_version)
        finally:
            self._save_session()

    def _load(self, bundle_id, bundle_version=None):
        '''
        Loads a bundle by downloading an index file, looking up the bundle location,
        and then downloading the bundle

        Raises
        ------
        LoadFailed
            If the bundle or version is not in the index, the index entry lacks a
            valid URL or a usable hash, the server returns an HTTP error, the
            download is empty, or the downloaded archive fails hash verification
        '''
        self._setup_index()
        binfo = self._index.get(bundle_id)
        if not binfo:
            raise LoadFailed(bundle_id, self, 'Bundle is not in the index')
        if not isinstance(binfo, dict):
            raise LoadFailed(bundle_id, self, 'Unexpected type of bundle info in the index')

        if bundle_version is None:
            # Default to the highest-numbered (positive) version listed
            max_vn = max(self._version_numbers(binfo), default=0)
            if max_vn <= 0:
                raise LoadFailed(bundle_id, self, 'No releases found')
            bundle_version = max_vn

        versioned_binfo = binfo.get(str(bundle_version))
        if not versioned_binfo or not isinstance(versioned_binfo, dict):
            raise LoadFailed(bundle_id, self, f'No bundle info for version {bundle_version}')

        bundle_url = versioned_binfo.get('url')
        if not self._bundle_url_is_ok(bundle_url):
            raise LoadFailed(bundle_id, self, 'Did not find a valid URL for "%s" at'
                             ' version %s' % (bundle_id, bundle_version))

        hashes = versioned_binfo.get('hashes')
        if not isinstance(hashes, dict) or not hashes:
            raise LoadFailed(bundle_id, self, f'No hash info for version {bundle_version}')

        # Pick the first preferred hash the index provides; failing that, the first
        # listed hash that hashlib knows about
        for hash_name in self.hash_preference:
            bundle_hash = hashes.get(hash_name)
            if bundle_hash:
                break
        else:  # no break
            for hash_name, bundle_hash in hashes.items():
                if hash_name in hashlib.algorithms_available:
                    break
            else:  # no break
                raise LoadFailed(bundle_id, self, f'No supported hash for version {bundle_version}')

        try:
            hsh = hashlib.new(hash_name)
        except ValueError as e:
            L.warning('Hash in hashlib.algorithms_available unsupported in hashlib.new')
            raise LoadFailed(bundle_id, self,
                             f'Unsupported hash {hash_name} for version {bundle_version}') from e

        response = self._session.get(bundle_url, stream=True)
        try:
            response.raise_for_status()
        except requests.HTTPError as e:
            raise LoadFailed(bundle_id, self,
                             'Received HTTP error from bundle server') from e

        if self.cachedir is not None:
            # NOTE(review): `quote` leaves "/" unescaped by default, so a bundle ID
            # containing slashes maps to a nested path under `cachedir` whose parent
            # directories must already exist -- confirm this is intended.
            bfn = urlquote(bundle_id)
            archive_path = p(self.cachedir, bfn)
            with open(archive_path, 'wb') as f:
                self._download_and_verify(response, f, hsh, hash_name, bundle_hash,
                                          bundle_id, bundle_version)
            with open(archive_path, 'rb') as f:
                Unarchiver().unpack(f, self.base_directory)
        else:
            bio = io.BytesIO()
            self._download_and_verify(response, bio, hsh, hash_name, bundle_hash,
                                      bundle_id, bundle_version)
            bio.seek(0)
            Unarchiver().unpack(bio, self.base_directory)

    def _download_and_verify(self, response, out, hsh, hash_name, bundle_hash,
                             bundle_id, bundle_version):
        '''
        Stream the response body into `out` while hashing it, then check the digest
        against `bundle_hash`.

        Raises
        ------
        LoadFailed
            If the response has no content or the digest does not match
        '''
        no_content = True
        for chunk in response.iter_content(chunk_size=1024):
            no_content = False
            hsh.update(chunk)
            out.write(chunk)

        if no_content:
            raise LoadFailed(bundle_id, self,
                             f'Failed to load bundle for version {bundle_version}: no content')

        digest = hsh.hexdigest()
        if bundle_hash != digest:
            raise LoadFailed(bundle_id, self,
                             f'Failed to verify {hash_name} hash for version {bundle_version}: '
                             f'Expected {bundle_hash} but got {digest}')
class HTTPBundleUploader(Uploader):
    '''
    Uploads bundles by sending bundle archives in HTTP POST requests
    '''

    def __init__(self, upload_url, ssl_context=None, max_retries=1):
        '''
        Parameters
        ----------
        upload_url : str or URLConfig
            URL string or accessor config
        ssl_context : ssl.SSLContext, optional
            SSL/TLS context to use for the connection. Overrides any context provided
            in `upload_url`
        max_retries : int, optional
            Maximum number of times to retry the upload after a failure.

        Raises
        ------
        TypeError
            If `upload_url` is neither a string nor a `URLConfig`
        '''
        super(HTTPBundleUploader, self).__init__()

        self.ssl_context = None
        self.max_retries = max_retries

        if isinstance(upload_url, str):
            self.upload_url = upload_url
        elif isinstance(upload_url, HTTPSURLConfig):
            self.upload_url = upload_url.url
            self.ssl_context = upload_url.ssl_context
        elif isinstance(upload_url, URLConfig):
            self.upload_url = upload_url.url
        else:
            raise TypeError('Expecting a string or URLConfig. Received %s' %
                            type(upload_url))

        # An explicitly passed context wins over one from the URL config
        if ssl_context:
            self.ssl_context = ssl_context

    @classmethod
    def can_upload_to(cls, accessor_config):
        '''
        Returns `True` for ``http://`` or ``https://`` `URLConfigs <URLConfig>`
        '''
        return (isinstance(accessor_config, URLConfig) and
                accessor_config.url.startswith(('https://', 'http://')))

    def can_upload(self, bundle_path):
        '''
        Always returns `True`: no check is made of the bundle at `bundle_path`
        '''
        return True

    def upload(self, bundle_path):
        '''
        Attempt to upload the bundle. Retries will be attempted when `BrokenPipeError`
        is thrown by the http client
        '''
        with ensure_archive(bundle_path) as archive_path:
            self._post(archive_path)

    def _post(self, archive):
        '''
        POST the archive file to the upload URL's host, retrying up to
        `.max_retries` times on `BrokenPipeError` or `ssl.SSLEOFError`.

        Parameters
        ----------
        archive : str
            Path to the bundle archive file to send
        '''
        parsed_url = urlparse(self.upload_url)
        if parsed_url.scheme == 'http':
            connection_ctor = http.client.HTTPConnection
        else:
            def connection_ctor(*args, **kwargs):
                return http.client.HTTPSConnection(*args,
                                                   context=self.ssl_context, **kwargs)

        conn = connection_ctor(parsed_url.netloc)
        try:
            with open(archive, 'rb') as f:
                retries = 0
                while True:
                    try:
                        # NOTE(review): the request target is always "" (which
                        # http.client sends as "/"), so any path in the upload URL is
                        # not used -- confirm whether that is intended.
                        conn.request("POST", "", body=f,
                                     headers={'Content-Type': BUNDLE_ARCHIVE_MIME_TYPE})
                        break
                    except (BrokenPipeError, ssl.SSLEOFError):
                        if retries >= self.max_retries:
                            raise
                        L.warning('Failed to upload bundle to %s. Will retry %d more times.',
                                  self.upload_url, self.max_retries - retries,
                                  exc_info=True)
                        # Drop the broken connection and start over with a fresh one
                        conn.close()
                        conn = connection_ctor(parsed_url.netloc)
                        # The failed attempt consumed part of the file: rewind so the
                        # retry sends the whole archive rather than just the remainder
                        f.seek(0)
                        retries += 1
            # XXX: Do something with this response
            # conn.getresponse()
        finally:
            conn.close()
def http_remote(self, *, cache=None, session_provider=None, session_file_name=None):
    '''
    Provide additional parameters for HTTP remote accessors

    Parameters
    ----------
    cache : str
        Either the string "mem" or a file path to a cache directory
    session_provider : str
        Path to a callable that provides a `requests.Session`. The format is similar
        to that for setuptools entry points:
        ``path.to.module:path.to.provider.callable``.  Notably, there's no name and
        "extras" are not supported. optional.
    session_file_name : str
        Path to a file where the HTTP session can be stored
    '''
    url_config = self._url_config
    if url_config is None:
        raise GenericUserError('An HTTP URL must be specified for HTTP accessors')
    if not isinstance(url_config, HTTPURLConfig):
        raise GenericUserError(f'The specified URL, {url_config} is not an HTTP URL')

    # Apply the cache/session options to the URL config, then write out the remote
    _http_urlconfig_command_helper(self, cache, session_provider, session_file_name)
    return self._write_remote()
def https_remote(self, *, ssl_context_provider=None, cache=None, session_provider=None,
                 session_file_name=None):
    '''
    Provide additional parameters for HTTPS remote accessors

    Parameters
    ----------
    ssl_context_provider : str
        Path to a callable that provides a `ssl.SSLContext` used for bundle uploads.
        The format is similar to that for setuptools entry points:
        ``path.to.module:path.to.provider.callable``.  Notably, there's no name and
        "extras" are not supported. optional.
    cache : str
        Either the string "mem" or a file path to a cache directory
    session_provider : str
        Path to a callable that provides a `requests.Session`. The format is similar
        to that for setuptools entry points:
        ``path.to.module:path.to.provider.callable``.  Notably, there's no name and
        "extras" are not supported. optional.
    session_file_name : str
        Path to a file where the HTTP session can be stored
    '''
    url_config = self._url_config
    if url_config is None:
        raise GenericUserError('An HTTPS URL must be specified for HTTPS accessors')
    if not isinstance(url_config, HTTPSURLConfig):
        raise GenericUserError(f'The specified URL, {url_config} is not an HTTPS URL')

    if ssl_context_provider:
        try:
            url_config.ssl_context_provider = ssl_context_provider
            # Build the context now so a bad provider is reported to the user
            # immediately
            url_config.init_ssl_context()
        except HTTPSURLError as e:
            raise GenericUserError(str(e))

    _http_urlconfig_command_helper(self, cache, session_provider, session_file_name)
    return self._write_remote()
def _http_urlconfig_command_helper(self, cache, session_provider, session_file_name):
    '''
    Copy the cache and session options given to an HTTP(S) remote command onto the
    command's URL config, then build a session once to check that the options work.

    Parameters
    ----------
    cache : str
        Either the string "mem" or a file path to a cache directory
    session_provider : str
        Path to a callable that provides a session
    session_file_name : str
        Path to a file where the HTTP session can be stored
    '''
    url_config = self._url_config

    # "mem" selects the in-memory cache; any other value (None included) is stored
    # as the cache directory
    if cache != 'mem':
        url_config.cache_dir = cache
    else:
        url_config.mem_cache = True

    if session_file_name:
        url_config.session_file_name = session_file_name

    if session_provider:
        url_config.session_provider = session_provider

    # Initialize a session to make sure the configs work.
    url_config._make_new_session()
class IndexLoadFailed(Exception):
    '''
    Thrown when the HTTP bundle loader cannot get its index
    '''

    def __init__(self, response):
        '''
        Parameters
        ----------
        response
            The response received for the index request. Kept on the exception as
            ``response``
        '''
        super().__init__(response)
        self.response = response