How to Effectively Test Flink SQL Scripts Using Unit & Integration Tests

Flaviu Cicio
Architect @ Acosom
Robin Fehr
Architect & Co-Founder @ Acosom
July 18, 2024

Overview

Developing Flink SQL scripts can be challenging, especially for non-developers like data scientists.
Simplifying the development process is essential to ensure correctness and reliability, with testing playing a crucial role.

In this article, we’ll explore how to build and use a tool we internally call the Test Runner, which enables us to run unit and integration tests for Flink SQL scripts.

We’ll also delve into the architecture of the Test Runner so you can learn how to approach unit and integration testing for Flink SQL scripts.

The code for the Test Runner isn't open source yet. We intend to publish it soon. Please reach out if you're interested or would like to contribute to the project.

Use Case

In the following steps, we’ll focus on a simple use case where we build a report on the number of German customers per city. We’ll use this SQL script as our test subject.

To achieve this, we can split the script into four main parts:

  1. Reading the stream of customer records from a Kafka topic;
  2. Filtering the records by country code;
  3. Grouping the filtered customers by city to create a customer count per city;
  4. Writing the result into a Kafka topic.

Here is what the Flink SQL script looks like:

CREATE TABLE CUSTOMER_SOURCE (
    id STRING,
    name STRING,
    city STRING,
    country_code STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'kafka_broker:29092',
    'properties.group.id' = 'customer-real-time-report',
    'properties.security.protocol' = 'PLAINTEXT',
    'properties.auto.offset.reset' = 'earliest',
    'topic' = 'customers',
    'value.format' = 'json',
    'key.format' = 'raw'
);

CREATE TABLE CUSTOMER_REPORT_SINK (
    city STRING,
    customer_count BIGINT,
    PRIMARY KEY (city) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'kafka_broker:29092',
    'properties.group.id' = 'customer-real-time-report',
    'properties.security.protocol' = 'PLAINTEXT',
    'properties.auto.offset.reset' = 'earliest',
    'topic' = 'customer-report',
    'value.format' = 'json',
    'key.format' = 'raw'
);

CREATE VIEW GERMANY_CUSTOMERS_VIEW AS
SELECT
    id,
    city
FROM CUSTOMER_SOURCE
WHERE country_code = 'DE';

CREATE VIEW CUSTOMER_COUNT_VIEW AS
SELECT
    city,
    COUNT(id) AS customer_count
FROM GERMANY_CUSTOMERS_VIEW
GROUP BY city;

INSERT INTO CUSTOMER_REPORT_SINK
SELECT * FROM CUSTOMER_COUNT_VIEW;

Unit Testing

Architecture

We need unit tests to verify the result of one or more statements from a Flink SQL script.
Here are some key details about the unit tests:

  • Tests are implemented using a Java file that is compiled at runtime and executed by the Test Runner.
  • The Java file extends UnitTestBase, provided by the Test Runner, which includes a few helper methods.
  • The getScriptName() method must return the name of the Flink SQL script file.
  • All Flink SQL tables are mocked using Apache Paimon.

During the test run, the Test Runner replaces the WITH (...) clause of every table with an Apache Paimon connector that stores the data locally. This enables the Test Runner to query all tables. Views need a workaround, since Apache Paimon doesn’t yet support Flink views: the Test Runner rewrites every view as a TEMPORARY VIEW, which Flink treats like a table and which is therefore compatible with Apache Paimon.
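
The rewriting step described above can be pictured as a pair of text substitutions. The following is only a minimal sketch under stated assumptions: the class ScriptMocker, its mock method, and the replacement clause passed in main are hypothetical names and placeholder values chosen for this example, and the Test Runner's actual implementation may work quite differently.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not the Test Runner's actual code. */
final class ScriptMocker {

    // Matches a WITH (...) clause. Assumes no option value contains ')'.
    private static final Pattern WITH_CLAUSE =
            Pattern.compile("\\bWITH\\s*\\([^)]*\\)", Pattern.CASE_INSENSITIVE);

    // Matches CREATE VIEW, but not CREATE TEMPORARY VIEW.
    private static final Pattern CREATE_VIEW =
            Pattern.compile("\\bCREATE\\s+VIEW\\b", Pattern.CASE_INSENSITIVE);

    private ScriptMocker() {}

    /** Swaps every WITH clause for mockWithClause and makes all views temporary. */
    static String mock(String script, String mockWithClause) {
        String withMocked = WITH_CLAUSE.matcher(script)
                .replaceAll(Matcher.quoteReplacement(mockWithClause));
        return CREATE_VIEW.matcher(withMocked).replaceAll("CREATE TEMPORARY VIEW");
    }

    public static void main(String[] args) {
        String script = String.join("\n",
                "CREATE TABLE CUSTOMER_SOURCE (",
                "    id STRING,",
                "    PRIMARY KEY (id) NOT ENFORCED",
                ") WITH (",
                "    'connector' = 'upsert-kafka',",
                "    'topic' = 'customers'",
                ");",
                "CREATE VIEW GERMANY_CUSTOMERS_VIEW AS",
                "SELECT id FROM CUSTOMER_SOURCE;");
        // Placeholder clause: the options the Test Runner really uses are an assumption here.
        System.out.println(mock(script, "WITH ('connector' = 'paimon')"));
    }
}
```

Because the pattern stops at the first closing parenthesis, an option value containing ')' would break this sketch; a production implementation would more likely operate on parsed statements than on raw text.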

Configuration

To run the unit tests, the Test Runner can be configured using the following environment variables:

RUN_UNIT_TESTS - set to true to enable unit testing;
RESULT_FILE - path to the text file where the results will be written;
UNIT_TEST_SQL_DIR - path to the directory with the Flink SQL scripts to be tested;
UNIT_TEST_JAVA_DIR - path to the directory with the unit tests.

Test Implementation

Next, we’ll use the following base Java file to implement our unit test:

import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;
import sqlassertrunner.unit.UnitTestBase;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class CustomerReportTest extends UnitTestBase {

    @Test
    public void testCustomerReport() throws ExecutionException, InterruptedException, TimeoutException {
        prepareTestData();
        // ASSERTIONS
    }

    @Override
    public String getScriptName() {
        return "customer-report.sql";
    }

    private void prepareTestData() {
        // TEST DATA PREPARATIONS
    }
}

Next, let’s prepare the test data by inserting rows into the source table. We’ll do this by executing the following statements in the prepareTestData() method:

    tEnv.executeSql("INSERT INTO CUSTOMER_SOURCE VALUES ('customer-1', 'Adam', 'Berlin', 'DE');");
    tEnv.executeSql("INSERT INTO CUSTOMER_SOURCE VALUES ('customer-2', 'Alex', 'Berlin', 'DE');");
    tEnv.executeSql("INSERT INTO CUSTOMER_SOURCE VALUES ('customer-3', 'Michael', 'London', 'UK');");

Finally, to verify the output of each processing stage, we will use the following helper methods provided by the UnitTestBase class:

- selectAllRowsWithTimeout(<table/view-name>, <timeout-in-seconds>)
- selectRowsWithTimeout(<table/view-name>, <expected-size>)
- selectRowsWithTimeout(<table/view-name>, <expected-size>, <timeout-in-seconds>)

We can therefore define the following assertions to verify that the customers were filtered correctly:

    // JUnit's assertEquals takes the expected value first, then the actual value.
    List<Row> result = selectAllRowsWithTimeout("GERMANY_CUSTOMERS_VIEW", 15);
    Assert.assertEquals("INSERT", result.get(0).getKind().toString());
    Assert.assertEquals("Berlin", result.get(0).getField(1));
    Assert.assertEquals("INSERT", result.get(1).getKind().toString());
    Assert.assertEquals("Berlin", result.get(1).getField(1));

And the following assertions verify that the customers are counted correctly:

    result = selectAllRowsWithTimeout("CUSTOMER_COUNT_VIEW", 15);
    Assert.assertEquals(3, result.size());
    Assert.assertEquals("UPDATE_AFTER", result.get(2).getKind().toString());
    Assert.assertEquals("Berlin", result.get(2).getField(0));
    Assert.assertEquals(2L, result.get(2).getField(1));
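
The three rows expected from CUSTOMER_COUNT_VIEW follow from how a streaming GROUP BY emits its changelog: the first count for a city arrives as an INSERT, and each later change arrives as an UPDATE_BEFORE that retracts the old count, followed by an UPDATE_AFTER that carries the new one. The plain-Java model below is only an illustrative sketch of that behavior. The class CountChangelogModel and its string format are made up for this example and are not part of Flink or the Test Runner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model of a per-city COUNT changelog; not Flink code. */
final class CountChangelogModel {

    private CountChangelogModel() {}

    /** Returns entries formatted as "KIND city count", in emission order. */
    static List<String> countPerCity(List<String> cities) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<String> changelog = new ArrayList<>();
        for (String city : cities) {
            Long previous = counts.get(city);
            long current = (previous == null ? 0L : previous) + 1L;
            if (previous == null) {
                // First record for this city: a plain insert.
                changelog.add("INSERT " + city + " " + current);
            } else {
                // Later records retract the old count, then emit the new one.
                changelog.add("UPDATE_BEFORE " + city + " " + previous);
                changelog.add("UPDATE_AFTER " + city + " " + current);
            }
            counts.put(city, current);
        }
        return changelog;
    }

    public static void main(String[] args) {
        // Cities of the two German customers that pass the country filter.
        for (String entry : countPerCity(Arrays.asList("Berlin", "Berlin"))) {
            System.out.println(entry);
        }
    }
}
```

For the two Berlin customers, the model yields an INSERT with count 1, an UPDATE_BEFORE with count 1, and an UPDATE_AFTER with count 2, which is why the test reads the final count from index 2.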

Results

If everything goes well, we’ll see a Pass message in the result file:

Test Suite: CustomerReportTest: 
 - Result: Pass

Integration Testing

Architecture

We need integration tests whenever we want to test not only our operators but also the connectors for our sources and sinks. Integration tests are based on the idea of writing tests for the Flink SQL script directly in Flink SQL. The deployment of the job, as well as its execution and result retrieval, are managed by the Test Runner.

Note: Currently, the Test Runner integration tests only support Apache Kafka sources.

The input file, which is a JSON file, contains the records that the Test Runner will publish to Kafka. The job processes the incoming Kafka records and publishes the result to a Kafka sink, from which the Test Runner consumes and then performs its assertions.

As shown in the architecture diagram above, to run the SQL job, we leverage a SQL Runner. We modified the SQL Runner to allow injection of environment variables, which perform a literal replacement in the SQL script. This enables us to include secrets in the SQL job.

Local Services

Let’s deploy all related services locally using Docker Compose by running the following command:

docker-compose up -d

The following services will be deployed:

  • Kafka Broker, exposed on port 9092;
  • Kafka UI, exposed on port 9001;
  • Flink Job Manager, exposed on port 8081;
  • Flink Task Manager.

Our integration test will use all these services to verify the Flink SQL script.

Configuration

To run the integration tests, the Test Runner can be configured using the following environment variables:

RUN_UNIT_TESTS - set to false to run integration tests instead of unit tests;
RESULT_FILE - path to the text file where the results will be written;
INTEGRATION_TEST_DATA_DIR - path to the directory where the test data will be defined;
INTEGRATION_KAFKA_SERVER - the host and the port of the Kafka broker;
INTEGRATION_FLINK_JOBMANAGER_SERVER - the host and the port of the Flink Job Manager;
INTEGRATION_TEST_SUCCESS_TIMEOUT_MS - used exclusively in negative mode, it represents the amount of time to wait for records until the test is marked as successful;
INTEGRATION_TEST_JOB_SQL_FILE - the name of the SQL file to be tested;
INTEGRATION_OUTPUT_TOPICS - the list of output topics separated by comma.
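
As a rough illustration of how a few of these variables might be consumed, the sketch below reads them from a map of environment variables. The class name IntegrationTestConfig, the default values, and the trimming of topic names are assumptions made for this example; the article does not show how the Test Runner actually parses its configuration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical config holder for illustration; not the Test Runner's actual class. */
final class IntegrationTestConfig {

    final boolean runUnitTests;
    final long successTimeoutMs;
    final List<String> outputTopics;

    private IntegrationTestConfig(boolean runUnitTests, long successTimeoutMs, List<String> outputTopics) {
        this.runUnitTests = runUnitTests;
        this.successTimeoutMs = successTimeoutMs;
        this.outputTopics = outputTopics;
    }

    static IntegrationTestConfig from(Map<String, String> env) {
        boolean runUnitTests = Boolean.parseBoolean(env.getOrDefault("RUN_UNIT_TESTS", "false"));
        // The 10000 ms fallback is an assumption made for this sketch.
        long timeoutMs = Long.parseLong(env.getOrDefault("INTEGRATION_TEST_SUCCESS_TIMEOUT_MS", "10000"));
        // INTEGRATION_OUTPUT_TOPICS is a comma-separated list; blank entries are dropped.
        List<String> topics = Arrays.stream(env.getOrDefault("INTEGRATION_OUTPUT_TOPICS", "").split(","))
                .map(String::trim)
                .filter(topic -> !topic.isEmpty())
                .collect(Collectors.toList());
        return new IntegrationTestConfig(runUnitTests, timeoutMs, topics);
    }

    public static void main(String[] args) {
        IntegrationTestConfig config = from(System.getenv());
        System.out.println("run unit tests: " + config.runUnitTests);
        System.out.println("success timeout (ms): " + config.successTimeoutMs);
        System.out.println("output topics: " + config.outputTopics);
    }
}
```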

Test Implementation

The integration tests will have the following folder structure:

📁 test-1
│
├── 📁 input
│   └── 📄 customers.json
│
├── 📁 sqlAssertions
│   ├── 📄 test-customers-positive.sql
│   └── 📄 test-customers-negative.sql
📁 test-2
...

The test-1 directory should reside within the directory specified via the environment variable INTEGRATION_TEST_DATA_DIR, which may contain multiple sub-directories for other scripts.

Within this test-specific folder, the input folder contains a JSON file holding an array of all the records to publish. The records are published to the input topic named after the file, so customers.json feeds the customers topic.

Next, we have the sqlAssertions directory, which contains the Flink SQL testing files.

Testing File

This is a SQL file that supports Flink SQL and a custom metadata assertion syntax.
It is intended to be used to read the records from the sinks of the Flink SQL script being tested and verify the results.

First, let’s initialize the source table for the test:

CREATE TABLE CUSTOMER_REPORT_SOURCE (
    city STRING,
    customer_count BIGINT,
    PRIMARY KEY (city) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'kafka_broker:29092',
    'properties.group.id' = 'customer-real-time-report',
    'properties.security.protocol' = 'PLAINTEXT',
    'properties.auto.offset.reset' = 'earliest',
    'topic' = 'customer-report',
    'value.format' = 'json',
    'key.format' = 'raw'
);

We currently support two assertion modes:

1. Negative Mode

SELECT * FROM CUSTOMER_REPORT_SOURCE WHERE city NOT IN ('Berlin');

The test asserts that the statement returns no records within the time defined by the INTEGRATION_TEST_SUCCESS_TIMEOUT_MS environment variable. Here, the test fails if any city other than Berlin reaches the sink.

Negative mode is used unless specified otherwise.

2. Positive Mode

SELECT * FROM CUSTOMER_REPORT_SOURCE
WHERE customer_count = 2 AND city = 'Berlin'; -- outputCount:1 mode:positive

In positive mode, the test asserts based on the metadata provided in the trailing SQL comment. In the example above, mode:positive enables positive mode and outputCount:1 sets the expected number of records to 1.
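
To make the metadata syntax concrete, here is a minimal sketch of how the mode and outputCount keys could be extracted from a statement. The class AssertionMetadata, the use of -1 for a missing count, and the regular expressions are assumptions made for this example; the Test Runner's real parser is not shown in this article.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for the trailing metadata comment; not the Test Runner's actual code. */
final class AssertionMetadata {

    private static final Pattern MODE = Pattern.compile("\\bmode:(positive|negative)\\b");
    private static final Pattern OUTPUT_COUNT = Pattern.compile("\\boutputCount:(\\d+)\\b");

    final boolean positive;
    final int outputCount; // -1 when the comment does not set outputCount

    private AssertionMetadata(boolean positive, int outputCount) {
        this.positive = positive;
        this.outputCount = outputCount;
    }

    static AssertionMetadata parse(String statement) {
        // Only inspect the text after the first "--".
        // This sketch assumes "--" never appears inside a string literal.
        int commentStart = statement.indexOf("--");
        String comment = commentStart < 0 ? "" : statement.substring(commentStart);

        // Negative mode is the default when no mode is given.
        Matcher mode = MODE.matcher(comment);
        boolean positive = mode.find() && "positive".equals(mode.group(1));

        Matcher count = OUTPUT_COUNT.matcher(comment);
        int outputCount = count.find() ? Integer.parseInt(count.group(1)) : -1;
        return new AssertionMetadata(positive, outputCount);
    }

    public static void main(String[] args) {
        AssertionMetadata metadata = parse(
                "SELECT * FROM CUSTOMER_REPORT_SOURCE\n"
                + "WHERE customer_count = 2 AND city = 'Berlin'; -- outputCount:1 mode:positive");
        System.out.println("positive=" + metadata.positive + " outputCount=" + metadata.outputCount);
    }
}
```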

Results

If everything goes well, the following messages will be printed to the result file:

Test Suite: sqlassertrunner.SqlParserIntegrationTest: 
 - Result: Pass

Conclusion

In this article, we explored how to use the Test Runner to implement and run unit and integration tests for Flink SQL scripts.

The full implementation of the tests is available in the Test Runner Git repository.