Unit testing Kafka Streams with Avro is critical to ensure correctness of your streaming processing pipelines. If you decide to skip unit testing, you will likely spend a lot of time debugging at run time. The TopologyTestDriver API developed for Kafka Streams is powerful, intuitive and quite easy to use. In the following, I describe how to configure the TopologyTestDriver and use it for unit testing.
This post is part of a series of posts on Kafka Streams. Related posts:
Note that we focus on testing Kafka Streams pipelines that consume events in Avro format. Each event type in Avro format has to be registered with a Schema Registry which holds the schema description, and provides an API for consumers to check and validate schemas. For unit testing we do not want to use an actual Schema Registry server but rather to mock it. Since the streaming pipeline has multiple input and intermediate event types in Avro format (corresponding to input/intermediate topics in Kafka), we have to mock responses from the Schema Registry for all of them. This process is quite involved. Instead of mocking the schema registry for all the event types manually, we will use fluent-kafka-streams-tests. The library mocks the schema registry API under the hood. The SchemaRegistryMock provided by the library is based on WireMockServer.
The main dependencies for testing include: kafka-streams-test-utils and schema-registry-mock (link to the Git repo at the bottom of the page).
The setup() function in the code snippet below summarizes the steps to configure the TopologyTestDriver. These are:
- creating and starting a SchemaRegistryMock
- instantiating a TopologyTestDriver
- creating TestInputTopic and TestOutputTopic instances for each input and output topic
We configure a test class for the join application example presented in a prior post. For creating the TopologyTestDriver object (line 37) we need to configure the input KStream objects. Then, we invoke the streaming pipeline(s) to generate the corresponding output stream(s). We use the StreamBuilder to create the input KStream objects. StreamBuilder takes two parameters: the topic name and the Consumed object that describes the SerDe for keys and values. For all the Avro objects we use SpecificAvroSerde. To mock all calls to the schema registry, configure the SCHEMA_REGISTRY_URL_CONFIG property of the configuration map with the URL value provided by the SchemaRegistryMock, as shown in createSpecificAvroSerde() function.
2. Input/Output Operations
Once the TestInputTopic and TestOutputTopic are created we can start implementing our tests. We can pipe input key/value pairs to the input topics and then check for the expected result events in the output topics. The code snippet below shows a simple example where we pipe a Page and a Visit, and we expect to receive a join result at the output.
There are several possibilities to pipe input into the input topics: piping key/value pairs one by one (as in the earlier example), piping test records one by one, piping a list of key/value pairs, piping a list of only values, piping a list of test records. The difference between piping key/values and piping a TestRecord consist in that the later has additional attributes, in particular: record time and headers. For records, the record time is used as the time associated with the event. When piping key/values, the time of the event can be provided as an additional argument. If omitted, the event time is the same with the time of the TestInputTopic, which is subsequently advanced with the configured advance duration.
3. Checking State Stores
The TopologyTestDriver provides support for checking the state of internal state stores as well. Multiple options are available: e.g., fetching all the state stores or fetching only a given type of state store based on its name. One test example is illustrated in the snippet below.
- In this post we showed how to configure unit testing for Kafka Streams using evens in Avro format.
- We provided several examples and showed the APIs available as part of TopologyTestDriver.
- We showed some examples on how to fetch the state of the internal state stores.
- All tests can be found in spring-cloud-stream-with-kafka-streams-join-example repository on Github.
5. Recommended Readings
- Testing Kafka Streams using TestInputTopic and TestOutputTopic, Confluent blog
- Fluent Kafka Streams Tests, Medium blog
- Fluent Kafka Streams Tests, Github repository