Kafka Consumers API - Reading and Processing Events
Master the Kafka Consumer API: learn consumer groups, offset management, deserialization, and building scalable event processing applications.
Welcome to Part 4 of our Apache Kafka series! We've covered the introduction, architecture, and producers. Now we turn to consumers - the applications that read and process events from Kafka topics.
Consumers are the exit point for data from Kafka. Understanding consumer groups, offset management, and consumption patterns is essential for building scalable, fault-tolerant event processing systems.
Consumer Fundamentals
A Kafka consumer reads events from topics and processes them. Key responsibilities include:
- Subscribing to topics
- Polling for new events
- Deserializing events from bytes
- Managing offsets for progress tracking
- Handling rebalancing in consumer groups
Basic Consumer Setup
```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class BasicConsumer {
    public static void main(String[] args) {
        // Configure consumer
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        // Create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Subscribe to topic
        consumer.subscribe(Arrays.asList("user-events"));

        try {
            while (true) {
                // Poll for new events
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received: " + record.value() +
                        " from partition " + record.partition() +
                        " at offset " + record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```

Key Configuration Properties
Required Properties
- bootstrap.servers: Broker addresses
- group.id: Consumer group identifier
- key.deserializer: Key deserializer class
- value.deserializer: Value deserializer class
Important Optional Properties
```properties
# Offset management
auto.offset.reset=earliest      # Where to start if no offset exists
enable.auto.commit=true         # Auto-commit offsets
auto.commit.interval.ms=5000    # Commit frequency

# Polling behavior
max.poll.records=500            # Max records per poll
max.poll.interval.ms=300000     # Max time between polls
fetch.min.bytes=1               # Min bytes to fetch
fetch.max.wait.ms=500           # Max wait for min bytes

# Session management
session.timeout.ms=10000        # Session timeout
heartbeat.interval.ms=3000      # Heartbeat frequency
```

Consumer Groups and Parallelism
Consumer groups enable parallel processing: Kafka divides a topic's partitions among the consumers in a group, so each consumer handles a subset of the data.
Group Concepts
- Group ID: Identifies the consumer group
- Partition Assignment: Each partition is assigned to exactly one consumer within the group
- Rebalancing: Automatic redistribution when consumers join/leave
- Independent Processing: Different groups don't interfere; each group receives its own copy of every event (sketched below)
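To illustrate that independence, here is a minimal sketch (the group and topic names are illustrative, and the configuration mirrors the basic setup above): two consumers with different group.ids each receive every event on the topic.

```java
// Two independent groups reading the same topic: each group tracks its own
// offsets, so both receive every event
Properties analyticsProps = new Properties();
analyticsProps.put("bootstrap.servers", "localhost:9092");
analyticsProps.put("group.id", "analytics-group");
analyticsProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
analyticsProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Properties billingProps = new Properties();
billingProps.putAll(analyticsProps);
billingProps.put("group.id", "billing-group");

KafkaConsumer<String, String> analyticsConsumer = new KafkaConsumer<>(analyticsProps);
KafkaConsumer<String, String> billingConsumer = new KafkaConsumer<>(billingProps);
analyticsConsumer.subscribe(Arrays.asList("user-events"));
billingConsumer.subscribe(Arrays.asList("user-events"));
// Polling each consumer now yields the full event stream in both groups.
```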
Scaling Consumers
```java
// Multiple consumers in the same group:
// start multiple instances of this code with the same group.id
// and Kafka automatically distributes partitions among them
Properties props = new Properties();
props.put("group.id", "user-processor-group");
// ... other configs

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("user-events"));
```

Offset Management
Offsets track consumption progress. Managing them correctly is crucial for reliability.
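A consumer can inspect its own progress at any time. A minimal sketch, assuming a consumer that already has partitions assigned (the committed(Set) overload requires Kafka clients 2.4+):

```java
// For each assigned partition, compare the next offset to be fetched (position)
// with the last offset the group has durably committed
for (TopicPartition partition : consumer.assignment()) {
    long position = consumer.position(partition);
    OffsetAndMetadata committed =
        consumer.committed(Collections.singleton(partition)).get(partition);
    System.out.println(partition + " position=" + position +
        " committed=" + (committed == null ? "none" : committed.offset()));
}
```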
Automatic Offset Commits
```properties
# Enable auto-commit
enable.auto.commit=true
auto.commit.interval.ms=1000
```

Manual Offset Commits
```java
// Disable auto-commit
props.put("enable.auto.commit", "false");

// Manual commit after processing
try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // Process record
        processRecord(record);
    }
    // Synchronous commit of everything returned by the last poll.
    // (To commit specific offsets, use the commitSync(Map<TopicPartition, OffsetAndMetadata>) overload.)
    consumer.commitSync();
} catch (Exception e) {
    // Handle the error; do NOT commit here, or unprocessed records may be skipped
    log.error("Processing failed", e);
}
```

Asynchronous Commits
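commitAsync() does not block the poll loop, but it also does not retry failed commits. A common compromise, shown here as a sketch (running and processRecord are assumed helpers), is to commit asynchronously during normal operation and synchronously once on shutdown:

```java
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
        consumer.commitAsync(); // fast, fire-and-forget
    }
} finally {
    try {
        consumer.commitSync();  // final blocking commit so no progress is lost
    } finally {
        consumer.close();
    }
}
```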
To observe commit failures, attach a callback:

```java
// Async commit with callback
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            log.error("Commit failed for offsets " + offsets, exception);
        }
    }
});
```

Deserialization
Consumers deserialize bytes back into objects, the reverse of the serialization that producers perform.
Built-in Deserializers
```java
// String deserialization
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// JSON deserialization (JsonDeserializer ships with Spring Kafka, not Kafka itself)
props.put("value.deserializer", "org.springframework.kafka.support.serializer.JsonDeserializer");
props.put("spring.json.value.default.type", "com.example.UserEvent");
```

Custom Deserialization
```java
public class UserEventDeserializer implements Deserializer<UserEvent> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public UserEvent deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // null payloads (e.g., tombstones) map to null
        }
        try {
            return objectMapper.readValue(data, UserEvent.class);
        } catch (Exception e) {
            throw new SerializationException("Error deserializing UserEvent", e);
        }
    }
}

// Register it as the value deserializer:
// props.put("value.deserializer", UserEventDeserializer.class.getName());
```

Polling and Processing Patterns
Basic Polling Loop
```java
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    if (!records.isEmpty()) {
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
        // Commit offsets
        consumer.commitAsync();
    }
}
```

Batch Processing
```java
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    if (!records.isEmpty()) {
        List<UserEvent> batch = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            batch.add(deserializeRecord(record));
        }

        // Process batch
        processBatch(batch);

        // Commit after successful processing
        consumer.commitSync();
    }
}
```

Rebalancing and Partition Assignment
Rebalancing occurs when consumers join or leave the group.
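Frequent rebalances pause consumption, so it is worth reducing how disruptive they are. Two client settings help: cooperative (incremental) rebalancing, available since Kafka 2.4, and static group membership, available since Kafka 2.3. A configuration sketch (the instance id value is illustrative):

```java
// Incremental cooperative rebalancing: only partitions that actually move are revoked
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

// Static membership: a restarted consumer with the same instance id reclaims its
// partitions without forcing a full rebalance
props.put("group.instance.id", "user-processor-1");
```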
Handling Rebalancing
```java
consumer.subscribe(Arrays.asList("user-events"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before rebalancing:
        // commit offsets for the partitions about to be revoked
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after rebalancing:
        // seek to specific offsets if needed
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, getOffsetForPartition(partition));
        }
    }
});
```

Manual Partition Assignment
```java
// Assign specific partitions (not recommended for most use cases)
List<TopicPartition> partitions = Arrays.asList(
    new TopicPartition("user-events", 0),
    new TopicPartition("user-events", 1)
);
consumer.assign(partitions);
```

Seeking and Offset Control
Control where consumption starts.
Seek to Beginning/End
```java
// Seek to the beginning of all assigned partitions
consumer.seekToBeginning(consumer.assignment());

// Seek to the end of all assigned partitions
consumer.seekToEnd(consumer.assignment());
```

Seek to Specific Offset
```java
// Seek to a specific offset
TopicPartition partition = new TopicPartition("user-events", 0);
consumer.seek(partition, 1000L); // Start from offset 1000
```

Time-based Seeking
```java
// Seek to events from 1 hour ago
long oneHourAgo = System.currentTimeMillis() - (1000 * 60 * 60);

Map<TopicPartition, Long> timestamps = new HashMap<>();
for (TopicPartition partition : consumer.assignment()) {
    timestamps.put(partition, oneHourAgo);
}

Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
    // offsetsForTimes() maps a partition to null if it has no record at or after the timestamp
    if (entry.getValue() != null) {
        consumer.seek(entry.getKey(), entry.getValue().offset());
    }
}
```

Error Handling
Common Consumer Errors
- TimeoutException: Broker not responding
- InterruptException: Consumer interrupted
- SerializationException: Deserialization failure (a "poison pill" record, often routed to a dead-letter topic as sketched below)
- CommitFailedException: Offset commit failure, typically after a rebalance
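For records that repeatedly fail, such as poison pills that cannot be deserialized or processed, a common remedy is a dead-letter topic. A hedged sketch (the topic name user-events-dlq and the pre-configured dlqProducer are assumptions, not part of the original post):

```java
// Forward a failed record to a dead-letter topic so the consumer can keep moving,
// preserving enough context to diagnose the failure later
void sendToDeadLetterQueue(KafkaProducer<String, String> dlqProducer,
                           ConsumerRecord<String, String> record, Exception error) {
    ProducerRecord<String, String> dlqRecord =
        new ProducerRecord<>("user-events-dlq", record.key(), record.value());
    dlqRecord.headers().add("dlq.error",
        String.valueOf(error.getMessage()).getBytes(StandardCharsets.UTF_8));
    dlqRecord.headers().add("dlq.source",
        (record.topic() + "-" + record.partition() + "-" + record.offset())
            .getBytes(StandardCharsets.UTF_8));
    dlqProducer.send(dlqRecord);
}
```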
Robust Error Handling
```java
while (!closed) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record : records) {
            try {
                processRecord(record);
            } catch (Exception e) {
                log.error("Error processing record", e);
                // Handle processing error (retry, dead letter queue, etc.)
            }
        }

        consumer.commitAsync();
    } catch (WakeupException e) {
        // Shutdown signal
        break;
    } catch (Exception e) {
        log.error("Consumer error", e);
        // Handle consumer error
    }
}
```

Exactly-Once Processing
Exactly-once processing means every event affects the result exactly once, even across retries and failures. Two common approaches on the consumer side are idempotent processing and Kafka transactions.
Idempotent Processing
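The snippet below relies on two helpers, isProcessed and markAsProcessed. Here is a minimal in-memory sketch of them as they might appear inside the consumer class; in production the set of processed keys would live in a durable store (a database, Redis, etc.) so deduplication survives restarts:

```java
// Tracks which idempotency keys have already been handled (in-memory for illustration only)
private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

boolean isProcessed(String idempotencyKey) {
    return processedKeys.contains(idempotencyKey);
}

void markAsProcessed(String idempotencyKey) {
    processedKeys.add(idempotencyKey);
}
```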
```java
// Use the record's topic, partition, and offset as an idempotency key
String idempotencyKey = record.topic() + "-" + record.partition() + "-" + record.offset();

if (!isProcessed(idempotencyKey)) {
    processRecord(record);
    markAsProcessed(idempotencyKey);
}
```

Transactional Processing
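The read-process-write loop below assumes a transactional producer and a consumer that does not auto-commit. A configuration sketch (the transactional.id value is illustrative):

```java
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("transactional.id", "user-event-processor-1"); // unique per producer instance
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions(); // must be called once before the first beginTransaction()

Properties consumerProps = new Properties();
consumerProps.put("enable.auto.commit", "false");       // offsets are committed inside the transaction
consumerProps.put("isolation.level", "read_committed"); // skip records from aborted transactions
```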
```java
// Read from an input topic, process, and write to an output topic atomically.
// Requires a transactional producer and a consumer with auto-commit disabled
// (see the configuration sketch above).
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) {
        continue;
    }

    producer.beginTransaction();
    try {
        for (ConsumerRecord<String, String> record : records) {
            String result = processRecord(record);
            producer.send(new ProducerRecord<>("processed-events", result));
        }

        // Commit the consumer offsets as part of the producer transaction
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
        }
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
        // Aborted records are re-read after seeking back or restarting the consumer
    }
}
```

Performance Optimization
Polling Configuration
```properties
# Optimize polling
max.poll.records=1000
fetch.min.bytes=1024
fetch.max.wait.ms=500
```

Threading Models
Single Threaded
```java
// Simple single-threaded consumer
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    processRecords(records);
}
```

Multi-threaded (Partition-based)
```java
// One thread per partition: group the records from a poll by partition,
// then hand each partition's records to a worker
// ('executor' is an ExecutorService created elsewhere)
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsByPartition = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
    recordsByPartition.computeIfAbsent(
        new TopicPartition(record.topic(), record.partition()),
        k -> new ArrayList<>()
    ).add(record);
}

// Submit to thread pool
for (List<ConsumerRecord<String, String>> partitionRecords : recordsByPartition.values()) {
    executor.submit(() -> processPartitionRecords(partitionRecords));
}
```

Monitoring Consumer Metrics
Key Metrics to Monitor
- Consumer Lag: How far behind the latest offset the consumer is (see the lag check sketch after this list)
- Poll Rate: Records consumed per second
- Commit Rate: Offset commits per second
- Rebalance Frequency: How often rebalancing occurs
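Lag can also be computed directly from the consumer, as in this sketch (assumes partitions are already assigned): subtract the consumer's current position from the partition's latest offset.

```java
// lag = latest offset in the partition minus this consumer's current position
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumer.assignment());
for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
    long lag = entry.getValue() - consumer.position(entry.getKey());
    System.out.println("Lag for " + entry.getKey() + ": " + lag);
}
```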
```java
// Access consumer metrics: find the max records lag across assigned partitions.
// Metric names carry tags, so iterate the metrics map rather than constructing a MetricName.
for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
    if (entry.getKey().name().equals("records-lag-max")
            && entry.getKey().group().equals("consumer-fetch-manager-metrics")) {
        System.out.println("Max lag: " + entry.getValue().metricValue());
    }
}
```

Best Practices
1. Proper Shutdown
```java
// Trigger a clean shutdown from another thread
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup(); // poll() will throw WakeupException
}));

try {
    // Consumer loop
} catch (WakeupException e) {
    // Expected on shutdown
} finally {
    consumer.close(Duration.ofSeconds(5));
}
```

2. Handle Rebalancing Gracefully
```java
// Always commit before partitions are revoked
consumer.subscribe(topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Nothing to do on assignment
    }
});
```

3. Monitor Consumer Health
```java
// Check if the consumer is healthy
Set<TopicPartition> assigned = consumer.assignment();
if (assigned.isEmpty()) {
    log.warn("No partitions assigned to consumer");
}
```

4. Use Appropriate Poll Timeouts
```java
// Don't poll too frequently
Duration pollTimeout = Duration.ofMillis(100);
ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
```

Python Consumer Example
```python
from kafka import KafkaConsumer
import json

# Create consumer
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='user-processor-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    auto_commit_interval_ms=1000
)

# Consume messages
for message in consumer:
    print(f"Received: {message.value} from partition {message.partition} at offset {message.offset}")
```

Testing Consumers
Unit Testing
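For a true unit test that needs no broker, the Kafka clients library ships a MockConsumer: the test injects records itself and they are returned by the next poll. A sketch:

```java
@Test
public void testProcessingWithMockConsumer() {
    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    TopicPartition partition = new TopicPartition("test-topic", 0);

    consumer.assign(Collections.singletonList(partition));
    consumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
    consumer.addRecord(new ConsumerRecord<>("test-topic", 0, 0L, "key-1", "value-1"));

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    assertEquals(1, records.count());
}
```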
The check below talks to a real broker on localhost:9092, so it is closer to an integration test:

```java
@Test
public void testConsumer() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList("test-topic"));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        assertTrue(records.isEmpty()); // Assuming no messages on the topic
    }
}
```

What's Next?
In this comprehensive guide to Kafka consumers, we’ve covered:
- Basic consumer setup and configuration
- Consumer groups and parallel processing
- Offset management strategies
- Deserialization and error handling
- Rebalancing and partition assignment
- Seeking and time-based consumption
- Exactly-once processing patterns
- Performance optimization and monitoring
- Best practices and testing
You should now be able to build robust Kafka consumer applications.
In Part 5, we’ll dive deep into topics and partitions - how to manage them, configure retention policies, and optimize for performance.
Additional Resources
- Kafka Consumer Documentation
- Confluent Consumer Guide
- Consumer Group Protocol
- Exactly-Once Processing
*This is Part 4 of our comprehensive Apache Kafka series. ← Part 3: Producers API | Part 5: Topics and Partitions →*