A demonstration of a stream-to-table join in Kafka Streams: a stream of window events is re-keyed by process ID and joined against a table of processes.
The Source Code
/**
 * Wires up the topology: window events are re-keyed by their process id,
 * joined against a table built from the processes topic, and the resulting
 * text is written to {@code OUTPUT_TOPIC}.
 *
 * @param builder the builder on which the sources, the join and the sink are registered
 */
protected void build(StreamsBuilder builder) {
    // Table side of the join: the processes topic read as a table and
    // materialized under the store name "processes-store".
    KTable<String, OSProcess> processTable =
            builder.<String, OSProcess>stream(Constants.PROCESSES)
                    .toTable(Materialized.as("processes-store"));

    // Stream side of the join: window events only match a table row when the
    // keys are equal, so each event is re-keyed to the string form of its
    // process id first.
    // NOTE(review): assumes the processes topic is keyed by that same string
    // form of the process id -- confirm against the producer.
    builder.<String, OSWindow>stream(Constants.WINDOWS)
            .selectKey((ignoredKey, window) -> "" + window.processId())
            .join(processTable, StreamToTableJoin::asString)
            // A null key serde leaves the key serialization to the configured default.
            .to(OUTPUT_TOPIC, Produced.with(null, Serdes.String()));
}
/**
 * Renders one joined window/process pair as a single line of text.
 *
 * @param w the window event (stream side of the join)
 * @param p the process matched to that window (table side of the join)
 * @return text of the form {@code pId=<processId>, wId=<windowId> : <rectangle>},
 *         where the rectangle part is whatever {@code rectangleToString(w)} produces
 */
private static String asString(OSWindow w, OSProcess p) {
    // Appends run left to right, so the accessors and rectangleToString are
    // invoked in the same order as in a plain concatenation.
    return new StringBuilder()
            .append("pId=")
            .append(p.processId())
            .append(", wId=")
            .append(w.windowId())
            .append(" : ")
            .append(rectangleToString(w))
            .toString();
}