"""
Etcd source.
Loads configuration from ``etcd`` with optional watch support for dynamic updates.
This is an optional source that requires the ``etcd`` extra.
"""
from __future__ import annotations
import threading
import warnings
from pathlib import Path
from typing import Any, Iterator, Mapping, Optional, Type
try:
# Suppress etcd3 deprecation warnings from protobuf
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
import etcd3
# Also suppress warnings from etcd3 submodules
warnings.filterwarnings("ignore", category=DeprecationWarning, module="etcd3")
warnings.filterwarnings("ignore", category=DeprecationWarning, module="etcd3.*")
except (ImportError, TypeError):
# TypeError can occur with protobuf version incompatibility
# If etcd3 is installed but incompatible, treat it as unavailable
etcd3 = None # type: ignore
from varlord.sources.base import ChangeEvent, Source, normalize_key
[docs]
class Etcd(Source):
"""Source that loads configuration from ``etcd``.
Requires the ``etcd`` extra: pip install varlord[etcd]
Supports:
- Loading configuration from a prefix
- TLS/SSL certificate authentication
- User authentication
- Watching for changes (dynamic updates)
- Automatic reconnection on connection loss
Basic Example:
>>> source = Etcd(
... host="127.0.0.1",
... port=2379,
... prefix="/app/",
... )
>>> source.load()
{'host': '0.0.0.0', 'port': '9000'}
With TLS:
>>> source = Etcd(
... host="192.168.0.220",
... port=2379,
... prefix="/app/",
... ca_cert="./cert/ca.cert.pem",
... cert_key="./cert/key.pem",
... cert_cert="./cert/cert.pem",
... )
Note:
⚠️ 重要变更:不再提供 from_env() 类方法。
所有参数都通过 __init__ 传递,调用方负责获取初始配置信息。
可以使用 Env source 或其他方式获取连接参数。
"""
[docs]
def __init__(
self,
host: str = "127.0.0.1",
port: int = 2379,
prefix: str = "/",
watch: bool = False,
timeout: Optional[int] = None,
model: Optional[Type[Any]] = None,
ca_cert: Optional[str] = None,
cert_key: Optional[str] = None,
cert_cert: Optional[str] = None,
user: Optional[str] = None,
password: Optional[str] = None,
source_id: Optional[str] = None,
) -> None:
"""Initialize Etcd source.
Args:
host: Etcd host
port: Etcd port
prefix: Key prefix to load (e.g., "/app/")
watch: Whether to enable watch support
timeout: Connection timeout in seconds
model: Model to filter ``etcd`` keys.
Only keys that map to model fields will be loaded.
Model is required and will be auto-injected by Config.
ca_cert: Path to CA certificate file for TLS
cert_key: Path to client key file for TLS
cert_cert: Path to client certificate file for TLS
user: Username for authentication (optional)
password: Password for authentication (optional)
source_id: Optional unique identifier (default: auto-generated)
Raises:
ImportError: If etcd3 is not installed
Note:
⚠️ 重要变更:不再提供 from_env() 类方法。
所有参数都通过 __init__ 传递,调用方负责获取初始配置信息。
"""
# Generate ID before calling super() if not provided
if source_id is None:
prefix_normalized = prefix.rstrip("/") + "/" if prefix else "/"
source_id = f"etcd:{host}#{port}#{prefix_normalized}"
super().__init__(model=model, source_id=source_id)
if etcd3 is None:
raise ImportError(
"etcd3 is required for Etcd source. Install it with: pip install varlord[etcd]"
)
self._host = host
self._port = port
self._prefix = prefix.rstrip("/") + "/" if prefix else "/"
self._watch = watch
self._timeout = timeout
self._ca_cert = ca_cert
self._cert_key = cert_key
self._cert_cert = cert_cert
self._user = user
self._password = password
# Client will be created lazily
self._client: Optional[Any] = None
self._lock = threading.Lock()
def _generate_id(self) -> str:
"""Generate unique ID for Etcd source.
⚠️ 注意:使用 # 作为分隔符,避免 host:port 格式导致两个冒号
例如:etcd:127.0.0.1:2379 会变成 etcd:127.0.0.1:2379,有两个冒号
使用 # 分隔:etcd:127.0.0.1#2379#/app/
"""
return f"etcd:{self._host}#{self._port}#{self._prefix}"
def _get_client(self):
"""Get or create ``etcd`` client."""
if self._client is None:
with self._lock:
if self._client is None:
# Build client kwargs
client_kwargs = {
"host": self._host,
"port": self._port,
}
if self._timeout is not None:
client_kwargs["timeout"] = self._timeout
if self._ca_cert is not None:
# Validate certificate file exists
ca_path = Path(self._ca_cert)
if not ca_path.exists():
raise FileNotFoundError(
f"CA certificate file not found: {self._ca_cert}"
)
client_kwargs["ca_cert"] = self._ca_cert
if self._cert_key is not None:
# Validate key file exists
key_path = Path(self._cert_key)
if not key_path.exists():
raise FileNotFoundError(f"Client key file not found: {self._cert_key}")
client_kwargs["cert_key"] = self._cert_key
if self._cert_cert is not None:
# Validate certificate file exists
cert_path = Path(self._cert_cert)
if not cert_path.exists():
raise FileNotFoundError(
f"Client certificate file not found: {self._cert_cert}"
)
client_kwargs["cert_cert"] = self._cert_cert
if self._user is not None:
client_kwargs["user"] = self._user
if self._password is not None:
client_kwargs["password"] = self._password
# Security warning: production should use TLS
if self._ca_cert is None:
warnings.warn(
f"Etcd connection to {self._host}:{self._port} is not using TLS. "
"This is not recommended for production environments.",
RuntimeWarning,
stacklevel=3,
)
self._client = etcd3.client(**client_kwargs)
return self._client
@property
def name(self) -> str:
"""Return source name."""
return "etcd"
[docs]
def load(self) -> Mapping[str, Any]:
"""Load configuration from ``etcd``, filtered by model fields.
Returns:
A mapping of configuration keys to values.
Keys are normalized (prefix removed, converted to dot notation).
Only includes keys that map to model fields.
Raises:
ValueError: If model is not provided
"""
if not self._model:
raise ValueError("Etcd source requires model (should be auto-injected by Config)")
try:
from varlord.metadata import get_all_field_keys
# Get all valid field keys from model
valid_keys = get_all_field_keys(self._model)
client = self._get_client()
result: dict[str, Any] = {}
# Get all keys with the prefix
prefix_bytes = self._prefix.encode("utf-8")
for value, metadata in client.get_prefix(prefix_bytes):
if metadata is None:
continue
# Extract key (remove prefix)
key_bytes = metadata.key
if not key_bytes.startswith(prefix_bytes):
continue
# Convert to string and normalize
key_str = key_bytes[len(prefix_bytes) :].decode("utf-8")
# Convert / to __ (path separator to double underscore for nesting)
key_str = key_str.replace("/", "__")
# Apply unified normalization
normalized_key = normalize_key(key_str)
# Only load if it matches a model field
if normalized_key not in valid_keys:
continue
# Decode value
if value:
try:
# Try to decode as string
decoded_value = value.decode("utf-8")
# Try to parse as JSON if possible
import json
try:
decoded_value = json.loads(decoded_value)
except (ValueError, TypeError):
pass
result[normalized_key] = decoded_value
except UnicodeDecodeError:
# Keep as bytes if not decodable
result[normalized_key] = value
return result
except Exception:
# On error, return empty dict (fail-safe)
return {}
[docs]
def supports_watch(self) -> bool:
"""Check if ``etcd`` source supports watching.
Returns:
True if watch is enabled.
"""
return self._watch
[docs]
def watch(self) -> Iterator[ChangeEvent]:
"""Watch for configuration changes in ``etcd``.
Yields:
ChangeEvent objects representing configuration changes.
Note:
This method blocks and yields events as they occur.
It should be run in a separate thread.
"""
if not self._watch:
return iter([])
if not self._model:
raise ValueError(
"Etcd source requires model for watch (should be auto-injected by Config)"
)
from varlord.metadata import get_all_field_keys
# Get all valid field keys from model (same as load method)
valid_keys = get_all_field_keys(self._model)
client = self._get_client()
prefix_bytes = self._prefix.encode("utf-8")
# Get initial state (decode values same way as load method)
initial_state: dict[str, Any] = {}
for value, metadata in client.get_prefix(prefix_bytes):
if metadata is None:
continue
key_bytes = metadata.key
if not key_bytes.startswith(prefix_bytes):
continue
key_str = key_bytes[len(prefix_bytes) :].decode("utf-8")
key_str = key_str.replace("/", "__")
normalized_key = normalize_key(key_str)
# Only include keys that match model fields (same as load method)
if normalized_key not in valid_keys:
continue
# Decode value same way as load method
decoded_value = value
if value:
try:
decoded_value = value.decode("utf-8")
import json
try:
decoded_value = json.loads(decoded_value)
except (ValueError, TypeError):
pass
except UnicodeDecodeError:
decoded_value = value
initial_state[normalized_key] = decoded_value
# Watch for changes
# watch_prefix returns (events_iterator, cancel) tuple
events_iterator, cancel = client.watch_prefix(prefix_bytes)
for event in events_iterator:
try:
if event is None:
continue
# Extract key
key_bytes = event.key
if not key_bytes.startswith(prefix_bytes):
continue
key_str = key_bytes[len(prefix_bytes) :].decode("utf-8")
key_str = key_str.replace("/", "__")
normalized_key = normalize_key(key_str)
# Only process events for keys that match model fields (same as load method)
if normalized_key not in valid_keys:
continue
# Determine event type and values
# etcd3 events are PutEvent or DeleteEvent instances, not objects with type attribute
if isinstance(event, etcd3.events.PutEvent):
# Key was added or modified
old_value = initial_state.get(normalized_key)
new_value = event.value
if new_value:
try:
new_value = new_value.decode("utf-8")
import json
try:
new_value = json.loads(new_value)
except (ValueError, TypeError):
pass
except UnicodeDecodeError:
pass
event_type = "added" if old_value is None else "modified"
initial_state[normalized_key] = new_value
yield ChangeEvent(
key=normalized_key,
old_value=old_value,
new_value=new_value,
event_type=event_type,
)
elif isinstance(event, etcd3.events.DeleteEvent):
# Key was deleted
old_value = initial_state.pop(normalized_key, None)
yield ChangeEvent(
key=normalized_key,
old_value=old_value,
new_value=None,
event_type="deleted",
)
except Exception:
# Skip malformed events
continue
[docs]
def __repr__(self) -> str:
"""Return string representation."""
tls_info = ""
if self._ca_cert:
tls_info = ", tls=True"
auth_info = ""
if self._user:
auth_info = f", user={self._user!r}"
return f"<Etcd(host={self._host!r}, port={self._port}, prefix={self._prefix!r}, watch={self._watch}{tls_info}{auth_info})>"