Kapitel 4. Kafka mit Kafka-Streams abfragen

Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com

AATD hat derzeit keinen Echtzeit-Einblick in die Anzahl der Bestellungen oder die erzielten Einnahmen. Das Unternehmen möchte wissen, ob es Spitzen oder Einbrüche in der Anzahl der Bestellungen gibt, damit es im operativen Teil des Geschäfts schneller reagieren kann.

Das AATD-Entwicklungsteam kennt Kafka Streams bereits aus anderen Anwendungen, die es gebaut hat. Deshalb werden wir eine Kafka Streams-App erstellen, die einen HTTP-Endpunkt mit den letzten Bestellungen und Umsätzen anzeigt. Wir werden diese App mit dem Quarkus-Framework erstellen und mit einer naiven Version beginnen. Dann werden wir einige Optimierungen vornehmen. Abschließend fassen wir die Grenzen der Verwendung eines Stream-Prozessors zur Abfrage von Streaming-Daten zusammen.Abbildung 4-1 zeigt, was wir in diesem Kapitel bauen werden.

bras 0401
Abbildung 4-1. Architektur von Kafka Streams

Was ist Kafka Streams?

Kafka Streams ist eine Bibliothek zum Erstellen von Streaming-Anwendungen , die Kafka-Themen von der Eingabe in die Ausgabe umwandeln. Sie ist ein Beispiel für die Stream-Prozessor-Komponente des in Kapitel 2 beschriebenen Echtzeit-Analytik-Stacks.

Kafka Streams wird oft zum Verbinden, Filtern und Transformieren von Streams verwendet, aber in diesem Kapitel werden wir es nutzen, um einen bestehenden Stream abzufragen.

Das Herzstück einer Kafka Streams Anwendung ist eine Topologie, die die Stream-Verarbeitungslogik der Anwendung definiert. Eine Topologie beschreibt, wie Daten aus Eingangsstreams (Source) konsumiert und dann in etwas umgewandelt werden, das in Ausgangsstreams (Sink) produziert werden kann.

Genauer gesagt, definiert Jacek Laskowski, Autor von The Internals of Kafka Streams, eine Topologie wie folgt:

Ein gerichteter azyklischer Graph von Stream-Verarbeitungs-Knoten, der die Stream-Verarbeitungslogik einer Kafka-Streams-Anwendung darstellt.

In diesem Diagramm sind die Knoten die Verarbeitungsarbeit und die Beziehungen die Streams. Mit dieser Topologie können wir leistungsstarke Streaming-Anwendungen erstellen, die selbst die komplexesten Datenverarbeitungsaufgaben bewältigen können. Eine Beispieltopologie siehst du in Abbildung 4-2.

Kafka Streams bietet eine domänenspezifische Sprache (DSL), die den Aufbau dieser Topologien vereinfacht.

bras 0402
Abbildung 4-2. Kafka Streams Topologie

Gehen wir die Definitionen einiger Kafka-Streams-Abstraktionen durch, die wir in diesem Abschnitt verwenden werden. Die folgenden Definitionen stammen aus der offiziellen Dokumentation:

KStream

Ein KStream ist eine Abstraktion eines Datensatzstroms, bei dem jeder Datensatz ein in sich geschlossenes Datum im unbegrenzten Datensatz darstellt. Die Datensätze in einem KStream werden als "INSERT"-Operationen interpretiert, bei denen jeder Datensatz einen neuen Eintrag in ein "append-only"-Hauptbuch hinzufügt. Mit anderen Worten: Jeder Datensatz stellt ein neues Stück Daten dar, das dem Strom hinzugefügt wird, ohne bestehende Daten mit demselben Schlüssel zu ersetzen.

KTable

Eine KTable ist eine Abstraktion eines Änderungsprotokollstroms, bei dem jeder Datensatz eine Aktualisierung darstellt. Jeder Datensatz in einer KTable stellt eine Aktualisierung des vorherigen Wertes für einen bestimmten Datensatzschlüssel dar, sofern ein solcher existiert. Wenn ein entsprechender Schlüssel noch nicht existiert, wird die Aktualisierung als "INSERT"-Operation behandelt. Mit anderen Worten: Jeder Datensatz in einer KTable stellt eine Aktualisierung der bestehenden Daten mit demselben Schlüssel oder das Hinzufügen eines neuen Datensatzes mit einem neuen Schlüssel-Wert-Paar dar.

Staatsladen

