Version: 1.8

Migration guide

To see the biggest differences please consult the changelog.

In version 1.8.0

Scenario authoring changes

  • #3924
    • Fixup: {} is now interpreted as the "allow everything" schema, not as an "object" schema. Object schemas have to declare "type": "object" explicitly.
    • Unknown is now allowed on sinks in both validation modes if the output schema is the "allow everything" schema.

Code API changes

  • #3924 - changes to SwaggerTyped hierarchy
    • SwaggerMap(valuesType) -> SwaggerObject(Map.empty, additionalProperties = AdditionalPropertiesEnabled(valuesType))
    • AdditionalPropertiesSwaggerTyped -> AdditionalPropertiesEnabled
    • AdditionalPropertiesWithoutType -> AdditionalPropertiesEnabled(SwaggerAny)
    • SwaggerRecursiveSchema/SwaggerUnknownFallback -> SwaggerAny
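
If SwaggerMap was used in many places, the first replacement above can be wrapped in a small helper. The sketch below uses simplified stand-in types so that it compiles on its own; they are not the real Nussknacker definitions. The parameter name fields, the placeholder SwaggerString and the helper swaggerMap are assumptions made for illustration. Only the type names and the call shape come from the list above.

```scala
object SwaggerTypedMigrationSketch {
  // Simplified stand-ins for illustration only; the real types ship with Nussknacker.
  sealed trait SwaggerTyped
  case object SwaggerAny extends SwaggerTyped
  case object SwaggerString extends SwaggerTyped // placeholder value type for the demo

  final case class AdditionalPropertiesEnabled(valuesType: SwaggerTyped)

  final case class SwaggerObject(
      fields: Map[String, SwaggerTyped], // hypothetical parameter name
      additionalProperties: AdditionalPropertiesEnabled
  ) extends SwaggerTyped

  // Hypothetical helper mirroring the removed SwaggerMap(valuesType).
  def swaggerMap(valuesType: SwaggerTyped): SwaggerObject =
    SwaggerObject(Map.empty, additionalProperties = AdditionalPropertiesEnabled(valuesType))

  // Replacement for the former AdditionalPropertiesWithoutType.
  val untypedAdditionalProperties: AdditionalPropertiesEnabled =
    AdditionalPropertiesEnabled(SwaggerAny)
}
```

With the real library on the classpath, the stand-in definitions would be dropped and only the helper kept.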

Other changes

  • #3835 Removed Signals and QueryableState. This change affects:

    • Configuration
    • Components and DeploymentManager API
    • REST API
  • #3823, #3836, #3843 - scenarios with multiple sources can be tested from file

    • TestDataGenerator#generateTestData returns JSON test records instead of raw bytes. Test records are serialized to a file by the designer. A test record can optionally contain a timestamp, which is used to sort records generated by many sources
    • TestDataParser was replaced with TestRecordParser that turns a single JSON test record into a source record
    • TestData.newLineSeparated helper was removed. Scenario test records have to be created explicitly. Each scenario test record has an assigned source
    • DeploymentManager#test takes ScenarioTestData instead of TestData
    • Designer configuration testDataSettings.testDataMaxBytes renamed to testDataMaxLength
  • #3916 Designer configuration environmentAlert.cssClass renamed to environmentAlert.color

  • #3922 Bumps: jwks: 0.19.0 -> 0.21.3, jackson: 2.11.3 -> 2.13.4

  • #3929 From now on, the SchemaId value class is used everywhere a schema id was previously represented as an Int. For conversion between SchemaId and Int use SchemaId.fromInt and SchemaId.asInt. Use ConfluentUtils.toSchemaWithMetadata instead of SchemaWithMetadata.apply for conversion between Confluent's SchemaMetadata and our SchemaWithMetadata.

  • #3948 We are now less dependent on Confluent schema registry. To make this possible, some Kafka universal/Avro components were refactored. The most important changes in the public API:

    • ConfluentSchemaBasedSerdeProvider.universal was replaced by UniversalSchemaBasedSerdeProvider.create

    Some other, internal changes:

    • Non-confluent classes renamed and moved to desired packages
    • Extracted new class: SchemaIdFromMessageExtractor to make Confluent logic explicit and moved to top level
    • Extracted SchemaValidator to make Confluent logic explicit and be able to compose
    • Some renames: ConsumerRecordUtils -> KafkaRecordUtils
    • RecordDeserializer -> AvroRecordDeserializer (also inheritance replaced by composition)
    • (De)SerializerFactory - easier abstractions
    • ConfluentSchemaRegistryFactory is not necessary now - removed
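
For the SchemaId change (#3929) listed above, the usual migration is to convert at the boundary and pass the wrapper everywhere else. The sketch below defines its own stand-in SchemaId so that it compiles without Nussknacker. Only the names fromInt and asInt come from the note; the shape of the class and the describe method are assumptions.

```scala
object SchemaIdMigrationSketch {
  // Hypothetical stand-in: the real SchemaId ships with Nussknacker.
  final case class SchemaId(asInt: Int) extends AnyVal
  object SchemaId {
    def fromInt(value: Int): SchemaId = SchemaId(value)
  }

  // Before the change, a method like this would have accepted a plain Int.
  def describe(id: SchemaId): String = s"schema #${id.asInt}"

  // Convert Int -> SchemaId once, at the edge where the raw id enters the code.
  val fromRegistry: SchemaId = SchemaId.fromInt(42)
}
```

Keeping the Int only at the edges lets the compiler catch places where a schema id and an unrelated number (for example a schema version) could be mixed up.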

In version 1.7.0

Scenario authoring changes

  • #3701 Accessing a non-existent field on a TypedMap in SpEL no longer throws an exception; it returns null instead
  • #3727 Improvements: changed the way the RR Sink is validated:
    • Added the Value validation mode parameter to the RR response component
    • We no longer support the nullable parameter from Everit schema. Nullable schemas are supported by a union with null, e.g. ["null", "string"]

Configuration changes

  • #3768 request-response-embedded and streaming-lite-embedded DeploymentManager types were replaced by one lite-embedded DeploymentManager type with two modes: streaming and request-response, as is done in the lite-k8s case

Code API changes

  • #3560, #3595 Remove dependency on flink-scala. In particular:
    • Switched from using scala.DataStream to datastream.DataStream. Some tools exclusive to scala datastreams are available in engine.flink.api.datastream
    • Scala based TypeInformation derivation is no longer used, for remaining cases flink-scala-utils module is provided (probably will be removed in the future)
  • #3680 SubprocessRef::outputVariableNames type is changed from Option[Map[String,String]] with default None, to Map[String,String] with default Map.empty
  • #3692 Rename mockedResult to externalInvocation in test results collectors.
  • #3606 Removed nussknacker-request-response-app. As a replacement you can use:
    • nussknacker-request-response-app in version <= 1.6
    • Lite K8s engine with request-response processing mode
    • lite-embedded Deployment Manager with request-response processing mode
  • #3610 Removed deprecated code. For details see changes in pull request.
  • #3607 Request-response jsonSchema based encoder:
    • ValidationMode moved to package pl.touk.nussknacker.engine.api.validation in nussknacker-components-api
    • BestEffortJsonSchemaEncoder moved to package pl.touk.nussknacker.engine.json.encode in nussknacker-json-utils
  • #3738 Kafka client libraries upgraded to 3.2.3. If you use an older Flink version, make sure to use 2.8.x client libraries. For Flink versions 1.15.0-1.15.2 also include the fixed KafkaMetricWrapper
  • #3668 Method runWithRequests of RequestResponseTestScenarioRunner (returned by TestScenarioRunner.requestResponseBased()) now returns ValidatedNel with scenario compilation errors instead of throwing exception in that case
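
For the outputVariableNames change (#3680) listed above, code that still holds the old optional value can be adapted with getOrElse. The case classes below are hypothetical, trimmed-down shapes used only to show the type change; the real SubprocessRef has other fields.

```scala
object OutputVariableNamesSketch {
  // Hypothetical shapes of the field before and after #3680.
  final case class OldRef(outputVariableNames: Option[Map[String, String]] = None)
  final case class NewRef(outputVariableNames: Map[String, String] = Map.empty)

  // None and an empty map now mean the same thing: no output variable names.
  def migrate(old: OldRef): NewRef =
    NewRef(old.outputVariableNames.getOrElse(Map.empty))
}
```

Pattern matches on Some(...) or None for this field have to be replaced with checks such as isEmpty or get(name) on the map.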

REST API changes

  • #3576 /processes endpoint without query parameters returns all scenarios - the previous behaviour was to return only unarchived ones. To fetch only unarchived scenarios isArchived=false query parameter has to be passed.
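
Clients that relied on the old default need to add the query parameter explicitly. The sketch below only builds the request address. The base URL http://localhost:8080/api is an assumption about the deployment, and processesUri is a hypothetical helper name.

```scala
import java.net.URI

object ProcessesEndpointSketch {
  // Assumed base address of a Designer instance; adjust to your deployment.
  val apiBase = "http://localhost:8080/api"

  // onlyUnarchived = true reproduces the pre-1.7 default behaviour.
  def processesUri(onlyUnarchived: Boolean): URI = {
    val query = if (onlyUnarchived) "?isArchived=false" else ""
    URI.create(s"$apiBase/processes$query")
  }
}
```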

Other changes

  • #3824 Due to data serialization fix, Flink scenarios using Kafka sources with schemas may be incompatible and may need to be restarted with clean state.

In version 1.6.0

  • #3440 Feature: allow to define fragment's outputs
    • The way fragments are used in a scenario has changed: an outputName has to be provided for each output defined in the fragment.

Scenario authoring changes

  • #3370 Feature: scenario node category verification on validation. From now on, importing a scenario with nodes from categories other than the scenario's category is not allowed.
  • #3436 Division by zero will cause validation error. Tests that rely on 1/0 to generate exceptions should have it changed to code like 1/{0, 1}[0]
  • #3473 JsonRequestResponseSinkFactory provides also 'raw editor', to turn on 'raw editor' add SinkRawEditorParamName -> "true"
  • #3608 Use ZonedDateTime for date-time JsonSchema format, OffsetTime for time format.

Code API changes

  • #3406 Migration from Scalatest 3.0.8 to Scalatest 3.2.10 - if necessary, see the Scalatest migration guides, and
  • #3431 Renamed helper-utils to default-helpers, separated MathUtils from components-utils to math-utils, removed dependencies from helper-utils
  • #3420 DeploymentManagerProvider.typeSpecificInitialData takes deploymentConfig Config now
  • #3493, #3582 Added methods DeploymentManagerProvider.additionalPropertiesConfig, DeploymentManagerProvider.additionalValidators
  • #3506 Changed LocalDateTime to Instant in OnDeployActionSuccess in listener-api
  • #3513 Replace EspProcess with CanonicalProcess in all parts of the API except for the compiler.
  • #3545 TestScenarioRunner.flinkBased should be used instead of NuTestScenarioRunner.flinkBased. Before this, you need to import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
  • #3386 Changed CustomProcessValidator validate method. It now receives CanonicalProcess instead of DisplayableProcess and returns ValidatedNel[ProcessCompilationError, Unit] instead of ValidationResult. Moved CustomProcessValidator from module nussknacker-restmodel in package validation to nussknacker-extensions-api.
  • #3586 Module nussknacker-ui was renamed to nussknacker-designer, ui.conf was renamed to designer.conf, defaultUiConfig.conf was renamed to defaultDesignerConfig.conf

REST API changes

  • #3506 Dates returned by REST API (createdAt, modifiedAt, createDate) are now returned in Zulu time, with timezone indication. This affects e.g. /api/processes, /api/processes/{scenarioId}, /api/processes/{scenarioId}/activity
  • #3542 Node additional info path renamed from nodes/{scenarioId}/additionalData to nodes/{scenarioId}/additionalInfo

Scenario API changes

  • #3471, #3553 RequestResponseMetaData(path) is changed to RequestResponseMetaData(slug). V1_033__RequestResponseUrlToSlug migration is ready for that, the change also applies to Scenario DSL.
  • #3513 Scenario DSL returns CanonicalProcess instead of EspProcess.
  • #3630 SubprocessOutput changed to SubprocessUsageOutput, changes in OutputVar definition

Configuration changes

  • #3425 Deployment Manager for request-response-embedded configuration parameters changed:
    • interface -> http.interface
    • port -> http.port
    • definitionMetadata -> request-response.definitionMetadata
  • #3502 Refactor of KafkaProperties: kafkaAddress property has been deprecated. Please provide kafkaProperties."bootstrap.servers" instead

Other changes

  • #3441 Updated Flink 1.14.5 -> 1.15.2. Some Flink artefacts no longer have a Scala version. Tests using Flink may need to disable checkpointing or reduce the time between checkpoints to prevent timeouts or long waits for tasks to finish.

In version 1.5.0

Configuration changes

  • #2992 deploySettings changed to deploymentCommentSettings. When it is specified, the validationPattern field is now required; specifying exampleComment is optional.
  • commentSettings fields modified. matchExpression changed to substitutionPattern, link changed to substitutionLink.
  • #3165 Config is not exposed over http (GET /api/app/config/) by default. To enable it set configuration enableConfigEndpoint to true.
  • #3223 OAuth2 configuration defaultTokenExpirationTime changed to defaultTokenExpirationDuration
  • #3263 Batch periodic scenarios carry processing type to distinguish scenarios with different categories. For existing scenarios processing type is migrated to default. Set deploymentManager.processingType to default or update periodic scenarios table with actual processing type value - ideally it should be same value as the periodic engine key in scenarioTypes.

Code API changes

  • #2992 OnDeployActionSuccess in ProcessChangeEvent now requires instance of Option[Comment] instead of Option[String] as parameter with deploymentComment information. Added abstract class Comment in listener-api.
  • #3136 Improvements: Lite Kafka testkit
    • ConfluentUtils.serializeRecordToBytesArray replaced by ConfluentUtils.serializeDataToBytesArray
    • ConfluentUtils.deserializeSchemaIdAndRecord replaced by ConfluentUtils.deserializeSchemaIdAndData
  • #3178 Improvements: more complex test scenario runner result:
    • Right now each method from TestScenarioRunner should return ValidatedNel[ProcessCompilationError, RunResult[R]] where:
      • Invalid is representation of process compilation errors
      • Valid is representation of positive and negative scenario running result
  • #3255 TestReporter util class is safer to use in parallel tests, methods require passing scenario name
  • #3265 #3288 #3297 #3299 #3309 #3316 #3322 #3328 #3330 Changes related to UniversalKafkaSource/Sink:
    • RuntimeSchemaData is generic - parametrized by ParsedSchema (AvroSchema and JsonSchema are supported).
    • NkSerializableAvroSchema renamed to NkSerializableParsedSchema
    • SchemaWithMetadata wraps ParsedSchema instead of Avro Schema.
    • SchemaRegistryProvider refactoring:
      • rename SchemaRegistryProvider to SchemaBasedSerdeProvider
      • decouple SchemaRegistryClientFactory from SchemaBasedSerdeProvider
    • KafkaAvroKeyValueDeserializationSchemaFactory renamed to KafkaSchemaBasedKeyValueDeserializationSchemaFactory
    • KafkaAvroValueSerializationSchemaFactory renamed to KafkaSchemaBasedValueSerializationSchemaFactory
    • KafkaAvroKeyValueSerializationSchemaFactory renamed to KafkaSchemaBasedKeyValueSerializationSchemaFactory
  • #3253 DeploymentManager has separate validate method, which should perform initial scenario validation and return reasonably quickly (while deploy can e.g. make Flink savepoint etc.)
  • #3313 Generic types handling changes:
    • Typed.typedClass(Class[_], List[TypingResult]) is not available anymore. You should use more explicit Typed.genericTypeClass instead
    • We check the count of generic parameters in Typed.genericTypeClass - a wrong number now causes an exception to be thrown
    • We populate generic parameters with the correct number of Unknown in non-generic-aware versions of Typed factory methods like Typed.apply or Typed.typedClass
  • #3071 More strict Avro schema validation:
    • ValidationMode.allowOptional was removed, instead of it please use ValidationMode.lax
    • ValidationMode.allowRedundantAndOptional was removed, instead of it please use ValidationMode.lax
    • Changes of ValidationMode, fields: acceptUnfilledOptional and acceptRedundant were removed
  • #3376 FlinkKafkaSource.flinkSourceFunction, FlinkKafkaSource.createFlinkSource and DelayedFlinkKafkaConsumer.apply now take an additional argument, FlinkCustomNodeContext
  • #3272 KafkaZookeeperServer renamed to EmbeddedKafkaServer, zooKeeperServer field changed type to Option and is hidden now.
  • #3365 Numerous renames:
    • module nussknacker-avro-components-utils -> nussknacker-schemed-kafka-components-utils
    • module nussknacker-flink-avro-components-utils -> nussknacker-flink-schemed-kafka-components-utils
    • package pl.touk.nussknacker.engine.avro -> pl.touk.nussknacker.engine.schemedkafka
    • object KafkaAvroBaseComponentTransformer -> KafkaUniversalComponentTransformer
  • #3412 More strict filtering method types. Methods with parameters or result like Collection[IllegalType] are no longer available in SpEL.
  • #3542 Numerous renames:
    • trait NodeAdditionalInfo -> AdditionalInfo
    • class MarkdownNodeAdditionalInfo -> MarkdownAdditionalInfo
    • trait NodeAdditionalInfoProvider -> AdditionalInfoProvider - the SPI provider's configuration files must be renamed from pl.touk.nussknacker.engine.additionalInfo.NodeAdditionalInfoProvider to pl.touk.nussknacker.engine.additionalInfo.AdditionalInfoProvider
    • method AdditionalInfoProvider.additionalInfo was renamed to nodeAdditionalInfo, and a new method propertiesAdditionalInfo was added
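
The generic parameter rules from #3313 above can be pictured with plain Java reflection. The sketch below is not the library's implementation: it represents types as strings, and genericParams and defaultParams are hypothetical names. It only shows the two behaviours described, failing on a wrong parameter count and filling missing parameters with Unknown.

```scala
object GenericParamsSketch {
  // Illustration only: Unknown is represented by a plain string here.
  val Unknown = "Unknown"

  // Strict variant: the caller must pass exactly as many parameters as the class declares.
  def genericParams(clazz: Class[_], params: List[String]): List[String] = {
    val expected = clazz.getTypeParameters.length
    require(
      params.size == expected,
      s"${clazz.getName} expects $expected generic parameter(s), got ${params.size}"
    )
    params
  }

  // Lenient variant: fills every declared type parameter with Unknown.
  def defaultParams(clazz: Class[_]): List[String] =
    List.fill(clazz.getTypeParameters.length)(Unknown)
}
```

For example, java.util.Map declares two type parameters, so the lenient variant yields two Unknown entries, while the strict variant rejects a single-element list for it.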

REST API changes

  • #3169 API endpoint /api/app/healthCheck returning short JSON answer with "OK" status is now not secured - before the change it required an authenticated user with "read" permission.

Scenario authoring changes

  • #3187 #3224 Choice component replaces Switch component. The "Default" choice edge type, exprVal and expression are now deprecated. For existing usages, you don't need to change anything. For new usages, if you want to extract a value, e.g. to simplify choice conditions, you need to define a new local variable before the choice using the variable component. The "Default" choice edge type can be replaced by adding a "true" condition at the end of the list of conditions.

Breaking changes

  • #3328 Due to addition of support for different schema type (AvroSchema and JsonSchema for now) serialization format of NkSerializableParsedSchema has changed. Flink state compatibility of scenarios which use Avro sources or sinks has been broken.
  • #3365 Due to renames (see section Code API changes) Flink state compatibility of scenarios which use Avro sources or sinks has been broken.

Other changes

  • #3249 #3250 Some Kafka-related libraries were bumped: Confluent 5.5 -> 7.2, avro 1.9 -> 1.11, kafka 2.4 -> 3.2. This may affect your custom components if you depend on the kafka-components-utils or avro-components-utils module
  • #3376 Behavior of Flink's Kafka deserialization errors handling was changed - now instead of job failure, invalid message is omitted and configured exceptionHandler mechanism is used.

In version 1.4.0

Configuration changes

  • security.rolesClaim changed to security.rolesClaims, type changed to list of strings
  • kafka.schemaRegistryCacheConfig configuration entry was added - it was hardcoded before. Default value of kafka.schemaRegistryCacheConfig.availableSchemasExpirationTime was changed from 1 minute to 10 seconds, which will cause more frequent schema cache invalidation
  • #3031 Attachments are now stored in database (see more in section Other changes). attachmentsPath was removed. Optional config attachments.maxSizeInBytes was introduced with default value of 10mb

Code API changes

  • #2983 Extract Permission to extensions-api
    • Permission moved from the security module to the extensions-api module
  • #3040 Deprecated pl.touk.nussknacker.engine.api.ProcessListener.sinkInvoked method. Switch to more general endEncountered method.
  • #3076 new implicit parameter componentUseCase: ComponentUseCase was added to invoke() method of all services extending EagerServiceWithStaticParameters

Other changes

  • #3031 Attachments are now stored in database. As this feature was rarely used, automatic migration of attachments from disk to db is not provided. To stay consistent db table process_attachments had to be truncated.

Breaking changes

  • #3029 KafkaConfig has new field schemaRegistryCacheConfig: SchemaRegistryCacheConfig. Flink state compatibility has been broken.
  • #3116 Refactor SchemaRegistryClientFactory so it takes dedicated config object instead of KafkaConfig. This change minimizes chance of future Flink state compatibility break. SchemaIdBasedAvroGenericRecordSerializer is serialized in Flink state, so we provide it now with as little dependencies as necessary. Flink state compatibility has been broken again.
  • #3363 Kafka consumer no longer sets auto.offset.reset to earliest by default. For default configuration files, you can use the KAFKA_AUTO_OFFSET_RESET env variable to easily change this setting.

In version 1.3.0

Code API changes

  • #2741 #2841 Removal of custom scenarios brings some API changes:
    • Replace ProcessDeploymentData by CanonicalProcess (as VO)
    • Replace scenario jsonString by CanonicalProcess at DeploymentManager, ProcessConfigEnricherInputData
  • #2773 Using VersionId / ProcessId / ProcessName instead of Long or String:
    • PullProcessRepository API was changed, right now we use VersionId instead of Long
  • #2830 RunMode is renamed to ComponentUseCase and Normal value is split into: EngineRuntime, Validation, ServiceQuery, TestDataGeneration. RunMode.Test becomes ComponentUseCase.TestRuntime
  • #2825, #2868 #2912 API modules changes:
    • Extracted new modules:
      • nussknacker-scenario-api with all scenario API parts from api and interpreter
      • nussknacker-components-api (and nussknacker-lite-components-api, nussknacker-flink-components-api etc.), which contain API for creating components
      • nussknacker-common-api - base value classes shared between scenario-api and components-api like NodeId, Metadata etc.
      • nussknacker-extensions-api - API of extensions other than components
    • Because of that, some changes in code were also introduced:
      • NodeId moved from pl.touk.nussknacker.engine.api.context.ProcessCompilationError to pl.touk.nussknacker.engine.api
      • NodeExpressionId, DefaultExpressionId and branchParameterExpressionId moved from pl.touk.nussknacker.engine.api.context.ProcessCompilationError to pl.touk.nussknacker.engine.graph.expression
      • JobData no longer contains DeploymentData, which is not accessible for components anymore
      • DisplayJson, WithJobData, MultiMap moved to utils
      • Some methods from API classes (e.g. Parameter.validate) and classes (InterpretationResult) moved to interpreter
      • DeploymentManagerProvider.createDeploymentManager takes now BaseModelData as an argument instead of ModelData. If you want to use this data to invoke scenario, you should cast it to invokable representation via: import ModelData._; modelData.asInvokableModelData
  • #2878 #2898 #2924 Cleaning up of -utils modules
    • Extracted internal classes, not intended to be used in extensions to nussknacker-internal-utils module
    • Extracted component classes, not used directly by runtime/designer to nussknacker-components-utils module
    • Extracted kafka component classes, not used directly by lite-kafka-runtime/kafka-test-utils to nussknacker-kafka-components-utils
    • Moved some classes that are in fact part of API to -api modules (e.g. ToJsonEncoder)
    • Module renames:
      • nussknacker-avro-util to nussknacker-avro-components-utils
      • nussknacker-flink-avro-util to nussknacker-flink-avro-components-utils
      • nussknacker-flink-kafka-util to nussknacker-flink-kafka-components-utils
      • nussknacker-flink-util to nussknacker-flink-components-utils
      • nussknacker-request-response-util to nussknacker-request-response-components-utils
      • nussknacker-model-util to nussknacker-helpers-utils
    • Minor changes in code:
      • Use val docsConfig = new DocsConfig(config); import docsConfig._ instead of implicit val docsConfig = (...); import DocsConfig._
      • Some components specific methods are not available from KafkaUtils. Instead, they are available from KafkaComponentsUtils
      • ToJsonEncoder.encoder takes Any => Json function instead of BestEffortJsonEncoder as a parameter
  • #2907 Hide some details of metrics to utils-internal (InstantRateMeter, InstantRateMeterWithCount), use method added to MetricsProviderForScenario
  • #2916 Changes in ProcessState API.
    • Six similar methods creating ProcessState based on StateStatus and some other details merged to one.
      • Methods removed:
        • Two variants of ProcessState.apply taking ProcessStateDefinitionManager as a parameter
        • SimpleProcessState.apply
        • Two variants of ProcessStatus.simple
        • ProcessStatus.createState taking ProcessStateDefinitionManager as a parameter
      • Method added: ProcessStateDefinitionManager.processState with some default parameter values
    • The ProcessStatus class has been removed entirely. All its methods returning ProcessState were moved to SimpleProcessStateDefinitionManager, and previousState: Option[ProcessState] was removed from them. If you want to keep the previous state's deployment details and only change the "status details", just use the processState.withStatusDetails method
    • ProcessState, CustomAction and its dependencies moved from nussknacker-deployment-manager-api to nussknacker-scenario-deployment-api; the restmodel module no longer depends on deployment-manager-api
    • #2969 Action ProcessActionType.Deploy is now available by default for scenarios in SimpleStateStatus.DuringDeploy state. Mind this if you depend on OverridingProcessStateDefinitionManager or SimpleProcessStateDefinitionManager, and specifically on their statusActions method. As an exception, implementation for Flink FlinkProcessStateDefinitionManager stays the same as before (only ProcessActionType.Cancel is possible in this state), but this may be unified in the future.

Other changes

  • #2886 This change can break previous flink snapshot compatibility. Restoring state from previous snapshot asserts that restored serializer UID matches current serializer UID. This change ensures that in further release deployments UIDs persisted within snapshots are not re-generated in runtime.
  • #2950 Remove MATH helper, use NUMERIC methods (they work better with some number types conversions)

In version 1.2.0

Configuration changes

  • #2483 COUNTS_URL environment variable is now INFLUXDB_URL, without the query path part.
  • #2493 kafka configuration should be moved to components provider configuration - look at components.kafka in dev-application.conf for example
  • #2624 Default name for process tag is now scenario. This affects metrics and count functionalities. Please update your Flink/Telegraf setup accordingly (see nussknacker-quickstart for details). If you still want to use the process tag (e.g. you have a lot of dashboards), please set the countsSettings.metricsConfig.scenarioTag setting to process. Also, the dashboard links format changed, see documentation for the details.
  • #2645 Default models genericModel.jar and liteModel.jar were merged into defaultModel.jar; managementSample.jar was renamed to devModel.jar. If you use defaultModel.jar, it's important to include flinkExecutor.jar explicitly on the model classpath.

Scenario authoring changes

  • #2564 Flink union now takes only 'Output expression' parameters for branches (previously the 'value' parameter). The output variable must be of the same type. If you want to distinguish the source branch in the output variable, please use a map variable; see the example in the Basic Nodes docs.

Other changes

  • #2554 Maven artifact nussknacker-kafka-flink-util became nussknacker-flink-kafka-util and nussknacker-avro-flink-util became nussknacker-flink-avro-util. The general naming convention is nussknacker-$runtimeType-$moduleName. Components inside the distribution changed layout to components(/$runtimeType)/componentName.jar, e.g. components/flink/kafka.jar or components/openapi.jar. KafkaSource became FlinkKafkaSource, ConsumerRecordBasedKafkaSource became FlinkConsumerRecordBasedKafkaSource, KafkaSink became FlinkKafkaSink, KafkaAvroSink became FlinkKafkaAvroSink
  • #2535, #2625, #2645 Rename standalone to request-response:
    • Renamed modules and artifacts
    • StandaloneMetaData is now RequestResponseMetaData
    • Move request-response modules to base dir.
    • standalone in package names changed to requestresponse
    • Standalone in class/variable names changed to RequestResponse
    • DeploymentManager/Service uses dedicated format of status DTO, instead of the ones from deployment-manager-api
    • Removed old, deprecated jarPath settings, in favour of classPath used in other places
    • Extracted nussknacker-lite-request-response-components module
  • #2582 KafkaUtils.toProducerProperties now sets up only basic properties (bootstrap.servers and serializers). Before the change it was setting options which are not always a good choice (e.g. they were not for transactional producers)
  • #2600 ScenarioInterpreter, ScenarioInterpreterWithLifecycle now take an additional generic parameter: Input. ScenarioInterpreter.invoke takes ScenarioInputBatch, which now contains a list of SourceId -> Input instead of SourceId -> Context. Logic of Context preparation should be done in LiteSource instead of before the ScenarioInterpreter.invoke invocation. It means that LiteSource also takes this parameter and has a new method createTransformation.
  • #2635 ContextInitializer.initContext now takes ContextIdGenerator instead of nodeId and returns just a function with strategy of context initialization instead of serializable function with Lifecycle. To use it with Flink engine, use FlinkContextInitializingFunction wrapper.
  • #2649 DeploymentManagerProvider takes new ProcessingTypeDeploymentService class as an implicit parameter
  • #2564 'UnionParametersMigration' available to migrate parameter name from 'value' to 'Output expression' - please turn it on if you are using a 'union'-like component
  • #2645 Simplify structure of available models (implementations of ProcessConfigCreator). defaultModel.jar and components should be used instead of custom implementations of ProcessConfigCreator, the only exception is when one wants to customize ExpressionConfig. Also, nussknacker-flink-engine module became nussknacker-flink-executor.
  • #2651 ValidationContext.clearVariables now clears also parent reference. Important when invoked inside fragments.
  • #2673 KafkaZookeeperUtils renamed to KafkaTestUtils, it doesn't depend on ZooKeeper anymore.
  • #2686 ServiceWithStaticParameters renamed to EagerServiceWithStaticParameters.
  • #2695 nodeId replaced with NodeComponentInfo in NuExceptionInfo. Simple wrapper class which holds the same nodeId and also componentInfo. Migration is straightforward, just put nodeId into the new case class:
    • NuExceptionInfo(None, exception, context) => stays the same
    • NuExceptionInfo(Some(nodeId), exception, context) => NuExceptionInfo(Some(NodeComponentInfo(nodeId, None)), exception, context)
      • if an exception is thrown inside the component, additional information can be provided:
        • for base component (like filter or split): NodeComponentInfo.forBaseNode("nodeId", ComponentType.Filter)
        • for other: NodeComponentInfo("nodeId", ComponentInfo("kafka-avro-source", ComponentType.Source))
    • The same migration has to be applied to ExceptionHandler.handling() method.
  • #2824 'ProcessSplitterMigration' available to migrate node name from 'split' to 'for-each' (see #2781) - please turn it on if you are using 'split' component

In version 1.1.0



  • A lot of internal refactoring was made to separate code/API specific for Flink. If your deployment has custom components pay special attention to:
    • Lifecycle management
    • Kafka components
    • Differences in artifacts and packages
  • Some of the core dependencies: cats, cats-effect and circe were upgraded. It affects mainly code, but it may also have impact on state compatibility and performance.
  • Default Flink version was bumped to 1.14 - see on how to run Nu on older Flink versions.
  • Execution of SpEL expressions is now checked more strictly, due to security considerations. These checks can be overridden with custom ExpressionConfig.
  • Apart from that:
    • minor configuration naming changes
    • removal of a few minor, undocumented features (e.g. SQL Variable)
  • #2208 Upgrade of cats, cats-effect and circe. An important nuisance: we didn't upgrade sttp, so we cannot depend on "com.softwaremill.sttp.client" %% "circe". Instead, the code is copied. Make sure you don't include sttp-circe integration as transitive dependency, but use class from http-utils instead.
  • #2176 EnrichDeploymentWithJarDataFactory was replaced with ProcessConfigEnricher.
  • #2278 SQL Variable is removed
  • #2280 Added optional defaultValue field to Parameter. In GenericNodeTransformation can be set to None - values will be determined automatically.
  • #2289 Savepoint path in /api/adminProcessManagement/deploy endpoint is passed as a savepointPath parameter instead of path segment.
  • #2293 Enhancement: change nodeCategoryMapping configuration to componentsGroupMapping
  • #2301 #2620 GenericNodeTransformation.initialParameters was removed - now GenericNodeTransformation.contextTransformation is used instead. To keep the Admin tab -> Invoke service form working, use the WithLegacyStaticParameters trait
  • #2409 JsonValidator is now not determined by default based on JsonParameterEditor but must be explicitly defined by @JsonValidator annotation
  • #2304 Upgrade to Flink 1.14. Pay attention to Flink dependencies - in some (e.g. runtime) there is no longer scala version.
  • #2295 FlinkLazyParameterFunctionHelper allows (and sometimes requires) correct exception handling
  • #2307 Changed nussknacker-kafka module name to nussknacker-kafka-util
  • #2310 Changed nussknacker-process module name to nussknacker-flink-engine
  • #2300 #2343 Enhancement: refactor and improvements at components group:
    • Provided ComponentGroupName as VO
    • SingleNodeConfig was renamed to SingleComponentConfig and moved from pl.touk.nussknacker.engine.api.process package to pl.touk.nussknacker.engine.api.component
    • Configuration category in node configuration was replaced by componentGroup
    • Configuration nodes in model configuration was replaced by componentsUiConfig
    • Additional refactor: ProcessToolbarService moved from pl.touk.nussknacker.ui.service package to pl.touk.nussknacker.ui.process
    • DefinitionPreparer was renamed to ComponentDefinitionPreparer
    • NodesConfigCombiner was removed
    • REST API /api/processDefinitionData/* response JSON was changed:
      • nodesToAdd was renamed to componentGroups
      • posibleNode was renamed to components
      • nodesConfig was renamed to componentsConfig
      • config icon property from componentsConfig right now should be relative to http.publicPath e.g. /assets/components/Filter.svg (before was just Filter.svg) or url (with http / https)
  • #2346 Remove endResult from Sink in graph.
    • Sink no longer defines testOutput method - they should be handled by respective implementations
    • Change in definition of StandaloneSink previously StandaloneSinkWithParameters, as output always has to be computed with sink parameters now
    • Changes in definition of FlinkSink, to better handle capturing test data
    • Removal of .sink method in GraphBuilder - use .emptySink if suitable
  • #2331
    • KafkaAvroBaseTransformer companion object renamed to KafkaAvroBaseComponentTransformer
    • KryoGenericRecordSchemaIdSerializationSupport renamed to GenericRecordSchemaIdSerializationSupport
  • #2305 Enhancement: change processingTypeToDashboard configuration to scenarioTypeToDashboard
  • #2296 Scenarios & Fragments have separate TypeSpecificData implementations. Also, we remove isSubprocess field from process JSON, and respectively from MetaData constructor. See corresponding db migration V1_031__FragmentSpecificData.scala
  • #2368 WithCategories now takes categories as an Option[List[String]] instead of List[String]. You should wrap the given list of categories with Some(...). None means that the component will be available in all categories.
  • #2360 union, union-memo and dead-end components were extracted from model/genericModel.jar to components/baseComponents.jar. If you have your own application.conf which changes scenarioTypes, you should add a "components/baseComponents.jar" entry to the classPath array.
  • #2337 Extract base engine from standalone
    • Common functionality of base engine (i.e. microservice based, without Flink) is extracted to base-api and base-runtime
    • new API for custom components (pl.touk.nussknacker.engine.baseengine.api.customComponentTypes)
    • StandaloneProcessInterpreter becomes StandaloneScenarioEngine
    • Replace Either[NonEmptyList[Error], _] with ValidatedNel[Error, _] as return type
    • StandaloneContext becomes EngineRuntimeContext
  • #2349 queryable-state module was removed, FlinkQueryableClient was moved to nussknacker-flink-manager. PrettyValidationErrors, CustomActionRequest and CustomActionResponse moved from nussknacker-ui to nussknacker-restmodel.
  • #2361 Removed security dependency from listener-api. LoggedUser replaced with dedicated class in listener-api.
  • #2385 Deprecated CustomStreamTransformer.clearsContext was removed. Use

    def execute(...) =
      ContextTransformation
        .definedBy(ctx => Valid(ctx.clearVariables ...))

  • #2348 #2459 #2486 #2490 #2496 #2536 Introduce KafkaDeserializationSchema and KafkaSerializationSchema traits to decouple from the Flink dependency.
    • KeyedValue moved to nussknacker-util; SchemaRegistryProvider and ConfluentSchemaRegistryProvider are now in the nussknacker-avro-util module (utils/avro-util).
    • To convert between Nussknacker's and Flink's Kafka(De)serializationSchema, use wrapToFlink(De)serializationSchema from FlinkSerializationSchemaConversions.
    • FlinkSourceFactory is gone - use SourceFactory instead.
    • KafkaSourceFactory, KafkaAvroSourceFactory, KafkaSinkFactory, KafkaAvroSinkFactory and ContextIdGenerator no longer depend on Flink.
    • Extracted KafkaSourceImplFactory, KafkaSinkImplFactory and KafkaAvroSinkImplFactory, which deliver the implementation of a component (after all validations and parameter evaluation). Use FlinkKafkaSourceImplFactory, FlinkKafkaSinkImplFactory and FlinkKafkaAvroSinkImplFactory respectively to deliver Flink implementations.
    • Moved non-Flink-specific serializers, deserializers, BestEffortAvroEncoder, ContextIdGenerators and RecordFormatters to kafka-util/avro-util.
    • KafkaDelayedSourceFactory is now DelayedKafkaSourceFactory.
    • FixedRecordFormatterFactoryWrapper moved to RecordFormatterFactory.
  • #2477 FlinkContextInitializer and FlinkGenericContextInitializer merged to ContextInitializer, BasicFlinkContextInitializer and BasicFlinkGenericContextInitializer merged to BasicContextInitializer. All of them moved to pl.touk.nussknacker.engine.api.process package. ContextInitializer.validationContext returns ValidatedNel - before this change errors during context initialization weren't accumulated. ContextInitializingFunction is now a Scala function instead of Flink's MapFunction. You should wrap it with RichLifecycleMapFunction to make sure that it will be opened correctly by Flink. InputMeta was moved to kafka-util module.
  • #2389 #2391 deployment-manager-api module was extracted and DeploymentManagerProvider, ProcessingTypeData and QueryableClient were moved from interpreter into it. DeploymentManager, CustomAction and ProcessState were moved from api to deployment-manager-api. ProcessingType was moved to rest-model package.
  • #2393 Added ActorSystem, ExecutionContext and SttpBackend into DeploymentManagerProvider.createDeploymentManager. During cleanup, the nussknacker-http-utils dependency on async-http-client-backend-future was also removed, and SttpBackend was added to the CountsReporterCreator.createReporter arguments.
  • #2397 Common EngineRuntimeContext lifecycle and MetricsProvider. This may cause runtime consequences - make sure your custom services/listeners invoke open/close correctly - especially in complex inheritance scenarios.
    • Lifecycle now takes EngineRuntimeContext as a parameter; JobData is embedded in it.
    • TimeMeasuringService replaces GenericTimeMeasuringService, Flink/Standalone flavours of TimeMeasuringService are removed
    • EngineRuntimeContext and MetricsProvider moved to base API, RuntimeContextLifecycle moved to base API as Lifecycle
    • GenericInstantRateMeter is now InstantRateMeter
    • Flink RuntimeContextLifecycle should be replaced in most cases by Lifecycle
    • In Flink engine MetricsProvider (obtained with EngineRuntimeContext) should be used in most places instead of MetricUtils
  • #2486 Context.withInitialId is deprecated now - use EngineRuntimeContext.contextIdGenerator instead. EngineRuntimeContext can be accessed via FlinkCustomNodeContext.convertToEngineRuntimeContext
  • #2377 #2534 Removed clazz from SourceFactory. Removed generic parameter from Source and SourceFactory. The return type of a source should be provided by one of:
    • returnType field of @MethodToInvoke
    • ContextTransformation API
    • GenericNodeTransformer API
    • SourceFactory.noParam
  • #2453 Custom actions for PeriodicDeploymentManager can now be defined and implemented outside this class, in PeriodicCustomActionsProvider created by PeriodicCustomActionsProviderFactory. If you do not need them, just pass PeriodicCustomActionsProviderFactory.noOp to the factory method of the PeriodicDeploymentManager object.
  • #2501 nussknacker-baseengine-components module renamed to nussknacker-lite-base-components
  • #2221 ReflectUtils fixedClassSimpleNameWithoutParentModule renamed to simpleNameWithoutSuffix
  • #2495 TypeSpecificDataInitializer trait change to TypeSpecificDataInitializ
  • #2245 FailedEvent has been specialized into FailedOnDeployEvent and FailedOnRunEvent
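
For the #2368 entry above, the wrapping can be sketched as follows. This is a minimal, self-contained illustration: WithCategoriesSketch and the helper names are hypothetical stand-ins, not the actual Nussknacker classes, and they only model the Option[List[String]] semantics described in that entry.

```scala
// Hypothetical stand-in for illustration only; the real WithCategories lives in the Nussknacker API.
final case class WithCategoriesSketch[T](value: T, categories: Option[List[String]])

object WithCategoriesMigration {
  // Before #2368 a component carried a plain List[String];
  // wrapping that list in Some(...) keeps the same visibility.
  def migrate[T](value: T, oldCategories: List[String]): WithCategoriesSketch[T] =
    WithCategoriesSketch(value, Some(oldCategories))

  // None means that the component is available in all categories.
  def isAvailableIn[T](component: WithCategoriesSketch[T], category: String): Boolean =
    component.categories.forall(_.contains(category))
}
```

A component migrated with an explicit list stays limited to that list, while one declared with None is visible everywhere.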

In version 1.0.0

  • #1439 #2090 Upgrade to Flink 1.13.
    • setTimeCharacteristic is deprecated, and should be handled automatically by Flink.
    • UserClassLoader was removed, use appropriate Flink objects or context ClassLoader.
    • RocksDB configuration is turned on by rocksdb.enable instead of rocksdb.checkpointDataUri which is not used now.
  • #2133 SQL Variable is hidden in generic model, please look at comment in defaultModelConfig.conf
  • #2152 schedulePropertyExtractor parameter of PeriodicDeploymentManagerProvider was changed to a factory, replace with a lambda creating the original property extractor.
  • #2159 useTypingResultTypeInformation option is now enabled by default
  • #2108 Changes in ClassExtractionSettings:
    • Refactor of classes defining extraction rules,
    • TypedClass has private apply method, please use Typed.typedClass
    • Fewer classes/methods are accessible in SpEL, in particular Scala collections, internal time API, methods returning or having parameters from excluded classes
  • Changes in OAuth2 security components:
    • refactoring of OpenIdConnectService, now it's named GenericOidcService (it's best to use OidcService, which can handle most of the configuration automatically)
  • New security settings, in particular new flags in ExpressionConfig:
    • strictMethodsChecking
    • staticMethodInvocationsChecking
    • methodExecutionForUnknownAllowed
    • dynamicPropertyAccessAllowed
    • spelExpressionExcludeList
  • #2101 Global permissions can be arbitrary strings; for an admin user it's not necessary to return global permissions
  • #2182 To avoid classloader leaks during SQL DriverManager registration, HSQLDB (used e.g. for SQL Variable) is no longer included in model jars, it should be added in Flink lib dir

In version 0.4.0

  • #1479 ProcessId and VersionId moved to API included in ProcessVersion, remove spurious ProcessId and ProcessVersionId in restmodel.

  • #1422 Removed ServiceReturningType and WithExplicitMethod, use EagerServiceWithStaticParameters, EnricherContextTransformation or SingleInputGenericNodeTransformation

  • #1845 AuthenticatorData has been renamed to AuthenticationResources and changed into a trait, apply construction has been preserved. AuthenticatorFactory and its createAuthenticator method have been renamed to AuthenticationProvider and createAuthenticationResources respectively. It is recommended to rename the main class of any custom authentication module to <Something>AuthenticationProvider accordingly.

  • #1542 KafkaConfig now has a new parameter topicsExistenceValidationConfig. When topicsExistenceValidationConfig.enabled = true, Kafka sources/sinks fail validation if the provided topic does not exist and the cluster is configured with auto.create.topics.enable=false

  • #1416 OAuth2Service has changed. You can still use your old implementation by importing OAuth2OldService with an alias. OAuth2ServiceFactory.create method now accepts implicit parameters for an ExecutionContext and sttp.HttpBackend. You can ignore them to maintain previous behaviour, but it is always better to use them instead of locally defined ones.

  • #1346 AggregatorFunction now takes type of stored state that can be immutable.SortedMap (previous behaviour) or java.util.Map (using Flink's serialization) and validatedStoredType parameter for providing better TypeInformation for aggregated values

  • #1343 FirstAggregator changed serialized state, it is not compatible, Aggregator trait has new method computeStoredType

  • #1352 and #1568 AvroStringSettings class has been introduced, which allows control whether Avro type string is represented by java.lang.String (also in runtime) or java.lang.CharSequence (implemented in runtime by org.apache.avro.util.Utf8). This setting is available through environment variable AVRO_USE_STRING_FOR_STRING_TYPE - default is true. Please mind that this setting is global - it applies to all processes running on Flink and also requires restarting TaskManager when changing the value.

  • #1361 Lazy variables are removed, you should use standard enrichers for those cases. Their handling has been a source of many problems and they made it harder to reason about the execution of a process.

  • #1373 Creating ClassLoaderModelData directly is not allowed, use ModelData.apply with plain config, wrapping with ModelConfigToLoad by yourself is not needed.

  • #1406 ServiceReturningType is deprecated in favour of EagerService

  • #1445 RecordFormatter now handles TestDataSplit for Kafka sources. It is required in KafkaSource creation, instead of TestDataSplit

  • #1433 Pass DeploymentData to process via JobData, additional parameters to deployment methods are needed. Separate ExternalDeploymentId from DeploymentId (generated by NK)

  • #1466 ProcessManager.deploy can return ExternalDeploymentId

  • #1464

    • Slight change of API of StringKeyedValueMapper
    • Change of semantics of some parameters of AggregatorFunction, AggregatorFunctionMixin (storedAggregateType becomes aggregateElementType)
  • #1405 'KafkaAvroSink' requires more generic 'AvroSinkValue' as value parameter

  • #1510

    • Change of FlinkSource API: sourceStream produces stream of initialized Context (DataStream[Context]). This initialization step was previously performed within FlinkProcessRegistrar.registerSourcePart. Now it happens explicitly within the flink source.
    • FlinkIntermediateRawSource is used as an extension to flink sources, it prepares source with typical stream transformations (add source function, set uid, assign timestamp, initialize Context)
    • FlinkContextInitializer is used to initialize Context. It provides map function that transforms raw event (produced by flink source function) into Context variable. Default implementation of FlinkContextInitializer, see BasicFlinkContextInitializer, sets raw event value to a single "input" variable.
    • For sources based on GenericNodeTransformation it allows to initialize Context with more than one variable. Default implementation of initializer, see BasicFlinkGenericContextInitializer, provides default definition of variables as a ValidationContext with single "input" variable. The implementation requires to provide separately the definition of "input" variable type (TypingResult). See GenericSourceWithCustomVariablesSample.
    • To enable "test source" functionality, a source needs to be extended with SourceTestSupport.
    • For flink sources that use TestDataParserProvider switch to FlinkSourceTestSupport (which is used to provide "test source" functionality for flink sources).
    • Old TestDataParserProvider is renamed to SourceTestSupport
    • To enable test data generator for "test source", a source needs to be extended with both SourceTestSupport and TestDataGenerator. What was related to "test source" functionality and was obsolete in FlinkSource has now been extracted to FlinkSourceTestSupport.
    • FlinkCustomNodeContext has access to TypeInformationDetection, it allows to get TypeInformation for the node stream mapping from ValidationContext.
    • For kafka sources RecordFormatter parses raw test data to ConsumerRecord which fits into deserializer (instead of ProducerRecord that required another transformation).
    • Definitions of names of common Context variables are moved to VariableConstants (instead of Interpreter).
  • #1497 Changes in PeriodicProcessManager, change PeriodicProperty to ScheduleProperty

  • #1499

    • trait KafkaAvroDeserializationSchemaFactory uses both key and value ClassTags and schemas (instead of value-only), check the order of parameters.
    • ClassTag is provided in params in avro key-value deserialization schema factory: KafkaAvroKeyValueDeserializationSchemaFactory
    • BaseKafkaAvroSourceFactory is able to read both key and value schema determiner to build proper DeserializationSchema (support for keys is not fully introduced in this change)
  • #1514 ExecutionConfigPreparer has different method parameter - JobData, which has more info than previous parameters

  • #1532 TypedObjectTypingResult#fields uses now scala.collection.immutable.ListMap to keep fields order

  • #1546 StandaloneCustomTransformer now takes a list of Context objects, to process them in one go

  • #1557 Some classes from standalone engine were moved to standalone api to remove engine to (model) utils dependency: StandaloneContext, StandaloneContextLifecycle, MetricsProvider

  • #1558 FlinkProcessRegistrar takes configuration directly from FlinkProcessCompiler (this can affect some tests setup)

  • #1631 Introduction of nussknacker.config.locations property, drop use of standard config.file property. Model configuration no longer has direct access to root UI config.

  • #1512

    • Replaced KafkaSourceFactory with source based on GenericNodeTransformation, which gives access to setup of ValidationContext and Context initialization. To migrate KafkaSourceFactory:
      • provide deserializer factory (source factory requires deserialization to ConsumerRecord):
        • use ConsumerRecordDeserializationSchemaFactory with current DeserializationSchema as a value deserializer, add key deserializer (e.g. org.apache.kafka.common.serialization.StringDeserializer)
        • or use FixedValueDeserializationSchemaFactory with simple key-as-string deserializer
      • provide RecordFormatterFactory
        • use ConsumerRecordToJsonFormatterFactory for whole key-value-and-metadata serialization
        • or, for value-only-and-without-metadata scenario, you can use current RecordFormatter wrapped in FixedRecordFormatterFactoryWrapper
      • provide timestampAssigner that is able to extract time from ConsumerRecord[K, V]
    • Removed BaseKafkaSourceFactory with multiple topics support: use KafkaSourceFactory instead, see "source with two input topics" test case
    • Removed SingleTopicKafkaSourceFactory: use KafkaSourceFactory with custom prepareInitialParameters, contextTransformation and extractTopics to alter parameter list and provide constant topic value.
    • TypingResultAwareTypeInformationCustomisation is moved to package pl.touk.nussknacker.engine.flink.api.typeinformation

    Example of source with value-only deserialization and custom timestampAssigner:

    // provide new deserializer factory with old schema definition for event's value
    val oldSchema = new EspDeserializationSchema[SampleValue](bytes => io.circe.parser.decode[SampleValue](new String(bytes)).toOption.get)
    val schemaFactory: KafkaDeserializationSchemaFactory[ConsumerRecord[String, SampleValue]] = new FixedValueDeserializationSchemaFactory(oldSchema)

    // ... provide timestampAssigner that extracts timestamp from SampleValue.customTimestampField
    // ... or use event's metadata: record.timestamp()
    def timestampExtractor(record: ConsumerRecord[String, SampleValue]): Long = record.value().customTimestampField
    val watermarkHandler = StandardTimestampWatermarkHandler.boundedOutOfOrderness[ConsumerRecord[String, SampleValue]](timestampExtractor, java.time.Duration.ofMinutes(10L))
    val timestampAssigner: Option[TimestampWatermarkHandler[ConsumerRecord[String, SampleValue]]] = Some(watermarkHandler)

    // ... provide RecordFormatterFactory that allows to generate and parse test data with key, headers and other metadata
    val formatterFactory: RecordFormatterFactory = new ConsumerRecordToJsonFormatterFactory[String, SampleValue]

    // ... and finally
    val sourceFactory = new KafkaSourceFactory[String, SampleValue](schemaFactory, timestampAssigner, formatterFactory, dummyProcessObjectDependencies)
  • #1651 KafkaAvroSourceFactory provides additional #inputMeta variable with event's metadata.

    • The source now has key and value type parameters. These parameters are relevant for sources handling SpecificRecords. For GenericRecords, explicitly use KafkaAvroSourceFactory[Any, Any].
    • SpecificRecordKafkaAvroSourceFactory extends whole KafkaAvroSourceFactory with context validation and initialization
    • New flag in KafkaConfig: useStringForKey determines if event's key should be interpreted as ordinary String (which is default scenario). It is used in deserialization and for generating/parsing test data.
    • SchemaRegistryProvider now provides factories to produce SchemaRegistryClient and RecordFormatter.
    • For ConfluentSchemaRegistryProvider, KafkaConfig and ProcessObjectDependencies (which contains KafkaConfig data) are no longer required. The configuration is passed to the factories at the moment the requested objects are created, which happens in KafkaAvroSourceFactory (so all objects within KafkaAvroSourceFactory see the same Kafka configuration).
    • Removed:
      • BaseKafkaAvroSourceFactory, the class is incorporated into KafkaAvroSourceFactory to provide a flexible approach to creating KafkaSource
      • with ReturningType for generic types (this is defined by ValidationContext, see also KafkaContextInitializer that allows to return more than one variable)
      • KafkaAvroValueDeserializationSchemaFactory (source requires deserialization to ConsumerRecord[K, V], there are only deserializers based on KafkaAvroKeyValueDeserializationSchemaFactory)
      • ConfluentKafkaAvroDeserializationSchemaFactory, use ConfluentKeyValueKafkaAvroDeserializationFactory
      • TupleAvroKeyValueKafkaAvroDeserializerSchemaFactory, this approach is deprecated due to #inputMeta variable that contains key data

    To migrate KafkaAvroSourceFactory:

    • Provide KafkaConfig with correct useStringForKey flag value. By default we want to handle keys as ordinary String, and all topics related to such config require only value schema definitions (key schemas are ignored). For the specific scenario when a complex key with its own schema is provided, this flag is false and all topics related to this config require both key and value schema definitions. Example of default KafkaConfig override: override protected def prepareKafkaConfig: KafkaConfig = super.prepareKafkaConfig.copy(useStringForKey = false)
    • provide your own SchemaRegistryProvider (or use ConfluentSchemaRegistryProvider)
    • custom RecordFormatter can be wrapped in FixedRecordFormatterFactoryWrapper (or keep ConfluentAvroToJsonFormatterFactory)
    • provide timestampAssigner that is able to extract time from ConsumerRecord[K, V] (see example above)
  • #1741 Minor changes in KafkaUtils, NonTransientException uses Instant instead of LocalDateTime

  • #1806 Remove old, deprecated API:

    • EvictableState, RichEvictableState - use EvictableStateFunction
    • checkpointInterval - use checkpointConfig.checkpointInterval
    • old versions of sampleTransformers - use newer ones
    • MiniClusterExecutionEnvironment.runningJobs() - use flinkMiniClusterHolder.runningJobs()
  • #1807 Removed jdbcServer, please use Postgres for production-ready setups

  • #1799

    • RecordFormatterFactory instead of one, uses two type parameters: K, V
    • ConfluentAvroToJsonFormatter is produced by ConfluentAvroToJsonFormatterFactory
    • ConfluentAvroToJsonFormatter produces test data in valid JSON format, does not use Separator
    • ConfluentAvroMessageFormatter has asJson method instead of writeTo
    • ConfluentAvroMessageReader has readJson method instead of readMessage. Example test data object:
  • #1663 Default FlinkExceptionHandler implementations are deprecated, use ConfigurableExceptionHandler instead.

  • #1731 RocksDB config's flag incrementalCheckpoints is turned on by default.

  • #1825 Default dashboard renamed from flink-esp to nussknacker-scenario

  • #1836 Change default kafka.consumerGroupNamingStrategy to processId-nodeId.

  • #1357 Run mode added to nodes. ServiceInvoker interface was extended with new, implicit runMode parameter.

  • #1886 aggregate-sliding with emitWhenEventLeft = true, aggregate-tumbling and aggregate-session components no longer emit the full context of variables that were defined before the node (for performance reasons, and because it wasn't obvious which context was emitted). If you want to emit some information other than the aggregated value and key (available via the new #key variable), you should use an expression in aggregateBy.

  • #1910 processTypes renamed to scenarioTypes. You can still use old processTypes configuration. Old configuration will be removed in version 0.5.0.

  • Various naming changes:

    • #1917 configuration of engineConfig to deploymentConfig
    • #1921 ProcessManager to DeploymentManager
    • #1927 Rename outer-join to single-side-join
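
As a small illustration of the #1532 entry above: scala.collection.immutable.ListMap iterates in insertion order, which is what makes it suitable for keeping field order. The sketch below uses only the Scala standard library; the field names and the plain-string type labels are made up for the example and stand in for real TypingResult values.

```scala
import scala.collection.immutable.ListMap

object FieldOrderSketch {
  // Field definitions in declaration order (hypothetical names, type labels as plain strings).
  val declared: List[(String, String)] = List(
    "zeta"  -> "String",
    "alpha" -> "Long",
    "mid"   -> "Double",
    "beta"  -> "Boolean",
    "omega" -> "Int"
  )

  // Building a ListMap from the pairs keeps the declaration order when iterating.
  val fields: ListMap[String, String] = ListMap(declared: _*)

  // Field names in the order in which the map yields them.
  def fieldNames: List[String] = fields.toList.map(_._1)
}
```

Code that previously built the fields as a plain Map may need a similar conversion if it relies on a particular field order.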

In version 0.3.0

  • #1313 Kafka Avro API passes KafkaConfig during TypeInformation determining

  • #1305 Kafka Avro API passes RuntimeSchemaData instead of Schema in various places

  • #1304 SerializerWithSpecifiedClass was moved to flink-api module.

  • #1044 Upgrade to Flink 1.11. Current watermark/timestamp mechanisms are deprecated in Flink 1.11, new API TimestampWatermarkHandler is introduced, with LegacyTimestampWatermarkHandler as wrapper for previous mechanisms.

  • #1244 Parameter has new parameter 'variablesToHide' with Set of variable names that will be hidden before parameter's evaluation

  • #1159 #1170 Changes in GenericNodeTransformation API:

    • Now implementation takes additional parameter with final state value determined during contextTransformation
    • DefinedLazyParameter and DefinedEagerParameter holds expression: TypedExpression instead of returnType: TypingResult
    • DefinedLazyBranchParameter and DefinedEagerBranchParameter holds expressionByBranchId: Map[String, TypedExpression] instead of returnTypeByBranchId: Map[String, TypingResult]
  • #1083

    • Now SimpleSlidingAggregateTransformerV2 and SlidingAggregateTransformer are deprecated in favour of SlidingAggregateTransformerV2
    • Now SimpleTumblingAggregateTransformer is deprecated in favour of TumblingAggregateTransformer
    • Now SumAggregator, MaxAggregator and MinAggregator don't change the type of the aggregated value (previously it was changed to Double)
    • Now SumAggregator, MaxAggregator and MinAggregator return null instead of 0D/Double.MaxValue/Double.MinValue when no element was added before getResult
  • #1149 FlinkProcessRegistrar refactor (can affect test code)

  • #1166 model.conf should be renamed to defaultModelConfig.conf

  • #1218 FlinkProcessManager is no longer bundled in ui uber-jar. In docker/tgz distribution

  • #1255 Moved displaying Metrics tab to customTabs

  • #1257 Improvements: Flink test util package

    • Added methods: cancelJob, submitJob, listJobs, runningJobs to FlinkMiniClusterHolder
    • Deprecated: runningJobs, from MiniClusterExecutionEnvironment
    • Removed: getClusterClient from FlinkMiniClusterHolder interface, because of flink compatibility at Flink 1.9
    • Renamed: FlinkStreamingProcessRegistrar to FlinkProcessRegistrar
  • #1303 TypedObjectTypingResult has a new field: additionalInfo

In version 0.2.0

  • #1104 Creation of FlinkMiniCluster is now extracted from StoppableExecutionEnvironment. You should create it using e.g.:

    val flinkMiniClusterHolder = FlinkMiniClusterHolder(FlinkTestConfiguration.configuration(parallelism))

    and then create environment using:


    . At the end you should cleanup flinkMiniClusterHolder by:


    . FlinkMiniClusterHolder should be created once per test class - it is thread safe and resource expensive. MiniClusterExecutionEnvironment, on the other hand, should be created for each process. It is not thread safe because the underlying StreamExecutionEnvironment is not. You can use FlinkSpec to achieve that.

  • #1077

    • pl.touk.nussknacker.engine.queryablestate.QueryableClient was moved from queryableState module to pl.touk.nussknacker.engine.api.queryablestate package in api module
    • pl.touk.nussknacker.engine.queryablestate.QueryableState was moved to pl.touk.nussknacker.engine.api.queryablestate
    • CustomTransformers from pl.touk.nussknacker.engine.flink.util.transformer in flinkUtil module were moved to new flinkModelUtil module.
    • pl.touk.nussknacker.engine.testing.EmptyProcessConfigCreator was moved from interpreter module to pl.touk.nussknacker.engine.util.process package in util module
  • #1039 Generic parameter of LazyParameter[T] has upper bound AnyRef now to avoid problems with bad type extraction. This caused changes from Any to AnyRef in a few places - mainly FlinkLazyParameterFunctionHelper and FlinkCustomStreamTransformation

  • #1039 FlinkStreamingProcessRegistrar.apply has a new parameter of type ExecutionConfigPreparer. In production code you should pass ExecutionConfigPreparer.defaultChain() there and in test code you should pass ExecutionConfigPreparer.unOptimizedChain(). See scaladocs for more info. If you have already done some setup of Flink's ExecutionConfig before registering the process, you should consider creating your own chain using ExecutionConfigPreparer.chain().

  • #1039 FlinkSourceFactory doesn't take TypeInformation type class as a generic parameter now. Instead, it takes ClassTag. TypeInformation is determined during source creation. typeInformation[T] method was moved from BasicFlinkSource to FlinkSource because there still must be a place to determine it for testing purposes.

  • #965 'aggregate' node in generic model was renamed to 'aggregate-sliding'

  • #922 HealthCheck API has new structure, naming and JSON responses:

    • old /healthCheck is moved to /healthCheck/process/deployment
    • old /sanityCheck is moved to /healthCheck/process/validation
    • top level /healthCheck indicates general "app-is-running" state
  • #879 Metrics use variables by default, see docs to enable old mode, suitable for graphite protocol. To use old way of sending:

    • put globalParameters.useLegacyMetrics = true in each model configuration (to configure metrics sending in Flink)

    • put:

      countsSettings {
        user: ...
        password: ...
        influxUrl: ...
        metricsConfig {
          nodeCountMetric: "nodeCount.count"
          sourceCountMetric: "source.count"
          nodeIdTag: "action"
          countField: "value"
        }
      }

  • Introduction to KafkaAvro API: #871, #881, #903, #981, #989, #998, #1007, #1014, #1034, #1041

API for KafkaAvroSourceFactory was changed:

KafkaAvroSourceFactory old way:

val clientFactory = new SchemaRegistryClientFactory
val source = new KafkaAvroSourceFactory(
  new AvroDeserializationSchemaFactory[GenericData.Record](clientFactory, useSpecificAvroReader = false),
  processObjectDependencies = processObjectDependencies
)

KafkaAvroSourceFactory new way:

val schemaRegistryProvider = ConfluentSchemaRegistryProvider(processObjectDependencies)
val source = new KafkaAvroSourceFactory(schemaRegistryProvider, processObjectDependencies, None)

Provided new API for Kafka Avro Sink:

val kafkaAvroSinkFactory = new KafkaAvroSinkFactory(schemaRegistryProvider, processObjectDependencies)

Additional changes:

  • Bump up confluent package to 5.5.0
  • (Refactor Kafka API) Moved KafkaSourceFactory to pl.touk.nussknacker.engine.kafka.source package
  • (Refactor Kafka API) Changed BaseKafkaSourceFactory, now it requires deserializationSchemaFactory: KafkaDeserializationSchemaFactory[T]
  • (Refactor Kafka API) Moved KafkaSinkFactory to pl.touk.nussknacker.engine.kafka.sink package
  • (Refactor Kafka API) Renamed SerializationSchemaFactory to KafkaSerializationSchemaFactory
  • (Refactor Kafka API) Renamed DeserializationSchemaFactory to KafkaDeserializationSchemaFactory
  • (Refactor Kafka API) Renamed FixedDeserializationSchemaFactory to FixedKafkaDeserializationSchemaFactory
  • (Refactor Kafka API) Renamed FixedSerializationSchemaFactory to FixedKafkaSerializationSchemaFactory
  • (Refactor Kafka API) Removed KafkaSerializationSchemaFactoryBase
  • (Refactor Kafka API) Replaced KafkaKeyValueSerializationSchemaFactoryBase by KafkaAvroKeyValueSerializationSchemaFactory (it handles only avro case now)
  • (Refactor Kafka API) Removed KafkaDeserializationSchemaFactoryBase
  • (Refactor Kafka API) Replaced KafkaKeyValueDeserializationSchemaFactoryBase by KafkaAvroKeyValueDeserializationSchemaFactory (it handles only avro case now)
  • (Refactor KafkaAvro API) Renamed AvroDeserializationSchemaFactory to ConfluentKafkaAvroDeserializationSchemaFactory and moved to avro.schemaregistry.confluent package
  • (Refactor KafkaAvro API) Renamed AvroKeyValueDeserializationSchemaFactory to ConfluentKafkaAvroDeserializationSchemaFactory and moved to avro.schemaregistry.confluent package
  • (Refactor KafkaAvro API) Renamed AvroSerializationSchemaFactory to ConfluentAvroSerializationSchemaFactory and moved to avro.schemaregistry.confluent package
  • (Refactor KafkaAvro API) Renamed AvroKeyValueSerializationSchemaFactory to ConfluentAvroKeyValueSerializationSchemaFactory and moved to avro.schemaregistry.confluent package
  • (Refactor KafkaAvro API) Removed FixedKafkaAvroSourceFactory and FixedKafkaAvroSinkFactory (fixed schemas are no longer supported)
  • (Refactor Kafka API) Replaced topics: List[String] by List[PreparedKafkaTopic] and removed processObjectDependencies in KafkaSource

Be aware that we are using Avro 1.9.2 instead of Flink's default 1.8.2, because it is needed for Java time logical type conversions.

  • #1013 Expression evaluation is synchronous now. It shouldn't cause any problems (all languages were synchronous anyway), but some internal code may have to change.

In version 0.1.2

  • #957 The custom node aggregate from the generic model has changed its parameter from windowLengthInSeconds to windowLength, which takes a human-friendly duration input. If you have used it in a process, you need to enter the correct value again.
  • #954 TypedMap is no longer a case class wrapping a Scala Map. If you pattern matched on it, use case typedMap: TypedMap => typedMap.asScala instead.
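
The TypedMap change from #954 can be sketched as follows. The TypedMap class below is a hypothetical stand-in (assumed here to be a java.util.HashMap subclass) so that the snippet compiles without Nussknacker on the classpath; the real class comes from the Nussknacker API and may differ in detail. The helper name fieldNames is also only illustrative.

```scala
import scala.collection.JavaConverters._

// Hypothetical stand-in for Nussknacker's TypedMap, assumed to be a
// java.util.Map implementation rather than a case class wrapping a Scala Map.
class TypedMap(fields: Map[String, Any])
  extends java.util.HashMap[String, Any](fields.asJava)

object TypedMapMatching {
  // Old style (no longer possible): case TypedMap(fields) => fields.keySet
  // New style: match on the type, then convert to a Scala view explicitly.
  def fieldNames(value: Any): Set[String] = value match {
    case typedMap: TypedMap => typedMap.asScala.keySet.toSet
    case _                  => Set.empty[String]
  }
}
```

Matching on the type instead of destructuring keeps the calling code independent of how TypedMap stores its fields.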

In version 0.1.1

  • #930 DeeplyCheckingExceptionExtractor was moved from nussknacker-flink-util module to nussknacker-util module.
  • #919 The KafkaSource constructor no longer takes consumerGroup. Instead, it computes consumerGroup on its own, based on kafka.consumerGroupNamingStrategy in modelConfig (set to processId by default). You can also override it with the optional overriddenConsumerGroup parameter. As a result of these changes, KafkaConfig has a new, optional parameter consumerGroupNamingStrategy.
  • #920 The KafkaSource constructor now takes KafkaConfig instead of using the one that was parsed by BaseKafkaSourceFactory.kafkaConfig. Also, if you parse Typesafe Config for KafkaSource on your own, you should now use the dedicated method KafkaConfig.parseConfig to avoid problems when the parsing strategy changes.
  • #914 The main factory method of pl.touk.nussknacker.engine.api.definition.Parameter that takes the runtimeClass parameter is deprecated. isLazyParameter should be passed instead. runtimeClass was also removed from the variants of factory methods prepared for easy testing (the optional method and so on).
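
The resolution order described in #919 (an explicit override first, otherwise the naming strategy) might be modelled roughly as below. This is a minimal sketch and not Nussknacker's implementation: the trait, the ProcessIdStrategy object and the resolve function are hypothetical names introduced only for illustration, and only the default processId strategy is modelled.

```scala
// Hypothetical model of consumer group resolution; none of these names
// come from the Nussknacker API.
sealed trait ConsumerGroupNamingStrategy
case object ProcessIdStrategy extends ConsumerGroupNamingStrategy

object ConsumerGroupResolution {
  // An explicit override wins; otherwise the group is derived from the
  // naming strategy, which defaults to the process id.
  def resolve(processId: String,
              strategy: ConsumerGroupNamingStrategy = ProcessIdStrategy,
              overriddenConsumerGroup: Option[String] = None): String =
    overriddenConsumerGroup.getOrElse(strategy match {
      case ProcessIdStrategy => processId
    })
}
```

With no override, a process with id "my-process" would get the consumer group "my-process" in this model; passing Some("custom") as the override would yield "custom".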

In version 0.1.0

  • #755 The default async execution context no longer depends on parallelism. asyncExecutionConfig.parallelismMultiplier has been deprecated and should be replaced with asyncExecutionConfig.workers. 8 should be a sane default value.
  • #722 The old way of configuring Flink and the model (via flinkConfig and processConfig) has been removed. The processTypes configuration should be used from now on. Example:
    flinkConfig {...}
    processConfig {...}
    becomes:
    processTypes {
      "type e.g. streaming" {
        deploymentConfig {
          type: "flinkStreaming"
        }
        modelConfig {
          classPath: PUT HERE VALUE OF flinkConfig.classPath FROM OLD CONFIG
        }
      }
    }
  • #763 Some API traits (ProcessManager, DictRegistry, DictQueryService, CountsReporter) now extend java.lang.AutoCloseable.
  • The old way of configuring additional properties should be replaced by the new one, which is now mapped to Map[String, AdditionalPropertyConfig]. Example in your config:
    additionalFieldsConfig: {
      mySelectProperty {
        label: "Description"
        type: "select"
        isRequired: false
        values: ["true", "false"]
      }
    }
    becomes:
    additionalPropertiesConfig {
      mySelectProperty {
        label: "Description"
        defaultValue: "false"
        editor: {
          type: "FixedValuesParameterEditor",
          possibleValues: [
            {"label": "Yes", "expression": "true"},
            {"label": "No", "expression": "false"}
          ]
        }
      }
    }
  • #588 #882 FlinkSource API changed, current implementation is now BasicFlinkSource
  • #839 #882 FlinkSink API changed, current implementation is now BasicFlinkSink
  • #841 ProcessConfigCreator API changed; note that all process objects are now invoked with ProcessObjectDependencies as a parameter. The APIs of KafkaSinkFactory, KafkaSourceFactory, and all their implementations were changed. Config is available as a property of the ProcessObjectDependencies instance.
  • #863 restUrl in deploymentConfig needs to be prefixed with the protocol. A host with port only is no longer allowed.
  • Rename grafanaSettings to metricsSettings in configuration.
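
For the restUrl change in #863, a quick pre-migration check of configured values could look like the sketch below. It is only an illustration: the RestUrlCheck object is hypothetical, and the assumption that the protocol is http or https is mine, not something stated in this guide. A plain prefix check is used on purpose, because a generic URI parser may treat the host part of a value such as host:port as a scheme.

```scala
// Hypothetical helper, not part of Nussknacker.
object RestUrlCheck {
  // Assumption: only http and https count as valid protocols here.
  def hasProtocol(restUrl: String): Boolean =
    restUrl.startsWith("http://") || restUrl.startsWith("https://")
}
```

In this sketch a value like "localhost:8081" is rejected, while "http://localhost:8081" is accepted.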

In version 0.0.12

  • Upgrade to Flink 1.7
  • Refactor of custom transformations, dictionaries and unions. Please look at the samples in example or generic to see the API changes
  • Considerable changes to authorization configuration. Please look at the sample config to see the changes
  • Circe is now used by default instead of Argonaut, but Argonaut can still be used in Displayable

In version 0.0.11

  • Changes in CustomStreamTransformer implementation: LazyInterpreter became LazyParameter. Please look at the samples to see the changes in API

In version 0.0.8

  • Upgrade to Flink 1.4
  • Change of format of Flink cluster configuration
  • Parameters of sources and sinks are expressions now - automatic update of DB is available
  • Change of configuration of Grafana dashboards
  • Custom processes are defined in main configuration file now