How to Connect Elastic Sink Connector with Kafka

Damindu Lakmal
4 min read · May 14, 2022
Kafka Sink Data to Elastic

Kafka has a mechanism, Kafka Connect, for pumping data in and out. Let’s take a quick look at its connectors.

There are two connector types:

  • Sink Connector : Exports data from Kafka to an external service
  • Source Connector : Imports data from an external service into Kafka

More details can be found here.

Sink Connector

The sink connector moves data from Kafka to an external service: it writes data from a topic to a data store. Along the way, the data can be transformed and converted into the desired format through transformations and converters.
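
As a rough mental model of that flow, the sketch below mimics in plain Python what happens to each record on its way to the sink: a converter turns the raw bytes into a structure, optional transformations reshape it, and the result is handed to the external system. All function names here are hypothetical, and this is not the Kafka Connect API (which is Java-based), only an illustration of the order of the steps.

```python
import json
from typing import Callable, Dict, List

Record = Dict[str, object]
Transform = Callable[[Record], Record]


def json_converter(raw: bytes) -> Record:
    """Stand-in for a value converter: bytes from the topic -> dict."""
    return json.loads(raw.decode("utf-8"))


def drop_field(name: str) -> Transform:
    """Build a toy transformation that removes one field from a record."""
    def apply(record: Record) -> Record:
        return {k: v for k, v in record.items() if k != name}
    return apply


def sink_pipeline(raw: bytes, transforms: List[Transform]) -> Record:
    """Convert the raw message, then apply each transformation in order."""
    record = json_converter(raw)
    for transform in transforms:
        record = transform(record)
    return record  # a real connector would now write this to the target


# Hypothetical message; "internal_id" is an invented field for the demo.
message = b'{"first_name": "damindu", "internal_id": 7}'
document = sink_pipeline(message, [drop_field("internal_id")])
print(document)
```

The point of the split is that converters deal with the wire format while transformations deal with the content, so either can be swapped without touching the other.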

Elastic Sink Connector

First, let’s outline the steps we will follow:

  • Install Elastic Sink Connector
  • Boot-up Elastic Search
  • Connector Configuration
  • Elastic Index Enhancement
  • Masking Information

The last two points describe how to avoid Elasticsearch mapping issues and how to mask sensitive information before it is synced to Elasticsearch.

In our demo, the Kafka records have a string key and a JSON payload. We will pump this data from Kafka to Elasticsearch through a connector.

  • Key : string (“12312”)
  • Payload : JSON
{
  "first_name" : "damindu",
  "last_name" : "lakmal",
  "phone_number" : 1231231213,
  "address" : [
    {
      "road" : "lane"
    }
  ]
}

Install Elastic Sink Connector

Let’s create a Dockerfile that runs a Kafka Connect worker with the Elasticsearch connector installed:

FROM confluentinc/cp-kafka-connect:6.1.0
USER root
ENV CONNECT_PLUGIN_PATH /usr/share/confluent-hub-components/
ENV CONNECT_BOOTSTRAP_SERVERS localhost:9092
ENV CONNECT_GROUP_ID localhost
ENV CONNECT_CONFIG_STORAGE_TOPIC config-storage
ENV CONNECT_OFFSET_STORAGE_TOPIC offset-storage
ENV CONNECT_STATUS_STORAGE_TOPIC status-storage
ENV CONNECT_INTERNAL_KEY_CONVERTER org.apache.kafka.connect.storage.StringConverter
ENV CONNECT_INTERNAL_VALUE_CONVERTER org.apache.kafka.connect.json.JsonConverter
ENV CONNECT_KEY_CONVERTER org.apache.kafka.connect.storage.StringConverter
ENV CONNECT_VALUE_CONVERTER org.apache.kafka.connect.json.JsonConverter
ENV CONNECT_REST_ADVERTISED_HOST_NAME localhost
ENV CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR "1"
ENV CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR "1"
ENV CONNECT_STATUS_STORAGE_REPLICATION_FACTOR "1"
RUN curl -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
RUN mkdir confluent-hub-client-latest
RUN tar -xf confluent-hub-client-latest.tar.gz -C confluent-hub-client-latest
RUN ./confluent-hub-client-latest/bin/confluent-hub install --no-prompt --verbose --component-dir /usr/share/confluent-hub-components confluentinc/kafka-connect-elasticsearch:10.0.1

Docker must be installed before you run the commands below. Build the Docker image with:

docker build -t elastic-connector:v0.0.0 .

Start the connector worker with:

docker run -it -p 8083:8083 -e CONNECT_BOOTSTRAP_SERVERS=<host>:9092 elastic-connector:v0.0.0
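
Once the worker is running, it can be worth confirming that the Elasticsearch plugin was actually loaded. The sketch below is one way to do that from Python, assuming the worker's REST API is reachable on the port mapped above; the helper names are hypothetical, and the sample list only imitates the shape of the worker's plugin listing so the example runs offline.

```python
import json
import urllib.request
from typing import Dict, List

ES_SINK_CLASS = "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"


def fetch_plugins(worker_url: str) -> List[Dict[str, str]]:
    """Ask the worker which connector plugins it has loaded."""
    url = f"{worker_url}/connector-plugins"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


def has_plugin(plugins: List[Dict[str, str]], class_name: str) -> bool:
    """Return True if any loaded plugin has the given connector class."""
    return any(p.get("class") == class_name for p in plugins)


# Offline example with data shaped like the worker's JSON response.
sample = [{"class": ES_SINK_CLASS, "type": "sink", "version": "10.0.1"}]
print(has_plugin(sample, ES_SINK_CLASS))
```

Against a live worker, the call would be fetch_plugins("http://localhost:8083"), assuming the container runs on the local machine.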

Boot-up Elastic Search

Elasticsearch is a search and analytics engine that provides fast, scalable search over your data.

Use this link to set up Elasticsearch in your environment.

Connector Configuration

You can find more detailed information about connector configuration here. For our scenario, the configuration looks like this:

{
  "name": "elastic.sink.connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type.name": "_doc",
    "topics": "<topic>",
    "schema.ignore": "true",
    "value.converter.schemas.enable": "false",
    "connection.url": "http://<elastic-search-host>:9200",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}

Set connection.url to the address of your Elasticsearch host, and set topics to the name of the Kafka topic you want to export.
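
If you would rather generate this definition than edit JSON by hand, a small helper can fill in the two placeholders and catch obvious mistakes early. The following is a minimal sketch: the function name, the sample topic "users" and the localhost URL are assumptions for illustration, and the scheme check is only a sanity check of my own, not a rule taken from the connector.

```python
import json
from typing import Dict


def build_sink_config(name: str, topic: str, es_url: str) -> Dict[str, object]:
    """Assemble the connector definition used in this article."""
    if not topic:
        raise ValueError("topic must not be empty")
    if not es_url.startswith(("http://", "https://")):
        raise ValueError("connection.url should start with http:// or https://")
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
            "type.name": "_doc",
            "topics": topic,
            "schema.ignore": "true",
            "value.converter.schemas.enable": "false",
            "connection.url": es_url,
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        },
    }


# Hypothetical values standing in for <topic> and <elastic-search-host>.
definition = build_sink_config("elastic.sink.connector", "users", "http://localhost:9200")
print(json.dumps(definition, indent=2))
```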

Our sink connector configuration is now ready. Submit it to the worker with:

curl --location --request POST 'http://<connector-host>:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "elastic.sink.connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type.name": "_doc",
    "topics": "<topic>",
    "schema.ignore": "true",
    "value.converter.schemas.enable": "false",
    "connection.url": "http://<elastic-search-host>:9200",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}'

You can view more information about the connector through the worker’s REST endpoints.
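
One endpoint that is handy right after submitting a connector is its status. The sketch below, with hypothetical helper names, fetches the status document and picks out failed tasks; the sample dictionary imitates the shape of the worker's status response so the example runs without a live worker.

```python
import json
import urllib.request
from typing import Dict, List


def fetch_status(worker_url: str, connector: str) -> Dict[str, object]:
    """Read the status document for one connector from the worker."""
    url = f"{worker_url}/connectors/{connector}/status"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


def failed_tasks(status: Dict[str, object]) -> List[int]:
    """Return the ids of tasks that are in the FAILED state."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


# Offline example; worker ids are made up for the demo.
sample_status = {
    "name": "elastic.sink.connector",
    "connector": {"state": "RUNNING", "worker_id": "localhost:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "localhost:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "localhost:8083"},
    ],
}
print(failed_tasks(sample_status))
```

A connector can report RUNNING while one of its tasks has failed, which is why the sketch looks at the task list rather than only the top-level state.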

Elastic Index Enhancement

The Elastic sink connector is up and running, but we have to stay alert to changes in critical information. You can fetch the Elasticsearch index mapping, which shows how the JSON object is indexed within Elasticsearch.

Problems begin when we add a field that is not compatible with the existing Elasticsearch mapping, because dynamic mapping builds the index mapping from the data payload. To avoid this, put the mapping below into Elasticsearch before starting the connector. First, identify which fields should be indexed; in my example, I chose first_name and last_name. Replace mos.activity.logs in the request with your own index name, which by default matches the topic name.

curl --location --request PUT 'http://<elastic-search-host>:9200/mos.activity.logs' \
--header 'Content-Type: application/json' \
--data '{
  "mappings": {
    "dynamic": "false",
    "properties": {
      "first_name": {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}},
      "last_name": {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}}
    }
  }
}'

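To see what this mapping means for the demo payload, the sketch below rebuilds the same mapping body from a list of field names and reports which top-level fields of a document fall outside it. With dynamic set to false, such fields are still kept in the stored document but are not indexed for search. The helper names are hypothetical and nothing here talks to Elasticsearch.

```python
from typing import Dict, Iterable, List


def build_mapping(fields: Iterable[str]) -> Dict[str, object]:
    """Build a mapping body that indexes only the listed text fields."""
    text_with_keyword = {
        "type": "text",
        "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
    }
    return {
        "mappings": {
            "dynamic": "false",
            "properties": {name: dict(text_with_keyword) for name in fields},
        }
    }


def unmapped_fields(mapping: Dict[str, object], document: Dict[str, object]) -> List[str]:
    """List top-level document fields that the mapping does not index."""
    properties = mapping["mappings"]["properties"]
    return sorted(key for key in document if key not in properties)


mapping = build_mapping(["first_name", "last_name"])
payload = {
    "first_name": "damindu",
    "last_name": "lakmal",
    "phone_number": 1231231213,
    "address": [{"road": "lane"}],
}
print(unmapped_fields(mapping, payload))
```
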
Masking Information

Data masking is necessary when you deal with personal information such as phone numbers or credit card details.

Kafka Connect can mask the value of specific fields through a connector transformation. Let’s go through an example that masks the phone_number field:

{
  "name": "elastic.sink.connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type.name": "_doc",
    "topics": "<topic>",
    "schema.ignore": "true",
    "value.converter.schemas.enable": "false",
    "connection.url": "http://<elastic-search-host>:9200",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "transforms": "maskField",
    "transforms.maskField.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.maskField.fields": "phone_number",
    "transforms.maskField.replacement": "* * * *"
  }
}

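The connector configuration above replaces the phone number value with ‘* * * *’. Note that a custom replacement generally has to be convertible to the field’s type, so this replacement assumes phone_number arrives as a string; with a numeric value, as in the sample payload, the transformation may reject a non-numeric replacement.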
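
To make the effect concrete, here is a small Python imitation of what the masking step does to a record value before it reaches Elasticsearch. It only illustrates the outcome: the real transformation runs in Java inside Kafka Connect and has its own type rules, the function name is hypothetical, and phone_number is assumed to be a string here.

```python
from typing import Dict, Iterable


def mask_fields(
    value: Dict[str, object], fields: Iterable[str], replacement: str
) -> Dict[str, object]:
    """Return a copy of the value with the listed top-level fields replaced."""
    targets = set(fields)
    return {k: (replacement if k in targets else v) for k, v in value.items()}


# Assumption: the phone number is stored as a string in this demo record.
record = {"first_name": "damindu", "last_name": "lakmal", "phone_number": "1231231213"}
masked = mask_fields(record, ["phone_number"], "* * * *")
print(masked)
```

Only the listed field changes; every other field is passed through untouched, and the original record is left as it was.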

Summary

This was only an overview of the Elasticsearch sink connector for Kafka. I recommend reading the connector documentation to understand the notations, operators and underlying architecture.

Thank you for reading this article. I hope you enjoyed it!
