Source code for shinto.config

"""Config File handling."""

from __future__ import annotations

import configparser
import copy
import io
import json
import os
import re
from enum import Enum
from pathlib import Path
from typing import Any

import yaml


[docs]class ConfigType(Enum): """Config file types.""" YAML = "YAML" JSON = "JSON" INI = "INI"
[docs]class ConfigError(Exception): """Gets raised when an unsupported config file type is found."""
def _substitute_env_vars( data: dict[str, Any] | list[Any] | str, raise_on_missing: bool = False, ) -> dict[str, Any] | list[Any] | str: """ Recursively substitute environment variables in config data. Supports ${VAR_NAME} and ${VAR_NAME:default_value} syntax. Args: data: The data to process (dict, list, or string). raise_on_missing: If True, raises KeyError for missing env vars without defaults. Returns: The data with environment variables substituted. Raises: KeyError: If raise_on_missing is True and an env var is missing without a default. """ if isinstance(data, dict): return {key: _substitute_env_vars(value, raise_on_missing) for key, value in data.items()} if isinstance(data, list): return [_substitute_env_vars(item, raise_on_missing) for item in data] if isinstance(data, str): # Pattern to match ${VAR_NAME} or ${VAR_NAME:default_value} pattern = re.compile(r"\$\{([^}:]+)(?::([^}]*))?\}") def replace_match(match: re.Match[str]) -> str: var_name = match.group(1) default_value = match.group(2) env_value = os.environ.get(var_name) if env_value is not None: return env_value if default_value is not None: return default_value if raise_on_missing: msg = f"Environment variable '{var_name}' not found and no default provided" raise KeyError(msg) # Return the original placeholder if env var not found and no default return match.group(0) return pattern.sub(replace_match, data) return data def _merge_dicts(d1: dict, d2: dict) -> dict: """ Recursively merge two dictionaries. Overwrites values in d2 with values in d1. Args: d1 (dict): The first dictionary. d2 (dict): The second dictionary. Returns: dict: The merged dictionary. """ for key, value in d1.items(): if key in d2: if isinstance(d2[key], dict) and isinstance(value, dict): d2[key] = _merge_dicts(value, d2[key]) elif isinstance(d2[key], dict) or isinstance(value, dict): msg = f"Cannot merge {type(value)} with {type(d2[key])} on key: {key}" raise ValueError(msg) else: d2[key] = value else: d2[key] = value return d2 def _read_config_file(file_path: str) -> dict[str, Any]: """ Read config file and return as dict. Supported file types: YAML, JSON, INI. Args: file_path (str): The path to the config file. Returns: dict: The config data. """ file_extension = Path(file_path).suffix.lower() if file_extension in [".yaml", ".yml"]: with Path(file_path).open(encoding="utf-8") as yaml_file: return yaml.safe_load(yaml_file) elif file_extension in [".json", ".js"]: with Path(file_path).open(encoding="utf-8") as yaml_file: return json.load(yaml_file) elif file_extension == ".ini": config_data = configparser.ConfigParser() config_data.read(file_path) return {section: dict(config_data.items(section)) for section in config_data.sections()} else: msg = f"Unsupported config file extension: {file_extension}" raise ConfigError(msg)
[docs]def load_config_file( file_path: str, defaults: dict[str, Any] | None = None, substitute_env_vars: bool = True, raise_on_missing_env: bool = False, ) -> dict[str, Any]: """ Load config from file. Args: file_path (str): The path to the config file. defaults (dict): If provided, the default values to merge with the config file. substitute_env_vars (bool): If True, substitute environment variables in config values. raise_on_missing_env (bool): If True, raise error for missing env vars without defaults. Returns: dict: The config data. """ defaults = copy.deepcopy(defaults) or {} if not file_path or not Path(file_path).exists(): msg = f"Config file not found: {file_path}." raise FileNotFoundError(msg) config = _read_config_file(file_path) if substitute_env_vars: config = _substitute_env_vars(config, raise_on_missing_env) return _merge_dicts(config, defaults)
def _mask_sensitive_keys( data: dict[str, Any] | list[Any], sensitive_keys: list[str] | None = None, ) -> dict[str, Any] | list[Any]: """ Replace sensitive keys in a dictionary with ****. Args: data (dict | list): The data to mask. sensitive_keys (list): The sensitive keys to mask. Defaults to ["password", "pass", "passwd"]. Returns: (dict | list): The masked data. """ if sensitive_keys is None: sensitive_keys = ["password", "pass", "passwd"] if isinstance(data, dict): for key, value in data.items(): if key.lower() in sensitive_keys: data[key] = "****" else: data[key] = _mask_sensitive_keys(value) elif isinstance(data, list): data = [_mask_sensitive_keys(item) for item in data] return data
[docs]def output_config( configdata: dict, output_config_type: ConfigType = ConfigType.YAML, sensitive_keys: list[str] | None = None, ) -> str: """ Dump config dict to a output_config_type (yaml, json, ini). Args: configdata (dict): The config data to dump. output_config_type (ConfigType): The output config type. sensitive_keys (list): The sensitive keys to mask. Defaults to ["password", "pass", "passwd"]. Returns: str: The dumped config data. """ config_data = _mask_sensitive_keys(copy.deepcopy(configdata), sensitive_keys) if output_config_type == ConfigType.YAML: output = yaml.dump(config_data) elif output_config_type == ConfigType.JSON: output = json.dumps(config_data) elif output_config_type == ConfigType.INI: config = configparser.ConfigParser() for section, options in config_data.items(): config[section] = options output_stream = io.StringIO() config.write(output_stream) output = output_stream.getvalue() return output