The Azure Cosmos DB sink connector allows you to export data from Apache Kafka® topics to an Azure Cosmos DB database. The connector polls data from Kafka and writes it to containers in the database based on the topics it subscribes to.
If you are using the Confluent Platform setup from this repo, the Cosmos DB Sink Connector is included in the installation and you can skip this step.
Otherwise, you can download the JAR file from the latest Release or package this repo to create a new JAR file. To install the connector manually using the JAR file, refer to these instructions.
You can also package a new JAR file from the source code.
# clone the kafka-connect-cosmosdb repo if you haven't done so already
git clone https://github.com/microsoft/kafka-connect-cosmosdb.git
cd kafka-connect-cosmosdb
# package the source code into a JAR file
mvn clean package
# include the following JAR file in Kafka Connect installation
ls target/*dependencies.jar
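If you built the JAR yourself, one way to install it is to copy it into a directory on the Connect worker's plugin.path and restart the worker. A minimal sketch, assuming a plugin path of /usr/share/confluent-hub-components (adjust to your own setup):
# assumption: the Connect worker's plugin.path includes /usr/share/confluent-hub-components
mkdir -p /usr/share/confluent-hub-components/cosmosdb-sink
cp target/*dependencies.jar /usr/share/confluent-hub-components/cosmosdb-sink/
# restart the Connect worker so it picks up the new plugin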
If you are using the Confluent Platform, the easiest way to create a Kafka topic is by using the supplied Control Center UX. Otherwise, you can create a Kafka topic manually using the following syntax:
./kafka-topics.sh --create --zookeeper <ZOOKEEPER_URL:PORT> --replication-factor <NO_OF_REPLICATIONS> --partitions <NO_OF_PARTITIONS> --topic <TOPIC_NAME>
For this quickstart, we will create a Kafka topic named hotels and will write JSON data (non-schema embedded) to the topic. To create a topic inside Control Center, see here.
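For example, a single-partition hotels topic could be created as follows; a sketch assuming a local ZooKeeper on port 2181 (newer Kafka versions replace --zookeeper with --bootstrap-server):
# create the hotels topic used in this quickstart (adjust the ZooKeeper address to your setup)
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hotels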
Next, start the Kafka console producer to write a few records to the hotels topic.
# Option 1: If using Codespaces, use the built-in CLI utility
kafka-console-producer --broker-list localhost:9092 --topic hotels
# Option 2: Using this repo's Confluent Platform setup, first exec into the broker container
docker exec -it broker /bin/bash
kafka-console-producer --broker-list localhost:9092 --topic hotels
# Option 3: Using your Confluent Platform setup and CLI install
<path-to-confluent>/bin/kafka-console-producer --broker-list <kafka broker hostname> --topic hotels
In the console producer, enter:
{"id": "h1", "HotelName": "Marriott", "Description": "Marriott description"}
{"id": "h2", "HotelName": "HolidayInn", "Description": "HolidayInn description"}
{"id": "h3", "HotelName": "Motel8", "Description": "Motel8 description"}
The three records entered are published to the hotels Kafka topic in JSON format.
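Optionally, you can confirm the records landed on the topic with the console consumer; a quick check assuming the broker is reachable at localhost:9092:
# read back the records from the hotels topic (Ctrl+C to exit)
kafka-console-consumer --bootstrap-server localhost:9092 --topic hotels --from-beginning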
Create the Cosmos DB Sink Connector in Kafka Connect
The following JSON body defines the config for the Cosmos DB Sink Connector.
Note: You will need to fill out the values for connect.cosmos.connection.endpoint and connect.cosmos.master.key, which you should have saved from the Cosmos DB setup guide.
Refer to the sink properties section for more information on each of these configuration properties.
{
  "name": "cosmosdb-sink-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
    "tasks.max": "1",
    "topics": "hotels",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
    "connect.cosmos.master.key": "<cosmosdbprimarykey>",
    "connect.cosmos.databasename": "kafkaconnect",
    "connect.cosmos.containers.topicmap": "hotels#kafka"
  }
}
Once you have all the values filled out, save the JSON file somewhere locally. You can use this file to create the connector using the REST API.
An easy option to create the connector is by going through the Control Center webpage. Follow this guide to create a connector from Control Center, but instead of using the DatagenConnector option, use the CosmosDBSinkConnector tile. When configuring the sink connector, fill out the values as you have filled in the JSON file.
Alternatively, on the Connectors page, you can upload the JSON file from earlier by using the Upload connector config file option.
Create the sink connector using the Connect REST API
# Curl to Kafka connect service
curl -H "Content-Type: application/json" -X POST -d @<path-to-JSON-config-file> http://localhost:8083/connectors
Check that the three records from the hotels topic are created in Cosmos DB. Navigate to your Cosmos DB instance in the Azure portal; you should see the three items in the kafka container.
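For example, a query like the following in the container's Data Explorer should return the three hotel documents (the kafka container name comes from the topic map above):
SELECT * FROM c WHERE c.id IN ("h1", "h2", "h3")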
To delete the connector from the Control Center, navigate to the sink connector you created and click the Delete icon.
Alternatively, use the Connect REST API.
# Curl to Kafka connect service
curl -X DELETE http://localhost:8083/connectors/cosmosdb-sink-connector
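To confirm the connector was removed, list the connectors still registered with the Connect worker:
# the deleted connector should no longer appear in this list
curl -s http://localhost:8083/connectors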
To delete the created Azure Cosmos DB service and its resource group using Azure CLI, refer to these steps.
The following settings are used to configure the Cosmos DB Kafka Sink Connector. These configuration values determine which Kafka topics data is consumed from, which Cosmos DB containers data is written to, and the formats used to serialize the data. For an example configuration file with the default values, refer to this config.
Name | Type | Description | Required/Optional |
---|---|---|---|
topics | list | A list of Kafka topics to watch | Required |
connector.class | string | Class name of the Cosmos DB sink. Should be set to com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector | Required |
connect.cosmos.connection.endpoint | uri | Cosmos endpoint URI string | Required |
connect.cosmos.master.key | string | The Cosmos primary key that the sink connects with | Required |
connect.cosmos.databasename | string | The name of the Cosmos database the sink writes to | Required |
connect.cosmos.containers.topicmap | string | Mapping between Kafka topics and Cosmos containers, formatted as CSV: topic#container,topic2#container2 | Required |
connect.cosmos.connection.gateway.enabled | boolean | Flag to indicate whether to use gateway mode. Default is false. | Optional |
connect.cosmos.sink.bulk.enabled | boolean | Flag to indicate whether bulk mode is enabled. Default is true. | Optional |
connect.cosmos.sink.maxRetryCount | int | Max retry attempts on transient write failures. Default is 10. NOTE: This is different from max throttling retry attempts, which are infinite. | Optional |
connect.cosmos.connection.sharing.enabled | boolean | Flag to enable connection sharing between Cosmos client instances on the same JVM. NOTE: If connect.cosmos.connection.gateway.enabled is set to true, this setting has no effect. | Optional |
key.converter | string | Serialization format for the key data written into the Kafka topic | Required |
value.converter | string | Serialization format for the value data written into the Kafka topic | Required |
key.converter.schemas.enable | string | Set to "true" if the key data has an embedded schema | Optional |
value.converter.schemas.enable | string | Set to "true" if the value data has an embedded schema | Optional |
tasks.max | int | Maximum number of connector sink tasks. Default is 1 | Optional |
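For example, the optional settings can be added alongside the required properties in the connector config; a sketch showing the documented defaults spelled out explicitly:
"connect.cosmos.connection.gateway.enabled": "false",
"connect.cosmos.sink.bulk.enabled": "true",
"connect.cosmos.sink.maxRetryCount": "10"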
Data will always be written to Cosmos DB as JSON without any schema.
The Azure Cosmos DB sink connector converts a SinkRecord into a JSON document, and supports the following valid Schema.Type values:
Schema Type | JSON Data Type |
---|---|
Array | Array |
Boolean | Boolean |
Float32 | Number |
Float64 | Number |
Int8 | Number |
Int16 | Number |
Int32 | Number |
Int64 | Number |
Map | Object (JSON) |
String | String, Null |
Struct | Object (JSON) |
The Cosmos DB sink connector also supports the following AVRO logical types:
Schema Type | JSON Data Type |
---|---|
Date | Number |
Time | Number |
Timestamp | Number |
Note: Byte deserialization is currently not supported by the Azure Cosmos DB sink connector.
Along with the Sink connector settings, you can specify the use of Single Message Transformations (SMTs) to modify messages flowing through the Kafka Connect platform. Refer to the Confluent SMT Documentation for more information.
With the custom InsertUUID SMT, you can insert the id field with a random UUID value for each message before it is written to Cosmos DB.
WARNING: Only use this SMT if the messages do NOT contain the id field. Otherwise, the id values will be overwritten and you may end up with duplicate items in your database.
Note: Using UUIDs as the message ID can be quick and easy, but they are not an ideal partition key to use in Cosmos DB.
Before you can use the InsertUUID SMT, you will need to install this transform in your Confluent Platform setup. If you are using the Confluent Platform setup from this repo, the transform is already included in the installation and you can skip this step.
Alternatively, you can package the InsertUUID source to create a new JAR file. To install the transform manually using the JAR file, refer to these instructions.
# clone the kafka-connect-insert-uuid repo
git clone https://github.com/confluentinc/kafka-connect-insert-uuid.git
cd kafka-connect-insert-uuid
# package the source code into a JAR file
mvn clean package
# include the following JAR file in Confluent Platform installation
ls target/*.jar
Inside your Sink connector config, add the following properties to set the id.
"transforms": "insertID",
"transforms.insertID.type": "com.github.cjmatta.kafka.connect.smt.InsertUuid$Value",
"transforms.insertID.uuid.field.name": "id"
Refer to the InsertUUID repository for more information on using this SMT.
Using both the InsertField and Cast SMTs, you can specify the TTL on each item created in Cosmos DB.
Note: You will need to enable TTL on the Cosmos DB container to enable TTL at an item level. Refer to the Cosmos DB setup guide or the Cosmos DB docs for more information on setting the TTL.
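If TTL is not yet enabled on the container, one way to turn it on is with the Azure CLI; a sketch, assuming the database and container names from this quickstart (a TTL of -1 enables TTL without a default expiry):
# enable TTL on the kafka container (account and resource group names are placeholders)
az cosmosdb sql container update --account-name <cosmosinstance-name> --resource-group <resource-group> --database-name kafkaconnect --name kafka --ttl -1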
Inside your Sink connector config, add the following properties to set the TTL (in seconds). In this following example, the TTL is set to 100 seconds.
Note: If the message already contains the TTL field, the TTL value will be overwritten by these SMTs.
"transforms": "insertTTL,castTTLInt",
"transforms.insertTTL.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTTL.static.field": "ttl",
"transforms.insertTTL.static.value": "100",
"transforms.castTTLInt.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTTLInt.spec": "ttl:int32"
Refer to the InsertField and Cast documentation for more information on using these SMTs.
Here are solutions to some common problems that you may encounter when working with the Cosmos DB Kafka Sink Connector.
If you have non-JSON data on your source topic in Kafka and attempt to read it using the JsonConverter, you will see the following exception:
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
…
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7)
This is likely caused by data in the source topic being serialized in either Avro or another format (like CSV string).
Solution: If the topic data is actually in Avro, then change your Kafka Connect sink connector to use the AvroConverter as shown below.
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
This error occurs when you try to use the Avro converter to read data from a topic that is not Avro. It also applies to data written by an Avro serializer other than the Confluent Schema Registry's Avro serializer, which has its own wire format.
org.apache.kafka.connect.errors.DataException: my-topic-name
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
…
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Solution: Check the source topic’s serialization format. Then, either switch Kafka Connect’s sink connector to use the correct converter, or switch the upstream format to Avro.
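One way to check the format is to consume a few raw records from the topic; a sketch assuming a local broker:
# readable JSON suggests the JsonConverter; unreadable bytes with a schema ID prefix
# suggest Confluent-serialized Avro, which needs the AvroConverter
kafka-console-consumer --bootstrap-server localhost:9092 --topic <your-topic> --from-beginning --max-messages 5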
Kafka Connect supports a special structure of JSON messages containing both payload and schema as follows.
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "userid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      }
    ]
  },
  "payload": {
    "userid": 123,
    "name": "Sam"
  }
}
If you try to read JSON data that does not contain the data in this structure, you will get this error:
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
To be clear, the only JSON structure that is valid for schemas.enable=true has schema and payload fields as the top-level elements (shown above).
As the message itself states, if you just have plain JSON data, you should change your connector’s configuration to:
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",