From 341e82213720f332ca57faa59151942685739cf1 Mon Sep 17 00:00:00 2001 From: Ray Walker Date: Thu, 26 Mar 2026 23:41:03 +1100 Subject: [PATCH 1/7] fix: pass cache_key for AAD verification in all deserialize_data calls Five deserialize_data() calls were missing cache_key=cache_key: - L1+L2 sync path (line 678) - L1+L2 async path (line 973) - L2 async get after lock acquire (line 1035) - L2 async get after lock wait, path A (line 1109) - L2 async get after lock wait, path B (line 1132) Without cache_key, encrypted cache entries using AAD v0x03 would fail verification on every hit, silently falling through to cache miss. The L1-only paths (lines 591, 935) already passed cache_key correctly. Found by code-craftsman expert panel review of remediation spec. --- src/cachekit/decorators/wrapper.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/cachekit/decorators/wrapper.py b/src/cachekit/decorators/wrapper.py index a49a6e1..f16660a 100644 --- a/src/cachekit/decorators/wrapper.py +++ b/src/cachekit/decorators/wrapper.py @@ -675,7 +675,7 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any: # noqa: PLR0912 if l1_found and l1_bytes: # L1 cache hit (~50ns vs ~1000μs for Redis) - deserialize bytes try: - l1_value = operation_handler.serialization_handler.deserialize_data(l1_bytes) + l1_value = operation_handler.serialization_handler.deserialize_data(l1_bytes, cache_key=cache_key) features.set_operation_context("l1_get", duration_ms=0.001) features.record_success() @@ -970,7 +970,7 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any: if l1_found and l1_bytes: # L1 cache hit (~50ns vs ~1000μs for Redis) - deserialize bytes try: - l1_value = operation_handler.serialization_handler.deserialize_data(l1_bytes) + l1_value = operation_handler.serialization_handler.deserialize_data(l1_bytes, cache_key=cache_key) features.set_operation_context("l1_get", duration_ms=0.001) features.record_success() @@ -1032,7 +1032,7 @@ async def 
async_wrapper(*args: Any, **kwargs: Any) -> Any: if cached_data is not None: # Deserialize the cached data - result = operation_handler.serialization_handler.deserialize_data(cached_data) + result = operation_handler.serialization_handler.deserialize_data(cached_data, cache_key=cache_key) # Record cache hit (always compute for L2 latency stats) get_duration_ms = (time.perf_counter() - start_time) * 1000 @@ -1106,7 +1106,9 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any: cached_data = await operation_handler.cache_handler.get_async(cache_key) # type: ignore[attr-defined] if cached_data is not None: # Another request filled the cache while we waited - result = operation_handler.serialization_handler.deserialize_data(cached_data) + result = operation_handler.serialization_handler.deserialize_data( + cached_data, cache_key=cache_key + ) # Update L1 cache with serialized bytes if _l1_cache and cache_key and cached_data: @@ -1129,7 +1131,9 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any: cached_data = await operation_handler.cache_handler.get_async(cache_key) # type: ignore[attr-defined] if cached_data is not None: # Cache was populated while waiting - use it - result = operation_handler.serialization_handler.deserialize_data(cached_data) + result = operation_handler.serialization_handler.deserialize_data( + cached_data, cache_key=cache_key + ) # Update L1 cache with serialized bytes if _l1_cache and cache_key and cached_data: From e331b294ce5ad32d6f7b58989c8f1f667d97262c Mon Sep 17 00:00:00 2001 From: Ray Walker Date: Fri, 27 Mar 2026 07:37:05 +1100 Subject: [PATCH 2/7] refactor: remove dead setup_correlation_tracking and setup_distributed_tracing stubs --- src/cachekit/logging.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/cachekit/logging.py b/src/cachekit/logging.py index ad700c3..c5ccc8a 100644 --- a/src/cachekit/logging.py +++ b/src/cachekit/logging.py @@ -569,13 +569,3 @@ def mask_sensitive_patterns(data: str) -> str: 
) return data - - -def setup_correlation_tracking(): - """Setup correlation tracking - compatibility function.""" - pass - - -def setup_distributed_tracing(): - """Setup distributed tracing - compatibility function.""" - pass From 347bf26c85d2703059cde4ad5380a4b4a9387535 Mon Sep 17 00:00:00 2001 From: Ray Walker Date: Fri, 27 Mar 2026 07:37:23 +1100 Subject: [PATCH 3/7] refactor: remove dead reset_session_id and fix get_session_id doctests --- src/cachekit/decorators/session.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/src/cachekit/decorators/session.py b/src/cachekit/decorators/session.py index 810701f..334e022 100644 --- a/src/cachekit/decorators/session.py +++ b/src/cachekit/decorators/session.py @@ -25,7 +25,6 @@ def get_session_id() -> str: Session ID is a valid UUID format: >>> import uuid - >>> reset_session_id() # Start fresh >>> session_id = get_session_id() >>> uuid.UUID(session_id) # Validates UUID format # doctest: +ELLIPSIS UUID('...') @@ -41,26 +40,3 @@ def get_session_id() -> str: if _session_id is None: _session_id = str(uuid.uuid4()) return _session_id - - -def reset_session_id() -> None: - """Reset session ID (primarily for testing). - - Forces a new session ID to be generated on the next get_session_id() call. 
- - Examples: - Reset generates new session ID: - - >>> old_id = get_session_id() - >>> reset_session_id() - >>> new_id = get_session_id() - >>> old_id != new_id - True - - Safe to call multiple times: - - >>> reset_session_id() - >>> reset_session_id() # No error - """ - global _session_id - _session_id = None From 2fbb607e758d948ded9028332fabe642cfc36b81 Mon Sep 17 00:00:00 2001 From: Ray Walker Date: Fri, 27 Mar 2026 07:38:19 +1100 Subject: [PATCH 4/7] refactor: remove dead get_async_http_client and global singleton cleanup --- src/cachekit/backends/cachekitio/client.py | 63 +------------------ tests/unit/backends/test_cachekitio_client.py | 2 +- 2 files changed, 2 insertions(+), 63 deletions(-) diff --git a/src/cachekit/backends/cachekitio/client.py b/src/cachekit/backends/cachekitio/client.py index ed44068..fcd0ae0 100644 --- a/src/cachekit/backends/cachekitio/client.py +++ b/src/cachekit/backends/cachekitio/client.py @@ -10,60 +10,10 @@ if TYPE_CHECKING: from cachekit.backends.cachekitio.config import CachekitIOBackendConfig -# Global pool instances (shared across threads) -_async_client_instance: httpx.AsyncClient | None = None -_client_lock = threading.Lock() - # Thread-local client cache for performance (both sync and async) _thread_local = threading.local() -def get_async_http_client(config: CachekitIOBackendConfig) -> httpx.AsyncClient: - """Get asynchronous HTTP client with connection pooling. - - Creates global singleton async client with connection pooling and keepalive. 
- - Args: - config: cachekit.io backend configuration - - Returns: - httpx.AsyncClient: Async HTTP client with connection pooling - """ - global _async_client_instance - - if _async_client_instance is None: - with _client_lock: - if _async_client_instance is None: - # Try to enable HTTP/3 if aioquic is installed - client_kwargs = { - "base_url": config.api_url, - "timeout": config.timeout, - "http2": True, # Enable HTTP/2 - "limits": httpx.Limits( - max_connections=config.connection_pool_size, - max_keepalive_connections=config.connection_pool_size, - ), - "headers": { - "Authorization": f"Bearer {config.api_key.get_secret_value()}", - "Content-Type": "application/octet-stream", - }, - } - - # Try HTTP/3 (requires aioquic dependency) - try: - import aioquic # noqa: F401 # type: ignore[import-untyped,import-not-found] - - # HTTP/3 is available via httpx[http3] - need custom transport - # For now, just use HTTP/2 (HTTP/3 support requires more setup) - # client_kwargs["http3"] = True - except ImportError: - pass # HTTP/3 not available, fall back to HTTP/2 - - _async_client_instance = httpx.AsyncClient(**client_kwargs) - - return _async_client_instance - - def get_cached_async_http_client(config: CachekitIOBackendConfig) -> httpx.AsyncClient: """Get thread-local async HTTP client instance. 
@@ -137,13 +87,7 @@ def get_sync_http_client(config: CachekitIOBackendConfig) -> httpx.Client: async def close_async_client() -> None: - """Close async client instance (useful for cleanup).""" - global _async_client_instance - if _async_client_instance is not None: - await _async_client_instance.aclose() - _async_client_instance = None - - # Close thread-local client + """Close thread-local async client instance (useful for cleanup).""" if hasattr(_thread_local, "async_client") and _thread_local.async_client is not None: await _thread_local.async_client.aclose() _thread_local.async_client = None @@ -162,10 +106,6 @@ def reset_global_client() -> None: Note: This does not properly close clients. Use close_*_client() for proper cleanup. """ - global _async_client_instance - with _client_lock: - _async_client_instance = None - # Reset thread-local caches if hasattr(_thread_local, "async_client"): _thread_local.async_client = None @@ -174,7 +114,6 @@ def reset_global_client() -> None: __all__ = [ - "get_async_http_client", "get_cached_async_http_client", "get_sync_http_client", "close_async_client", diff --git a/tests/unit/backends/test_cachekitio_client.py b/tests/unit/backends/test_cachekitio_client.py index cb5340f..7735246 100644 --- a/tests/unit/backends/test_cachekitio_client.py +++ b/tests/unit/backends/test_cachekitio_client.py @@ -4,7 +4,7 @@ - Thread-local caching (same client returned on repeated calls) - Client configuration (base_url, timeout, Authorization header) - Cleanup via close_sync_client() and close_async_client() -- reset_global_client() clears all thread-local references +- reset_global_client() clears thread-local references - New client created after reset """ From 3d00cd62639ab6e24f7117e0400a7994d38aef86 Mon Sep 17 00:00:00 2001 From: Ray Walker Date: Fri, 27 Mar 2026 07:38:31 +1100 Subject: [PATCH 5/7] refactor: remove dead module-level metrics compatibility aliases --- src/cachekit/reliability/metrics_collection.py | 16 ---------------- 1 
file changed, 16 deletions(-) diff --git a/src/cachekit/reliability/metrics_collection.py b/src/cachekit/reliability/metrics_collection.py index 6cb04d8..d9fc7cd 100644 --- a/src/cachekit/reliability/metrics_collection.py +++ b/src/cachekit/reliability/metrics_collection.py @@ -136,22 +136,6 @@ def clear_metrics(): _metrics_timestamps.clear() -# Compatibility aliases for legacy code -def record_cache_operation(operation: str, status: str, namespace: str = "default"): - """Record a cache operation.""" - cache_operations.inc({"operation": operation, "status": status, "namespace": namespace}) - - -def record_circuit_breaker_state(state: str, namespace: str = "default"): - """Record circuit breaker state change.""" - circuit_breaker_state.set(1.0 if state == "OPEN" else 0.0, {"state": state, "namespace": namespace}) - - -def record_timeout_adjustment(adjustment_type: str, value: float, namespace: str = "default"): - """Record adaptive timeout adjustment.""" - adaptive_timeout_adjustments.inc({"type": adjustment_type, "namespace": namespace}) - - # Async metrics collector class class AsyncMetricsCollector: """Async metrics collector for high-performance scenarios. 
From 3a0adbb206ac636c911eee9239a31c76042d5be0 Mon Sep 17 00:00:00 2001 From: Ray Walker Date: Fri, 27 Mar 2026 07:39:04 +1100 Subject: [PATCH 6/7] refactor: remove dead _use_pipelined, use_pipelined, and sync lock_key variables --- src/cachekit/decorators/wrapper.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/cachekit/decorators/wrapper.py b/src/cachekit/decorators/wrapper.py index f16660a..2e15d11 100644 --- a/src/cachekit/decorators/wrapper.py +++ b/src/cachekit/decorators/wrapper.py @@ -428,7 +428,6 @@ def create_cache_wrapper( use_collect_stats = collect_stats and not fast_mode # use_enable_tracing = enable_tracing and not fast_mode # Not used after CacheConfig removal use_enable_structured_logging = enable_structured_logging and not fast_mode - use_pipelined = pipelined # Keep enabled: pipelining reduces network roundtrips # Initialize handler components # Pre-compute function hash at decoration time (50-200μs savings) @@ -492,7 +491,6 @@ def create_cache_wrapper( # Store backend and handler type for consistent access # If explicit backend provided, use it; otherwise get from provider on first use _backend = backend if backend is not None else None - _use_pipelined = use_pipelined # FIX: Initialize L1 cache if enabled _l1_cache = get_l1_cache(namespace or "default") if l1_enabled else None @@ -617,7 +615,6 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any: # noqa: PLR0912 reset_current_function_stats(token) # L1+L2 MODE: Original behavior with backend initialization - lock_key = f"{cache_key}:lock" with features.create_span("redis_cache", span_attributes) as span: try: From 6f582eb8c2ce413fd7a30fd1c4e062f549d0e6ff Mon Sep 17 00:00:00 2001 From: Ray Walker Date: Sun, 29 Mar 2026 09:31:53 +1100 Subject: [PATCH 7/7] test: cover async lock deserialize_data paths for AAD fix Two tests exercising the thundering herd protection paths where deserialize_data(cache_key=cache_key) was added: 1. 
Lock acquired, double-check finds cache already populated (another request filled it during lock wait) 2. Lock timeout, final cache check finds it populated (another request completed while waiting for lock) Both use LockableTestBackend with acquire_lock protocol to trigger the lock-then-deserialize code paths. --- tests/unit/test_async_lock_deserialize.py | 140 ++++++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 tests/unit/test_async_lock_deserialize.py diff --git a/tests/unit/test_async_lock_deserialize.py b/tests/unit/test_async_lock_deserialize.py new file mode 100644 index 0000000..414ef39 --- /dev/null +++ b/tests/unit/test_async_lock_deserialize.py @@ -0,0 +1,140 @@ +"""Test async distributed locking deserialize_data paths in wrapper.py. + +Targets the two uncovered lines where cache_key=cache_key was added: +- Line ~1106: Lock acquired, double-check finds cache populated +- Line ~1131: Lock timeout, cache populated while waiting + +These are the "thundering herd" protection paths — when multiple concurrent +requests miss cache simultaneously, only one executes the function while +others wait and then find the cache populated. +""" + +from __future__ import annotations + +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from typing import Any + +import pytest + +from cachekit import cache + + +class LockableTestBackend: + """Backend with LockableBackend protocol for testing lock paths. + + Controls lock acquisition result and simulates cache population + during lock wait. 
+ """ + + def __init__(self, *, lock_acquired: bool = True): + self._store: dict[str, bytes] = {} + self._lock_acquired = lock_acquired + self._get_call_count = 0 + + def get(self, key: str) -> bytes | None: + return self._store.get(key) + + def set(self, key: str, value: bytes, ttl: int | None = None) -> None: + self._store[key] = value + + def delete(self, key: str) -> bool: + return self._store.pop(key, None) is not None + + def exists(self, key: str) -> bool: + return key in self._store + + def health_check(self) -> tuple[bool, dict[str, Any]]: + return True, {"backend_type": "lockable_test"} + + @asynccontextmanager + async def acquire_lock( + self, + key: str, + timeout: float = 10.0, + blocking_timeout: float | None = None, + ) -> AsyncIterator[bool]: + yield self._lock_acquired + + +@pytest.fixture(autouse=True) +def setup_di_for_redis_isolation(): + """Override root conftest's Redis isolation.""" + yield + + +@pytest.mark.asyncio +class TestAsyncLockDeserializePaths: + """Test the two async lock paths where deserialize_data gets cache_key. + + Strategy: Patch the backend's get() to return None on the first call + (triggering lock acquisition) and real cached data on the second call + (inside the lock, simulating another request populating the cache). + """ + + async def test_lock_acquired_cache_populated_during_wait(self): + """Line ~1106: Lock acquired, double-check finds cache populated. 
+ + Flow: L2 miss → acquire lock → check L2 again → find data → deserialize(cache_key=) + """ + backend = LockableTestBackend(lock_acquired=True) + + @cache(backend=backend, ttl=300, l1_enabled=False) + async def expensive_fn(x: int) -> dict: + return {"result": x * 2} + + # First call populates the cache normally + result1 = await expensive_fn(42) + assert result1["result"] == 84 + + # Manipulate the backend to return None on first get (cache miss → triggers + # lock acquisition), then real data on second get (inside lock → deserialize + # with cache_key, hitting the target coverage line). + call_count = 0 + original_get = backend.get + + def patched_get(key: str) -> bytes | None: + nonlocal call_count + call_count += 1 + if call_count <= 1: + return None # First L2 check: miss → triggers lock + return original_get(key) # Inside lock: hit → deserialize with cache_key + + backend.get = patched_get + + result2 = await expensive_fn(42) + assert result2["result"] == 84 + # Should have gone through the lock-acquired path + assert call_count >= 2 + + async def test_lock_timeout_cache_populated_during_wait(self): + """Line ~1131: Lock timeout, cache populated while waiting. 
+ + Flow: L2 miss → lock timeout (not acquired) → check L2 again → find data → deserialize(cache_key=) + """ + backend = LockableTestBackend(lock_acquired=False) # Lock will timeout + + @cache(backend=backend, ttl=300, l1_enabled=False) + async def expensive_fn(x: int) -> dict: + return {"result": x * 2} + + # First call populates the cache normally + result1 = await expensive_fn(42) + assert result1["result"] == 84 + + # Now make the outer L2 check miss, but the lock-timeout check find data + call_count = 0 + original_get = backend.get + + def patched_get(key: str) -> bytes | None: + nonlocal call_count + call_count += 1 + if call_count <= 1: + return None # First L2 check: miss → triggers lock attempt + return original_get(key) # Lock timeout check: hit → deserialize with cache_key + + backend.get = patched_get + + result2 = await expensive_fn(42) + assert result2["result"] == 84 + assert call_count >= 2