Apache Kafka, Apache Flink, Confluent’s Schema Registry


If you are an Apache Kafka developer looking to write stream-processing applications with Flink, the initial setup isn’t so obvious. Apache Flink has its own opinions on consuming from and producing to Kafka, as well as on its integration with Confluent’s Schema Registry. Here are the steps, with a working example, to get an Apache Kafka and Apache Flink streaming platform up in no time.

Introduction

Apache Flink is a major platform in stream processing, especially in managed services. Here is a look at standing up Apache Flink and integrating it with Apache Kafka, Confluent’s Schema Registry, and Avro serialization.

When both the Kafka key and value are to be part of the streaming pipeline (not just the value), nested generics come into play and type handling gets tricky.

Resources

  • A full container-based ecosystem is available at dev-local. It is based on docker-compose and provides container instances of Apache Kafka, Apache Flink, Kafka Connect, and more.

  • A demonstration of this article is provided in the flink folder of the dev-local-demos project.

Challenges

The key challenges uncovered:

  • Committing Offsets
  • Using Offsets
  • Serialization and Deserialization
  • Confluent’s Schema Registry Integration
  • Java Generics and Type Erasure

The Details

Committing Offsets

By default, Flink does not commit Kafka consumer offsets. This means that when the application restarts, it will consume from either the earliest or the latest offset, depending on the default setting. With a single property, offsets are committed when Flink creates a checkpoint. Just don’t forget to set it when setting up the Kafka source.

Set commit.offsets.on.checkpoint to true and also add a Kafka group.id to your consumer.

KafkaSource<Tuple2<EventKey, EventValue>> source = KafkaSource.<Tuple2<EventKey, EventValue>>builder()
    ...
    .setProperty("commit.offsets.on.checkpoint", "true")
    .setProperty("group.id", "flink-processor")
    ...
    .build();

Additionally, checkpointing must be enabled. It is very simple to do, but also very easy to forget.

StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(100L);

Using Offsets

Committing offsets doesn’t mean they will be used; the KafkaSource instance has to be configured to use them. The committedOffsets method of OffsetsInitializer instructs the source instance to use them. Be sure also to define the behavior when offsets do not yet exist by selecting the appropriate OffsetResetStrategy.

KafkaSource<Tuple2<EventKey, EventValue>> source = KafkaSource.<Tuple2<EventKey, EventValue>>builder()
    ...
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    ...
    .build();

Serialization and Deserialization

The Kafka producer and consumer used by Flink leverage the byte[] serializer and deserializer and leave the marshaling of data to Flink. If you only need to work with message values, setup is easier. However, when it comes to stream processing, leveraging the Kafka message key is usually necessary, so it is best to understand how that works.

Implement the KafkaRecordDeserializationSchema and KafkaRecordSerializationSchema interfaces and use them in the KafkaSource and KafkaSink builders, respectively.

The method names within KafkaSource and KafkaSink are not consistent. For KafkaSource it is setDeserializer, which takes the KafkaRecordDeserializationSchema. For KafkaSink it is setRecordSerializer, which takes the KafkaRecordSerializationSchema.

KafkaSource<Tuple2<EventKey, EventValue>> source = KafkaSource.<Tuple2<EventKey, EventValue>>builder()
    ...
    .setDeserializer(new MyAvroDeserialization<>(EventKey.class, EventValue.class, "http://schema-registry:8081"))
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    ...
    .build();
KafkaSink<Tuple2<EventKey, EventValue>> sink = KafkaSink.<Tuple2<EventKey, EventValue>>builder()
    ...
    .setRecordSerializer(new MyAvroSerialization<>(EventKey.class, EventValue.class, topic, "http://schema-registry:8081"))
    ...    
    .build();

Confluent’s Schema Registry Integration

Flink provides its own Confluent Schema Registry support, ConfluentRegistryAvroDeserializationSchema and ConfluentRegistryAvroSerializationSchema, as part of the flink-avro-confluent-registry library. Leverage these within your MyAvroDeserialization and MyAvroSerialization implementations. The Schema Registry configuration and subject naming conventions need to be implemented accordingly.

ConfluentRegistryAvroDeserializationSchema.forSpecific(pojoClass, schemaRegistryUrl)

For topic-based serialization, the subject would be {topic}-key and {topic}-value.

ConfluentRegistryAvroSerializationSchema.forSpecific(pojoClass, subject, schemaRegistryUrl);
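As an illustration, here is a minimal sketch of what a MyAvroSerialization implementation could look like, wrapping ConfluentRegistryAvroSerializationSchema for both the key and the value and deriving the {topic}-key and {topic}-value subjects from the topic name. The class structure and constructor are assumptions based on the builder calls shown earlier, not the demo project’s exact code.

import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative sketch; structure assumed, not the demo's exact implementation.
public class MyAvroSerialization<K extends SpecificRecord, V extends SpecificRecord>
        implements KafkaRecordSerializationSchema<Tuple2<K, V>> {

    private final String topic;
    private final ConfluentRegistryAvroSerializationSchema<K> keySchema;
    private final ConfluentRegistryAvroSerializationSchema<V> valueSchema;

    public MyAvroSerialization(Class<K> keyClass, Class<V> valueClass, String topic, String schemaRegistryUrl) {
        this.topic = topic;
        // Topic-based subject naming: {topic}-key and {topic}-value.
        this.keySchema = ConfluentRegistryAvroSerializationSchema.forSpecific(keyClass, topic + "-key", schemaRegistryUrl);
        this.valueSchema = ConfluentRegistryAvroSerializationSchema.forSpecific(valueClass, topic + "-value", schemaRegistryUrl);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<K, V> element, KafkaSinkContext context, Long timestamp) {
        // Serialize key (f0) and value (f1) separately, each registered under its own subject.
        return new ProducerRecord<>(topic, keySchema.serialize(element.f0), valueSchema.serialize(element.f1));
    }
}

This is the class handed to setRecordSerializer in the KafkaSink builder shown above.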

Java Generics and Type Erasure

Kafka Streams makes both the key and value part of the Processor API and domain-specific language (DSL), which reduces the complexity of leveraging generics. In Flink, each stream element is a single object, so capturing both key and value objects requires more nuance with generics.

Fortunately, Flink’s Tuple (and its 26 concrete implementations Tuple0, Tuple1, …, Tuple25) makes it possible to write serializers and deserializers generically. The core concept is that the serializer and deserializer need to provide type information in a way where the nested types are not erased. This is done through TypeInformation.

Creating a TypeHint works similarly to TypeReference in the Jackson JSON library; an anonymous instance captures the generic type information so the code can work with the otherwise erased types.

TypeInformation<Tuple2<EventKey, EventValue>> typeInformation = TypeInformation.of(
    new TypeHint<Tuple2<EventKey, EventValue>>() {
    }
);

With the use of TupleTypeInfo, a single implementation of KafkaRecordDeserializationSchema and KafkaRecordSerializationSchema can be created to handle marshaling of Avro SpecificRecord implementations.

new TupleTypeInfo<>(TypeInformation.of(keyClass), TypeInformation.of(valueClass));
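Putting these pieces together, here is a minimal sketch of what a MyAvroDeserialization implementation could look like; the class structure is an assumption based on the calls shown earlier, not the demo project’s exact code. It wraps ConfluentRegistryAvroDeserializationSchema for both key and value, and returns a TupleTypeInfo from getProducedType so the nested types survive erasure.

import java.io.IOException;

import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Illustrative sketch; structure assumed, not the demo's exact implementation.
public class MyAvroDeserialization<K extends SpecificRecord, V extends SpecificRecord>
        implements KafkaRecordDeserializationSchema<Tuple2<K, V>> {

    private final Class<K> keyClass;
    private final Class<V> valueClass;
    private final ConfluentRegistryAvroDeserializationSchema<K> keySchema;
    private final ConfluentRegistryAvroDeserializationSchema<V> valueSchema;

    public MyAvroDeserialization(Class<K> keyClass, Class<V> valueClass, String schemaRegistryUrl) {
        this.keyClass = keyClass;
        this.valueClass = valueClass;
        this.keySchema = ConfluentRegistryAvroDeserializationSchema.forSpecific(keyClass, schemaRegistryUrl);
        this.valueSchema = ConfluentRegistryAvroDeserializationSchema.forSpecific(valueClass, schemaRegistryUrl);
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Tuple2<K, V>> out) throws IOException {
        // Deserialize key and value separately; null keys are not handled in this sketch.
        out.collect(new Tuple2<>(keySchema.deserialize(record.key()), valueSchema.deserialize(record.value())));
    }

    @Override
    public TypeInformation<Tuple2<K, V>> getProducedType() {
        // TupleTypeInfo keeps the nested key and value types from being erased.
        return new TupleTypeInfo<>(TypeInformation.of(keyClass), TypeInformation.of(valueClass));
    }
}

This is the class handed to setDeserializer in the KafkaSource builder shown earlier.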

Putting it all Together

This demonstration is available in the flink folder of the dev-local-demos project.

A simple ETL-based processor that transforms an Order into a PurchaseOrder (as shown in the demonstration code) is achieved with a straightforward functional transformation. In this example, the business logic is the convert method within the map operation.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
    .enableCheckpointing(100L);

KafkaSource<Tuple2<OrderKey, Order>> source = KafkaSource.<Tuple2<OrderKey, Order>>builder()
    .setBootstrapServers(bootStrapServers)
    .setTopics("input")
    .setDeserializer(new AvroDeserialization<>(OrderKey.class, Order.class, schemaRegistryUrl))
    .setProperty("commit.offsets.on.checkpoint", "true")
    .setProperty("group.id", "FLINK")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .build();

KafkaSink<Tuple2<OrderKey, PurchaseOrder>> sink = KafkaSink.<Tuple2<OrderKey, PurchaseOrder>>builder()
    .setBootstrapServers(bootStrapServers)
    .setRecordSerializer(new AvroSerialization<>(OrderKey.class, PurchaseOrder.class, "output", schemaRegistryUrl))
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
    .map(kv -> {
        return new Tuple2<>(kv.f0, convert(kv.f1));
    }, new TupleTypeInfo<>(TypeInformation.of(OrderKey.class), TypeInformation.of(PurchaseOrder.class)))
    .sinkTo(sink);

env.execute();

Running your Application

Simply build the application with its dependencies bundled, and run it with the flink run command. Parallelism is set with the -p option.

flink run -p 4 --detached application-all-dependencies.jar

Conclusion

With this overview of how to use Apache Kafka and Confluent’s Schema Registry with Apache Flink, it is time to explore.