# -*- coding: utf-8 -*- # Copyright (C) 2012 Anaconda, Inc # SPDX-License-Identifier: BSD-3-Clause """ A generalized application configuration utility. Features include: - lazy eval - merges configuration files - parameter type validation, with custom validation - parameter aliases Easily extensible to other source formats, e.g. json and ini Limitations: - at the moment only supports a "flat" config structure; no nested data structures """ from __future__ import absolute_import, division, print_function, unicode_literals from abc import ABCMeta, abstractmethod from collections import defaultdict try: from collections.abc import Mapping except ImportError: from collections import Mapping from glob import glob from itertools import chain from logging import getLogger from os import environ, stat from os.path import basename, join, expandvars from stat import S_IFDIR, S_IFMT, S_IFREG from enum import Enum, EnumMeta from .compat import (binary_type, isiterable, iteritems, itervalues, odict, primitive_types, string_types, text_type, with_metaclass) from .constants import NULL from .path import expand from .serialize import yaml_load from .. import CondaError, CondaMultiError from .._vendor.auxlib.collection import AttrDict, first, last, make_immutable from .._vendor.auxlib.exceptions import ThisShouldNeverHappenError from .._vendor.auxlib.type_coercion import TypeCoercionError, typify_data_structure from .._vendor.frozendict import frozendict from .._vendor.boltons.setutils import IndexedSet from .._vendor.toolz import concat, concatv, excepts, merge, unique try: # pragma: no cover from ruamel_yaml.comments import CommentedSeq, CommentedMap from ruamel_yaml.reader import ReaderError from ruamel_yaml.scanner import ScannerError except ImportError: # pragma: no cover from ruamel.yaml.comments import CommentedSeq, CommentedMap # pragma: no cover from ruamel.yaml.reader import ReaderError from ruamel.yaml.scanner import ScannerError log = getLogger(__name__) EMPTY_MAP = frozendict() def pretty_list(iterable, padding=' '): # TODO: move elsewhere in conda.common if not isiterable(iterable): iterable = [iterable] try: return '\n'.join("%s- %s" % (padding, item) for item in iterable) except TypeError: return pretty_list([iterable], padding) def pretty_map(dictionary, padding=' '): return '\n'.join("%s%s: %s" % (padding, key, value) for key, value in iteritems(dictionary)) def expand_environment_variables(unexpanded): if isinstance(unexpanded, string_types) or isinstance(unexpanded, binary_type): return expandvars(unexpanded) else: return unexpanded class ConfigurationError(CondaError): pass class ConfigurationLoadError(ConfigurationError): def __init__(self, path, message_addition='', **kwargs): message = "Unable to load configuration file.\n path: %(path)s\n" super(ConfigurationLoadError, self).__init__(message + message_addition, path=path, **kwargs) class ValidationError(ConfigurationError): def __init__(self, parameter_name, parameter_value, source, msg=None, **kwargs): self.parameter_name = parameter_name self.parameter_value = parameter_value self.source = source super(ValidationError, self).__init__(msg, **kwargs) class MultipleKeysError(ValidationError): def __init__(self, source, keys, preferred_key): self.source = source self.keys = keys msg = ("Multiple aliased keys in file %s:\n" "%s" "Must declare only one. Prefer '%s'" % (source, pretty_list(keys), preferred_key)) super(MultipleKeysError, self).__init__(preferred_key, None, source, msg=msg) class InvalidTypeError(ValidationError): def __init__(self, parameter_name, parameter_value, source, wrong_type, valid_types, msg=None): self.wrong_type = wrong_type self.valid_types = valid_types if msg is None: msg = ("Parameter %s = %r declared in %s has type %s.\n" "Valid types:\n%s" % (parameter_name, parameter_value, source, wrong_type, pretty_list(valid_types))) super(InvalidTypeError, self).__init__(parameter_name, parameter_value, source, msg=msg) class InvalidElementTypeError(InvalidTypeError): def __init__(self, parameter_name, parameter_value, source, wrong_type, valid_types, index_or_key): qualifier = "at index" if isinstance(index_or_key, int) else "for key" msg = ("Parameter %s declared in %s has invalid element %r %s %s.\n" "Valid element types:\n" "%s." % (parameter_name, source, parameter_value, qualifier, index_or_key, pretty_list(valid_types))) super(InvalidElementTypeError, self).__init__(parameter_name, parameter_value, source, wrong_type, valid_types, msg=msg) class CustomValidationError(ValidationError): def __init__(self, parameter_name, parameter_value, source, custom_message): msg = ("Parameter %s = %r declared in %s is invalid.\n" "%s" % (parameter_name, parameter_value, source, custom_message)) super(CustomValidationError, self).__init__(parameter_name, parameter_value, source, msg=msg) class MultiValidationError(CondaMultiError, ConfigurationError): def __init__(self, errors, *args, **kwargs): super(MultiValidationError, self).__init__(errors, *args, **kwargs) def raise_errors(errors): if not errors: return True elif len(errors) == 1: raise errors[0] else: raise MultiValidationError(errors) class ParameterFlag(Enum): final = "final" top = "top" bottom = "bottom" def __str__(self): return "%s" % self.value @classmethod def from_name(cls, name): return cls[name] @classmethod def from_value(cls, value): return cls(value) @classmethod def from_string(cls, string): try: string = string.strip('!#') return cls.from_value(string) except (ValueError, AttributeError): return None @with_metaclass(ABCMeta) class RawParameter(object): def __init__(self, source, key, raw_value): self.source = source self.key = key try: # ignore flake8 on this because it finds an error on py3 even though it is guarded self._raw_value = unicode(raw_value.decode('utf-8')) # NOQA except: self._raw_value = raw_value def __repr__(self): return text_type(vars(self)) @abstractmethod def value(self, parameter_obj): raise NotImplementedError() @abstractmethod def keyflag(self): raise NotImplementedError() @abstractmethod def valueflags(self, parameter_obj): raise NotImplementedError() @classmethod def make_raw_parameters(cls, source, from_map): if from_map: return dict((key, cls(source, key, from_map[key])) for key in from_map) return EMPTY_MAP class EnvRawParameter(RawParameter): source = 'envvars' def value(self, parameter_obj): if hasattr(parameter_obj, 'string_delimiter'): assert isinstance(self._raw_value, string_types) string_delimiter = getattr(parameter_obj, 'string_delimiter') # TODO: add stripping of !important, !top, and !bottom return tuple(v for v in ( vv.strip() for vv in self._raw_value.split(string_delimiter) ) if v) else: return self.__important_split_value[0].strip() def keyflag(self): return ParameterFlag.final if len(self.__important_split_value) >= 2 else None def valueflags(self, parameter_obj): if hasattr(parameter_obj, 'string_delimiter'): string_delimiter = getattr(parameter_obj, 'string_delimiter') # TODO: add stripping of !important, !top, and !bottom return tuple('' for _ in self._raw_value.split(string_delimiter)) else: return self.__important_split_value[0].strip() @property def __important_split_value(self): return self._raw_value.split("!important") @classmethod def make_raw_parameters(cls, appname): keystart = "{0}_".format(appname.upper()) raw_env = dict((k.replace(keystart, '', 1).lower(), v) for k, v in iteritems(environ) if k.startswith(keystart)) return super(EnvRawParameter, cls).make_raw_parameters(EnvRawParameter.source, raw_env) class ArgParseRawParameter(RawParameter): source = 'cmd_line' def value(self, parameter_obj): return make_immutable(self._raw_value) def keyflag(self): return None def valueflags(self, parameter_obj): return None if isinstance(parameter_obj, PrimitiveParameter) else () @classmethod def make_raw_parameters(cls, args_from_argparse): return super(ArgParseRawParameter, cls).make_raw_parameters(ArgParseRawParameter.source, args_from_argparse) class YamlRawParameter(RawParameter): # this class should encapsulate all direct use of ruamel.yaml in this module def __init__(self, source, key, raw_value, keycomment): self._keycomment = keycomment super(YamlRawParameter, self).__init__(source, key, raw_value) def value(self, parameter_obj): self.__process(parameter_obj) return self._value def keyflag(self): return ParameterFlag.from_string(self._keycomment) def valueflags(self, parameter_obj): self.__process(parameter_obj) return self._valueflags def __process(self, parameter_obj): if hasattr(self, '_value'): return elif isinstance(self._raw_value, CommentedSeq): valuecomments = self._get_yaml_list_comments(self._raw_value) self._valueflags = tuple(ParameterFlag.from_string(s) for s in valuecomments) self._value = tuple(self._raw_value) elif isinstance(self._raw_value, CommentedMap): valuecomments = self._get_yaml_map_comments(self._raw_value) self._valueflags = dict((k, ParameterFlag.from_string(v)) for k, v in iteritems(valuecomments) if v is not None) self._value = frozendict(self._raw_value) elif isinstance(self._raw_value, primitive_types): self._valueflags = None self._value = self._raw_value else: raise ThisShouldNeverHappenError() # pragma: no cover @staticmethod def _get_yaml_key_comment(commented_dict, key): try: return commented_dict.ca.items[key][2].value.strip() except (AttributeError, KeyError): return None @staticmethod def _get_yaml_list_comments(value): items = value.ca.items raw_comment_lines = tuple(excepts((AttributeError, KeyError, TypeError), lambda q: items.get(q)[0].value.strip() or None, lambda _: None # default value on exception )(q) for q in range(len(value))) return raw_comment_lines @staticmethod def _get_yaml_map_comments(rawvalue): return dict((key, excepts((AttributeError, KeyError), lambda k: rawvalue.ca.items[k][2].value.strip() or None, lambda _: None # default value on exception )(key)) for key in rawvalue) @classmethod def make_raw_parameters(cls, source, from_map): if from_map: return dict((key, cls(source, key, from_map[key], cls._get_yaml_key_comment(from_map, key))) for key in from_map) return EMPTY_MAP @classmethod def make_raw_parameters_from_file(cls, filepath): with open(filepath, 'r') as fh: try: ruamel_yaml = yaml_load(fh) except ScannerError as err: mark = err.problem_mark raise ConfigurationLoadError( filepath, " reason: invalid yaml at line %(line)s, column %(column)s", line=mark.line, column=mark.column ) except ReaderError as err: raise ConfigurationLoadError(filepath, " reason: invalid yaml at position %(position)s", position=err.position) return cls.make_raw_parameters(filepath, ruamel_yaml) or EMPTY_MAP def load_file_configs(search_path): # returns an ordered map of filepath and dict of raw parameter objects def _file_yaml_loader(fullpath): assert fullpath.endswith((".yml", ".yaml")) or "condarc" in basename(fullpath), fullpath yield fullpath, YamlRawParameter.make_raw_parameters_from_file(fullpath) def _dir_yaml_loader(fullpath): for filepath in sorted(concatv(glob(join(fullpath, "*.yml")), glob(join(fullpath, "*.yaml")))): yield filepath, YamlRawParameter.make_raw_parameters_from_file(filepath) # map a stat result to a file loader or a directory loader _loader = { S_IFREG: _file_yaml_loader, S_IFDIR: _dir_yaml_loader, } def _get_st_mode(path): # stat the path for file type, or None if path doesn't exist try: return S_IFMT(stat(path).st_mode) except OSError: return None expanded_paths = tuple(expand(path) for path in search_path) stat_paths = (_get_st_mode(path) for path in expanded_paths) load_paths = (_loader[st_mode](path) for path, st_mode in zip(expanded_paths, stat_paths) if st_mode is not None) raw_data = odict(kv for kv in chain.from_iterable(load_paths)) return raw_data @with_metaclass(ABCMeta) class Parameter(object): _type = None _element_type = None def __init__(self, default, aliases=(), validation=None, expandvars=False): self._name = None self._names = None self.default = default self.aliases = aliases self._validation = validation self._expandvars = expandvars def _set_name(self, name): # this is an explicit method, and not a descriptor/setter # it's meant to be called by the Configuration metaclass self._name = name # lgtm [py/mutable-descriptor] _names = frozenset(x for x in chain(self.aliases, (name, ))) self._names = _names # lgtm [py/mutable-descriptor] return name @property def name(self): if self._name is None: # The Configuration metaclass should call the `_set_name` method. raise ThisShouldNeverHappenError() # pragma: no cover return self._name @property def names(self): if self._names is None: # The Configuration metaclass should call the `_set_name` method. raise ThisShouldNeverHappenError() # pragma: no cover return self._names def _raw_parameters_from_single_source(self, raw_parameters): # while supporting parameter name aliases, we enforce that only one definition is given # per data source keys = self.names & frozenset(raw_parameters.keys()) matches = {key: raw_parameters[key] for key in keys} numkeys = len(keys) if numkeys == 0: return None, None elif numkeys == 1: return next(itervalues(matches)), None elif self.name in keys: return matches[self.name], MultipleKeysError(raw_parameters[next(iter(keys))].source, keys, self.name) else: return None, MultipleKeysError(raw_parameters[next(iter(keys))].source, keys, self.name) def _get_all_matches(self, instance): # a match is a raw parameter instance matches = [] multikey_exceptions = [] for filepath, raw_parameters in iteritems(instance.raw_data): match, error = self._raw_parameters_from_single_source(raw_parameters) if match is not None: matches.append(match) if error: multikey_exceptions.append(error) return matches, multikey_exceptions @abstractmethod def _merge(self, matches): raise NotImplementedError() def _expand(self, data): if self._expandvars: # This is similar to conda._vendor.auxlib.type_coercion.typify_data_structure # It could be DRY-er but that would break SRP. if isinstance(data, Mapping): return type(data)((k, expand_environment_variables(v)) for k, v in iteritems(data)) elif isiterable(data): return type(data)(expand_environment_variables(v) for v in data) else: return expand_environment_variables(data) else: return data def __get__(self, instance, instance_type): # strategy is "extract and merge," which is actually just map and reduce # extract matches from each source in SEARCH_PATH # then merge matches together if self.name in instance._cache_: return instance._cache_[self.name] matches, errors = self._get_all_matches(instance) merged = self._merge(matches) if matches else self.default # We need to expand any environment variables before type casting. # Otherwise e.g. `my_bool_var: $BOOL` with BOOL=True would raise a TypeCoercionError. expanded = self._expand(merged) try: result = self.typify(expanded, "<>") except CustomValidationError as e: errors.append(e) else: errors.extend(self.collect_errors(instance, result)) raise_errors(errors) instance._cache_[self.name] = result # lgtm [py/uninitialized-local-variable] return result # lgtm [py/uninitialized-local-variable] def collect_errors(self, instance, value, source="<>"): """Validate a Parameter value. Args: instance (Configuration): The instance object to which the Parameter descriptor is attached. value: The value to be validated. """ errors = [] if not isinstance(value, self._type): errors.append(InvalidTypeError(self.name, value, source, type(value), self._type)) elif self._validation is not None: result = self._validation(value) if result is False: errors.append(ValidationError(self.name, value, source)) elif isinstance(result, string_types): errors.append(CustomValidationError(self.name, value, source, result)) return errors def _match_key_is_important(self, raw_parameter): return raw_parameter.keyflag() is ParameterFlag.final def _first_important_matches(self, matches): idx = first(enumerate(matches), lambda x: self._match_key_is_important(x[1]), apply=lambda x: x[0]) return matches if idx is None else matches[:idx+1] @staticmethod def _str_format_flag(flag): return " #!%s" % flag if flag is not None else '' @staticmethod def _str_format_value(value): if value is None: return 'None' return value @classmethod def repr_raw(cls, raw_parameter): raise NotImplementedError() def typify(self, value, source, name=None): element_type = self._element_type try: return typify_data_structure(value, element_type) except TypeCoercionError as e: if name is None: name = self.name msg = text_type(e) if issubclass(element_type, Enum): choices = ", ".join(map("'{}'".format, element_type.__members__.values())) msg += "\nValid choices for {}: {}".format(name, choices) raise CustomValidationError(name, e.value, source, msg) class PrimitiveParameter(Parameter): """Parameter type for a Configuration class that holds a single python primitive value. The python primitive types are str, int, float, complex, bool, and NoneType. In addition, python 2 has long and unicode types. """ def __init__(self, default, aliases=(), validation=None, element_type=None, expandvars=False): """ Args: default (Any): The parameter's default value. aliases (Iterable[str]): Alternate names for the parameter. validation (callable): Given a parameter value as input, return a boolean indicating validity, or alternately return a string describing an invalid value. Returning `None` also indicates a valid value. element_type (type or Tuple[type]): Type-validation of parameter's value. If None, type(default) is used. """ self._type = type(default) if element_type is None else element_type self._element_type = self._type super(PrimitiveParameter, self).__init__(default, aliases, validation, expandvars) def _merge(self, matches): important_match = first(matches, self._match_key_is_important, default=None) if important_match is not None: return important_match.value(self) last_match = last(matches, lambda x: x is not None, default=None) if last_match is not None: return last_match.value(self) raise ThisShouldNeverHappenError() # pragma: no cover def repr_raw(self, raw_parameter): return "%s: %s%s" % (raw_parameter.key, self._str_format_value(raw_parameter.value(self)), self._str_format_flag(raw_parameter.keyflag())) class SequenceParameter(Parameter): """Parameter type for a Configuration class that holds a sequence (i.e. list) of python primitive values. """ _type = tuple def __init__(self, element_type, default=(), aliases=(), validation=None, string_delimiter=',', expandvars=False): """ Args: element_type (type or Iterable[type]): The generic type of each element in the sequence. default (Iterable[str]): The parameter's default value. aliases (Iterable[str]): Alternate names for the parameter. validation (callable): Given a parameter value as input, return a boolean indicating validity, or alternately return a string describing an invalid value. """ self._element_type = element_type self.string_delimiter = string_delimiter super(SequenceParameter, self).__init__(default, aliases, validation, expandvars) def collect_errors(self, instance, value, source="<>"): errors = super(SequenceParameter, self).collect_errors(instance, value) element_type = self._element_type for idx, element in enumerate(value): if not isinstance(element, element_type): errors.append(InvalidElementTypeError(self.name, element, source, type(element), element_type, idx)) return errors def _merge(self, matches): # get matches up to and including first important_match # but if no important_match, then all matches are important_matches relevant_matches_and_values = tuple((match, match.value(self)) for match in self._first_important_matches(matches)) for match, value in relevant_matches_and_values: if not isinstance(value, tuple): raise InvalidTypeError(self.name, value, match.source, value.__class__.__name__, self._type.__name__) # get individual lines from important_matches that were marked important # these will be prepended to the final result def get_marked_lines(match, marker, parameter_obj): return tuple(line for line, flag in zip(match.value(parameter_obj), match.valueflags(parameter_obj)) if flag is marker) if match else () top_lines = concat(get_marked_lines(m, ParameterFlag.top, self) for m, _ in relevant_matches_and_values) # also get lines that were marked as bottom, but reverse the match order so that lines # coming earlier will ultimately be last bottom_lines = concat(get_marked_lines(m, ParameterFlag.bottom, self) for m, _ in reversed(relevant_matches_and_values)) # now, concat all lines, while reversing the matches # reverse because elements closer to the end of search path take precedence all_lines = concat(v for _, v in reversed(relevant_matches_and_values)) # stack top_lines + all_lines, then de-dupe top_deduped = tuple(unique(concatv(top_lines, all_lines))) # take the top-deduped lines, reverse them, and concat with reversed bottom_lines # this gives us the reverse of the order we want, but almost there # NOTE: for a line value marked both top and bottom, the bottom marker will win out # for the top marker to win out, we'd need one additional de-dupe step bottom_deduped = unique(concatv(reversed(tuple(bottom_lines)), reversed(top_deduped))) # just reverse, and we're good to go return tuple(reversed(tuple(bottom_deduped))) def repr_raw(self, raw_parameter): lines = list() lines.append("%s:%s" % (raw_parameter.key, self._str_format_flag(raw_parameter.keyflag()))) for q, value in enumerate(raw_parameter.value(self)): valueflag = raw_parameter.valueflags(self)[q] lines.append(" - %s%s" % (self._str_format_value(value), self._str_format_flag(valueflag))) return '\n'.join(lines) def _get_all_matches(self, instance): # this is necessary to handle argparse `action="append"`, which can't be set to a # default value of NULL # it also config settings like `channels: ~` matches, exceptions = super(SequenceParameter, self)._get_all_matches(instance) matches = tuple(m for m in matches if m._raw_value is not None) return matches, exceptions class MapParameter(Parameter): """Parameter type for a Configuration class that holds a map (i.e. dict) of python primitive values. """ _type = frozendict def __init__(self, element_type, default=None, aliases=(), validation=None, expandvars=False): """ Args: element_type (type or Iterable[type]): The generic type of each element. default (Mapping): The parameter's default value. If None, will be an empty dict. aliases (Iterable[str]): Alternate names for the parameter. validation (callable): Given a parameter value as input, return a boolean indicating validity, or alternately return a string describing an invalid value. """ self._element_type = element_type default = default and frozendict(default) or frozendict() super(MapParameter, self).__init__(default, aliases, validation, expandvars) def collect_errors(self, instance, value, source="<>"): errors = super(MapParameter, self).collect_errors(instance, value) if isinstance(value, Mapping): element_type = self._element_type errors.extend(InvalidElementTypeError(self.name, val, source, type(val), element_type, key) for key, val in iteritems(value) if not isinstance(val, element_type)) return errors def _merge(self, matches): # get matches up to and including first important_match # but if no important_match, then all matches are important_matches relevant_matches_and_values = tuple((match, match.value(self)) for match in self._first_important_matches(matches)) for match, value in relevant_matches_and_values: if not isinstance(value, Mapping): raise InvalidTypeError(self.name, value, match.source, value.__class__.__name__, self._type.__name__) # mapkeys with important matches def key_is_important(match, key): return match.valueflags(self).get(key) == ParameterFlag.final important_maps = tuple(dict((k, v) for k, v in iteritems(match_value) if key_is_important(match, k)) for match, match_value in relevant_matches_and_values) # dump all matches in a dict # then overwrite with important matches return frozendict(merge(concatv((v for _, v in relevant_matches_and_values), reversed(important_maps)))) def repr_raw(self, raw_parameter): lines = list() lines.append("%s:%s" % (raw_parameter.key, self._str_format_flag(raw_parameter.keyflag()))) for valuekey, value in iteritems(raw_parameter.value(self)): valueflag = raw_parameter.valueflags(self).get(valuekey) lines.append(" %s: %s%s" % (valuekey, self._str_format_value(value), self._str_format_flag(valueflag))) return '\n'.join(lines) def _get_all_matches(self, instance): # it also config settings like `proxy_servers: ~` matches, exceptions = super(MapParameter, self)._get_all_matches(instance) matches = tuple(m for m in matches if m._raw_value is not None) return matches, exceptions class ConfigurationType(type): """metaclass for Configuration""" def __init__(cls, name, bases, attr): super(ConfigurationType, cls).__init__(name, bases, attr) # call _set_name for each parameter cls.parameter_names = tuple(p._set_name(name) for name, p in iteritems(cls.__dict__) if isinstance(p, Parameter)) @with_metaclass(ConfigurationType) class Configuration(object): def __init__(self, search_path=(), app_name=None, argparse_args=None): # Currently, __init__ does a **full** disk reload of all files. # A future improvement would be to cache files that are already loaded. self.raw_data = odict() self._cache_ = dict() self._reset_callbacks = IndexedSet() self._validation_errors = defaultdict(list) self._set_search_path(search_path) self._set_env_vars(app_name) self._set_argparse_args(argparse_args) def _set_search_path(self, search_path): self._search_path = IndexedSet(search_path) self._set_raw_data(load_file_configs(search_path)) self._reset_cache() return self def _set_env_vars(self, app_name=None): self._app_name = app_name if not app_name: return self self.raw_data[EnvRawParameter.source] = EnvRawParameter.make_raw_parameters(app_name) self._reset_cache() return self def _set_argparse_args(self, argparse_args): # the argparse_args we store internally in this class as self._argparse_args # will be a mapping type, not a non-`dict` object like argparse_args is natively if hasattr(argparse_args, '__dict__'): # the argparse_args from argparse will be an object with a __dict__ attribute # and not a mapping type like this method will turn it into self._argparse_args = AttrDict((k, v) for k, v, in iteritems(vars(argparse_args)) if v is not NULL) elif not argparse_args: # argparse_args can be initialized as `None` self._argparse_args = AttrDict() else: # we're calling this method with argparse_args that are a mapping type, likely # already having been processed by this method before self._argparse_args = AttrDict((k, v) for k, v, in iteritems(argparse_args) if v is not NULL) source = ArgParseRawParameter.source self.raw_data[source] = ArgParseRawParameter.make_raw_parameters(self._argparse_args) self._reset_cache() return self def _set_raw_data(self, raw_data): self.raw_data.update(raw_data) self._reset_cache() return self def _reset_cache(self): self._cache_ = dict() for callback in self._reset_callbacks: callback() return self def register_reset_callaback(self, callback): self._reset_callbacks.add(callback) def check_source(self, source): # this method ends up duplicating much of the logic of Parameter.__get__ # I haven't yet found a way to make it more DRY though typed_values = {} validation_errors = [] raw_parameters = self.raw_data[source] for key in self.parameter_names: parameter = self.__class__.__dict__[key] match, multikey_error = parameter._raw_parameters_from_single_source(raw_parameters) if multikey_error: validation_errors.append(multikey_error) if match is not None: untyped_value = match.value(parameter) if untyped_value is None: if isinstance(parameter, SequenceParameter): untyped_value = () elif isinstance(parameter, MapParameter): untyped_value = {} try: typed_value = parameter.typify(untyped_value, match.source, name=match.key) except CustomValidationError as e: validation_errors.append(e) else: collected_errors = parameter.collect_errors(self, typed_value, match.source) if collected_errors: validation_errors.extend(collected_errors) else: typed_values[match.key] = typed_value # parameter.repr_raw(match) else: # this situation will happen if there is a multikey_error and none of the # matched keys is the primary key pass return typed_values, validation_errors def validate_all(self): validation_errors = list(chain.from_iterable(self.check_source(source)[1] for source in self.raw_data)) raise_errors(validation_errors) self.validate_configuration() @staticmethod def _collect_validation_error(func, *args, **kwargs): try: func(*args, **kwargs) except ConfigurationError as e: return e.errors if hasattr(e, 'errors') else e, return () def validate_configuration(self): errors = chain.from_iterable(Configuration._collect_validation_error(getattr, self, name) for name in self.parameter_names) post_errors = self.post_build_validation() raise_errors(tuple(chain.from_iterable((errors, post_errors)))) def post_build_validation(self): return () def collect_all(self): typed_values = odict() validation_errors = odict() for source in self.raw_data: typed_values[source], validation_errors[source] = self.check_source(source) raise_errors(tuple(chain.from_iterable(itervalues(validation_errors)))) return odict((k, v) for k, v in iteritems(typed_values) if v) def describe_parameter(self, parameter_name): # TODO, in Parameter base class, rename element_type to value_type if parameter_name not in self.parameter_names: parameter_name = '_' + parameter_name parameter = self.__class__.__dict__[parameter_name] assert isinstance(parameter, Parameter) # dedupe leading underscore from name name = parameter.name.lstrip('_') aliases = tuple(alias for alias in parameter.aliases if alias != name) description = self.get_descriptions().get(name, '') et = parameter._element_type if type(et) == EnumMeta: et = [et] if not isiterable(et): et = [et] element_types = tuple(_et.__name__ for _et in et) details = { 'parameter_type': parameter.__class__.__name__.lower().replace("parameter", ""), 'name': name, 'aliases': aliases, 'element_types': element_types, 'default_value': parameter.default, 'description': description.replace('\n', ' ').strip(), } if isinstance(parameter, SequenceParameter): details['string_delimiter'] = parameter.string_delimiter return details def list_parameters(self): return tuple(sorted(name.lstrip('_') for name in self.parameter_names)) def typify_parameter(self, parameter_name, value, source): # return a tuple with correct parameter name and typed-value if parameter_name not in self.parameter_names: parameter_name = '_' + parameter_name parameter = self.__class__.__dict__[parameter_name] assert isinstance(parameter, Parameter) return parameter.typify(value, source) def get_descriptions(self): raise NotImplementedError()