How to Effectively Test Flink SQL Scripts Using Unit & Integration Tests
Overview
Developing Flink SQL scripts can be challenging, especially for non-developers like data scientists.
Simplifying the development process is essential to ensure correctness and reliability, with testing playing a crucial role.
In this article, we’ll explore how to build and use a tool we internally call the Test Runner, which enables us to run unit and integration tests for Flink SQL scripts. We’ll also delve into the architecture of the Test Runner so you can learn how to approach unit and integration testing for Flink SQL scripts.
The code for the Test Runner isn't open source yet. We intend to publish it soon. Please reach out if you're interested or would like to contribute to the project.
Use Case
In the following steps, we’ll focus on a simple use case where we build a report on the number of German customers per city. We’ll use this SQL script as our test subject.
To achieve this, we can split the script into four main parts:
- Reading the stream of customer records from a Kafka topic;
- Filtering the records by country code;
- Grouping the filtered customers by city to create a customer count per city;
- Writing the result into a MySQL table.
Here is what the Flink SQL script looks like:
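(A minimal sketch; the schema, topic, and connector options shown here are illustrative, not the exact production script.)

-- Source: stream of customer records from a Kafka topic.
CREATE TABLE customers (
    customer_id BIGINT,
    name STRING,
    city STRING,
    country_code STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'customers',
    'properties.bootstrap.servers' = 'localhost:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

-- Filter the records by country code.
CREATE VIEW german_customers AS
SELECT * FROM customers WHERE country_code = 'DE';

-- Group the filtered customers by city to create a customer count per city.
CREATE VIEW customers_per_city AS
SELECT city, COUNT(*) AS customer_count
FROM german_customers
GROUP BY city;

-- Sink: MySQL table holding the report.
CREATE TABLE customers_per_city_sink (
    city STRING,
    customer_count BIGINT,
    PRIMARY KEY (city) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/reports',
    'table-name' = 'customers_per_city',
    'username' = 'report_user',
    'password' = 'secret'
);

INSERT INTO customers_per_city_sink
SELECT city, customer_count FROM customers_per_city;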
Unit Testing
Architecture
We need unit tests to verify the result of one or more statements from a Flink SQL script.
Here are some key details about the unit tests:
- Tests are implemented using a Java file that is compiled at runtime and executed by the Test Runner.
- The Java file extends UnitTestBase, provided by the Test Runner, which includes a few helper methods.
- The getScriptName() method must return the name of the Flink SQL script file.
- All Flink SQL tables are mocked using Apache Paimon.
During the test run, the Test Runner mocks all WITH() clauses and replaces them with an Apache Paimon connector that stores the results locally. This enables the Test Runner to query all tables. For views, a workaround is applied, since Apache Paimon doesn’t yet support Flink views: all views are mocked as TEMPORARY VIEW, so Flink treats them as tables, which is compatible with Apache Paimon.
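To illustrate the idea, here is a hypothetical before/after of this rewriting (the Paimon options shown are illustrative; the actual options the Test Runner generates may differ):

-- What the script under test declares:
CREATE TABLE customers (
    customer_id BIGINT,
    name STRING,
    city STRING,
    country_code STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'customers',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- What the Test Runner conceptually executes instead:
CREATE TABLE customers (
    customer_id BIGINT,
    name STRING,
    city STRING,
    country_code STRING
) WITH (
    'connector' = 'paimon',
    'path' = 'file:///tmp/test-runner/customers',
    'auto-create' = 'true'
);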
Configuration
To run the unit tests, the Test Runner can be configured using the following environment variables:
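(As an illustration only; the variable names below are hypothetical placeholders, not the Test Runner’s actual variables.)

export SQL_SCRIPT_DIR=./sql            # hypothetical: where the Flink SQL scripts live
export UNIT_TEST_SOURCES_DIR=./tests   # hypothetical: where the Java test files live
export TEST_RESULT_FILE=./result.txt   # hypothetical: where the Pass/Fail results are written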
Test Implementation
Next, we’ll use the following base Java file to implement our unit test:
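(A minimal sketch; UnitTestBase, getScriptName(), and prepareTestData() are the Test Runner hooks described above, but the class name, script name, and method visibility are assumptions.)

public class CustomersReportTest extends UnitTestBase {

    // Points the Test Runner at the Flink SQL script under test.
    @Override
    public String getScriptName() {
        return "customers_report.sql"; // illustrative file name
    }

    @Override
    public void prepareTestData() {
        // Test data is inserted here; see the next snippet.
    }
}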
Next, let’s prepare the test data by inserting rows into the source table. We’ll do this by executing the following statements in the prepareTestData() method:
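(A sketch, assuming UnitTestBase exposes a way to execute SQL statements; executeSql() is an illustrative name, not the actual API.)

@Override
public void prepareTestData() {
    // Three German customers (two in Berlin, one in Munich) and one Spanish
    // customer, so both the filter and the per-city count can be verified.
    executeSql(
        "INSERT INTO customers VALUES " +
        "(1, 'Anna',  'Berlin', 'DE'), " +
        "(2, 'Bernd', 'Berlin', 'DE'), " +
        "(3, 'Clara', 'Munich', 'DE'), " +
        "(4, 'Diego', 'Madrid', 'ES')"
    );
}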
Finally, to verify the output of each processing stage, we will use the following helper methods provided by the UnitTestBase class:
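(Conceptually, these helpers query the mocked Paimon-backed tables and compare rows; the names and signatures below are assumptions, not the actual Test Runner API.)

import java.util.List;
import org.apache.flink.types.Row;

public interface UnitTestHelpers {
    // Runs a SELECT against a mocked table or view and returns its rows.
    List<Row> queryTable(String tableOrViewName);

    // Asserts that the given table or view contains exactly the expected rows.
    void assertTableContains(String tableOrViewName, List<Row> expectedRows);
}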
We can therefore define the following assertions to verify that the customers were filtered correctly:
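(Using the hypothetical helpers sketched above.)

// The filter stage should keep only the three German customers.
assertTableContains("german_customers", List.of(
    Row.of(1L, "Anna",  "Berlin", "DE"),
    Row.of(2L, "Bernd", "Berlin", "DE"),
    Row.of(3L, "Clara", "Munich", "DE")
));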
And the following assertions to verify that the customers are counted correctly:
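(Again an illustrative sketch based on the same hypothetical helper.)

// The aggregation stage should count two customers for Berlin and one for Munich.
assertTableContains("customers_per_city", List.of(
    Row.of("Berlin", 2L),
    Row.of("Munich", 1L)
));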
Results
If everything goes well, we’ll see a Pass message in the result file:
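(Illustrative output, following the same format as the integration test results shown later; the test class name matches the sketch above.)

Test Suite: CustomersReportTest:
- Result: Pass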
Integration Testing
Architecture
We need integration tests whenever we want to test not only our operators but also the connectors for our sources and sinks. Integration tests are based on the idea of writing tests for the Flink SQL script directly in Flink SQL. The deployment of the job, as well as its execution and result retrieval, are managed by the Test Runner.
Note: Currently, the Test Runner integration tests only support Apache Kafka sources.
The input file, which is a JSON file, contains the records that the Test Runner will publish to Kafka. The job processes the incoming Kafka records and publishes the result to a Kafka sink, from which the Test Runner consumes the output and then performs its assertions.
As shown in the architecture diagram above, we leverage a SQL Runner to run the SQL job. We modified the SQL Runner to allow the injection of environment variables, which are substituted literally into the SQL script. This enables us to include secrets in the SQL job.
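For example, a script might reference placeholders that the modified SQL Runner replaces at deployment time (the ${...} placeholder syntax and option values here are illustrative):

CREATE TABLE customers_per_city_sink (
    city STRING,
    customer_count BIGINT,
    PRIMARY KEY (city) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/reports',
    'table-name' = 'customers_per_city',
    'username' = '${MYSQL_USER}',      -- replaced literally from the environment
    'password' = '${MYSQL_PASSWORD}'   -- keeps the secret out of the script
);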
Local Services
Let’s deploy all related services locally using Docker Compose by running the following command:
docker-compose up -d
The following services will be deployed:
- Kafka Broker, exposed on port
9092
; - Kafka UI, exposed on port
9001
; - Flink Job Manager, exposed on port
8081
; - Flink Task Manager.
Our integration test will use all these services to verify the Flink SQL script.
Configuration
To run the integration tests, the Test Runner can be configured using the following environment variables:
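(For illustration; only INTEGRATION_TEST_DATA_DIR is referenced later in this article, and the other variable names are hypothetical placeholders.)

export INTEGRATION_TEST_DATA_DIR=./integration-tests   # root folder of the per-test directories (see below)
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092          # hypothetical: the local broker from Docker Compose
export FLINK_JOBMANAGER_URL=http://localhost:8081      # hypothetical: where the SQL job is deployed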
Test Implementation
The integration tests will have the following folder structure:
📁 test-1
│
├── 📁 input
│ └── 📄 customers.json
│
├── 📁 sqlAssertions
│ ├── 📄 test-customers-positive.sql
│ └── 📄 test-customers-negative.sql
📁 test-2
...
The test-1 directory should reside within the directory specified via the environment variable INTEGRATION_TEST_DATA_DIR, which may contain multiple sub-directories for other scripts.
Within this test-specific folder, there is an input folder containing a JSON array with all the records that will be published to the input topic; the topic is named after the JSON file.
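For our use case, customers.json could look like this (field names match the illustrative schema from earlier, and the records are published to a topic named customers):

[
  { "customer_id": 1, "name": "Anna",  "city": "Berlin", "country_code": "DE" },
  { "customer_id": 2, "name": "Bernd", "city": "Berlin", "country_code": "DE" },
  { "customer_id": 3, "name": "Diego", "city": "Madrid", "country_code": "ES" }
]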
Next, we have the sqlAssertions directory, which contains the Flink SQL testing files.
Testing File
This is a SQL file that supports Flink SQL plus a custom metadata assertion syntax. It is used to read the records from the sinks of the Flink SQL script under test and to verify the results.
First, let’s initialize the source table for the test:
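(A sketch, assuming the integration variant of the job writes its results to a Kafka topic called customers_per_city; names and connector options are illustrative.)

CREATE TABLE customers_per_city (
    city STRING,
    customer_count BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'customers_per_city',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test-runner',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);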
We currently support two assertion modes: positive and negative.
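Here is a hypothetical positive-mode example (the comment placement and the query itself are illustrative; the mode:positive and outputCount:1 metadata follow the description below):

-- mode:positive
-- outputCount:1
SELECT city, customer_count
FROM customers_per_city
WHERE city = 'Berlin' AND customer_count = 2;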
The test will assert based on the provided metadata / SQL comments. In the example above, it enables positive mode using mode:positive and sets the expected number of records to 1 with outputCount:1.
Results
If everything goes well, the following messages will be printed to the result file:
Test Suite: sqlassertrunner.SqlParserIntegrationTest:
- Result: Pass
Conclusion
In this article, we explored how to use the Test Runner to implement and run unit and integration tests for Flink SQL scripts.
The full implementation of the tests is available in the Test Runner Git repository.