Kafka 101 Tutorial - Kafka Sales Producer mit Apache Avro
Anforderungen
Das Projekt wurde mit openjdk 11.0.18
entwickelt und mit Apache Maven 3.8.7
gebaut.
Um Ihre Java- und Maven-Version zu überprüfen, geben Sie die folgenden Befehle ein; die Ausgabe für meine Konfiguration finden Sie unten.
|
|
Wenn Sie diese Konfigurationen nicht haben, oder wenn Sie Java und Maven nicht installieren können oder wollen, habe ich auch ein Docker-Image für diese kleine Java-Anwendung erstellt. Stellen Sie sicher, dass Sie Docker installiert haben. Meine Docker-Konfiguration lautet
|
|
Das Image der Anwendung kann von meinem dockerhub repo gezogen werden.
Erstellen Sie das Projekt
Klonen des Repositorys
Der erste Schritt besteht darin, dieses Repository zu klonen. Wechseln Sie mit cd
in den Ordner, in dem Sie das Repository dieses Projekts hosten wollen, und geben Sie den folgenden Befehl ein
|
|
Wenn Sie mit dem Kafka-Producer herumspielen möchten, können Sie dies tun und die jar-Datei der Anwendung mit dem folgenden Befehl im Ordner ./kafka_sales_producer
erstellen.
|
|
Sie können auch das mitgelieferte Dockerfile verwenden, um Ihr eigenes Docker Image zu erstellen, auf dem Ihr Producer läuft. Um Ihr lokales Abbild zu erstellen, führen Sie den folgenden Befehl im Ordner /kafka_sales_producer
aus.
|
|
Die Infrastruktur
Dieses Repo enthält nur die Java-Anwendung, die Messages an einen bestehenden Kafka-Cluster weiterleitet. Das bedeutet, dass dieser Code nicht läuft, es sei denn, er kann Messages an einen bestehenden Kafka-Cluster pushen. Um diese Anwendung zum Laufen zu bringen, müssen wir einen Kafka-Cluster und die zugehörigen Dienste starten…
Das Gute daran ist, dass dies das Topic meines vorherigen Blogposts war! Die Einrichtung einer lokalen Kafka-Infrastruktur ist ganz einfach: Schauen Sie sich einfach mein github repo an, das diesem Blogbeitrag beiliegt.
Um den Cluster zum Laufen zu bringen, gehen Sie wie folgt vor
|
|
Sie sollten 4 Dienste sehen, die im Docker-Netzwerk kafka_101_default
laufen:
- control-center
- schema-registry
- broker
- zookeeper
Jetzt, wo die Infrastruktur läuft, können wir den Kafka-Producer starten!
Ausführen der Anwendung
Die Anwendung kann entweder direkt in der CLI mit dem Befehl “java” ausgeführt werden oder in einem Docker-Container, was ich empfehlen würde da es sich in der Praxis bewährt hat.
In der CLI
Sie können den Producer lokal mit der java CLI ausführen. Die Hauptmethode App.java
nimmt einige Kommandozeilenargumente für die Adresse des Kafka-Servers und der Schemaregistrierung entgegen. Denn ja, in diesem Repo werden wir auch mit der Confluent’s schema registry herumspielen. Eine Videoeinführung zur Schemaregistrierung und warum Sie die Schemaregistrierung überhaupt verwenden sollten, finden Sie hier.
Unter der Annahme, dass Sie den Kafka-Cluster mit der vorgeschlagenen Methode gestartet haben, sollte der Kafka-Broker auf localhost:9092
und die Schema-Registry auf localhost:8081
laufen. Wenn Sie eine andere Portzuordnung gewählt haben, aktualisieren Sie den Wert für Ports und geben Sie den folgenden Befehl ein, um mit der Erzeugung von Ereignissen zu beginnen:
|
|
In einem Docker-Container
Alternativ können Sie die Anwendung auch in einem Docker-Container ausführen. Wenn Sie den Producer in einem Docker-Container ausführen, sind die Kafka- und Schema-Registrierungsdienste nicht mehr lokal beim Producer. Daher müssen Sie den Producer mit dem Netzwerk verbinden, in dem diese Container ausgeführt werden, und sie über ihre Containernamen ansprechen. Nochmals, wenn Sie den vorgeschlagenen Ansatz befolgt haben, um die Infrastruktur zum Laufen zu bringen, wird der Kafka-Broker auf http://broker:29092
und die Schema-Registry auf http://schema-registry:8081
laufen; und alle diese Container sollten auf dem kafka_101_default
Netzwerk laufen.
Um zu überprüfen, ob dies tatsächlich der Fall ist, prüfen Sie, ob das Netzwerk tatsächlich existiert:
|
|
Um zu überprüfen, ob der Broker und die Schemaregistrierung tatsächlich mit diesem Netzwerk verbunden sind, geben Sie folgenden Befehl ein
|
|
Mit folgendem Befehl startet der Producer das produzieren
|
|
Sanity Checks
Standardmäßig sollte der Producer Messages in dem automatisch erstellten Topic SALES
mit einer Frequenz von 1 msg/s produzieren. Um zu überprüfen, ob tatsächlich Messages produziert werden, gehen Sie zur Confluent Control Center UI auf localhost:9021 und navigieren Sie zur Registerkarte Topics
. Klicken Sie auf das Topic SALES
und dann auf die Registerkarte Messages
. Dort sollten Sie die Messages sehen, die den Cluster erreichen.
Sie sollten etwas wie das folgende Bild sehen:
Ein Schritt weiter
Um einen Schritt weiter zu gehen, werden wir die Schemaregistrierung nutzen, um die [Schemavalidierung] (https://docs.confluent.io/platform/current/schema-registry/schema-validation.html#sv-on-cs) durchzusetzen. Beim Ausführen der Java-Anwendung wurde das Topic SALES
, da es ursprünglich nicht registriert war, automatisch erstellt, als es veröffentlicht wurde. Und standardmäßig ist bei der Erstellung des Topics die Schemavalidierung nicht aktiviert, auch wenn der Producer das Schema in der Schemaregistrierung registriert hat.
Um zu sehen, ob das Schema tatsächlich registriert wurde, navigieren Sie zur Registerkarte Schema
des Topic SALES
in der Benutzeroberfläche. Oder geben Sie das Folgende in Ihren Browser ein:
|
|
Aber die Tatsache, dass die Schema-Validierung nicht aktiviert ist, bedeutet, dass die Producer-Anwendung anfangen könnte, “verunreinigte” Messages zu produzieren, die nicht dem Schema entsprechen, und damit davonkommen könnte. Um das auszuprobieren, fügen wir dem Topic mithilfe von Confluent CLI manuell einige Messages hinzu, indem wir einfach Strings ohne Schlüssel senden.
|
|
Dann konsumieren Sie diese Meldungen über die CLI.
|
|
Sie sehen dann etwas wie unten
Und da die Messages mit Avro serialisiert werden, werden sie im Terminal als seltsame Zeichen angezeigt. Sie sehen aber, dass die beiden Messages hello
und world
zum Topic hinzugefügt wurden. Dies ist kein gutes Verhalten, da diese Daten nicht in das Schema passen und den Verbrauch auf der Seite der Consumer stören könnten.
Um das zu umgehen, müssen wir eine Schemavalidierung erzwingen. Dies geschieht auf der Ebene des Topics, bei der Erstellung des Topics. Löschen wir das Topic SALES
und erstellen es neu mit:
|
|
Wie Sie sehen können, wird beim Senden von Messages, die nicht in das Schema passen, eine org.apache.kafka.common.InvalidRecordException
ausgelöst. Dadurch wird sichergestellt, dass verunreinigte Datensätze nicht an die Consumer und schließlich an die nachgeschalteten Anwendungen weitergegeben werden.
Sie können nun wieder normal produzieren, indem Sie den Producer verwenden, der Messages passend zum Schema sendet! Et voilà!
Was kommt als nächstes?
Dieses Repo wird als Blogpost auf der Acosom-Website veröffentlicht. Im folgenden Beitrag (und Repo) werden wir die vorgestellten Infrastrukturen und Konzepte verwenden, um mit Apache Flink zu beginnen. Wir werden diesen fiktiven Verkaufsdatenproduzenten verwenden, um eine Streaming-Analytics-Pipeline mit Flink unter Verwendung der verschiedenen APIs zu erstellen. Jetzt fangen die coolen Sachen an! Sie können meine Beiträge verfolgen und benachrichtigt werden, wenn die Blogbeiträge veröffentlicht werden, indem Sie mir auf Twitter folgen.
📝 Gefällt dir, was du liest?
Dann lass uns in Kontakt treten! Unser Engineering-Team wird sich so schnell wie möglich bei dir melden.