Technologie

Kafka 101 Tutorial - Streaming Analytics mit Apache Flink

14. April 2023

Button Github Button Linkedin Button Twitter


💡 Einführung

In diesem neuen Blogbeitrag bauen wir auf der Infrastruktur und dem Kafka-Producer auf, die in den ersten beiden Blogbeiträgen unserer Kafka 101 Tutorial Reihe eingeführt wurden. Wir simulieren ein Unternehmen, das Verkaufsereignisse von seinen vielen physischen Geschäften innerhalb seiner Kafka-Infrastruktur erhält, und wir führen die grundlegende Apache Flink-Architektur ein, um Streaming-Analysen zusätzlich zu diesen Verkaufsereignissen durchzuführen.

Speziell für diesen Anwendungsfall aggregieren wir Verkaufsereignisse pro Geschäft und geben den Gesamtverkaufsbetrag pro Geschäft und Zeitfenster an. Der Ausgabedatenstrom der aggregierten Verkäufe wird dann an die Kafka-Infrastruktur zurückgemeldet – und der neue Datentyp wird in der Schema-Registrierung registriert – für nachgelagerte Anwendungen. Wir führen eine zeitbasierte Aggregation durch, da dies ein grundlegendes Beispiel für zeitbasierte Analysen ist, die mit Apache Flink durchgeführt werden können. Für diesen ersten Flink-basierten Blogbeitrag verwenden wir die Flink SQL-API. In einem zukünftigen Blogbeitrag werden wir uns mit der mächtigeren Java-API befassen.

🐋 Voraussetzungen

Um dieses Projekt zum Laufen zu bringen, benötigen Sie nur minimale Anforderungen: Es muss nur Docker und Docker Compose installiert sein.

Die Versionen, die ich zum Erstellen des Projekts verwendet habe, sind

1
2
3
4
5
6
7
## Docker
docker --version
> Docker version 23.0.3, build 3e7cbfdee1

## Docker Compose
docker-compose --version
> Docker Compose version 2.17.2

Wenn Ihre Versionen unterschiedlich sind, sollte das kein großes Problem darstellen. Neuere Versionen können jedoch Warnungen oder Fehler auslösen, die Sie selbst debuggen müssten.

🏭 Infrastruktur

Wir beginnen damit, die Infrastruktur auf unserem lokalen Computer zum Laufen zu bringen. Nichts einfacher als das! Geben Sie einfach die folgenden Befehle ein

1
2
3
4
5
6
7
8
## Clone the repo
git clone https://github.com/theodorecurtil/flink_sql_job.git

## cd to the repo
cd flink_sql_job

## docker-compose the infra
docker-compose up -d

Das bringt die uns vertraute Kafka-Infrastruktur sowie den Kafka-Sales-Producer, den wir in unserem letzten Blogbeitrag vorgestellt haben. Der Producer beginnt sofort mit der Produktion mit einer Frequenz von 1 Nachricht pro Sekunde. Sie können überprüfen, ob alles ordnungsgemäß ausgeführt wird, indem Sie zur Confluent Control Center-Benutzeroberfläche auf localhost:9021 navigieren. Gehen Sie dann zur Registerkarte „Topics“ und klicken Sie auf das Topic SALES. Zu diesem Topic produziert der Kafka-Producer. Sie sollten etwas Ähnliches wie das Folgende sehen.

🐿 Sie können aus der Datei docker-compose erkennen, dass am Ende der Datei 3 Dienste erzeugt werden.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
version: '3.3'
services:
  ...
    
  jobmanager:
    ...

  taskmanager:
    ...

  sql-client:
    ...

Der jobmanager Container unterstützt den Flink Job Manager und der taskmanager den Flink Task Manager. Eine Erinnerung oder einen Crashkurs über die Flink-Architektur finden Sie unter diesem Link. Grundsätzlich besteht ein Flink-Cluster aus einem Job-Manager und einem oder mehreren Task-Managern. Der Job-Manager ist für die Verteilung der Arbeit an die Task-Manager verantwortlich, und Task-Manager erledigen die eigentliche Arbeit.

Der dritte Dienst – „sql-client“ – ist der SQL-Client, mit dem wir SQL-Jobs an unseren Flink-Cluster senden können. In einer typischen Produktionsumgebung werden Flink-Jobs mit der Java/Scala-API entworfen, und der SQL-Client ist nicht erforderlich. Für Testfälle bleibt dieser SQL-Client praktisch.

Flink wird mit einer sehr schönen Benutzeroberfläche ausgeliefert. Sie können unter localhost:18081 darauf zugreifen. Wenn Sie diesem Link folgen, sollten Sie so etwas wie das Folgende sehen

Sie können sehen, dass 1 Task-Manager registriert ist – da wir einen einzelnen Task-Manager-Container gestartet haben – mit 2 verfügbaren Task-Slots und 0 laufenden Jobs. Es gibt 2 Task-Slots, da wir speziell 1 Task-Manager mit 2 Task-Slots erstellt haben – taskmanager.numberOfTaskSlots: 2 config im taskmanager – insgesamt 2 Task-Slots.

⚡ Machen wir ein paar Streaming-Analysen

Jetzt, da alles läuft, ist es an der Zeit, die Leistungsfähigkeit von Apache Flink zu demonstrieren.

