Bite Size Streams: Stream to Table Join

A demonstration of a stream-to-table join in Kafka Streams: each window event is joined with the process it belongs to.

The Source Code

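  protected void build(StreamsBuilder builder) {

    // Table side: read the processes topic as a KTable that keeps the latest
    // OSProcess per key, backed by the "processes-store" state store.
    KTable<String, OSProcess> processes = builder
            .<String, OSProcess>stream(Constants.PROCESSES)
            .toTable(Materialized.as("processes-store"));

    // Stream side: re-key each window event by its process id (as a String)
    // so it can match the table's key, then inner-join with the table.
    builder.<String, OSWindow>stream(Constants.WINDOWS)
            .selectKey((k, v) -> "" + v.processId())
            .join(processes, StreamToTableJoin::asString)
            // A null key serde falls back to the configured default serde.
            .to(OUTPUT_TOPIC, Produced.with(null, Serdes.String()));
  }

  // ValueJoiner: combines a window event with its matching process.
  // rectangleToString is a helper that is not shown in this excerpt.
  private static String asString(OSWindow w, OSProcess p) {
    return "pId=" + p.processId() + ", wId=" + w.windowId() + " : " + rectangleToString(w);
  }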
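
The join semantics can be seen without a running cluster. The sketch below is a plain-Java model, not the Kafka Streams API: a map stands in for the table (latest value per key), and only stream-side records trigger a lookup. The class name `StreamTableJoinSketch`, the simplified `OSProcess` and `OSWindow` types, and the `name` field are assumptions made for illustration; the real types in the post carry more data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of an inner stream-to-table join (illustrative only).
class StreamTableJoinSketch {

    // Hypothetical, simplified stand-ins for the post's value types.
    static final class OSProcess {
        private final int processId;
        private final String name;
        OSProcess(int processId, String name) { this.processId = processId; this.name = name; }
        int processId() { return processId; }
        String name() { return name; }
    }

    static final class OSWindow {
        private final int processId;
        private final int windowId;
        OSWindow(int processId, int windowId) { this.processId = processId; this.windowId = windowId; }
        int processId() { return processId; }
        int windowId() { return windowId; }
    }

    // Models the table: the latest process seen for each key.
    private final Map<String, OSProcess> processes = new HashMap<>();
    // Models the output topic.
    private final List<String> output = new ArrayList<>();

    // Table-side record: updates state only, never emits a join result.
    void onProcess(String key, OSProcess process) {
        if (process == null) {
            processes.remove(key); // a null value acts as a delete
        } else {
            processes.put(key, process);
        }
    }

    // Stream-side record: re-key by process id, then look up the table.
    void onWindow(OSWindow window) {
        String key = String.valueOf(window.processId());
        OSProcess process = processes.get(key);
        if (process != null) { // inner join: no match means no output
            output.add("pId=" + process.processId() + ", wId=" + window.windowId());
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        StreamTableJoinSketch join = new StreamTableJoinSketch();
        join.onWindow(new OSWindow(7, 1));            // dropped: process 7 not in the table yet
        join.onProcess("7", new OSProcess(7, "editor"));
        join.onWindow(new OSWindow(7, 2));            // joined
        join.onWindow(new OSWindow(9, 3));            // dropped: no process 9
        System.out.println(join.output());            // prints [pId=7, wId=2]
    }
}
```

Note that the first window event is lost even though its process arrives a moment later. In this model, as in a stream-to-table join, a table update does not go back and join earlier stream records, so the order in which the two topics are consumed matters.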

The Topology

The Display

The Video