Showcasing Change Data Capture with Debezium and Kafka


Setting up Change Data Capture with databases, Apache Kafka, Kafka Connect, and Debezium takes time, with tricky configuration along the way. Here we will walk through a setup of all the components to showcase what is possible, give you the complete picture, and provide the pieces to bring this into your own project.

Real-Time Ecosystem

This demo is available in the rdbms-cdc-nosql folder of the dev-local-demos project. It leverages applications available through containers in dev-local. Within a few minutes, you can see changes captured from relational databases (Postgres, MySQL v8, and MySQL v5) and delivered into NoSQL data stores (MongoDB, Cassandra, and Elasticsearch). Stream enrichment processing is done with ksqlDB.

Challenges

There are many nuances in setting up change data capture, and doing this within an enterprise organization takes effort and time. The challenges addressed here are an attempt to make this a little easier by providing you with a complete POC demonstration showing the steps needed to build a change-data-capture solution.

The specific touch-points covered here:

  • Enabling logging within a database
  • Connector setting nuances
  • Management of Kafka Connect Secrets
  • Logical Data-types

Database logging

Each database has its own nuances to set up logging. This is critical for any change-data-capture process, and something that needs to be well understood for success. Here are the settings and issues encountered in configuring Postgres and MySQL with Debezium for change data capture. This is not a complete overview of all the settings, but rather insight into the complexities that need to be worked out between your database operations and development teams. Work with your database administrators to enable database logging on the tables that are needed, along with any snapshotting or other specific configurations.

Postgres and Debezium

  • TL;DR

    • wal_level must be set to logical.
      • If you are running Postgres with Docker Compose, override the command as follows:
        command: "postgres -c wal_level=logical"
        
    • The connector needs to use the pgoutput plugin.
      • Add the following to the connector configuration:
        "plugin.name" : "pgoutput"
        
  • Details

    • Postgres needs to be able to capture changes; this is done through the write-ahead log (WAL).
      The amount of data captured is based on the wal_level setting. The default setting is replica, but that is an insufficient level of data for Debezium. The logical setting includes the replica information plus additional logical change sets.

    • Debezium has to be configured to use the pgoutput plugin. Use the configuration property plugin.name to set this. A minimal connector configuration is sketched at the end of this section.

    • Troubleshooting

      Here is a set of errors seen when using the Debezium Postgres source connector.

      • Attempting to start Debezium with Postgres without wal_level properly defined:

        Connector configuration is invalid and contains the following 1 error(s):  
        Postgres server wal_level property must be \"logical\" but is: replica
        
      • Restarting Postgres without -c wal_level=logical (once the logical replication slot exists) results in Postgres failing to start with the following error:

        FATAL:  logical replication slot "debezium" exists, but wal_level < logical
        
      • Starting a connector without the pgoutput plugin enabled:

        io.debezium.DebeziumException: Creation of replication slot failed
        

    If (I should say when) you uncover an error, take the time to document it, and document it well. Having a déjà-vu moment when an error resurfaces is not fun.
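
To tie the Postgres settings together, here is a minimal sketch of a Debezium Postgres source connector configuration. The connection details, server name, and slot name are placeholders for illustration; your environment will need its own values.

    {
      "name": "postgres-cdc-demo",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "slot.name": "debezium",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "demo",
        "database.server.name": "pg_demo"
      }
    }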

MySQL (v8) and Debezium

  • TL;DR

    • Version 8 of MySQL has binary logging enabled by default. In production, however, you do need to verify this with operations.
  • Troubleshooting

    • You delete and recreate your MySQL database, but reuse the connector (and the state it persists in Kafka):
    Caused by: io.debezium.DebeziumException: 
    Client requested master to start replication from position > file size Error code: 1236; SQLSTATE: HY000
    

MySQL (v5) and Debezium

  • TL;DR

    • The binlog is not enabled by default, but enabling it is rather simple: set a log_bin name and configure binlog_format to row.

    • You need to ensure logs are retained for longer than any reprocessing you may need to do.

    • Debezium expects to resume where it left off; modifying the logs after Debezium has been started can lead to some unexpected errors.

  • Details

    • Add the following properties to your database’s mysql.cnf file (a minimal connector configuration for this database is sketched at the end of this section):
    server-id         = 1
    log_bin           = mysql-bin
    expire_logs_days  = 99
    binlog_format     = row
    
  • Troubleshooting

    • You recreate your database but use the same instance of the connector (v5 gives the same error as v8):
    Caused by: io.debezium.DebeziumException: 
    Client requested master to start replication from position > file size Error code: 1236; SQLSTATE: HY000.
    
    • MySQL was shut down or became inaccessible. This is a pretty easy fix, just not an intuitive error:
    Caused by: io.debezium.DebeziumException: Failed to read next byte from position XXXXX
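
As promised, here is a minimal sketch of a Debezium MySQL source connector configuration pointing at a database configured as above. The connection details, server id, and server name are placeholders; note the database.history.* properties, which come up again in the Debezium section below.

    {
      "name": "mysql-cdc-demo",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "debezium",
        "database.server.id": "10101",
        "database.server.name": "sales_db",
        "database.history.kafka.bootstrap.servers": "broker:9092",
        "database.history.kafka.topic": "schema-history.sales_db"
      }
    }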
    

Debezium

Debezium is an excellent open-source change-data-capture product. It provides a lot of features that make it a powerful tool. I find that having a few working examples makes it a lot easier to understand. Here are a few things worth pointing out; understanding them made it easier to configure the connectors and quickly get to the rewards of change data capture. A configuration fragment pulling several of these settings together follows the list.

  • database.server.name

    • This property is not a connection property to the database, but rather the name used to uniquely identify this connection. It is used in the names of the topics generated for the CDC process against the source database (for example, <database.server.name>.<schema>.<table> for Postgres). My suggestion is to not pick the type of database as the name (e.g. postgres or mysql). Picking a name like this could cause those maintaining the code to believe the name needs to align with the type of the database.
  • io.debezium.transforms.ExtractNewRecordState

    • By default, Debezium provides nested elements of before, after, and operation. For most use cases, extracting just the after state is sufficient, and Debezium provides a Single Message Transform (SMT) to do just that. Nested elements can be tricky if you are not writing stream applications, so flattening the data with one simple SMT is very helpful. Using this SMT makes it easier to pull data into ksqlDB for enrichment.
  • Predicates (Apache Kafka 2.6)

    • Prior to Apache Kafka 2.6, transformations were unconditional, making it more difficult for a single connector to process multiple tables. By using predicates, a single connector can have different rules for extracting the key from the message.

    • A common use of SMTs in Debezium connectors is to pull out the element that is the primary key and use it as the record key; this ensures that the events for a given row (primary key) in the database are processed in order.

  • database.history

    • Many connectors allow metadata related to the connector to be stored in a different Kafka cluster. This flexibility leads to confusion, especially for developers new to Kafka Connect and to a specific connector.

    • Debezium’s database history is designed this way. You need to set up the bootstrap servers, protocol, and any other connection settings for the Kafka cluster that maintains this information, even if it is the same cluster. For enterprise deployments, this flexibility is critical. For proofs-of-concept, development, and trying to get something up and running quickly, it is a lot of duplicate configuration.

  • decimal.handling.mode

    • Setting this to string can address issues with sink connectors that cannot handle the decimal logical type. The demo code uses ksqlDB to cast decimal to string for downstream sinks that need the help, but this setting is an alternative approach.
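
Here is a sketch of a connector configuration fragment pulling these settings together. The table name orders, key field order_id, and topic pattern are purely illustrative assumptions; adjust them for your own schema.

    "database.server.name": "pg_demo",
    "decimal.handling.mode": "string",

    "transforms": "unwrap,orderKey",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.orderKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.orderKey.field": "order_id",
    "transforms.orderKey.predicate": "isOrders",

    "predicates": "isOrders",
    "predicates.isOrders.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.isOrders.pattern": ".*\\.orders"

The unwrap transform flattens the change event to just the after state, while the predicate restricts the key extraction to the orders topic, so other tables handled by the same connector are left untouched.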

Connector Secrets

  • Apache Kafka provides a configuration provider (ConfigProvider) plugin interface that allows secrets to be stored separately from configuration. This is also available to a distributed Connect cluster and accessible from the connectors.

  • Configure a file config provider (org.apache.kafka.common.config.provider.FileConfigProvider) in the settings of the distributed Connect cluster as follows:

      config.providers=file
      config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
    

    In addition to leveraging these from your Kafka component configurations, they are also accessible from connectors, such as:

      "username" : "${file:/etc/kafka-connect/secrets/mysql.properties:USERNAME}"
    
  • Put more than secrets in this file; also store the database connection URL and other settings that vary between deployment environments. By keeping those out of the connector configuration, a single artifact can be published and maintained with your source code. An example of the properties file itself is sketched after the snippet below.

     "connection.url" : "${file:/etc/kafka-connect/secrets/mysql.properties:CONNECTION_URL}",
     "connection.user" : "${file:/etc/kafka-connect/secrets/mysql.properties:CONNECTION_USER}",
     "connection.password" : "${file:/etc/kafka-connect/secrets/mysql.properties:CONNECTION_PASSWORD}"
    

Schemas and Data-types

When it comes to data types, especially those considered to be logical data types in the Connect API, not all connectors are the same. If you are doing change data capture, odds are you will have decimals and timestamps.
Fortunately, timestamps are stored as a long epoch value, which will usually translate into a database even if the logical type is not properly handled. Decimals, however, are stored as byte arrays. If the connector doesn’t properly invoke the logical converter, the value will not be properly converted for the end system. What makes matters worse, connectors are not consistent in how they handle the errors.
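
To make the decimal case concrete, here is a sketch of how a decimal column commonly appears in the Avro value schema of a change event. The field name, precision, and scale are illustrative, and the exact connect.* attributes vary by converter version; the key point is that the payload is bytes, and a sink that ignores the logical type sees only a byte array.

    {
      "name": "amount",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 10,
        "scale": 2,
        "connect.name": "org.apache.kafka.connect.data.Decimal"
      }
    }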

Specific Connector Observations

  • Mongo

In this demonstration, the MongoDB sink connector properly handles the logical type, and data is stored correctly. Its SchemaRecordConverter properly handles the conversion; but as you can see, the converter has to account for and handle logical types itself; it is not being done within the Connect API.

  • Data shown in MongoDB.

(Screenshot: all orders shown in MongoDB)

  • Elasticsearch

If the Elasticsearch sink connector creates the index in Elasticsearch (schema.ignore=false), logical types are handled properly. If the sink connector doesn’t create the index (schema.ignore=true), logical converters are not processed and logical-type decimals will end up as an array of bytes in Elasticsearch. A minimal sink configuration sketch showing this setting follows the screenshots below.

  • Index generated by the connector. In each record the amount is a decimal value.

(Screenshot: index created by the Elasticsearch sink connector)

  • Index built manually through the Elasticsearch API (the connector does not create it). The decimal bytes are passed as-is to the index, yielding an undesired result. Each record contains the byte-array value (the physical representation of the logical type).

(Screenshot: index created manually through the Elasticsearch API)

The amounts shown in the Kibana screenshot of HUM=, G2Q=, and FIA= are actually the physical byte arrays converted to strings.
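
For reference, the behavior above hinges on a single property of the Confluent Elasticsearch sink connector. Here is a minimal sketch; the topic name and connection URL are placeholders.

    {
      "name": "orders-elastic-sink",
      "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "pg_demo.public.orders",
        "connection.url": "http://elasticsearch:9200",
        "key.ignore": "true",
        "schema.ignore": "false"
      }
    }

With schema.ignore set to false, the connector uses the record schema to build the mapping and the decimal logical type is honored; flipping it to true leaves mapping to Elasticsearch and the raw bytes come through as shown above.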

  • Cassandra

The DataStax Cassandra sink connector does not handle the decimal logical type correctly. What makes matters worse is that the failed conversion only shows up as a warning in the Connect cluster log. No data is written, and the connector keeps running. When checking the log, you can see:

WARN Error decoding/mapping Kafka record ... : Codec not found for requested operation: 
[DECIMAL <-> java.nio.ByteBuffer] (com.datastax.oss.kafka.sink.CassandraSinkTask) 

For every sink connector you plan to use:

  • Be sure to test with decimals, timestamps, and dates, unless they truly aren’t in your use case.

  • Don’t oversimplify your POC. For example, don’t let the Elasticsearch sink connector create your indexes in your POC if that is not possible in production.

  • Watch the logs and check for WARN or even INFO.

  • If you have issues with logical types, you can have Debezium use strings for decimals (decimal.handling.mode=string), or you can leverage a stream processor (e.g. ksqlDB) to cast a decimal to a string.

Takeaways

  • Enabling database logging can be tricky, and each database has its own way of configuring and enabling it.

  • Having an end-to-end proof of concept showcasing change data capture is a great way to get developer buy-in and involvement, but plan appropriate time to work with your database operations team to get it enabled in the enterprise.

  • Leverage Apache Kafka’s config provider for secrets and environment-specific differences, such as the database connection string. The ability to check a single configuration artifact into revision control without specifics of a given environment is a great benefit.

  • Validate decimals, dates, and timestamps, as not all connectors handle them correctly.

  • Check out the dev-local project and the rdbms-cdc-nosql demo in dev-local-demos. The specifics discussed here are based on that demonstration, and scripts are there to get you observing change data capture with Debezium and Kafka within minutes.

Reach out

Please contact us if you have improvements, want clarification, or just want to talk streaming.