Showcasing JDBC Sink Connector on Schema-less Data


Not all Kafka integration tools are the same. Some integration systems only produce JSON data without a schema. The JDBC Sink Connector requires a schema. Here are steps showcasing a low-code option to push events into a relational database when the source data is schema-less JSON.

Real-Time Ecosystem

This demo is available in the postgres-sink folder of the dev-local-demos project. It leverages applications available through containers in dev-local. All you need is a Kafka cluster with the Confluent Schema Registry, two KSQL queries per topic running on ksqlDB, and a JDBC Sink Connector running on a Connect cluster.

Challenges

Relational databases require a schema. The JDBC Sink connector relies on the JDBC API and database-specific drivers to write data from a Kafka topic into a table on the database. These drivers need to know the data types of the fields being written.
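
To make the gap concrete, here is a minimal Python sketch of what a consumer can and cannot learn from a schema-less JSON event. The field names follow the ORDERS stream used later in this post; the sample values and the `declared` mapping are assumptions made up for illustration, not taken from the demo.

```python
import json
from datetime import datetime
from decimal import Decimal

# Hypothetical order event; the values are invented for this sketch.
raw = (
    '{"order_id": "o-1", "user_id": "u-7", "store_id": "s-3", '
    '"quantity": 2, "amt": 12.50, "ts": "2024-01-15 10:30:00"}'
)
event = json.loads(raw)

# Without a schema, types can only be guessed from the JSON syntax.
inferred = {name: type(value).__name__ for name, value in event.items()}

# What the table columns actually need (an assumed mapping for this sketch).
declared = {
    "order_id": str,
    "user_id": str,
    "store_id": str,
    "quantity": int,
    "amt": lambda v: Decimal(str(v)).quantize(Decimal("0.01")),
    "ts": lambda v: datetime.strptime(v, "%Y-%m-%d %H:%M:%S"),
}
typed = {name: convert(event[name]) for name, convert in declared.items()}

print(inferred)
print(typed["amt"], typed["ts"].isoformat())
```

The JSON alone says that amt is a floating-point number and ts is a string; the fact that they should be a decimal and a timestamp has to come from somewhere else, which is the role the schema plays.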

Steps

KSQL is code, but it is code executed for you on the platform ksqlDB. This makes it a low-code solution and a very elegant solution to integrate with systems that do not provide a schema. With two queries, you can provide a transformation that makes it super easy for JDBC Sink Connector to consume the data.

Work with streams, not tables

If the goal is to stream the events into a relational database table rather than persist the data within ksqlDB for stream processing, use streams (not tables) within ksqlDB. The retention of the Kafka topics then only needs to be as long as the applications require to process the events.

Read Data into a Stream

The goal of the first statement is to capture the data in ksqlDB with no transformation. This query needs to align with how the data is coming from the source system. This example shows a client that publishes a complete message as JSON, with the order_id also used as the key of the message.

create or replace stream ORDERS (
   "order_id"  varchar key,
   "user_id"   varchar,
   "store_id"  varchar,
   "quantity"  bigint,
   "amt"       decimal(4,2),
   "ts"        string
) with (kafka_topic='ORDERS', value_format='json', key_format='kafka');

Transform and publish with schema

Specify a key and value format that has a schema; Avro is used for this demonstration. Also, renaming fields to align with the database table’s schema will minimize the number of Single Message Transforms needed in the connector configuration.

create stream ORDERS_WITH_SCHEMA with(KEY_FORMAT='avro', VALUE_FORMAT='avro')
as
select
  "order_id" as ORDER_ID,
  "user_id" as USER_ID,
  "store_id" as STORE_ID,
  "quantity" as QUANTITY,
  "amt" as AMT,
  PARSE_TIMESTAMP("ts", 'yyyy-MM-dd HH:mm:ss', 'UTC') as TS
from ORDERS;
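
As a rough illustration of what the PARSE_TIMESTAMP call above does with the "ts" string, the following Python sketch parses the same pattern as a UTC time and converts it to milliseconds since the epoch. The helper name `to_epoch_millis` and the sample value are hypothetical; the sketch only mirrors the idea and is not how ksqlDB implements the function.

```python
from datetime import datetime, timezone


def to_epoch_millis(ts: str) -> int:
    """Parse 'yyyy-MM-dd HH:mm:ss' as UTC and return epoch milliseconds."""
    parsed = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)


# Sample value invented for this sketch.
print(to_epoch_millis("2024-01-15 10:30:00"))
```

Once the value is a typed timestamp rather than a string, the connector can map it to a timestamp column instead of a text column.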

While this query is good, it can be better. By ensuring the key is itself a record (struct), it will simplify the JDBC Sink Connector configuration and allow for a single instance to handle multiple topics.

create or replace stream ORDERS_WITH_SCHEMA with(KEY_FORMAT='avro', VALUE_FORMAT='avro')
as
SELECT
  STRUCT(ORDER_ID:=`order_id`) as PK,
  "user_id" as USER_ID,
  "store_id" as STORE_ID,
  "quantity" as QUANTITY,
  "amt" as AMT,
  PARSE_TIMESTAMP("ts", 'yyyy-MM-dd HH:mm:ss', 'UTC') as TS
FROM ORDERS
PARTITION BY STRUCT(ORDER_ID:=`order_id`);

When the key is a primitive type, the schema only captures the data type, not the field name. By making the key a record, the field names become part of the key’s schema, which the JDBC Sink Connector can access through the Schema Registry. This eliminates the need to set pk.fields in the connector’s configuration.
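
The difference between the two key shapes can be sketched in a few lines of Python. The two schemas below are illustrative Avro schemas written by hand (the record name is an assumption, not the name ksqlDB generates), and `pk_columns` is a hypothetical helper that mimics the idea of deriving primary key column names from a key schema; it is not the connector's code.

```python
import json

# A primitive key schema: a type, but no field name.
primitive_key_schema = json.loads('"string"')

# A record key schema: the field name travels with the schema.
# The record name here is made up for this sketch.
record_key_schema = json.loads("""
{
  "type": "record",
  "name": "OrdersKey",
  "fields": [
    {"name": "ORDER_ID", "type": ["null", "string"], "default": null}
  ]
}
""")


def pk_columns(key_schema, pk_fields=None):
    """Return primary key column names for a key schema (illustration only)."""
    if isinstance(key_schema, dict) and key_schema.get("type") == "record":
        # Every field of the record key becomes a primary key column.
        return [field["name"] for field in key_schema["fields"]]
    if not pk_fields:
        raise ValueError("primitive key carries no field name; pk.fields must be set")
    return list(pk_fields)


print(pk_columns(record_key_schema))
print(pk_columns(primitive_key_schema, pk_fields=["ORDER_ID"]))
```

With a record key the column name comes from the data itself, so one connector configuration can serve several topics whose tables use different primary key names.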

Foreign Keys

With a topic-per-table setup, disabling foreign-key constraints in the destination database is required, since ordering guarantees are not achievable between tables. This is a big architectural discussion that needs to be settled before designing a solution that migrates data to a relational database.

If that is not possible, one alternative is a single topic for parent and child events, where the message key is the parent’s primary key and the JDBC Sink Connector is configured to take the primary key from the value; this takes a fair amount more development and configuration. If the source topic is an aggregate with parents and their children together, then a custom consumer could be the right solution. These discussions are outside the scope of this article and tutorial.

Connector Configuration

Whenever you work with a specific Kafka connector, please read its configuration documentation closely, as there are numerous differences between connector configurations. With the JDBC Sink Connector, a few specific settings should be called out.

  • The dialect tells the connector the type of database, which determines how it constructs the SQL statements sent to the database.

    When it comes to Postgres, pay close attention to the name. PostgresDatabaseDialect and PostgresSqlDatabaseDialect are incorrect.

    "dialect.name": "PostgreSqlDatabaseDialect"
    
  • For connection information, put these in a secret and reference them with the provider.

    The file provider, FileConfigProvider, ships with the Apache Kafka distribution and is easy to enable.

    "connection.url": "${file:/etc/kafka-connect/secrets/postgres.properties:CONNECTION_URL}",
    "connection.user" : "${file:/etc/kafka-connect/secrets/postgres.properties:CONNECTION_USER}",
    "connection.password" : "${file:/etc/kafka-connect/secrets/postgres.properties:CONNECTION_PASSWORD}"
    

    Putting the connection.url value in a secret makes it easier to reuse the configuration across environments.

  • Do not change the default of quote.sql.identifiers from always to never, as invalid JDBC statements may result.

    "quote.sql.identifiers": "always"
    

    In the referenced demonstration, changing this to never results in invalid syntax because of the AMT field.

  • Understand the behavior of the insert mode. If you have an idempotent system and the database isn’t generating sequence numbers as part of the insert, upsert will be the typical setting.

    "insert.mode": "upsert"
    

    This setting is the easiest to use and understand in a streaming platform. The merge (upsert) SQL statements can cause some unexpected database performance issues. Enabling trace logging shows the prepared statements in the logs, and running an explain-plan analysis on these queries can help address performance concerns.

  • pk.mode and pk.fields

    Setting pk.mode to record_key is the easiest way to deploy a single connector for multiple tables with different primary key field names. If the key schema is a primitive, pk.fields is required. However, if the key is a structure, then pk.fields can (and should) be omitted, as all the fields in the key’s structure are used as the primary key.

  • Understand and leverage partition.assignment.strategy

    If you are configuring multiple topics for one connector, you need to understand partition assignment.

    The default strategy is RangeAssignor. This means that the same partition number of every topic is assigned to the same consumer instance. Not only are you limited to as many working tasks as the partition count of the topic with the most partitions, but you will also end up with an uneven distribution if the partition counts are not the same.

    In the provided demonstration, ORDERS_WITH_SCHEMA has 4 partitions and USERS_WITH_SCHEMA has 2 partitions. This yields an uneven distribution across tasks and limits the number of working tasks to 4, even if tasks.max=6.

    • Range Assignor

      kafka-consumer-groups --bootstrap-server <broker>:9092 --describe --group connect-postgres

      GROUP             TOPIC               PARTITION  CLIENT-ID
      connect-postgres  ORDERS_WITH_SCHEMA  0          connector-consumer-postgres-0
      connect-postgres  USERS_WITH_SCHEMA   0          connector-consumer-postgres-0
      connect-postgres  ORDERS_WITH_SCHEMA  1          connector-consumer-postgres-1
      connect-postgres  USERS_WITH_SCHEMA   1          connector-consumer-postgres-1
      connect-postgres  ORDERS_WITH_SCHEMA  2          connector-consumer-postgres-2
      connect-postgres  ORDERS_WITH_SCHEMA  3          connector-consumer-postgres-3

      Only 4 tasks do any work (4 unique client IDs), since the largest partition count among the topics is 4. Because USERS_WITH_SCHEMA has 2 partitions, 2 tasks are assigned 2 partitions each and 2 tasks are assigned 1 partition each.

    • Round Robin Assignor

      "consumer.override.partition.assignment.strategy": "org.apache.kafka.clients.consumer.RoundRobinAssignor",
      

      kafka-consumer-groups --bootstrap-server <broker>:9092 --describe --group connect-postgres

      tasks.max = 6

      GROUP             TOPIC               PARTITION  CLIENT-ID
      connect-postgres  ORDERS_WITH_SCHEMA  0          connector-consumer-postgres-0
      connect-postgres  ORDERS_WITH_SCHEMA  1          connector-consumer-postgres-1
      connect-postgres  ORDERS_WITH_SCHEMA  2          connector-consumer-postgres-2
      connect-postgres  ORDERS_WITH_SCHEMA  3          connector-consumer-postgres-3
      connect-postgres  USERS_WITH_SCHEMA   0          connector-consumer-postgres-4
      connect-postgres  USERS_WITH_SCHEMA   1          connector-consumer-postgres-5

      The maximum number of useful tasks is the total number of partitions across all topics. If the setting is lower, such as tasks.max=5, partitions are assigned in a round-robin fashion (hence the name).

      tasks.max = 5

      GROUP             TOPIC               PARTITION  CLIENT-ID
      connect-postgres  ORDERS_WITH_SCHEMA  0          connector-consumer-postgres-0
      connect-postgres  USERS_WITH_SCHEMA   1          connector-consumer-postgres-0
      connect-postgres  ORDERS_WITH_SCHEMA  1          connector-consumer-postgres-1
      connect-postgres  ORDERS_WITH_SCHEMA  2          connector-consumer-postgres-2
      connect-postgres  ORDERS_WITH_SCHEMA  3          connector-consumer-postgres-3
      connect-postgres  USERS_WITH_SCHEMA   0          connector-consumer-postgres-4
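
The two assignment strategies can be modelled in a few lines of Python to see why the distributions differ. This is a simplified sketch, not Kafka's implementation: consumers are represented by task index rather than member ID, every consumer is assumed to subscribe to every topic, and the function names `range_assign` and `round_robin_assign` are made up for this example.

```python
from collections import defaultdict


def range_assign(topics, consumers):
    """Per topic, hand out contiguous partition ranges to consumers in order."""
    assignment = defaultdict(list)
    for topic in sorted(topics):
        per_consumer, extra = divmod(topics[topic], consumers)
        start = 0
        for consumer in range(consumers):
            # The first `extra` consumers receive one additional partition.
            size = per_consumer + (1 if consumer < extra else 0)
            for partition in range(start, start + size):
                assignment[consumer].append((topic, partition))
            start += size
    return dict(assignment)


def round_robin_assign(topics, consumers):
    """Lay out all topic-partitions in order and deal them to consumers in turn."""
    assignment = defaultdict(list)
    all_partitions = [(t, p) for t in sorted(topics) for p in range(topics[t])]
    for index, topic_partition in enumerate(all_partitions):
        assignment[index % consumers].append(topic_partition)
    return dict(assignment)


topics = {"ORDERS_WITH_SCHEMA": 4, "USERS_WITH_SCHEMA": 2}

for name, assign, tasks in [
    ("range", range_assign, 6),
    ("round-robin", round_robin_assign, 6),
    ("round-robin", round_robin_assign, 5),
]:
    print(name, "tasks.max =", tasks)
    for consumer, partitions in sorted(assign(topics, tasks).items()):
        print("  task", consumer, partitions)
```

Under these assumptions the range model leaves tasks 4 and 5 without partitions when tasks.max=6, while the round-robin model gives each of the six tasks exactly one partition, matching the listings above.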

Takeaways

  • It is not always possible to go to the source system to change how it publishes data. Having a stream processing platform, like ksqlDB, is important to your enterprise platform.

  • Existing Single Message Transforms (SMTs) do not bridge the gap from schema-less data to data with a schema. Writing a custom SMT is an option, but it would not be available in a Confluent Cloud managed connector.

  • Building a very simple Kafka Streams application would achieve the same success. While a Kafka Streams solution gives more flexibility, it increases development and operational effort.

  • While the goal of streaming systems is not just a pass-through from a source system into RDBMS tables, many enterprise systems need some level of this functionality.

  • Foreign-key constraints lead to challenges, and disabling them for an “eventually consistent” database may not be an option. A more advanced solution is needed if they exist in the source tables.

  • Check out the dev-local project and the postgres-sink demo in dev-local-demos, and try it out yourself.

Reach out

Please contact us if you have improvements, want clarification, or just want to talk streaming.