Post

Kafka Security - Authentication, Authorization, and Encryption

Master Kafka security: implement authentication, authorization, encryption, and audit logging to protect your event streaming platform.

Kafka Security - Authentication, Authorization, and Encryption

Welcome to Part 9 of our Apache Kafka series! We’ve covered the fundamentals, architecture, producers, consumers, topics/partitions, Streams, Connect, and Schema Registry. Now we focus on security - protecting your Kafka clusters from unauthorized access and ensuring data privacy.

Security is critical for production Kafka deployments. This post covers authentication, authorization, encryption, and operational security practices.

Security Overview

Kafka security encompasses:

  • Authentication: Verifying client identities
  • Authorization: Controlling access to resources
  • Encryption: Protecting data in transit and at rest
  • Audit Logging: Tracking access and operations
  • Secure Configuration: Hardening cluster settings

Security Layers

┌─────────────────┐ │ Applications │ ← Client authentication └─────────────────┘ │ Encryption (TLS) │ ┌─────────────────┐ │ Kafka │ ← Inter-broker encryption │ Brokers │ Authentication └─────────────────┘ Authorization (ACLs) │ Encryption (TLS) │ ┌─────────────────┐ │ ZooKeeper │ ← Secure ZK access │ │ └─────────────────┘

SSL/TLS Encryption

Certificate Generation

# Create CA openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -passout pass:ca-password # Create broker keystore keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey # Create certificate signing request keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file # Sign certificate openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:ca-password # Import CA and signed certificate keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed

Broker SSL Configuration

# server.properties # SSL configuration ssl.keystore.location=/path/to/kafka.server.keystore.jks ssl.keystore.password=keystore-password ssl.key.password=key-password ssl.truststore.location=/path/to/kafka.server.truststore.jks ssl.truststore.password=truststore-password # Enable SSL for client connections listeners=SSL://:9093 advertised.listeners=SSL://your.host.name:9093 # Enable SSL for inter-broker communication security.inter.broker.protocol=SSL # Optional: Client authentication ssl.client.auth=required

Client SSL Configuration

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9093"); props.put("security.protocol", "SSL"); props.put("ssl.truststore.location", "/path/to/client.truststore.jks"); props.put("ssl.truststore.password", "truststore-password"); props.put("ssl.keystore.location", "/path/to/client.keystore.jks"); props.put("ssl.keystore.password", "keystore-password"); props.put("ssl.key.password", "key-password");

SASL Authentication

SASL Mechanisms

  • PLAIN: Simple username/password
  • SCRAM: Salted challenge-response
  • GSSAPI: Kerberos authentication
  • OAUTHBEARER: OAuth 2.0 bearer tokens

SASL/PLAIN Setup

# JAAS configuration (kafka_server_jaas.conf) KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_producer="producer-secret" user_consumer="consumer-secret"; }; # server.properties listeners=SASL_SSL://:9094 security.inter.broker.protocol=SASL_SSL sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

SASL/SCRAM Setup

# Create SCRAM credentials kafka-configs --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=producer-secret],SCRAM-SHA-512=[iterations=8192,password=producer-secret]' --entity-type users --entity-name producer --bootstrap-server localhost:9092 # server.properties listeners=SASL_SSL://:9094 sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512

Client SASL Configuration

Properties props = new Properties(); props.put("security.protocol", "SASL_SSL"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " + "username=\"producer\" " + "password=\"producer-secret\";"); // For SCRAM props.put("sasl.mechanism", "SCRAM-SHA-256"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " + "username=\"producer\" " + "password=\"producer-secret\";");

Authorization with ACLs

ACL Concepts

  • Principal: User or service identity
  • Permission: Allow/Deny
  • Operation: Read, Write, Create, Delete, etc.
  • Resource: Topic, Group, Cluster, etc.

Managing ACLs

# Allow producer to write to topic kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:producer --operation Write --topic orders # Allow consumer to read from topic and join group kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:consumer --operation Read --topic orders kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:consumer --operation Read --group order-consumers # List ACLs kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --list --topic orders # Remove ACL kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:producer --operation Write --topic orders

ACL Examples

# Cluster-level permissions kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:admin --operation All --cluster # Topic creation permissions kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:producer --operation Create --topic '*' # Prefixed topic access kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:team-a --operation All --topic 'team-a-*'

ZooKeeper Security

Secure ZK Access

# zoo.cfg authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=3600000

ZK JAAS Configuration

# zk_server_jaas.conf Server { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/path/to/zookeeper.keytab" storeKey=true useTicketCache=false principal="zookeeper/localhost@EXAMPLE.COM"; }; Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/path/to/zookeeper.keytab" storeKey=true useTicketCache=false principal="zookeeper/localhost@EXAMPLE.COM"; };

Encryption at Rest

Disk Encryption

# LUKS encryption for data directories cryptsetup luksFormat /dev/sdb cryptsetup luksOpen /dev/sdb kafka_data mount /dev/mapper/kafka_data /var/lib/kafka/data

Application-Level Encryption

// Encrypt sensitive data before sending public class EncryptedProducer { private final KafkaProducer<String, byte[]> producer; private final SecretKey secretKey; public void sendEncrypted(String topic, String key, String sensitiveData) { byte[] encryptedData = encrypt(sensitiveData.getBytes(), secretKey); producer.send(new ProducerRecord<>(topic, key, encryptedData)); } }

Audit Logging

Enable Audit Logs

