Kafka Streams with Spring Cloud Stream

In this post we show how to configure Kafka Streams with Spring Cloud Stream. Then we deploy it and run it on Cloud Foundry PaaS.

This post is part of a series of posts on Kafka Streams.


1. Software Stack

In this tutorial we use Kafka Streams version 2.4.0, Spring Boot 2.2.2.RELEASE and Spring Cloud dependencies Hoxton.RELEASE. For the exact versions of all the other libraries, please check the pom.xml file in the project repository on GitHub.

2. Joining Two Input Streams

In our example application we join data flowing from two Kafka topics. The results are saved to a new output topic containing the joined records. Kafka Streams supports multiple kinds of joins; in this post we focus only on (KStream, KStream) windowed joins, where matching tuples that fall within the same time window are joined and written to the resulting output stream.

3. Resource Configuration for Spring Boot Application

The resource configuration parameters that are particularly relevant for Kafka Streams applications are the memory size and the disk quota. Kafka Streams uses so-called state stores to keep the internal state of the application. Depending on the type of state store used, persistent or in-memory, the application may require memory and disk tuning. JVM memory pools (such as the direct memory size and the reserved code cache size) can be configured when deploying the application to Cloud Foundry by specifying them in the JAVA_OPTS environment variable, as shown in the snippet below.

- name: spring-cloud-stream-with-kafka-streams-join-example
  memory: 3G
  disk_quota: 1G
  instances: 1
  env:
    JBP_CONFIG_JMX: '{enabled: true}'
    JBP_CONFIG_OPEN_JDK_JRE: '{jre: { version: 11.+ }}'
    TZ: Europe/Amsterdam
    JAVA_OPTS: -XX:MaxDirectMemorySize=500M -XX:ReservedCodeCacheSize=128M

4. Application Configuration

For configuring our application we use a YAML configuration file, listed in fragments below. The first section of the file declares the application properties: the names of the two input topics, the Kafka broker address, and the schema registry URL. These have to be changed to match the configuration of your Kafka setup. The next property, `spring.application.name`, defines the name of the Kafka Streams application and is used as a prefix for all the internal topics that are created automatically by the Kafka Streams library.

    application:
      pagesTopicName: pages
      visitsTopicName: visits
      kafkaBrokerAddress: localhost:9092             # change to match your Kafka setup
      kafkaSchemaRegistryURL: http://localhost:8081  # change to match your Kafka setup
    spring:
      application:
        name: kafka-streams-join-example

Two main categories of properties follow in the configuration file: binding properties and binder properties. Binding properties describe the input/output queues (such as the name of the Kafka topic, the content-type, etc.). Binder properties describe the messaging middleware implementation (in our case, Kafka).

4.1 Defining Input/Output Bindings

`spring.cloud.stream.bindings.*` properties can be used to define the bindings in terms of inputs and outputs. Each binding has an identification name, a destination (a Kafka topic, when using Kafka as a binder), and a content-type. The content-type is used to identify the corresponding serializer / deserializer implementation (i.e., SerDe). It is important to note that for each input binding the application is a consumer, and that for each output binding the application is a producer.

To instruct Spring Boot to enable these bindings, a binding interface has to be declared. The interface defines an access method for each binding name, and its implementation is generated by the Spring framework when the application is annotated with @EnableBinding(KafkaStreamsBindings.class). Please check out KafkaStreamsBindings in the GitHub project for a complete example.
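A minimal sketch of such a binding interface is shown below. Note that the constant values and input binding names (input-page, input-visit) are illustrative assumptions here; only the output-page-visit binding name is confirmed by the configuration discussed in this post.

```java
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;

// Sketch of the binding interface: one access method per binding.
// Spring generates the implementation via @EnableBinding(KafkaStreamsBindings.class).
public interface KafkaStreamsBindings {

    String INPUT_PAGE = "input-page";            // assumed binding name
    String INPUT_VISIT = "input-visit";          // assumed binding name
    String OUTPUT_PAGE_VISIT = "output-page-visit";

    @Input(INPUT_PAGE)
    KStream<String, Page> inputPage();

    @Input(INPUT_VISIT)
    KStream<String, Visit> inputVisit();

    @Output(OUTPUT_PAGE_VISIT)
    KStream<String, PageVisit> outputPageVisit();
}
```

The application is a consumer on each @Input binding and a producer on each @Output binding, matching the description above.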

    spring:
      cloud:
        stream:
          bindings:
            input-page:
              destination: ${application.pagesTopicName}
              content-type: avro/bytes
            input-visit:
              destination: ${application.visitsTopicName}
              content-type: avro/bytes
            output-page-visit:
              destination: page.visits
              content-type: avro/bytes

4.2 Binder Properties

`spring.cloud.stream.kafka.streams.binder.*` properties configure the binder, in our case Kafka. The Kafka brokers, bootstrap servers, schema registry address, and Kafka-specific configuration can be provided in this section of the configuration file, and default key/value SerDes can be specified as well. Custom Kafka properties can also be provided per binding; for instance, for the "output-page-visit" binding we configure a custom keySerde. Finally, default consumer/producer properties can be defined. These configurations apply to all consumers (i.e., all input bindings) and all producers (i.e., all output bindings).

    spring:
      cloud:
        stream:
          kafka:
            streams:
              binder:
                brokers: ${application.kafkaBrokerAddress}
                autoCreateTopics: true
                configuration:
                  bootstrap-servers: ${application.kafkaBrokerAddress}
                  commit.interval.ms: 1000
                  default.key.serde: com.scaleoutdata.spring.cloud.stream.kafka.streams.join_example.serdes.StringAvroSerde
                  default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
                  schema.registry.url: ${application.kafkaSchemaRegistryURL}
              bindings:
                output-page-visit:
                  producer:
                    keySerde: com.scaleoutdata.spring.cloud.stream.kafka.streams.join_example.serdes.StringAvroSerde
          default:
            consumer:
              useNativeEncoding: true
              startOffset: latest
            producer:
              useNativeEncoding: true
              startOffset: latest

4.3 Custom SerDe(s)

By default, Spring Cloud Stream chooses a default SerDe implementation for serializing / de-serializing key/values based on the content-type specified in the binding. If native serialization / de-serialization is desired, we need to set useNativeEncoding: true for each consumer/producer. Then, we need to define default key/value SerDes for the binder (or custom SerDes for each binding). In our example, we use SpecificAvroSerde for serializing/de-serializing values (default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde).

Native Kafka SerDe(s) for primitive types: when using native Kafka SerDes, serialization/de-serialization of primitive types should be done through a custom class that uses KafkaAvroSerializer and KafkaAvroDeserializer under the hood, rather than through the built-in SerDes for primitive types (e.g., Serdes.String(), Serdes.Long(), etc.). The reason is an incompatibility between the Confluent and Kafka SerDes for primitive types.
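A minimal sketch of such a custom SerDe is shown below; the actual StringAvroSerde class in the repository may differ in detail, this is only an illustration of the wrapping technique.

```java
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// Sketch: a String SerDe that delegates to the Confluent Avro
// serializer/deserializer, so that primitive keys written by this
// application stay compatible with other Confluent Avro clients.
public class StringAvroSerde implements Serde<String> {

    private final KafkaAvroSerializer serializer = new KafkaAvroSerializer();
    private final KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // propagate configuration (e.g., schema.registry.url) to the delegates
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public Serializer<String> serializer() {
        return (topic, data) -> serializer.serialize(topic, data);
    }

    @Override
    public Deserializer<String> deserializer() {
        return (topic, bytes) -> (String) deserializer.deserialize(topic, bytes);
    }
}
```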

5. Joining the Streams

The configuration is ready, so we can finally focus on joining tuples coming from the two input topics. The inputs and the output are mapped to the application by annotating the processing method with @StreamListener: each input parameter is mapped to a KStream with the @Input annotation, and the returned KStream is mapped to the output with the @SendTo annotation.

Once the inputs and output are mapped to KStreams, we can focus on implementing the join operation using the Kafka Streams DSL. From each input we select the joining key (i.e., the pageId), then we perform a windowed inner join with a custom window size. You will notice that we use a custom in-memory state store with a given name, persistency and window size. The join result is then sent to the output topic and also written to the application logs. When the state stores are large and cannot fit in memory, persistent state stores, which are kept on disk, can be used instead. In either case, the internal state is also backed up in Kafka in the application's internal topics. Internal application topics will be covered in more detail in a following post.

@StreamListener
@SendTo("output-page-visit")
private KStream<String, PageVisit> process(
        @Input(KafkaStreamsBindings.INPUT_PAGE) KStream<String, Page> pageKStream,
        @Input(KafkaStreamsBindings.INPUT_VISIT) KStream<String, Visit> visitKStream) {

    // in-memory window stores backing the two sides of the join
    WindowBytesStoreSupplier storeSupplier = Stores.inMemoryWindowStore("join-main",
            Duration.ofSeconds(86400), Duration.ofMinutes(2 * WINDOW), true);
    WindowBytesStoreSupplier otherSupplier = Stores.inMemoryWindowStore("join-other",
            Duration.ofSeconds(86400), Duration.ofMinutes(2 * WINDOW), true);

    return pageKStream
            // select the joining key (the pageId) on both streams
            .selectKey((key, value) -> value.getPageId())
            .join(visitKStream.selectKey((key, value) -> value.getPageId()),
                    // combine each matching (Page, Visit) pair into a PageVisit
                    (page, visit) -> new PageVisit(page.getPageId(), page, visit),
                    JoinWindows.of(Duration.ofMinutes(WINDOW)),
                    StreamJoined.<String, Page, Visit>with(storeSupplier, otherSupplier))
            .peek((key, value) -> log.info("Joined PageVisit, key: {} pageId: {} visitId: {} ",
                    key, value.getPage().getPageId(), value.getVisit().getVisitId()));
}
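For state that does not fit in memory, the in-memory store suppliers above can be swapped for persistent, disk-backed ones. A sketch, assuming the same store names, sizes and WINDOW constant as above:

```java
import java.time.Duration;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

// Persistent (RocksDB-backed) window stores as a drop-in replacement
// for the in-memory suppliers used in the join above.
WindowBytesStoreSupplier storeSupplier = Stores.persistentWindowStore("join-main",
        Duration.ofSeconds(86400), Duration.ofMinutes(2 * WINDOW), true);
WindowBytesStoreSupplier otherSupplier = Stores.persistentWindowStore("join-other",
        Duration.ofSeconds(86400), Duration.ofMinutes(2 * WINDOW), true);
```

The rest of the join topology stays unchanged; only the store suppliers passed to StreamJoined differ.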

6. Conclusions

In this post we showed how to configure Kafka Streams with Spring Cloud Stream, and how to deploy it to Cloud Foundry.

  • We showed how to specify input/output bindings, and how to configure their properties
  • Binder properties in the context of Kafka Streams were presented
  • Native serialization was configured using Confluent Avro SerDes
  • We showed how to perform a windowed inner join on two input topics using Kafka Streams
  • All the code corresponding to this tutorial can be found on GitHub at the following location: Join Example
