Kafka Producers API - Publishing Events to Kafka

Master the Kafka Producer API: learn to publish events, handle serialization, configure reliability, and implement exactly-once semantics.

Welcome to Part 3 of our Apache Kafka series! In Part 1, we introduced Kafka, and in Part 2, we explored the architecture. Now it’s time to get hands-on with producers - the applications that publish events to Kafka.

Producers are the entry point for data into Kafka. Understanding how to configure and use producers effectively is crucial for building reliable event-driven systems. This post covers everything from basic publishing to advanced features like idempotence and transactions.

Producer Fundamentals

A Kafka producer is responsible for:

  • Serializing events into bytes
  • Partitioning events across topic partitions
  • Batching events for efficiency
  • Handling retries and errors
  • Ensuring delivery guarantees

Basic Producer Setup

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class BasicProducer {
    public static void main(String[] args) {
        // Configure producer
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Create and send record
        ProducerRecord<String, String> record =
            new ProducerRecord<>("user-events", "user123", "User logged in");
        producer.send(record);

        producer.close();
    }
}

Key Configuration Properties

Required Properties

  • bootstrap.servers: List of broker addresses
  • key.serializer: Serializer class for keys
  • value.serializer: Serializer class for values
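
These keys are plain strings, so a typo only fails at runtime. The Java client ships ProducerConfig constants and serializer classes you can reference instead; a sketch of the equivalent setup:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Same required settings, using the client's constants instead of raw strings
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());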

Important Optional Properties

# Reliability settings
acks=all                 # Wait for all in-sync replicas
retries=3                # Retry failed sends
retry.backoff.ms=100     # Backoff between retries

# Performance settings
batch.size=16384         # Batch size in bytes
linger.ms=5              # Wait time for batching
buffer.memory=33554432   # Total buffer memory

# Compression
compression.type=gzip    # Compress batches

Serialization

Kafka requires events to be serialized to bytes. You can use built-in serializers or create custom ones.

Built-in Serializers

// String serialization
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Integer serialization
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");

// Byte array (for custom serialization)
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

Custom Serialization

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class UserEventSerializer implements Serializer<UserEvent> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, UserEvent data) {
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing UserEvent", e);
        }
    }
}

// Usage
props.put("value.serializer", "com.example.UserEventSerializer");
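
The serializer above assumes a UserEvent class that Jackson can serialize. A minimal hypothetical version of that class (field names are illustrative) might look like:

// Hypothetical event POJO; Jackson needs a no-arg constructor and getters
public class UserEvent {
    private String userId;
    private String action;

    public UserEvent() {}

    public UserEvent(String userId, String action) {
        this.userId = userId;
        this.action = action;
    }

    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    public String getAction() { return action; }
    public void setAction(String action) { this.action = action; }
}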

Partitioning Strategies

Producers determine which partition an event goes to. This affects ordering and load distribution.

Default Partitioning (No Key)

// No key specified - the partitioner spreads records across partitions.
// Since Kafka 2.4 the default is a "sticky" strategy that fills a batch for
// one partition before moving to the next, rather than strict round-robin.
ProducerRecord<String, String> record =
    new ProducerRecord<>("user-events", "User logged in");

Key-Based Partitioning

// Events with the same key go to the same partition (per-key ordering)
ProducerRecord<String, String> record =
    new ProducerRecord<>("user-events", "user123", "User logged in");
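
For keyed records, the default partitioner hashes the serialized key bytes with murmur2 and takes the result modulo the partition count. Kafka's own Utils class exposes the same helpers (it is an internal utility class, so treat this as a sketch of the logic, not a supported API):

import org.apache.kafka.common.utils.Utils;

// keyBytes: the serialized key; numPartitions: partition count for the topic.
// toPositive() masks off the sign bit so the modulo result is never negative.
int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;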

Custom Partitioning

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Custom logic based on key or value. Mask the sign bit rather than
        // using Math.abs(), which stays negative for Integer.MIN_VALUE.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

// Configure custom partitioner
props.put("partitioner.class", "com.example.CustomPartitioner");

Synchronous vs Asynchronous Sending

Synchronous Sending

try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("Sent to partition " + metadata.partition()
        + " at offset " + metadata.offset());
} catch (Exception e) {
    System.err.println("Send failed: " + e.getMessage());
}

Asynchronous Sending with Callback

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            System.err.println("Send failed: " + exception.getMessage());
        } else {
            System.out.println("Sent successfully to partition " + metadata.partition()
                + " at offset " + metadata.offset());
        }
    }
});
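
Since Callback is a functional interface, the same send is usually written more compactly with a lambda:

// Equivalent callback as a lambda
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        System.err.println("Send failed: " + exception.getMessage());
    } else {
        System.out.println("Sent to partition " + metadata.partition()
            + " at offset " + metadata.offset());
    }
});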

Error Handling and Retries

Common Producer Errors

  • TimeoutException: Broker not responding
  • InterruptException: Producer interrupted
  • SerializationException: Serialization failure
  • IllegalStateException: Producer closed
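
Not all of these should be handled the same way: Kafka derives transient broker-side errors from RetriableException, and the producer retries those automatically before they ever reach your code. A minimal sketch of separating the two cases in a send callback:

import org.apache.kafka.common.errors.RetriableException;

producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        return; // success
    }
    if (exception instanceof RetriableException) {
        // Transient error that survived the producer's configured retries
        System.err.println("Transient failure after retries: " + exception.getMessage());
    } else {
        // Non-retriable (e.g. record too large, authorization failure)
        System.err.println("Fatal send error: " + exception.getMessage());
    }
});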

Retry Configuration

# Retry settings
retries=10
retry.backoff.ms=1000
delivery.timeout.ms=120000   # Upper bound on total delivery time, retries included

# Idempotence (prevents retry duplicates)
enable.idempotence=true
acks=all

Idempotent Producers

Idempotent producers prevent the duplicates that retries can otherwise introduce: the broker assigns each producer an ID and tracks per-partition sequence numbers, so a retried send is written exactly once. Note that this guarantees exactly-once delivery per partition within a single producer session, not end-to-end exactly-once. Since Kafka 3.0, idempotence is enabled by default.

// Enable idempotence
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("retries", "3");

// The broker assigns a producer ID; duplicate sends
// within the session are detected and ignored

Transactional Producers

For multi-partition, multi-topic transactions:

props.put("transactional.id", "user-event-producer-1");

// Initialize transactions
producer.initTransactions();

// Begin transaction
producer.beginTransaction();
try {
    // Send multiple records atomically
    producer.send(record1);
    producer.send(record2);

    // Commit transaction
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Fatal: another producer with the same transactional.id has taken over
    producer.close();
} catch (Exception e) {
    // Abort transaction; the records above will not become visible
    producer.abortTransaction();
}

Message Headers

Add metadata to events:

ProducerRecord<String, String> record =
    new ProducerRecord<>("user-events", "user123", "User logged in");

// Add headers
record.headers().add("source", "web-app".getBytes());
record.headers().add("version", "1.0".getBytes());

Performance Optimization

Batching Configuration

# Increase batch size for higher throughput
batch.size=1048576    # 1MB batches
linger.ms=10          # Wait longer to fill batches

# Compression
compression.type=lz4  # Fast compression

Memory Management

# Buffer memory
buffer.memory=67108864   # 64MB buffer
max.block.ms=60000       # How long send() may block when the buffer is full

Monitoring Producer Metrics

// Access producer metrics (the metric is "record-send-rate"
// in the "producer-metrics" group)
Map<String, String> tags = new HashMap<>();
tags.put("client-id", "my-producer");
MetricName metricName = new MetricName("record-send-rate", "producer-metrics", "", tags);
double sendRate = (double) producer.metrics().get(metricName).metricValue();
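
Constructing the exact MetricName, tags included, is brittle; a looser alternative is to scan the metrics map by name:

// Look up a metric by name without building the full MetricName key
producer.metrics().forEach((name, metric) -> {
    if (name.name().equals("record-send-rate")) {
        System.out.println(name.group() + "/" + name.name()
            + " = " + metric.metricValue());
    }
});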

Best Practices

1. Use Appropriate Acknowledgment Levels

# For high throughput (may lose messages)
acks=1

# For reliability (slower)
acks=all

2. Configure Proper Timeouts

# Request timeout
request.timeout.ms=30000

# Delivery timeout (must be at least linger.ms + request.timeout.ms)
delivery.timeout.ms=120000

3. Handle Serialization Errors

try {
    producer.send(record);
} catch (SerializationException e) {
    // Serialization happens synchronously inside send(), so catch it here
    log.error("Failed to serialize record", e);
}

4. Close Producers Properly

// Graceful shutdown: flush buffered records before exiting
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    producer.close(Duration.ofSeconds(5));
}));

Common Pitfalls

1. Blocking on Send

// DON'T: block with no explicit bound on how long you'll wait
producer.send(record).get();

// DO: use a timeout
producer.send(record).get(10, TimeUnit.SECONDS);

2. Ignoring Send Results

// DON'T: fire and forget
producer.send(record);

// DO: handle the result
producer.send(record, callback);

3. Large Message Issues

# Handle large messages (producer side)
max.request.size=10485760    # 10MB

# Broker/topic setting - must be raised to match
message.max.bytes=10485760

Python Producer Example

from kafka import KafkaProducer
import json

# Create producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=str.encode
)

# Send event
event = {'user_id': '123', 'action': 'login', 'timestamp': '2025-12-10T10:00:00Z'}
producer.send('user-events', value=event, key='123')

# Flush and close
producer.flush()
producer.close()

Testing Producers

Unit Testing

@Test
public void testProducer() {
    // MockProducer (shipped with kafka-clients) fakes the broker;
    // autoComplete=true completes each send immediately
    MockProducer<String, String> producer =
        new MockProducer<>(true, new StringSerializer(), new StringSerializer());

    ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "test");
    producer.send(record);

    // Verify what was sent without any network I/O
    assertEquals(1, producer.history().size());
    assertEquals("test", producer.history().get(0).value());
}

What’s Next?

In this post, we’ve covered the complete Producer API:

  • Basic setup and configuration
  • Serialization strategies
  • Partitioning and routing
  • Synchronous/asynchronous sending
  • Error handling and retries
  • Idempotent and transactional producers
  • Performance optimization
  • Best practices and common pitfalls

You should now be able to build reliable Kafka producers for your applications.

In Part 4, we’ll explore the Consumer API - how to read and process events from Kafka topics.

*This is Part 3 of our comprehensive Apache Kafka series. ← Part 2: Kafka Architecture | Part 4: Consumers API →*
This post is licensed under CC BY 4.0 by the author.