# Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """Application base class. """ import inspect import locale import logging import logging.handlers import os import sys import cmd2 from cliff import _argparse from . import complete from . import help from . import utils logging.getLogger('cliff').addHandler(logging.NullHandler()) # Exit code for exiting due to a signal is 128 + the signal number _SIGINT_EXIT = 130 _SIGPIPE_EXIT = 141 class App(object): """Application base class. :param description: one-liner explaining the program purpose :paramtype description: str :param version: application version number :paramtype version: str :param command_manager: plugin loader :paramtype command_manager: cliff.commandmanager.CommandManager :param stdin: Standard input stream :paramtype stdin: readable I/O stream :param stdout: Standard output stream :paramtype stdout: writable I/O stream :param stderr: Standard error output stream :paramtype stderr: writable I/O stream :param interactive_app_factory: callable to create an interactive application :paramtype interactive_app_factory: cliff.interactive.InteractiveApp :param deferred_help: True - Allow subcommands to accept --help with allowing to defer help print after initialize_app :paramtype deferred_help: bool """ NAME = os.path.splitext(os.path.basename(sys.argv[0]))[0] if NAME == '__main__': NAME = os.path.split(os.path.dirname(sys.argv[0]))[-1] LOG = logging.getLogger(NAME) CONSOLE_MESSAGE_FORMAT = '%(message)s' LOG_FILE_MESSAGE_FORMAT = \ '[%(asctime)s] %(levelname)-8s %(name)s %(message)s' DEFAULT_VERBOSE_LEVEL = 1 DEFAULT_OUTPUT_ENCODING = 'utf-8' def __init__(self, description, version, command_manager, stdin=None, stdout=None, stderr=None, interactive_app_factory=None, deferred_help=False): """Initialize the application. """ self.command_manager = command_manager self.command_manager.add_command('help', help.HelpCommand) self.command_manager.add_command('complete', complete.CompleteCommand) self._set_streams(stdin, stdout, stderr) self.interactive_app_factory = interactive_app_factory self.deferred_help = deferred_help self.parser = self.build_option_parser(description, version) self.interactive_mode = False self.interpreter = None def _set_streams(self, stdin, stdout, stderr): try: locale.setlocale(locale.LC_ALL, '') except locale.Error: pass # Unicode must be encoded/decoded for text I/O streams, the # correct encoding for the stream must be selected and it must # be capable of handling the set of characters in the stream # or Python will raise a codec error. The correct codec is # selected based on the locale. Python2 uses the locales # encoding but only when the I/O stream is attached to a # terminal (TTY) otherwise it uses the default ASCII # encoding. The effect is internationalized text written to # the terminal works as expected but if command line output is # redirected (file or pipe) the ASCII codec is used and the # program aborts with a codec error. # # The default I/O streams stdin, stdout and stderr can be # wrapped in a codec based on the locale thus assuring the # users desired encoding is always used no matter the I/O # destination. Python3 does this by default. # # If the caller supplies an I/O stream we use it unmodified on # the assumption the caller has taken all responsibility for # the stream. But with Python2 if the caller allows us to # default the I/O streams to sys.stdin, sys.stdout and # sys.stderr we apply the locales encoding just as Python3 # would do. We also check to make sure the main Python program # has not already already wrapped sys.stdin, sys.stdout and # sys.stderr as this is a common recommendation. self.stdin = stdin or sys.stdin self.stdout = stdout or sys.stdout self.stderr = stderr or sys.stderr def build_option_parser(self, description, version, argparse_kwargs=None): """Return an argparse option parser for this application. Subclasses may override this method to extend the parser with more global options. :param description: full description of the application :paramtype description: str :param version: version number for the application :paramtype version: str :param argparse_kwargs: extra keyword argument passed to the ArgumentParser constructor :paramtype extra_kwargs: dict """ argparse_kwargs = argparse_kwargs or {} parser = _argparse.ArgumentParser( description=description, add_help=False, **argparse_kwargs ) parser.add_argument( '--version', action='version', version='{0} {1}'.format(App.NAME, version), ) verbose_group = parser.add_mutually_exclusive_group() verbose_group.add_argument( '-v', '--verbose', action='count', dest='verbose_level', default=self.DEFAULT_VERBOSE_LEVEL, help='Increase verbosity of output. Can be repeated.', ) verbose_group.add_argument( '-q', '--quiet', action='store_const', dest='verbose_level', const=0, help='Suppress output except warnings and errors.', ) parser.add_argument( '--log-file', action='store', default=None, help='Specify a file to log output. Disabled by default.', ) if self.deferred_help: parser.add_argument( '-h', '--help', dest='deferred_help', action='store_true', help="Show help message and exit.", ) else: parser.add_argument( '-h', '--help', action=help.HelpAction, nargs=0, default=self, # tricky help="Show help message and exit.", ) parser.add_argument( '--debug', default=False, action='store_true', help='Show tracebacks on errors.', ) return parser def configure_logging(self): """Create logging handlers for any log output. """ root_logger = logging.getLogger('') root_logger.setLevel(logging.DEBUG) # Set up logging to a file if self.options.log_file: file_handler = logging.FileHandler( filename=self.options.log_file, ) formatter = logging.Formatter(self.LOG_FILE_MESSAGE_FORMAT) file_handler.setFormatter(formatter) root_logger.addHandler(file_handler) # Always send higher-level messages to the console via stderr console = logging.StreamHandler(self.stderr) console_level = {0: logging.WARNING, 1: logging.INFO, 2: logging.DEBUG, }.get(self.options.verbose_level, logging.DEBUG) console.setLevel(console_level) formatter = logging.Formatter(self.CONSOLE_MESSAGE_FORMAT) console.setFormatter(formatter) root_logger.addHandler(console) return def print_help_if_requested(self): """Print help and exits if deferred help is enabled and requested. '--help' shows the help message and exits: * without calling initialize_app if not self.deferred_help (default), * after initialize_app call if self.deferred_help, * during initialize_app call if self.deferred_help and subclass calls explicitly this method in initialize_app. """ if self.deferred_help and self.options.deferred_help: action = help.HelpAction(None, None, default=self) action(self.parser, self.options, None, None) def run(self, argv): """Equivalent to the main program for the application. :param argv: input arguments and options :paramtype argv: list of str """ try: self.options, remainder = self.parser.parse_known_args(argv) self.configure_logging() self.interactive_mode = not remainder if self.deferred_help and self.options.deferred_help and remainder: # When help is requested and `remainder` has any values disable # `deferred_help` and instead allow the help subcommand to # handle the request during run_subcommand(). This turns # "app foo bar --help" into "app help foo bar". However, when # `remainder` is empty use print_help_if_requested() to allow # for an early exit. # Disabling `deferred_help` here also ensures that # print_help_if_requested will not fire if called by a subclass # during its initialize_app(). self.options.deferred_help = False remainder.insert(0, "help") self.initialize_app(remainder) self.print_help_if_requested() except BrokenPipeError: return _SIGPIPE_EXIT except Exception as err: if hasattr(self, 'options'): debug = self.options.debug else: debug = True if debug: self.LOG.exception(err) raise else: self.LOG.error(err) return 1 except KeyboardInterrupt: return _SIGINT_EXIT result = 1 if self.interactive_mode: result = self.interact() else: try: result = self.run_subcommand(remainder) except BrokenPipeError: return _SIGPIPE_EXIT except KeyboardInterrupt: return _SIGINT_EXIT return result # FIXME(dhellmann): Consider moving these command handling methods # to a separate class. def initialize_app(self, argv): """Hook for subclasses to take global initialization action after the arguments are parsed but before a command is run. Invoked only once, even in interactive mode. :param argv: List of arguments, including the subcommand to run. Empty for interactive mode. """ return def prepare_to_run_command(self, cmd): """Perform any preliminary work needed to run a command. :param cmd: command processor being invoked :paramtype cmd: cliff.command.Command """ return def clean_up(self, cmd, result, err): """Hook run after a command is done to shutdown the app. :param cmd: command processor being invoked :paramtype cmd: cliff.command.Command :param result: return value of cmd :paramtype result: int :param err: exception or None :paramtype err: Exception """ return def interact(self): # Defer importing .interactive as cmd2 is a slow import from .interactive import InteractiveApp if self.interactive_app_factory is None: self.interactive_app_factory = InteractiveApp self.interpreter = self.interactive_app_factory(self, self.command_manager, self.stdin, self.stdout, ) return self.interpreter.cmdloop() def get_fuzzy_matches(self, cmd): """return fuzzy matches of unknown command """ sep = '_' if self.command_manager.convert_underscores: sep = ' ' all_cmds = [k[0] for k in self.command_manager] dist = [] for candidate in sorted(all_cmds): prefix = candidate.split(sep)[0] # Give prefix match a very good score if candidate.startswith(cmd): dist.append((0, candidate)) continue # Levenshtein distance dist.append((utils.damerau_levenshtein(cmd, prefix, utils.COST)+1, candidate)) matches = [] match_distance = 0 for distance, candidate in sorted(dist): if distance > match_distance: if match_distance: # we copied all items with minimum distance, we are done break # we copied all items with distance=0, # now we match all candidates at the minimum distance match_distance = distance matches.append(candidate) return matches def run_subcommand(self, argv): try: subcommand = self.command_manager.find_command(argv) except ValueError as err: # If there was no exact match, try to find a fuzzy match the_cmd = argv[0] fuzzy_matches = self.get_fuzzy_matches(the_cmd) if fuzzy_matches: article = 'a' if self.NAME[0] in 'aeiou': article = 'an' self.stdout.write('%s: \'%s\' is not %s %s command. ' 'See \'%s --help\'.\n' % (self.NAME, ' '.join(argv), article, self.NAME, self.NAME)) self.stdout.write('Did you mean one of these?\n') for match in fuzzy_matches: self.stdout.write(' %s\n' % match) else: if self.options.debug: raise else: self.LOG.error(err) return 2 cmd_factory, cmd_name, sub_argv = subcommand kwargs = {} if 'cmd_name' in inspect.getfullargspec(cmd_factory.__init__).args: kwargs['cmd_name'] = cmd_name cmd = cmd_factory(self, self.options, **kwargs) result = 1 err = None try: self.prepare_to_run_command(cmd) full_name = (cmd_name if self.interactive_mode else ' '.join([self.NAME, cmd_name]) ) cmd_parser = cmd.get_parser(full_name) try: parsed_args = cmd_parser.parse_args(sub_argv) except SystemExit as ex: raise cmd2.exceptions.Cmd2ArgparseError from ex result = cmd.run(parsed_args) except BrokenPipeError as err1: result = _SIGPIPE_EXIT err = err1 raise except help.HelpExit: result = 0 except Exception as err1: err = err1 if self.options.debug: self.LOG.exception(err) else: self.LOG.error(err) except KeyboardInterrupt as err1: result = _SIGINT_EXIT err = err1 raise finally: try: self.clean_up(cmd, result, err) except Exception as err2: if self.options.debug: self.LOG.exception(err2) else: self.LOG.error('Could not clean up: %s', err2) del err return result