Kafka Producer & Consumer Deep Dive: acks, Retries, Batch Processing in Spring Boot
Master Kafka producer ack modes, AckMode.
- Producer acks=all + min.insync.replicas=2 gives the strongest durability guarantee; use it for financial events
- AckMode.MANUAL_IMMEDIATE commits offsets only after your business logic succeeds, preventing data loss on restart
- @RetryableTopic(attempts = "3", backoff = @Backoff(delay = 1000, multiplier = 2)) auto-configures retry topics and DLT routing
- RecordFilterStrategy allows declarative message filtering before your listener method is invoked
- Batch consumers with List<ConsumerRecord> increase throughput 10-50x by amortizing per-record overhead
Think of Kafka ack modes like a certified letter vs. dropping a note in a mailbox. With acks=0, you just throw the letter in — no confirmation. With acks=1, the post office stamps 'received' but might lose it in a back-office fire. With acks=all, every post office branch confirms they have a copy before you leave. Retry topics are like a special tray for letters that couldn't be delivered — they wait their turn and get another attempt automatically.
The first Kafka integration you build usually works great in the demo. The producer sends, the consumer receives, everything looks smooth. Then your system hits its first real incident: the broker has a leadership election mid-deployment, your consumer receives a malformed message from a third-party service, a downstream database goes down for 45 seconds, and suddenly you realize your Kafka setup has no durability guarantees, no retry logic, and no way to isolate bad messages from good ones.
Producer acknowledgment modes are the first line of defense. acks=0 is performance theater — you're broadcasting messages into the void and hoping for the best. acks=1 is a false comfort — the leader acknowledges but hasn't replicated, so a leader failover between write and replication loses your data permanently. acks=all with min.insync.replicas=2 is the production standard: the message is durable on multiple brokers before the producer considers the send complete.
On the consumer side, offset management is where most production bugs live. Auto-commit advances the committed offset on a timer, not after successful processing, so a record's offset can be committed while the record is still being processed. If the consumer crashes at that point, the record is never redelivered. AckMode.MANUAL_IMMEDIATE gives you exact control: commit only when your database write succeeds.
Retry logic with @RetryableTopic addresses the class of transient failures — downstream database timeouts, external API blips, temporary lock contention — where the message is valid but the processing environment wasn't ready. Spring Kafka creates retry topics automatically, routes failed messages through configurable backoff periods, and finally sends to a dead-letter topic if all attempts fail. No manual retry infrastructure required.
Batch consumers unlock an order-of-magnitude throughput improvement for high-volume processing. Instead of one database call per message, you receive List<ConsumerRecord>, process 500 records at once, and bulk-insert in a single transaction. The operational complexity increases, but so does your system's ability to keep up under load.
Producer Acknowledgment Modes: acks=0, acks=1, acks=all
The acks producer configuration is the most impactful durability setting in Kafka. It determines how many broker replicas must confirm message receipt before the KafkaProducer.send() future completes. Each value represents a distinct point on the latency-durability trade-off spectrum.
acks=0 (fire-and-forget): The producer sends the message and immediately considers it delivered, without waiting for any broker response. No acknowledgment packet traverses the network for this message. This maximizes throughput and minimizes latency but provides zero durability — if the broker is unavailable or rejects the message, the producer is unaware. Use cases are narrow: high-volume metrics, IoT sensor readings, or application logs where occasional loss is acceptable and the monitoring overhead of tracking every message exceeds the business value of any individual one.
acks=1 (leader-only): The partition leader writes the message to its local log and acknowledges the producer. This is faster than acks=all and provides protection against temporary broker unavailability. However, there is a critical gap: if the leader fails between writing the message and replicating it to followers, the message is lost permanently. The newly elected leader (a follower that never received the message) has no record of it, and the producer has already received an ACK. This failure scenario is not hypothetical: it can occur whenever a leader goes down before its followers have fetched the latest writes, which makes abrupt broker restarts and leadership elections during a rolling update a recurring risk.
acks=all (or acks=-1): The leader writes the message and waits for all in-sync replicas (ISRs) to confirm they have written it to their logs before acknowledging the producer. The ISR list is maintained by the broker — replicas that fall behind (haven't fetched from the leader within replica.lag.time.max.ms) are removed from the ISR. Setting min.insync.replicas=2 on the topic ensures that even if replicas lag out, at least 2 (leader + 1 follower) must confirm the write. If fewer than min.insync.replicas are in sync, the broker rejects the write with NotEnoughReplicasException — the producer should retry until replicas catch up.
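The accept/reject rule described above can be sketched as a small plain-Java model. This is not Kafka client or broker code; the class and method names are hypothetical and exist only to make the rule concrete.

```java
// Plain-Java model of the leader's accept/reject decision; not Kafka code.
final class AcksModel {

    /**
     * Returns true if the partition leader would accept the write.
     * min.insync.replicas is only enforced when the producer uses acks=all (or -1).
     */
    static boolean acceptsWrite(String acks, int inSyncReplicas, int minInsyncReplicas) {
        boolean acksAll = acks.equals("all") || acks.equals("-1");
        if (!acksAll) {
            // acks=0 and acks=1 only need a live leader (the leader counts as one ISR member).
            return inSyncReplicas >= 1;
        }
        // With acks=all, too small an ISR is rejected (NotEnoughReplicasException in Kafka).
        return inSyncReplicas >= minInsyncReplicas;
    }

    public static void main(String[] args) {
        System.out.println(acceptsWrite("all", 3, 2)); // true: healthy ISR
        System.out.println(acceptsWrite("all", 1, 2)); // false: only the leader is in sync
        System.out.println(acceptsWrite("1", 1, 2));   // true: acks=1 ignores min.insync.replicas
    }
}
```

The third call shows why raising min.insync.replicas has no effect unless the producer also uses acks=all.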
In Spring Boot, set spring.kafka.producer.acks=all and configure min.insync.replicas on your NewTopic bean. Pair with enable.idempotence=true to get duplicate-free retries — the broker assigns a producer ID and sequence number to each message, deduplicating retransmissions that occur during retry after an ambiguous failure.
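As a rough sketch, the settings named in this section can be collected into plain maps keyed by the standard Kafka property names. The class and method names are hypothetical. A map like this could be handed to a producer factory, and the topic entry to a NewTopic definition, but that wiring is an assumption and is not shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder for the durability settings discussed in this section.
final class DurableProducerSettings {

    /** Producer-side properties, keyed by their Kafka configuration names. */
    static Map<String, Object> producerConfig() {
        Map<String, Object> config = new LinkedHashMap<>();
        config.put("acks", "all");                 // wait for all in-sync replicas
        config.put("enable.idempotence", true);    // broker deduplicates retried sends
        config.put("retries", Integer.MAX_VALUE);  // keep retrying transient errors
        config.put("delivery.timeout.ms", 120_000); // overall bound on a send, retries included
        return config;
    }

    /** Topic-level property; this is set on the topic, not on the producer. */
    static Map<String, String> topicConfig() {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("min.insync.replicas", "2");    // leader plus at least one follower
        return config;
    }

    public static void main(String[] args) {
        System.out.println(producerConfig());
        System.out.println(topicConfig());
    }
}
```

Keeping the two maps separate mirrors the split in Kafka itself: acks and idempotence belong to the producer, while min.insync.replicas belongs to the topic (or broker default).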
With min.insync.replicas=1 (the default), acks=all is equivalent to acks=1 whenever the ISR has shrunk to the leader alone: only the leader needs to confirm the write. Always set min.insync.replicas=2 on critical topics so that at least one follower must confirm the write, providing actual durability against leader failure.
Enable idempotence (enable.idempotence=true) for all production producers, not just those requiring exactly-once semantics. It prevents duplicate messages caused by producer retries after ambiguous network failures at zero additional operational cost.
AckMode.MANUAL_IMMEDIATE and Offset Management
Spring Kafka's AckMode controls when consumed message offsets are committed back to the Kafka broker. The default AckMode.BATCH commits offsets once the listener has returned for all records from a poll. This is convenient but dangerous when the listener swallows failures: if a database write fails and the exception is caught inside the listener, the method still returns normally, the offset is committed, and the message is silently lost.
AckMode.MANUAL_IMMEDIATE transfers offset commit control entirely to your code. The listener container injects an Acknowledgment object into your listener method. Calling ack.acknowledge() commits the current record's offset to the broker immediately. If you do not call acknowledge(), because an exception was thrown before reaching that line, the offset stays uncommitted and the message is redelivered once the error handler seeks back to it (or after a restart or rebalance).
The `IMMEDIATE` suffix is important: AckMode.MANUAL batches acknowledgments and flushes them at the end of the poll loop, while MANUAL_IMMEDIATE commits the offset synchronously as soon as acknowledge() is called. For high-reliability consumers where you need to know that each record's offset is committed before proceeding to the next, use MANUAL_IMMEDIATE.
Pairing MANUAL_IMMEDIATE with error handling requires careful design. A naive implementation that calls ack.acknowledge() inside a try-catch that also catches the failure will acknowledge even on error, defeating the purpose. The correct pattern: call acknowledge() only in the success path. Configure SeekToCurrentErrorHandler on the container so that when an exception escapes the listener, the container seeks the consumer back to the failed record's offset, ensuring it is redelivered rather than skipped.
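The difference between the two patterns can be illustrated with a plain-Java model. The Ack interface below is a hypothetical stand-in for Spring Kafka's Acknowledgment, and the listener methods are ordinary static methods, so this sketch shows only the control flow, not real container behavior.

```java
import java.util.function.Consumer;

// Plain-Java model of "acknowledge only on the success path"; not Spring Kafka code.
final class AckOnSuccessModel {

    /** Stand-in for an acknowledgment handle; only acknowledge() is modeled. */
    interface Ack {
        void acknowledge();
    }

    /** Counts how many times the offset would have been committed. */
    static final class CountingAck implements Ack {
        int commits = 0;

        @Override
        public void acknowledge() {
            commits++;
        }
    }

    /** Correct pattern: the exception propagates and acknowledge() is never reached. */
    static void listen(String payload, Ack ack, Consumer<String> businessLogic) {
        businessLogic.accept(payload);
        ack.acknowledge();
    }

    /** Anti-pattern: the failure is swallowed and the offset is committed anyway. */
    static void listenWithFinally(String payload, Ack ack, Consumer<String> businessLogic) {
        try {
            businessLogic.accept(payload);
        } catch (RuntimeException e) {
            // Swallowed: nothing outside this method ever sees the failure.
        } finally {
            ack.acknowledge();
        }
    }

    public static void main(String[] args) {
        CountingAck ack = new CountingAck();
        listenWithFinally("order-1", ack, p -> { throw new IllegalStateException("db down"); });
        System.out.println(ack.commits); // 1: committed despite the failure
    }
}
```

In the first method a failing database call leaves the commit count at zero, which is what lets an error handler seek back and retry.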
SeekToCurrentErrorHandler (superseded by DefaultErrorHandler in Spring Kafka 2.8+) accepts a BackOff configuration for retry timing. With ExponentialBackOffWithMaxRetries(5), the handler retries the failed record 5 times with exponential backoff before routing to the dead-letter topic via DeadLetterPublishingRecoverer. This gives transient failures (DB timeout, external API blip) multiple chances to succeed before the message is quarantined.
Calling ack.acknowledge() in a finally block commits the offset even when an exception is thrown, silently dropping failed messages. The DefaultErrorHandler's seek-back mechanism only works when exceptions propagate out of the listener method. Swallowing exceptions and acknowledging in finally is the most common cause of silent message loss in Spring Kafka applications.
Pair AckMode.MANUAL_IMMEDIATE with a database unique constraint on the message's correlation ID. On duplicate delivery, the insert fails with a constraint violation; add that exception type to addNotRetryableExceptions so the offset advances and the duplicate is discarded cleanly.
@RetryableTopic: Declarative Retry with Exponential Backoff
Before @RetryableTopic, implementing retry logic for Kafka consumers required building custom retry infrastructure: separate retry topics, a scheduler to re-enqueue messages after a delay, a DLQ writer, and manual tracking of retry counts. Spring Kafka 2.7+ collapses all of this into a single annotation.
@RetryableTopic(attempts = "3", backoff = @Backoff(delay = 1000, multiplier = 2)) on a @KafkaListener method automatically creates two retry topics and one dead-letter topic (for example orders-retry-0, orders-retry-1, and orders-dlt; the exact suffixes depend on the Spring Kafka version and the topic suffixing strategy). Note that attempts is a String attribute and counts the initial delivery, so three attempts means two retries. When the listener method throws a retryable exception, the framework publishes the message to the first retry topic with headers recording the original topic, the attempt number, and the timestamp after which the message should be reprocessed. A background listener on the retry topic waits until the delay has elapsed, then re-delivers the message to the main listener. After the configured number of attempts, the message routes to the DLT.
The retry delay strategy is configurable. @Backoff(delay=1000, multiplier=2, maxDelay=30000) implements exponential backoff: attempt 1 waits 1s, attempt 2 waits 2s, attempt 3 waits 4s, capped at 30s. This gives transient failures (DB connection timeouts typically last 1-5s) time to resolve while not holding retry topics open indefinitely.
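The delay sequence above is simple arithmetic, and a minimal sketch makes the cap visible. The class and method names are hypothetical; this models only the schedule, not how Spring Kafka computes or applies it internally.

```java
import java.util.ArrayList;
import java.util.List;

// Computes an exponential backoff schedule: delay, delay*multiplier, ... capped at maxDelay.
final class BackoffSchedule {

    static List<Long> delays(long delayMs, double multiplier, long maxDelayMs, int retries) {
        List<Long> result = new ArrayList<>();
        double current = delayMs;
        for (int i = 0; i < retries; i++) {
            // Each retry waits the current delay, but never longer than the cap.
            result.add(Math.min((long) current, maxDelayMs));
            current *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(1000, 2.0, 30000, 3)); // [1000, 2000, 4000]
        System.out.println(delays(1000, 2.0, 30000, 6)); // [1000, 2000, 4000, 8000, 16000, 30000]
    }
}
```

The sixth delay would be 32 seconds uncapped, so maxDelay=30000 only starts to matter from that retry onward.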
Exception classification is granular. exclude = {DataIntegrityViolationException.class, DeserializationException.class} sends these exceptions directly to the DLT without retrying: they will fail on every attempt, so retrying wastes resources and delays DLT processing. include = {OptimisticLockingFailureException.class} restricts retrying to only the listed exceptions, sending everything else to DLT.
With the default dltStrategy (DltStrategy.ALWAYS_RETRY_ON_ERROR) and with DltStrategy.FAIL_ON_ERROR, messages route to the DLT when retries are exhausted; the two differ in what happens if DLT processing itself fails (the record is sent back to the DLT versus processing stops). dltStrategy = DltStrategy.NO_DLT drops the message after exhaustion, which is dangerous but useful for truly ephemeral events. The @DltHandler annotation on a method in the same class receives DLT messages for specialized handling: logging, alerting, inserting into a database for manual review, or triggering a compensation event.
By default, @RetryableTopic uses the same consumer group for retry and main topic consumption. This means retry processing uses the same concurrency settings as main topic processing. For high-volume retry scenarios, configure sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC to consolidate retries or override the container factory for retry topics separately.
With autoCreateTopics=true and three retries (attempts = "4"), Spring Kafka creates orders-retry-0, orders-retry-1, orders-retry-2, and orders-dlt in your broker. With 10 main topics, that is 40 additional topics. Include these in your topic naming conventions, monitoring, and retention policies. DLT topics accumulate messages until retention removes them, so always monitor their depth and set retention.
Use @Header(KafkaHeaders.RECEIVED_TOPIC) String topic in your listener to detect whether you're processing a retry. Log retry attempts at WARN level with the original exception headers; this creates an automatic audit trail of which messages needed retries, invaluable for post-incident analysis.
RecordFilterStrategy: Filtering Messages Before Processing
Not every message in a topic is meant for every consumer. A RecordFilterStrategy allows you to declaratively exclude messages that match a filter condition, without the consumer ever invoking the listener method. The filter runs after deserialization but before listener invocation — the consumer polls the record, deserializes it, evaluates the filter, and if the filter returns true (filtered), the record is acknowledged and the listener is skipped.
Common use cases: filtering by message type header (one topic carries multiple event types, but each consumer only handles one), filtering by tenant ID in a multi-tenant system, or excluding test messages (produced by integration tests) from production consumers.
The filter function receives the deserialized ConsumerRecord<K, V>. You have full access to the record's key, value, headers, topic, partition, and offset. The return value is a boolean: true means discard (filter out), false means pass through to the listener.
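The inverted boolean (true means discard) is easy to get backwards, so here is a plain-Java model of the semantics. The Event class, the eventType header name, and the deliverable method are hypothetical stand-ins and not the Spring Kafka API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java model of filter semantics: the predicate returns true for records to DISCARD.
final class FilterModel {

    /** Minimal stand-in for a consumed record: a key plus string headers. */
    static final class Event {
        final String key;
        final Map<String, String> headers;

        Event(String key, Map<String, String> headers) {
            this.key = key;
            this.headers = headers;
        }
    }

    /** Returns the records that would still reach the listener. */
    static List<Event> deliverable(List<Event> polled, Predicate<Event> discard) {
        List<Event> passed = new ArrayList<>();
        for (Event event : polled) {
            if (!discard.test(event)) {
                passed.add(event);
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        List<Event> polled = List.of(
                new Event("order-1", Map.of("eventType", "OrderCreated")),
                new Event("order-2", Map.of("eventType", "OrderCancelled")));
        // Discard everything that is not an OrderCreated event.
        Predicate<Event> discard = e -> !"OrderCreated".equals(e.headers.get("eventType"));
        System.out.println(deliverable(polled, discard).size()); // 1
    }
}
```

Writing the predicate as "what to throw away" rather than "what to keep" matches the contract described above and avoids an accidentally inverted filter.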
When a record is filtered, its offset still needs to be committed so the consumer doesn't re-read it endlessly. Set factory.setAckDiscarded(true) to auto-acknowledge filtered records. Without this, filtered records accumulate and the consumer's committed offset falls behind the end offset, making lag monitoring misleading — you'll see apparent lag from filtered records that will never be processed.
For batch listeners, BatchRecordFilterStrategy filters at the batch level. The default behavior is to filter individual records within the batch and deliver only the non-filtered subset to the listener. If all records in a batch are filtered, the listener is not called at all. Configure factory.setBatchToRecordFilterStrategy(strategy) for fine-grained control.
Without setAckDiscarded(true), filtered records are never acknowledged. The consumer's committed offset remains behind the filtered records, creating phantom lag that your monitoring dashboards will report. Always set ackDiscarded=true when using any RecordFilterStrategy.
Batch Consumers with List<ConsumerRecord>
Individual record processing — one listener invocation per Kafka message — is clean and simple but inefficient at scale. For each record, you pay the overhead of listener dispatch, Spring proxy invocation, and crucially, one database round-trip. At 10,000 messages per second, that's 10,000 individual INSERT statements per second — far more than most databases sustain comfortably.
Batch consumers receive a List<ConsumerRecord<K, V>> (or List<V> for value-only access) representing all records returned by a single KafkaConsumer.poll() call. Instead of 500 individual database calls, you execute one bulk INSERT with 500 rows, collapsing 500 database round-trips into one. Spring Boot's spring.kafka.listener.type=batch enables batch mode globally, or you can configure it per-factory.
Batch size is controlled by max.poll.records (how many records per poll, default 500) and the consumer's fetch.min.bytes / fetch.max.wait.ms settings which control how full the fetch buffer must be before returning. Setting max.poll.records=1000 with fetch.min.bytes=1048576 (1MB) creates large, efficient batches at the cost of slightly increased latency.
Error handling in batch mode is more complex than single-record mode. If the listener throws, the entire batch is the unit of retry — all records in the batch will be redelivered. This makes idempotent processing essential. The BatchToRecordAdapter bridges batch delivery with single-record error handling, allowing DefaultErrorHandler (which operates at the record level) to function within a batch context.
For partial batch commits (acknowledging records 0-249 but retrying records 250-499), use Acknowledgment.nack(int index, Duration duration). This commits the offsets of the records before index and, after the given sleep duration, seeks the consumer back to the record at index, so that record and everything after it in the batch are redelivered. This is the correct approach when batch processing involves heterogeneous records that may fail independently.
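The bookkeeping behind a partial commit can be sketched in plain Java. This is a model only: the names are hypothetical, and it assumes offsets within the batch are contiguous, which does not hold for compacted topics or batches spanning several partitions.

```java
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of partial batch handling: find the first failure, resume from there.
final class PartialBatchModel {

    /** Returns the index of the first record whose processing fails, or -1 if all succeed. */
    static int firstFailureIndex(List<String> batch, Predicate<String> succeeds) {
        for (int i = 0; i < batch.size(); i++) {
            if (!succeeds.test(batch.get(i))) {
                return i;
            }
        }
        return -1;
    }

    /**
     * Offset the consumer should read next, assuming contiguous offsets in the batch.
     * Records before failedIndex count as committed; the failed one is re-read.
     */
    static long nextOffsetToRead(long firstOffsetInBatch, int batchSize, int failedIndex) {
        int committed = failedIndex < 0 ? batchSize : failedIndex;
        return firstOffsetInBatch + committed;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("a", "b", "bad", "c");
        int failed = firstFailureIndex(batch, r -> !r.equals("bad"));
        System.out.println(failed);                           // 2
        System.out.println(nextOffsetToRead(100, 4, failed)); // 102
    }
}
```

In a real listener the index found this way is what would be passed to nack, so records 0 and 1 are not reprocessed while "bad" and "c" are.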
Batch listeners integrate naturally with Spring's @Transactional — wrap the entire batch in one database transaction for atomic commit/rollback. This is significantly more efficient than one transaction per record. Be aware of transaction size limits and timeout settings when batches are very large.
Without a partial negative acknowledgment (ack.nack(index)), the entire batch is retried from the beginning. Records already successfully processed will be reprocessed. Always implement idempotent processing (unique constraint, Redis SETNX, or database upsert) before using batch consumers in production.
Monitor max-poll-records vs actual batch sizes in production using @Header(KafkaHeaders.BATCH_CONVERTED_MESSAGES) or a ConsumerAwareMessageListener. If actual batch sizes are consistently much smaller than max.poll.records, the topic has insufficient throughput to fill batches; consider reducing max.poll.records to avoid holding the poll lock longer than necessary.
SeekToCurrentErrorHandler and DefaultErrorHandler Patterns
The error handler sits between the listener container and your listener method. When an unhandled exception escapes your @KafkaListener method, the error handler decides what to do next: retry immediately, seek back to retry after backoff, skip the record, or route to a dead-letter topic.
Pre-2.8 Spring Kafka used SeekToCurrentErrorHandler. From 2.8+, DefaultErrorHandler is the unified replacement that handles both single-record and batch consumers. The behavior is identical: on exception, the container seeks the consumer back to the failed record's offset so the next poll re-reads it. Without this seek, the container would advance past the failed record (as if it were processed), losing the message.
The DefaultErrorHandler constructor accepts a BackOff and a ConsumerRecordRecoverer. The BackOff determines retry timing. FixedBackOff(1000L, 3) retries 3 times with 1-second delays between each. ExponentialBackOffWithMaxRetries(5) retries 5 times with exponential delays. After exhausting retries, the ConsumerRecordRecoverer handles the failed record. DeadLetterPublishingRecoverer publishes the failed record to a .DLT topic, including headers identifying the original topic, partition, offset, and exception details.
Exception classification controls which exceptions trigger retries and which go directly to the DLT. errorHandler.addNotRetryableExceptions(DeserializationException.class) ensures that malformed messages (which will always fail deserialization) bypass retries and go straight to the DLT. errorHandler.addRetryableExceptions(DataAccessException.class) forces retry for transient database exceptions. Without classification, all exceptions retry, wasting time on failures that are guaranteed to repeat.
The ExceptionClassifier interface provides fine-grained control, allowing a lambda or class to inspect the exception (including its cause chain) and return whether it should be retried. This handles wrapped exceptions where the root cause is what matters, not the wrapper type thrown by the listener.
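Cause-chain inspection can be illustrated with a few lines of plain Java. This is a hypothetical helper, not Spring Kafka's classifier: it treats an exception as non-retryable if the exception itself or any of its causes matches a listed type.

```java
import java.util.Set;

// Plain-Java sketch of cause-chain classification; class and method names are hypothetical.
final class RetryClassifier {

    /** Returns false if the thrown exception or any cause is an instance of a listed type. */
    static boolean isRetryable(Throwable thrown, Set<Class<? extends Throwable>> notRetryable) {
        for (Throwable t = thrown; t != null; t = t.getCause()) {
            for (Class<? extends Throwable> type : notRetryable) {
                if (type.isInstance(t)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Set<Class<? extends Throwable>> fatal = Set.of(IllegalArgumentException.class);
        // The wrapper type is generic, but the root cause marks the failure as permanent.
        Throwable wrapped = new RuntimeException("listener failed",
                new IllegalArgumentException("malformed payload"));
        System.out.println(isRetryable(wrapped, fatal));                                // false
        System.out.println(isRetryable(new RuntimeException("db timeout"), fatal));     // true
    }
}
```

Checking only the outermost exception would have classified the wrapped failure as retryable, which is the situation the paragraph above warns about.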
Without addNotRetryableExceptions(DeserializationException.class), a single malformed message will retry until all attempts are exhausted, blocking the partition for seconds or minutes. Always add DeserializationException to the non-retryable list.
Register listeners with errorHandler.setRetryListeners() to log each retry attempt; these log entries are invaluable during incidents to distinguish between 'this message failed once due to a DB timeout' (normal, resolved) and 'this message has failed 50 times' (poison message requiring investigation).
Silent Message Loss During Broker Leader Election
The producer was configured with acks=1. During the broker restart, 3,200 messages were written to the leader and acknowledged to the producer. Before those messages could be replicated to followers, the leader went offline for patching. Kafka elected a follower (which didn't have the messages) as the new leader. The messages never existed on any surviving broker: they were acknowledged to the producer and then lost permanently when the old leader's unsynced data was discarded.
Switch to acks=all and set min.insync.replicas=2 on all payment-related topics. Also set retries=Integer.MAX_VALUE and enable.idempotence=true on the producer to handle transient broker errors during leadership transitions without duplicates. Implement an outbox pattern for payment events so database and Kafka writes are atomic.
- acks=1 provides a false sense of durability.
- The acknowledgment means the leader received the message, not that the message is safe.
- For any event representing a financial transaction or state change with real-world consequences, acks=all is non-negotiable.
With AckMode.MANUAL_IMMEDIATE, the offset only advances when ack.acknowledge() is called explicitly. Check whether your error handling path acknowledges (or whether a SeekToCurrentErrorHandler is configured to seek back on failure). If an exception is thrown after ack.acknowledge() has run, the offset is already committed. If the exception is thrown before it, the offset stays, causing the message to be redelivered on the next poll. Review all exception paths in your listener method to ensure exactly one code path acknowledges.
Verify that @EnableKafka is on your configuration class and that @RetryableTopic is on the @KafkaListener method, not the class. Confirm the retry topic names exist in the broker: Spring Boot creates them on startup, but if auto.create.topics.enable=false on the broker, you must create them via NewTopic beans or the admin CLI. Also check that your error is not excluded by exclude: by default, @RetryableTopic retries all exceptions; subclasses of java.lang.Error and explicitly excluded exceptions go straight to DLT.
Ensure retries is set to a high value (Integer.MAX_VALUE) and retry.backoff.ms=300. With enable.idempotence=true, the producer assigns each message a sequence number and the broker deduplicates retries, enabling safe retry without producing duplicates. Add delivery.timeout.ms=120000 to allow up to 2 minutes for a message to be acknowledged, covering most election durations.
Use BatchToRecordAdapter to have Spring Kafka call your method once per record within the batch, making partial batch failures tractable. For partial commits within a batch, use Acknowledgment.nack(int index, Duration duration) to seek to a specific offset within the batch.
Confirm the filter is set on the ConcurrentKafkaListenerContainerFactory, not the @KafkaListener annotation. Check factory.setRecordFilterStrategy() is called before the factory is used to create containers. If using FilteringBatchMessageListenerAdapter, ensure the adapter wraps the correct listener. Also confirm that factory.setAckDiscarded(true) is set if you want filtered (discarded) records to have their offsets committed automatically; without this, filtered records remain uncommitted and are redelivered.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic payments --partition 0 --offset earliest --max-messages 50
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic payments --time -1
Key takeaways
Common mistakes to avoid
Using acks=1 for payment or order events
Use acks=all and min.insync.replicas=2 for all topics where message loss has business impact. Set enable.idempotence=true to prevent duplicate messages from producer retries.
Calling ack.acknowledge() inside a finally block or catch block
Call ack.acknowledge() only in the happy path after all business logic succeeds. Let exceptions propagate to the error handler, which handles retry and DLT routing. Never acknowledge in exception handling paths.
Not adding DeserializationException to non-retryable exceptions
Add errorHandler.addNotRetryableExceptions(DeserializationException.class) to send malformed messages directly to the DLT without retrying. A message that can't be deserialized will never succeed regardless of how many times you retry it.
Setting max.poll.records very high without adjusting max.poll.interval.ms
If max.poll.records=1000 and each record takes 10ms to process, you need at least 10 seconds to process one poll. Set max.poll.interval.ms higher than (max.poll.records × max processing time per record). Alternatively, reduce max.poll.records until the math works with your current max.poll.interval.ms.
Using @RetryableTopic without monitoring DLT depth
Monitor DLT depth (kafka.consumer.fetch-manager.records-lag for the DLT consumer group, or a custom KafkaLagMonitor). Alert when DLT depth exceeds a threshold. Implement a @DltHandler method that sends an alert to PagerDuty or Slack immediately when a message hits the DLT.
Batch consumer without idempotent processing
Add a unique constraint on the message ID and use INSERT ... ON CONFLICT DO NOTHING (PostgreSQL) or REPLACE INTO (MySQL) for upsert semantics. The constraint prevents duplicates even when the batch is fully reprocessed.
Using the same container factory for main topic and retry topic listeners with different concurrency needs
Use @RetryableTopic's retryTopicSuffix and create a @KafkaListener on the retry topic with a dedicated factory that has concurrency=1; retry processing rarely needs high parallelism.
Interview Questions on This Topic
What is the difference between acks=1 and acks=all, and when would you choose each?
Frequently Asked Questions