Implementing circuit breakers for PBM API timeouts

High-throughput claims adjudication pipelines routinely encounter upstream adjudicator latency spikes during peak prescription fulfillment windows. When primary PBM endpoints exceed acceptable response windows, synchronous retry loops exhaust connection pools, starve worker threads, and cascade into downstream adjudication bottlenecks. A deterministic circuit breaker isolates failing endpoints, keeps memory usage bounded, and enforces predictable degradation paths aligned with the established PBM Architecture & Taxonomy Foundations. The breaker must count failures over a sliding window, transition through the CLOSED, OPEN, and HALF_OPEN states, and trigger deterministic fallback routing without blocking the main adjudication thread.

Deterministic State Transitions & Memory Optimization

Memory optimization is non-negotiable when processing millions of NCPDP-formatted payloads per hour. Loading full response payloads into unbounded lists or pandas DataFrames introduces unpredictable GC pauses and heap fragmentation. Instead, use collections.deque with a fixed maxlen for sliding-window failure tracking, and declare __slots__ on the breaker class to eliminate per-instance __dict__ overhead. Timeouts are enforced at the transport layer, before payload deserialization, so stalled sockets are closed as soon as the timeout elapses and connection pools remain stable. When the number of failures recorded within the recovery window reaches the configured threshold, the circuit opens and routes subsequent transactions to secondary adjudication paths. This architecture directly supports the Fallback Routing Logic Design workflow by decoupling transport failures from business logic evaluation.
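The bounded-window idea can be shown in isolation. The following is a minimal sketch, not part of the production class: FailureWindow, window_seconds, and max_entries are hypothetical names, and the optional now argument exists only so the behavior can be checked without waiting on a real clock.

```python
import time
from collections import deque
from typing import Optional


class FailureWindow:
    """Bounded sliding window of failure timestamps (illustrative helper)."""

    # __slots__ removes the per-instance __dict__
    __slots__ = ("window_seconds", "_stamps")

    def __init__(self, window_seconds: float, max_entries: int) -> None:
        self.window_seconds = window_seconds
        # maxlen caps memory: once full, appending discards the oldest entry
        self._stamps = deque(maxlen=max_entries)

    def record(self, now: Optional[float] = None) -> None:
        """Store the timestamp of one failure."""
        self._stamps.append(time.monotonic() if now is None else now)

    def count_recent(self, now: Optional[float] = None) -> int:
        """Count failures newer than now - window_seconds."""
        now = time.monotonic() if now is None else now
        cutoff = now - self.window_seconds
        return sum(1 for t in self._stamps if t > cutoff)
```

Because the deque never holds more than max_entries timestamps, a sustained outage costs a fixed amount of memory regardless of how many failures occur.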

stateDiagram-v2
    [*] --> Closed
    Closed --> Open: failure window threshold breached
    Open --> HalfOpen: recovery window elapsed on next call
    Open --> Open: within recovery window, route to fallback
    HalfOpen --> Closed: probe call succeeds
    HalfOpen --> Open: probe call fails

Figure: Circuit breaker state machine transitioning CLOSED to OPEN on threshold breach, OPEN to HALF_OPEN after the recovery window, then CLOSED on probe success or OPEN on probe failure.
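One way to keep the transitions deterministic is to express the diagram as a lookup table. This is a sketch under assumptions: the State enum and the event labels (threshold_breached, recovery_elapsed, and so on) are illustrative names chosen here, not identifiers from the production class.

```python
from enum import Enum


class State(Enum):
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"


# (current state, event) -> next state; event names are illustrative labels
TRANSITIONS = {
    (State.CLOSED, "threshold_breached"): State.OPEN,
    (State.OPEN, "recovery_elapsed"): State.HALF_OPEN,
    (State.OPEN, "within_recovery"): State.OPEN,
    (State.HALF_OPEN, "probe_succeeded"): State.CLOSED,
    (State.HALF_OPEN, "probe_failed"): State.OPEN,
}


def next_state(current: State, event: str) -> State:
    """Return the next state; pairs not in the table leave the state unchanged."""
    return TRANSITIONS.get((current, event), current)
```

A table like this makes every legal transition visible in one place and gives unit tests a single function to exercise.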

NCPDP Payload Transformation & PHI Constraints

Explicit fallback routing requires deterministic payload transformation and strict state preservation. The circuit breaker must map primary endpoint failures to standardized NCPDP rejection codes, preserve original transaction metadata, and forward adjudication requests to backup switches without altering GroupID, BIN, or PharmacyNCPDPID fields. PHI exposure must be eliminated from fallback envelopes and diagnostic logs. The fallback handler injects RejectCode 92 (System Unavailable/Host Unavailable) or RejectCode 99 (Host Processing Error) into the response envelope, flags AdjudicationStatus as FALLBACK_ROUTED, and maintains referential integrity for downstream analytics and reconciliation pipelines. All logging operations must strip patient identifiers, dates of birth, and Rx numbers to comply with HIPAA minimum necessary standards. Refer to PBM Architecture & Taxonomy Foundations for field-level mapping specifications and audit trail requirements.
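For the logging constraint, an allow-list is usually easier to audit than a deny-list, because any field not explicitly approved is dropped. The sketch below assumes payloads are plain dictionaries keyed by the field names used in this section; LOG_SAFE_FIELDS and log_safe_view are hypothetical names, and the PHI keys in the usage example are invented for illustration.

```python
from typing import Any, Dict

# Routing fields named in this section. Anything not listed is treated as
# potential PHI and dropped; extending the set should be a reviewed decision.
LOG_SAFE_FIELDS = frozenset({"GroupID", "BIN", "PharmacyNCPDPID", "TransactionRef"})


def log_safe_view(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of payload holding only allow-listed routing fields."""
    return {k: v for k, v in payload.items() if k in LOG_SAFE_FIELDS}


# Usage: the patient fields (hypothetical key names) never reach the logger.
claim = {"GroupID": "G1", "BIN": "012345", "PatientDOB": "1980-01-01", "RxNumber": "123"}
safe = log_safe_view(claim)
```

The original payload is left untouched, so the full claim can still be forwarded to the secondary switch while only the reduced view is logged.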

Production Implementation

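The following implementation classifies transport-layer timeouts, performs thread-safe state transitions, and builds a PHI-safe fallback envelope. It targets threaded Python workers and expects the wrapped call to enforce the transport timeout itself; timeout_ms is stored on the breaker for that purpose. As written it catches requests exceptions. To use it with httpx, catch httpx.TimeoutException and httpx.RequestError instead.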

python
import time
import threading
import logging
from collections import deque
from typing import Dict, Any, Callable
import requests

# Module logger. PHI safety depends on what is logged (state names and
# exception class names only), not on this configuration.
logger = logging.getLogger("pbm.circuit_breaker")
logger.setLevel(logging.INFO)

class CircuitBreaker:
    __slots__ = (
        "failure_threshold", "timeout_ms", "recovery_window",
        "failures", "last_failure_time", "state", "_lock"
    )

    def __init__(self, failure_threshold: int = 5, timeout_ms: int = 3000, recovery_window: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout_ms = timeout_ms
        self.recovery_window = recovery_window
        # Fixed-size deque prevents unbounded memory growth during sustained outages
        self.failures = deque(maxlen=failure_threshold * 3)
        self.last_failure_time = 0.0
        self.state = "CLOSED"
        self._lock = threading.Lock()

    def _evaluate_window(self) -> bool:
        """Calculate recent failures within the recovery window."""
        now = time.monotonic()
        cutoff = now - self.recovery_window
        return sum(1 for t in self.failures if t > cutoff) >= self.failure_threshold

    def _record_failure(self) -> None:
        with self._lock:
            now = time.monotonic()
            self.failures.append(now)
            self.last_failure_time = now
            # A failed HALF_OPEN probe immediately re-opens the circuit;
            # a CLOSED circuit opens only once the window threshold is breached.
            if self.state == "HALF_OPEN":
                self.state = "OPEN"
                logger.info("Circuit RE-OPENED: HALF_OPEN probe failed.")
            elif self.state == "CLOSED" and self._evaluate_window():
                self.state = "OPEN"
                logger.info("Circuit OPENED: failure threshold breached.")

    def execute(self, primary_call: Callable[[], Dict[str, Any]], payload: Dict[str, Any]) -> Dict[str, Any]:
        with self._lock:
            if self.state == "OPEN":
                if time.monotonic() - self.last_failure_time >= self.recovery_window:
                    self.state = "HALF_OPEN"
                else:
                    return self._apply_fallback(payload)

        try:
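            # The breaker does not apply timeout_ms itself: primary_call must pass it
            # to the HTTP client (e.g. timeout=timeout_ms / 1000 in requests) so that
            # stalled sockets raise Timeout instead of holding pooled connections.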
            response = primary_call()
            if self.state == "HALF_OPEN":
                self._reset()
            return response
        except requests.exceptions.Timeout:
            self._record_failure()
            return self._apply_fallback(payload)
        except requests.exceptions.RequestException as exc:
            logger.warning("Transport exception: %s", type(exc).__name__)
            self._record_failure()
            return self._apply_fallback(payload)

    def _reset(self) -> None:
        with self._lock:
            self.failures.clear()
            self.state = "CLOSED"
            self.last_failure_time = 0.0

    def _apply_fallback(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        # Strict NCPDP field preservation; zero PHI exposure in fallback envelope
        return {
            "AdjudicationStatus": "FALLBACK_ROUTED",
            "RejectCode": "92",
            "RejectMessage": "Primary adjudicator unavailable - routed to secondary switch",
            "GroupID": payload.get("GroupID"),
            "BIN": payload.get("BIN"),
            "PharmacyNCPDPID": payload.get("PharmacyNCPDPID"),
            "OriginalTransactionRef": payload.get("TransactionRef")
        }

Pipeline Integration & Observability

Deploy the breaker as a middleware wrapper around your adjudication HTTP client. Configure failure_threshold and recovery_window based on historical latency percentiles (P95/P99) from your switch provider. Monitor circuit state transitions via structured metrics (e.g., Prometheus circuit_breaker_state gauge, circuit_breaker_failures_total counter). Alert when state=OPEN persists beyond 120 seconds, indicating sustained upstream degradation. For comprehensive routing topology validation and switch failover sequencing, consult the Fallback Routing Logic Design specification. Implement connection pooling limits (a requests.Session mounting an HTTPAdapter with bounded pool_connections and pool_maxsize) to prevent thread starvation during HALF_OPEN probe windows. Validate all fallback payloads against NCPDP D.0 schema requirements before forwarding to secondary switches.
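The pooling advice can be sketched with requests as follows. This is a minimal example under assumptions: build_session, make_primary_call, and the default pool size of 20 are hypothetical, and sending the payload as JSON is a stand-in for whatever wire format your switch actually requires.

```python
from typing import Any, Callable, Dict

import requests
from requests.adapters import HTTPAdapter


def build_session(pool_size: int = 20) -> requests.Session:
    """Return a Session whose HTTPS connection pool is capped at pool_size."""
    session = requests.Session()
    adapter = HTTPAdapter(
        pool_connections=pool_size,  # number of per-host pools to cache
        pool_maxsize=pool_size,      # connections kept in each pool
        max_retries=0,               # leave retry decisions to the breaker
        pool_block=True,             # wait for a free connection, never open extras
    )
    session.mount("https://", adapter)
    return session


def make_primary_call(
    session: requests.Session,
    url: str,
    payload: Dict[str, Any],
    timeout_ms: int,
) -> Callable[[], Dict[str, Any]]:
    """Build a zero-argument callable suitable for a breaker's execute method."""

    def primary_call() -> Dict[str, Any]:
        # requests expects the timeout in seconds
        resp = session.post(url, json=payload, timeout=timeout_ms / 1000)
        resp.raise_for_status()  # HTTP error statuses raise a RequestException subclass
        return resp.json()

    return primary_call
```

Setting pool_block=True is a design choice: without it, pool_maxsize only limits how many connections are kept for reuse, and extra connections are still opened under load. With it, callers wait for a free connection, so the request timeout should stay short enough that waiting threads are released quickly.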