KTools - Kafka Console Consumer Filter

A second command-line tool has been added to KTools. The kafka-console-consumer-filter command filters JSON, JSON Schema, and Avro serialized messages with JSON Path expressions. In addition to filtering messages, JSON Paths can also be used to highlight elements within the displayed JSON.

Introduction

This tool is designed around questions and needs like:

  • Did an event with a given JSON element make it to Kafka?
  • I want to quickly check events on the topic over the past 2 hours without having to write an ISO 8601 string.
  • I want to stream data as it arrives, highlighting what I’m most interested in.

Why

As a developer, I embrace command-line tools. Being able to see and display the content of a topic from a CLI is important for troubleshooting, for validation, and for new developers. In the past, I would chain kafka-console-consumer with various Unix commands such as grep, but that approach had its struggles.

I also found date-time navigation with the existing CLI tools quite frustrating. Not being able to stop consumption at a given time made it hard to answer a simple question: “Was a message created two to three hours ago?”

Overview

The best way to give an overview of this tool is through examples. Here are a few examples to see if this CLI tool can help you.

Find all messages that have a price attribute where the price is less than 1.00.

specifically

  • Filter out any message that doesn’t have a price element less than 1.00.

  • Highlight the items in blue, the SKU in yellow, prices less than 1.00 in red, and any price greater than 30.00 in green.

  • Search the entire topic, exiting when it reaches offsets associated with “right now”.

  • Include the topic key and metadata as part of the output.

syntax

kafka-console-consumer-filter --bootstrap-server localhost:9092 --topic orders-pickup \
    --filter "$..[?(@.price < 1.0)]" \
    --highlight "BLUE=$..items" \
    --highlight "RED=$..[?(@.price < 1.0)].price" \
    --highlight "GREEN=$..[?(@.price > 30.0)].price" \
    --highlight "YELLOW=$..sku" \
    --include-key \
    --include-metadata \
    --start earliest \
    --end now

output

example JSON message filtered and displayed
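
To make the filter in this example more concrete, here is a minimal Python sketch of roughly what `$..[?(@.price < 1.0)]` asks for: any nested object with a price below 1.0. It is not how the tool works internally (the tool relies on a JSON Path library), and the `order` message, `walk`, and `cheap_items` are hypothetical names made up for illustration.

```python
from typing import Any, Iterator


def walk(node: Any) -> Iterator[Any]:
    """Yield the node and every nested value, depth first (a rough stand-in for `$..`)."""
    yield node
    if isinstance(node, dict):
        for child in node.values():
            yield from walk(child)
    elif isinstance(node, list):
        for child in node:
            yield from walk(child)


def cheap_items(message: Any, limit: float = 1.0) -> list:
    """Return every nested object whose numeric `price` is below `limit`."""
    return [
        node
        for node in walk(message)
        if isinstance(node, dict)
        and isinstance(node.get("price"), (int, float))
        and node["price"] < limit
    ]


# Hypothetical order, shaped after the fields named in the example above.
order = {
    "orderId": "A-1",
    "items": [
        {"sku": "100", "price": 0.75},
        {"sku": "200", "price": 31.5},
    ],
}

matches = cheap_items(order)
print(bool(matches))  # the message is kept only when at least one match exists
print(matches)
```

A real JSON Path engine has more rules than this (and its own nuances), so treat the sketch only as a mental model for what the filter keeps.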

Find all messages that have 4 or more line items.

specifically

  • Keep only orders that have at least 4 elements in the “items” array.
  • Highlight the items array in blue, prices in the default color (red), and all name attributes in purple.
  • Start from 1 day ago and stop once messages timestamped after the moment this command started executing are reached.

syntax

kafka-console-consumer-filter --bootstrap-server localhost:9092 --topic orders-pickup \
    --filter "$..[?( @.items.length() >= 4 )]" \
    --highlight "BLUE=$..items" \
    --highlight "$..price" \
    --highlight "Purple=$..name" \
    --rewind 1d \
    --end now

output

example JSON message filtered and displayed
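
The highlight arguments in this example mix an explicit color (`BLUE=`, `Purple=`) with a bare path that falls back to the default color. A small Python sketch of how such a `COLOR=Path` value could be split and turned into ANSI escape codes follows. It rests on assumptions: that a value starting with `$` has no color prefix, and that color names are case-insensitive (the examples use both `BLUE` and `Purple`). `parse_highlight` and `colorize` are hypothetical helpers, not part of the tool.

```python
from typing import Tuple

# Standard ANSI foreground codes for the eight colors the tool lists.
ANSI = {
    "black": 30, "red": 31, "green": 32, "yellow": 33,
    "blue": 34, "purple": 35, "cyan": 36, "white": 37,
}
RESET = "\033[0m"


def parse_highlight(spec: str, default_color: str = "red") -> Tuple[str, str]:
    """Split a highlight value into (color, json_path)."""
    # Assumption: JSON Paths start with '$', so a leading '$' means "no color given".
    # Checking this first avoids splitting on an '=' inside a filter such as '=='.
    if spec.startswith("$"):
        return default_color, spec
    color, _, path = spec.partition("=")
    color = color.strip().lower()
    if color not in ANSI or not path:
        raise ValueError(f"unrecognized highlight: {spec!r}")
    return color, path


def colorize(text: str, color: str) -> str:
    """Wrap text in the ANSI escape sequence for the given color."""
    return f"\033[{ANSI[color]}m{text}{RESET}"


for spec in ["BLUE=$..items", "$..price", "Purple=$..name"]:
    color, path = parse_highlight(spec)
    print(colorize(f"{path} -> {color}", color))
```

Terminals that do not honor ANSI coloring will print the raw escape characters instead of colors.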

Display Avro in JSON format without filtering anything out, highlighting each orderId in blue and prices in red.

specifically

  • Select avro as the format, providing the URL to the schema registry.

  • Start from March 11th, 2024 at 17:00 UTC, and stop at the time when the command started.

  • Highlight the orderId in blue and all price elements in red.

syntax

kafka-console-consumer-filter --bootstrap-server localhost:9092 --topic purchase-orders \
    --format avro \
    --schema-registry http://localhost:8081 \
    --highlight "BLUE=$..orderId" \
    --highlight "$..price" \
    --start "2024-03-11T17Z" \
    --end now

output

example Avro message filtered and displayed
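
One thing to watch for when writing JSON Paths against Avro: the path is applied to the JSON rendering of the record, and union types can show up with their type name in that JSON. The Python sketch below assumes the rendering follows Avro's standard JSON encoding, where a non-null union value is wrapped in a single-key object named after its type. The record, the `coupon` field, and `get_path` are hypothetical and only illustrate the effect.

```python
from typing import Any


def get_path(doc: Any, dotted: str) -> Any:
    """Follow a simple dotted path such as 'coupon.string'; return None if absent."""
    node = doc
    for part in dotted.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


# Hypothetical record whose `coupon` field has the Avro type ["null", "string"].
# Assumption: the union value is rendered wrapped in its type name.
rendered = {"orderId": "A-1", "coupon": {"string": "SAVE10"}}

print(get_path(rendered, "coupon"))         # the wrapper object, not the text
print(get_path(rendered, "coupon.string"))  # the text itself
```

If a path that looks right returns an object instead of a value, checking the rendered output for this kind of wrapper is a reasonable first step.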

Configuration

Connection

  • The connection parameters are similar to standard Kafka CLI tools, with a few exceptions:

    • While the bootstrap.servers Kafka consumer property is required, it can be provided either in the --consumer.config property file or with the --bootstrap-server option.

    • Since Apache Kafka uses --command-config for tools like kafka-topics, and --consumer.config for tools like kafka-console-consumer, this tool supports both --consumer.config and --consumer-config. Why? Because I always forget whether the middle delimiter is . or -.

    • The connection with Apache Kafka is through the assign() API, not subscribe(). So no consumer offsets are ever committed to the cluster, and an internal group.id is created only to aid in any metric gathering.

    • Schema Registry configuration, when required, is kept in its own configuration file, which can be supplied with --schema-registry.config (or --schema-registry-config).
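
The option aliases and the two sources for bootstrap.servers described above can be sketched with Python's argparse. This is only an illustration of the idea, not the tool's own argument handling. In particular, letting --bootstrap-server win over the property file when both are present is an assumption of this sketch, and `build_parser` and `resolve_bootstrap_servers` are hypothetical names.

```python
import argparse
from typing import Dict


def build_parser() -> argparse.ArgumentParser:
    """Accept both the '.' and '-' spellings of the config options."""
    parser = argparse.ArgumentParser(prog="kafka-console-consumer-filter")
    parser.add_argument("--bootstrap-server", dest="bootstrap_server")
    parser.add_argument("--consumer.config", "--consumer-config",
                        dest="consumer_config")
    parser.add_argument("--schema-registry.config", "--schema-registry-config",
                        dest="schema_registry_config")
    return parser


def resolve_bootstrap_servers(args: argparse.Namespace,
                              file_props: Dict[str, str]) -> str:
    """Pick bootstrap servers from the option or the already-loaded property file."""
    # Assumption: the command-line option takes precedence over the file.
    value = args.bootstrap_server or file_props.get("bootstrap.servers")
    if not value:
        raise ValueError("bootstrap.servers is required")
    return value


args = build_parser().parse_args(["--consumer-config", "client.properties"])
print(args.consumer_config)
print(resolve_bootstrap_servers(args, {"bootstrap.servers": "localhost:9092"}))
```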

Topic

  • --topic
    • The name of the topic to consume.
  • --partitions
    • All partitions are assigned unless a list of partitions is provided.
    • Partitions that do not exist (e.g. a partition number that is too large) are ignored.
  • Navigation is done by setting an optional start time, end time, and max message count. The start and end times are each controlled by two settings to make date selection easier.

Start Time
  • --start to provide a start time in a flexible format, yyyy-MM-dd(T, )HH:mm:ss.SSSZ.

    • Three keyword values are allowed: now, earliest, latest.
      • now and latest are the same thing, using the current timestamp as the starting point.
      • earliest will start at the earliest offset in each partition.
    • now is the default.
    • The ‘T’ delimiter can also be a space, provided it is interpreted as a single argument by your shell (e.g. quote it).
    • Hours, minutes, seconds, milliseconds, and timezone are optional and will be defaulted if not provided.
    • Timezone defaults to the system timezone, so use Z if you mean UTC.
    • Keep in mind that a date without hours is still interpreted in the default timezone, so add a timezone if needed.
  • --rewind quickly shifts the start time by a given duration.

    • Units

      • ms - milliseconds (default), s - seconds, m - minutes, h - hours, d - days.
    • The default is 0ms.

    • To rewind 2 hours from now, simply use --rewind 2h.

    • Both --start and --rewind can be on the command line together; rewind will be applied to whatever start time is given.

      • Keep in mind that rewind goes backward, so if you want to go forward from the provided start time, negate it.
      • earliest will use an epoch of 0 for seeking, so combining earliest and rewind is not something I expect anyone to do.
    • Rewind is typically used without --start as a quick way to move backward from now. This is why it is a positive value that is subtracted from the start time. So 3d is 3 days earlier than the start time.
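
The start-time rules above can be summarized in a short Python sketch: parse the flexible timestamp, parse the duration, and subtract the rewind from the start. It is a sketch under stated assumptions, not the tool's code. The keywords (now, earliest, latest) are left out, exactly three fractional digits are expected, and `parse_start`, `parse_duration`, and `effective_start` are hypothetical helpers.

```python
import re
from datetime import datetime, timedelta, timezone

# yyyy-MM-dd, then optional time parts after 'T' or a space, then an optional zone.
_TIMESTAMP = re.compile(
    r"^(\d{4})-(\d{2})-(\d{2})"
    r"(?:[T ](\d{2})(?::(\d{2})(?::(\d{2})(?:\.(\d{3}))?)?)?)?"
    r"(Z|[+-]\d{2}:?\d{2})?$"
)
_UNITS = {"ms": "milliseconds", "s": "seconds", "m": "minutes",
          "h": "hours", "d": "days"}


def parse_duration(text: str) -> timedelta:
    """Parse values like '2h', '1d', or '500' (milliseconds when no unit is given)."""
    match = re.fullmatch(r"(-?\d+)(ms|s|m|h|d)?", text.strip())
    if not match:
        raise ValueError(f"bad duration: {text!r}")
    return timedelta(**{_UNITS[match.group(2) or "ms"]: int(match.group(1))})


def parse_start(text: str) -> datetime:
    """Parse a flexible timestamp; missing time parts default to zero."""
    match = _TIMESTAMP.match(text.strip())
    if not match:
        raise ValueError(f"bad timestamp: {text!r}")
    year, month, day, hh, mm, ss, ms, zone = match.groups()
    naive = datetime(int(year), int(month), int(day),
                     int(hh or 0), int(mm or 0), int(ss or 0), int(ms or 0) * 1000)
    if zone is None:
        return naive.astimezone()  # no zone given: use the system timezone
    if zone == "Z":
        return naive.replace(tzinfo=timezone.utc)
    digits = zone[1:].replace(":", "")
    offset = timedelta(hours=int(digits[:2]), minutes=int(digits[2:]))
    return naive.replace(tzinfo=timezone(-offset if zone[0] == "-" else offset))


def effective_start(start: str, rewind: str = "0ms") -> datetime:
    """A positive rewind moves the start earlier; a negative one moves it later."""
    return parse_start(start) - parse_duration(rewind)


print(effective_start("2024-03-11T17Z", "2h"))    # 2024-03-11 15:00:00+00:00
print(effective_start("2024-03-11T17Z", "-30m"))  # 2024-03-11 17:30:00+00:00
```

Subtracting a negative duration is what makes "negate it" move the start forward, which is the behavior described in the bullets above.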

End Time
  • End time is optional. If it is not provided, the consumer will not end by time (it could still end when max messages is reached).

  • To detect the end time, the command-line tool needs to process at least 1 message past the end time in every partition, since event time != processing time. These extra messages are not displayed.

  • --end to provide an end time in a flexible format, yyyy-MM-dd(T, )HH:mm:ss.SSSZ.

    • Same formatting rules as --start.
    • There is no default.
  • --forward changes the end time by providing a duration.

    • Same formatting rules as --rewind.
    • There is no default, meaning if both --end and --forward are missing, the consumer will not end based on a date-time.
    • Where a positive --rewind moves the start time earlier, a positive --forward moves the end time later. This makes sense for my use cases, but I can see how the positive/negative behavior could be confusing (I hope the names of the command-line switches make it easier).
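
The end-time rule, where each partition needs one message past the end time before the tool can stop, can be simulated in a few lines of Python. This is a simplified model of the behavior described above, not the tool's implementation. It assumes "past the end time" means strictly greater, and that a partition is ignored once it has produced such a message. `Record` and `consume_until` are hypothetical names, and the records are made up.

```python
from typing import Iterable, List, Set, Tuple

Record = Tuple[int, int, str]  # (partition, timestamp in epoch ms, value)


def consume_until(records: Iterable[Record], partitions: Set[int],
                  end_ms: int) -> List[str]:
    """Return the values that would be displayed before every partition passes end_ms."""
    finished: Set[int] = set()
    shown: List[str] = []
    for partition, timestamp, value in records:
        if partition in finished:
            continue  # assumption: nothing more is shown once a partition is done
        if timestamp > end_ms:
            finished.add(partition)  # the extra message is consumed, never displayed
            if finished >= partitions:
                break  # every partition has gone past the end time
            continue
        shown.append(value)
    return shown


records = [
    (0, 100, "a"), (1, 150, "b"),
    (0, 250, "late-0"),   # partition 0 is now past the end time
    (1, 180, "c"),
    (1, 300, "late-1"),   # partition 1 follows, so consumption stops here
    (0, 120, "never-read"),
]
print(consume_until(records, {0, 1}, end_ms=200))  # ['a', 'b', 'c']
```

Under this reading, a partition that never receives a message past the end time keeps the loop waiting, which is worth remembering when a topic has idle partitions.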

Max Messages
  • --max-messages is the number of messages to consume before quitting.

  • Repeating the command for a given start time may not give identical results, since the number of messages pulled from each partition is non-deterministic.

Filtering and Display

  • These are the core options of this tool. They select the data you want to display and set the rules for how it is displayed.

  • --format

    • json - the value is JSON serialized, as with the standard Apache Kafka JSON Serializer.
    • json-schema - the value is serialized with the Confluent JSON Schema Serializers that leverage the Schema Registry.
    • avro - the value is serialized with the Confluent Avro Serializers that leverage the Schema Registry.
  • --filter is a JSON Path expression that the message value must match for the message to be output to the screen.

    • For Avro, the JSON Path is applied to how the Avro is rendered as JSON. This means that the type string for union types could cause the JSON to not be quite as expected.

    • The open-source Jayway JsonPath project is used for parsing JSON Path expressions. If an expression isn’t working as expected, keep in mind that this implementation has its nuances.

  • --highlight (or --highlights)

    • If your terminal honors ANSI coloring, highlights can be used to make those elements stand out in the output.
    • syntax: COLOR=Path
    • If COLOR is omitted, then the default color is used for highlighting.
    • JSON Path expressions can overlap, as shown in one of the examples above.
  • --default-color

    • The default color is red, but any of the available ANSI colors can be used.
    • The color options are: black, red, green, yellow, blue, purple, cyan, and white.
  • --include-key

    • Include the key in the output. The key is represented in String form and added to the top-level JSON.
    • By putting the key into the outputted JSON, JSON Path expressions can be used to filter by key.
    • Keep in mind that the value is then nested in a value element, which can impact JSON Path expressions.
  • --include-metadata

    • Include the message metadata in the output. The metadata is added to a top-level object called metadata with topic, partition, offset, and timestamp elements within it.
    • By putting the metadata into the outputted JSON, JSON Path expressions can be used to filter by anything in the metadata.
    • Keep in mind that the value is then nested in a value element, which can impact JSON Path expressions.
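
To see why --include-key and --include-metadata can change which JSON Paths match, here is a Python sketch of the output shape they imply. The `metadata` and `value` names come from the description above. The top-level `key` name, the sample values, and the `build_output` helper are assumptions made for illustration.

```python
from typing import Any, Dict, Optional


def build_output(value: Any, key: Optional[str] = None,
                 metadata: Optional[Dict[str, Any]] = None,
                 include_key: bool = False,
                 include_metadata: bool = False) -> Any:
    """Return the JSON document that paths would be evaluated against."""
    if not (include_key or include_metadata):
        return value  # the message value stays at the top level
    output: Dict[str, Any] = {}
    if include_key:
        output["key"] = None if key is None else str(key)  # assumed element name
    if include_metadata:
        output["metadata"] = dict(metadata or {})
    output["value"] = value  # the value moves one level down
    return output


meta = {"topic": "orders-pickup", "partition": 0, "offset": 42,
        "timestamp": 1710176400000}
plain = build_output({"price": 0.5})
wrapped = build_output({"price": 0.5}, key="order-1", metadata=meta,
                       include_key=True, include_metadata=True)
print(plain)    # price is a top-level element
print(wrapped)  # price now sits under value
```

A root-anchored path such as `$.price` would stop matching once the value is nested, while a deep-scan path such as `$..price` (the style used in the examples above) keeps working in both shapes.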

Project

This CLI tool is the second tool to be added to the KTools open-source project. For more details on this project, please see the release page for the first tool, kafka-topic-truncate. This is the initial release of this command, so please be patient and let me know if there are issues, enhancement requests, or other questions.

Reach out

Please contact us if you have ideas or suggestions. For anything specific to the project, use the project’s GitHub Issues page.