Kafka Streams - Suppress surprise
In this post, I will explain the Kafka Streams suppress concept. Even though it seems quite easy to understand, there are some inherent issues one has to be aware of. This is a continuation of my previous blog post, CDC for Analytics.
Architecture
A typical CDC architecture can be represented as:
In the above architecture,
- Individual table transaction messages are stored in Kafka in separate topics. These can be transferred to the target destination using the Kafka sink connector.
- To do aggregations like counting, statistics, and joining with other streams (CRM or static content), we use Kafka Streams. Some of this could also be done using KSQL, but a KSQL implementation needs an additional KSQL Server and an additional deployment to take care of. Kafka Streams is a more elegant way, since it runs as a standalone application.
Kafka Streams Application can be written in Java/Scala.
My requirement is to join the CDC event streams from multiple tables and create statistics every day. To do this, I had to use `suppress` from Kafka Streams.
To understand the Kafka Streams suppress concepts we need to first understand the Aggregation.
Aggregation Concepts
Kafka Streams aggregation concepts are quite similar to those in other functional stream-processing frameworks (like Spark Streaming or Akka Streams in Scala/Java). This post covers only some of the important concepts. For detailed aggregation concepts, please visit the Confluent docs.
The aggregation is a stateful transformation operation and it is applied to records of the same key. Kafka Streams supports the following aggregations: `aggregate`, `count`, and `reduce`. You can run `groupBy` (or its variations) on a `KStream` or a `KTable`, which results in a `KGroupedStream` or a `KGroupedTable`, respectively.
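As a minimal sketch of the grouping step (the topic name and serdes are placeholders for illustration, not from my actual application), grouping a `KStream` before aggregating might look like this:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;

public class GroupingSketch {
    static void buildTopology(StreamsBuilder builder) {
        // "cdc-events" is a hypothetical topic name
        KStream<String, String> events = builder.stream(
                "cdc-events", Consumed.with(Serdes.String(), Serdes.String()));

        // groupByKey() keeps the existing key; groupBy((k, v) -> newKey) re-keys first.
        // The resulting KGroupedStream is the entry point for count/aggregate/reduce.
        KGroupedStream<String, String> grouped =
                events.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
    }
}
```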
To aggregate in Kafka Streams, one can use one of the following:
- Count: Simple operation used to count the elements
- Aggregate: The `aggregate` function is used when we want the result type to be changed. The `aggregate` function has two key components: an `Initializer` and an `Aggregator`. When the first record is received, the `Initializer` is invoked and used as a starting point for the `Aggregator`. For subsequent records, the `Aggregator` uses the current record along with the aggregate computed so far for its calculation. Conceptually, this is a stateful computation being performed on an infinite data set. It is stateful because calculating the current state takes into account the current record (the key-value pair) along with the latest state (the current aggregate). This can be used for scenarios such as moving average, sum, count, etc.
- Reduce: You can use `reduce` to combine the stream of values. The `aggregate` operation mentioned above is a generalized form of `reduce`; the result type of a `reduce` operation cannot be changed. In our case, a `reduce` with a windowed operation suffices (see the sketch after this list).
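As a rough illustration of the difference between the three operations (the value types and the running-sum logic here are made up, not taken from my application), on a `KGroupedStream<String, Long>` they could look like this:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class AggregationSketch {
    static void examples(KGroupedStream<String, Long> grouped) {
        // count: the result type is always Long
        KTable<String, Long> counts = grouped.count();

        // aggregate: the result type can differ from the input value type
        // (here a running sum kept as a String, purely for illustration)
        KTable<String, String> sums = grouped.aggregate(
                () -> "0",                                                         // Initializer
                (key, value, agg) -> String.valueOf(Long.parseLong(agg) + value),  // Aggregator
                Materialized.with(Serdes.String(), Serdes.String()));

        // reduce: the result type must stay the same as the input value type
        KTable<String, Long> maxima = grouped.reduce(Long::max,
                Materialized.with(Serdes.String(), Serdes.Long()));
    }
}
```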
There are different windowing options available in Kafka Streams; please refer to the documentation. We are interested in a tumbling time window of 1 day.
Note: All the aggregation operations ignore records with a `null` key, which is obvious since the very goal of these functions is to operate on records of a specific key. Hence, we need to ensure we first apply a `selectKey` operation to our event stream.
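For example, that re-keying step could look like the following sketch (the key-extraction logic is a placeholder; the actual one depends on your CDC payload):

```java
import org.apache.kafka.streams.kstream.KStream;

public class RekeySketch {
    static KStream<String, String> rekey(KStream<String, String> cdcEvents) {
        // selectKey sets a non-null key so the downstream aggregation does not drop records;
        // it also marks the stream for repartitioning before groupByKey.
        return cdcEvents.selectKey((oldKey, value) -> extractBusinessKey(value));
    }

    // hypothetical helper, not from the original post
    static String extractBusinessKey(String value) {
        return value; // placeholder
    }
}
```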
Adding `suppress(untilWindowCloses...)` to the program tells Kafka Streams to suppress all output results from the `reduce` operation until the "window closes." When the window closes, its result can never change again, so any results coming out of `suppress(untilWindowCloses...)` are the final results for their windows.
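Putting the pieces together, a hedged sketch of the windowed reduce plus suppress step might look like this (the topic name, serdes, one-hour grace period, and summing reducer are assumptions for illustration, not the values from my pipeline):

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;

public class DailyStatisticsSketch {
    static void buildAggregation(KStream<String, Long> statistics) {
        statistics
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            // tumbling 1-day window; the 1-hour grace period is an assumed value
            .windowedBy(TimeWindows.of(Duration.ofDays(1)).grace(Duration.ofHours(1)))
            // reduce keeps the value type; here we simply sum the per-record statistics
            .reduce(Long::sum)
            // emit only the final result per window, once the window closes
            .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
            .toStream()
            // unwrap the windowed key before writing to the output topic (a placeholder name)
            .map((windowedKey, total) -> KeyValue.pair(windowedKey.key(), total))
            .to("daily-statistics", Produced.with(Serdes.String(), Serdes.Long()));
    }
}
```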
Based on that definition from the documentation, we expect an aggregated statistics message to be generated every day (aligned to UTC) just after the grace period. But there is a caveat: suppress will not flush the aggregated record till it encounters another record with the same group-by key!!!
In the CDC event stream, every table will have its own PK and we cannot use that as a key for the event stream.
To have the same group-by key across all events, I had to hard-code the key in the `transform` step while creating the statistic message, like `KeyValue.pair("store-key", statistic)`. Then, `groupByKey()` will properly group all the statistic messages.
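A hedged sketch of that re-keying (using a simple `map` as a stand-in for my `transform` step; the `Statistic` type and its serde are hypothetical):

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;

public class ConstantKeySketch {
    // hypothetical value type standing in for the statistic message
    static class Statistic { }

    static KGroupedStream<String, Statistic> groupAll(KStream<String, Statistic> statistics,
                                                      Serde<Statistic> statisticSerde) {
        return statistics
                // every record gets the same hard-coded key so they all land in one group
                .map((key, statistic) -> KeyValue.pair("store-key", statistic))
                .groupByKey(Grouped.with(Serdes.String(), statisticSerde));
    }
}
```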
In the CDC architecture, we cannot expect that there will be a DB operation happening just after the grace period. During off-peak hours or weekends there could be no DB operations at all! Still, we need to generate the aggregated message.
To flush the aggregated record from suppress, I had to create a dummy DB operation (update any table row with the same content, like `update tableX set id=(select max(id) from tableX);`). I had to run this dummy DB update via a cronjob every day, immediately after the grace period. Maybe this cronjob could be replaced with `ProcessorContext#schedule()` / `Processor#punctuate()` (I haven't tried it though, since I would then need to bring hard-coded table names into this application).
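For completeness, a rough and untested sketch of that punctuation idea (the interval, key, and heartbeat payload are all assumptions; whether a forwarded heartbeat actually advances stream time enough to flush suppress, and how to keep it from distorting the reduce result, would still need to be verified):

```java
import java.time.Duration;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

// Untested idea: periodically forward a dummy "heartbeat" record so the downstream
// windowed aggregation sees fresh traffic even when the DB is idle.
public class HeartbeatProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        // the 15-minute interval and wall-clock trigger are assumed values
        context.schedule(Duration.ofMinutes(15), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> context.forward("store-key", "heartbeat"));
        // a real implementation would have to ensure the heartbeat value does not
        // corrupt the aggregated statistics further downstream
    }

    @Override
    public void process(String key, String value) {
        context.forward(key, value); // pass real records through unchanged
    }

    @Override
    public void close() { }
}
```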
Suppress and Replay issues
The problem is more pronounced when we replay to calculate the aggregated statistics for an older period. The stream time gets weird, the aggregation window expires, and we get the following WARNING:
```
2021-04-15 08:30:49 WARN Skipping record for expired window. key=[statistics-store-msg-key] topic=[statistics-streaming-aggregates-statistics-stream-store-repartition] partition=[0] offset=[237] timestamp=[1618420920468] window=[1618358400000,1618444800000) expiration=[1618459200477] streamTime=[1618459200477]
```
To prevent the windows from expiring and producing weird aggregated results, we need to increase the grace period to a quite large value, as illustrated below:
As illustrated above, when we do a replay and pass the "event-collection-start", we should set the "grace duration" automatically (large enough). Then Kafka Streams will process all the aggregated events properly, without any expiration. Still, the final result will not be "flushed out" of the suppress window; we need to force that by creating a dummy update just after starting the application. As this is executed as a batch application, we also need to "kill $pid" to shut it down (till KIP-95 is done: open for 3 years).
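A hedged sketch of deriving that grace period from the replay start (the `eventCollectionStart` parameter corresponds to the "event-collection-start" above; the extra one-day margin is an assumption):

```java
import java.time.Duration;
import java.time.Instant;

import org.apache.kafka.streams.kstream.TimeWindows;

public class ReplayWindowSketch {
    static TimeWindows dailyWindowFor(Instant eventCollectionStart) {
        // make the grace period span the whole replayed range plus a one-day margin,
        // so that no replayed window is considered expired
        Duration grace = Duration.between(eventCollectionStart, Instant.now())
                                 .plus(Duration.ofDays(1));
        return TimeWindows.of(Duration.ofDays(1)).grace(grace);
    }
}
```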
I hope that many people who, like me, have stumbled upon this problem while using `suppress` will find this post useful.
References
- Stream Concepts
- Kafka Streams’ Take on Watermarks and Triggers
- Emit a final result from a time window
- Stack overflow
* Kafka Stream Suppress session-windowed-aggregation
* is kafka streams supress operator untilTimeLimit event driven?
* Kafka — supress not throttling stream as expected
* Intentionally drop state when using suppress for rate limiting updates to KTable
Suggestion given: Switch to processing-time semantics by using `WallclockTimestampExtractor` (instead of the default extractor). To ensure that each record is only emitted once, you should change the `suppress()` configuration to only emit "final" results.
* Kafka Streaming Suppress Feature to get hold of transactions which are late beyond grace period
* One Minute aggregate window giving unexpected result in Kafka 2.4.0 - KAFKA-7748: Add wall clock TimeDefinition for suppression of intermediate events
* As of today Mar 30, 2021 this feature ticket is OPEN
* Corresponding KIP: KIP-424
- How to use Apache Kafka to transform a batch pipeline into a real-time one - Kafka End to End Udemy (Medium Blog)
* Explains how to calculate a 1-day average rating over a window period of 90 days + 1 day grace period. In this windowing he hasn't used suppress (maybe at that time it wasn't available in the Kafka Streams API) and hence there are 91 average ratings.
* He uses upserts, which overwrite the same entry in the DB table (using the Kafka Connect JDBC sink connector).
* Even if we read in a streaming manner from this new statistic table, we will get 91 average ratings because there are 91 DB transaction logs!