State Stores sind Speicher-Engines zur Verwaltung des Status von Stream-Prozessoren. Sie können den Status im Speicher oder in einer Datenbank wie RocksDB speichern.

Wenn zustandsbehaftete Funktionen wie Aggregations- oder Windowing-Funktionen aufgerufen werden, werden Zwischendaten im State Store gespeichert. Diese Daten können dann von der Leseseite einer Stream-Processing-Anwendung abgefragt werden, um Ausgabeströme oder -tabellen zu erzeugen. State Stores sind eine effiziente Möglichkeit, den Zustand von Stream-Prozessoren zu verwalten und ermöglichen die Erstellung leistungsstarker Stream-Processing-Anwendungen in Kafka Streams.

Was ist Quarkus?

Quarkus ist ein Java-Framework, das für die Entwicklung von Cloud-nativen Anwendungen optimiert ist, die auf Kubernetes bereitgestellt werden. 2019 wurde Quarkus vom Red Hat Engineering Team entwickelt und bietet einen modernen, leichtgewichtigen Ansatz für die Entwicklung von Java-Anwendungen, der ideal auf die Bedürfnisse der Cloud-nativenEntwicklung abgestimmt ist.

Das Framework enthält eine Vielzahl von Erweiterungen für beliebte Technologien wie Camel, Hibernate, MongoDB, Kafka Streams u.v.m. Diese Erweiterungen bieten eine einfache und effiziente Möglichkeit, diese Tools in deine Microservices-Architektur zu integrieren, die Entwicklungszeit zu verkürzen und den Aufbau komplexer verteilter Systeme zu optimieren.

Vor allem die native Kafka-Streams-Integration macht es für uns zu einer guten Wahl.

Quarkus Bewerbung

Jetzt, wo wir die Definitionen aus dem Weg geräumt haben, ist es an der Zeit, mit dem Aufbau unserer Kafka-Streams-App zu beginnen.

Installation der Quarkus CLI

Das Quarkus CLI ist ein leistungsstarkes Werkzeug , mit dem wir Quarkus-Anwendungen von der Kommandozeile aus erstellen und verwalten können. Mit dem Quarkus CLI können wir schnell neue Anwendungen erstellen, Code generieren, Tests durchführen und unsere Anwendungen in verschiedenen Umgebungen bereitstellen. Es gibt viele Möglichkeiten, das CLI zu installieren, so dass du mit Sicherheit eine findest, die dir am besten gefällt.

Ich bin ein großer Fan von SDKMAN, also werde ich es damit installieren. SDKMAN macht es einfach, Software Development Kits (SDKs) zu installieren und zu verwalten. Es hat viele nützliche Funktionen, darunter automatische Updates, Umgebungsverwaltung und Unterstützung für mehrere Plattformen. Ich verwende es, um verschiedene Java-Versionen auf meinem Rechner auszuführen.

Wir können Quarkus mit SDKMAN installieren, indem wir den folgenden Befehl ausführen:

sdk install quarkus

Wir können überprüfen, ob sie installiert ist, indem wir den folgenden Befehl ausführen:

quarkus --version

Du solltest eine ähnliche Ausgabe wie in Beispiel 4-1 sehen.

Beispiel 4-1. Quarkus Version
2.13.1.Final
Hinweis

Das Quarkus CLI ist nicht zwingend erforderlich, aber wenn du es installiert hast, wird der Entwicklungsprozess viel reibungsloser, also empfehlen wir dir, es zu installieren!

Erstellen einer Quarkus Anwendung

Jetzt, wo wir das installiert haben, können wir folgenden Befehl ausführen, um unsere Pizzaladen-App zu erstellen:

quarkus create app pizzashop --package-name pizzashop
cd pizzashop

Dieser Befehl erstellt eine Maven-Anwendung mit den meisten Abhängigkeiten, die wir benötigen, und einer Grundstruktur, mit der wir loslegen können. Das Einzige, was fehlt, sind Kafka-Streams, die wir mit der Erweiterung kafka-streams hinzufügen können:

quarkus extension add 'kafka-streams'

Jetzt können wir mit der Erstellung unserer Anwendung beginnen.

Erstellen einer Topologie

Als Erstes müssen wir eine Kafka Streams-Topologie erstellen. Eine Quarkus-Anwendung kann eine einzige Topologie definieren, in der wir alle unsere Stream-Operationen festlegen, z. B. das Zusammenfügen von Streams zu einem neuen Stream, das Filtern eines Streams, das Erstellen eines Key-Value-Stores auf der Grundlage eines Streams und vieles mehr.

