Chapter 4. Querying Kafka with Kafka Streams

AATD doesn’t currently have real-time insight into the number of orders being placed or the revenue being generated. The company would like to know if there are spikes or dips in the number of orders so that it can react more quickly on the operations side of the business.

The AATD engineering team is already familiar with Kafka Streams from other applications that they’ve built, so we’re going to create a Kafka Streams app that exposes an HTTP endpoint showing recent orders and revenue. We’ll build this app with the Quarkus framework, starting with a naive version. Then we’ll apply some optimizations. We’ll conclude with a summary of the limitations of using a stream processor to query streaming data. Figure 4-1 shows what we’ll be building in this chapter.

Figure 4-1. Kafka Streams architecture

What Is Kafka Streams?

Kafka Streams is a library for building streaming applications that transform input Kafka topics into output Kafka topics. It is an example of the stream processor component of the real-time analytics stack described in Chapter 2.

Kafka Streams is often used for joining, filtering, and transforming streams, but in this chapter we’re going to use it to query an existing stream.

At the heart of a Kafka Streams application is a topology, which defines the stream processing logic of the application. A topology describes how data is consumed from input streams (source) and then transformed into something that can be produced to output streams (sink).

More specifically, Jacek Laskowski, author of The Internals of Kafka Streams, defines a topology as follows:

A directed acyclic graph of stream processing nodes that represents the stream processing logic of a Kafka Streams application.

In this graph, the nodes are the stream processors that do the work, and the edges are the streams of records flowing between them. Through this topology, we can create powerful streaming applications that can handle even the most complex data processing tasks. You can see an example topology in Figure 4-2.

Kafka Streams provides a domain-specific language (DSL) that simplifies the building of these topologies.

Figure 4-2. Kafka Streams topology
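
To make this more concrete, here is a minimal sketch of a topology built with the DSL. It consumes records from a source topic, filters them, and produces the surviving records to a sink topic; the topic names are placeholders rather than part of AATD’s application:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class MinimalTopology {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Source node: consume records from the input topic
        KStream<String, String> input = builder.stream("input-topic",
            Consumed.with(Serdes.String(), Serdes.String()));

        // Processing node: keep only non-empty values
        input.filter((key, value) -> value != null && !value.isEmpty())
            // Sink node: produce the surviving records to the output topic
            .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}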

Let’s go through the definitions of some Kafka Streams abstractions that we’ll be using in this section. The following definitions are from the official documentation:

KStream

A KStream is an abstraction of a record stream, where each data record represents a self-contained datum in the unbounded dataset. The data records in a KStream are interpreted as “INSERT” operations, where each record adds a new entry to an append-only ledger. In other words, each record represents a new piece of data that is added to the stream without replacing any existing data with the same key.

KTable

A KTable is an abstraction of a change log stream, where each data record represents an update. Each record in a KTable represents an update to the previous value for a specific record key, if any exists. If a corresponding key doesn’t exist yet, the update is treated as an “INSERT” operation. In other words, each record in a KTable represents an update to the existing data with the same key or the addition of a new record with a new key-value pair.

State Store

State Stores are storage engines for managing the state of stream processors. They can store the state in memory or in a database like RocksDB.

When stateful functions like aggregation or windowing functions are called, intermediate data is stored in the State Store. This data can then be queried by the read side of a stream processing application to generate output streams or tables. State Stores are an efficient way to manage the state of stream processors and enable the creation of powerful stream processing applications in Kafka Streams.
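
To see how these three abstractions relate, here is a short sketch with hypothetical topic and store names: it reads one topic as a record stream, reads another as a changelog, and materializes a state store from a stateful count:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

public class AbstractionsSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // KStream: every record is an independent INSERT, so two records
        // with the same key are two separate facts
        KStream<String, String> clicks = builder.stream("clicks",
            Consumed.with(Serdes.String(), Serdes.String()));

        // KTable: each record updates the previous value for its key, so
        // the table holds only the latest value per key
        KTable<String, String> profiles = builder.table("user-profiles",
            Consumed.with(Serdes.String(), Serdes.String()));

        // State Store: counting is stateful, so the intermediate counts
        // are materialized into a store (RocksDB-backed by default) that
        // can later be queried interactively
        clicks.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
              .count(Materialized.as("ClickCountStore"));
    }
}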

What Is Quarkus?

Quarkus is a Java framework optimized for building cloud native applications that are deployed on Kubernetes. Developed by Red Hat’s engineering team and released in 2019, Quarkus offers a modern, lightweight approach to building Java applications that is ideally suited to the needs of cloud native development.

The framework includes a wide range of extensions for popular technologies, including Camel, Hibernate, MongoDB, Kafka Streams, and more. These extensions provide a simple and efficient way to integrate these tools into your microservices architecture, speeding up development time and streamlining the process of building complex distributed systems.

The native Kafka Streams integration in particular makes it a great choice for us.

Quarkus Application

Now that we’ve got the definitions out of the way, it’s time to start building our Kafka Streams app.

Installing the Quarkus CLI

The Quarkus CLI is a powerful tool that lets us create and manage Quarkus applications from the command line. With the Quarkus CLI, we can quickly scaffold new applications, generate code, run tests, and deploy our applications to various environments. There are many ways to install the CLI, so you can almost certainly find one that you prefer.

I’m a big fan of SDKMAN, so I’m going to install it using that. SDKMAN makes it easy to install and manage software development kits (SDKs). It has lots of useful features, including automated updates, environment management, and support for multiple platforms. I use it to run different Java versions on my machine.

We can install Quarkus with SDKMAN by running the following command:

sdk install quarkus

We can check that it’s installed by running the following command:

quarkus --version

You should see output similar to Example 4-1.

Example 4-1. Quarkus version
2.13.1.Final
Note

The Quarkus CLI isn’t mandatory, but having it installed does make the development process much smoother, so we suggest installing it!

Creating a Quarkus Application

Now that we’ve got that installed, we can run the following command to create our pizza shop app:

quarkus create app pizzashop --package-name pizzashop
cd pizzashop

This command will create a Maven application with most of the dependencies that we’ll need and a skeleton structure to get us started. The only thing missing is Kafka Streams, which we can add using the kafka-streams extension:

quarkus extension add 'kafka-streams'

We’re now ready to start building our application.

Creating a Topology

The first thing we need to do is create a Kafka Streams topology. A Quarkus application can define a single topology, in which we’ll define all our stream operations. This could include joining streams together to create a new stream, filtering a stream, creating a key-value store based on a stream, and more.

Once we have our topology class, we’ll create a couple of window stores that keep track of the total orders and revenue generated in the last couple of minutes. This will allow us to create an HTTP endpoint that returns a summary of the latest orders based on the contents of these stores.

Create the file src/main/java/pizzashop/streams/Topology.java and add this:

package pizzashop.streams;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
import pizzashop.deser.JsonDeserializer;
import pizzashop.deser.JsonSerializer;
import pizzashop.models.Order;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import java.time.Duration;

@ApplicationScoped
public class Topology {
    @Produces
    public org.apache.kafka.streams.Topology buildTopology() {
        final Serde<Order> orderSerde = Serdes.serdeFrom(new JsonSerializer<>(),
            new JsonDeserializer<>(Order.class));

        // Create a stream over the `orders` topic
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Order> orders = builder.stream("orders",
            Consumed.with(Serdes.String(), orderSerde));

        // Defining the window size of our state store
        Duration windowSize = Duration.ofSeconds(60);
        Duration advanceSize = Duration.ofSeconds(1);
        Duration gracePeriod = Duration.ofSeconds(60);
        TimeWindows timeWindow = TimeWindows.ofSizeAndGrace(
            windowSize, gracePeriod).advanceBy(advanceSize);

        // Create an OrdersCountStore that keeps track of the number of
        // orders in one-minute windows (retained for two minutes)
        orders.groupBy(
                (key, value) -> "count",
                Grouped.with(Serdes.String(), orderSerde))
                .windowedBy(timeWindow)
                .count(Materialized.as("OrdersCountStore")
        );

        // Create a RevenueStore that keeps track of the revenue generated
        // in one-minute windows (retained for two minutes). We group by the
        // same constant key, "count", so both stores can be queried with
        // the same key later on.
        orders.groupBy(
                (key, value) -> "count",
                Grouped.with(Serdes.String(), orderSerde))
                .windowedBy(timeWindow)
                .aggregate(
                    () -> 0.0,
                    (key, value, aggregate) -> aggregate + value.price,
                    Materialized.
                        <String, Double, WindowStore<Bytes, byte[]>>
                        as("RevenueStore")
                        .withValueSerde(Serdes.Double())
                );

        return builder.build();
    }
}

In this code, we first create a KStream based on the orders topic, before creating the OrdersCountStore and RevenueStore, which store a one-minute rolling window of the number of orders and the revenue generated. Because each window advances by one second, any given event falls into 60 overlapping windows. The grace period is usually used to capture late-arriving events, but we’re using it here so that we have two minutes’ worth of windows kept around, which we’ll need later on.

We also have the following model classes that represent events in the orders stream:

package pizzashop.models;

import io.quarkus.runtime.annotations.RegisterForReflection;
import java.util.List;

@RegisterForReflection
public class Order {
    public Order() { }

    public String id;
    public String userId;
    public String createdAt;
    public double price;
    public double deliveryLat;
    public double deliveryLon;
    public List<OrderItem> items;
}

OrderItem.java is defined here:

package pizzashop.models;

public class OrderItem {
    public String productId;
    public int quantity;
    public double price;
}
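
The topology also imports JsonSerializer and JsonDeserializer from the pizzashop.deser package, which aren’t shown in this chapter’s listings. A minimal sketch of what those classes might look like, assuming Jackson’s ObjectMapper is on the classpath:

package pizzashop.deser;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            // Render the object as JSON bytes; null passes through as null
            return data == null ? null : mapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

And JsonDeserializer:

package pizzashop.deser;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;

public class JsonDeserializer<T> implements Deserializer<T> {
    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<T> type;

    public JsonDeserializer(Class<T> type) {
        this.type = type;
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        try {
            // Parse the JSON bytes back into an instance of the target class
            return bytes == null ? null : mapper.readValue(bytes, type);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}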

Querying the Key-Value Store

Next, we’ll create the class src/main/java/pizzashop/streams/OrdersQueries.java, which will abstract our interactions with the OrdersCountStore and RevenueStore. The querying of state stores like these uses a feature of Kafka Streams called interactive queries:

package pizzashop.streams;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.*;

import pizzashop.models.*;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.time.Instant;

@ApplicationScoped
public class OrdersQueries {
    @Inject
    KafkaStreams streams;

    public OrdersSummary ordersSummary() {
        KStreamsWindowStore<Long> countStore = new KStreamsWindowStore<>(
            ordersCountsStore());
        KStreamsWindowStore<Double> revenueStore = new KStreamsWindowStore<>(
            revenueStore());

        Instant now = Instant.now();
        Instant oneMinuteAgo = now.minusSeconds(60);
        Instant twoMinutesAgo = now.minusSeconds(120);

        long recentCount = countStore.firstEntry(oneMinuteAgo, now);
        double recentRevenue = revenueStore.firstEntry(oneMinuteAgo, now);

        long previousCount = countStore.firstEntry(twoMinutesAgo, oneMinuteAgo);
        double previousRevenue = revenueStore.firstEntry(
            twoMinutesAgo, oneMinuteAgo);

        TimePeriod currentTimePeriod = new TimePeriod(
            recentCount, recentRevenue);
        TimePeriod previousTimePeriod = new TimePeriod(
            previousCount, previousRevenue);
        return new OrdersSummary(currentTimePeriod, previousTimePeriod);
    }

    private ReadOnlyWindowStore<String, Double> revenueStore() {
        while (true) {
            try {
                return streams.store(StoreQueryParameters.fromNameAndType(
                        "RevenueStore", QueryableStoreTypes.windowStore()
                ));
            } catch (InvalidStateStoreException e) {
                System.out.println("e = " + e);
            }
        }
    }

    private ReadOnlyWindowStore<String, Long> ordersCountsStore() {
        while (true) {
            try {
                return streams.store(StoreQueryParameters.fromNameAndType(
                        "OrdersCountStore", QueryableStoreTypes.windowStore()
                ));
            } catch (InvalidStateStoreException e) {
                System.out.println("e = " + e);
            }
        }
    }
}

Both ordersCountsStore and revenueStore are returning data from window stores that hold the order count and amount of revenue generated, respectively. The reason for the while(true) { try {} catch {} } code block in both functions is that the store might not be available if we call this code before the stream thread is in a RUNNING state. Assuming we don’t have any bugs in our code, we will eventually get to the RUNNING state; it just might take a bit longer than it takes for the HTTP endpoint to start up.
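
As written, these loops spin as fast as they can while the store is unavailable. One possible refinement, shown here as a sketch rather than as part of the original application, is to extract a shared helper that backs off briefly between attempts:

    private <T> ReadOnlyWindowStore<String, T> waitForStore(String storeName) {
        while (true) {
            try {
                return streams.store(StoreQueryParameters.fromNameAndType(
                        storeName, QueryableStoreTypes.windowStore()));
            } catch (InvalidStateStoreException e) {
                // The store isn't queryable yet; sleep briefly rather than
                // hammering KafkaStreams until it reaches the RUNNING state
                try {
                    Thread.sleep(100);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ie);
                }
            }
        }
    }

Both ordersCountsStore and revenueStore could then delegate to waitForStore("OrdersCountStore") and waitForStore("RevenueStore").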

ordersSummary calls those two functions to get the number of orders for the last minute and the minute before that, as well as the total revenue for the last minute and the minute before that.

KStreamsWindowStore.java is defined here:

package pizzashop.models;

import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

import java.time.Instant;

public class KStreamsWindowStore<T> {
    private final ReadOnlyWindowStore<String, T> store;

    public KStreamsWindowStore(ReadOnlyWindowStore<String, T> store) {
        this.store = store;
    }

    public T firstEntry(Instant from, Instant to) {
        try (WindowStoreIterator<T> iterator = store.fetch("count", from, to)) {
            if (iterator.hasNext()) {
                return iterator.next().value;
            }
        }
        throw new RuntimeException(
            "No entries found in store between " + from + " and " + to);
    }
}

The firstEntry method finds the first entry in the window store in the provided date range and returns its value. If no entries exist, it throws an exception.

OrdersSummary.java is defined here:

package pizzashop.models;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class OrdersSummary {
    private TimePeriod currentTimePeriod;
    private TimePeriod previousTimePeriod;

    public OrdersSummary(
        TimePeriod currentTimePeriod, TimePeriod previousTimePeriod) {
        this.currentTimePeriod = currentTimePeriod;
        this.previousTimePeriod = previousTimePeriod;
    }

    public TimePeriod getCurrentTimePeriod() { return currentTimePeriod; }
    public TimePeriod getPreviousTimePeriod() { return previousTimePeriod; }
}

This class is a data object that keeps track of orders and revenue for the current and previous time periods.

TimePeriod.java is defined here:

package pizzashop.models;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class TimePeriod {
    private int orders;
    private double totalPrice;

    public TimePeriod(long orders, double totalPrice) {
        this.orders = (int) orders;
        this.totalPrice = totalPrice;
    }

    public int getOrders() { return orders; }
    public double getTotalPrice() { return totalPrice; }
}

This class is a data object that keeps track of orders and revenue.

Creating an HTTP Endpoint

Finally, let’s create the HTTP endpoint that exposes the summary data to our users. Create the file src/main/java/pizzashop/rest/OrdersResource.java and add this:

package pizzashop.rest;

import pizzashop.models.OrdersSummary;
import pizzashop.streams.OrdersQueries;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;

@ApplicationScoped
@Path("/orders")
public class OrdersResource {
    @Inject
    OrdersQueries ordersQueries;

    @GET
    @Path("/overview")
    public Response overview() {
        OrdersSummary ordersSummary = ordersQueries.ordersSummary();
        return Response.ok(ordersSummary).build();
    }
}

Running the Application

Now that we’ve created all our classes, it’s time to run the application. We can do this by running the following command:

QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS=localhost:29092 quarkus dev

We pass in the QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS environment variable so that Quarkus can connect to our Kafka broker.
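
Alternatively, assuming Quarkus’s standard mapping between environment variables and configuration properties, the same setting can live in src/main/resources/application.properties:

# Equivalent to the QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS environment variable
quarkus.kafka-streams.bootstrap-servers=localhost:29092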

Querying the HTTP Endpoint

Now we can query the HTTP endpoint to see how many orders our online service is receiving. The endpoint is available on port 8080 at /orders/overview:

curl http://localhost:8080/orders/overview 2>/dev/null | jq '.'

The results of this command are shown in Example 4-2.

Example 4-2. Latest orders state
{
  "currentTimePeriod": {
    "orders": 994,
    "totalPrice": 4496973
  },
  "previousTimePeriod": {
    "orders": 985,
    "totalPrice": 4535117
  }
}

Success! We can see the number of orders and the total revenue in the current and previous time periods.

Limitations of Kafka Streams

While this approach for querying streams has been successful in many cases, certain factors could impact its efficacy for our particular use case. In this section, we will take a closer look at these limitations to better understand how they could affect the performance of this approach.

The underlying database used by Kafka Streams is RocksDB, a key-value store that allows you to store and retrieve data using key-value pairs. This fork of Google’s LevelDB is optimized for write-heavy workloads with large datasets.

One of its constraints is that we can create only one index per key-value store. This means that if we decide to query the data along another dimension, we’ll need to update the topology to create another key-value store. If we do a non-key search, RocksDB will do a full scan to find the matching records, leading to high query latency.
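
For example, if we wanted to query order counts per user as well, we’d have to materialize a second store keyed by that dimension. A hypothetical sketch, added inside buildTopology and reusing the orders, orderSerde, and timeWindow variables defined earlier:

        // Hypothetical second store: keyed by userId so that per-user
        // queries don't require a full scan of the existing stores
        orders.groupBy(
                (key, value) -> value.userId,
                Grouped.with(Serdes.String(), orderSerde))
                .windowedBy(timeWindow)
                .count(Materialized.as("OrdersByUserStore"));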

Our key-value stores are also capturing only events that happened in the last one minute and the minute before that. If we wanted to capture data going further back, we’d need to update the topology to capture more events. In AATD’s case, we could imagine a future use case where we’d want to compare the sales numbers from right now with the numbers from this same time last week or last month. This would be difficult to do in Kafka Streams because we’d need to store historical data, which would consume a lot of storage on the machines running our application.

So although we can use Kafka Streams to write real-time analytics queries and it will do a reasonable job, we probably need to find a tool that better fits the problem.

Summary

In this chapter, we looked at how to build an HTTP API on top of the orders stream so that we can get an aggregate view of what’s happening with orders in the business. We built this solution using Kafka Streams, but we realized that this might not be the most appropriate tool for the job. In the next chapter, we’ll learn why we need a serving layer to build a scalable real-time analytics application.
