Technologie

Effektives Testen von Flink SQL-Skripten mit Unit- & Integrationstests

18. Juli 2024

Überblick

Die Entwicklung von Flink SQL-Skripten kann anspruchsvoll sein, insbesondere für Nicht-Entwickler wie Data Scientists.
Die Vereinfachung des Entwicklungsprozesses ist entscheidend, um Korrektheit und Zuverlässigkeit zu gewährleisten, wobei Tests eine zentrale Rolle spielen.

In diesem Artikel untersuchen wir, wie man ein Tool nutzt, das wir intern Test Runner nennen. Es ermöglicht uns, Unit- und Integrationstests für Flink SQL-Skripte durchzuführen.

Wir werden auch die Architektur des Test Runner beleuchten, damit Sie lernen, wie Sie Unit- und Integrationstests für Flink SQL-Skripte angehen können.

Der Code für den Test Runner ist noch nicht Open Source. Wir beabsichtigen, ihn bald zu veröffentlichen. Bitte kontaktieren Sie uns, wenn Sie interessiert sind oder zum Projekt beitragen möchten.

Anwendungsfall

In den folgenden Schritten konzentrieren wir uns auf einen einfachen Anwendungsfall: den Aufbau eines Berichts über die Anzahl deutscher Kunden pro Stadt. Wir verwenden dieses SQL-Skript als unser Testobjekt.

Um dies zu erreichen, können wir das Skript in vier Hauptteile unterteilen:

  1. Lesen des Streams von Kundendatensätzen aus einem Kafka-Topic;
  2. Filtern der Datensätze nach Ländercode;
  3. Gruppierung der gefilterten Kunden nach Stadt, um eine Kundenanzahl pro Stadt zu erstellen;
  4. Schreiben des Ergebnisses in eine MySQL-Tabelle.

So sieht das Flink SQL-Skript aus:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
CREATE TABLE CUSTOMER_SOURCE (
 id STRING,
    name STRING,
 city STRING,
 country_code STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'kafka_broker:29092',
    'properties.group.id' = 'customer-real-time-report',
    'properties.security.protocol' = 'PLAINTEXT',
    'properties.auto.offset.reset' = 'earliest',
    'topic' = 'customers',
    'value.format' = 'json',
    'key.format' = 'raw'
);

CREATE TABLE CUSTOMER_REPORT_SINK (
 city STRING,
 customer_count BIGINT,
    PRIMARY KEY (city) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'kafka_broker:29092',
    'properties.group.id' = 'customer-real-time-report',
    'properties.security.protocol' = 'PLAINTEXT',
    'properties.auto.offset.reset' = 'earliest',
    'topic' = 'customer-report',
    'value.format' = 'json',
    'key.format' = 'raw'
);

CREATE VIEW GERMANY_CUSTOMERS_VIEW AS
SELECT
 id,
 city
FROM CUSTOMER_SOURCE
WHERE country_code = 'DE';

CREATE VIEW CUSTOMER_COUNT_VIEW AS
SELECT
 city,
    COUNT(id) AS customer_count
FROM GERMANY_CUSTOMERS_VIEW
GROUP BY city;

INSERT INTO CUSTOMER_REPORT_SINK
SELECT * FROM CUSTOMER_COUNT_VIEW;

Unit Testing

Architektur

Wir benötigen Unit-Tests, um das Ergebnis einer oder mehrerer Anweisungen aus einem Flink SQL-Skript zu verifizieren.
Hier sind einige wichtige Details zu den Unit-Tests:

  • Tests werden mit einer Java-Datei implementiert, die zur Laufzeit kompiliert und vom Test Runner ausgeführt wird.
  • Die Java-Datei erweitert UnitTestBase, bereitgestellt vom Test Runner, was einige Hilfsmethoden beinhaltet.
  • Die Methode getScriptName() muss den Namen der Flink SQL-Skriptdatei zurückgeben.
  • Alle Flink SQL-Tabellen werden mit Apache Paimon gemockt.

Während des Testlaufs mockt der Test Runner alle WITH()-Klauseln und ersetzt sie durch einen Apache Paimon-Connector, der die Ergebnisse lokal speichert. Dies ermöglicht es dem Test Runner, alle Tabellen abzufragen. Für VIEWS wird ein Workaround angewendet, da Apache Paimon Flink-Views noch nicht unterstützt. Indem alle Views als TEMPORARY VIEW gemockt werden, behandelt Flink sie wie Tabellen, was mit Apache Paimon kompatibel ist.

Konfiguration

Um die Unit-Tests auszuführen, kann der Test Runner über die folgenden Umgebungsvariablen konfiguriert werden:

