State Stores in Kafka Streams

This post is part of a series of posts on Kafka Streams. Related posts:

State Stores

Each stateful operator in Kafka Streams is materialized in a state store. Different types of state stores exists in kafka streams depending on the streaming operation, level of persistency and fault tolerance. By default, Kafka Streams uses RocksDB as the underlying state store implementation. Yet, it is possible to change that with a custom implementation of the StateStore interface. Below we go over several state store configurations for aggregate and join operators.

State Stores for Aggregates

State stores can be categorized into implicit (with default configuration) and explicit (with custom configuration). The implicit state store receives an internal name (e.g., KSTREAM-AGGREGATE-STATE-STORE-0000000019), it is persisted on disk, and has its change log enabled (a change log is maintained on a corresponding Kafka topic to ensure fault tolerance). For an explicit state store we can configure a custom name, we can specify whether it is persisted or only kept in memory, and we can disable logging (the change log is not maintained in this case). In addition, we can configure the window duration corresponding to the aggregate operation and the retention time. The retention time has to be at least the size of the window. The code snippet below shows examples of configuring different types of state stores for an aggregate operator that computes the count of pages having the same id.

State Stores for Joins

State stores can be similarly configured for join operators. By default a state store is created for each input that is joined, names are automatically generated (e.g., KSTREAM-JOINTHIS-0000000018-store and KSTREAM-JOINOTHER-0000000019-store), the state stores are persistent, and their corresponding change logs are maintained. Explicit state stores can be defined for joins as well as shown in the code snippet below. Something to keep in mind is that the retention time has to be equal with the size of the window plus the grace period configured on the JoinWindows instance. JoinWindows.of(timeDifference) specifies that records of the same key are join-able if their timestamps are within the timeDifference, which is half the window size. Currently, it is not possible to disable change logs for join operators. This feature has been requested (KAFKA-9126), and the implementation will be available as of Kafka Streams 2.8.0.

Querying the State Stores

Interactive Queries provides an API to query the state of the internal state stores. In the context of Spring applications, InteractiveQueryService is a service that provides access to this API and can be simply auto wired in the application. In the following snippet we show how to use it to query the “join-main” store configured in the join state store example above. Care should be taken in providing exactly the type of the state store used in the stateful operation and the signature of the returned object. Possible query-able state store types include: windowStore(), timestampedWindowStore(), keyValueStore(), timestampedKeyValueStore() and sessionStore().

Conclusions

In this blog we went over state stores in Kafka Streams. We have seen that default state stores are defined for stateful operations, which can be further customized with explicit configurations. The custom settings can specify the level of persistency (in-memory, on disk), the window size, the retention time, and whether the state store is fault tolerant or not by setting the logging level. Default state stores are fault tolerant by default, and their corresponding change log is materialized not only locally on disk but also in a corresponding Kafka topic.

Recommended Readings

Kafka Streams State

State Store Types

Kafka Streams Interactive Queries

Use Cases for Interactive Queries

    Previous article

    Unit Testing Kafka Streams with Avro