from __future__ import print_function
import sys
import types
import argparse
import copy as _copy
import functools
from textwrap import wrap
import logging
from .utils import FCN
from .docscrape import parse as npdoc_parse
from .command_util import IVar, SubCommand
# TODO: Use `inspect` module for getting argument names so we aren't depending on
# docstrings
from .cli_common import (INSTANCE_ATTRIBUTE,
METHOD_NAMED_ARG,
METHOD_NARGS,
METHOD_KWARGS)
from .cli_hints import CLI_HINTS
# Module-level logger, named after this module
L = logging.getLogger(__name__)

# Looked up with the type name taken from a documented parameter; names that
# are not listed here yield None from `dict.get`
ARGUMENT_TYPES = {
    'int': int
}
''' Map from parameter types to type constructors for parsing arguments '''

# Mapping key used (in place of METHOD_NAMED_ARG) for arguments whose hints
# set 'kvlist'; their values are split on '=' when the mapper is applied
_KVLIST_ARG = '_KVLIST_ARG'
class CLIUserError(Exception):
    '''
    Signals a problem that the person running the command has to fix.

    Raised, typically, in response to invalid input given on the command line.
    '''
def _method_runner(runner, key):
    '''
    Look up the attribute `key` on `runner` and return a function that forwards
    all of its arguments to it

    Parameters
    ----------
    runner : object
        The object to take the method from
    key : str
        Name of the attribute to look up on `runner`

    Returns
    -------
    callable
        A wrapper carrying the metadata of the looked-up method (via
        `functools.wraps`) which returns whatever that method returns
    '''
    L.debug('Getting method %s from %s', key, runner)
    target = getattr(runner, key)

    @functools.wraps(target)
    def _forward(*call_args, **call_kwargs):
        return target(*call_args, **call_kwargs)

    return _forward
def _sc_runner(sub_mapper, sub_runner):
    '''
    Build a zero-argument callable that applies `sub_mapper` to `sub_runner`

    Parameters
    ----------
    sub_mapper : object
        Object with an ``apply`` method; the method is looked up when the returned
        callable is invoked
    sub_runner : object
        Passed as the only argument to ``sub_mapper.apply``

    Returns
    -------
    callable
        Takes no arguments and returns the result of ``sub_mapper.apply(sub_runner)``
    '''
    def _apply_sub_command():
        outcome = sub_mapper.apply(sub_runner)
        return outcome

    return _apply_sub_command