1
2
3
4
RUN_UNIT_TESTS - auf true setzen, um Unit-Tests zu aktivieren;
RESULT_FILE - Pfad zur Textdatei, in die die Ergebnisse geschrieben werden;
UNIT_TEST_SQL_DIR - Pfad zum Verzeichnis mit den zu testenden Flink SQL-Skripten;
UNIT_TEST_JAVA_DIR - Pfad zum Verzeichnis mit den Unit-Tests.

Test-Implementierung

Als Nächstes verwenden wir die folgende Basis-Java-Datei, um unseren Unit-Test zu implementieren:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.junit.Assert;
import org.junit.Test;
import sqlassertrunner.unit.UnitTestBase;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;


public class CustomerReportTest extends UnitTestBase {

    @Test
    public void testCustomerReport() throws ExecutionException, InterruptedException, TimeoutException {
        prepareTestData();
        // ASSERTIONS
    }

    @Override
    public String getScriptName() {
        return "customer-report.sql";
    }

    private void prepareTestData() {
        // TEST DATA PREPARATIONS
    }
}

Als Nächstes bereiten wir die Testdaten vor, indem wir Zeilen in die Quelltabelle einfügen.
Dies tun wir, indem wir die folgenden Anweisungen in der Methode prepareTestData() ausführen:

1
2
3
    tEnv.executeSql("INSERT INTO CUSTOMER_SOURCE VALUES ('customer-1', 'Adam', 'Berlin', `DE` );");
    tEnv.executeSql("INSERT INTO CUSTOMER_SOURCE VALUES ('customer-1', 'Alex', 'Berlin', `DE` );");
    tEnv.executeSql("INSERT INTO CUSTOMER_SOURCE VALUES ('customer-1', 'Michael', 'London', `UK` );");

Schließlich verwenden wir die folgenden Hilfsmethoden der Klasse UnitTestBase, um die Ausgabe jeder Verarbeitungsstufe zu verifizieren:

1
2
3
- selectAllRowsWithTimeout(<table/view-name>, <timeout-in-seconds>)
- selectRowsWithTimeout(<table/view-name>, <expected-size>)
- selectRowsWithTimeout(<table/view-name>, <expected-size>, <timeout-in-seconds>)

Wir können daher die folgenden Assertions definieren, um zu verifizieren, dass die Kunden korrekt gefiltert wurden:

1
2
3
4
5
    List<Row> result = selectAllRowsWithTimeout("GERMANY_CUSTOMERS_VIEW", 15);
    Assert.assertEquals(result.get(0).getKind().toString(), "INSERT");
    Assert.assertEquals(result.get(0).getField(1), "Berlin");
    Assert.assertEquals(result.get(1).getKind().toString(), "INSERT");
    Assert.assertEquals(result.get(1).getField(1), "Berlin");

Und die folgenden Assertions, um zu prüfen, ob die Kunden korrekt gezählt werden:

1
2
3
4
5
 result = selectAllRowsWithTimeout("CUSTOMER_COUNT_VIEW", 15);
    Assert.assertEquals(3, result.size());
    Assert.assertEquals(result.get(2).getKind().toString(), "UPDATE_AFTER");
    Assert.assertEquals(result.get(2).getField(0), "Berlin");
    Assert.assertEquals(result.get(2).getField(1), 2L);

Ergebnisse

Wenn alles gut geht, sehen wir eine Pass-Nachricht in der Ergebnisdatei:

1
2
Test Suite: CustomerReportTest: 
 - Result: Pass

Integration Testing

Architektur

Wir benötigen Integrationstests immer dann, wenn wir nicht nur unsere Operatoren, sondern auch die Connectoren für unsere Quellen (Sources) und Senken (Sinks) testen wollen. Integrationstests basieren auf der Idee, Tests für das Flink SQL-Skript direkt in Flink SQL zu schreiben. Das Deployment des Jobs sowie dessen Ausführung und das Abrufen der Ergebnisse werden vom Test Runner verwaltet.

Hinweis: Derzeit unterstützen die Integrationstests des Test Runner nur Apache Kafka-Quellen.

Die Eingabedatei (eine JSON-Datei) enthält die Datensätze, die der Test Runner an Kafka sendet.
Der Job verarbeitet die eingehenden Kafka-Datensätze und veröffentlicht das Ergebnis in einer Kafka-Senke, von der der Test Runner liest und anschließend seine Assertions durchführt.

Wie im obigen Architekturdiagramm gezeigt, nutzen wir einen SQL Runner, um den SQL-Job auszuführen. Wir haben den SQL Runner modifiziert, um die Injektion von Umgebungsvariablen zu ermöglichen, die eine literale Ersetzung im SQL-Skript durchführen. Dies erlaubt es uns, Secrets in den SQL-Job einzubinden.

Lokale Services

Lassen Sie uns alle zugehörigen Dienste lokal mit Docker Compose bereitstellen, indem wir den folgenden Befehl ausführen:
docker-compose up -d

