"""
ConfigStore for runtime configuration management.
Provides thread-safe access to configuration with support for
dynamic updates and change subscriptions.
"""
from __future__ import annotations

import logging
import threading
import time
from dataclasses import dataclass, is_dataclass
from typing import Any, Callable, Dict, Iterator, Optional, Type

from varlord.resolver import Resolver
from varlord.sources.base import ChangeEvent
@dataclass
class ConfigDiff:
    """Represents changes between two configuration snapshots.

    Instances are built by ``ConfigStore._calculate_diff`` from the old and
    new flat configuration dictionaries and are handed to subscribers
    together with the new configuration.
    """

    # Keys present only in the new snapshot, mapped to their new values.
    added: Dict[str, Any]
    # Keys present in both snapshots whose values differ, mapped to
    # ``(old_value, new_value)``.
    modified: Dict[str, tuple[Any, Any]]
    # Keys present only in the old snapshot, mapped to their old values.
    deleted: Dict[str, Any]
_logger = logging.getLogger(__name__)


class ConfigStore:
    """Thread-safe configuration store with dynamic update support.

    Provides:
    - Atomic configuration snapshots
    - Thread-safe get() and attribute access
    - Change subscriptions
    - Automatic validation on updates

    Attribute access that is not resolved on the store itself is forwarded
    to the current model instance (see ``__getattr__``).
    """

    # Reconnect delays (seconds) used by the per-source watch workers.
    _WATCH_INITIAL_BACKOFF = 1.0
    _WATCH_MAX_BACKOFF = 60.0

    def __init__(
        self,
        resolver: Resolver,
        model: Type[Any],
    ):
        """Initialize ConfigStore.

        Args:
            resolver: Resolver for merging sources
            model: Dataclass model for type conversion and validation

        Raises:
            Exception: Whatever the initial load raises; the first load must
                succeed, so its error is propagated to the caller.

        Note:
            Watch is automatically enabled if any source supports it.
        """
        self._resolver = resolver
        self._model = model

        # Thread-safe storage
        self._lock = threading.RLock()
        self._config: Optional[Any] = None
        self._config_dict: Dict[str, Any] = {}

        # Subscribers
        self._subscribers: list[Callable[[Any, ConfigDiff], None]] = []

        # Watch thread
        self._watch_thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()

        # Initial load
        self._reload()

        # Automatically enable watch if any source supports it
        if self._has_watchable_sources():
            self._start_watching()

    def _reload(self) -> None:
        """Reload configuration from all sources.

        Resolves the sources, builds a new model instance, swaps it in and
        notifies subscribers when the flat dictionary changed. Everything
        runs while holding the store lock.

        Raises:
            Exception: Only when no configuration has been loaded yet (the
                first load must succeed). Later failures are logged and the
                previous configuration is kept.
        """
        with self._lock:
            try:
                config_dict = self._resolver.resolve()
                # Model instantiation doubles as validation: a failure here
                # raises before any state is replaced.
                new_config = self._dict_to_model(config_dict)
                diff = self._calculate_diff(self._config_dict, config_dict)
            except Exception:
                if self._config is None:
                    raise  # First load must succeed
                # Fail-safe: keep the old configuration, but leave a trace.
                _logger.exception(
                    "Configuration reload failed; keeping previous configuration"
                )
                return

            # Replace both views together while the lock is held.
            self._config = new_config
            self._config_dict = config_dict

            if diff.added or diff.modified or diff.deleted:
                # Iterate over a snapshot so a callback that subscribes
                # another callback does not mutate the list mid-iteration.
                for callback in list(self._subscribers):
                    try:
                        callback(new_config, diff)
                    except Exception:
                        # Don't let subscriber errors break the update
                        _logger.exception(
                            "Configuration subscriber %r raised an exception", callback
                        )

    def _dict_to_model(self, config_dict: Dict[str, Any]) -> Any:
        """Convert dictionary to model instance.

        Supports both flat keys (host) and nested keys (db.host) with automatic
        mapping to nested dataclass structures.

        Args:
            config_dict: Configuration dictionary with keys in dot notation (e.g., "db.host")

        Returns:
            Model instance

        Raises:
            TypeError: If the configured model is not a dataclass type.
        """
        if not (isinstance(self._model, type) and is_dataclass(self._model)):
            raise TypeError(f"Model must be a dataclass, got {self._model!r}")

        # Convert flat dict with dot notation to nested structure
        nested_dict = self._flatten_to_nested(config_dict, self._model)

        # Create model instance
        return self._model(**nested_dict)

    @staticmethod
    def _to_plain(value: Any) -> Any:
        """Return ``asdict(value)`` for dataclass instances, else ``value`` unchanged."""
        from dataclasses import asdict

        return asdict(value) if is_dataclass(type(value)) else value

    def _flatten_to_nested(self, flat_dict: Dict[str, Any], model: type) -> Dict[str, Any]:
        """Convert flat dict with dot notation to nested structure.

        Example:
            {"db.host": "localhost", "db.port": 5432, "host": "0.0.0.0"}
            → {"db": {"host": "localhost", "port": 5432}, "host": "0.0.0.0"}

        In the returned mapping, values for fields whose declared type is a
        dataclass are instances of that dataclass (built in step 4). Keys
        that do not name a field of ``model`` are dropped.

        Args:
            flat_dict: Flat dictionary with dot-notation keys
            model: Dataclass model to map to

        Returns:
            Nested dictionary matching the model structure
        """
        from dataclasses import fields

        from varlord.converters import convert_value

        field_info = {f.name: f for f in fields(model)}
        result: Dict[str, Any] = {}

        # Step 1: Convert all dataclass instances in flat_dict to dicts
        plain_items = {key: self._to_plain(value) for key, value in flat_dict.items()}

        # Step 2: Process flat keys first (non-nested)
        for key, value in plain_items.items():
            if "." in key or key not in field_info:
                continue
            try:
                result[key] = convert_value(value, field_info[key].type, key=key)
            except (ValueError, TypeError):
                # Best effort: keep the raw value when conversion fails.
                result[key] = value

        # Step 3: Process nested keys
        for key, value in plain_items.items():
            if "." not in key:
                continue
            parent_key, child_key = key.split(".", 1)
            field = field_info.get(parent_key)
            if field is None or not is_dataclass(field.type):
                continue

            # Make sure result[parent_key] is a dict we can merge into.
            if parent_key not in result:
                seed = plain_items.get(parent_key)
                result[parent_key] = seed.copy() if isinstance(seed, dict) else {}
            elif not isinstance(result[parent_key], dict):
                result[parent_key] = {}
            parent = result[parent_key]

            # The dotted key wins; values already collected for the parent
            # are carried along so the recursive call sees them too.
            nested_flat = {child_key: value}
            for existing_key, existing_value in parent.items():
                if existing_key in nested_flat:
                    continue
                if is_dataclass(type(existing_value)):
                    nested_flat[existing_key] = self._to_plain(existing_value)
                elif isinstance(existing_value, dict):
                    nested_flat[existing_key] = existing_value.copy()
                else:
                    nested_flat[existing_key] = existing_value

            nested_result = self._flatten_to_nested(nested_flat, field.type)

            # Store plain dicts here; step 4 rebuilds the dataclass instances.
            for nested_key, nested_value in nested_result.items():
                parent[nested_key] = self._to_plain(nested_value)

        # Step 4: Convert nested dicts to dataclass instances with type conversion
        for key, value in list(result.items()):
            field = field_info.get(key)
            if field is None:
                continue
            if not (is_dataclass(field.type) and isinstance(value, dict)):
                continue

            # First, convert any dataclass instances in value to dicts
            value_dict = {
                nested_key: self._to_plain(nested_value)
                for nested_key, nested_value in value.items()
            }

            # Recursively process and convert types
            nested_kwargs = self._flatten_to_nested(value_dict, field.type)
            nested_fields = {f.name: f for f in fields(field.type)}
            for nested_key, nested_value in nested_kwargs.items():
                nested_field = nested_fields.get(nested_key)
                if nested_field is None:
                    continue
                try:
                    nested_kwargs[nested_key] = convert_value(
                        nested_value, nested_field.type, key=f"{key}.{nested_key}"
                    )
                except (ValueError, TypeError):
                    # Best effort: leave the unconverted value in place.
                    pass

            result[key] = field.type(**nested_kwargs)

        return result

    def _calculate_diff(self, old_dict: Dict[str, Any], new_dict: Dict[str, Any]) -> ConfigDiff:
        """Calculate difference between two configuration dictionaries.

        Args:
            old_dict: Old configuration
            new_dict: New configuration

        Returns:
            ConfigDiff object
        """
        added = {k: v for k, v in new_dict.items() if k not in old_dict}
        modified = {
            k: (old_dict[k], v) for k, v in new_dict.items() if k in old_dict and old_dict[k] != v
        }
        deleted = {k: v for k, v in old_dict.items() if k not in new_dict}
        return ConfigDiff(added=added, modified=modified, deleted=deleted)

    def _has_watchable_sources(self) -> bool:
        """Check if any source supports watching.

        Returns:
            True if at least one source supports watching (supports_watch() returns True).
        """
        return any(source.supports_watch() for source in self._resolver._sources)

    def _watch_source(self, source_name: str, source: Any, watch_iter: Optional[Iterator[ChangeEvent]]) -> None:
        """Consume one source's watch stream, reloading on every event.

        The worker returns when the stop event is set or when the stream is
        exhausted. If the stream raises, the worker waits with exponential
        backoff and asks the source for a fresh stream.

        Args:
            source_name: Name of the source, used in log messages.
            source: The source object; ``source.watch()`` is called to reconnect.
            watch_iter: Stream to consume first, or None to open one immediately.
        """
        backoff = self._WATCH_INITIAL_BACKOFF
        while not self._stop_event.is_set():
            try:
                if watch_iter is None:
                    watch_iter = source.watch()
                for _event in watch_iter:
                    if self._stop_event.is_set():
                        return
                    # On change, reload configuration
                    self._reload()
                    backoff = self._WATCH_INITIAL_BACKOFF  # Reset backoff on success
                # Iterator exhausted: exit instead of re-iterating a dead stream.
                return
            except Exception:
                _logger.exception(
                    "Watching source %r failed; retrying in %.1f seconds",
                    source_name,
                    backoff,
                )
                # Force a fresh source.watch() on the next pass.
                watch_iter = None
                # Waiting on the stop event keeps shutdown responsive.
                if self._stop_event.wait(backoff):
                    return
                backoff = min(backoff * 2, self._WATCH_MAX_BACKOFF)

    def _start_watching(self) -> None:
        """Start watching for configuration changes.

        Starts a daemon thread that spawns one daemon worker per watchable
        source and then blocks until the stop event is set. Does nothing if
        a watch thread is already alive.
        """
        if self._watch_thread and self._watch_thread.is_alive():
            return

        def watch_loop() -> None:
            """Spawn a worker for every watchable source, then wait for stop."""
            for source in self._resolver._sources:
                try:
                    # Only watch sources that explicitly support it
                    if not source.supports_watch():
                        continue
                    watch_iter = source.watch()
                    # The source is passed explicitly so each worker reconnects
                    # to its own source rather than the loop's current one.
                    worker = threading.Thread(
                        target=self._watch_source,
                        args=(source.name, source, watch_iter),
                        daemon=True,
                    )
                    worker.start()
                except Exception:
                    # watch() failed or the source is misbehaving; skip it.
                    _logger.exception("Could not start watching source %r", source)

            # Wait for stop event
            self._stop_event.wait()

        self._watch_thread = threading.Thread(target=watch_loop, daemon=True)
        self._watch_thread.start()

    def get(self) -> Any:
        """Get current configuration (thread-safe).

        Returns:
            Current model instance
        """
        with self._lock:
            return self._config

    def to_dict(self) -> Dict[str, Any]:
        """Get current configuration as dictionary (thread-safe).

        Returns:
            Shallow copy of the current configuration dictionary
        """
        with self._lock:
            return self._config_dict.copy()

    def subscribe(self, callback: Callable[[Any, ConfigDiff], None]) -> None:
        """Subscribe to configuration changes.

        Args:
            callback: Function called with (new_config, diff) on changes

        Note:
            Callbacks are called when:
            - Configuration changes are detected via watch (if any source supports it)
            - Manual reload() is called and configuration has changed

            If no sources support watch, callbacks will only be called on manual reload().
        """
        with self._lock:
            self._subscribers.append(callback)

    def reload(self) -> None:
        """Manually reload configuration from sources."""
        self._reload()

    def __getattr__(self, name: str) -> Any:
        """Allow attribute access to configuration.

        Only invoked when normal lookup fails. The lookup is forwarded to
        the current model instance.

        Raises:
            AttributeError: If the store's own state is not initialised yet
                (for example on a bare instance created by ``copy`` or
                ``pickle``), or if the configuration has no such attribute.
        """
        state = self.__dict__
        if "_lock" not in state or "_config" not in state:
            # Without this guard, self.get() -> self._lock would re-enter
            # __getattr__ and recurse until RecursionError.
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )
        return getattr(self.get(), name)

    def __repr__(self) -> str:
        """Return string representation."""
        watching = self._watch_thread is not None and self._watch_thread.is_alive()
        return f"<ConfigStore(model={self._model.__name__}, watching={watching})>"