Kafka Connector with Custom Transformation
Kafka is a real-time streaming platform that enables organizations to manage and scale their data while providing a reliable, high-performance system.
Single Message Transformations (SMTs) are applied to messages as they pass through source and sink connectors. Before a transformation runs, the message needs to be in the format that the configured transformers expect.
At the same time, most programmers struggle to write their own transformation implementation. The Confluent platform provides the necessary information on how to build custom transformations, and in this article I will guide you through the basics you need to know before you begin customizing.
Connector Transformation
When migrating to a Kafka streaming platform, the first problem is how to push existing data into Kafka without changing the underlying architecture. Here I’m going to explain one part of that: the connector.
Source Connector
Let’s take an example customer table that has id, details, email and phone_number columns. Each column’s meaning:
- id : unique identifier and primary key
- details : customer abstract details as json
- email : customer email address
- phone_number : customer phone number
The data is keyed by the id in Postgres, but I need to produce it to Kafka with the email address as the message key. For that, I will create a custom transformation to construct my data object. Let’s go through it step by step.
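For illustration (the values here are hypothetical), a change event for a row with id 1 and email jane@example.com would move through the pipeline like this:
Before the transformation: key = 1, value = the full row
After the transformation: key = "jane@example.com", value = the full row (unchanged)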
Database
A PostgreSQL database is appropriate for our use case because the details column holds JSON data. When you are configuring the Postgres database, check the following constraints to avoid future issues (see the example after this list):
- set wal_level to logical
- install plugin wal2json
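A minimal sketch of the relevant setting in postgresql.conf (the exact file location depends on your installation) and a quick check from psql:
wal_level = logical            # in postgresql.conf; requires a server restart
SHOW wal_level;                -- run in psql to verify the active setting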
Create Table
CREATE TABLE public.customer (
id int4 NOT NULL,
details jsonb NOT NULL,
email varchar NOT NULL,
phone_number varchar NOT NULL,
CONSTRAINT customer_pk PRIMARY KEY (id)
);
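For testing the pipeline later, you can seed the table with a sample row; the values below are purely illustrative:
INSERT INTO public.customer (id, details, email, phone_number)
VALUES (1, '{"name": "Jane", "tier": "gold"}'::jsonb, 'jane@example.com', '+10000000000');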
The basic database steps are now complete. Next, we have to look at the custom transformation library.
Custom Transformation Library
Transforming the database data needs to be done the proper way. I have created a custom transformation library, key-map. Please refer to the README and go through the key-map Java file to understand the underlying behaviour of the Kafka connector.
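To give a feel for that behaviour, here is a minimal sketch of what such a key-mapping SMT can look like against Kafka Connect’s Transformation API. It is illustrative only and may differ from the actual key-map implementation; the class and package names simply mirror the configuration shown later.
package com.github.hadlakmal.kafka.connect.smt;

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class MapKey<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String FIELD_CONFIG = "field";

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                    "Name of the value field to promote to the record key");

    private String fieldName;

    @Override
    public void configure(Map<String, ?> props) {
        // "field" comes from the connector config, e.g. transforms.KeyTransform.field = email
        fieldName = (String) props.get(FIELD_CONFIG);
    }

    @Override
    public R apply(R record) {
        // By this point ExtractField$Value has already unwrapped the Debezium "after" struct,
        // so the value is a Struct holding the table columns.
        Struct value = (Struct) record.value();
        Object newKey = value.get(fieldName);
        // Re-key the record; topic, partition, value and timestamp stay unchanged.
        return record.newRecord(record.topic(), record.kafkaPartition(),
                Schema.STRING_SCHEMA, newKey,
                record.valueSchema(), record.value(), record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }

    // Nested class so the SMT can be referenced as ...MapKey$Value in the connector config.
    public static class Value<R extends ConnectRecord<R>> extends MapKey<R> {
    }
}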
After running mvn install, the JAR file containing all the transformation functionality can be found in the target directory. If you are running the Kafka connector on a virtual machine, the custom JAR file should be included in its library directory.
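One common way to do this (the directory names and JAR name below are only illustrative) is to copy the JAR into a plugin directory and reference that directory through plugin.path in the worker properties:
cp target/key-map-1.0.jar /opt/connect-plugins/    # actual JAR name depends on your pom
# connect-distributed.properties
plugin.path=/usr/share/java,/opt/connect-plugins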
Start the Kafka connector to proceed with the next steps. The Connect worker opens port 8083 so you can communicate with it over its REST API.
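For example, you can confirm the worker’s REST API is reachable before continuing:
curl --location --request GET '<connector-host>:8083/'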
Connector Configuration
Now you have completed the hard part. Let’s go through some information about the connector configuration. Check the example below:
{ "name": "key-map", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "<database-host>", "database.port": "5432", "database.user": "<user>", "database.password": "<password>", "database.dbname": "postgres", "database.server.name": "postgres", "plugin.name": "wal2json", "table.whitelist": "public.customer", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "transforms": "RerouteTopic,ExtractFieldAfter,KeyTransform", "transforms.RerouteTopic.type": "io.debezium.transforms.ByLogicalTableRouter", "transforms.RerouteTopic.topic.regex": "(.*)customer(.*)", "transforms.RerouteTopic.topic.replacement": "customer", "transforms.ExtractFieldAfter.type": "org.apache.kafka.connect.transforms.ExtractField$Value", "transforms.ExtractFieldAfter.field": "after", "transforms.KeyTransform.type": "com.github.hadlakmal.kafka.connect.smt.MapKey$Value", "transforms.KeyTransform.field": "email" }}
- name : name of the connector
- database.hostname : host name of the database
- database.port : port of database
- database.user : authenticated user name
- database.password : password of the user
- database.dbname : database name
- plugin.name : JSON logical decoder (wal2json)
- value.converter : convert value into JSON after transformation
- key.converter : convert key into String after transformation
- transforms : define the transformation chain and its order
- transforms.RerouteTopic : reroute the table’s change events into a single topic
- transforms.ExtractFieldAfter : extract the after field from the Debezium envelope struct (attached by the Postgres connector)
- transforms.KeyTransform : map the key to the desired column (custom transformation)
Key-Map Connector
Let’s post the configuration to the worker to create the pipeline between Postgres and Kafka.
curl --location --request POST '<connector-host>:8083/connectors' \
  --header 'Content-Type: application/json' \
  --data-raw '{
    "name": "key-map",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "<database-host>",
      "database.port": "5432",
      "database.user": "<user>",
      "database.password": "<password>",
      "database.dbname": "postgres",
      "database.server.name": "postgres",
      "plugin.name": "wal2json",
      "table.whitelist": "public.customer",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "transforms": "RerouteTopic,ExtractFieldAfter,KeyTransform",
      "transforms.RerouteTopic.type": "io.debezium.transforms.ByLogicalTableRouter",
      "transforms.RerouteTopic.topic.regex": "(.*)customer(.*)",
      "transforms.RerouteTopic.topic.replacement": "customer",
      "transforms.ExtractFieldAfter.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
      "transforms.ExtractFieldAfter.field": "after",
      "transforms.KeyTransform.type": "com.github.hadlakmal.kafka.connect.smt.MapKey$Value",
      "transforms.KeyTransform.field": "email"
    }
  }'
You can check the status of the connector by using:
curl --location --request GET '<connector-host>:8083/connectors/key-map/status'
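To verify that the key transformation worked end to end, you can consume from the customer topic and print the keys, for example with the console consumer shipped with Kafka (the broker address below is illustrative):
kafka-console-consumer.sh --bootstrap-server <broker-host>:9092 \
  --topic customer --from-beginning \
  --property print.key=true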
Summary
This was only an overview of custom transformations in the Kafka connector. I’d recommend reading the connector documentation to understand the notation, operators and underlying architecture.
Thank you for reading this article. I hope you enjoyed it!