Technologie

Kafka 101 Tutorial - Kafka Sales Producer mit Apache Avro

30. März 2023

Button Github Button Linkedin Button Twitter

Anforderungen

Das Projekt wurde mit openjdk 11.0.18 entwickelt und mit Apache Maven 3.8.7 gebaut.

Um Ihre Java- und Maven-Version zu überprüfen, geben Sie die folgenden Befehle ein; die Ausgabe für meine Konfiguration finden Sie unten.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
## Java-Version prüfen
($)> java --version
openjdk 11.0.18 2023-01-17
OpenJDK Runtime Environment (build 11.0.18+10)
OpenJDK 64-Bit Server VM (build 11.0.18+10, mixed mode)

## Maven-Version prüfen
($)> mvn --version
Apache Maven 3.8.7 (b89d5959fcde851dcb1c8946a785a163f14e1e29)
Maven home: /opt/maven
Java version: 11.0.18, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-11-openjdk
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "6.1.12-arch1-1", arch: "amd64", family: "unix"

Wenn Sie diese Konfigurationen nicht haben, oder wenn Sie Java und Maven nicht installieren können oder wollen, habe ich auch ein Docker-Image für diese kleine Java-Anwendung erstellt. Stellen Sie sicher, dass Sie Docker installiert haben. Meine Docker-Konfiguration lautet

1
2
3
## Docker-Version prüfen
($)> docker --version
Docker version 23.0.1, build a5ee5b1dfc

Das Image der Anwendung kann von meinem dockerhub repo gezogen werden.

Erstellen Sie das Projekt

Klonen des Repositorys

Der erste Schritt besteht darin, dieses Repository zu klonen. Wechseln Sie mit cd in den Ordner, in dem Sie das Repository dieses Projekts hosten wollen, und geben Sie den folgenden Befehl ein

1
2
3
4
5
## Repository klonen
git clone git@github.com:theodorecurtil/kafka_sales_producer.git

## cd in das Repo
cd ./kafka_sales_producer

Wenn Sie mit dem Kafka-Producer herumspielen möchten, können Sie dies tun und die jar-Datei der Anwendung mit dem folgenden Befehl im Ordner ./kafka_sales_producer erstellen.

1
mvn clean compile assembly:single

Sie können auch das mitgelieferte Dockerfile verwenden, um Ihr eigenes Docker Image zu erstellen, auf dem Ihr Producer läuft. Um Ihr lokales Abbild zu erstellen, führen Sie den folgenden Befehl im Ordner /kafka_sales_producer aus.

1
docker build -t <image-name> .

Die Infrastruktur

Dieses Repo enthält nur die Java-Anwendung, die Messages an einen bestehenden Kafka-Cluster weiterleitet. Das bedeutet, dass dieser Code nicht läuft, es sei denn, er kann Messages an einen bestehenden Kafka-Cluster pushen. Um diese Anwendung zum Laufen zu bringen, müssen wir einen Kafka-Cluster und die zugehörigen Dienste starten…

Das Gute daran ist, dass dies das Topic meines vorherigen Blogposts war! Die Einrichtung einer lokalen Kafka-Infrastruktur ist ganz einfach: Schauen Sie sich einfach mein github repo an, das diesem Blogbeitrag beiliegt.

Um den Cluster zum Laufen zu bringen, gehen Sie wie folgt vor

1
2
3
4
5
## Repository klonen
git clone git@github.com:theodorecurtil/kafka_101.git

## Docker compose up
cd ./kafka_101 && docker-compose up -d

Sie sollten 4 Dienste sehen, die im Docker-Netzwerk kafka_101_default laufen:

  1. control-center
  2. schema-registry
  3. broker
  4. zookeeper

Jetzt, wo die Infrastruktur läuft, können wir den Kafka-Producer starten!

Ausführen der Anwendung

Die Anwendung kann entweder direkt in der CLI mit dem Befehl “java” ausgeführt werden oder in einem Docker-Container, was ich empfehlen würde da es sich in der Praxis bewährt hat.

In der CLI

Sie können den Producer lokal mit der java CLI ausführen. Die Hauptmethode App.java nimmt einige Kommandozeilenargumente für die Adresse des Kafka-Servers und der Schemaregistrierung entgegen. Denn ja, in diesem Repo werden wir auch mit der Confluent’s schema registry herumspielen. Eine Videoeinführung zur Schemaregistrierung und warum Sie die Schemaregistrierung überhaupt verwenden sollten, finden Sie hier.

Unter der Annahme, dass Sie den Kafka-Cluster mit der vorgeschlagenen Methode gestartet haben, sollte der Kafka-Broker auf localhost:9092 und die Schema-Registry auf localhost:8081 laufen. Wenn Sie eine andere Portzuordnung gewählt haben, aktualisieren Sie den Wert für Ports und geben Sie den folgenden Befehl ein, um mit der Erzeugung von Ereignissen zu beginnen:

1
java -Dkafka.server.ip=http://localhost:9092 -Dschema.registry.ip=http://localhost:8081 -jar target/kafka_producer-1.0-SNAPSHOT-jar-with-dependencies.jar

In einem Docker-Container

Alternativ können Sie die Anwendung auch in einem Docker-Container ausführen. Wenn Sie den Producer in einem Docker-Container ausführen, sind die Kafka- und Schema-Registrierungsdienste nicht mehr lokal beim Producer. Daher müssen Sie den Producer mit dem Netzwerk verbinden, in dem diese Container ausgeführt werden, und sie über ihre Containernamen ansprechen. Nochmals, wenn Sie den vorgeschlagenen Ansatz befolgt haben, um die Infrastruktur zum Laufen zu bringen, wird der Kafka-Broker auf http://broker:29092 und die Schema-Registry auf http://schema-registry:8081 laufen; und alle diese Container sollten auf dem kafka_101_default Netzwerk laufen.

