Skip to content
63 changes: 1 addition & 62 deletions src/cachekit/backends/cachekitio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,60 +10,10 @@
if TYPE_CHECKING:
from cachekit.backends.cachekitio.config import CachekitIOBackendConfig

# Global pool instances (shared across threads)
_async_client_instance: httpx.AsyncClient | None = None
_client_lock = threading.Lock()

# Thread-local client cache for performance (both sync and async)
_thread_local = threading.local()


def get_async_http_client(config: CachekitIOBackendConfig) -> httpx.AsyncClient:
"""Get asynchronous HTTP client with connection pooling.

Creates global singleton async client with connection pooling and keepalive.

Args:
config: cachekit.io backend configuration

Returns:
httpx.AsyncClient: Async HTTP client with connection pooling
"""
global _async_client_instance

if _async_client_instance is None:
with _client_lock:
if _async_client_instance is None:
# Try to enable HTTP/3 if aioquic is installed
client_kwargs = {
"base_url": config.api_url,
"timeout": config.timeout,
"http2": True, # Enable HTTP/2
"limits": httpx.Limits(
max_connections=config.connection_pool_size,
max_keepalive_connections=config.connection_pool_size,
),
"headers": {
"Authorization": f"Bearer {config.api_key.get_secret_value()}",
"Content-Type": "application/octet-stream",
},
}

# Try HTTP/3 (requires aioquic dependency)
try:
import aioquic # noqa: F401 # type: ignore[import-untyped,import-not-found]

# HTTP/3 is available via httpx[http3] - need custom transport
# For now, just use HTTP/2 (HTTP/3 support requires more setup)
# client_kwargs["http3"] = True
except ImportError:
pass # HTTP/3 not available, fall back to HTTP/2

_async_client_instance = httpx.AsyncClient(**client_kwargs)

return _async_client_instance


def get_cached_async_http_client(config: CachekitIOBackendConfig) -> httpx.AsyncClient:
"""Get thread-local async HTTP client instance.

Expand Down Expand Up @@ -137,13 +87,7 @@ def get_sync_http_client(config: CachekitIOBackendConfig) -> httpx.Client:


async def close_async_client() -> None:
"""Close async client instance (useful for cleanup)."""
global _async_client_instance
if _async_client_instance is not None:
await _async_client_instance.aclose()
_async_client_instance = None

# Close thread-local client
"""Close thread-local async client instance (useful for cleanup)."""
if hasattr(_thread_local, "async_client") and _thread_local.async_client is not None:
await _thread_local.async_client.aclose()
_thread_local.async_client = None
Expand All @@ -162,10 +106,6 @@ def reset_global_client() -> None:

Note: This does not properly close clients. Use close_*_client() for proper cleanup.
"""
global _async_client_instance
with _client_lock:
_async_client_instance = None

# Reset thread-local caches
if hasattr(_thread_local, "async_client"):
_thread_local.async_client = None
Expand All @@ -174,7 +114,6 @@ def reset_global_client() -> None:


__all__ = [
"get_async_http_client",
"get_cached_async_http_client",
"get_sync_http_client",
"close_async_client",
Expand Down
24 changes: 0 additions & 24 deletions src/cachekit/decorators/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ def get_session_id() -> str:
Session ID is a valid UUID format:

>>> import uuid
>>> reset_session_id() # Start fresh
>>> session_id = get_session_id()
>>> uuid.UUID(session_id) # Validates UUID format # doctest: +ELLIPSIS
UUID('...')
Expand All @@ -41,26 +40,3 @@ def get_session_id() -> str:
if _session_id is None:
_session_id = str(uuid.uuid4())
return _session_id


def reset_session_id() -> None:
"""Reset session ID (primarily for testing).

Forces a new session ID to be generated on the next get_session_id() call.

Examples:
Reset generates new session ID:

>>> old_id = get_session_id()
>>> reset_session_id()
>>> new_id = get_session_id()
>>> old_id != new_id
True

Safe to call multiple times:

>>> reset_session_id()
>>> reset_session_id() # No error
"""
global _session_id
_session_id = None
17 changes: 9 additions & 8 deletions src/cachekit/decorators/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,6 @@ def create_cache_wrapper(
use_collect_stats = collect_stats and not fast_mode
# use_enable_tracing = enable_tracing and not fast_mode # Not used after CacheConfig removal
use_enable_structured_logging = enable_structured_logging and not fast_mode
use_pipelined = pipelined # Keep enabled: pipelining reduces network roundtrips

# Initialize handler components
# Pre-compute function hash at decoration time (50-200μs savings)
Expand Down Expand Up @@ -492,7 +491,6 @@ def create_cache_wrapper(
# Store backend and handler type for consistent access
# If explicit backend provided, use it; otherwise get from provider on first use
_backend = backend if backend is not None else None
_use_pipelined = use_pipelined

# FIX: Initialize L1 cache if enabled
_l1_cache = get_l1_cache(namespace or "default") if l1_enabled else None
Expand Down Expand Up @@ -617,7 +615,6 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any: # noqa: PLR0912
reset_current_function_stats(token)

# L1+L2 MODE: Original behavior with backend initialization
lock_key = f"{cache_key}:lock"

with features.create_span("redis_cache", span_attributes) as span:
try:
Expand Down Expand Up @@ -675,7 +672,7 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any: # noqa: PLR0912
if l1_found and l1_bytes:
# L1 cache hit (~50ns vs ~1000μs for Redis) - deserialize bytes
try:
l1_value = operation_handler.serialization_handler.deserialize_data(l1_bytes)
l1_value = operation_handler.serialization_handler.deserialize_data(l1_bytes, cache_key=cache_key)

features.set_operation_context("l1_get", duration_ms=0.001)
features.record_success()
Expand Down Expand Up @@ -970,7 +967,7 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
if l1_found and l1_bytes:
# L1 cache hit (~50ns vs ~1000μs for Redis) - deserialize bytes
try:
l1_value = operation_handler.serialization_handler.deserialize_data(l1_bytes)
l1_value = operation_handler.serialization_handler.deserialize_data(l1_bytes, cache_key=cache_key)

features.set_operation_context("l1_get", duration_ms=0.001)
features.record_success()
Expand Down Expand Up @@ -1032,7 +1029,7 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any:

if cached_data is not None:
# Deserialize the cached data
result = operation_handler.serialization_handler.deserialize_data(cached_data)
result = operation_handler.serialization_handler.deserialize_data(cached_data, cache_key=cache_key)

# Record cache hit (always compute for L2 latency stats)
get_duration_ms = (time.perf_counter() - start_time) * 1000
Expand Down Expand Up @@ -1106,7 +1103,9 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
cached_data = await operation_handler.cache_handler.get_async(cache_key) # type: ignore[attr-defined]
if cached_data is not None:
# Another request filled the cache while we waited
result = operation_handler.serialization_handler.deserialize_data(cached_data)
result = operation_handler.serialization_handler.deserialize_data(
cached_data, cache_key=cache_key
)

# Update L1 cache with serialized bytes
if _l1_cache and cache_key and cached_data:
Expand All @@ -1129,7 +1128,9 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
cached_data = await operation_handler.cache_handler.get_async(cache_key) # type: ignore[attr-defined]
if cached_data is not None:
# Cache was populated while waiting - use it
result = operation_handler.serialization_handler.deserialize_data(cached_data)
result = operation_handler.serialization_handler.deserialize_data(
cached_data, cache_key=cache_key
)

# Update L1 cache with serialized bytes
if _l1_cache and cache_key and cached_data:
Expand Down
10 changes: 0 additions & 10 deletions src/cachekit/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,13 +569,3 @@ def mask_sensitive_patterns(data: str) -> str:
)

return data


def setup_correlation_tracking():
"""Setup correlation tracking - compatibility function."""
pass


def setup_distributed_tracing():
"""Setup distributed tracing - compatibility function."""
pass
16 changes: 0 additions & 16 deletions src/cachekit/reliability/metrics_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,22 +136,6 @@ def clear_metrics():
_metrics_timestamps.clear()


# Compatibility aliases for legacy code
def record_cache_operation(operation: str, status: str, namespace: str = "default"):
"""Record a cache operation."""
cache_operations.inc({"operation": operation, "status": status, "namespace": namespace})


def record_circuit_breaker_state(state: str, namespace: str = "default"):
"""Record circuit breaker state change."""
circuit_breaker_state.set(1.0 if state == "OPEN" else 0.0, {"state": state, "namespace": namespace})


def record_timeout_adjustment(adjustment_type: str, value: float, namespace: str = "default"):
"""Record adaptive timeout adjustment."""
adaptive_timeout_adjustments.inc({"type": adjustment_type, "namespace": namespace})


# Async metrics collector class
class AsyncMetricsCollector:
"""Async metrics collector for high-performance scenarios.
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/backends/test_cachekitio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
- Thread-local caching (same client returned on repeated calls)
- Client configuration (base_url, timeout, Authorization header)
- Cleanup via close_sync_client() and close_async_client()
- reset_global_client() clears all thread-local references
- reset_global_client() clears thread-local references
- New client created after reset
"""

Expand Down
140 changes: 140 additions & 0 deletions tests/unit/test_async_lock_deserialize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""Test async distributed locking deserialize_data paths in wrapper.py.

Targets the two uncovered lines where cache_key=cache_key was added:
- Line ~1106: Lock acquired, double-check finds cache populated
- Line ~1131: Lock timeout, cache populated while waiting

These are the "thundering herd" protection paths — when multiple concurrent
requests miss cache simultaneously, only one executes the function while
others wait and then find the cache populated.
"""

from __future__ import annotations

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

import pytest

from cachekit import cache


class LockableTestBackend:
"""Backend with LockableBackend protocol for testing lock paths.

Controls lock acquisition result and simulates cache population
during lock wait.
"""

def __init__(self, *, lock_acquired: bool = True):
self._store: dict[str, bytes] = {}
self._lock_acquired = lock_acquired
self._get_call_count = 0

def get(self, key: str) -> bytes | None:
return self._store.get(key)

def set(self, key: str, value: bytes, ttl: int | None = None) -> None:
self._store[key] = value

def delete(self, key: str) -> bool:
return self._store.pop(key, None) is not None

def exists(self, key: str) -> bool:
return key in self._store

def health_check(self) -> tuple[bool, dict[str, Any]]:
return True, {"backend_type": "lockable_test"}

@asynccontextmanager
async def acquire_lock(
self,
key: str,
timeout: float = 10.0,
blocking_timeout: float | None = None,
) -> AsyncIterator[bool]:
yield self._lock_acquired


@pytest.fixture(autouse=True)
def setup_di_for_redis_isolation():
"""Override root conftest's Redis isolation."""
yield


@pytest.mark.asyncio
class TestAsyncLockDeserializePaths:
"""Test the two async lock paths where deserialize_data gets cache_key.

Strategy: Patch the backend's get() to return None on the first call
(triggering lock acquisition) and real cached data on the second call
(inside the lock, simulating another request populating the cache).
"""

async def test_lock_acquired_cache_populated_during_wait(self):
"""Line ~1106: Lock acquired, double-check finds cache populated.

Flow: L2 miss → acquire lock → check L2 again → find data → deserialize(cache_key=)
"""
backend = LockableTestBackend(lock_acquired=True)

@cache(backend=backend, ttl=300, l1_enabled=False)
async def expensive_fn(x: int) -> dict:
return {"result": x * 2}

# First call populates the cache normally
result1 = await expensive_fn(42)
assert result1["result"] == 84

# Manipulate the backend to return None on first get (cache miss → triggers
# lock acquisition), then real data on second get (inside lock → deserialize
# with cache_key, hitting the target coverage line).
call_count = 0
original_get = backend.get

def patched_get(key: str) -> bytes | None:
nonlocal call_count
call_count += 1
if call_count <= 1:
return None # First L2 check: miss → triggers lock
return original_get(key) # Inside lock: hit → deserialize with cache_key

backend.get = patched_get

result2 = await expensive_fn(42)
assert result2["result"] == 84
# Should have gone through the lock-acquired path
assert call_count >= 2

async def test_lock_timeout_cache_populated_during_wait(self):
"""Line ~1131: Lock timeout, cache populated while waiting.

Flow: L2 miss → lock timeout (not acquired) → check L2 again → find data → deserialize(cache_key=)
"""
backend = LockableTestBackend(lock_acquired=False) # Lock will timeout

@cache(backend=backend, ttl=300, l1_enabled=False)
async def expensive_fn(x: int) -> dict:
return {"result": x * 2}

# First call populates the cache normally
result1 = await expensive_fn(42)
assert result1["result"] == 84

# Now make the outer L2 check miss, but the lock-timeout check find data
call_count = 0
original_get = backend.get

def patched_get(key: str) -> bytes | None:
nonlocal call_count
call_count += 1
if call_count <= 1:
return None # First L2 check: miss → triggers lock attempt
return original_get(key) # Lock timeout check: hit → deserialize with cache_key

backend.get = patched_get

result2 = await expensive_fn(42)
assert result2["result"] == 84
assert call_count >= 2
Loading