
Building a Modern Async Configuration Management System with Type Safety and Hot Reloading

In this tutorial, we walk through the design and implementation of AsyncConfig, an async-first configuration management library for Python. We build it from the ground up to support dataclass-based configuration loading, multiple configuration sources (environment variables, files, and dictionaries), and hot reloading using watchdog. We then demonstrate it with simple, multi-source, and validation-focused use cases, all built on asyncio to support non-blocking workflows.

import asyncio
import json
import os
import yaml
from pathlib import Path
from typing import Any, Dict, Optional, Type, TypeVar, Union, get_type_hints
from dataclasses import dataclass, field, MISSING
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import logging


__version__ = "0.1.0"
__author__ = "AsyncConfig Team"


T = TypeVar('T')


logger = logging.getLogger(__name__)

We begin by importing essential Python modules required for our configuration system. These include asyncio for asynchronous operations, yaml and json for file parsing, dataclasses for structured configuration, and watchdog for hot reloading. We also define some metadata and set up a logger to track events throughout the system.

class ConfigError(Exception):
    """Base exception for configuration errors."""
    pass


class ValidationError(ConfigError):
    """Raised when configuration validation fails."""
    pass


class LoadError(ConfigError):
    """Raised when configuration loading fails."""
    pass


@dataclass
class ConfigSource:
    """Represents a configuration source with priority and reload capabilities."""
    path: Optional[Path] = None
    env_prefix: Optional[str] = None
    data: Optional[Dict[str, Any]] = None
    priority: int = 0
    watch: bool = False
   
    def __post_init__(self):
        if self.path:
            self.path = Path(self.path)

We define a hierarchy of custom exceptions to handle different configuration-related errors, with ConfigError as the base class and more specific ones, such as ValidationError and LoadError, for targeted troubleshooting. We also create a ConfigSource data class to represent a single configuration source, which can be a file, environment variables, or a dictionary, and include support for prioritization and optional hot reloading.

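class ConfigWatcher(FileSystemEventHandler):
    """File system event handler for configuration hot reloading."""
   
    def __init__(self, config_manager, paths: list[Path]):
        self.config_manager = config_manager
        self.paths = {str(p.resolve()) for p in paths}
        # watchdog calls handlers from its own thread, so remember the event
        # loop that is running when the watcher is created.
        try:
            self.loop = asyncio.get_running_loop()
        except RuntimeError:
            self.loop = None
        super().__init__()
   
    def on_modified(self, event):
        if event.is_directory or self.loop is None:
            return
        if str(Path(os.fsdecode(event.src_path)).resolve()) in self.paths:
            logger.info(f"Configuration file changed: {event.src_path}")
            # Hand the coroutine to the loop's thread; create_task would fail here.
            asyncio.run_coroutine_threadsafe(
                self.config_manager._reload_config(), self.loop
            )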

We create the ConfigWatcher class by extending FileSystemEventHandler to enable hot reloading of configuration files. This class monitors the specified file paths and, whenever one of them is modified, triggers an asynchronous reload through the associated manager. The reload itself only clears the manager's cache and runs the registered callbacks, so the application picks up changes without a restart as long as a callback re-reads the configuration.
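Watchdog delivers file events on its own observer thread, where no asyncio event loop is running, so a coroutine has to be handed over to the loop's thread. The following is a minimal sketch of that handoff using asyncio.run_coroutine_threadsafe. It assumes a plain threading.Thread in place of watchdog, and reload_config and worker are hypothetical stand-ins, not part of AsyncConfig.

```python
import asyncio
import threading


async def reload_config(log: list) -> None:
    """Hypothetical stand-in for the manager's reload coroutine."""
    log.append("reloaded")


def worker(loop: asyncio.AbstractEventLoop, log: list) -> None:
    """Simulates a file-watcher callback running outside the event loop thread."""
    # Schedule the coroutine on the loop's thread and get a concurrent Future back.
    future = asyncio.run_coroutine_threadsafe(reload_config(log), loop)
    future.result(timeout=5)  # block this worker thread until the coroutine finishes


async def main() -> list:
    log: list = []
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=worker, args=(loop, log))
    thread.start()
    # Join in a helper thread so the event loop stays free to run the reload.
    await asyncio.to_thread(thread.join)
    return log


result = asyncio.run(main())
print(result)
```

Calling asyncio.create_task from the worker thread instead would raise a RuntimeError, because create_task requires a running loop in the calling thread.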

class AsyncConfigManager:
    """
    Modern async configuration manager with type safety and hot reloading.
   
    Features:
    - Async-first design
    - Type-safe configuration classes
    - Environment variable support
    - Hot reloading
    - Multiple source merging
    - Validation with detailed error messages
    """
   
    def __init__(self):
        self.sources: list[ConfigSource] = []
        self.observers: list[Observer] = []
        self.config_cache: Dict[str, Any] = {}
        self.reload_callbacks: list[callable] = []
        self._lock = asyncio.Lock()
   
    def add_source(self, source: ConfigSource) -> "AsyncConfigManager":
        """Add a configuration source."""
        self.sources.append(source)
        self.sources.sort(key=lambda x: x.priority, reverse=True)
        return self
   
    def add_file(self, path: Union[str, Path], priority: int = 0, watch: bool = False) -> "AsyncConfigManager":
        """Add a file-based configuration source."""
        return self.add_source(ConfigSource(path=path, priority=priority, watch=watch))
   
    def add_env(self, prefix: str, priority: int = 100) -> "AsyncConfigManager":
        """Add environment variable source."""
        return self.add_source(ConfigSource(env_prefix=prefix, priority=priority))
   
    def add_dict(self, data: Dict[str, Any], priority: int = 50) -> "AsyncConfigManager":
        """Add dictionary-based configuration source."""
        return self.add_source(ConfigSource(data=data, priority=priority))
   
    async def load_config(self, config_class: Type[T]) -> T:
        """Load and validate configuration into a typed dataclass."""
        async with self._lock:
            config_data = await self._merge_sources()
           
            try:
                return self._validate_and_convert(config_data, config_class)
            except Exception as e:
                raise ValidationError(f"Failed to validate configuration: {e}") from e
   
    async def _merge_sources(self) -> Dict[str, Any]:
        """Merge configuration from all sources based on priority."""
        merged = {}
       
        for source in reversed(self.sources):  
            try:
                data = await self._load_source(source)
                if data:
                    merged.update(data)
            except Exception as e:
                logger.warning(f"Failed to load source {source}: {e}")
       
        return merged
   
    async def _load_source(self, source: ConfigSource) -> Optional[Dict[str, Any]]:
        """Load data from a single configuration source."""
        if source.data:
            return source.data.copy()
       
        if source.path:
            return await self._load_file(source.path)
       
        if source.env_prefix:
            return self._load_env_vars(source.env_prefix)
       
        return None
   
    async def _load_file(self, path: Path) -> Dict[str, Any]:
        """Load configuration from a file."""
        if not path.exists():
            raise LoadError(f"Configuration file not found: {path}")
       
        try:
            content = await asyncio.to_thread(path.read_text)
           
            if path.suffix.lower() == '.json':
                return json.loads(content)
            elif path.suffix.lower() in ['.yml', '.yaml']:
                return yaml.safe_load(content) or {}
            else:
                raise LoadError(f"Unsupported file format: {path.suffix}")
       
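        except LoadError:
            # Already descriptive (e.g. unsupported format); do not wrap it again.
            raise
        except Exception as e:
            raise LoadError(f"Failed to load {path}: {e}") from e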
   
    def _load_env_vars(self, prefix: str) -> Dict[str, Any]:
        """Load environment variables with given prefix."""
        env_vars = {}
        prefix = prefix.upper() + '_'
       
        for key, value in os.environ.items():
            if key.startswith(prefix):
                config_key = key[len(prefix):].lower()
                env_vars[config_key] = self._convert_env_value(value)
       
        return env_vars
   
    def _convert_env_value(self, value: str) -> Any:
        """Convert environment variable string to appropriate type."""
        if value.lower() in ('true', 'false'):
            return value.lower() == 'true'
       
        try:
            if '.' in value:
                return float(value)
            return int(value)
        except ValueError:
            pass
       
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            pass
       
        return value
   
    def _validate_and_convert(self, data: Dict[str, Any], config_class: Type[T]) -> T:
        """Validate and convert data to the specified configuration class."""
        if not hasattr(config_class, '__dataclass_fields__'):
            raise ValidationError(f"{config_class.__name__} must be a dataclass")
       
        type_hints = get_type_hints(config_class)
        field_values = {}
       
        for field_name, field_info in config_class.__dataclass_fields__.items():
            if field_name in data:
                field_value = data[field_name]
               
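                # Use the resolved type hint so string annotations also work.
                field_type = type_hints.get(field_name, field_info.type)
                if hasattr(field_type, '__dataclass_fields__'):
                    if isinstance(field_value, dict):
                        field_value = self._validate_and_convert(field_value, field_type)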
               
                field_values[field_name] = field_value
            elif field_info.default is not MISSING:
                field_values[field_name] = field_info.default
            elif field_info.default_factory is not MISSING:
                field_values[field_name] = field_info.default_factory()
            else:
                raise ValidationError(f"Required field '{field_name}' not found in configuration")
       
        return config_class(**field_values)
   
    async def start_watching(self):
        """Start watching configuration files for changes."""
        watch_paths = []
       
        for source in self.sources:
            if source.watch and source.path:
                watch_paths.append(source.path)
       
        if watch_paths:
            observer = Observer()
            watcher = ConfigWatcher(self, watch_paths)
           
            for path in watch_paths:
                observer.schedule(watcher, str(path.parent), recursive=False)
           
            observer.start()
            self.observers.append(observer)
            logger.info(f"Started watching {len(watch_paths)} configuration files")
   
    async def stop_watching(self):
        """Stop watching configuration files."""
        for observer in self.observers:
            observer.stop()
            observer.join()
        self.observers.clear()
   
    async def _reload_config(self):
        """Reload configuration from all sources."""
        try:
            self.config_cache.clear()
            for callback in self.reload_callbacks:
                await callback()
            logger.info("Configuration reloaded successfully")
        except Exception as e:
            logger.error(f"Failed to reload configuration: {e}")
   
    def on_reload(self, callback: callable):
        """Register a callback to be called when configuration is reloaded."""
        self.reload_callbacks.append(callback)
   
    async def __aenter__(self):
        await self.start_watching()
        return self
   
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.stop_watching()

We now implement the core of our system, the AsyncConfigManager class. It acts as the central controller for all configuration operations: adding sources (files, environment variables, dictionaries), merging them by priority, reading files without blocking the event loop, and converting the merged data into typed dataclasses. Sources are applied from lowest to highest priority with a shallow dict.update, so a higher-priority source replaces a whole nested section rather than individual keys inside it. Validation checks that every field without a default is present and converts nested dictionaries into nested dataclasses; it does not compare scalar values against their annotations. An asyncio.Lock serializes concurrent load_config calls, and hot reloading works by watching the files marked with watch=True and invoking the registered callbacks when one of them changes.
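To make the merge order concrete, here is a small standalone sketch of priority-based merging with dict.update, which is the same mechanism _merge_sources uses. The merge_by_priority helper and the sample values are illustrative assumptions, not part of the manager.

```python
from typing import Any, Dict, List, Tuple


def merge_by_priority(sources: List[Tuple[int, Dict[str, Any]]]) -> Dict[str, Any]:
    """Apply sources from lowest to highest priority so later updates win."""
    merged: Dict[str, Any] = {}
    for _, data in sorted(sources, key=lambda item: item[0]):
        merged.update(data)  # shallow: a nested dict is replaced as a whole
    return merged


sources = [
    (100, {"secret_key": "env-secret"}),
    (0, {"debug": False, "secret_key": "file-secret",
         "database": {"host": "localhost", "port": 5432}}),
    (50, {"debug": True, "database": {"host": "dev-db"}}),
]

merged = merge_by_priority(sources)
print(merged)
```

Note that the priority-50 source replaces the entire database section, so the port from the priority-0 source is gone from the merged dictionary. In the manager, the missing port would then fall back to the DatabaseConfig default rather than to the lower-priority value. A recursive merge would be needed to combine nested sections key by key.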

async def load_config(config_class: Type[T],
                     config_file: Optional[Union[str, Path]] = None,
                     env_prefix: Optional[str] = None,
                     watch: bool = False) -> T:
    """
    Convenience function to quickly load configuration.
   
    Args:
        config_class: Dataclass to load configuration into
        config_file: Optional configuration file path
        env_prefix: Optional environment variable prefix
        watch: Whether to watch for file changes
   
    Returns:
        Configured instance of config_class
    """
    manager = AsyncConfigManager()
   
    if config_file:
        manager.add_file(config_file, priority=0, watch=watch)
   
    if env_prefix:
        manager.add_env(env_prefix, priority=100)
   
    return await manager.load_config(config_class)

We add a convenience function, load_config, to streamline the setup. With one call, we can load settings from a file, environment variables, or both into a typed dataclass, with environment variables (priority 100) overriding file values (priority 0). Note that the watch flag only marks the file source as watchable: the helper never calls start_watching() and does not return the manager, so hot reloading requires creating an AsyncConfigManager explicitly.
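To see what the environment source contributes, the sketch below reproduces the prefix stripping and value conversion from _load_env_vars and _convert_env_value as standalone functions. The names from_env and convert, and the fake_env mapping used instead of os.environ, are assumptions made for illustration.

```python
import json
from typing import Any, Dict, Mapping


def convert(value: str) -> Any:
    """Best-effort conversion: bool, then int/float, then JSON, else the raw string."""
    if value.lower() in ("true", "false"):
        return value.lower() == "true"
    try:
        return float(value) if "." in value else int(value)
    except ValueError:
        pass
    try:
        return json.loads(value)
    except json.JSONDecodeError:
        return value


def from_env(environ: Mapping[str, str], prefix: str) -> Dict[str, Any]:
    """Collect PREFIX_* variables, strip the prefix, and lowercase the key."""
    marker = prefix.upper() + "_"
    return {
        key[len(marker):].lower(): convert(value)
        for key, value in environ.items()
        if key.startswith(marker)
    }


fake_env = {
    "APP_DEBUG": "true",
    "APP_MAX_WORKERS": "8",
    "APP_LOG_LEVEL": "DEBUG",
    "APP_DATABASE_HOST": "db.internal",
    "HOME": "/root",
}
settings = from_env(fake_env, "app")
print(settings)
```

The keys stay flat: APP_DATABASE_HOST becomes database_host, which does not match any AppConfig field, so nested sections such as database cannot be overridden from the environment with this scheme.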

@dataclass
class DatabaseConfig:
    """Example database configuration."""
    host: str = "localhost"
    port: int = 5432
    username: str = "admin"
    password: str = ""
    database: str = "myapp"
    ssl_enabled: bool = False
    pool_size: int = 10


@dataclass
class AppConfig:
    """Example application configuration."""
    debug: bool = False
    log_level: str = "INFO"
    secret_key: str = ""
    database: DatabaseConfig = field(default_factory=DatabaseConfig)
    redis_url: str = "redis://localhost:6379"
    max_workers: int = 4


async def demo_simple_config():
    """Demo simple configuration loading."""
   
    sample_config = {
        "debug": True,
        "log_level": "DEBUG",
        "secret_key": "dev-secret-key",
        "database": {
            "host": "localhost",
            "port": 5432,
            "username": "testuser",
            "password": "testpass",
            "database": "testdb"
        },
        "max_workers": 8
    }
   
    manager = AsyncConfigManager()
    manager.add_dict(sample_config, priority=0)
   
    config = await manager.load_config(AppConfig)
   
    print("=== Simple Configuration Demo ===")
    print(f"Debug mode: {config.debug}")
    print(f"Log level: {config.log_level}")
    print(f"Database host: {config.database.host}")
    print(f"Database port: {config.database.port}")
    print(f"Max workers: {config.max_workers}")
   
    return config

We define two example configuration dataclasses, DatabaseConfig and AppConfig, which show how nested, typed configurations are structured. To demonstrate real usage, we write demo_simple_config(), where we load a basic dictionary into our config manager. The nested database dictionary becomes a DatabaseConfig instance, and any field missing from the dictionary (here ssl_enabled, pool_size, and redis_url) falls back to its dataclass default.
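The dictionary-to-dataclass mapping can be sketched on its own in a few lines. This is a simplified version of the idea behind _validate_and_convert, not the manager's code: the build helper and the Db and App classes are hypothetical, and missing keys are simply left to the dataclass defaults.

```python
from dataclasses import dataclass, field, fields, is_dataclass
from typing import Any, Dict, Type, TypeVar, get_type_hints

T = TypeVar("T")


@dataclass
class Db:  # hypothetical nested section
    host: str = "localhost"
    port: int = 5432


@dataclass
class App:  # hypothetical top-level config
    debug: bool = False
    database: Db = field(default_factory=Db)


def build(cls: Type[T], data: Dict[str, Any]) -> T:
    """Recursively turn a dict into a dataclass instance."""
    hints = get_type_hints(cls)
    kwargs: Dict[str, Any] = {}
    for f in fields(cls):
        if f.name not in data:
            continue  # let the dataclass apply its own default
        value = data[f.name]
        # A dict destined for a dataclass-typed field is converted recursively.
        if is_dataclass(hints[f.name]) and isinstance(value, dict):
            value = build(hints[f.name], value)
        kwargs[f.name] = value
    return cls(**kwargs)


app = build(App, {"debug": True, "database": {"port": 5433}})
print(app)
```

In this sketch a field with no default that is absent from the data surfaces as a TypeError from the dataclass constructor, whereas the manager raises its own ValidationError for the same situation.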

async def demo_advanced_config():
    """Demo advanced configuration with multiple sources."""
   
    base_config = {
        "debug": False,
        "log_level": "INFO",
        "secret_key": "production-secret",
        "max_workers": 4
    }
   
    override_config = {
        "debug": True,
        "log_level": "DEBUG",
        "database": {
            "host": "dev-db.example.com",
            "port": 5433
        }
    }
   
    env_config = {
        "secret_key": "env-secret-key",
        "redis_url": "redis://prod-redis:6379"
    }
   
    print("\n=== Advanced Configuration Demo ===")
   
    manager = AsyncConfigManager()
   
    manager.add_dict(base_config, priority=0)    
    manager.add_dict(override_config, priority=50)  
    manager.add_dict(env_config, priority=100)    
   
    config = await manager.load_config(AppConfig)
   
    print("Configuration sources merged:")
    print(f"Debug mode: {config.debug} (from override)")
    print(f"Log level: {config.log_level} (from override)")
    print(f"Secret key: {config.secret_key} (from env)")
    print(f"Database host: {config.database.host} (from override)")
    print(f"Redis URL: {config.redis_url} (from env)")
   
    return config


async def demo_validation():
    """Demo configuration validation."""
   
    print("\n=== Configuration Validation Demo ===")
   
    valid_config = {
        "debug": True,
        "log_level": "DEBUG",
        "secret_key": "test-key",
        "database": {
            "host": "localhost",
            "port": 5432
        }
    }
   
    manager = AsyncConfigManager()
    manager.add_dict(valid_config, priority=0)
   
    try:
        config = await manager.load_config(AppConfig)
        print("✓ Valid configuration loaded successfully")
        print(f"  Database SSL: {config.database.ssl_enabled} (default value)")
        print(f"  Database pool size: {config.database.pool_size} (default value)")
    except ValidationError as e:
        print(f"✗ Validation error: {e}")
   
    incomplete_config = {
        "debug": True,
        "log_level": "DEBUG"
    }
   
    manager2 = AsyncConfigManager()
    manager2.add_dict(incomplete_config, priority=0)
   
    try:
        config2 = await manager2.load_config(AppConfig)
        print("✓ Configuration with defaults loaded successfully")
        print(f"  Secret key: '{config2.secret_key}' (default empty string)")
    except ValidationError as e:
        print(f"✗ Validation error: {e}")

We explore further features of our config system through two examples. In demo_advanced_config(), we show how multiple configuration sources (base, override, and a dictionary standing in for environment values) are merged by priority, with higher-priority sources taking precedence. This is the mechanism behind environment-specific overrides. In demo_validation(), we load both a complete and a partial configuration. Because every field of AppConfig has a default, both load successfully, and missing fields are filled from the dataclass defaults. A ValidationError is raised only when a field has no default and is absent from every source, or when the target class is not a dataclass.
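The validation shown above checks for missing fields but does not compare values against their annotations, so a string can end up in an int field. If stricter checking is wanted, one possible extension is sketched below. The type_errors helper and the ServerConfig class are hypothetical additions, and the check only covers plain class annotations such as str, int, and bool.

```python
from dataclasses import dataclass, fields, is_dataclass
from typing import Any, List, get_origin, get_type_hints


@dataclass
class ServerConfig:  # hypothetical example class
    host: str = "localhost"
    port: int = 8080
    debug: bool = False


def type_errors(instance: Any) -> List[str]:
    """Return one message per field whose value does not match a plain class annotation."""
    if not is_dataclass(instance):
        raise TypeError("expected a dataclass instance")
    hints = get_type_hints(type(instance))
    problems: List[str] = []
    for f in fields(instance):
        expected = hints[f.name]
        value = getattr(instance, f.name)
        # Generic annotations (Optional[int], list[str], ...) are skipped in this sketch.
        if get_origin(expected) is not None or not isinstance(expected, type):
            continue
        if not isinstance(value, expected):
            problems.append(
                f"{f.name}: expected {expected.__name__}, got {type(value).__name__}"
            )
    return problems


ok = type_errors(ServerConfig())
bad = type_errors(ServerConfig(port="8080"))
print(ok)
print(bad)
```

Since bool is a subclass of int in Python, a boolean value passes the int check here; a stricter checker would need to handle that case explicitly.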

async def run_demos():
    """Run all demonstration functions."""
    try:
        await demo_simple_config()
        await demo_advanced_config()
        await demo_validation()
        print("\n=== All demos completed successfully! ===")
    except Exception as e:
        print(f"Demo error: {e}")
        import traceback
        traceback.print_exc()


# In a Jupyter/IPython notebook, run the demos with top-level await instead:
#     await run_demos()


if __name__ == "__main__":
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running, so this is a regular Python process.
        asyncio.run(run_demos())
    else:
        print("Running in Jupyter/IPython environment")
        print("Use: await run_demos()")

We conclude the tutorial with run_demos(), a utility that sequentially executes all demonstration functions, covering simple loading, multi-source merging, and validation. To support both Jupyter and standard Python environments, we include conditional logic for running the demos appropriately. This ensures our configuration system is easy to test, showcase, and integrate into a variety of workflows right out of the box.

In conclusion, we have shown how AsyncConfig provides an extensible foundation for managing configuration in Python applications: merging multiple sources by priority, loading them into typed dataclasses, and responding to file changes through reload callbacks. Whether we're building microservices, async backends, or CLI tools, the same pattern keeps configuration loading, validation, and reloading in one place.




The post Building a Modern Async Configuration Management System with Type Safety and Hot Reloading appeared first on MarkTechPost.