Apache Kafka Series: Kafka Connect

Apache Kafka was originally developed at LinkedIn and was open sourced in 2011. In November 2014, engineers who had worked on Kafka at LinkedIn founded a new company, Confluent, focused on Kafka and its ecosystem, including Kafka Connect.

What is Kafka Connect?
Kafka Connect is a framework for connecting Kafka with external systems. It moves data into and out of Kafka. Because ready-made connectors exist for many common sources and sinks, copying data often only requires writing a configuration for an existing connector. Connectors come in two flavors:

Source Connector:
A source connector imports data from another system into Kafka topics. For example, a source connector can ingest an entire database and stream table updates to Kafka topics.

Sink Connector:
A sink connector exports data from Kafka topics to other systems. For example, a sink connector can deliver data from a Kafka topic to files in HDFS.
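As a rough illustration of the two flavors, the sketch below configures the simple file connectors that Apache Kafka ships as examples (`FileStreamSourceConnector` and `FileStreamSinkConnector`) and wraps each configuration in the `{"name": ..., "config": ...}` JSON body used when creating a connector through the REST API. The connector names, file paths and topic name are hypothetical placeholders, and depending on the Kafka version the file connector JAR may need to be added to the worker's `plugin.path`.

```python
import json

# Hypothetical topic, paths and connector names; the classes are Kafka's example file connectors.
TOPIC = "connect-demo"

# Source connector: reads lines from a local file and writes them to a Kafka topic.
source_config = {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/tmp/input.txt",
    "topic": TOPIC,  # the file source writes to a single named topic
}

# Sink connector: reads records from Kafka topics and appends them to a local file.
sink_config = {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "file": "/tmp/output.txt",
    "topics": TOPIC,  # sink connectors take a comma-separated list of topics
}


def create_body(name: str, config: dict) -> str:
    """Wrap a connector config in the JSON body used to create a connector."""
    return json.dumps({"name": name, "config": config}, indent=2)


if __name__ == "__main__":
    print(create_body("file-source-demo", source_config))
    print(create_body("file-sink-demo", sink_config))
```

The only structural difference between the two is the direction of the data, which shows up as `topic` (where the source writes) versus `topics` (what the sink reads).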

Kafka Connect can run either in standalone mode, where a single process runs all jobs on one machine, or in distributed mode, where the work is spread across a cluster of workers.

Why Kafka Connect:
Kafka Connect handles the Extract (E) and Load (L) steps of an ETL process, and lightweight per-record Transforms (T) can be applied with Single Message Transforms. A large set of existing connectors covers importing and exporting data.
Most programmers work with a limited set of common sources and sinks, so there is little reason to write a new connector when one is already available.
Connectors for most common sources and sinks are listed at https://www.confluent.io/product/connectors/

Core concepts of Kafka Connect:

Users instantiate the connectors they need, either to push data into Kafka or to pull data out of it. A connector needs a configuration that describes the set of data to be copied, but implementations of the Connector class do not copy data themselves. Once a connector receives its configuration, it breaks the job into a set of tasks that can be distributed across Kafka Connect workers, and the tasks do the actual copying.
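The split of a job into tasks can be sketched as a small Python model (not the Java Connector API). A connector is given a list of partitions, here hypothetically a set of database tables, plus the `tasks.max` limit, and groups the partitions into at most that many contiguous chunks, one per task. Kafka's Java API offers a similar helper, `ConnectorUtils.groupPartitions`, that connector authors can call from `Connector.taskConfigs(int maxTasks)`; the function below is an illustrative re-creation, not that code.

```python
from typing import List, TypeVar

T = TypeVar("T")


def group_partitions(partitions: List[T], max_tasks: int) -> List[List[T]]:
    """Split partitions into at most max_tasks contiguous, near-equal groups.

    Each group becomes the work assigned to one task. When the split is
    uneven, the earlier groups receive one extra partition.
    """
    if max_tasks <= 0:
        raise ValueError("max_tasks must be positive")
    num_groups = min(max_tasks, len(partitions))  # never create idle tasks
    if num_groups == 0:
        return []
    per_group, leftover = divmod(len(partitions), num_groups)
    groups: List[List[T]] = []
    start = 0
    for i in range(num_groups):
        size = per_group + (1 if i < leftover else 0)
        groups.append(partitions[start:start + size])
        start += size
    return groups


if __name__ == "__main__":
    # Hypothetical database source connector copying five tables with tasks.max=2.
    tables = ["customers", "orders", "payments", "shipments", "invoices"]
    for task_id, assigned in enumerate(group_partitions(tables, max_tasks=2)):
        print(f"task {task_id}: {assigned}")
```

With five tables and `tasks.max=2`, task 0 gets three tables and task 1 gets two; with `tasks.max=10` only five tasks would be created, one per table.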

The data a connector copies is represented as partitioned streams, where each partition is an ordered sequence of records with offsets. Each task is assigned a subset of the partitions and copies that subset of data to or from Kafka.
Each record in a partition is a key-value pair. In addition to the key and value, a record carries its partition ID and offset. The framework uses these to commit offsets periodically, so that after a failure a task can resume from the last committed offset instead of starting over, which limits duplicate processing.
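A toy Python model of periodic offset commits and resumption is shown below. It is not how the framework is implemented, and the class and function names are hypothetical. A source task copies lines from a file, treating the filename as the source partition and the line number as the source offset, commits every `commit_every` records, and optionally "crashes" partway through. On restart it resumes right after the last committed offset.

```python
from typing import Dict, List, Optional


class OffsetStore:
    """In-memory stand-in for the framework's offset storage."""

    def __init__(self) -> None:
        self._committed: Dict[str, int] = {}

    def commit(self, partition: Dict[str, str], offset: Dict[str, int]) -> None:
        self._committed[partition["filename"]] = offset["position"]

    def last_committed(self, partition: Dict[str, str]) -> Optional[int]:
        return self._committed.get(partition["filename"])


def run_task(filename: str, lines: List[str], store: OffsetStore,
             commit_every: int, crash_after: Optional[int] = None) -> List[str]:
    """Copy lines as records, committing offsets periodically.

    Returns the values delivered before finishing or simulating a crash.
    """
    partition = {"filename": filename}  # source partition
    last = store.last_committed(partition)
    start = 0 if last is None else last + 1  # resume after the last committed line
    delivered: List[str] = []
    for position in range(start, len(lines)):
        delivered.append(lines[position])  # "send" the record to Kafka
        offset = {"position": position}  # source offset travels with the record
        if len(delivered) % commit_every == 0:
            store.commit(partition, offset)  # periodic offset commit
        if crash_after is not None and len(delivered) == crash_after:
            return delivered  # simulated crash: uncommitted progress is lost
    if delivered:
        store.commit(partition, {"position": len(lines) - 1})  # final commit on clean finish
    return delivered


if __name__ == "__main__":
    store = OffsetStore()
    lines = [f"line-{i}" for i in range(6)]
    print(run_task("input.txt", lines, store, commit_every=2, crash_after=3))
    print(run_task("input.txt", lines, store, commit_every=2))
```

In this run the first attempt delivers `line-0` to `line-2` but has only committed up to `line-1`, so the restarted task delivers `line-2` again. Resuming from committed offsets bounds the amount of rework to whatever was sent after the last commit; it does not by itself rule out duplicates.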

Connectors and tasks are logical units of work; to run, they must be scheduled onto processes, and these processes are called workers. Workers can run in two different modes:

Standalone Workers:
In standalone mode, a single process is responsible for executing all connectors and tasks. Because there is only one process, the work cannot be spread across multiple machines.
Since a single worker runs all connectors and tasks, the connector configuration files are passed to the worker on the command line when it starts.
Standalone mode cannot be scaled horizontally and does not provide fault tolerance: if the worker fails, its connectors stop until it is restarted, and because offsets are kept on the local machine, losing that machine's offset file means losing track of progress. Task rebalancing is not available in standalone mode.
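For illustration, a standalone worker is typically launched with the `connect-standalone.sh` script from the Kafka distribution, passing the worker properties file first and one or more connector properties files after it. The Python sketch below writes both files and returns that command line instead of running it. The broker address, file paths, connector name and topic are assumptions, and the helper function names are hypothetical.

```python
import tempfile
from pathlib import Path
from typing import Dict, List


def write_properties(path: Path, props: Dict[str, str]) -> None:
    """Write a simple Java-style .properties file, one key=value pair per line."""
    path.write_text("".join(f"{key}={value}\n" for key, value in props.items()))


def standalone_command(directory: Path) -> List[str]:
    """Write worker and connector configs; return the command that would start the worker."""
    worker = {
        "bootstrap.servers": "localhost:9092",  # assumed local broker
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        # Standalone mode keeps source offsets in a file on the local machine.
        "offset.storage.file.filename": "/tmp/connect.offsets",
    }
    connector = {
        "name": "file-source-demo",  # hypothetical connector name
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "tasks.max": "1",
        "file": "/tmp/input.txt",
        "topic": "connect-demo",
    }
    worker_path = directory / "worker.properties"
    connector_path = directory / "file-source.properties"
    write_properties(worker_path, worker)
    write_properties(connector_path, connector)
    # The connector config is handed to the worker at start-up, after the worker config.
    return ["bin/connect-standalone.sh", str(worker_path), str(connector_path)]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(" ".join(standalone_command(Path(tmp))))
```

Because the connector configuration lives in a local file read at start-up, adding or changing a connector in this setup generally means editing files and restarting the single worker.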

Distributed Workers:
In distributed mode, multiple workers execute connectors and tasks. All workers with the same group.id automatically coordinate to schedule connectors and tasks across the available workers. Because any worker might end up running a given connector, connector configurations are submitted through the REST API rather than as local files. If a worker is added, shut down, or fails, the remaining workers detect the change and automatically redistribute the connectors and tasks across the updated set of available workers.
Scaling is straightforward in distributed mode: a new worker joins the cluster simply by being configured with the same group.id. Kafka Connect provides automatic fault tolerance in distributed mode.
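The redistribution described above can be modelled very roughly in Python. This is a deliberately simplified round-robin sketch, not Kafka Connect's actual protocol: real workers coordinate through Kafka's group membership protocol, with a group leader computing the assignment, and newer versions rebalance incrementally so that only the work that needs to move is moved. The worker IDs and unit names are hypothetical.

```python
from typing import Dict, List


def assign(workers: List[str], units: List[str]) -> Dict[str, List[str]]:
    """Spread connectors and tasks round-robin over the live workers (simplified model)."""
    if not workers:
        raise ValueError("at least one worker is required")
    ordered = sorted(workers)  # a deterministic order, so every worker computes the same plan
    plan: Dict[str, List[str]] = {worker: [] for worker in ordered}
    for i, unit in enumerate(units):
        plan[ordered[i % len(ordered)]].append(unit)
    return plan


if __name__ == "__main__":
    # Hypothetical cluster: one connector with three tasks on three workers.
    units = ["connector:file-source", "task-0", "task-1", "task-2"]
    print(assign(["worker-a:8083", "worker-b:8083", "worker-c:8083"], units))
    # worker-b fails: the remaining workers split the same units between them.
    print(assign(["worker-a:8083", "worker-c:8083"], units))
```

The point of the model is that the set of connectors and tasks stays the same while the set of workers changes; only the mapping between them is recomputed.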

Resuming from Previous Offsets:
Each record carries a partition ID (for a file source, the input filename) and an offset (the position within that file). The framework uses these to commit offsets periodically, so that after a failure a task can resume where it left off and limit duplicate processing. A standalone worker stores offsets in a file on the local machine, whereas distributed workers store offsets in a Kafka topic.
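To show why standalone offsets survive a restart only as long as the local file does, here is a small Python sketch of a file-backed offset store. It is an illustration only: the real standalone worker writes the file named by `offset.storage.file.filename` in its own format rather than JSON, and the class name here is hypothetical.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict


class FileOffsetStore:
    """Persist source offsets to a local file, keyed by source partition."""

    def __init__(self, path: Path) -> None:
        self.path = path
        # Reload any offsets committed by a previous run of the worker.
        self.offsets: Dict[str, int] = json.loads(path.read_text()) if path.exists() else {}

    def commit(self, partition: str, position: int) -> None:
        self.offsets[partition] = position
        tmp = self.path.with_suffix(".tmp")
        tmp.write_text(json.dumps(self.offsets))
        tmp.replace(self.path)  # swap in the new file so a crash mid-write keeps the old one

    def get(self, partition: str, default: int = 0) -> int:
        return self.offsets.get(partition, default)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "connect.offsets"
        FileOffsetStore(path).commit("input.txt", 42)
        restarted = FileOffsetStore(path)  # simulates a worker restart on the same machine
        print(restarted.get("input.txt"))
```

Progress survives a restart because it is re-read from the file; if the machine or the file is lost, the store starts empty and the connector starts over. Distributed mode avoids this by keeping offsets in a replicated Kafka topic.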
In distributed mode, workers rely on three Kafka topics on the Kafka cluster, each named by a worker setting:

config.storage.topic: The name of the topic that stores connector and task configurations. This topic is shared by all workers with the same group.id. It must have a single partition and should be highly replicated.

offset.storage.topic: The name of the topic that stores connector offsets. This topic is shared by all workers with the same group.id. It should be highly replicated and have a large number of partitions.

status.storage.topic: The name of the topic that stores connector and task status updates. This topic is shared by all workers with the same group.id. It should be highly replicated and have multiple partitions.
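A distributed worker configuration that names these three topics might look like the output of the Python sketch below. The broker address, group ID and topic names are assumptions; the partition counts mirror the usual defaults (25 for offsets, 5 for status), but the documentation for the Kafka version in use is the authority on defaults.

```python
from typing import Dict


def distributed_worker_config(group_id: str, replication: int = 3) -> Dict[str, str]:
    """Build illustrative worker settings for distributed mode."""
    return {
        "bootstrap.servers": "localhost:9092",  # assumed broker address
        "group.id": group_id,  # workers sharing this id form one Connect cluster
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        # Connector and task configurations; this topic must have exactly one partition.
        "config.storage.topic": f"{group_id}-configs",
        "config.storage.replication.factor": str(replication),
        # Source connector offsets.
        "offset.storage.topic": f"{group_id}-offsets",
        "offset.storage.replication.factor": str(replication),
        "offset.storage.partitions": "25",
        # Connector and task status updates.
        "status.storage.topic": f"{group_id}-status",
        "status.storage.replication.factor": str(replication),
        "status.storage.partitions": "5",
    }


def to_properties(props: Dict[str, str]) -> str:
    """Render settings as a Java-style .properties file body."""
    return "".join(f"{key}={value}\n" for key, value in props.items())


if __name__ == "__main__":
    print(to_properties(distributed_worker_config("connect-cluster")))
```

Every worker in the cluster would start from the same file (via `connect-distributed.sh`), which is what makes the three topics shared across workers with the same group.id.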

Use of REST Interface:
Because Kafka Connect runs as a service, it also provides a REST API for managing connectors. The REST API is used to view and change the current status of a connector and its tasks, including the ID of the worker each one is assigned to.

A connector or task can be in one of four states:
UNASSIGNED: The connector or task has not yet been assigned to a worker.
RUNNING: The connector or task is running.
PAUSED: The connector or task has been paused.
FAILED: The connector or task has failed.
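The state and worker ID of a connector and its tasks can be read from the JSON returned by the status endpoint (`/connectors/<name_of_connector>/status`). The Python sketch below groups them by state; the sample response is hand-written and hypothetical, though its shape (`connector`, `tasks`, `state`, `worker_id`) follows the status endpoint's response. The function does not reject unfamiliar state names, so it keeps working if a Kafka version reports additional states.

```python
import json
from typing import Dict, List

# Hand-written, hypothetical status response for one connector with two tasks.
SAMPLE_STATUS = """{
  "name": "file-source-demo",
  "connector": {"state": "RUNNING", "worker_id": "10.0.0.5:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.5:8083"},
    {"id": 1, "state": "FAILED", "worker_id": "10.0.0.6:8083", "trace": "stack trace omitted"}
  ],
  "type": "source"
}"""


def summarize_status(status_json: str) -> Dict[str, List[str]]:
    """Group the connector and its tasks by state, as 'label@worker_id' strings."""
    status = json.loads(status_json)
    entries = [("connector", status["connector"])]
    entries += [(f"task-{task['id']}", task) for task in status.get("tasks", [])]
    summary: Dict[str, List[str]] = {}
    for label, entry in entries:
        summary.setdefault(entry["state"], []).append(f"{label}@{entry['worker_id']}")
    return summary


if __name__ == "__main__":
    for state, members in summarize_status(SAMPLE_STATUS).items():
        print(state, members)
```

A summary like this makes it easy to spot FAILED tasks and the worker they were running on, which is the information needed before restarting them.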

Common REST API calls:
Get the worker’s information
curl localhost:8083
List the connector plugins installed on the worker
curl localhost:8083/connector-plugins
List the active connectors on the worker
curl localhost:8083/connectors
Restart a connector
curl -X POST localhost:8083/connectors/<name_of_connector>/restart
Get a connector's task information
curl localhost:8083/connectors/<name_of_connector>/tasks
Restart a task
curl -X POST localhost:8083/connectors/<name_of_connector>/tasks/<task_number>/restart
Pause a connector
curl -X PUT localhost:8083/connectors/<name_of_connector>/pause
Resume a connector
curl -X PUT localhost:8083/connectors/<name_of_connector>/resume
Get a connector's status
curl localhost:8083/connectors/<name_of_connector>/status
Get a connector's information, including its configuration
curl localhost:8083/connectors/<name_of_connector>
Delete a connector
curl -X DELETE localhost:8083/connectors/<name_of_connector>
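For scripting, the same calls can be issued from code. The Python sketch below uses only the standard library; the class and method names are hypothetical, the base URL assumes a worker listening on the default port 8083, and requests are built separately from being sent so they can be inspected without a running worker.

```python
import json
import urllib.request
from typing import Any, Dict, Optional
from urllib.parse import quote

Request = urllib.request.Request


class ConnectRestClient:
    """Builds (and optionally sends) Kafka Connect REST API requests."""

    def __init__(self, base_url: str = "http://localhost:8083") -> None:
        self.base_url = base_url.rstrip("/")

    def _request(self, method: str, path: str, body: Optional[Dict[str, Any]] = None) -> Request:
        headers = {"Accept": "application/json"}
        data = None
        if body is not None:
            data = json.dumps(body).encode("utf-8")
            headers["Content-Type"] = "application/json"
        return Request(self.base_url + path, data=data, headers=headers, method=method)

    @staticmethod
    def _connector(name: str) -> str:
        # Escape the name so characters such as "/" stay inside a single path segment.
        return "/connectors/" + quote(name, safe="")

    def send(self, request: Request) -> Any:
        """Execute a request; return parsed JSON, or None when the body is empty."""
        with urllib.request.urlopen(request) as response:
            payload = response.read()
        return json.loads(payload) if payload else None

    def list_connectors(self) -> Request:
        return self._request("GET", "/connectors")

    def create(self, name: str, config: Dict[str, str]) -> Request:
        return self._request("POST", "/connectors", {"name": name, "config": config})

    def status(self, name: str) -> Request:
        return self._request("GET", self._connector(name) + "/status")

    def restart(self, name: str) -> Request:
        return self._request("POST", self._connector(name) + "/restart")

    def restart_task(self, name: str, task_id: int) -> Request:
        return self._request("POST", f"{self._connector(name)}/tasks/{task_id}/restart")

    def pause(self, name: str) -> Request:
        return self._request("PUT", self._connector(name) + "/pause")

    def resume(self, name: str) -> Request:
        return self._request("PUT", self._connector(name) + "/resume")

    def delete(self, name: str) -> Request:
        return self._request("DELETE", self._connector(name))


if __name__ == "__main__":
    client = ConnectRestClient()
    req = client.pause("file-source-demo")
    print(req.get_method(), req.full_url)
    # With a worker running: client.send(client.status("file-source-demo"))
```

Keeping request construction separate from `send()` makes it possible to log or dry-run an operation (for example a DELETE) before executing it against a live cluster.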

Incisive, enthusiastic and always passionate about learning new tools and technologies. Holds a major interest in Kafka Connect and has also worked extensively on Apache Spark.
