Kafka Cassandra Sink Connector (JSON, no Avro, no schema/payload envelope)

What follows is the configuration I finally arrived at to save my Kafka topic data into a Cassandra cluster using the Cassandra Sink connector.

It was not so trivial, as my data does not conform to what the connector expects (according to the Confluent guide and all the blogs / articles I could google about the Cassandra Sink Connector). So I wanted to share this with you to save you time 🙂

So here’s what my data looks like (or rather, what it is not!)

  1. My data is in JSON format, in a topic named requests:
    {"id":4,"status":"processing","creationDate":1575906784470}
    {"id":1,"status":"processed","creationDate":1575906789472}
    {"id":6,"status":"other","creationDate":1575906794470}
    {"id":7,"status":"formatted","creationDate":1575906799470}
    {"id":4,"status":"processing","creationDate":1575906804469}
    

    so no Avro, no Schema Registry, no schema data types … just plain JSON

  2. I cannot (and do not want to) wrap the data inside a schema / payload envelope
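To make point 2 concrete, here is a small Python sketch (the field schema below is my own illustration) contrasting my plain messages with the schema/payload envelope that JsonConverter requires when `schemas.enable` is true — exactly the wrapping I want to avoid:

```python
import json

# A plain, schemaless message as it sits in the "requests" topic.
plain = {"id": 4, "status": "processing", "creationDate": 1575906784470}

# For comparison: the schema/payload envelope JsonConverter expects
# when value.converter.schemas.enable=true. Producing this for every
# record is what I cannot (and do not want to) do.
wrapped = {
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "id", "type": "int32", "optional": False},
            {"field": "status", "type": "string", "optional": False},
            {"field": "creationDate", "type": "int64", "optional": False},
        ],
    },
    "payload": plain,
}

print(json.dumps(plain))
print(json.dumps(wrapped))
```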

The target table schema in Cassandra strictly matches that structure:

CREATE TABLE requests.requests (
    id int PRIMARY KEY,
    creationdate bigint,
    status text
)
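The table above lives in a requests keyspace; if you are starting from scratch, a minimal sketch to create it (the SimpleStrategy / replication_factor settings are my assumption for a local single-node cluster):

```sql
CREATE KEYSPACE IF NOT EXISTS requests
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
```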

The sink connector configuration should look like this in order to insert data from the topic into the Cassandra cluster with minimum effort (if we can call it that :-/):

{
  "name" : "MyCassandra",
  "config" : {
    "connector.class" : "io.confluent.connect.cassandra.CassandraSinkConnector",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "tasks.max" : "1",
    "topics" : "requests",
    "cassandra.contact.points": "localhost:9042",
    "cassandra.write.mode": "Update",
    "cassandra.keyspace": "requests",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.converters.IntegerConverter",
    "transforms": "date,createKey,longToInt",
    "transforms.date.type": "org.apache.kafka.connect.transforms.Cast$Value",
    "transforms.date.spec": "creationDate:int64",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields":"id",
    "transforms.longToInt.type": "org.apache.kafka.connect.transforms.Cast$Value",
    "transforms.longToInt.spec": "id:int32"
  }
}
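To register the connector, the usual route is the Kafka Connect REST API; assuming a Connect worker listening on the default port 8083 and the config above saved as cassandra-sink.json, something like:

```shell
# Register the connector on the Connect worker (default REST port 8083).
curl -X POST -H "Content-Type: application/json" \
     --data @cassandra-sink.json \
     http://localhost:8083/connectors

# Check that the connector and its task are RUNNING.
curl http://localhost:8083/connectors/MyCassandra/status
```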

The resulting data in Cassandra looks as expected:

 id | creationdate  | status
----+---------------+------------
  5 | 1575908609416 |   accepted
  1 | 1575908629415 |  processed
  2 | 1575908619420 |  validated
  4 | 1575908614416 | processing
  7 | 1575908604420 |  formatted
  6 | 1575908639419 |      other
  3 | 1575908624416 |      error
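For the record, the output above is just a plain cqlsh query against the target table:

```sql
SELECT * FROM requests.requests;
```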

Enjoy!