class CLIArgMapper(object):
    '''
    Stores mappings for arguments and maps them back to the part of the object
    they come from

    Entries in `mappings` are keyed by ``(kind, name, index)`` triples, where ``kind``
    says what sort of argument was recorded, ``name`` is the attribute or parameter
    name, and ``index`` orders named arguments that are to be passed positionally.
    '''

    def __init__(self):
        self.mappings = dict()
        self.methodname = None
        self.runners = dict()
        ''' Mapping from subcommand names to functions which run for them '''
        self.named_arg_count = dict()
        self.argparser = None
        # A special little mapper just for callable runners
        self.runner_mapper = None

    def apply(self, runner):
        '''
        Applies the collected arguments to the runner by calling methods and traversing
        the object attributes as required

        Parameters
        ----------
        runner : object
            Target of the command and source of argument and method names

        Returns
        -------
        object
            Whatever the selected run method returns or, for a callable runner with a
            `runner_mapper`, whatever applying that mapper to the runner returns

        Raises
        ------
        CLIUserError
            If no run method is registered for the selected sub-command
        Exception
            If variable-length arguments were given along with positional arguments
            but not every named argument was filled in

        See Also
        --------
        CLICommandWrapper : accepts a runner argument in its ``__init__`` method
        '''
        iattrs = self.get(INSTANCE_ATTRIBUTE)

        # Only the first METHOD_KWARGS entry is consulted. Each item is a
        # "key=value" string, split on the first '='
        kwargs = {}
        for kv in next(iter(self.get(METHOD_KWARGS).values()), ()):
            k, v = kv.split('=', 1)
            kwargs[k] = v

        # When the named arguments do not form a complete positional sequence,
        # they are all passed by keyword instead
        args = self.get_list(METHOD_NAMED_ARG)
        if not args:
            kwargs.update(self.get(METHOD_NAMED_ARG))

        for key, vals in self.get(_KVLIST_ARG).items():
            kwargs[key] = [kv.split('=', 1) for kv in vals]

        # There is, at most, one nargs entry.
        nargs = next(iter(self.get(METHOD_NARGS).values()), ())

        L.debug('Looking up runner %s from %s', self.methodname, self.runners)
        runmethod = self.runners.get(self.methodname, None)

        def continuation():
            argcount = self.named_arg_count.get(self.methodname)
            if nargs and args and argcount is not None and len(args) != argcount:
                # This means we have passed in positional arguments, and we have a
                # variable-length option, but we have not filled out all of the arguments
                # necessary to cleanly apply the runmethod since Python would think we're
                # trying to apply some arguments twice.
                #
                # We *could* support a slightly richer set of options here, but it's probably
                # not worth it...
                #
                # Also, this is a programmer error. End-users shouldn't hit this
                raise Exception('Missing arguments to method ' + str(self.methodname))
            return runmethod(*(tuple(args) + tuple(nargs)), **kwargs)

        for k, v in iattrs.items():
            setattr(runner, k, v)

        if callable(runner) and self.runner_mapper:
            # The callable runner goes first; the selected sub-command (if any) is
            # handed to it as `_next` rather than being run here
            if runmethod is not None:
                runner._next = continuation
            return self.runner_mapper.apply(runner)

        if runmethod is None:
            # `argparser` stays None until a parser is attached to this mapper, so
            # only print usage when there is one. Either way, the user gets the
            # CLIUserError rather than an AttributeError
            if self.argparser is not None:
                self.argparser.print_help(file=sys.stderr)
                print(file=sys.stderr)
            raise CLIUserError('Please specify a sub-command')

        return continuation()

    def get(self, key):
        '''
        Get the recorded values for one kind of argument

        Parameters
        ----------
        key : str
            The kind of argument (first element of the mapping keys)

        Returns
        -------
        dict
            Map from argument name to value, leaving out entries whose value is `None`
        '''
        return {k[1]: v for k, v in self.mappings.items()
                if k[0] == key and v is not None}

    def get_list(self, key):
        '''
        Get the recorded values for one kind of argument in index order

        Parameters
        ----------
        key : str
            The kind of argument (first element of the mapping keys)

        Returns
        -------
        list
            The values ordered by index. Empty unless the indices of the matching
            entries are exactly ``0, 1, 2, ...`` with no gaps or repeats
        '''
        keys = sorted((k for k in self.mappings if k[0] == key), key=lambda it: it[2])
        if any(k[2] != position for position, k in enumerate(keys)):
            return []
        return [self.mappings[k] for k in keys]

    def __str__(self):
        return type(self).__name__ + '(' + str(self.mappings) + ')'
class CLIStoreAction(argparse.Action):
    ''' Records parsed values in a CLIArgMapper as well as in the namespace '''

    def __init__(self, mapper, key, index=-1, mapped_name=None, *args, **kwargs):
        '''
        Parameters
        ----------
        mapper : CLIArgMapper
            CLI argument to Python mapper
        key : str
            Indicates what kind of argument is being mapped. One of `.INSTANCE_ATTRIBUTE`,
            `.METHOD_NAMED_ARG`, `.METHOD_KWARGS`, `.METHOD_NARGS`
        index : int
            Argument index. Used for maintaining the order of arguments when passed to the
            runner
        mapped_name : str
            The name to map to. Falls back to the action's ``dest``. optional.
        *args
            passed to `~argparse.Action`
        **kwargs
            passed to `~argparse.Action`

        Raises
        ------
        ValueError
            If ``nargs`` is 0, or if ``const`` is given while ``nargs`` is not
            `argparse.OPTIONAL`
        '''
        super(CLIStoreAction, self).__init__(*args, **kwargs)

        # Validate what the base class recorded before storing our own state
        stores_nothing = (self.nargs == 0)
        if stores_nothing:
            raise ValueError('nargs for store actions must be > 0; if you '
                             'have nothing to store, actions such as store '
                             'true or store const may be more appropriate')

        const_given = self.const is not None
        if const_given and self.nargs != argparse.OPTIONAL:
            raise ValueError('nargs must be %r to supply const' % argparse.OPTIONAL)

        self.mapper = mapper
        self.key = key
        self.name = mapped_name if mapped_name else self.dest
        self.index = index

    def __call__(self, parser, namespace, values, option_string=None):
        # The mapper entry is identified by argument kind, name and position
        slot = (self.key, self.name, self.index)
        self.mapper.mappings[slot] = values
        setattr(namespace, self.dest, values)
