Kafka Security - Authentication, Authorization, and Encryption
Master Kafka security: implement authentication, authorization, encryption, and audit logging to protect your event streaming platform.
Welcome to Part 9 of our Apache Kafka series! We’ve covered the fundamentals, architecture, producers, consumers, topics/partitions, Streams, Connect, and Schema Registry. Now we focus on security - protecting your Kafka clusters from unauthorized access and ensuring data privacy.
Security is critical for production Kafka deployments. This post covers authentication, authorization, encryption, and operational security practices.
Security Overview
Kafka security encompasses:
- Authentication: Verifying client identities
- Authorization: Controlling access to resources
- Encryption: Protecting data in transit and at rest
- Audit Logging: Tracking access and operations
- Secure Configuration: Hardening cluster settings
Security Layers
```
┌─────────────────┐
│  Applications   │  ← Client authentication
└─────────────────┘
        │  Encryption (TLS)
        │
┌─────────────────┐
│      Kafka      │  ← Inter-broker encryption
│     Brokers     │     Authentication
└─────────────────┘     Authorization (ACLs)
        │  Encryption (TLS)
        │
┌─────────────────┐
│    ZooKeeper    │  ← Secure ZK access
└─────────────────┘
```
SSL/TLS Encryption
Certificate Generation
```bash
# Create CA
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -passout pass:ca-password

# Create broker keystore
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA

# Create certificate signing request
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file

# Sign the certificate with the CA
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:ca-password

# Import CA and signed certificate into the broker keystore
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed

# Create truststores containing the CA certificate (referenced below by brokers and clients)
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
```
Broker SSL Configuration
```properties
# server.properties

# SSL configuration
ssl.keystore.location=/path/to/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/path/to/kafka.server.truststore.jks
ssl.truststore.password=truststore-password

# Enable SSL for client connections
listeners=SSL://:9093
advertised.listeners=SSL://your.host.name:9093

# Enable SSL for inter-broker communication
security.inter.broker.protocol=SSL

# Optional: Client authentication (mutual TLS)
ssl.client.auth=required
```
Client SSL Configuration
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9093");
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/path/to/client.truststore.jks");
props.put("ssl.truststore.password", "truststore-password");
props.put("ssl.keystore.location", "/path/to/client.keystore.jks");
props.put("ssl.keystore.password", "keystore-password");
props.put("ssl.key.password", "key-password");
```
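To sanity-check the setup, you can build a producer directly from these properties and send a test record over TLS. A minimal sketch continuing from the `props` object above; the topic name `orders` and the serializer choices are assumptions for illustration.

```java
// Serializers for the test record (plain strings)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // TLS problems (untrusted certificate, hostname mismatch) surface as
    // handshake/authentication errors in the callback or in the client logs.
    producer.send(new ProducerRecord<>("orders", "order-1", "hello over TLS"), (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.printf("Delivered to %s-%d@%d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
        }
    });
}
```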
SASL Authentication
SASL Mechanisms
- PLAIN: Simple username/password
- SCRAM: Salted challenge-response
- GSSAPI: Kerberos authentication
- OAUTHBEARER: OAuth 2.0 bearer tokens
SASL/PLAIN Setup
```
# JAAS configuration (kafka_server_jaas.conf)
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_producer="producer-secret"
    user_consumer="consumer-secret";
};

# server.properties
listeners=SASL_SSL://:9094
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# ACL authorizer (use kafka.security.auth.SimpleAclAuthorizer on brokers older than 2.4)
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
```
SASL/SCRAM Setup
```
# Create SCRAM credentials
kafka-configs --alter \
  --add-config 'SCRAM-SHA-256=[iterations=8192,password=producer-secret],SCRAM-SHA-512=[iterations=8192,password=producer-secret]' \
  --entity-type users --entity-name producer \
  --bootstrap-server localhost:9092

# server.properties
listeners=SASL_SSL://:9094
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512
```
Client SASL Configuration
Properties props = new Properties(); props.put("security.protocol", "SASL_SSL"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " + "username=\"producer\" " + "password=\"producer-secret\";"); // For SCRAM props.put("sasl.mechanism", "SCRAM-SHA-256"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " + "username=\"producer\" " + "password=\"producer-secret\";"); Authorization with ACLs
ACL Concepts
- Principal: User or service identity
- Permission: Allow/Deny
- Operation: Read, Write, Create, Delete, etc.
- Resource: Topic, Group, Cluster, etc.
Managing ACLs
```bash
# Allow producer to write to topic
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
  --allow-principal User:producer --operation Write --topic orders

# Allow consumer to read from topic and join group
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
  --allow-principal User:consumer --operation Read --topic orders
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
  --allow-principal User:consumer --operation Read --group order-consumers

# List ACLs
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --list --topic orders

# Remove ACL
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --remove \
  --allow-principal User:producer --operation Write --topic orders
```
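ACLs can also be managed programmatically with the Admin API, which is convenient when onboarding of new principals is automated. A hedged sketch of the first command above (write access to `orders` for `User:producer`); the class name, bootstrap address, and security settings are placeholders for your own secured listener and admin credentials.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class CreateProducerAcl {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", "localhost:9094");
        adminProps.put("security.protocol", "SASL_SSL");
        // ...plus the SASL/SSL client settings shown earlier

        try (Admin admin = Admin.create(adminProps)) {
            // Equivalent of: kafka-acls --add --allow-principal User:producer --operation Write --topic orders
            AclBinding allowProducerWrite = new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, "orders", PatternType.LITERAL),
                new AccessControlEntry("User:producer", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
            admin.createAcls(List.of(allowProducerWrite)).all().get();
        }
    }
}
```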
ACL Examples
```bash
# Cluster-level permissions
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
  --allow-principal User:admin --operation All --cluster

# Topic creation permissions (wildcard resource)
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
  --allow-principal User:producer --operation Create --topic '*'

# Prefixed topic access (matches every topic starting with "team-a-")
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
  --allow-principal User:team-a --operation All \
  --resource-pattern-type prefixed --topic team-a-
```
ZooKeeper Security
Secure ZK Access
```properties
# zoo.cfg
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
```
ZK JAAS Configuration
```
# zk_server_jaas.conf (ZooKeeper server)
Server {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/path/to/zookeeper.keytab"
    storeKey=true
    useTicketCache=false
    principal="zookeeper/localhost@EXAMPLE.COM";
};

# The Client section goes in the Kafka broker's JAAS file (kafka_server_jaas.conf)
# and uses the broker's own principal and keytab to authenticate to ZooKeeper
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/path/to/kafka.keytab"
    storeKey=true
    useTicketCache=false
    principal="kafka/localhost@EXAMPLE.COM";
};
```
Encryption at Rest
Disk Encryption
```bash
# LUKS encryption for data directories
cryptsetup luksFormat /dev/sdb
cryptsetup luksOpen /dev/sdb kafka_data
mkfs.ext4 /dev/mapper/kafka_data   # create a filesystem on the mapped device
mount /dev/mapper/kafka_data /var/lib/kafka/data
```
Application-Level Encryption
```java
// Encrypt sensitive data before sending it to Kafka.
// encrypt() is an application-supplied helper (e.g. AES-GCM via javax.crypto, sketched below).
public class EncryptedProducer {
    private final KafkaProducer<String, byte[]> producer;
    private final SecretKey secretKey;

    public EncryptedProducer(KafkaProducer<String, byte[]> producer, SecretKey secretKey) {
        this.producer = producer;
        this.secretKey = secretKey;
    }

    public void sendEncrypted(String topic, String key, String sensitiveData) {
        byte[] encryptedData = encrypt(sensitiveData.getBytes(), secretKey);
        producer.send(new ProducerRecord<>(topic, key, encryptedData));
    }
}
```
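One way the `encrypt()` helper could be implemented is with AES-GCM from the standard `javax.crypto` API. A minimal sketch, assuming producer and consumer share the AES key out of band (for example via a KMS); the class name `AesGcmCodec` and the IV-prepended payload layout are illustrative choices, not a Kafka API.

```java
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

public final class AesGcmCodec {
    private static final SecureRandom RANDOM = new SecureRandom();

    // Encrypts with AES-GCM and prepends the 12-byte IV so the consumer can decrypt.
    public static byte[] encrypt(byte[] plaintext, SecretKey key) throws Exception {
        byte[] iv = new byte[12];
        RANDOM.nextBytes(iv);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
        byte[] ciphertext = cipher.doFinal(plaintext);
        return ByteBuffer.allocate(iv.length + ciphertext.length).put(iv).put(ciphertext).array();
    }

    // Reverses encrypt(): reads the IV prefix, then decrypts and verifies the GCM tag.
    public static byte[] decrypt(byte[] payload, SecretKey key) throws Exception {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        byte[] iv = new byte[12];
        buffer.get(iv);
        byte[] ciphertext = new byte[buffer.remaining()];
        buffer.get(ciphertext);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(128, iv));
        return cipher.doFinal(ciphertext);
    }
}
```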
Audit Logging
Enable Audit Logs
Apache Kafka itself does not expose `kafka.audit.log.*` broker settings; the built-in options are the authorizer log (via the broker's log4j configuration) or vendor features such as Confluent's audit logs. The log4j route records every authorization decision:
```properties
# log4j.properties (broker)
# Route authorizer decisions to a dedicated audit log file
log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false
log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.File=/var/log/kafka/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
```
Custom Audit Logging
```java
// Wrap the standard AclAuthorizer and log every authorization decision.
public class CustomAuditAuthorizer extends AclAuthorizer {
    private static final Logger auditLogger = LoggerFactory.getLogger("kafka.audit");

    @Override
    public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext,
                                               List<Action> actions) {
        List<AuthorizationResult> results = super.authorize(requestContext, actions);
        // Log each requested action alongside the principal and the decision
        for (int i = 0; i < actions.size(); i++) {
            auditLogger.info("principal={} operation={} resource={} allowed={}",
                requestContext.principal(),
                actions.get(i).operation(),
                actions.get(i).resourcePattern(),
                results.get(i) == AuthorizationResult.ALLOWED);
        }
        return results;
    }
}
```
Secure Configuration
Broker Hardening
```properties
# Disable insecure protocols
listeners=SSL://:9093,SASL_SSL://:9094
advertised.listeners=SSL://your.host.name:9093,SASL_SSL://your.host.name:9094

# Connection limits
max.connections=1000
max.connections.per.ip=10
connection.failed.authentication.delay.ms=1000

# Disable auto topic creation
auto.create.topics.enable=false

# Secure defaults
delete.topic.enable=false
```
Network Security
```bash
# Firewall rules
iptables -A INPUT -p tcp --dport 9092 -j DROP    # Block plaintext
iptables -A INPUT -p tcp --dport 9093 -j ACCEPT  # Allow SSL
iptables -A INPUT -p tcp --dport 9094 -j ACCEPT  # Allow SASL_SSL
iptables -A INPUT -p tcp --dport 2181 -j DROP    # Block ZK plaintext
```
Monitoring Security
Security Metrics
```java
// Client-side view: scan the producer/consumer metrics registry for
// authentication counters (group "producer-metrics" / "consumer-metrics").
for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
    String name = entry.getKey().name();
    if (name.equals("failed-authentication-total") || name.equals("successful-authentication-total")) {
        System.out.println(entry.getKey().group() + "/" + name + " = " + entry.getValue().metricValue());
    }
}
```
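On the broker side, the equivalent counters are exposed over JMX. A hedged sketch that sums `failed-authentication-total` across network processors; the JMX port, the `kafka.server:type=socket-server-metrics` object-name pattern, and the attribute name are assumptions that may vary with your Kafka version and JMX setup.

```java
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class AuthFailureProbe {
    public static void main(String[] args) throws Exception {
        // Assumes the broker was started with JMX enabled, e.g. JMX_PORT=9999
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // One MBean per listener/network processor; sum the per-processor totals
            Set<ObjectName> names = mbs.queryNames(
                new ObjectName("kafka.server:type=socket-server-metrics,listener=*,networkProcessor=*"), null);
            double failed = 0;
            for (ObjectName name : names) {
                failed += ((Number) mbs.getAttribute(name, "failed-authentication-total")).doubleValue();
            }
            System.out.println("failed-authentication-total = " + failed);
        } finally {
            connector.close();
        }
    }
}
```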
Security Dashboards
```
# Prometheus metrics (exact names depend on your JMX exporter rules)
kafka_server_socket_server_metrics_connection_count{listener="SSL"}       # SSL connections
kafka_server_socket_server_metrics_connection_count{listener="SASL_SSL"}  # SASL connections
kafka_authorizer_zookeeper_authorizer_metrics_zookeeper_authorizer_initialization_count  # ACL loads
```
Best Practices
1. Defense in Depth
```properties
# Multiple security layers
security.protocol=SASL_SSL                                      # Authentication + Encryption
ssl.client.auth=required                                        # Mutual TLS
authorizer.class.name=kafka.security.authorizer.AclAuthorizer   # Authorization
```
2. Principle of Least Privilege
```bash
# Grant minimal permissions
kafka-acls --bootstrap-server localhost:9092 --add \
  --allow-principal User:producer --operation Write --topic orders

# Don't grant All operations unless necessary
```
3. Certificate Management
- Rotate certificates regularly
- Use short-lived certificates
- Implement certificate revocation
- Monitor certificate expiration

4. Secure Defaults
```properties
# server.properties
# Disable insecure features
auto.create.topics.enable=false
delete.topic.enable=false
default.replication.factor=3
min.insync.replicas=2
```
5. Monitoring and Alerting
- Alert on security events
- Monitor authentication failures
- Track unusual access patterns
- Audit configuration changes

Multi-Tenant Security
Tenant Isolation
```bash
# Separate topics per tenant (prefixed resource pattern)
kafka-acls --bootstrap-server localhost:9092 --add \
  --allow-principal User:tenant-a --operation All \
  --resource-pattern-type prefixed --topic tenant-a-

# Separate consumer groups
kafka-acls --bootstrap-server localhost:9092 --add \
  --allow-principal User:tenant-a --operation Read \
  --resource-pattern-type prefixed --group tenant-a-
```
Quotas for Fairness
```bash
# Set producer quotas
kafka-configs --alter --add-config 'producer_byte_rate=1048576' \
  --entity-type users --entity-name tenant-a --bootstrap-server localhost:9092

# Set consumer quotas
kafka-configs --alter --add-config 'consumer_byte_rate=2097152' \
  --entity-type users --entity-name tenant-a --bootstrap-server localhost:9092
```
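The same quotas can be applied from code with the Admin API, which is handy when tenant provisioning is automated. A sketch under the assumption that the admin client connects as a suitably privileged user; the class name `SetTenantQuotas` and the connection settings are placeholders.

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;

public class SetTenantQuotas {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", "localhost:9094");
        // ...plus the security settings for your admin principal

        try (Admin admin = Admin.create(adminProps)) {
            // Quota entity keyed by the authenticated user name (the tenant's principal)
            ClientQuotaEntity tenantA = new ClientQuotaEntity(
                Map.of(ClientQuotaEntity.USER, "tenant-a"));
            ClientQuotaAlteration alteration = new ClientQuotaAlteration(tenantA, List.of(
                new ClientQuotaAlteration.Op("producer_byte_rate", 1_048_576.0),    // 1 MB/s in
                new ClientQuotaAlteration.Op("consumer_byte_rate", 2_097_152.0)));  // 2 MB/s out
            admin.alterClientQuotas(List.of(alteration)).all().get();
        }
    }
}
```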
Troubleshooting Security Issues
SSL Handshake Failures
```bash
# Check certificate validity
openssl x509 -in cert.pem -text -noout

# Verify truststore
keytool -list -v -keystore truststore.jks

# Enable debug logging
-Djavax.net.debug=ssl:handshake
```
Authentication Failures
- Check the JAAS configuration
- Verify credentials
- Check SASL mechanism compatibility
- Review broker logs for authentication errors

Authorization Issues
```bash
# List current ACLs
kafka-acls --list --topic orders --bootstrap-server localhost:9092

# Test with different principals
# Check resource patterns
# Verify operation permissions
```
Real-World Security Architecture
```yaml
# Production security setup (illustrative)
kafka:
  listeners:
    - SSL://:9093          # External SSL
    - SASL_SSL://:9094     # Internal SASL_SSL
  security:
    inter.broker.protocol: SASL_SSL
    authorizer: kafka.security.authorizer.AclAuthorizer
  ssl:
    client.auth: required
    keystore: /etc/kafka/ssl/server.keystore.jks
    truststore: /etc/kafka/ssl/server.truststore.jks
zookeeper:
  authProvider: SASLAuthenticationProvider
  jaasLoginRenew: 3600000
monitoring:
  audit:
    enabled: true
    topic: audit-log
  metrics:
    exporters: [prometheus, datadog]
```
What’s Next?
In this comprehensive guide to Kafka security, we’ve covered:
- SSL/TLS encryption setup
- SASL authentication mechanisms
- ACL-based authorization
- ZooKeeper security
- Encryption at rest
- Audit logging
- Secure configuration practices
- Monitoring and troubleshooting
You should now be able to secure your Kafka clusters for production use.
In Part 10, we’ll explore monitoring, operations, and troubleshooting - keeping your Kafka clusters healthy and performant.
Additional Resources
- Kafka Security Documentation
- Confluent Security Guide
- OWASP Kafka Security Cheat Sheet
- NIST Cybersecurity Framework
*This is Part 9 of our comprehensive Apache Kafka series. Part 8: Schema Registry ← | Part 10: Monitoring and Operations →*