Sobald wir unsere Topologieklasse haben, erstellen wir ein paar Fensterspeicher, die die Gesamtbestellungen und den Umsatz der letzten Minuten aufzeichnen. So können wir einen HTTP-Endpunkt erstellen, der eine Zusammenfassung der letzten Bestellungen basierend auf den Inhalten dieser Speicher zurückgibt.

Erstelle die Datei src/main/java/pizzashop/streams/Topology.java und füge Folgendes hinzu:

package pizzashop.streams;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
import pizzashop.deser.JsonDeserializer;
import pizzashop.deser.JsonSerializer;
import pizzashop.models.Order;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import java.time.Duration;

@ApplicationScoped
public class Topology {
    @Produces
    public org.apache.kafka.streams.Topology buildTopology() {
        final Serde<Order> orderSerde = Serdes.serdeFrom(new JsonSerializer<>(),
            new JsonDeserializer<>(Order.class));

        // Create a stream over the `orders` topic
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Order> orders = builder.stream("orders",
            Consumed.with(Serdes.String(), orderSerde));

        // Defining the window size of our state store
        Duration windowSize = Duration.ofSeconds(60);
        Duration advanceSize = Duration.ofSeconds(1);
        Duration gracePeriod = Duration.ofSeconds(60);
        TimeWindows timeWindow = TimeWindows.ofSizeAndGrace(
            windowSize, gracePeriod).advanceBy(advanceSize);

        // Create an OrdersCountStore that keeps track of the
        // number of orders over the last two minutes
        orders.groupBy(
                (key, value) -> "count",
                Grouped.with(Serdes.String(), orderSerde))
                .windowedBy(timeWindow)
                .count(Materialized.as("OrdersCountStore")
        );

        // Create a RevenueStore that keeps track of the amount of revenue
        // generated over the last two minutes
        orders.groupBy(
                (key, value) -> "count",
                Grouped.with(Serdes.String(), orderSerde))
                .windowedBy(timeWindow)
                .aggregate(
                    () -> 0.0,
                    (key, value, aggregate) -> aggregate + value.price,
                    Materialized.
                        <String, Double, WindowStore<Bytes, byte[]>>
                        as("RevenueStore")
                        .withValueSerde(Serdes.Double())
                );

        return builder.build();
    }
}

In diesem Code erstellen wir zunächst einen KStream, der auf dem Thema orders basiert, bevor wir OrdersCountStore und RevenueStore erstellen, die ein einminütiges rollierendes Fenster für die Anzahl der Bestellungen und die erzielten Umsätze speichern. Die Karenzzeit wird normalerweise verwendet, um spät eintreffende Ereignisse zu erfassen, aber wir verwenden sie, damit wir Fenster im Wert von zwei Minuten zur Verfügung haben, die wir später benötigen.

Wir haben auch die folgenden Modellklassen, die Ereignisse im orders Stream darstellen:

package pizzashop.models;

import io.quarkus.runtime.annotations.RegisterForReflection;
import java.util.List;

@RegisterForReflection
public class Order {
    public Order() { }

    public String id;
    public String userId;
    public String createdAt;
    public double price;
    public double deliveryLat;
    public double deliveryLon;
    public List<OrderItem> items;
}
package pizzashop.models;

public class OrderItem {
    public String productId;
    public int quantity;
    public double price;
}

Abfrage des Key-Value Store

Als Nächstes erstellen wir die Klasse src/main/java/pizzashop/streams/OrdersQuer⁠ies​.java, die unsere Interaktionen mit OrdersStore abstrahieren wird. Die Abfrage von State Stores (wie OrdersStore) nutzt eine Funktion von Kafka Streams, die interaktive Abfragen genannt wird:

package pizzashop.streams;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.*;

import pizzashop.models.*;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.time.Instant;

@ApplicationScoped
public class OrdersQueries {
    @Inject
    KafkaStreams streams;

    public OrdersSummary ordersSummary() {
        KStreamsWindowStore<Long> countStore = new KStreamsWindowStore<>(
            ordersCountsStore());
        KStreamsWindowStore<Double> revenueStore = new KStreamsWindowStore<>(
            revenueStore());

        Instant now = Instant.now();
        Instant oneMinuteAgo = now.minusSeconds(60);
        Instant twoMinutesAgo = now.minusSeconds(120);

        long recentCount = countStore.firstEntry(oneMinuteAgo, now);
        double recentRevenue = revenueStore.firstEntry(oneMinuteAgo, now);

        long  previousCount = countStore.firstEntry(twoMinutesAgo, oneMinuteAgo);
        double previousRevenue = revenueStore.firstEntry(
            twoMinutesAgo, oneMinuteAgo);

        TimePeriod currentTimePeriod = new TimePeriod
          (recentCount, recentRevenue);
        TimePeriod previousTimePeriod = new TimePeriod
          (previousCount, previousRevenue);
        return new OrdersSummary(currentTimePeriod, previousTimePeriod);
    }