# server.properties kafka.audit.log.enable=true kafka.audit.log.bootstrap.servers=localhost:9092 kafka.audit.log.topic=audit-log kafka.audit.log.principal.builder=org.apache.kafka.core.security.authenticator.DefaultKafkaPrincipalBuilder

Custom Audit Logging

public class CustomAuditAuthorizer extends AclAuthorizer { @Override public AuthorizationResult authorize(AuthorizableRequestContext requestContext, List<Acl> acls) { AuthorizationResult result = super.authorize(requestContext, acls); // Log access attempts auditLogger.log(new AuditEvent( requestContext.principal(), requestContext.operation(), requestContext.resource(), result == AuthorizationResult.ALLOWED )); return result; } }

Secure Configuration

Broker Hardening

# Disable insecure protocols listeners=SSL://:9093,SASL_SSL://:9094 advertised.listeners=SSL://your.host.name:9093,SASL_SSL://your.host.name:9094 # Limit connections max.connections=1000 max.connections.per.ip=10 # Connection limits connection.failed.authentication.delay.ms=1000 # Disable auto topic creation auto.create.topics.enable=false # Secure defaults delete.topic.enable=false

Network Security

# Firewall rules iptables -A INPUT -p tcp --dport 9092 -j DROP # Block plaintext iptables -A INPUT -p tcp --dport 9093 -j ACCEPT # Allow SSL iptables -A INPUT -p tcp --dport 9094 -j ACCEPT # Allow SASL_SSL iptables -A INPUT -p tcp --dport 2181 -j DROP # Block ZK plaintext

Monitoring Security

Security Metrics

// Monitor authentication failures MetricName authFailures = new MetricName("failed-authentication-total", "kafka-server", ""); Double failures = (Double) metrics.get(authFailures).value(); // Monitor SSL connections MetricName sslConnections = new MetricName("ssl-handshake-count", "kafka-server", ""); Double sslCount = (Double) metrics.get(sslConnections).value();

Security Dashboards

# Prometheus metrics kafka_server_socket_server_metrics_connection_count{listener="SSL"} # SSL connections kafka_server_socket_server_metrics_connection_count{listener="SASL_SSL"} # SASL connections kafka_authorizer_zookeeper_authorizer_metrics_zookeeper_authorizer_initialization_count # ACL loads

Best Practices

1. Defense in Depth

# Multiple security layers security.protocol=SASL_SSL # Authentication + Encryption ssl.client.auth=required # Mutual TLS authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer # Authorization

2. Principle of Least Privilege

# Grant minimal permissions kafka-acls --add --allow-principal User:producer --operation Write --topic orders # Don't grant All operations unless necessary

3. Certificate Management

# Rotate certificates regularly # Use short-lived certificates # Implement certificate revocation # Monitor certificate expiration

4. Secure Defaults

# server.properties # Disable insecure features auto.create.topics.enable=false delete.topic.enable=false default.replication.factor=3 min.insync.replicas=2

5. Monitoring and Alerting

# Alert on security events # Monitor authentication failures # Track unusual access patterns # Audit configuration changes

Multi-Tenant Security

Tenant Isolation

# Separate topics per tenant kafka-acls --add --allow-principal User:tenant-a --operation All --topic 'tenant-a-*' # Separate consumer groups kafka-acls --add --allow-principal User:tenant-a --operation Read --group 'tenant-a-*'

Quotas for Fairness

# Set producer quotas kafka-configs --alter --add-config 'producer_byte_rate=1048576' --entity-type users --entity-name tenant-a --bootstrap-server localhost:9092 # Set consumer quotas kafka-configs --alter --add-config 'consumer_byte_rate=2097152' --entity-type users --entity-name tenant-a --bootstrap-server localhost:9092

Troubleshooting Security Issues

SSL Handshake Failures

# Check certificate validity openssl x509 -in cert.pem -text -noout # Verify truststore keytool -list -v -keystore truststore.jks # Enable debug logging -Djavax.net.debug=ssl:handshake

Authentication Failures

# Check JAAS configuration # Verify credentials # Check SASL mechanism compatibility # Review broker logs for authentication errors

Authorization Issues

# List current ACLs kafka-acls --list --topic orders --bootstrap-server localhost:9092 # Test with different principals # Check resource patterns # Verify operation permissions

Real-World Security Architecture

# Production security setup kafka: listeners: - SSL://:9093 # External SSL - SASL_SSL://:9094 # Internal SASL_SSL security: inter.broker.protocol: SASL_SSL authorizer: kafka.security.auth.SimpleAclAuthorizer ssl: client.auth: required keystore: /etc/kafka/ssl/server.keystore.jks truststore: /etc/kafka/ssl/server.truststore.jks zookeeper: authProvider: SASLAuthenticationProvider jaasLoginRenew: 3600000 monitoring: audit: enabled: true topic: audit-log metrics: exporters: [prometheus, datadog]

What’s Next?

In this comprehensive guide to Kafka security, we’ve covered:

  • SSL/TLS encryption setup
  • SASL authentication mechanisms
  • ACL-based authorization
  • ZooKeeper security
  • Encryption at rest
  • Audit logging
  • Secure configuration practices
  • Monitoring and troubleshooting

You should now be able to secure your Kafka clusters for production use.

In Part 10, we’ll explore monitoring, operations, and troubleshooting - keeping your Kafka clusters healthy and performant.

Additional Resources


*This is Part 9 of our comprehensive Apache Kafka series. Part 8: Schema Registry ← Part 10: Monitoring and Operations →*
This post is licensed under CC BY 4.0 by the author.