Skip to main content
Version: 1.15

Flink components

Sources, sinks and custom transformations are based on Flink API. In order to implement any of those you need to provide:

  • a Flink DataStreamSource in case of sources and a DataStream into DataStreamSink transformation in case of sinks
  • a Nussknacker specification

Sources

Standard implementation

The recommended way to implement a source is through StandardFlinkSource interface. Your source only has to implement a sourceStream method that provides DataStreamSource based on StreamExecutionEnvironment.

This approach provides a standard transformation of the input value into a Nussknacker Context and allows the implementation to customize these steps:

  • Timestamp watermark handler so that events are correctly processed downstream, for example to avoid (or force!) dropping late events by aggregates. Read more about notion of time and watermarks
  • Context initializer to emit more variables than #input. For example built-in Kafka sources emit #inputMeta variable with Kafka record metadata like: partition, topic, offset, etc. The other example could be a file source that emits current line number as a new variable along with the content (as #input variable)

Generic implementation

Nussknacker also provides a more generic interface for implementing sources - FlinkSource. Instead of providing a Flink DataStreamSource, you can provide an arbitrary DataStream[Context] directly. However, you have to remember to assign timestamps, watermarks, and initialize the context.

Test support

To enable testing functionality in scenarios using your source implementation, your source needs to implement certain test-specific interfaces:

  • Basic test support - FlinkSourceTestSupport - besides the more general SourceTestSupport, the implementation:
    • has to provide a Flink TypeInformation for serializing/deserializing data emitted from source (e.g. #input)
    • optionally can provide a TimestampWatermarkHandler that will be used only for tests
  • Test data generation - TestDataGenerator
  • Ad hoc test support - TestWithParametersSupport

Read more about testing functionality in this section.

Specification

Your Nussknacker source component specification should be a SourceFactory returning your source implementation.

Examples

Sources for various systems like RabbitMQ, JDBC, etc. do not necessarily have to be implemented from scratch. Flink comes with simple sources already predefined and connectors with third-party systems. All of them can be used to implement a Nussknacker source.

Sinks

Implementation

Sinks are easier to implement than sources. Nussknacker provides a factory for sinks that take only one parameter. The only thing that has to be provided is a Flink SinkFunction.

Sinks with multiple parameters can be implemented using FlinkSink. The following things are required:

  • prepareValue - a method that turns DataStream[Context] into DataStream[ValueWithContext[Value]] containing a final, evaluated value for the sink
  • registerSink - a method that turns DataStream[ValueWithContext[Value]] into DataStreamSink. It's the place where a Flink SinkFunction should be registered

Specification

Similarly to sources, all sinks returned by SinkFactory have to implement FlinkSink (or its subtrait BasicFlinkSink).

Examples

Flink provides basic sinks and connectors which can be used while implementing own Nussknacker sinks.

Custom stream transformations

Custom transformation can arbitrarily change DataStream[Context], it is implemented with FlinkCustomStreamTransformation. Great examples of custom transformers are aggregates. See here how components like previousValue, delay and aggregates are implemented.

Common details

Access to metadata like node id or scenario name and various helpers is provided by FlinkCustomNodeContext.

Special care should be taken to handle:

  • lifecycle - preparing the operator (source, sink or functions registered by custom transformers), closing resources, handling failures and restoring the state. See Flink's operators lifecycle for more details here
  • exceptions, e.g. during deserialization, since any thrown and unhandled exception by the source, causes the Flink job to restart.

⚠️ Flink components should not extend Lifecycle - it won't be handled properly