    private ReadOnlyWindowStore<String, Double> revenueStore() {
        while (true) {
            try {
                return streams.store(StoreQueryParameters.fromNameAndType(
                        "RevenueStore", QueryableStoreTypes.windowStore()
                ));
            } catch (InvalidStateStoreException e) {
                System.out.println("e = " + e);
            }
        }
    }

    private ReadOnlyWindowStore<String, Long> ordersCountsStore() {
        while (true) {
            try {
                return streams.store(StoreQueryParameters.fromNameAndType(
                        "OrdersCountStore", QueryableStoreTypes.windowStore()
                ));
            } catch (InvalidStateStoreException e) {
                System.out.println("e = " + e);
            }
        }
    }
}

Sowohl ordersCountsStore als auch revenueStore geben Daten aus den Fensterspeichern zurück, in denen die Anzahl der Bestellungen bzw. die Höhe der erzielten Einnahmen gespeichert sind. Der Grund für den Codeblock while(true) { try {} catch {} } in beiden Funktionen ist, dass der Speicher möglicherweise nicht verfügbar ist, wenn wir diesen Code aufrufen, bevor der Stream-Thread den Zustand RUN⁠NING erreicht hat. Vorausgesetzt, wir haben keine Fehler in unserem Code, werden wir irgendwann den Zustand RUNNING erreichen; es könnte nur etwas länger dauern, als es dauert, bis der HTTP-Endpunkt startet.

ordersSummary ruft diese beiden Funktionen auf, um die Anzahl der Bestellungen in der letzten Minute und in der Minute davor sowie den Gesamtumsatz in der letzten Minute und in der Minute davor zu ermitteln.

KStreamsWindowStore.java ist hier definiert:

package pizzashop.models;

import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

import java.time.Instant;

public class KStreamsWindowStore<T> {
    private final ReadOnlyWindowStore<String, T> store;

    public KStreamsWindowStore(ReadOnlyWindowStore<String, T> store) {
        this.store = store;
    }

    public T firstEntry(Instant from, Instant to) {
        try (WindowStoreIterator<T> iterator = store.fetch("count", from, to)) {
            if (iterator.hasNext()) {
                return iterator.next().value;
            }
        }
        throw new RuntimeException(
            "No entries found in store between " + from + " and " + to);
    }
}

Die Methode firstEntry findet den ersten Eintrag im Fensterspeicher im angegebenen Datumsbereich und gibt den Wert zurück. Wenn keine Einträge vorhanden sind, wird ein Fehler ausgegeben.

OrdersSummary.java ist hier definiert:

package pizzashop.models;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class OrdersSummary {
    private TimePeriod currentTimePeriod;
    private TimePeriod previousTimePeriod;

    public OrdersSummary(
        TimePeriod currentTimePeriod, TimePeriod previousTimePeriod) {
        this.currentTimePeriod = currentTimePeriod;
        this.previousTimePeriod = previousTimePeriod;
    }

    public TimePeriod getCurrentTimePeriod() { return currentTimePeriod; }
    public TimePeriod getPreviousTimePeriod() { return previousTimePeriod; }
}

Diese Klasse ist ein Datenobjekt, das Aufträge und Einnahmen für den aktuellen und den vorherigen Zeitraum aufzeichnet.

TimePeriod.java ist hier definiert:

package pizzashop.models;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class TimePeriod {
    private int orders;
    private double totalPrice;

    public TimePeriod(long orders, double totalPrice) {
        this.orders = orders;
        this.totalPrice = totalPrice;
    }

    public int getOrders() { return orders; }
    public double getTotalPrice() { return totalPrice; }
}

Bei dieser Klasse handelt es sich um ein Datenobjekt, das den Überblick über Bestellungen und Einnahmen behält.

Erstellen eines HTTP-Endpunkts

Zum Schluss erstellen wir den HTTP-Endpunkt , der die Übersichtsdaten für unsere Nutzer bereitstellt. Erstelle die Datei src/main/java/pizzashop/rest/OrdersResource.java und füge Folgendes hinzu:

package pizzashop.rest;

import pizzashop.models.OrdersSummary;
import pizzashop.streams.InteractiveQueries;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;

@ApplicationScoped
@Path("/orders")
public class OrdersResource {
    @Inject
    OrdersQueries ordersQueries;

    @GET
    @Path("/overview")
    public Response overview() {
        OrdersSummary ordersSummary = ordersQueries.ordersSummary();
        return Response.ok(ordersSummary).build();
    }
}

Ausführen der Anwendung

Nachdem wir nun alle Klassen erstellt haben, ist es an der Zeit, die Anwendung zu starten. Dazu führen wir den folgenden Befehl aus:

QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS=localhost:29092 quarkus dev

Wir geben die Umgebungsvariable QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS ein, damit Quarkus sich mit unserem Kafka-Broker verbinden kann.

Abfrage des HTTP-Endpunkts

Jetzt können wir den HTTP-Endpunkt abfragen, um zu sehen, wie viele Bestellungen unser Online-Service erhält. Der Endpunkt ist auf Port 8080 unter /orders/overview erreichbar:

curl http://localhost:8080/orders/overview 2>/dev/null | jq '.'

Die Ergebnisse dieses Befehls sind in Beispiel 4-2 dargestellt.

Beispiel 4-2. Letzter Auftragsstatus
{
  "currentTimePeriod": {
    "orders": 994,
    "totalPrice": 4496973
  },
  "previousTimePeriod": {
    "orders": 985,
    "totalPrice": 4535117
  }
}

Erfolg! Wir können die Anzahl der Bestellungen und den Gesamtumsatz im aktuellen und vorherigen Zeitraum sehen.

Einschränkungen von Kafka Streams

Obwohl sich dieser Ansatz für die Abfrage von Streams in vielen Fällen bewährt hat, könnten bestimmte Faktoren seine Effizienz für unseren speziellen Anwendungsfall beeinträchtigen. In diesem Abschnitt werden wir uns diese Einschränkungen genauer ansehen, um besser zu verstehen, wie sie die Leistung dieses Ansatzes beeinflussen könnten.

Die zugrunde liegende Datenbank von Kafka Streams ist RocksDB, ein Key-Value-Store, mit dem du Daten mithilfe von Key-Value-Paaren speichern und abrufen kannst. Dieser Fork von GooglesLevelDB ist für schreibintensive Workloads mit großen Datensätzen optimiert.

Eine der Einschränkungen ist, dass wir nur einen Index pro Key-Value-Store erstellen können. Das bedeutet, dass wir die Topologie aktualisieren müssen, um einen weiteren Key-Value-Store zu erstellen, wenn wir die Daten entlang einer anderen Dimension abfragen wollen. Wenn wir eine Suche ohne Schlüssel durchführen, führt RocksDB einen vollständigen Scan durch, um die passenden Datensätze zu finden, was zu einer hohen Abfragelatenz führt.

Unsere Key-Value-Stores erfassen auch nur Ereignisse, die in der letzten Minute und der Minute davor passiert sind. Wenn wir Daten erfassen wollten, die weiter zurückreichen, müssten wir die Topologie aktualisieren, um mehr Ereignisse zu erfassen. Im Fall von AATD könnten wir uns einen zukünftigen Anwendungsfall vorstellen, in dem wir die Verkaufszahlen von jetzt mit denen der letzten Woche oder des letzten Monats zur gleichen Zeit vergleichen wollen. Das wäre in Kafka Streams schwierig zu realisieren, weil wir historische Daten speichern müssten, was viel Speicherplatz beanspruchen würde.

Auch wenn wir Kafka Streams zum Schreiben von Echtzeit-Analyseabfragen verwenden können und es einen vernünftigen Job macht, müssen wir wahrscheinlich ein Tool finden, das besser zu diesem Problem passt.

Zusammenfassung

In diesem Kapitel haben wir uns angeschaut, wie wir eine HTTP-API auf dem orders Stream aufbauen können, um einen Überblick über die Auftragslage im Unternehmen zu erhalten. Wir haben diese Lösung mit Kafka Streams erstellt, aber wir haben festgestellt, dass dies vielleicht nicht das am besten geeignete Werkzeug für diese Aufgabe ist. Im nächsten Kapitel werden wir erfahren, warum wir eine Serving-Schicht brauchen, um eine skalierbare Echtzeit-Analyseanwendung zu erstellen.

Get Aufbau von Echtzeit-Analysesystemen now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.