
Table integration

Enterprise

Overview

This documentation outlines how Nussknacker leverages Apache Flink's Table API to configure integrations, focusing on the sources and sinks that can be defined using Flink SQL CREATE TABLE statements. It is based on the Apache Flink Table API & SQL Connectors documentation.

What is Table API Integration?

Flink's Table API provides a relational abstraction for unified stream and batch processing. It lets you define logical tables in Flink that map, using SQL-like syntax, to data stored in external systems such as databases, message queues, or file systems. If the external system is a database, the CREATE TABLE statement gives Flink a mapping between the data types used by the external database and those supported by Flink. For file systems and queues, the amount of mapping information required depends on whether the format in which the data is stored already carries schema information. Check the Flink Table API connectors documentation for more details.
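
To illustrate, a Flink SQL definition for a PostgreSQL table might look roughly like the sketch below. The table, column, and connection details are purely illustrative; the exact connector options are described in the Flink JDBC connector documentation.

```sql
-- A minimal sketch: map an existing PostgreSQL table (here a hypothetical
-- "public.orders") to a logical Flink table using the JDBC connector.
-- The column list defines the data type mapping explicitly; nothing is
-- created in the database itself.
CREATE TABLE orders (
  order_id     INT,
  customer_id  INT,
  total_amount DECIMAL(10, 2),
  created_at   TIMESTAMP(3)
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:postgresql://db-host:5432/shop',
  'table-name' = 'public.orders',
  'username'   = 'flink_reader',
  'password'   = '...'
);
```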

Once the integration is defined, Nussknacker generates source and/or sink components and places them in the Component Palette. Note that Table API based sources do not allow you to limit the scope of the ingested data with filters of any kind. For example, if you use a database table in a Table API definition, the corresponding source will ingest all records from that table. See the Alternatives section for other options.

note

In the case of storage engines like Apache Fluss, Apache Paimon, Iceberg, or Hive, the CREATE TABLE statement will create tables in those systems. In the case of databases like MySQL, PostgreSQL, or MariaDB, the Flink SQL CREATE TABLE statement creates data mappings (columns, data types) between the source and the logical table created in Flink; no physical table-like objects are created.

Capabilities

Using the Table API, you can:

  • Read from Diverse Sources: Ingest data from external systems like Apache Kafka, databases (e.g., MySQL, PostgreSQL), file systems, or message queues in real-time streaming mode or in batch mode (varies by connector); see the sketch after this list.
  • Write to Various Sinks: Output processed data to external systems such as Kafka, databases, file systems, or key-value stores.
  • Digest Data from Change Data Capture (CDC) Logs: Capture and process incremental changes from databases using Flink CDC connectors, enabling real-time data synchronization or auditing.
  • Use Flexible Data Formats: Support multiple data formats (e.g., JSON, Avro, CSV, Parquet) for encoding and decoding data.
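
As a sketch of the first and last points, a Kafka source reading JSON-encoded events could be declared along the following lines. The topic, broker address, and column names are illustrative; consult the Flink Kafka connector documentation for the full list of options.

```sql
-- A minimal sketch: read JSON events from a Kafka topic into a logical
-- Flink table (topic name, broker address and columns are placeholders).
CREATE TABLE transactions (
  transaction_id STRING,
  amount         DECIMAL(10, 2),
  event_time     TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'properties.group.id' = 'nussknacker-example',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```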

Alternatives

If you use a Table API connector with catalog support (in Flink's meaning of the term - check the Flink documentation if unsure), Flink will provide the required data type mappings. Catalog-based connectors are available for table formats like Paimon and Iceberg and for selected relational databases. You can recognize that catalog data mappings exist for your database system if, in the Integrations tab, the database "tile" lists a source and a sink - not just an enricher.

If you need only a subset of records from a database table, or if the input records are the result of a join between multiple tables, you can define a corresponding view in the database and use this view to configure the Table API integration. However, this is not always an ideal approach. If there is catalog support for your database system, you can use the Query Source component instead. This component lets you enter a custom SQL SELECT statement to specify the records to be ingested into the scenario. Check the Query Source docs for more details.
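
As an illustration of the view-based approach, a view like the one below could be created directly in the source database (table, column, and view names are hypothetical) and then referenced in the CREATE TABLE statement in place of the underlying table.

```sql
-- A hypothetical view defined in the source database (not in Flink):
-- it limits ingestion to completed orders and joins in the customer name.
CREATE VIEW completed_orders_v AS
SELECT o.order_id,
       o.total_amount,
       o.created_at,
       c.customer_name
FROM   orders o
JOIN   customers c ON c.customer_id = o.customer_id
WHERE  o.status = 'COMPLETED';
```

The 'table-name' option in the Table API definition would then point at completed_orders_v rather than at the underlying tables.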

If, in the Integrations tab of the Admin panel, you choose a system for which catalog support is available or data type mappings already exist in Nussknacker, all components that can be derived from this integration will be available in the Component Palette: SQL Source, Table source, SQL enricher, and sink. You do not need to use the Nussknacker Table integration in such cases.

Supported Connectors

We currently support connectors for PostgreSQL, MySQL, and MQTT.

Integration parameters

Name | Description
Integration name | Unique name for the integration; the name of the generated enricher will start with the integration name.
Table definition | Flink SQL CREATE TABLE statement to create the data mappings between the source system and Flink. Choose the CREATE TABLE template and configure as required.
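
For example, for a MySQL source the Table definition parameter could contain a statement along these lines; connection details, table name, and columns are placeholders and need to be adapted to your database.

```sql
-- A minimal sketch of a Table definition for a MySQL source via the
-- JDBC connector (all identifiers and credentials are placeholders).
CREATE TABLE customers (
  customer_id   INT,
  customer_name STRING,
  email         STRING
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://db-host:3306/crm',
  'table-name' = 'customers',
  'username'   = 'flink_reader',
  'password'   = '...'
);
```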

Consult the Flink Table API connectors documentation if in doubt about how to configure this integration.