Circuit Breaker Pattern with Kafka: A Complete Guide - Part 2

>by Roman Tsyupryk

Part 2: Implementation and Real-World Scenarios


Implementation Overview

The most common implementation approach uses Resilience4j with Spring Kafka. The key insight is to listen for circuit breaker state transitions and control the Kafka consumer accordingly.

Core Concept

Circuit Breaker State Change
         │
         ▼
┌─────────────────────────┐
│  State Transition       │
│  Event Handler          │
├─────────────────────────┤
│ CLOSED_TO_OPEN          │───▶ consumer.pause()
│ HALF_OPEN_TO_OPEN       │───▶ consumer.pause()
│ OPEN_TO_HALF_OPEN       │───▶ (keep paused or resume for test)
│ HALF_OPEN_TO_CLOSED     │───▶ consumer.resume()
└─────────────────────────┘

Implementation Approach 1: Resilience4j with Spring Kafka

This is the recommended approach for Java/Spring applications.

Step 1: Configure the Circuit Breaker

Define circuit breaker settings in your application configuration:

resilience4j:
  circuitbreaker:
    instances:
      externalApiBreaker:
        failure-rate-threshold: 50
        slow-call-rate-threshold: 80
        slow-call-duration-threshold: 2s
        sliding-window-type: COUNT_BASED
        sliding-window-size: 10
        minimum-number-of-calls: 5
        wait-duration-in-open-state: 30s
        permitted-number-of-calls-in-half-open-state: 3
        automatic-transition-from-open-to-half-open-enabled: true
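
To see how failure-rate-threshold, sliding-window-size, and minimum-number-of-calls interact, here is a minimal sketch of a count-based window. The class CountBasedWindow is a hypothetical illustration, not Resilience4j's implementation, and it ignores the slow-call settings entirely.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration class; not part of Resilience4j.
class CountBasedWindow {
    private final int windowSize;       // corresponds to sliding-window-size
    private final int minimumCalls;     // corresponds to minimum-number-of-calls
    private final double thresholdPct;  // corresponds to failure-rate-threshold
    private final Deque<Boolean> outcomes = new ArrayDeque<>(); // true = failed call
    private int failures = 0;

    CountBasedWindow(int windowSize, int minimumCalls, double thresholdPct) {
        this.windowSize = windowSize;
        this.minimumCalls = minimumCalls;
        this.thresholdPct = thresholdPct;
    }

    // Records one call outcome and reports whether the breaker should open.
    boolean recordAndCheck(boolean failed) {
        outcomes.addLast(failed);
        if (failed) {
            failures++;
        }
        // Keep only the most recent windowSize outcomes.
        if (outcomes.size() > windowSize && outcomes.removeFirst()) {
            failures--;
        }
        // The rate is not evaluated until enough calls have been recorded.
        if (outcomes.size() < minimumCalls) {
            return false;
        }
        double failureRate = 100.0 * failures / outcomes.size();
        return failureRate >= thresholdPct;
    }
}
```

With the values from the configuration above (window of 10, minimum of 5, threshold of 50), four failures in a row do not open the circuit because the minimum has not been reached, while the fifth does.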

Step 2: Create a Circuit Breaker Supervisor

The supervisor listens for state transitions and controls the consumer:

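import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreaker.StateTransition;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.circuitbreaker.event.CircuitBreakerOnStateTransitionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;

@Configuration
public class CircuitBreakerSupervisor {

    private static final Logger log =
        LoggerFactory.getLogger(CircuitBreakerSupervisor.class);

    private final KafkaListenerEndpointRegistry registry;
    private final CircuitBreaker circuitBreaker;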

    public CircuitBreakerSupervisor(
            KafkaListenerEndpointRegistry registry,
            CircuitBreakerRegistry circuitBreakerRegistry) {
        this.registry = registry;
        this.circuitBreaker = circuitBreakerRegistry
            .circuitBreaker("externalApiBreaker");

        setupStateTransitionHandler();
    }

    private void setupStateTransitionHandler() {
        circuitBreaker.getEventPublisher()
            .onStateTransition(this::handleStateChange);
    }

    private void handleStateChange(CircuitBreakerOnStateTransitionEvent event) {
        StateTransition transition = event.getStateTransition();

        switch (transition) {
            case CLOSED_TO_OPEN:
            case CLOSED_TO_FORCED_OPEN:
            case HALF_OPEN_TO_OPEN:
                pauseConsumer();
                break;
            case OPEN_TO_HALF_OPEN:
            case HALF_OPEN_TO_CLOSED:
            case FORCED_OPEN_TO_CLOSED:
            case FORCED_OPEN_TO_HALF_OPEN:
                resumeConsumer();
                break;
        }
    }

    private void pauseConsumer() {
        MessageListenerContainer container =
            registry.getListenerContainer("myListener");
        if (container != null && container.isRunning()) {
            container.pause();
            log.info("Kafka consumer paused - circuit breaker OPEN");
        }
    }

    private void resumeConsumer() {
        MessageListenerContainer container =
            registry.getListenerContainer("myListener");
        // isPauseRequested() is true once pause() has been called on the container
        if (container != null && container.isPauseRequested()) {
            container.resume();
            log.info("Kafka consumer resumed - circuit breaker left OPEN state");
        }
    }
}

Step 3: Wrap API Calls with Circuit Breaker

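@Service
public class OrderProcessor {

    private final CircuitBreaker circuitBreaker;
    private final ExternalApiClient apiClient;

    // Final fields need a constructor; the breaker is looked up by the name used in Step 1
    public OrderProcessor(CircuitBreakerRegistry circuitBreakerRegistry,
                          ExternalApiClient apiClient) {
        this.circuitBreaker = circuitBreakerRegistry
            .circuitBreaker("externalApiBreaker");
        this.apiClient = apiClient;
    }
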
    public void processOrder(Order order) {
        // Circuit breaker wraps the external call
        Supplier<Response> decoratedCall = CircuitBreaker
            .decorateSupplier(circuitBreaker,
                () -> apiClient.submitOrder(order));

        try {
            Response response = decoratedCall.get();
            // Process successful response
        } catch (CallNotPermittedException e) {
            // Circuit is OPEN - request was not made
            throw new ServiceUnavailableException(
                "External service unavailable");
        }
    }
}

Step 4: Configure Kafka Consumer

@KafkaListener(
    id = "myListener",
    topics = "orders",
    containerFactory = "kafkaListenerContainerFactory"
)
public void consume(ConsumerRecord<String, Order> record,
                    Acknowledgment ack) {
    try {
        orderProcessor.processOrder(record.value());
        ack.acknowledge();  // Only commit on success
    } catch (ServiceUnavailableException e) {
        // Don't acknowledge - message will be reprocessed
        ack.nack(Duration.ZERO);
    }
}

Implementation Approach 2: Language-Agnostic Pseudocode

For other languages or custom implementations:

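CONFIGURATION:
  FAILURE_RATE_THRESHOLD = 50%
  MIN_CALLS = 5              // calls required before the rate is evaluated
  WINDOW_SIZE = 60 seconds   // sliding window length (example value)
  WAIT_DURATION = 30 seconds
  TEST_CALLS = 3

STATE:
  circuit_state = CLOSED
  success_count = 0
  last_failure_time = null
  window = []                // one (time, succeeded) entry per call

FUNCTION on_timer_tick():    // runs periodically; a paused consumer delivers no messages
    IF circuit_state == OPEN AND current_time - last_failure_time > WAIT_DURATION:
        circuit_state = HALF_OPEN
        success_count = 0
        kafka_consumer.resume()  // let test messages through
        log("Circuit entering HALF-OPEN state")

FUNCTION on_message_received(message):
    IF circuit_state == OPEN:
        // Already-polled message: seek back so it is redelivered after resume
        kafka_consumer.seek(message.partition, message.offset)
        RETURN  // Message stays in Kafka

    TRY:
        result = call_external_api(message.payload)
        record_call(succeeded = true)

        IF circuit_state == HALF_OPEN:
            success_count++
            IF success_count >= TEST_CALLS:
                circuit_state = CLOSED
                reset_counters()
                log("Circuit CLOSED - service recovered")

        commit_offset(message)

    CATCH error:
        record_call(succeeded = false)

        // Any failure while HALF_OPEN reopens the circuit immediately
        IF circuit_state == HALF_OPEN OR should_open_circuit():
            circuit_state = OPEN
            last_failure_time = current_time
            kafka_consumer.pause()
            log("Circuit OPEN - pausing consumer")

        // DO NOT commit offset; seek back so the message is retried
        kafka_consumer.seek(message.partition, message.offset)

FUNCTION record_call(succeeded):
    window.append((current_time, succeeded))
    // Remove old entries outside sliding window
    window = window.filter(entry => current_time - entry.time < WINDOW_SIZE)

FUNCTION should_open_circuit():
    IF window.length < MIN_CALLS:
        RETURN false
    failure_count = window.count(entry => NOT entry.succeeded)
    failure_rate = failure_count / window.length
    RETURN failure_rate >= FAILURE_RATE_THRESHOLD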
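
For readers who prefer something they can compile, the state transitions in the pseudocode above can be sketched in plain Java. This is an illustration under simplifying assumptions: SimpleBreaker is a hypothetical class, it counts consecutive failures instead of computing a failure rate, and time is passed in explicitly so the behaviour is easy to test. Pausing and resuming the Kafka consumer is left to the caller.

```java
// Hypothetical illustration class; counts consecutive failures for brevity.
class SimpleBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failuresToOpen;  // consecutive failures that open the circuit
    private final long waitMillis;     // time to stay OPEN before testing recovery
    private final int testCalls;       // successes required while HALF_OPEN

    private State state = State.CLOSED;
    private int consecutiveFailures = 0;
    private int halfOpenSuccesses = 0;
    private long openedAt = 0L;

    SimpleBreaker(int failuresToOpen, long waitMillis, int testCalls) {
        this.failuresToOpen = failuresToOpen;
        this.waitMillis = waitMillis;
        this.testCalls = testCalls;
    }

    // Returns the state at time now, moving OPEN to HALF_OPEN once the wait has elapsed.
    State stateAt(long now) {
        if (state == State.OPEN && now - openedAt > waitMillis) {
            state = State.HALF_OPEN;
            halfOpenSuccesses = 0;
        }
        return state;
    }

    void onSuccess() {
        consecutiveFailures = 0;
        if (state == State.HALF_OPEN && ++halfOpenSuccesses >= testCalls) {
            state = State.CLOSED;
        }
    }

    void onFailure(long now) {
        consecutiveFailures++;
        // Any failure while HALF_OPEN reopens the circuit immediately.
        if (state == State.HALF_OPEN || consecutiveFailures >= failuresToOpen) {
            state = State.OPEN;
            openedAt = now;
            consecutiveFailures = 0;
        }
    }
}
```

A caller would check stateAt(System.currentTimeMillis()) from a timer as well as from the message handler, since a paused consumer receives no messages that could trigger the check.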

Implementation Approach 3: Node.js with opossum

const CircuitBreaker = require('opossum')
const { Kafka } = require('kafkajs')

const kafka = new Kafka({ brokers: ['localhost:9092'] })
const consumer = kafka.consumer({ groupId: 'order-processor' })

// Configure circuit breaker
const breakerOptions = {
    timeout: 3000,
    errorThresholdPercentage: 50,
    resetTimeout: 30000,
    volumeThreshold: 5
}

const apiBreaker = new CircuitBreaker(callExternalApi, breakerOptions)

// Handle state changes
apiBreaker.on('open', async () => {
    console.log('Circuit OPEN - pausing Kafka consumer')
    await consumer.pause([{ topic: 'orders' }])
})

apiBreaker.on('halfOpen', () => {
    console.log('Circuit HALF-OPEN - testing recovery')
})

apiBreaker.on('close', async () => {
    console.log('Circuit CLOSED - resuming Kafka consumer')
    await consumer.resume([{ topic: 'orders' }])
})

// Consumer setup (wrapped in a function: CommonJS modules do not allow top-level await)
async function main() {
    await consumer.connect()
    await consumer.subscribe({ topic: 'orders' })

    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            try {
                // message.value is a Buffer, so convert it before parsing
                await apiBreaker.fire(JSON.parse(message.value.toString()))
                // Offset auto-committed on success
            } catch (error) {
                if (error.code === 'EOPENBREAKER') {
                    // Circuit is open, message will be reprocessed
                    throw error
                }
                // Other errors - also don't commit
                throw error
            }
        }
    })
}

main().catch(console.error)

async function callExternalApi(payload) {
    const response = await fetch('https://api.example.com/orders', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(payload)
    })
    if (!response.ok) throw new Error(`API error: ${response.status}`)
    return response.json()
}

Real-World Scenario 1: Database Outage

Your order processing service consumes from Kafka and writes to a database. The database becomes unavailable.

The Flow

Timeline:
─────────────────────────────────────────────────────────▶

T0: Database healthy, circuit CLOSED
    └── Messages processing normally

T1: Database connection fails
    └── Circuit records failure (1/5)

T2-T4: More failures
    └── Circuit records failures (4/5)

T5: Fifth failure reaches the threshold
    └── Circuit OPENS
    └── Consumer PAUSES
    └── Messages accumulate in Kafka

T5-T35: Wait duration (30 seconds)
    └── No processing
    └── Database team fixes issue

T35: Wait expires
    └── Circuit enters HALF-OPEN
    └── Test write to database

T36: Test succeeds
    └── Circuit CLOSES
    └── Consumer RESUMES

T37+: Backlog processing
    └── All accumulated messages processed in order

Key Points

  • Messages are not lost, provided the outage is shorter than the topic's retention period
  • Order is preserved within each partition
  • Recovery is automatic
  • No manual replay needed

Real-World Scenario 2: External API Rate Limiting

Your service calls a third-party API that has rate limits. When you exceed the limit, you receive 429 responses.

The Challenge

Rate limiting is different from outages:

  • The service isn't "down" - it's protecting itself
  • You need to wait for the rate limit window to reset
  • Hammering it with retries makes things worse

The Solution

Configure the circuit breaker to trip on 429 responses:

CircuitBreakerConfig config = CircuitBreakerConfig.custom()
    .recordExceptions(RateLimitExceededException.class)
    .ignoreExceptions(ValidationException.class)
    .waitDurationInOpenState(Duration.ofSeconds(60)) // Match rate limit window
    .build();
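
For this configuration to have any effect, the API client has to turn HTTP status codes into the exception types the breaker is watching for. A minimal sketch of that mapping follows. The two exception classes and the ResponseClassifier helper are hypothetical stand-ins for whatever your client defines, and treating 400 and 422 as validation errors is an assumption about the API.

```java
// Hypothetical exception types matching the names used in the configuration above.
class RateLimitExceededException extends RuntimeException {
    RateLimitExceededException(String message) {
        super(message);
    }
}

class ValidationException extends RuntimeException {
    ValidationException(String message) {
        super(message);
    }
}

// Hypothetical helper that the API client would call after each response.
class ResponseClassifier {
    static void throwIfFailed(int status) {
        if (status == 429) {
            // Recorded by the breaker: counts toward opening the circuit
            throw new RateLimitExceededException("rate limit exceeded");
        }
        if (status == 400 || status == 422) {
            // Ignored by the breaker: a bad request says nothing about service health
            throw new ValidationException("request rejected with status " + status);
        }
        // Any other status falls through without an exception in this sketch
    }
}
```

Keeping validation errors out of the failure count matters: a batch of malformed orders should not pause the consumer for a full rate limit window.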

The Flow

1. Consumer processes messages, calling API
2. API returns 429 (rate limit exceeded)
3. Circuit breaker records as failure
4. After threshold, circuit OPENS
5. Consumer PAUSES for 60 seconds (rate limit window)
6. Circuit enters HALF-OPEN
7. Test call succeeds (rate limit reset)
8. Resume processing at sustainable rate

Real-World Scenario 3: Multiple Downstream Services

Your consumer calls three different APIs: Payment, Inventory, and Shipping. Only the Payment API is failing.

The Problem

With a single circuit breaker:

  • Payment API fails → Circuit opens
  • ALL processing stops
  • Inventory and Shipping calls stop too (even though they're healthy)

Solution: Separate Circuit Breakers

@Service
public class OrderProcessor {

    private final CircuitBreaker paymentBreaker;
    private final CircuitBreaker inventoryBreaker;
    private final CircuitBreaker shippingBreaker;

    public OrderResult processOrder(Order order) {
        // Each call has its own circuit breaker
        PaymentResult payment = paymentBreaker
            .executeSupplier(() -> paymentApi.charge(order));

        InventoryResult inventory = inventoryBreaker
            .executeSupplier(() -> inventoryApi.reserve(order));

        ShippingResult shipping = shippingBreaker
            .executeSupplier(() -> shippingApi.schedule(order));

        return new OrderResult(payment, inventory, shipping);
    }
}

Consumer Pause Strategy

Now you have a choice:

Option A: Pause on ANY circuit open

// Conservative - stops processing if any downstream fails
if (paymentBreaker.getState() == CircuitBreaker.State.OPEN ||
    inventoryBreaker.getState() == CircuitBreaker.State.OPEN ||
    shippingBreaker.getState() == CircuitBreaker.State.OPEN) {
    consumer.pause();
}

Option B: Pause only on critical circuits

// Only pause for payment failures (critical path)
if (paymentBreaker.getState() == CircuitBreaker.State.OPEN) {
    consumer.pause();
}
// Inventory and shipping can use fallbacks
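
Options A and B differ only in which breakers are allowed to pause the consumer, so the decision can be isolated in one small function. The sketch below is self-contained and uses a hypothetical PauseDecision class with its own State enum rather than Resilience4j types. The breaker names are illustrative.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper; State mirrors the three main circuit breaker states.
class PauseDecision {
    enum State { CLOSED, OPEN, HALF_OPEN }

    // Option A: pause if any breaker is open.
    static boolean pauseOnAny(Map<String, State> states) {
        return states.containsValue(State.OPEN);
    }

    // Option B: pause only if a breaker on the critical path is open.
    static boolean pauseOnCritical(Map<String, State> states, Set<String> critical) {
        return critical.stream().anyMatch(name -> states.get(name) == State.OPEN);
    }
}
```

With inventory open and payment closed, pauseOnAny returns true while pauseOnCritical with only "payment" marked critical returns false, so processing continues and inventory calls fall back.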

Option C: Separate topics for independent processing

orders-topic ──▶ order-consumer ──▶ payment-api (critical)

inventory-topic ──▶ inventory-consumer ──▶ inventory-api

shipping-topic ──▶ shipping-consumer ──▶ shipping-api

Real-World Scenario 4: Handling Already-Polled Messages

When you pause a Kafka consumer, it doesn't stop immediately. Pausing only stops new fetches; records already fetched into the local buffer are still delivered to your listener.

The Problem

1. Consumer polls batch of 500 messages
2. Starts processing message 1
3. Message 1 fails, circuit OPENS
4. consumer.pause() called
5. But messages 2-500 are already in memory!
6. They continue processing and failing

Solution: Use nack() to Reject Remaining Messages

@KafkaListener(id = "myListener", topics = "orders")
public void consume(ConsumerRecord<String, Order> record,
                    Acknowledgment ack) {
    // Check circuit state BEFORE processing
    if (circuitBreaker.getState() == State.OPEN) {
        // Reject this message - it will be reprocessed later
        ack.nack(Duration.ZERO);
        return;
    }

    try {
        orderProcessor.processOrder(record.value());
        ack.acknowledge();
    } catch (CallNotPermittedException e) {
        // Circuit just opened during our processing
        ack.nack(Duration.ZERO);
    } catch (ServiceException e) {
        // Downstream failure
        ack.nack(Duration.ZERO);
    }
}

Alternative: Reduce Batch Size

Smaller batches mean fewer "stranded" messages:

spring:
  kafka:
    consumer:
      max-poll-records: 10  # Instead of default 500

Trade-off: Lower throughput during normal operation.


Real-World Scenario 5: Preventing Retry Storms

After an outage, when the circuit closes, all consumers resume simultaneously. This can overwhelm the recovering service.

The Thundering Herd Problem

T0: Service recovers
T1: 50 consumers detect recovery (HALF-OPEN → CLOSED)
T2: 50 consumers resume simultaneously
T3: Each has 10,000 message backlog
T4: 500,000 messages flood the recovering service
T5: Service crashes again
T6: Circuits reopen

Solutions

1. Staggered Resume

Add random jitter to resume timing:

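// Assumes a ScheduledExecutorService field named scheduler and the
// MessageListenerContainer looked up as in Step 2
private void resumeConsumer() {
    // Random delay between 0-5 seconds so consumers do not all resume at once
    long delayMs = ThreadLocalRandom.current().nextLong(5000);
    scheduler.schedule(() -> {
        container.resume();
    }, delayMs, TimeUnit.MILLISECONDS);
}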

2. Rate Limiting After Resume

Limit processing rate during recovery:

private final RateLimiter rateLimiter =
    RateLimiter.create(100); // 100 messages per second

public void consume(Order order) {
    rateLimiter.acquire(); // Blocks if rate exceeded
    orderProcessor.processOrder(order);
}

3. Gradual Backlog Processing

Track circuit recovery time and gradually increase rate:

private volatile Instant circuitClosedAt;  // null until the circuit has closed once

public void onCircuitClosed() {
    circuitClosedAt = Instant.now();
}

public int getCurrentRateLimit() {
    if (circuitClosedAt == null) return 200;        // No recovery yet: normal rate
    Duration sinceRecovery = Duration.between(circuitClosedAt, Instant.now());
    if (sinceRecovery.toMinutes() < 1) return 10;   // First minute: 10/sec
    if (sinceRecovery.toMinutes() < 5) return 50;   // Next 4 minutes: 50/sec
    return 200; // Normal rate
}

Offset Commit Strategy

The offset commit strategy is critical. Never commit offsets for failed messages.

Correct Pattern

┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐
│ Receive │──▶│ Process │──▶│ API OK  │──▶│ COMMIT  │
│ Message │   │         │   │         │   │ OFFSET  │
└─────────┘   └─────────┘   └─────────┘   └─────────┘
     │
     │ If API fails at any point:
     ▼
┌─────────────────────────────────────────────────────┐
│            DO NOT COMMIT OFFSET                     │
│            Message will be reprocessed              │
└─────────────────────────────────────────────────────┘

Configuration

spring:
  kafka:
    consumer:
      enable-auto-commit: false
    listener:
      ack-mode: MANUAL_IMMEDIATE
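
It helps to remember what a commit means in Kafka: the committed value for a partition is the offset of the next record to read, so committing after one record implicitly covers every record before it. That is why processing has to stop at the first failure instead of skipping past it. A minimal sketch of the rule, using a hypothetical OffsetCommitSketch helper that is not part of any Kafka client API:

```java
import java.util.List;

// Hypothetical helper illustrating which offset is safe to commit for one partition.
class OffsetCommitSketch {
    // startOffset is the offset of the first record in the batch.
    // succeeded holds one entry per record, in partition order.
    // Returns one past the last record processed before the first failure,
    // or startOffset if the very first record failed.
    static long offsetToCommit(long startOffset, List<Boolean> succeeded) {
        long next = startOffset;
        for (boolean ok : succeeded) {
            if (!ok) {
                break; // everything from here on must be redelivered
            }
            next++;
        }
        return next;
    }
}
```

For a batch starting at offset 100 where the third record fails, the safe commit is 102: records 100 and 101 are done, and 102 onward will be read again, including the fourth record even if it had succeeded.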

What's Next

In Part 3, we'll explore:

  • Edge cases and failure modes
  • Consumer group rebalancing challenges
  • Distributed circuit breaker state
  • Alternative approaches (DLQ, retry topics, bulkhead)
  • When NOT to use this pattern