class CLIStoreTrueAction(CLIStoreAction):
    '''
    Flag-style action: records `True` when the option appears on the command line
    '''

    def __init__(self, *args, **kwargs):
        '''
        Parameters
        ----------
        *args
            passed to `~.CLIStoreAction`
        **kwargs
            passed to `~.CLIStoreAction`
        '''
        super(CLIStoreTrueAction, self).__init__(*args, **kwargs)
        # Assigned only after the parent constructor has run: the option consumes
        # no command line values
        self.nargs = 0

    def __call__(self, parser, namespace, values, option_string=None):
        # Whatever `values` holds is disregarded; the stored value is always True
        flag_value = True
        super(CLIStoreTrueAction, self).__call__(parser, namespace, flag_value,
                                                 option_string)
class CLIAppendAction(CLIStoreAction):
    '''
    A CLIStoreAction that accumulates values over repeated uses of an option

    Used for recording a `dict`
    '''

    def __call__(self, parser, namespace, values, option_string=None):
        '''
        Parameters
        ----------
        parser
            Ignored
        namespace : argparse.Namespace
            Namespace to add to
        values : str or list
            Value to add. A list has each of its elements added
        option_string
            Ignored
        '''
        previous = _ensure_value(namespace, self.dest, [])
        # Work on a copy so the list already held by the namespace is left untouched
        items = _copy.copy(previous)
        if not isinstance(values, list):
            items.append(values)
        else:
            # This can happen because of a kwargs-type argument that has nargs='*'
            items += values

        # Accumulated values are always recorded with index -1
        slot = (self.key, self.name, -1)
        self.mapper.mappings[slot] = items
        setattr(namespace, self.dest, items)
class CLISubCommandAction(argparse._SubParsersAction):
    '''
    Action for sub-commands

    Extends the normal action for sub-parsers to record the subparser name in a mapper
    '''

    def __init__(self, mapper, *args, **kwargs):
        '''
        Parameters
        ----------
        mapper : CLIArgMapper
            CLI argument to Python mapper
        *args
            Passed on to `argparse._SubParsersAction`
        **kwargs
            Passed on to `argparse._SubParsersAction`
        '''
        super(CLISubCommandAction, self).__init__(*args, **kwargs)
        self.mapper = mapper

    def __call__(self, *args, **kwargs):
        '''
        Record the selected sub-command name on the mapper, then defer to
        `argparse._SubParsersAction` to parse the sub-command's own arguments

        Parameters
        ----------
        *args
            Passed on to `argparse._SubParsersAction`. The third positional argument
            must be the list of argument strings whose first element is the
            sub-command name
        **kwargs
            Passed on to `argparse._SubParsersAction`

        Raises
        ------
        ValueError
            If the mapper already has a sub-command name recorded
        '''
        if self.mapper.methodname is not None:
            # The pieces of the message are joined by literal concatenation, so each
            # piece has to carry its own separating space
            raise ValueError('More than one sub command has been specified! '
                             'Attempted to set {} when {} had already been'
                             ' set.'.format(self.dest, self.mapper.methodname))

        # args is (parser, namespace, values, ...); values[0] names the sub-command
        self.mapper.methodname = args[2][0]
        super(CLISubCommandAction, self).__call__(*args, **kwargs)
# Unique sentinel object. Nothing in the visible part of this module refers to
# it -- presumably meant to tell "no value given" apart from None; confirm
# against importers before removing
NOT_SET = object()
def _ensure_value(namespace, name, value):
    '''
    Return the attribute `name` of `namespace`, first setting it to `value` if it is
    missing or `None`

    Parameters
    ----------
    namespace : object
        Object holding the attribute
    name : str
        Attribute name
    value : object
        Value to install when there is no usable current value

    Returns
    -------
    object
        The attribute's value after any assignment
    '''
    current = getattr(namespace, name, None)
    if current is not None:
        return current
    setattr(namespace, name, value)
    return getattr(namespace, name)