Zur Erinnerung: Wir haben einen Kafka-Producer, der jede Sekunde Verkaufsaufzeichnungen für 3 Geschäfte produziert. Eine mögliche Analyse wäre, den Gesamtumsatz pro Geschäft zu aggregieren, um zu vergleichen, welche Geschäfte mehr Umsatz machen als andere. Und wenn wir schon dabei sind, lassen Sie uns auch pro 1-Minuten-Zeitfenster aggregieren, indem wir “tumbling windows” verwenden. Diese “tumbling windows” haben eine feste Zeitdauer und überschneiden sich nicht. Das Bild aus der Flink-Dokumentation veranschaulicht das sehr gut.

Unser Ausgabedatenstrom von Flink wird 1 Datenpunkt pro Geschäft und Minute betragen. Für diesen speziellen Anwendungsfall werden wir den Datenstrom in einem neuen Topic an Kafka zurückführen, aber Flink unterstützt beispielsweise auch das Sinken in eine SQL-Datenbank oder auch benutzerdefinierte Sinks wie eine Redis-Sink; hängt von den Anforderungen ab. Ich mag es zu Kafka zurück zu “sinken”, um einen zentralisierten Stream-Katalog zu haben, da ich es auf der Verbraucher-/Downstream-Seite einfacher finde.

Die Logik ist in einer .sql-Datei implementiert. Quellen- und “Sink”-Tabellen werden erstellt, und dann ist die Logik für diesen Anwendungsfall sehr einfach

1
2
3
SELECT store_id, window_start, window_end, sum(sale_amount) as aggregated_sales
FROM TABLE(TUMBLE(TABLE SALES, DESCRIPTOR(sale_ts), INTERVAL '60' SECONDS))
GROUP BY store_id, window_start, window_end;

Dieser sehr einfache SQL-Ausdruck weist Flink an, Verkaufsereignisse Zeitfenstern (diesen “tumbling windows” mit einer Breite von 1 Minute) zuzuweisen und dann die Gesamtverkäufe pro „store_id“ und pro „window_start“-Zeitstempel zu berechnen.

Wir verwenden die tatsächlichen Zeitstempel der Verkaufsereignisse, um Zeit und Gruppenbeobachtungen zu messen; Dies wird in Flink als Ereigniszeit bezeichnet. Flink unterstützt eine andere Art von Zeit, um Zeit und geordnete Ereignisse im Auge zu behalten: Bearbeitungszeit. Mit der Ereigniszeit tragen Ereignisse, die in Flink einfließen, ihre Zeitstempel (wie die genauen Zeitstempel, wann die Verkäufe stattgefunden haben).

Um den Flink-Job zu starten, müssen wir in den Container „sql-client“ gehen und den Job manuell starten, indem wir die Datei „.sql“ übergeben. Dazu geben wir die folgenden Befehle ein:

1
2
3
4
5
## Enter the SQL client
docker exec -it sql-client bash

## Start the job
flink@sql-client~$ sql-client.sh -f sales-aggregate.sql

Der SQL-Client sollte dann die folgende Ausgabe anzeigen

1
2
3
4
5
6
7
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c984a31a8e94fd41025f55621c2f7d02


Flink SQL> 
Shutting down the session...
done.

wobei die Job-ID für Sie anders wäre, da sie zufällig generiert wird. Wir können auch sehen, dass der Job ausgeführt wird, indem wir zur Flink-Benutzeroberfläche auf localhost:18081 gehen.

Sie können den ausgeführten Job in der Hauptansicht der Benutzeroberfläche sehen und dann weitere Details zum ausgeführten Job anzeigen.

Abschließend, um den Kreis zu schließen, lassen Sie uns schließlich überprüfen, ob unser Ausgabedatenstrom tatsächlich in Kafka “gesinkt” wird! Flink schreibt – produziert – zum Topic SALES_AGGREGATE. Das Topic wird automatisch erstellt, wenn Flink damit beginnt zu produzieren – da wir die automatische Topic-Erstellung auf unserem Kafka-Cluster zugelassen haben – und die entsprechenden Schemas werden in der Schema-Registrierung registriert. Gehen Sie zum Confluent Control Center unter localhost:9021 und navigieren Sie zur Registerkarte „Topics“ und dann zu SALES_AGGREGATE. Sie sollten sehen, dass jede Minute Nachrichten veröffentlicht werden – da dies die Länge unseres Aggregationsfensters ist.

☠ Die Infrastruktur abschalten

Sobald wir mit unserem Spielzeugbeispiel fertig sind, ist es einfach, alle Prozesse zu stoppen. Geben Sie einfach den folgenden Befehl ein

1
docker-compose down

⏭ Was kommt als nächstes?

In diesem Blogbeitrag haben wir gesehen, wie man auf der zuvor entwickelten Kafka-Infrastruktur aufbauen kann, um mit Apache Flink unter Verwendung der SQL-API herumzuspielen. In einem nächsten Blog-Beitrag werden wir sehen, wie der Flink-Job, den wir heute geschrieben haben, mithilfe der Java-API repliziert werden kann, um das Design komplexerer Jobs zu ermöglichen. Bleiben Sie dran! 📻

📝 Gefällt dir, was du liest?

Dann lass uns in Kontakt treten! Unser Engineering-Team wird sich so schnell wie möglich bei dir melden.