Source code for repoman.common.config

#!/usr/bin/env python
import os
import logging

from six.moves import StringIO
from six.moves import configparser as cp

from .stores import STORES
from .filters import FILTERS
from .sources import SOURCES


DEFAULT_CONFIG = """
[main]
# Comma separated list of paths where repositories can be found and/or created
allowed_repo_paths =

# Valid values are 'generate', or a path
temp_dir = generate

# Path to the signing key, if empty it will not sign
# If the passphrase is 'ask', it will interactively prompt for it (unless
# passed through command line)
singing_key =
signing_passphrase = ask

# Which classes to load for each element, comma-separated list of class names
stores = all
filters = all
sources = all

# Sometimes you don't want to verify the ssl certs of the urls you use to fetch
# artifacts for (don't set to false if you are not 100% sure)
verify_ssl = true

# What to do whet a source to be added has no artifacts valid values are
# fail|warn|ignore
on_empty_source = fail
"""

logger = logging.getLogger(__name__ )  # flake8: noqa


[docs]class BadConfigError(Exception): pass # flake8: noqa
[docs]def update_conf_from_plugin(config, plugins, prefix): # load all the configs from the sotres, on their sections for plugin in plugins.itervalues(): conf_section = prefix + '.' + plugin.CONFIG_SECTION if not config.has_section(conf_section): config.add_section(conf_section) for opt_name, opt_value in plugin.DEFAULT_CONFIG.iteritems(): if not config.has_option(conf_section, opt_name): config.set(conf_section, opt_name, opt_value) else: print(config.get(conf_section, opt_name))
[docs]class Config(object): """ Configuration object to wrap some config values. It keeps the configuration objects, one with the default values for all the sections and one with all the custom ones (from config files or set after). The resolution order is: custom_config(current_section -> main_section) -> default_config(current_section -> main_section) """ def __init__(self, path=None, section='main'): self.section = section # load the specified file, if any self.config = cp.SafeConfigParser() self.config.add_section(self.section) if path: res = self.load(path) if not res: raise BadConfigError('Unable to load config %s' % path) self.default_config = cp.SafeConfigParser() self.default_config.readfp(StringIO(DEFAULT_CONFIG)) self.load_plugins()
[docs] def load_plugins(self): # load all the configs from the plugins, on their sections update_conf_from_plugin(self.default_config, STORES, 'store') update_conf_from_plugin(self.default_config, FILTERS, 'filter') update_conf_from_plugin(self.default_config, SOURCES, 'source')
[docs] def load(self, path): return self.config.read((os.path.expanduser(path),))
def __getattr__(self, what): try: val = getattr(self.config, what) except AttributeError: val = getattr(self.default_config, what) return val
[docs] def set(self, entry, value): if not self.config.has_section(self.section): self.config.add_section(self.section) return self.config.set(self.section, entry, value)
def _resolve_retrieval(self, entry, func_name): try: val = getattr(self.config, func_name)(self.section, entry) except (cp.NoOptionError, cp.NoSectionError): try: val = getattr(self.config, func_name)('main', entry) except (cp.NoOptionError, cp.NoSectionError): try: val = getattr( self.default_config, func_name )(self.section, entry) except (cp.NoOptionError, cp.NoSectionError): val = getattr( self.default_config, func_name )('main', entry) return val
[docs] def get(self, entry, default=None): try: val = self._resolve_retrieval(entry, 'get') except (cp.NoOptionError, cp.NoSectionError): if default is not None: val = default else: raise return val
[docs] def getboolean(self, entry, default=None): try: val = self._resolve_retrieval(entry, 'getboolean') except (cp.NoOptionError, cp.NoSectionError): if default is not None: val = default else: raise return val
[docs] def getint(self, entry, default=None): try: val = self._resolve_retrieval(entry, 'getint') except (cp.NoOptionError, cp.NoSectionError): if default is not None: val = default else: raise return val
[docs] def getfloat(self, entry, default=None): try: val = self._resolve_retrieval(entry, 'getfloat') except (cp.NoOptionError, cp.NoSectionError): if default is not None: val = default else: raise return val
[docs] def getarray(self, entry, default=None): val = self.get(entry, default) val = [ elem.strip() for elem in val.replace(',', '\n').splitlines() if elem.strip() ] return val
[docs] def getdict(self, entry, default=None): val = self.get(entry, default) try: val = dict([ [item.strip() for item in elem.strip().split('=')] for elem in val.replace(',', '\n').splitlines() if elem.strip() ]) except Exception: raise RuntimeError( 'Wrongly formatted option %s, expected a dict-like string in ' 'the form "%s = key1=val1, key2=val2..."' % (entry, entry) ) return val
[docs] def get_section(self, section): new_config = Config(section=section) new_config.config = self.config new_config.default_config = self.default_config return new_config
[docs] def add_to_section(self, section, option, value): if not self.config.has_section(section): self.config.add_section(section) self.config.set(section, option, value)
def __str__(self): my_str = '### Defaults:' for section in self.default_config.sections(): my_str += "\n[%s]\n" % section for option in self.default_config.options(section): my_str += "%s = %s\n" % ( option, self.default_config.get(section, option), ) my_str += '\n\n### Config' for section in self.config.sections(): my_str += "\n[%s]\n" % section for option in self.config.options(section): my_str += "%s = %s\n" % ( option, self.config.get(section, option, ''), ) my_str += '\n' return my_str