diff --git a/src/cachekit/backends/cachekitio/client.py b/src/cachekit/backends/cachekitio/client.py index ed44068..fcd0ae0 100644 --- a/src/cachekit/backends/cachekitio/client.py +++ b/src/cachekit/backends/cachekitio/client.py @@ -10,60 +10,10 @@ if TYPE_CHECKING: from cachekit.backends.cachekitio.config import CachekitIOBackendConfig -# Global pool instances (shared across threads) -_async_client_instance: httpx.AsyncClient | None = None -_client_lock = threading.Lock() - # Thread-local client cache for performance (both sync and async) _thread_local = threading.local() -def get_async_http_client(config: CachekitIOBackendConfig) -> httpx.AsyncClient: - """Get asynchronous HTTP client with connection pooling. - - Creates global singleton async client with connection pooling and keepalive. - - Args: - config: cachekit.io backend configuration - - Returns: - httpx.AsyncClient: Async HTTP client with connection pooling - """ - global _async_client_instance - - if _async_client_instance is None: - with _client_lock: - if _async_client_instance is None: - # Try to enable HTTP/3 if aioquic is installed - client_kwargs = { - "base_url": config.api_url, - "timeout": config.timeout, - "http2": True, # Enable HTTP/2 - "limits": httpx.Limits( - max_connections=config.connection_pool_size, - max_keepalive_connections=config.connection_pool_size, - ), - "headers": { - "Authorization": f"Bearer {config.api_key.get_secret_value()}", - "Content-Type": "application/octet-stream", - }, - } - - # Try HTTP/3 (requires aioquic dependency) - try: - import aioquic # noqa: F401 # type: ignore[import-untyped,import-not-found] - - # HTTP/3 is available via httpx[http3] - need custom transport - # For now, just use HTTP/2 (HTTP/3 support requires more setup) - # client_kwargs["http3"] = True - except ImportError: - pass # HTTP/3 not available, fall back to HTTP/2 - - _async_client_instance = httpx.AsyncClient(**client_kwargs) - - return _async_client_instance - - def get_cached_async_http_client(config: CachekitIOBackendConfig) -> httpx.AsyncClient: """Get thread-local async HTTP client instance. @@ -137,13 +87,7 @@ def get_sync_http_client(config: CachekitIOBackendConfig) -> httpx.Client: async def close_async_client() -> None: - """Close async client instance (useful for cleanup).""" - global _async_client_instance - if _async_client_instance is not None: - await _async_client_instance.aclose() - _async_client_instance = None - - # Close thread-local client + """Close thread-local async client instance (useful for cleanup).""" if hasattr(_thread_local, "async_client") and _thread_local.async_client is not None: await _thread_local.async_client.aclose() _thread_local.async_client = None @@ -162,10 +106,6 @@ def reset_global_client() -> None: Note: This does not properly close clients. Use close_*_client() for proper cleanup. """ - global _async_client_instance - with _client_lock: - _async_client_instance = None - # Reset thread-local caches if hasattr(_thread_local, "async_client"): _thread_local.async_client = None @@ -174,7 +114,6 @@ def reset_global_client() -> None: __all__ = [ - "get_async_http_client", "get_cached_async_http_client", "get_sync_http_client", "close_async_client", diff --git a/src/cachekit/decorators/session.py b/src/cachekit/decorators/session.py index 810701f..334e022 100644 --- a/src/cachekit/decorators/session.py +++ b/src/cachekit/decorators/session.py @@ -25,7 +25,6 @@ def get_session_id() -> str: Session ID is a valid UUID format: >>> import uuid - >>> reset_session_id() # Start fresh >>> session_id = get_session_id() >>> uuid.UUID(session_id) # Validates UUID format # doctest: +ELLIPSIS UUID('...') @@ -41,26 +40,3 @@ def get_session_id() -> str: if _session_id is None: _session_id = str(uuid.uuid4()) return _session_id - - -def reset_session_id() -> None: - """Reset session ID (primarily for testing). - - Forces a new session ID to be generated on the next get_session_id() call. - - Examples: - Reset generates new session ID: - - >>> old_id = get_session_id() - >>> reset_session_id() - >>> new_id = get_session_id() - >>> old_id != new_id - True - - Safe to call multiple times: - - >>> reset_session_id() - >>> reset_session_id() # No error - """ - global _session_id - _session_id = None diff --git a/src/cachekit/decorators/wrapper.py b/src/cachekit/decorators/wrapper.py index a49a6e1..2e15d11 100644 --- a/src/cachekit/decorators/wrapper.py +++ b/src/cachekit/decorators/wrapper.py @@ -428,7 +428,6 @@ def create_cache_wrapper( use_collect_stats = collect_stats and not fast_mode # use_enable_tracing = enable_tracing and not fast_mode # Not used after CacheConfig removal use_enable_structured_logging = enable_structured_logging and not fast_mode - use_pipelined = pipelined # Keep enabled: pipelining reduces network roundtrips # Initialize handler components # Pre-compute function hash at decoration time (50-200μs savings) @@ -492,7 +491,6 @@ def create_cache_wrapper( # Store backend and handler type for consistent access # If explicit backend provided, use it; otherwise get from provider on first use _backend = backend if backend is not None else None - _use_pipelined = use_pipelined # FIX: Initialize L1 cache if enabled _l1_cache = get_l1_cache(namespace or "default") if l1_enabled else None @@ -617,7 +615,6 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any: # noqa: PLR0912 reset_current_function_stats(token) # L1+L2 MODE: Original behavior with backend initialization - lock_key = f"{cache_key}:lock" with features.create_span("redis_cache", span_attributes) as span: try: @@ -675,7 +672,7 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any: # noqa: PLR0912 if l1_found and l1_bytes: # L1 cache hit (~50ns vs ~1000μs for Redis) - deserialize bytes try: - l1_value = operation_handler.serialization_handler.deserialize_data(l1_bytes) + l1_value = operation_handler.serialization_handler.deserialize_data(l1_bytes, cache_key=cache_key) features.set_operation_context("l1_get", duration_ms=0.001) features.record_success() @@ -970,7 +967,7 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any: if l1_found and l1_bytes: # L1 cache hit (~50ns vs ~1000μs for Redis) - deserialize bytes try: - l1_value = operation_handler.serialization_handler.deserialize_data(l1_bytes) + l1_value = operation_handler.serialization_handler.deserialize_data(l1_bytes, cache_key=cache_key) features.set_operation_context("l1_get", duration_ms=0.001) features.record_success() @@ -1032,7 +1029,7 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any: if cached_data is not None: # Deserialize the cached data - result = operation_handler.serialization_handler.deserialize_data(cached_data) + result = operation_handler.serialization_handler.deserialize_data(cached_data, cache_key=cache_key) # Record cache hit (always compute for L2 latency stats) get_duration_ms = (time.perf_counter() - start_time) * 1000 @@ -1106,7 +1103,9 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any: cached_data = await operation_handler.cache_handler.get_async(cache_key) # type: ignore[attr-defined] if cached_data is not None: # Another request filled the cache while we waited - result = operation_handler.serialization_handler.deserialize_data(cached_data) + result = operation_handler.serialization_handler.deserialize_data( + cached_data, cache_key=cache_key + ) # Update L1 cache with serialized bytes if _l1_cache and cache_key and cached_data: @@ -1129,7 +1128,9 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any: cached_data = await operation_handler.cache_handler.get_async(cache_key) # type: ignore[attr-defined] if cached_data is not None: # Cache was populated while waiting - use it - result = operation_handler.serialization_handler.deserialize_data(cached_data) + result = operation_handler.serialization_handler.deserialize_data( + cached_data, cache_key=cache_key + ) # Update L1 cache with serialized bytes if _l1_cache and cache_key and cached_data: diff --git a/src/cachekit/logging.py b/src/cachekit/logging.py index ad700c3..c5ccc8a 100644 --- a/src/cachekit/logging.py +++ b/src/cachekit/logging.py @@ -569,13 +569,3 @@ def mask_sensitive_patterns(data: str) -> str: ) return data - - -def setup_correlation_tracking(): - """Setup correlation tracking - compatibility function.""" - pass - - -def setup_distributed_tracing(): - """Setup distributed tracing - compatibility function.""" - pass diff --git a/src/cachekit/reliability/metrics_collection.py b/src/cachekit/reliability/metrics_collection.py index 6cb04d8..d9fc7cd 100644 --- a/src/cachekit/reliability/metrics_collection.py +++ b/src/cachekit/reliability/metrics_collection.py @@ -136,22 +136,6 @@ def clear_metrics(): _metrics_timestamps.clear() -# Compatibility aliases for legacy code -def record_cache_operation(operation: str, status: str, namespace: str = "default"): - """Record a cache operation.""" - cache_operations.inc({"operation": operation, "status": status, "namespace": namespace}) - - -def record_circuit_breaker_state(state: str, namespace: str = "default"): - """Record circuit breaker state change.""" - circuit_breaker_state.set(1.0 if state == "OPEN" else 0.0, {"state": state, "namespace": namespace}) - - -def record_timeout_adjustment(adjustment_type: str, value: float, namespace: str = "default"): - """Record adaptive timeout adjustment.""" - adaptive_timeout_adjustments.inc({"type": adjustment_type, "namespace": namespace}) - - # Async metrics collector class class AsyncMetricsCollector: """Async metrics collector for high-performance scenarios. diff --git a/tests/unit/backends/test_cachekitio_client.py b/tests/unit/backends/test_cachekitio_client.py index cb5340f..7735246 100644 --- a/tests/unit/backends/test_cachekitio_client.py +++ b/tests/unit/backends/test_cachekitio_client.py @@ -4,7 +4,7 @@ - Thread-local caching (same client returned on repeated calls) - Client configuration (base_url, timeout, Authorization header) - Cleanup via close_sync_client() and close_async_client() -- reset_global_client() clears all thread-local references +- reset_global_client() clears thread-local references - New client created after reset """ diff --git a/tests/unit/test_async_lock_deserialize.py b/tests/unit/test_async_lock_deserialize.py new file mode 100644 index 0000000..414ef39 --- /dev/null +++ b/tests/unit/test_async_lock_deserialize.py @@ -0,0 +1,140 @@ +"""Test async distributed locking deserialize_data paths in wrapper.py. + +Targets the two uncovered lines where cache_key=cache_key was added: +- Line ~1106: Lock acquired, double-check finds cache populated +- Line ~1131: Lock timeout, cache populated while waiting + +These are the "thundering herd" protection paths — when multiple concurrent +requests miss cache simultaneously, only one executes the function while +others wait and then find the cache populated. +""" + +from __future__ import annotations + +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from typing import Any + +import pytest + +from cachekit import cache + + +class LockableTestBackend: + """Backend with LockableBackend protocol for testing lock paths. + + Controls lock acquisition result and simulates cache population + during lock wait. + """ + + def __init__(self, *, lock_acquired: bool = True): + self._store: dict[str, bytes] = {} + self._lock_acquired = lock_acquired + self._get_call_count = 0 + + def get(self, key: str) -> bytes | None: + return self._store.get(key) + + def set(self, key: str, value: bytes, ttl: int | None = None) -> None: + self._store[key] = value + + def delete(self, key: str) -> bool: + return self._store.pop(key, None) is not None + + def exists(self, key: str) -> bool: + return key in self._store + + def health_check(self) -> tuple[bool, dict[str, Any]]: + return True, {"backend_type": "lockable_test"} + + @asynccontextmanager + async def acquire_lock( + self, + key: str, + timeout: float = 10.0, + blocking_timeout: float | None = None, + ) -> AsyncIterator[bool]: + yield self._lock_acquired + + +@pytest.fixture(autouse=True) +def setup_di_for_redis_isolation(): + """Override root conftest's Redis isolation.""" + yield + + +@pytest.mark.asyncio +class TestAsyncLockDeserializePaths: + """Test the two async lock paths where deserialize_data gets cache_key. + + Strategy: Patch the backend's get() to return None on the first call + (triggering lock acquisition) and real cached data on the second call + (inside the lock, simulating another request populating the cache). + """ + + async def test_lock_acquired_cache_populated_during_wait(self): + """Line ~1106: Lock acquired, double-check finds cache populated. + + Flow: L2 miss → acquire lock → check L2 again → find data → deserialize(cache_key=) + """ + backend = LockableTestBackend(lock_acquired=True) + + @cache(backend=backend, ttl=300, l1_enabled=False) + async def expensive_fn(x: int) -> dict: + return {"result": x * 2} + + # First call populates the cache normally + result1 = await expensive_fn(42) + assert result1["result"] == 84 + + # Manipulate the backend to return None on first get (cache miss → triggers + # lock acquisition), then real data on second get (inside lock → deserialize + # with cache_key, hitting the target coverage line). + call_count = 0 + original_get = backend.get + + def patched_get(key: str) -> bytes | None: + nonlocal call_count + call_count += 1 + if call_count <= 1: + return None # First L2 check: miss → triggers lock + return original_get(key) # Inside lock: hit → deserialize with cache_key + + backend.get = patched_get + + result2 = await expensive_fn(42) + assert result2["result"] == 84 + # Should have gone through the lock-acquired path + assert call_count >= 2 + + async def test_lock_timeout_cache_populated_during_wait(self): + """Line ~1131: Lock timeout, cache populated while waiting. + + Flow: L2 miss → lock timeout (not acquired) → check L2 again → find data → deserialize(cache_key=) + """ + backend = LockableTestBackend(lock_acquired=False) # Lock will timeout + + @cache(backend=backend, ttl=300, l1_enabled=False) + async def expensive_fn(x: int) -> dict: + return {"result": x * 2} + + # First call populates the cache normally + result1 = await expensive_fn(42) + assert result1["result"] == 84 + + # Now make the outer L2 check miss, but the lock-timeout check find data + call_count = 0 + original_get = backend.get + + def patched_get(key: str) -> bytes | None: + nonlocal call_count + call_count += 1 + if call_count <= 1: + return None # First L2 check: miss → triggers lock attempt + return original_get(key) # Lock timeout check: hit → deserialize with cache_key + + backend.get = patched_get + + result2 = await expensive_fn(42) + assert result2["result"] == 84 + assert call_count >= 2