Real-World Kafka Use Cases and Best Practices
Explore real-world Kafka implementations: e-commerce, IoT, fraud detection, and proven patterns for building scalable event-driven systems.
Welcome to Part 11 of our Apache Kafka series! We’ve covered the fundamentals, architecture, producers, consumers, topics/partitions, Streams, Connect, Schema Registry, security, and operations. Now we explore real-world applications - how companies use Kafka to solve complex problems and build scalable systems.
This post covers proven use cases, architectural patterns, and best practices drawn from production deployments.
E-commerce Platform
Use Case: Order Processing Pipeline
Problem: Handle millions of orders daily with real-time inventory updates, fraud detection, and personalized recommendations.
Architecture:
```
Order Events → Kafka → Streams Processing → Multiple Consumers
                 ↓                                 ↓
          Schema Registry               Elasticsearch (Search)
                                        PostgreSQL (OLTP)
                                        Redis (Cache)
```

Implementation:
```
// Order producer
public class OrderProducer {
    private final KafkaProducer<String, OrderEvent> producer;

    public void publishOrder(Order order) {
        OrderEvent event = new OrderEvent(
            order.getId(),
            order.getCustomerId(),
            order.getItems(),
            order.getTotal(),
            Instant.now()
        );

        ProducerRecord<String, OrderEvent> record =
            new ProducerRecord<>("orders", order.getId(), event);

        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                log.info("Order {} published to partition {}",
                    order.getId(), metadata.partition());
            } else {
                log.error("Failed to publish order {}", order.getId(), exception);
            }
        });
    }
}
```

Kafka Streams Processing:
```
StreamsBuilder builder = new StreamsBuilder();

// Order validation
KStream<String, OrderEvent> orders = builder.stream("orders");
KStream<String, OrderEvent> validOrders = orders
    .filter((key, order) -> validateOrder(order));

// Inventory updates
validOrders
    .flatMapValues(order -> order.getItems())
    .groupBy((orderId, item) -> item.getProductId())
    .reduce((item1, item2) -> new Item(
        item1.getProductId(),
        item1.getQuantity() + item2.getQuantity()
    ))
    .toStream()
    .to("inventory-updates", Produced.with(Serdes.String(), itemSerde));

// Real-time analytics: hourly order counts, written straight to an output topic
validOrders
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofHours(1)))
    .count()
    .toStream()
    .to("order-analytics", Produced.with(windowedSerde, Serdes.Long()));
```

Connectors Used (a sample sink connector configuration follows the list):
- JDBC Sink: Update PostgreSQL
- Elasticsearch Sink: Search orders
- Redis Sink: Cache hot products
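
As a concrete reference, a sink connector often needs little more than a connection and a key mapping. Here is a minimal sketch of the JDBC sink in Kafka Connect's standalone properties format; the connection URL, credentials, and primary-key column are illustrative placeholders:

```
name=orders-postgres-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
topics=orders
connection.url=jdbc:postgresql://postgres:5432/shop
connection.user=connect_user
connection.password=<secret>
insert.mode=upsert
pk.mode=record_key
pk.fields=order_id
auto.create=true
```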
Key Benefits:
- Decoupled order processing from inventory
- Real-time analytics for business decisions
- Fault-tolerant order processing
IoT Data Processing
Use Case: Smart City Sensor Network
Problem: Process millions of sensor readings from devices across a city for traffic management, environmental monitoring, and emergency response.
Architecture:
```
Sensors → MQTT → Kafka → Streams → Storage/Analytics
   ↓        ↓       ↓        ↓            ↓
 Device   MQTT   Schema    KSQL     Time-Series DB
Registry  Broker Registry  Queries    (InfluxDB)
                              ↓            ↓
                     Real-Time Alerts  Dashboards
```

Implementation:
```
// IoT data producer
public class SensorDataProducer {
    private final KafkaProducer<String, SensorReading> producer;

    public void publishSensorReading(SensorReading reading) {
        // One topic per sensor type, e.g. "sensor-temperature"
        String topic = "sensor-" + reading.getSensorType();
        String key = reading.getDeviceId() + "-" + reading.getTimestamp();

        ProducerRecord<String, SensorReading> record =
            new ProducerRecord<>(topic, key, reading);

        // Add headers for routing
        record.headers().add("sensor_type", reading.getSensorType().getBytes());
        record.headers().add("location", reading.getLocation().getBytes());

        producer.send(record);
    }
}
```

Streams Processing:
```
// Anomaly detection
KStream<String, SensorReading> sensorData = builder.stream("sensor-temperature");

KStream<String, AnomalyAlert> anomalies = sensorData
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .aggregate(
        () -> new SensorStats(),
        (key, reading, stats) -> stats.addReading(reading),
        Materialized.with(Serdes.String(), sensorStatsSerde)
    )
    .toStream()
    .filter((windowedKey, stats) -> stats.isAnomalous())
    // Re-key from the windowed key back to the device key and emit an alert
    .map((windowedKey, stats) -> KeyValue.pair(
        windowedKey.key(),
        new AnomalyAlert(
            windowedKey.key(),
            stats.getAverage(),
            stats.getCurrentReading(),
            Instant.now()
        )
    ));

anomalies.to("anomaly-alerts");
```

Key Patterns:
- Topic per sensor type for scalability
- Time-windowed aggregations for trend analysis
- Dead letter queues for invalid data (see the sketch after this list)
- Schema evolution for sensor firmware updates
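
The dead-letter-queue pattern can be as simple as splitting the raw stream before the windowed processing above. A minimal sketch, assuming a hypothetical isValidReading() check and illustrative topic names:

```
// Split raw readings into clean data and a dead letter topic
KStream<String, SensorReading> rawReadings = builder.stream("sensor-temperature-raw");

// Payloads that fail schema or range checks go to the DLQ for inspection
rawReadings
    .filterNot((deviceKey, reading) -> isValidReading(reading))
    .to("sensor-temperature-dlq");

// Only valid readings continue into the anomaly-detection topology
KStream<String, SensorReading> cleanReadings = rawReadings
    .filter((deviceKey, reading) -> isValidReading(reading));
```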
Financial Services
Use Case: Real-Time Fraud Detection
Problem: Detect fraudulent transactions in real-time while maintaining low false positive rates.
Architecture:
```
Transactions → Kafka → Streams ML → Alerts/Blocks
                 ↓          ↓            ↓
         Schema Registry  Fraud      Audit Logs
                 ↓        Models
          Transaction DB
```

Implementation:
```
// Transaction scoring with Kafka Streams
// (the transactions stream is assumed to be keyed by customer ID so the join lines up)
KStream<String, Transaction> transactions = builder.stream("transactions");

// Enrich with customer data
KTable<String, CustomerProfile> customers = builder.table("customer-profiles");

KStream<String, EnrichedTransaction> enriched = transactions
    .leftJoin(customers, (tx, profile) -> new EnrichedTransaction(tx, profile));

// Calculate risk scores
KStream<String, RiskAssessment> riskAssessments = enriched
    .mapValues(et -> fraudDetector.assessRisk(et));

// Flag high-risk transactions
KStream<String, FraudAlert> alerts = riskAssessments
    .filter((key, assessment) -> assessment.getRiskScore() > 0.8)
    .mapValues(assessment -> new FraudAlert(
        assessment.getTransactionId(),
        assessment.getRiskScore(),
        assessment.getRiskFactors()
    ));

alerts.to("fraud-alerts");

// Update customer risk profiles
enriched
    .groupByKey()
    .aggregate(
        () -> new CustomerRiskProfile(),
        (customerId, et, profile) -> profile.updateWithTransaction(et),
        Materialized.as("customer-risk-profiles")
    );
```

Machine Learning Integration:
```
public class FraudDetector {
    private final TensorFlowModel model;

    public RiskAssessment assessRisk(EnrichedTransaction transaction) {
        // Feature extraction
        double[] features = extractFeatures(transaction);

        // Model prediction
        float riskScore = model.predict(features);

        // Business rules
        List<String> riskFactors = applyBusinessRules(transaction);

        return new RiskAssessment(
            transaction.getId(),
            riskScore,
            riskFactors
        );
    }
}
```

Media Streaming
Use Case: Content Recommendation Engine
Problem: Provide personalized content recommendations based on user behavior across multiple platforms.
Architecture:
```
User Events → Kafka → Streams → Recommendations
     ↓          ↓         ↓            ↓
Mobile Apps   Schema   ML Models   Real-Time API
Web Portal    Registry Feature Store
Smart TV
```

Implementation:
```
// User behavior tracking
public class UserBehaviorTracker {
    private final KafkaProducer<String, UserEvent> producer;

    public void trackEvent(UserEvent event) {
        // Multiple topics for different event types
        String topic = getTopicForEventType(event.getType());

        ProducerRecord<String, UserEvent> record =
            new ProducerRecord<>(topic, event.getUserId(), event);

        // Add context headers
        record.headers().add("platform", event.getPlatform().getBytes());
        record.headers().add("session_id", event.getSessionId().getBytes());

        producer.send(record);
    }
}
```

Real-Time Recommendations:
```
// User activity aggregation
KStream<String, UserEvent> userEvents = builder.stream("user-activity");

KTable<String, UserProfile> userProfiles = userEvents
    .groupByKey()
    .aggregate(
        () -> new UserProfile(),
        (userId, event, profile) -> profile.addEvent(event),
        Materialized.as("user-profiles")
    );

// Content interaction analysis
KStream<String, ContentInteraction> interactions = builder.stream("content-interactions");

KTable<String, ContentStats> contentStats = interactions
    .groupBy((key, interaction) -> interaction.getContentId())
    .aggregate(
        () -> new ContentStats(),
        (contentId, interaction, stats) -> stats.addInteraction(interaction),
        Materialized.as("content-stats")
    );

// Generate recommendations by joining profile changes against content stats
// (both sides of the join must share the same key for the lookup to line up)
KStream<String, Recommendation> recommendations = userProfiles
    .toStream()
    .leftJoin(contentStats,
        (profile, content) -> recommendationEngine.generateRecommendations(profile, content))
    .filter((userId, recs) -> !recs.isEmpty());

recommendations.to("user-recommendations");
```

Log Aggregation and Analytics
Use Case: Centralized Logging Platform
Problem: Aggregate logs from thousands of services for monitoring, alerting, and analysis.
Architecture:
```
Application Logs → Kafka → Streams → Storage/Analysis
        ↓            ↓                     ↓
Filebeat/Logstash  Schema            Elasticsearch
Fluentd            Registry          ClickHouse
Vector                               Real-Time Dashboards
```

Implementation:
```
// Log producer with different log levels
public class LogProducer {
    private final KafkaProducer<String, LogEvent> producer;

    public void log(LogEvent event) {
        String topic = "logs-" + event.getLevel().toLowerCase();

        ProducerRecord<String, LogEvent> record =
            new ProducerRecord<>(topic, event.getServiceId(), event);

        // Add metadata headers
        record.headers().add("service", event.getService().getBytes());
        record.headers().add("environment", event.getEnvironment().getBytes());

        producer.send(record);
    }
}
```

Log Processing Pipeline:
```
// Error log alerting
KStream<String, LogEvent> errorLogs = builder.stream("logs-error");

KStream<String, ErrorAlert> errorAlerts = errorLogs
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .aggregate(
        () -> new ErrorStats(),
        (serviceId, log, stats) -> stats.addError(log),
        Materialized.as("error-stats")
    )
    .toStream()
    .filter((windowedKey, stats) -> stats.shouldAlert())
    // Re-key from the windowed key back to the service ID and emit an alert
    .map((windowedKey, stats) -> KeyValue.pair(
        windowedKey.key(),
        new ErrorAlert(
            windowedKey.key(),
            stats.getErrorCount(),
            stats.getTopErrors()
        )
    ));

errorAlerts.to("error-alerts");

// Log correlation across all log topics (subscribed by pattern)
KStream<String, LogEvent> allLogs = builder.stream(Pattern.compile("logs-.*"));

KTable<Windowed<String>, CorrelatedLogs> correlations = allLogs
    .groupBy((key, log) -> log.getCorrelationId())
    .windowedBy(TimeWindows.of(Duration.ofHours(1)))
    .aggregate(
        () -> new CorrelatedLogs(),
        (correlationId, log, correlated) -> correlated.addLog(log),
        Materialized.as("log-correlations")
    );
```

Microservices Communication
Use Case: Event-Driven Microservices
Problem: Enable reliable communication between microservices with event sourcing and CQRS patterns.
Architecture:
```
Commands → Kafka → Event Handlers → Read Models
    ↓         ↓          ↓                ↓
API Gateway    Schema    Services    Materialized Views
Load Balancer  Registry  (Consumers)    (KTables)
                            ↓         Elasticsearch
                          Events      PostgreSQL
```

Implementation:
```
// Command sourcing
public class CommandPublisher {
    private final KafkaProducer<String, Command> producer;

    public void publishCommand(Command command) {
        String topic = "commands-" + command.getType();

        ProducerRecord<String, Command> record =
            new ProducerRecord<>(topic, command.getAggregateId(), command);

        record.headers().add("command_type", command.getType().getBytes());
        record.headers().add("aggregate_type", command.getAggregateType().getBytes());

        producer.send(record);
    }
}

// Event sourcing
public class EventPublisher {
    private final KafkaProducer<String, DomainEvent> producer;

    public void publishEvent(DomainEvent event) {
        String topic = "events-" + event.getAggregateType();

        ProducerRecord<String, DomainEvent> record =
            new ProducerRecord<>(topic, event.getAggregateId(), event);

        record.headers().add("event_type", event.getType().getBytes());
        record.headers().add("event_version", String.valueOf(event.getVersion()).getBytes());

        producer.send(record);
    }
}
```

CQRS with Kafka Streams:
```
// Command handling
KStream<String, Command> commands = builder.stream("commands-user");

KStream<String, DomainEvent> events = commands
    .mapValues(command -> commandHandler.handle(command));

events.to("events-user");

// Read model projection
KTable<String, UserView> userViews = builder.stream("events-user")
    .groupByKey()
    .aggregate(
        () -> new UserView(),
        (userId, event, view) -> view.apply(event),
        Materialized.as("user-views")
    );

// Query the read model
ReadOnlyKeyValueStore<String, UserView> userStore =
    streams.store(StoreQueryParameters.fromNameAndType(
        "user-views", QueryableStoreTypes.keyValueStore()));

UserView user = userStore.get(userId);
```

Best Practices for Production
1. Topic Design
```
# Use descriptive naming
user-events, order-created, payment-processed

# Partition by business key
# Co-locate related events
# Consider access patterns
```
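Partition counts are easiest to get right when topics are created explicitly rather than auto-created. A minimal sketch using the AdminClient; the topic name, partition count, replication factor, and broker address are illustrative:

```
// Create a descriptively named topic with an explicit partition count
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

try (AdminClient admin = AdminClient.create(props)) {
    // 12 partitions, replication factor 3 (sized for the expected key space)
    NewTopic orderCreated = new NewTopic("order-created", 12, (short) 3);
    admin.createTopics(List.of(orderCreated)).all().get();
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
}
```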
2. Schema Evolution
```
// Start with backward compatibility
// Use default values for new fields
// Test compatibility before deployment
// Document breaking changes
```
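To make the "default values for new fields" rule concrete, here is a sketch of a backward-compatible change to a hypothetical OrderEvent Avro schema: the new couponCode field carries a default, so records written before the field existed can still be read with the new schema.

```
{
  "type": "record",
  "name": "OrderEvent",
  "fields": [
    {"name": "orderId",    "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "total",      "type": "double"},
    {"name": "couponCode", "type": ["null", "string"], "default": null}
  ]
}
```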
3. Error Handling
```
// Implement dead letter queues
KStream<String, Event> events = builder.stream("input-topic");

KStream<String, Event> validEvents = events
    .filter((k, v) -> isValid(v));

KStream<String, Event> invalidEvents = events
    .filterNot((k, v) -> isValid(v));

invalidEvents.to("dead-letter-queue");
```
4. Monitoring and Alerting
```
# Key metrics to monitor
- Consumer lag
- Error rates
- Throughput
- Latency percentiles
- Resource utilization
```
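For consumer lag in particular, the stock kafka-consumer-groups tool gives a quick read; the group name and broker address below are illustrative:

```
# The LAG column shows how far committed offsets trail the log end offset
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
  --describe --group order-processing-service
```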
5. Performance Optimization
```
# Producer tuning
batch.size=1048576
linger.ms=10
compression.type=lz4

# Consumer tuning
fetch.min.bytes=1024
max.poll.records=1000
enable.auto.commit=false
```
6. Security Considerations
```
# Enable security
security.protocol=SASL_SSL
ssl.truststore.location=/path/to/truststore.jks
sasl.mechanism=SCRAM-SHA-256

# Use ACLs
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
```
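With an authorizer enabled, individual grants can then be managed with the stock kafka-acls tool; the principal, topic, and group names below are illustrative:

```
# Allow the order service to consume from the orders topic
kafka-acls.sh --bootstrap-server broker1:9092 \
  --add --allow-principal User:order-service \
  --operation Read --topic orders --group order-processing-service
```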
Common Anti-Patterns
1. Using Kafka as a Database
```
// DON'T: Store large objects
producer.send(new ProducerRecord<>("files", "doc1", largeFileBytes));

// DO: Store references
producer.send(new ProducerRecord<>("files", "doc1",
    new FileReference("s3://bucket/doc1.pdf")));
```
2. Synchronous Processing
```
// DON'T: Block on every send
producer.send(record).get();  // Blocks until the broker acknowledges (or the send fails)

// DO: Use callbacks or async processing
producer.send(record, callback);
```
3. Ignoring Consumer Lag
```
// DO: Monitor and alert on lag
// Implement proper error handling
// Scale consumers as needed
```
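Lag can also be checked programmatically. A hedged sketch using the AdminClient, comparing each partition's committed offset with its current end offset; the group name and connection properties are assumed:

```
// Lag = latest offset in the partition minus the group's committed offset
try (AdminClient admin = AdminClient.create(props)) {
    Map<TopicPartition, OffsetAndMetadata> committed =
        admin.listConsumerGroupOffsets("order-processing-service")
             .partitionsToOffsetAndMetadata().get();

    Map<TopicPartition, OffsetSpec> latest = committed.keySet().stream()
        .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));

    Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
        admin.listOffsets(latest).all().get();

    committed.forEach((tp, offset) -> {
        long lag = endOffsets.get(tp).offset() - offset.offset();
        System.out.printf("%s lag=%d%n", tp, lag);
    });
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
}
```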
4. Large Messages
```
// DON'T: Send large messages
// DO: Use external storage for large data
//     Send references in Kafka
```
Scaling Patterns
Horizontal Scaling
```
# Add brokers for capacity
# Increase partition count
# Scale consumer groups
# Use rack awareness
```
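For example, the partition count of an existing topic can be raised with the stock kafka-topics tool (topic name and count are illustrative); keep in mind that adding partitions changes the key-to-partition mapping for keyed data:

```
kafka-topics.sh --bootstrap-server broker1:9092 \
  --alter --topic orders --partitions 24
```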
Multi-Cluster Patterns
```
# Active-Active: MirrorMaker 2.0
# Active-Passive: Disaster recovery
# Hub and Spoke: Central aggregation
```
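For the Active-Active and Active-Passive cases, here is a minimal sketch of a MirrorMaker 2.0 properties file for one-way replication; the cluster aliases and bootstrap addresses are illustrative:

```
clusters = primary, secondary
primary.bootstrap.servers = primary-broker:9092
secondary.bootstrap.servers = secondary-broker:9092

# Replicate all topics from primary into secondary
primary->secondary.enabled = true
primary->secondary.topics = .*
```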
Future Trends
Kafka in the Cloud
- Managed Services: Confluent Cloud, Amazon MSK, Redpanda Cloud
- Serverless Kafka: Event-driven functions
- Multi-Cloud: Cross-cloud deployments
Emerging Patterns
- Event Mesh: Organization-wide event routing
- Data Mesh: Decentralized data ownership
- Edge Computing: Kafka at the edge
Conclusion
Kafka has proven itself as the backbone of modern data architectures across industries. From e-commerce to IoT, financial services to media streaming, the patterns and practices we’ve covered provide a solid foundation for building scalable, reliable event-driven systems.
The key to success lies in:
- Understanding your domain and data flows
- Choosing appropriate architectural patterns
- Implementing proper monitoring and operations
- Following security best practices
- Planning for evolution and scaling
Remember, Kafka is a tool that enables better architecture, but the real value comes from how you design your systems around events.
What’s Next?
In this exploration of real-world Kafka use cases, we’ve covered:
- E-commerce order processing
- IoT sensor data processing
- Financial fraud detection
- Media content recommendations
- Log aggregation platforms
- Microservices communication
- Best practices and anti-patterns
- Scaling and future trends
You should now have a comprehensive understanding of how to apply Kafka in production environments.
In Part 12, our final post, we’ll explore advanced topics and the future of Kafka - including KRaft, transactions, and emerging trends.
*This is Part 11 of our comprehensive Apache Kafka series. ← Part 10: Monitoring and Operations | Part 12: Advanced Topics and Future →*