Apache Kafka was originally developed at LinkedIn and was open sourced in 2011. In November 2014, several of the engineers who built Kafka at LinkedIn founded a new company, Confluent, with a focus on Kafka and Kafka Connect.

What is Kafka Connect

Kafka Connect is a framework for connecting Kafka with external systems. It helps move data into and out of Kafka, and it makes it simple to reuse existing connector configurations for common sources and sinks. Connectors come in two flavors:

Source Connector

A source connector imports data from another system into a Kafka topic. For example, a source connector can ingest an entire database and stream table updates to Kafka topics.

[Diagram: Data Sources → Kafka Connect → Kafka]
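For example, a minimal source connector configuration might look like the following. This sketch uses the FileStreamSource connector that ships with Kafka; the file path and topic name are example values:

    # connect-file-source.properties (example)
    name=local-file-source
    connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
    tasks.max=1
    # file to read from and topic to publish its lines to (example values)
    file=/tmp/input.txt
    topic=connect-test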

Sink Connector

A sink connector exports data from a Kafka topic to another system. For example, a sink connector can deliver data from a Kafka topic to an HDFS file.

[Diagram: Kafka → Kafka Connect → Data Sink]
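A minimal sink connector configuration is analogous. The sketch below uses the FileStreamSink connector that ships with Kafka (an HDFS sink connector would be configured the same way, with its own connector.class and properties); names and paths are example values:

    # connect-file-sink.properties (example)
    name=local-file-sink
    connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
    tasks.max=1
    # topic to consume from and file to append records to (example values)
    topics=connect-test
    file=/tmp/output.txt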

Why Kafka Connect

Kafka Connect helps us perform the Extract (E) and Load (L) steps of the ETL process. Connect provides a set of connectors that let us import and export data.

The number of distinct sources and sinks that programmers use is finite, so there is rarely a reason to write a new connector when one already exists.

All the common source and sink connectors are available at
https://www.confluent.io/product/connectors/

Core concepts of Kafka Connect

Connectors

The user instantiates the required connectors to either push data to Kafka or pull data from Kafka. A connector needs a configuration to perform data copying: implementations of the Connector class do not copy the data themselves; the configuration describes the set of data to be copied. Once a connector receives its configuration, it starts by breaking the job into a set of tasks that can be distributed to Kafka Connect workers.
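For example, the JSON configuration below asks a connector to split its work into at most three tasks. It assumes Confluent's JDBC source connector is installed; the connection URL, column name, and topic prefix are example values:

    {
      "name": "jdbc-source-example",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "3",
        "connection.url": "jdbc:postgresql://localhost:5432/mydb",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "topic.prefix": "db-"
      }
    }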

Tasks

The data a connector copies is represented as a set of partitioned streams, where each partition is an ordered sequence of records with offsets. Each task is assigned a subset of the partitions to process and copies its subset of the data to or from Kafka.

Every partition is an ordered sequence of key-value records. In addition to the key and value, each record carries a partition ID and an offset. The framework uses these to periodically commit the offsets of the data that has been processed; in case of a failure, the offsets make it possible to resume from the last committed position and avoid duplication.
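As an illustration, the file source connector shown earlier tracks its position roughly like this (a conceptual sketch, not an exact wire format): the source partition identifies the file, and the offset records the position within it:

    source partition:  { "filename": "/tmp/input.txt" }
    source offset:     { "position": 4096 }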

Workers

Connectors and tasks are logical units of work and must be scheduled to execute in a process. These processes are called workers.

There are two types of Workers:

1. Standalone Workers:

In standalone mode, a single process is responsible for executing all connectors and tasks. Since only one process is running, the work cannot be divided among multiple workers in standalone mode.

The single worker runs all connectors and tasks, so the connector configuration files must be supplied when the worker process is started, as shown below.
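For example, a standalone worker is launched with the worker properties file and one or more connector properties files on the command line (paths are example values):

    bin/connect-standalone.sh config/connect-standalone.properties \
        connect-file-source.properties connect-file-sink.properties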

Standalone mode cannot be scaled horizontally and provides no fault tolerance: if the worker's local state (such as its offsets file) is lost, it cannot be recovered. Task rebalancing is also not possible in standalone mode.

2. Distributed Workers:

In distributed mode, multiple workers can execute connectors and tasks. All workers with the same group.id automatically coordinate to schedule the execution of connectors and tasks across the available workers. Because any worker might run a given connector, the configuration is submitted through the REST API rather than a local file. If you add a worker, shut one down, or a worker fails, the remaining workers detect the change and automatically coordinate to redistribute the connectors and tasks across the updated set of available workers.
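For example, a connector can be created on a distributed cluster by POSTing its JSON configuration to any worker (the file name and port below are example values):

    curl -X POST -H "Content-Type: application/json" \
        --data @jdbc-source-example.json localhost:8083/connectors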

Scaling is easy in distributed mode: any number of workers can be added simply by giving each new worker the same group.id. Kafka Connect also provides automatic fault tolerance in distributed mode.

Resuming from Previous Offsets:

Each record carries a source partition ID (for example, an input filename) and an offset (for example, a position in that file). The framework uses these to commit offsets periodically, so that in case of failure the task can recover and avoid duplicating work. With a standalone worker, offsets are stored in a file on the local machine, whereas with distributed workers, offsets are stored in a Kafka topic.
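In standalone mode the offset file location is set in the worker properties (the path below is an example value); in distributed mode the offsets topic described in the next section is used instead:

    # connect-standalone.properties (excerpt)
    offset.storage.file.filename=/tmp/connect.offsets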

Three Kafka topics are created on the Kafka cluster when a connector runs in distributed mode:

config.storage.topic

This property names the topic that stores connector and task configurations. The topic is shared by all workers with the same group.id. The config storage topic must have exactly one partition and should be highly replicated.

offset.storage.topic

This property names the topic that stores connector and task offsets. The topic is shared by all workers with the same group.id. The offset storage topic should be highly replicated and should have a large number of partitions.

status.storage.topic

This property names the topic that stores connector and task status updates. The topic is shared by all workers with the same group.id. The status storage topic should be highly replicated and can have multiple partitions.
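A distributed worker's properties file typically sets all three topics along with group.id. The topic names and sizing below are common example values, not requirements:

    # connect-distributed.properties (excerpt)
    group.id=connect-cluster
    config.storage.topic=connect-configs
    config.storage.replication.factor=3
    offset.storage.topic=connect-offsets
    offset.storage.partitions=25
    offset.storage.replication.factor=3
    status.storage.topic=connect-status
    status.storage.partitions=5
    status.storage.replication.factor=3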

Use of REST Interface:

Since Kafka Connect runs as a service, it also supports a REST API for managing connectors. The REST API can be used to view and alter the current status of a connector and its tasks, including the ID of the worker to which each has been assigned.

There are four possible states of a connector or task:

  • UNASSIGNED: The connector/task has not yet been assigned to a worker.
  • RUNNING: The connector/task is running.
  • PAUSED: The connector/task has been paused.
  • FAILED: The connector/task has failed.

Common REST API calls:

  • Get the worker's information:
    curl localhost:8083
  • List connector plugins available on a particular worker:
    curl localhost:8083/connector-plugins
  • List active connectors on a worker:
    curl localhost:8083/connectors
  • Restart a connector:
    curl -X POST localhost:8083/connectors/<connector-name>/restart
  • Get task information for a connector:
    curl localhost:8083/connectors/<connector-name>/tasks
  • Restart a task:
    curl -X POST localhost:8083/connectors/<connector-name>/tasks/<task-id>/restart
  • Pause a connector:
    curl -X PUT localhost:8083/connectors/<connector-name>/pause
  • Resume a connector:
    curl -X PUT localhost:8083/connectors/<connector-name>/resume
  • Get the status of a connector:
    curl localhost:8083/connectors/<connector-name>/status
  • Get a connector's configuration:
    curl localhost:8083/connectors/<connector-name>
  • Delete a connector:
    curl -X DELETE localhost:8083/connectors/<connector-name>
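For example, the status call returns JSON along these lines (the connector name, worker IDs, and states shown are illustrative):

    {
      "name": "local-file-source",
      "connector": { "state": "RUNNING", "worker_id": "10.0.0.1:8083" },
      "tasks": [
        { "id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083" }
      ]
    }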