Die folgenden Dienste werden bereitgestellt:

  • Kafka Broker, erreichbar auf Port 9092;
  • Kafka UI, erreichbar auf Port 9001;
  • Flink Job Manager, erreichbar auf Port 8081;
  • Flink Task Manager.

Unser Integrationstest wird all diese Dienste nutzen, um das Flink SQL-Skript zu verifizieren.

Konfiguration

Um die Integrationstests auszuführen, kann der Test Runner über die folgenden Umgebungsvariablen konfiguriert werden:

1
2
3
4
5
6
7
8
RUN_UNIT_TESTS - auf false setzen, um Integrationstests anstelle von Unit-Tests auszuführen;
RESULT_FILE - Pfad zur Textdatei, in die die Ergebnisse geschrieben werden;
INTEGRATION_TEST_DATA_DIR - Pfad zum Verzeichnis, in dem die Testdaten definiert sind;
INTEGRATION_KAFKA_SERVER - Host und Port des Kafka-Brokers;
INTEGRATION_FLINK_JOBMANAGER_SERVER - Host und Port des Flink Job Managers;
INTEGRATION_TEST_SUCCESS_TIMEOUT_MS - wird exklusiv im negativen Modus verwendet; es stellt die Zeitspanne dar, die auf Datensätze gewartet wird, bis der Test als erfolgreich markiert wird;
INTEGRATION_TEST_JOB_SQL_FILE - der Name der zu testenden SQL-Datei;
INTEGRATION_OUTPUT_TOPICS - die Liste der Output-Topics, getrennt durch Komma.

Test-Implementierung

Die Integrationstests haben die folgende Ordnerstruktur:

📁 test-1
│
├── 📁 input
│   └── 📄 customers.json
│
├── 📁 sqlAssertions
│   ├── 📄 test-customers-positive.sql
│   └── 📄 test-customers-negative.sql
📁 test-2
...

Das Verzeichnis test-1 sollte in dem Verzeichnis liegen, das über die Umgebungsvariable INTEGRATION_TEST_DATA_DIR angegeben wurde. Dieses kann mehrere Unterverzeichnisse für andere Skripte enthalten.

Innerhalb dieses testspezifischen Ordners gibt es einen input-Ordner, der ein JSON-Array mit allen Datensätzen enthält, die an das Eingabetopic gesendet werden (benannt wie die JSON-Datei).

Als Nächstes haben wir das Verzeichnis sqlAssertions, das die Flink SQL-Testdateien enthält.

Testdatei

Dies ist eine SQL-Datei, die Flink SQL und eine benutzerdefinierte Metadaten-Assertion-Syntax unterstützt.
Sie dient dazu, die Datensätze aus den Senken des zu testenden Flink SQL-Skripts zu lesen und die Ergebnisse zu verifizieren.

Zuerst initialisieren wir die Quelltabelle für den Test:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
CREATE TABLE CUSTOMER_REPORT_SOURCE (
 city STRING,
 customer_count BIGINT,
    PRIMARY KEY (city) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'kafka_broker:29092',
    'properties.group.id' = 'customer-real-time-report',
    'properties.security.protocol' = 'PLAINTEXT',
    'properties.auto.offset.reset' = 'earliest',
    'topic' = 'customer-report',
    'value.format' = 'json',
    'key.format' = 'raw'
);

Wir unterstützen derzeit zwei Assertion-Modi:
1. Negativer Modus

1
SELECT * FROM CUSTOMER_REPORT_SOURCE WHERE id NOT IN ('1', '2', '3');

Der Test wird sicherstellen, dass das Ergebnis der Anweisung innerhalb der durch die Umgebungsvariable INTEGRATION_TEST_SUCCESS_TIMEOUT_MS definierten Zeit keine Datensätze zurückgibt.

Sofern nicht anders angegeben, wird der negative Modus verwendet.

2. Positiver Modus

1
2
SELECT * FROM CUSTOMER_REPORT_SOURCE
WHERE customer_count = 2 AND city = 'Berlin'; -- outputCount:1 mode:positive

Der Test prüft basierend auf den bereitgestellten Metadaten / SQL-Kommentaren. Im obigen Beispiel wird der positive Modus mit mode:positive aktiviert und die erwartete Anzahl an Datensätzen mit outputCount:1 auf 1 gesetzt.

Ergebnisse

Wenn alles gut geht, werden die folgenden Meldungen in die Ergebnisdatei geschrieben:

Test Suite: sqlassertrunner.SqlParserIntegrationTest: 
 - Result: Pass

Fazit

In diesem Artikel haben wir untersucht, wie man den Test Runner verwendet, um Unit- und Integrationstests für Flink SQL-Skripte zu implementieren und auszuführen.

Die vollständige Implementierung der Tests ist im Test Runner Git-Repository verfügbar.