Effektives Testen von Flink SQL-Skripten mit Unit- & Integrationstests

Überblick
Die Entwicklung von Flink SQL-Skripten kann anspruchsvoll sein, insbesondere für Nicht-Entwickler wie Data Scientists.
Die Vereinfachung des Entwicklungsprozesses ist entscheidend, um Korrektheit und Zuverlässigkeit zu gewährleisten, wobei Tests eine zentrale Rolle spielen.
In diesem Artikel untersuchen wir, wie man ein Tool nutzt, das wir intern Test Runner nennen. Es ermöglicht uns, Unit- und Integrationstests für Flink SQL-Skripte durchzuführen.
Wir werden auch die Architektur des Test Runner beleuchten, damit Sie lernen, wie Sie Unit- und Integrationstests für Flink SQL-Skripte angehen können.
Der Code für den Test Runner ist noch nicht Open Source. Wir beabsichtigen, ihn bald zu veröffentlichen. Bitte kontaktieren Sie uns, wenn Sie interessiert sind oder zum Projekt beitragen möchten.
Anwendungsfall
In den folgenden Schritten konzentrieren wir uns auf einen einfachen Anwendungsfall: den Aufbau eines Berichts über die Anzahl deutscher Kunden pro Stadt. Wir verwenden dieses SQL-Skript als unser Testobjekt.
Um dies zu erreichen, können wir das Skript in vier Hauptteile unterteilen:
- Lesen des Streams von Kundendatensätzen aus einem Kafka-Topic;
- Filtern der Datensätze nach Ländercode;
- Gruppierung der gefilterten Kunden nach Stadt, um eine Kundenanzahl pro Stadt zu erstellen;
- Schreiben des Ergebnisses in eine MySQL-Tabelle.
So sieht das Flink SQL-Skript aus:
| |
Unit Testing
Architektur
Wir benötigen Unit-Tests, um das Ergebnis einer oder mehrerer Anweisungen aus einem Flink SQL-Skript zu verifizieren.
Hier sind einige wichtige Details zu den Unit-Tests:
- Tests werden mit einer Java-Datei implementiert, die zur Laufzeit kompiliert und vom
Test Runnerausgeführt wird. - Die Java-Datei erweitert
UnitTestBase, bereitgestellt vomTest Runner, was einige Hilfsmethoden beinhaltet. - Die Methode
getScriptName()muss den Namen der Flink SQL-Skriptdatei zurückgeben. - Alle Flink SQL-Tabellen werden mit Apache Paimon gemockt.
Während des Testlaufs mockt der Test Runner alle WITH()-Klauseln und ersetzt sie durch einen Apache Paimon-Connector, der die Ergebnisse lokal speichert. Dies ermöglicht es dem Test Runner, alle Tabellen abzufragen. Für VIEWS wird ein Workaround angewendet, da Apache Paimon Flink-Views noch nicht unterstützt. Indem alle Views als TEMPORARY VIEW gemockt werden, behandelt Flink sie wie Tabellen, was mit Apache Paimon kompatibel ist.
Konfiguration
Um die Unit-Tests auszuführen, kann der Test Runner über die folgenden Umgebungsvariablen konfiguriert werden:
| |
Test-Implementierung
Als Nächstes verwenden wir die folgende Basis-Java-Datei, um unseren Unit-Test zu implementieren:
| |
Als Nächstes bereiten wir die Testdaten vor, indem wir Zeilen in die Quelltabelle einfügen.
Dies tun wir, indem wir die folgenden Anweisungen in der Methode prepareTestData() ausführen:
| |
Schließlich verwenden wir die folgenden Hilfsmethoden der Klasse UnitTestBase, um die Ausgabe jeder Verarbeitungsstufe zu verifizieren:
| |
Wir können daher die folgenden Assertions definieren, um zu verifizieren, dass die Kunden korrekt gefiltert wurden:
| |
Und die folgenden Assertions, um zu prüfen, ob die Kunden korrekt gezählt werden:
| |
Ergebnisse
Wenn alles gut geht, sehen wir eine Pass-Nachricht in der Ergebnisdatei:
| |
Integration Testing
Architektur
Wir benötigen Integrationstests immer dann, wenn wir nicht nur unsere Operatoren, sondern auch die Connectoren für unsere Quellen (Sources) und Senken (Sinks) testen wollen. Integrationstests basieren auf der Idee, Tests für das Flink SQL-Skript direkt in Flink SQL zu schreiben. Das Deployment des Jobs sowie dessen Ausführung und das Abrufen der Ergebnisse werden vom Test Runner verwaltet.
Hinweis: Derzeit unterstützen die Integrationstests des Test Runner nur Apache Kafka-Quellen.
Die Eingabedatei (eine JSON-Datei) enthält die Datensätze, die der Test Runner an Kafka sendet.
Der Job verarbeitet die eingehenden Kafka-Datensätze und veröffentlicht das Ergebnis in einer Kafka-Senke, von der der Test Runner liest und anschließend seine Assertions durchführt.
Wie im obigen Architekturdiagramm gezeigt, nutzen wir einen SQL Runner, um den SQL-Job auszuführen. Wir haben den SQL Runner modifiziert, um die Injektion von Umgebungsvariablen zu ermöglichen, die eine literale Ersetzung im SQL-Skript durchführen. Dies erlaubt es uns, Secrets in den SQL-Job einzubinden.
Lokale Services
Lassen Sie uns alle zugehörigen Dienste lokal mit Docker Compose bereitstellen, indem wir den folgenden Befehl ausführen:docker-compose up -d
Die folgenden Dienste werden bereitgestellt:
- Kafka Broker, erreichbar auf Port
9092; - Kafka UI, erreichbar auf Port
9001; - Flink Job Manager, erreichbar auf Port
8081; - Flink Task Manager.
Unser Integrationstest wird all diese Dienste nutzen, um das Flink SQL-Skript zu verifizieren.
Konfiguration
Um die Integrationstests auszuführen, kann der Test Runner über die folgenden Umgebungsvariablen konfiguriert werden:
| |
Test-Implementierung
Die Integrationstests haben die folgende Ordnerstruktur:
📁 test-1
│
├── 📁 input
│ └── 📄 customers.json
│
├── 📁 sqlAssertions
│ ├── 📄 test-customers-positive.sql
│ └── 📄 test-customers-negative.sql
📁 test-2
...
Das Verzeichnis test-1 sollte in dem Verzeichnis liegen, das über die Umgebungsvariable INTEGRATION_TEST_DATA_DIR angegeben wurde. Dieses kann mehrere Unterverzeichnisse für andere Skripte enthalten.
Innerhalb dieses testspezifischen Ordners gibt es einen input-Ordner, der ein JSON-Array mit allen Datensätzen enthält, die an das Eingabetopic gesendet werden (benannt wie die JSON-Datei).
Als Nächstes haben wir das Verzeichnis sqlAssertions, das die Flink SQL-Testdateien enthält.
Testdatei
Dies ist eine SQL-Datei, die Flink SQL und eine benutzerdefinierte Metadaten-Assertion-Syntax unterstützt.
Sie dient dazu, die Datensätze aus den Senken des zu testenden Flink SQL-Skripts zu lesen und die Ergebnisse zu verifizieren.
Zuerst initialisieren wir die Quelltabelle für den Test:
| |
Wir unterstützen derzeit zwei Assertion-Modi:
1. Negativer Modus
| |
Der Test wird sicherstellen, dass das Ergebnis der Anweisung innerhalb der durch die Umgebungsvariable INTEGRATION_TEST_SUCCESS_TIMEOUT_MS definierten Zeit keine Datensätze zurückgibt.
Sofern nicht anders angegeben, wird der negative Modus verwendet.
2. Positiver Modus
| |
Der Test prüft basierend auf den bereitgestellten Metadaten / SQL-Kommentaren. Im obigen Beispiel wird der positive Modus mit mode:positive aktiviert und die erwartete Anzahl an Datensätzen mit outputCount:1 auf 1 gesetzt.
Ergebnisse
Wenn alles gut geht, werden die folgenden Meldungen in die Ergebnisdatei geschrieben:
Test Suite: sqlassertrunner.SqlParserIntegrationTest:
- Result: Pass
Fazit
In diesem Artikel haben wir untersucht, wie man den Test Runner verwendet, um Unit- und Integrationstests für Flink SQL-Skripte zu implementieren und auszuführen.
Die vollständige Implementierung der Tests ist im Test Runner Git-Repository verfügbar.