Kafka Connect - Integrating with External Systems
Master Kafka Connect: build reliable data pipelines with source and sink connectors for databases, message queues, and external systems.
Welcome to Part 7 of our Apache Kafka series! We’ve covered the fundamentals, architecture, producers, consumers, topics/partitions, and Streams. Now we explore Kafka Connect - the framework for building reliable, scalable data pipelines between Kafka and external systems.
Kafka Connect eliminates the need to write custom integration code by providing a framework for moving data between Kafka and external systems like databases, message queues, and file systems.
What is Kafka Connect?
Kafka Connect is a framework for:
- Source Connectors: Import data from external systems into Kafka
- Sink Connectors: Export data from Kafka to external systems
- Reliability: Fault tolerance with at-least-once delivery by default, plus exactly-once support for source connectors (Kafka 3.3+)
- Scalability: Distributed deployment across multiple workers
- Management: REST API for configuration and monitoring
Key Components
- Connectors: High-level abstractions for data movement
- Tasks: Individual units of work within connectors
- Workers: Processes that execute connectors and tasks
- Converters: Handle data format transformations
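To see how these pieces relate, the connector status endpoint (covered in detail later) shows a connector and the tasks it has fanned out to across the workers. The response below is purely illustrative, for a hypothetical connector named `my-source` running with `tasks.max=2`:

```json
{
  "name": "my-source",
  "connector": { "state": "RUNNING", "worker_id": "worker-1:8083" },
  "tasks": [
    { "id": 0, "state": "RUNNING", "worker_id": "worker-1:8083" },
    { "id": 1, "state": "RUNNING", "worker_id": "worker-2:8083" }
  ],
  "type": "source"
}
```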
Deployment Modes
Standalone Mode
```bash
# Start a standalone worker
connect-standalone worker.properties source-connector.properties
```

(A sample source-connector.properties for this command is shown after the list below.)

Best for:
- Development and testing
- Single worker deployments
- Simple configurations
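The standalone command above takes the connector configuration as a properties file rather than JSON. A minimal sketch of `source-connector.properties` for the bundled file source connector might look like this (the file path and topic name are placeholders):

```properties
# source-connector.properties (illustrative)
name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/test.txt
topic=file-events
```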
Distributed Mode
```bash
# Start a distributed worker (run on each machine in the cluster)
connect-distributed worker.properties
```

Best for:
- Production deployments
- High availability
- Scalability across multiple machines
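Unlike standalone mode, connectors in distributed mode are not passed on the command line; they are submitted to the running cluster through the REST API (covered in detail later). A minimal sketch, assuming a worker listening on port 8083 and a connector definition saved as connector.json:

```bash
# Submit a connector to a distributed Connect cluster (illustrative)
curl -X POST -H "Content-Type: application/json" \
  --data @connector.json \
  http://localhost:8083/connectors
```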
Configuration
Worker Configuration
```properties
# worker.properties
bootstrap.servers=localhost:9092

# Group coordination
group.id=connect-cluster

# Key and value converters
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Plugin path
plugin.path=/usr/share/java,/usr/share/confluent-hub-components

# REST API
rest.host.name=0.0.0.0
rest.port=8083

# Offset storage
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3

# Config storage
config.storage.topic=connect-configs
config.storage.replication.factor=3

# Status storage
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=3
```

Source Connectors
Source connectors read data from external systems and write to Kafka topics.
File Source Connector
{ "name": "file-source", "config": { "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector", "tasks.max": "1", "file": "/tmp/test.txt", "topic": "file-events", "transforms": "AddTimestamp", "transforms.AddTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.AddTimestamp.timestamp.field": "timestamp" } } JDBC Source Connector
{ "name": "jdbc-source", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:postgresql://localhost:5432/mydb", "connection.user": "user", "connection.password": "password", "table.whitelist": "users,orders", "mode": "incrementing", "incrementing.column.name": "id", "topic.prefix": "db-", "poll.interval.ms": "1000" } } Debezium CDC Connector
{ "name": "postgres-cdc", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "localhost", "database.port": "5432", "database.user": "debezium", "database.password": "dbz", "database.dbname": "mydb", "database.server.name": "dbserver1", "table.whitelist": "public.users", "plugin.name": "pgoutput" } } Sink Connectors
Sink connectors read from Kafka topics and write to external systems.
File Sink Connector
{ "name": "file-sink", "config": { "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max": "1", "file": "/tmp/sink.txt", "topics": "file-events" } } JDBC Sink Connector
{ "name": "jdbc-sink", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:postgresql://localhost:5432/warehouse", "connection.user": "user", "connection.password": "password", "topics": "user-events,order-events", "auto.create": "true", "auto.evolve": "true", "insert.mode": "upsert", "pk.mode": "record_key", "pk.fields": "id" } } Elasticsearch Sink Connector
{ "name": "elasticsearch-sink", "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "topics": "user-events", "connection.url": "http://localhost:9200", "type.name": "_doc", "key.ignore": "false", "schema.ignore": "true", "behavior.on.malformed.documents": "warn" } } Single Message Transforms (SMTs)
SMTs modify messages as they flow through Connect.
Built-in Transforms
Insert Field
{ "transforms": "InsertTimestamp", "transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.InsertTimestamp.timestamp.field": "event_timestamp" } Extract Field
{ "transforms": "ExtractId", "transforms.ExtractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key", "transforms.ExtractId.field": "id" } Value to Key
{ "transforms": "CopyIdToKey", "transforms.CopyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey", "transforms.CopyIdToKey.fields": "id" } Custom Transform
Custom Transform

```java
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class CustomTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public void configure(Map<String, ?> configs) {
        // Read transform-specific settings here (required by the Configurable interface)
    }

    @Override
    public R apply(R record) {
        // Transform logic: return a new record, or the original to pass it through unchanged
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
        // Release any resources held by the transform
    }
}
```

REST API Management
Managing Connectors
```bash
# List connectors
curl -X GET http://localhost:8083/connectors

# Create connector
curl -X POST -H "Content-Type: application/json" \
  --data @connector-config.json \
  http://localhost:8083/connectors

# Get connector config
curl -X GET http://localhost:8083/connectors/file-source/config

# Update connector
curl -X PUT -H "Content-Type: application/json" \
  --data @updated-config.json \
  http://localhost:8083/connectors/file-source/config

# Delete connector
curl -X DELETE http://localhost:8083/connectors/file-source

# Get connector status
curl -X GET http://localhost:8083/connectors/file-source/status

# Restart connector
curl -X POST http://localhost:8083/connectors/file-source/restart
```

Task Management
```bash
# List tasks for a connector
curl -X GET http://localhost:8083/connectors/file-source/tasks

# Get task status
curl -X GET http://localhost:8083/connectors/file-source/tasks/0/status

# Restart task
curl -X POST http://localhost:8083/connectors/file-source/tasks/0/restart
```

Converters and Data Formats
JSON Converter
```properties
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
```

Avro Converter
```properties
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
```

String Converter
```properties
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```
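Converters set in worker.properties are only defaults; individual connectors can override them in their own configuration, which is useful when one pipeline carries JSON and another plain strings or Avro. A sketch, assuming a connector that should bypass the worker-level JSON converter:

```json
{
  "config": {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
```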
Error Handling
Dead Letter Queue
{ "name": "error-handling-sink", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "errors.tolerance": "all", "errors.deadletterqueue.topic.name": "dlq-topic", "errors.deadletterqueue.topic.replication.factor": 3, "errors.deadletterqueue.context.headers.enable": true } } Retry and Logging
{ "config": { "errors.retry.timeout": "60000", "errors.retry.delay.max.ms": "60000", "errors.log.enable": "true", "errors.log.include.messages": "true" } } Custom Connectors
Source Connector Implementation
```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

public class CustomSourceConnector extends SourceConnector {

    private Map<String, String> config;

    @Override
    public void start(Map<String, String> props) {
        // Keep the connector configuration so it can be handed to tasks
        this.config = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return CustomSourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // A single task in this simple example; return one config map per desired task
        return Collections.singletonList(config);
    }

    @Override
    public void stop() {
        // Cleanup
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
                .define("topic", Type.STRING, Importance.HIGH, "Target topic")
                .define("poll.interval.ms", Type.INT, 1000, Importance.MEDIUM, "Poll interval");
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}
```

Source Task Implementation
```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class CustomSourceTask extends SourceTask {

    private String topic;

    @Override
    public void start(Map<String, String> props) {
        // Initialize the task from the config handed out by taskConfigs()
        this.topic = props.get("topic");
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Fetch data from the external system and wrap it in SourceRecords
        List<SourceRecord> records = new ArrayList<>();

        // Placeholder partition/offset maps; a real task describes its position
        // in the source system (e.g. a filename and byte offset)
        Map<String, ?> sourcePartition = Collections.singletonMap("source", "example");
        Map<String, ?> sourceOffset = Collections.singletonMap("position", 0L);

        SourceRecord record = new SourceRecord(
                sourcePartition,        // Source partition
                sourceOffset,           // Source offset
                topic,                  // Target topic
                null,                   // Target partition (null = let Kafka decide)
                Schema.STRING_SCHEMA,   // Key schema
                "example-key",          // Key
                Schema.STRING_SCHEMA,   // Value schema
                "example-value"         // Value
        );
        records.add(record);
        return records;
    }

    @Override
    public void stop() {
        // Cleanup
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}
```
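One thing the skeleton above omits is resuming after a restart: Connect persists the source offsets you attach to each record, and a task can read them back through its context when it starts. A minimal sketch of that lookup, to be placed inside `start()` of the class above (the partition map must match whatever the task uses when producing records):

```java
// Inside start(): recover the last committed offset for this source partition, if any
Map<String, Object> lastOffset =
        context.offsetStorageReader().offset(Collections.singletonMap("source", "example"));
if (lastOffset != null) {
    Long position = (Long) lastOffset.get("position");
    // Resume reading from `position` instead of starting over
}
```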
Monitoring and Metrics
Key Metrics
- Connector Status: Running, paused, failed
- Task Status: Running, failed, paused
- Throughput: Records per second
- Lag: Consumer lag for sink connectors and source offset lag for source connectors
- Errors: Error rates and types
JMX Metrics
```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Access Connect metrics programmatically via JMX
MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName pattern = new ObjectName("kafka.connect:type=connector-metrics,connector=*");
Set<ObjectName> names = mbeanServer.queryNames(pattern, null);
for (ObjectName name : names) {
    // The connector name is a key property of the MBean name itself
    String connectorName = name.getKeyProperty("connector");
    String status = (String) mbeanServer.getAttribute(name, "status");
    String type = (String) mbeanServer.getAttribute(name, "connector-type");
    // ... other groups: connector-task-metrics, source-task-metrics, sink-task-metrics
}
```

Best Practices
1. Configuration Management
```bash
# Keep sensitive data out of config files, e.g. in environment variables
export CONNECT_BOOTSTRAP_SERVERS=localhost:9092
export CONNECT_DB_PASSWORD=secret
```

Connector configs should reference secrets indirectly rather than embedding them; Kafka Connect supports this through config providers, as sketched below.
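A minimal sketch using the built-in FileConfigProvider, which resolves `${file:...}` placeholders at runtime (the secrets file path and key name here are placeholders):

```properties
# worker.properties: enable the built-in file config provider
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
```

Connector configs can then reference keys stored in a separate, locked-down properties file:

```json
{
  "config": {
    "connection.password": "${file:/opt/connect/secrets.properties:db.password}"
  }
}
```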
2. Resource Allocation

```properties
# Allocate appropriate resources
tasks.max=3
max.poll.records=1000
batch.size=10000
```

3. Schema Management
{ "config": { "key.converter.schemas.enable": "true", "value.converter.schemas.enable": "true", "auto.register.schemas": "true", "use.latest.version": "true" } } 4. Monitoring Setup
{ "config": { "metric.reporters": "io.confluent.connect.reporter.ConfluentMonitoringReporter", "confluent.monitoring.interceptor.topic": "_confluent-monitoring", "confluent.monitoring.interceptor.publish.ms": "10000" } } Real-World Example: Database to Elasticsearch Pipeline
1. JDBC Source (PostgreSQL):

```json
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/ecommerce",
    "table.whitelist": "products,orders",
    "mode": "timestamp",
    "timestamp.column.name": "updated_at",
    "topic.prefix": "db-"
  }
}
```

2. Elasticsearch Sink:

```json
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "db-products,db-orders",
    "connection.url": "http://localhost:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true",
    "behavior.on.malformed.documents": "warn",
    "transforms": "RenameField",
    "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.RenameField.renames": "id:product_id,name:product_name"
  }
}
```

Troubleshooting Common Issues
Connector Won’t Start
```bash
# Check logs
tail -f /var/log/kafka/connect.log

# Validate configuration
curl -X PUT -H "Content-Type: application/json" \
  --data @config.json \
  http://localhost:8083/connector-plugins/JdbcSourceConnector/config/validate
```

Performance Issues
```bash
# Monitor throughput
curl http://localhost:8083/connectors/jdbc-source/status
```

Adjust batch settings in the connector config if throughput is low:

```json
{
  "config": {
    "batch.size": "10000",
    "max.poll.records": "1000"
  }
}
```

Schema Evolution
{ "config": { "auto.evolve": "true", "evolve": "true", "errors.tolerance": "all" } } What’s Next?
In this comprehensive guide to Kafka Connect, we’ve covered:
- Source and sink connectors
- Configuration and deployment modes
- Single message transforms
- REST API management
- Custom connector development
- Error handling and monitoring
- Best practices and real-world examples
You should now be able to build reliable data integration pipelines with Kafka Connect.
In Part 8, we’ll explore Schema Registry and data governance - managing schemas, ensuring compatibility, and maintaining data quality.
*This is Part 7 of our comprehensive Apache Kafka series. ← Part 6: Kafka Streams | Part 8: Schema Registry →*