Um zu überprüfen, ob dies tatsächlich der Fall ist, prüfen Sie, ob das Netzwerk tatsächlich existiert:

1
docker network ls

Um zu überprüfen, ob der Broker und die Schemaregistrierung tatsächlich mit diesem Netzwerk verbunden sind, geben Sie folgenden Befehl ein

1
docker network inspect kafka_101_default

Mit folgendem Befehl startet der Producer das produzieren

1
docker run --name my_producer --network <kafka-infra-network> -e KAFKASERVER=http://<kafka-server-container-name>:29092 -e SCHEMAREGISTRY=http://<schema-registry-container-name>:8081  theodorecurtil/kafka_sales_producer:latest

Sanity Checks

Standardmäßig sollte der Producer Messages in dem automatisch erstellten Topic SALES mit einer Frequenz von 1 msg/s produzieren. Um zu überprüfen, ob tatsächlich Messages produziert werden, gehen Sie zur Confluent Control Center UI auf localhost:9021 und navigieren Sie zur Registerkarte Topics. Klicken Sie auf das Topic SALES und dann auf die Registerkarte Messages. Dort sollten Sie die Messages sehen, die den Cluster erreichen.

Sie sollten etwas wie das folgende Bild sehen:

Ein Schritt weiter

Um einen Schritt weiter zu gehen, werden wir die Schemaregistrierung nutzen, um die [Schemavalidierung] (https://docs.confluent.io/platform/current/schema-registry/schema-validation.html#sv-on-cs) durchzusetzen. Beim Ausführen der Java-Anwendung wurde das Topic SALES, da es ursprünglich nicht registriert war, automatisch erstellt, als es veröffentlicht wurde. Und standardmäßig ist bei der Erstellung des Topics die Schemavalidierung nicht aktiviert, auch wenn der Producer das Schema in der Schemaregistrierung registriert hat.

Um zu sehen, ob das Schema tatsächlich registriert wurde, navigieren Sie zur Registerkarte Schema des Topic SALES in der Benutzeroberfläche. Oder geben Sie das Folgende in Ihren Browser ein:

1
http://localhost:8081/subjects/SALES-value/versions/latest

Aber die Tatsache, dass die Schema-Validierung nicht aktiviert ist, bedeutet, dass die Producer-Anwendung anfangen könnte, “verunreinigte” Messages zu produzieren, die nicht dem Schema entsprechen, und damit davonkommen könnte. Um das auszuprobieren, fügen wir dem Topic mithilfe von Confluent CLI manuell einige Messages hinzu, indem wir einfach Strings ohne Schlüssel senden.

1
2
3
4
5
6
7
## Geben Sie den Container ein
docker exec -it broker bash

## Messages an den Broker senden
kafka-console-producer --bootstrap-server localhost:9092 --topic SALES
>hello
>world

Dann konsumieren Sie diese Meldungen über die CLI.

1
kafka-console-consumer --bootstrap-server localhost:9092 --topic SALES --from-beginning

Sie sehen dann etwas wie unten

Und da die Messages mit Avro serialisiert werden, werden sie im Terminal als seltsame Zeichen angezeigt. Sie sehen aber, dass die beiden Messages hello und world zum Topic hinzugefügt wurden. Dies ist kein gutes Verhalten, da diese Daten nicht in das Schema passen und den Verbrauch auf der Seite der Consumer stören könnten.

Um das zu umgehen, müssen wir eine Schemavalidierung erzwingen. Dies geschieht auf der Ebene des Topics, bei der Erstellung des Topics. Löschen wir das Topic SALES und erstellen es neu mit:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
## Geben Sie den Container ein
docker exec -it broker bash

## Löschen Sie das Topic
kafka-topics --bootstrap-server localhost:9092 --delete --topic SALES

## Erstellen Sie das Topic mit Schemaüberprüfung neu
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 \
--partitions 1 --topic SALES \
--config confluent.value.schema.validation=true

## Vorlage der kontaminierten Unterlagen
kafka-console-producer --bootstrap-server localhost:9092 --topic SALES
>hello
>[2023-02-28 14:47:51,851] ERROR Error when sending message to topic SALES with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.InvalidRecordException: Log record DefaultRecord(offset=0, timestamp=1677595670815, key=0 bytes, value=5 bytes) is rejected by the record interceptor io.confluent.kafka.schemaregistry.validator.RecordSchemaValidator

Wie Sie sehen können, wird beim Senden von Messages, die nicht in das Schema passen, eine org.apache.kafka.common.​InvalidRecordException ausgelöst. Dadurch wird sichergestellt, dass verunreinigte Datensätze nicht an die Consumer und schließlich an die nachgeschalteten Anwendungen weitergegeben werden.

Sie können nun wieder normal produzieren, indem Sie den Producer verwenden, der Messages passend zum Schema sendet! Et voilà!

Was kommt als nächstes?

Dieses Repo wird als Blogpost auf der Acosom-Website veröffentlicht. Im folgenden Beitrag (und Repo) werden wir die vorgestellten Infrastrukturen und Konzepte verwenden, um mit Apache Flink zu beginnen. Wir werden diesen fiktiven Verkaufsdatenproduzenten verwenden, um eine Streaming-Analytics-Pipeline mit Flink unter Verwendung der verschiedenen APIs zu erstellen. Jetzt fangen die coolen Sachen an! Sie können meine Beiträge verfolgen und benachrichtigt werden, wenn die Blogbeiträge veröffentlicht werden, indem Sie mir auf Twitter folgen.

📝 Gefällt dir, was du liest?

Dann lass uns in Kontakt treten! Unser Engineering-Team wird sich so schnell wie möglich bei dir melden.