How to Connect the Elasticsearch Sink Connector with Kafka
Kafka has a mechanism, Kafka Connect, to pump data in and out. Let’s take a quick look at connectors.
There are two connector types:
- Sink Connector: exports data from Kafka to an external service
- Source Connector: imports data from an external service into Kafka
More details can be found in the Kafka Connect documentation.
Sink Connector
The sink connector moves data from Kafka to an external service. It writes data from a topic to a data sink. Data can be transformed and converted into the desired format through transformations and converters.
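For example, converters and transformations show up in a connector configuration as plain properties (a minimal sketch; the full configurations used in this demo appear later):
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "maskField",
"transforms.maskField.type": "org.apache.kafka.connect.transforms.MaskField$Value"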
Elastic Sink Connector
First, let’s map out the path of our journey:
- Install the Elasticsearch sink connector
- Boot up Elasticsearch
- Configure the connector
- Enhance the Elasticsearch index
- Mask sensitive information
The last two points describe how to overcome the Elasticsearch mapping issue and how to mask critical information before syncing to Elasticsearch.
In our demo we use a JSON payload and a string key as our Kafka data. We are going to pump this data from Kafka to Elasticsearch through the connector.
- Key: string (“12312”)
- Payload: JSON
{
  "first_name": "damindu",
  "last_name": "lakmal",
  "phone_number": 1231231213,
  "address": [
    {
      "road": "lane"
    }
  ]
}
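To publish this demo record with a string key, you can use the Kafka console producer (a quick sketch; <host> and <topic> are placeholders):
kafka-console-producer --bootstrap-server <host>:9092 --topic <topic> --property "parse.key=true" --property "key.separator=|"
12312|{"first_name":"damindu","last_name":"lakmal","phone_number":1231231213,"address":[{"road":"lane"}]}
Each line entered after the command is sent as one record; the text before | becomes the key and the JSON after it becomes the payload.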
Install the Elasticsearch Sink Connector
Let’s create a Dockerfile to run a Kafka Connect worker with the Elasticsearch connector:
FROM confluentinc/cp-kafka-connect:6.1.0
USER root
ENV CONNECT_PLUGIN_PATH /usr/share/confluent-hub-components/
ENV CONNECT_BOOTSTRAP_SERVERS localhost:9092
ENV CONNECT_GROUP_ID localhost
ENV CONNECT_CONFIG_STORAGE_TOPIC config-storage
ENV CONNECT_OFFSET_STORAGE_TOPIC offset-storage
ENV CONNECT_STATUS_STORAGE_TOPIC status-storage
ENV CONNECT_INTERNAL_KEY_CONVERTER org.apache.kafka.connect.storage.StringConverter
ENV CONNECT_INTERNAL_VALUE_CONVERTER org.apache.kafka.connect.json.JsonConverter
ENV CONNECT_KEY_CONVERTER org.apache.kafka.connect.storage.StringConverter
ENV CONNECT_VALUE_CONVERTER org.apache.kafka.connect.json.JsonConverter
ENV CONNECT_REST_ADVERTISED_HOST_NAME localhost
ENV CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR "1"
ENV CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR "1"
ENV CONNECT_STATUS_STORAGE_REPLICATION_FACTOR "1"
RUN curl -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
RUN mkdir confluent-hub-client-latest
RUN tar -xf confluent-hub-client-latest.tar.gz -C confluent-hub-client-latest
RUN ./confluent-hub-client-latest/bin/confluent-hub install --no-prompt --verbose --component-dir /usr/share/confluent-hub-components confluentinc/kafka-connect-elasticsearch:10.0.1
Before executing the command below, you need to install Docker. Then run this command to build the Docker image:
docker build -t elastic-connector:v0.0.0 .
Run the connector worker using the command below:
docker run -it -p 8083:8083 -e CONNECT_BOOTSTRAP_SERVERS=<host>:9092 elastic-connector:v0.0.0
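Once the worker is running, you can confirm that the Elasticsearch connector plugin was installed by listing the available plugins through the Connect REST API:
curl http://localhost:8083/connector-plugins
The response should include io.confluent.connect.elasticsearch.ElasticsearchSinkConnector.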
Boot Up Elasticsearch
Elasticsearch is useful for searching information within your data. It provides an analytics engine along with fast, scalable search.
Follow the official Elasticsearch installation guide to set up Elasticsearch in your environment.
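For a quick local setup, a single-node instance can also be started with Docker (a minimal sketch; the 7.10.2 image version is an assumption chosen to match the connector used in this demo):
docker run -d -p 9200:9200 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.10.2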
Connector Configuration
You can find more detailed information about connector configuration in the Confluent Elasticsearch Sink Connector documentation. Let’s create our scenario as below:
{"name": "elastic.sink.connector","config": {"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type.name": "_doc","topics": "<topic>","schema.ignore": "true","value.converter.schemas.enable": "false","connection.url": "http://<elastic-search-host>:9200","value.converter": "org.apache.kafka.connect.json.JsonConverter","key.converter": "org.apache.kafka.connect.storage.StringConverter"}}
connection.url needs to be configured with your Elasticsearch host, and topics should be set to the name of the Kafka topic you want to sync.
Now our sink connector configuration is ready; submit it to the worker with:
curl --location --request POST '<connector-host>:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "elastic.sink.connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type.name": "_doc",
    "topics": "<topic>",
    "schema.ignore": "true",
    "value.converter.schemas.enable": "false",
    "connection.url": "http://<elastic-search-host>:9200",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}'
You can view extra information through the worker’s other REST endpoints.
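For example, you can check the connector’s status:
curl '<connector-host>:8083/connectors/elastic.sink.connector/status'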
Elasticsearch Index Enhancement
The Elasticsearch sink connector is up and running, but we have to stay alert to changes in critical information. You can fetch the Elasticsearch index mapping, which shows how the JSON object is indexed within Elasticsearch.
Problems begin when we add a field that is not compatible with our existing Elasticsearch mapping, because dynamic mapping configures the index from the data payload. Before starting the Elasticsearch connector, put the mapping criteria below into Elasticsearch. First you need to identify which fields should be indexed; in my example, I chose first_name and last_name. With "dynamic": "false", fields outside this mapping are still stored in the document source but are not indexed, so new incompatible fields no longer break indexing.
curl --location --request PUT 'http://<elastic-search-host>:9200/mos.activity.logs' \
--header 'Content-Type: application/json' \
--data '{
  "mappings": {
    "dynamic": "false",
    "properties": {
      "first_name": {
        "type": "text",
        "fields": {
          "keyword": { "type": "keyword", "ignore_above": 256 }
        }
      },
      "last_name": {
        "type": "text",
        "fields": {
          "keyword": { "type": "keyword", "ignore_above": 256 }
        }
      }
    }
  }
}'
Masking Information
Data masking is necessary when you are dealing with personal information like phone numbers and credit card details.
The connector provides a feature that can mask specific field values by using a connector transformation. Let’s go through an example of masking the phone_number field:
{"name": "elastic.sink.connector","config": {"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type.name": "_doc","topics": "<topic>","schema.ignore": "true","value.converter.schemas.enable": "false","connection.url": "http://<elastic-search-host>:9200","value.converter": "org.apache.kafka.connect.json.JsonConverter","key.converter": "org.apache.kafka.connect.storage.StringConverter","transforms" : "maskFiled","transforms.maskFiled.type" : "org.apache.kafka.connect.transforms.MaskField$Value","transforms.maskFiled.fields" : "phone_number","transforms.maskFiled.replacement" : "* * * *"}}
The connector configuration above will mask the phone_number value as ‘* * * *’.
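To confirm the masking end to end, search the index and inspect a returned document (by default the connector writes to an index named after the topic):
curl 'http://<elastic-search-host>:9200/<topic>/_search?pretty'
The phone_number field in the returned documents should read "* * * *".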
Summary
This was only an overview of the Elasticsearch sink connector in Kafka. I’d recommend reading the connector documentation to understand the notation, options, and underlying architecture.
Thank you for reading this article. I hope you enjoyed it!