class CLICommandWrapper(object):
    '''
    Wraps an object such that it can be used in a command line interface
    '''

    def __init__(self, runner, mapper=None, hints=None, hints_map=None, program_name=None):
        '''
        Parameters
        ----------
        runner : object
            An object that provides the methods to be invoked
        mapper : CLIArgMapper
            Stores the arguments and associated runners for the command. A mapper is
            created if none is provided. optional
        hints : dict
            A multi-level dict describing how certain command line arguments get turned
            into attributes and method arguments. If `hints` is not provided, the hints
            are looked up by the runner's fully-qualified class name in `hints_map`. optional
        hints_map : dict
            A multi-level dict describing how certain command line arguments get turned
            into attributes and method arguments. Defaults to `CLI_HINTS <.cli_hints>`. optional
        program_name : str
            The name of the top-level program. Uses `sys.argv[0] <sys.argv>` if not provided.
            optional
        '''
        self.runner = runner
        self.mapper = CLIArgMapper() if mapper is None else mapper
        self.hints_map = hints_map or CLI_HINTS
        self.hints = self.hints_map.get(FCN(type(runner)), {}) if hints is None else hints
        self.program_name = program_name

    def extract_args(self, val):
        '''
        Extract arguments from the method or class docstring

        In the return value (see below), the ``summary`` is a `str` used in listing out
        sub-commands. The ``detail`` is for the sub-command usage information and should,
        generally, include the ``summary``. The ``params`` are a list
        `~owmeta_core.docscrape.ParamInfo` objects describing the parameters.

        Parameters
        ----------
        val : object
            The object with the documentation

        Returns
        -------
        tuple
            a triple, ``(summary, detail, params)``
        '''
        # A missing, None or empty docstring is treated as the empty string
        docstring = getattr(val, '__doc__', '') or ''
        npdoc = npdoc_parse(docstring)
        params = npdoc.get('parameters')
        paragraphs = self._split_paras(npdoc.get('desc') or '')
        summary = paragraphs[0] if paragraphs else ''
        # Paragraphs are re-wrapped to 80 columns and separated by a line holding a
        # single space
        detail = '\n \n'.join('\n'.join(wrap(x, width=80)) for x in paragraphs if x)
        return summary, detail, params

    def _split_paras(self, docstring):
        '''
        Split text into paragraphs at blank lines

        Parameters
        ----------
        docstring : str
            The text to split

        Returns
        -------
        list of str
            The paragraphs. Each line is stripped of surrounding whitespace and the
            lines of a paragraph are joined with newlines
        '''
        paragraphs = []
        current = []
        for ln in docstring.split('\n'):
            ln = ln.strip()
            if ln:
                current.append(ln)
            elif current:
                paragraphs.append('\n'.join(current))
                current = []
        if current:
            paragraphs.append('\n'.join(current))
        return paragraphs

    def parser(self, parser=None):
        '''
        Generates the argument parser's arguments

        Parameters
        ----------
        parser : argparse.ArgumentParser
            The parser to add the arguments to. optional: will create a parser if none is
            given

        Returns
        -------
        argparse.ArgumentParser
            The parser that was given or created, with the arguments added
        '''
        if parser is None:
            doc = getattr(self.runner, '__doc__', None)
            if doc:
                cmd_summary, _, _ = self.extract_args(self.runner)
            else:
                cmd_summary = None
            parser = argparse.ArgumentParser(prog=self.program_name, description=cmd_summary)
        self.mapper.argparser = parser

        for key, val in vars(self.runner).items():
            if not key.startswith('_') and key not in self.hints.get('IGNORE', ()):
                # Take the help text from the attribute's value. (`key` is always a
                # `str`, so its `__doc__` is just the documentation of `str`.)
                parser.add_argument('--' + key, help=getattr(val, '__doc__', None))

        # The sub-parsers action is only created once the first sub-command is
        # found. A one-element list holds it so the closure can assign to it
        _sp = [None]

        def sp():
            if _sp[0] is None:
                _sp[0] = parser.add_subparsers(dest='subparser', mapper=self.mapper,
                                               action=CLISubCommandAction)
            return _sp[0]

        runner_type_attrs = dict()
        runner_type = type(self.runner)
        for x in dir(runner_type):
            if x.startswith('_') or x in self.hints.get('IGNORE', ()):
                continue
            runner_type_attrs[x] = getattr(runner_type, x)

        if '__call__' in dir(runner_type):
            # Handle sub-commands which are, themselves, callable. Summary and details
            # must be specified on the sub-command class docstring and would have already
            # been handled by the ``isinstance(val, SubCommand)`` case below
            _, _, params = self.extract_args(runner_type.__call__)

            # The arguments of ``__call__`` are recorded in a mapper of their own,
            # which is attached to the main mapper as its `runner_mapper`
            saved_mapper = self.mapper
            self.mapper = CLIArgMapper()
            self._handle_method(self.program_name, parser, '__call__',
                                runner_type.__call__, params)
            saved_mapper.runner_mapper = self.mapper
            # reset our mapper so the methods and such are recorded in the main one
            self.mapper = saved_mapper

        for key, val in sorted(runner_type_attrs.items()):
            if isinstance(val, (types.FunctionType, types.MethodType)):
                command_name = key.replace('_', '-')
                summary, detail, params = self.extract_args(val)
                subparser = sp().add_parser(command_name,
                                            help=summary,
                                            description=detail,
                                            formatter_class=argparse.RawDescriptionHelpFormatter)
                self._handle_method(command_name, subparser, key, val, params)
            elif isinstance(val, property):
                doc = getattr(val, '__doc__', None)
                parser.add_argument('--' + key, help=doc,
                                    action=CLIStoreAction,
                                    key=INSTANCE_ATTRIBUTE,
                                    mapper=self.mapper)
            elif isinstance(val, SubCommand):
                summary, detail, params = self.extract_args(val)
                sub_runner = getattr(self.runner, key)
                sub_mapper = CLIArgMapper()
                command_name = key.replace('_', '-')
                self.mapper.runners[command_name] = _sc_runner(sub_mapper, sub_runner)
                subparser = sp().add_parser(command_name, help=summary, description=detail)
                # Recurse: the sub-command's runner gets a wrapper and mapper of its own
                type(self)(sub_runner, sub_mapper, hints_map=self.hints_map).parser(subparser)
            elif isinstance(val, IVar):
                doc = getattr(val, '__doc__', None)
                var_hints = self.hints.get(key) if self.hints else None
                if val.default_value:
                    if doc:
                        doc += '. Default is ' + repr(val.default_value)
                    else:
                        doc = 'Default is ' + repr(val.default_value)
                # NOTE: we have a default value from the val, but we don't
                # set it here -- IVars return the defaults ... by default
                arg_kwargs = dict(help=doc,
                                  action=CLIStoreAction,
                                  key=INSTANCE_ATTRIBUTE,
                                  mapper=self.mapper)
                if val.value_type == bool:
                    arg_kwargs['action'] = CLIStoreTrueAction

                names = None if var_hints is None else var_hints.get('names')
                if names is None:
                    names = ['--' + key.replace('_', '-')]
                parser.add_argument(*names, **arg_kwargs)

        return parser

    def _handle_method(self, command_name, subparser, key, val, params):
        '''
        Register a runner for a method and add the method's documented parameters to
        `subparser` as command line arguments

        Parameters
        ----------
        command_name : str
            Name the runner is registered under in the mapper
        subparser : argparse.ArgumentParser
            Parser to add the arguments to
        key : str
            Name of the method on the runner
        val : object
            The method as found on the runner's type. Not used
        params : list
            Documented parameters of the method. Each must have ``name``,
            ``val_type`` and ``desc`` attributes. A name starting with ``**`` marks a
            keyword arguments parameter and one starting with ``*`` marks a
            variable-length arguments parameter
        '''
        sc_hints = self.hints.get(key) if self.hints else None
        meth = getattr(self.runner, key)
        positional_arg_count = meth.__code__.co_argcount

        self.mapper.runners[command_name] = _method_runner(self.runner, key)
        argcount = 0
        for pindex, param in enumerate(params):
            action = CLIStoreAction
            if param.val_type == 'bool':
                action = CLIStoreTrueAction

            atype = ARGUMENT_TYPES.get(param.val_type)

            arg = param.name
            arg_cli_name = arg.replace('_', '-')
            desc = param.desc
            if arg.startswith('**'):
                arg_hints = self._arg_hints(sc_hints, METHOD_KWARGS, arg[2:])
                names = None if arg_hints is None else arg_hints.get('names')
                if names is None:
                    names = ['--' + arg_cli_name[2:]]
                argument_args = dict(action=CLIAppendAction,
                                     mapper=self.mapper,
                                     key=METHOD_KWARGS,
                                     type=atype,
                                     help=desc)
                if arg_hints:
                    nargs = arg_hints.get('nargs')
                    if nargs is not None:
                        argument_args['nargs'] = nargs
                subparser.add_argument(*names,
                                       **argument_args)
            elif arg.startswith('*'):
                subparser.add_argument(arg_cli_name[1:],
                                       action=action,
                                       nargs='*',
                                       key=METHOD_NARGS,
                                       mapper=self.mapper,
                                       type=atype,
                                       help=desc)
            else:
                arg_hints = self._arg_hints(sc_hints, METHOD_NAMED_ARG, arg)
                names = None if arg_hints is None else arg_hints.get('names')
                if names is None:
                    names = ['--' + arg_cli_name]

                # An index of -1 means the argument is not given a position.
                # NOTE(review): `co_argcount` includes `self` and `pindex` counts
                # documented parameters from 0, so this comparison holds for every
                # documented positional parameter -- confirm that is the intent
                index = pindex
                if positional_arg_count >= pindex:
                    index = -1

                argument_args = dict(action=action,
                                     key=METHOD_NAMED_ARG,
                                     mapper=self.mapper,
                                     index=index,
                                     mapped_name=arg,
                                     type=atype,
                                     help=desc)
                if arg_hints:
                    nargs = arg_hints.get('nargs')
                    if nargs is not None:
                        argument_args['nargs'] = nargs

                    is_kvlist = arg_hints.get('kvlist')
                    if is_kvlist:
                        argument_args['key'] = _KVLIST_ARG

                    action_hint = arg_hints.get('action')
                    hinted_action = None
                    if 'append' == action_hint:
                        hinted_action = CLIAppendAction
                    elif 'store_true' == action_hint:
                        hinted_action = CLIStoreTrueAction
                    elif 'store' == action_hint:
                        hinted_action = CLIStoreAction

                    if hinted_action is not None:
                        argument_args['action'] = hinted_action

                subparser.add_argument(*names,
                                       **argument_args)
                argcount += 1
        # NOTE(review): the count is stored under the method name (`key`) whereas the
        # runner above is stored under `command_name`; the two differ whenever the
        # method name contains an underscore -- confirm which one readers look up
        self.mapper.named_arg_count[key] = argcount

    def _arg_hints(self, sc_hints, atype, key):
        '''
        Look up the hints for one argument of a sub-command

        Parameters
        ----------
        sc_hints : dict or None
            Hints for the sub-command, keyed by ``(atype, key)`` pairs
        atype : str
            The kind of argument
        key : str
            The argument name

        Returns
        -------
        object
            The hints for the argument, or `None` if there are none
        '''
        return None if sc_hints is None else sc_hints.get((atype, key))

    def main(self, args=None, argument_callback=None, argument_namespace_callback=None):
        '''
        Runs in a manner suitable for being the 'main' method for a command
        line interface: parses arguments (as would be done with the result of
        `parser`) from sys.argv or the provided args list and executes the
        commands specified therein

        Parameters
        ----------
        args : list
            the argument list to parse. optional
        argument_callback : callable
            a callback to add additional arguments to the command line. optional
        argument_namespace_callback : callable
            a callback to handle the parsed arguments to the command line. optional

        Returns
        -------
        object
            The result of applying the mapper to the runner
        '''
        parser = self.parser()
        if argument_callback:
            argument_callback(parser)
        ns = parser.parse_args(args=args)
        if argument_namespace_callback:
            argument_namespace_callback(ns)
        return self.mapper.apply(self.runner)