Migration guide
To see the biggest differences please consult the changelog.
In version 1.2.0
Configuration changes
- #2483 The COUNTS_URL environment variable is now INFLUXDB_URL, without the query path part.
- #2493 Kafka configuration should be moved to the components provider configuration; see components.kafka in dev-application.conf for an example.
- #2624 The default name of the process tag is now scenario. This affects metrics and counts functionality. Please update your Flink/Telegraf setup accordingly (see nussknacker-quickstart for details). If you still want to use the process tag (e.g. because you have a lot of dashboards), set countsSettings.metricsConfig.scenarioTag to process. The dashboard links format has also changed; see the documentation for details.
- #2645 The default models genericModel.jar and liteModel.jar were merged into defaultModel.jar, and managementSample.jar was renamed to devModel.jar. If you use defaultModel.jar, it's important to include flinkExecutor.jar explicitly on the model classpath.
Scenario authoring changes
- #2564 Flink union now takes only 'Output expression' parameters for branches (previously the 'value' parameter). The output expressions of all branches must be of the same type; if you want to distinguish the source branch in the output variable, use a map variable (see the example in the Basic Nodes docs).
Other changes
- #2554 Maven artifact nussknacker-kafka-flink-util became nussknacker-flink-kafka-util, and nussknacker-avro-flink-util became nussknacker-flink-avro-util. The general naming convention is nussknacker-$runtimeType-$moduleName. Components inside the distribution changed layout to components(/$runtimeType)/componentName.jar, e.g. components/flink/kafka.jar or components/openapi.jar. KafkaSource became FlinkKafkaSource, ConsumerRecordBasedKafkaSource became FlinkConsumerRecordBasedKafkaSource, KafkaSink became FlinkKafkaSink, and KafkaAvroSink became FlinkKafkaAvroSink.
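For code that still uses the pre-#2554 names, the class and artifact renames listed above can be applied mechanically. This is a minimal sketch, assuming plain text replacement over your Scala sources and build definition is acceptable; the helper names renameClasses and renameArtifacts are hypothetical, and any package moves are not covered.

```scala
import scala.util.matching.Regex

// Class renames listed for #2554 (old simple name -> new simple name).
val classRenames: Map[String, String] = Map(
  "KafkaSource" -> "FlinkKafkaSource",
  "ConsumerRecordBasedKafkaSource" -> "FlinkConsumerRecordBasedKafkaSource",
  "KafkaSink" -> "FlinkKafkaSink",
  "KafkaAvroSink" -> "FlinkKafkaAvroSink"
)

// Artifact renames listed for #2554 (convention: nussknacker-$runtimeType-$moduleName).
val artifactRenames: Map[String, String] = Map(
  "nussknacker-kafka-flink-util" -> "nussknacker-flink-kafka-util",
  "nussknacker-avro-flink-util" -> "nussknacker-flink-avro-util"
)

// Rewrites whole identifiers only: the \b anchors keep "KafkaSource" from matching
// inside "ConsumerRecordBasedKafkaSource", "KafkaSourceFactory" or an already
// renamed "FlinkKafkaSource", so running it twice is harmless.
def renameClasses(source: String): String =
  classRenames.foldLeft(source) { case (text, (oldName, newName)) =>
    ("\\b" + Regex.quote(oldName) + "\\b").r.replaceAllIn(text, Regex.quoteReplacement(newName))
  }

// Artifact ids contain hyphens, so a plain substring replacement is enough here.
def renameArtifacts(build: String): String =
  artifactRenames.foldLeft(build) { case (text, (oldId, newId)) => text.replace(oldId, newId) }
```

Review the diff afterwards: a local class that happens to be called KafkaSink would be renamed as well.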
- #2535, #2625, #2645 Rename standalone to request-response:
  - Renamed modules and artifacts
  - StandaloneMetaData is now RequestResponseMetaData
  - Moved request-response modules to the base dir
  - standalone in package names changed to requestresponse
  - Standalone in class/variable names changed to RequestResponse
  - DeploymentManager/Service uses a dedicated status DTO format instead of the ones from deployment-manager-api
  - Removed the old, deprecated jarPath settings in favour of classPath, which is used in other places
  - Extracted the nussknacker-lite-request-response-components module
- #2582 KafkaUtils.toProducerProperties now sets up only basic properties (bootstrap.servers and serializers). Before the change it also set options that are not always a good choice (for transactional producers they were not).
- #2600 ScenarioInterpreter and ScenarioInterpreterWithLifecycle now take an additional generic parameter: Input. ScenarioInterpreter.invoke takes a ScenarioInputBatch, which now contains a list of SourceId -> Input instead of SourceId -> Context. Context preparation logic should be done in LiteSource instead of before the ScenarioInterpreter.invoke invocation. This means that LiteSource also takes this parameter and has a new method, createTransformation.
- #2635 ContextInitializer.initContext now takes a ContextIdGenerator instead of nodeId and returns just a function with the context initialization strategy, instead of a serializable function with Lifecycle. To use it with the Flink engine, use the FlinkContextInitializingFunction wrapper.
- #2649 DeploymentManagerProvider takes the new ProcessingTypeDeploymentService class as an implicit parameter.
- #2564 'UnionParametersMigration' is available to migrate the parameter name from 'value' to 'Output expression'; please turn it on if you are using a 'union'-like component.
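After #2582, anything beyond the basic producer settings is the caller's responsibility. Below is a minimal sketch of that split using java.util.Properties and standard Kafka producer config keys. The function names are hypothetical, and the ByteArraySerializer choice is an assumption: the note above does not say which serializers Nussknacker sets.

```scala
import java.util.Properties

// Roughly the "basic" set #2582 describes: bootstrap servers plus serializers.
// Illustrative re-creation only, not the Nussknacker implementation.
def basicProducerProperties(bootstrapServers: String): Properties = {
  val props = new Properties()
  props.setProperty("bootstrap.servers", bootstrapServers)
  props.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  props
}

// Extra options are now opted into explicitly, e.g. for a transactional producer.
def transactionalProducerProperties(bootstrapServers: String, transactionalId: String): Properties = {
  val props = basicProducerProperties(bootstrapServers)
  props.setProperty("enable.idempotence", "true")
  props.setProperty("transactional.id", transactionalId)
  props
}
```

Keeping the base set minimal means options that conflict with transactions are never set implicitly.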
- #2645 Simplified the structure of available models (implementations of ProcessConfigCreator). defaultModel.jar and components should be used instead of custom implementations of ProcessConfigCreator; the only exception is when you want to customize ExpressionConfig. Also, the nussknacker-flink-engine module became nussknacker-flink-executor.
- #2651 ValidationContext.clearVariables now also clears the parent reference. This is important when it is invoked inside fragments.
- #2673 KafkaZookeeperUtils was renamed to KafkaTestUtils; it no longer depends on ZooKeeper.
- #2686 ServiceWithStaticParameters was renamed to EagerServiceWithStaticParameters.
- #2695 nodeId was replaced with NodeComponentInfo in NuExceptionInfo. It is a simple wrapper class that holds the same nodeId and also componentInfo. Migration is straightforward; just put nodeId into the new case class:
  - NuExceptionInfo(None, exception, context) => stays the same
  - NuExceptionInfo(Some(nodeId), exception, context) => NuExceptionInfo(Some(NodeComponentInfo(nodeId, None)), exception, context)
    - if an exception is thrown inside the component, additional information can be provided:
      - for a base component (like filter or split): NodeComponentInfo.forBaseNode("nodeId", ComponentType.Filter)
      - for other components: NodeComponentInfo("nodeId", ComponentInfo("kafka-avro-source", ComponentType.Source))
  - The same migration has to be applied to the ExceptionHandler.handling() method.
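The #2695 migration amounts to wrapping the node id. Here is a minimal sketch that uses simplified local stand-ins for NuExceptionInfo, NodeComponentInfo, ComponentInfo and ComponentType. These are not the real Nussknacker classes, which carry more detail; for example, the real context is not a Map. The helper fromOldArguments is hypothetical.

```scala
// Simplified stand-ins for the types named in #2695 (illustration only).
sealed trait ComponentType
object ComponentType {
  case object Filter extends ComponentType
  case object Source extends ComponentType
}

final case class ComponentInfo(name: String, componentType: ComponentType)

final case class NodeComponentInfo(nodeId: String, componentInfo: Option[ComponentInfo])

object NodeComponentInfo {
  // Mirrors the NodeComponentInfo("nodeId", ComponentInfo(...)) form shown above.
  def apply(nodeId: String, componentInfo: ComponentInfo): NodeComponentInfo =
    NodeComponentInfo(nodeId, Some(componentInfo))
}

final case class NuExceptionInfo(
    nodeComponentInfo: Option[NodeComponentInfo],
    throwable: Throwable,
    context: Map[String, Any])

// Mechanical migration of an old call site that only knew the node id:
// the component stays unknown (None) until you can provide it.
def fromOldArguments(nodeId: Option[String], throwable: Throwable, context: Map[String, Any]): NuExceptionInfo =
  NuExceptionInfo(nodeId.map(id => NodeComponentInfo(id, None)), throwable, context)
```

Start with the mechanical wrapping and fill in ComponentInfo only where the throwing component is actually known.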
- #2824 'ProcessSplitterMigration' is available to migrate the node name from 'split' to 'for-each' (see #2781); please turn it on if you are using the 'split' component.
In version 1.1.0
Summary:
- A lot of internal refactoring was done to separate code/API specific to Flink. If your deployment has custom components, pay special attention to:
  - Lifecycle management
  - Kafka components
  - Differences in artifacts and packages
- Some of the core dependencies (cats, cats-effect and circe) were upgraded. This mainly affects code, but it may also have an impact on state compatibility and performance.
- The default Flink version was bumped to 1.14; see https://github.com/TouK/nussknacker-flink-compatibility for how to run Nu on older Flink versions.
- Execution of SpEL expressions is now checked more strictly, due to security considerations. These checks can be overridden with a custom ExpressionConfig.
- Apart from that:
  - minor configuration naming changes
  - removal of a few minor, undocumented features (e.g. SQL Variable)
- #2208 Upgraded cats, cats-effect and circe. An important nuisance: we didn't upgrade sttp, so we cannot depend on "com.softwaremill.sttp.client" %% "circe". Instead, the code is copied. Make sure you don't include the sttp-circe integration as a transitive dependency; use the class from http-utils instead.
- #2176 EnrichDeploymentWithJarDataFactory was replaced with ProcessConfigEnricher.
- #2278 SQL Variable was removed.
- #2280 Added an optional defaultValue field to Parameter. In GenericNodeTransformation it can be set to None; values will then be determined automatically.
- #2289 The savepoint path in the /api/adminProcessManagement/deploy endpoint is passed as a savepointPath parameter instead of a path segment.
- #2293 Enhancement: the nodeCategoryMapping configuration changed to componentsGroupMapping.
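Because #2289 moves the savepoint path out of the URL path, clients have to encode it as a parameter value instead. Below is a minimal sketch of building such a URL. Two points are assumptions not stated in this guide: that the scenario name is the last path segment of the deploy endpoint and is already URL-safe, and that the parameter travels in the query string. Check the REST documentation of your version. The helper deployUri is hypothetical.

```scala
import java.net.URLEncoder

// Assumed URL shape: <base>/api/adminProcessManagement/deploy/<scenarioName>?savepointPath=<encoded>
def deployUri(baseUrl: String, scenarioName: String, savepointPath: String): String = {
  // URLEncoder form-encodes, so ':' and '/' in the savepoint path become %3A and %2F.
  val encodedSavepoint = URLEncoder.encode(savepointPath, "UTF-8")
  s"$baseUrl/api/adminProcessManagement/deploy/$scenarioName?savepointPath=$encodedSavepoint"
}
```

As a parameter value, a path such as s3://bucket/savepoints/sp-1 no longer has to survive being split on '/' by the router.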
- #2301 #2620 GenericNodeTransformation.initialParameters was removed; GenericNodeTransformation.contextTransformation is used instead. To keep the Admin tab -> Invoke service form working, use the WithLegacyStaticParameters trait.
- #2409 JsonValidator is no longer determined by default based on JsonParameterEditor; it must be explicitly defined with the @JsonValidator annotation.
- #2304 Upgrade to Flink 1.14. Pay attention to Flink dependencies: some of them (e.g. runtime) no longer have a Scala version suffix.
- #2295 FlinkLazyParameterFunctionHelper allows (and sometimes requires) correct exception handling.
- #2307 Changed the nussknacker-kafka module name to nussknacker-kafka-util.
- #2310 Changed the nussknacker-process module name to nussknacker-flink-engine.
- #2300 #2343 Enhancement: refactoring and improvements of components groups:
  - Provided ComponentGroupName as a VO
  - SingleNodeConfig was renamed to SingleComponentConfig and moved from the pl.touk.nussknacker.engine.api.process package to pl.touk.nussknacker.engine.api.component
  - Configuration category in node configuration was replaced by componentGroup
  - Configuration nodes in model configuration was replaced by componentsUiConfig
  - Additional refactor:
    - ProcessToolbarService moved from the pl.touk.nussknacker.ui.service package to pl.touk.nussknacker.ui.process
    - DefinitionPreparer was renamed to ComponentDefinitionPreparer
    - NodesConfigCombiner was removed
  - REST API /api/processDefinitionData/* response JSON was changed:
    - nodesToAdd was renamed to componentGroups
    - posibleNode was renamed to components
    - nodesConfig was renamed to componentsConfig
    - the icon property in componentsConfig should now be relative to http.publicPath, e.g. /assets/components/Filter.svg (previously just Filter.svg), or a URL (with http/https)
- #2346 Removed endResult from Sink in the graph.
  - Sink no longer defines a testOutput method; it should be handled by the respective implementations
  - Changed the definition of StandaloneSink (previously StandaloneSinkWithParameters), as output always has to be computed with sink parameters now
  - Changed the definition of FlinkSink, to better handle capturing test data
  - Removed the .sink method in GraphBuilder; use .emptySink if suitable
- #2331 KafkaAvroBaseTransformer companion object renamed to KafkaAvroBaseComponentTransformer. KryoGenericRecordSchemaIdSerializationSupport renamed to GenericRecordSchemaIdSerializationSupport.
- #2305 Enhancement: the processingTypeToDashboard configuration changed to scenarioTypeToDashboard.
- #2296 Scenarios and Fragments have separate TypeSpecificData implementations. Also, we removed the isSubprocess field from the process JSON and, respectively, from the MetaData constructor. See the corresponding DB migration V1_031__FragmentSpecificData.scala.
- #2368 WithCategories now takes categories as an Option[List[String]] instead of List[String]. You should wrap the given list of categories with Some(...). None means that the component will be available in all categories.
- #2360 union, union-memo and dead-end components were extracted from model/genericModel.jar to components/baseComponents.jar. If you have your own application.conf which changes scenarioTypes, you should add a "components/baseComponents.jar" entry to the classPath array.
- #2337 Extracted the base engine from standalone:
  - Common functionality of the base engine (i.e. microservice based, without Flink) is extracted to base-api and base-runtime
  - New API for custom components (pl.touk.nussknacker.engine.baseengine.api.customComponentTypes)
  - StandaloneProcessInterpreter becomes StandaloneScenarioEngine
  - Replaced Either[NonEmptyList[Error], _] with ValidatedNel[Error, _] as the return type
  - StandaloneContext becomes EngineRuntimeContext
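The #2368 semantics of the Option wrapper can be summarized in two small functions. Both helpers below are hypothetical illustrations of the behaviour described above and are not part of the Nussknacker API.

```scala
// Old List[String] values are migrated by wrapping them in Some(...).
def migrateCategories(old: List[String]): Option[List[String]] = Some(old)

// Some(list) restricts the component to the listed categories; None means all categories.
def isAvailableIn(categories: Option[List[String]], category: String): Boolean =
  categories.forall(_.contains(category))
```

In this reading, wrapping an empty list gives Some(Nil), which matches no category at all. Use None only when the component should really be available everywhere.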
- #2349 The queryable-state module was removed; FlinkQueryableClient was moved to nussknacker-flink-manager. PrettyValidationErrors, CustomActionRequest and CustomActionResponse moved from nussknacker-ui to nussknacker-restmodel.
- #2361 Removed the security dependency from listener-api. LoggedUser was replaced with a dedicated class in listener-api.
- #2385 The deprecated CustomStreamTransformer.clearsContext was removed. Use

@MethodToInvoke
def execute(...) =
  ContextTransformation
    .definedBy(ctx => Valid(ctx.clearVariables ...))
    .implementedBy(...)

instead.
- #2348 #2459 #2486 #2490 #2496 #2536 Introduced KafkaDeserializationSchema and KafkaSerializationSchema traits to decouple from the Flink dependency:
  - moved KeyedValue to nussknacker-util and SchemaRegistryProvider to utils/avro-util
  - to convert between Nussknacker's and Flink's Kafka(De)serializationSchema, use wrapToFlink(De)serializationSchema from FlinkSerializationSchemaConversions
  - SchemaRegistryProvider and ConfluentSchemaRegistryProvider are now in the nussknacker-avro-util module
  - FlinkSourceFactory is gone; use SourceFactory instead
  - KafkaSourceFactory, KafkaAvroSourceFactory, KafkaSinkFactory, KafkaAvroSinkFactory and ContextIdGenerator no longer depend on Flink
  - Extracted KafkaSourceImplFactory, KafkaSinkImplFactory and KafkaAvroSinkImplFactory, which deliver the implementation of a component (after all validations and parameter evaluation). Use FlinkKafkaSourceImplFactory, FlinkKafkaSinkImplFactory and FlinkKafkaAvroSinkImplFactory respectively to deliver Flink implementations
  - Moved non-Flink-specific serializers, deserializers, BestEffortAvroEncoder, ContextIdGenerators and RecordFormatters to kafka-util/avro-util
  - KafkaDelayedSourceFactory is now DelayedKafkaSourceFactory
  - FixedRecordFormatterFactoryWrapper moved to RecordFormatterFactory
- #2477 FlinkContextInitializer and FlinkGenericContextInitializer were merged into ContextInitializer, and BasicFlinkContextInitializer and BasicFlinkGenericContextInitializer were merged into BasicContextInitializer. All of them moved to the pl.touk.nussknacker.engine.api.process package. ContextInitializer.validationContext returns ValidatedNel; before this change, errors during context initialization weren't accumulated. ContextInitializingFunction is now a Scala function instead of Flink's MapFunction. You should wrap it with RichLifecycleMapFunction to make sure that it will be opened correctly by Flink. InputMeta was moved to the kafka-util module.
- #2389 #2391 The deployment-manager-api module was extracted, and DeploymentManagerProvider, ProcessingTypeData and QueryableClient were moved from interpreter into it. DeploymentManager, CustomAction and ProcessState were moved from api to deployment-manager-api. ProcessingType was moved to the rest-model package.
- #2393 Added ActorSystem, ExecutionContext and SttpBackend to DeploymentManagerProvider.createDeploymentManager. During clean-ups, the nussknacker-http-utils dependency on async-http-client-backend-future was also removed, and SttpBackend was added to the CountsReporterCreator.createReporter arguments.
- #2397 Common EngineRuntimeContext lifecycle and MetricsProvider. This may have runtime consequences: make sure your custom services/listeners invoke open/close correctly, especially in complex inheritance scenarios.
  - Lifecycle now has EngineRuntimeContext as a parameter; JobData is embedded in it
  - TimeMeasuringService replaces GenericTimeMeasuringService; the Flink/Standalone flavours of TimeMeasuringService are removed
  - EngineRuntimeContext and MetricsProvider moved to the base API; RuntimeContextLifecycle moved to the base API as Lifecycle
  - GenericInstantRateMeter is now InstantRateMeter
  - Flink RuntimeContextLifecycle should be replaced in most cases by Lifecycle
  - In the Flink engine, MetricsProvider (obtained with EngineRuntimeContext) should be used in most places instead of MetricUtils
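#2397 asks that open/close be invoked correctly in complex inheritance hierarchies. With Scala stackable traits, this means every override must call super. The sketch below uses a simplified stand-in Lifecycle trait: the real one passes an EngineRuntimeContext to open, which is omitted here so the example is self-contained. The traits WithHttpClient and WithMetrics and the class EnricherService are hypothetical.

```scala
import scala.collection.mutable.ListBuffer

// Simplified stand-in for Nussknacker's Lifecycle (no EngineRuntimeContext parameter).
trait Lifecycle {
  def open(): Unit = ()
  def close(): Unit = ()
}

// Hypothetical mixins, each owning a resource. Every override calls super,
// so all layers of the linearization get opened and closed.
trait WithHttpClient extends Lifecycle {
  def events: ListBuffer[String]
  override def open(): Unit = { super.open(); events += "http client opened" }
  override def close(): Unit = { events += "http client closed"; super.close() }
}

trait WithMetrics extends Lifecycle {
  def events: ListBuffer[String]
  override def open(): Unit = { super.open(); events += "metrics registered" }
  override def close(): Unit = { events += "metrics unregistered"; super.close() }
}

// Linearization: EnricherService -> WithMetrics -> WithHttpClient -> Lifecycle.
// open() finishes the innermost layer first (http client), close() tears down the outermost first (metrics).
class EnricherService extends WithHttpClient with WithMetrics {
  val events: ListBuffer[String] = ListBuffer.empty[String]
}
```

If any override skipped super.open(), every layer below it in the linearization would silently stay unopened. This is the kind of runtime problem the note above warns about.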
- #2486 Context.withInitialId is now deprecated; use EngineRuntimeContext.contextIdGenerator instead. EngineRuntimeContext is accessible via FlinkCustomNodeContext.convertToEngineRuntimeContext.
- #2377 #2534 Removed clazz from SourceFactory. Removed the generic parameter from Source and SourceFactory. The return type of a source should now be provided by one of:
  - the returnType field of @MethodToInvoke
  - the ContextTransformation API
  - the GenericNodeTransformer API
  - SourceFactory.noParam
- #2453 Custom actions for PeriodicDeploymentManager can now be defined and implemented outside this class, in PeriodicCustomActionsProvider created by PeriodicCustomActionsProviderFactory. If you do not need them, just pass PeriodicCustomActionsProviderFactory.noOp to the PeriodicDeploymentManager object's factory method.
- #2501 The nussknacker-baseengine-components module was renamed to nussknacker-lite-base-components.
- #2221 ReflectUtils: fixedClassSimpleNameWithoutParentModule was renamed to simpleNameWithoutSuffix.
- #2495 TypeSpecificDataInitializer trait change to TypeSpecificDataInitializ
- #2245 FailedEvent has been split into FailedOnDeployEvent and FailedOnRunEvent.
In version 1.0.0
- #1439 #2090 Upgrade to Flink 1.13.
  - setTimeCharacteristic is deprecated and should be handled automatically by Flink
  - UserClassLoader was removed; use appropriate Flink objects or the context ClassLoader
  - RocksDB configuration is turned on by rocksdb.enable instead of rocksdb.checkpointDataUri, which is no longer used
- #2133 SQL Variable is hidden in the generic model; please see the comment in defaultModelConfig.conf.
- #2152 The schedulePropertyExtractor parameter of PeriodicDeploymentManagerProvider was changed to a factory; replace it with a lambda creating the original property extractor.
- #2159 The useTypingResultTypeInformation option is now enabled by default.
- #2108 Changes in ClassExtractionSettings:
  - Refactoring of classes defining extraction rules; TypedClass has a private apply method, please use Typed.typedClass
  - Fewer classes/methods are accessible in SpEL, in particular Scala collections, the internal time API, and methods returning or taking parameters of excluded classes
- Changes in OAuth2 security components:
  - refactoring of OpenIdConnectService, which is now named GenericOidcService (it's best to use OidcService, which can handle most of the configuration automatically)
- New security settings, in particular new flags in ExpressionConfig:
  - strictMethodsChecking
  - staticMethodInvocationsChecking
  - methodExecutionForUnknownAllowed
  - dynamicPropertyAccessAllowed
  - spelExpressionExcludeList
- #2101 Global permissions can be arbitrary strings; for an admin user it's not necessary to return global permissions.
- #2182 To avoid classloader leaks during SQL DriverManager registration, HSQLDB (used e.g. for SQL Variable) is no longer included in model jars; it should be added to the Flink lib dir.
In version 0.4.0
- #1479 ProcessId and VersionId moved to the API, included in ProcessVersion; removed the spurious ProcessId and ProcessVersionId in restmodel.
- #1422 Removed ServiceReturningType and WithExplicitMethod; use EagerServiceWithStaticParameters, EnricherContextTransformation or SingleInputGenericNodeTransformation.
- #1845 AuthenticatorData has been renamed to AuthenticationResources and changed into a trait; the apply construction has been preserved. AuthenticatorFactory and its createAuthenticator method have been renamed to AuthenticationProvider and createAuthenticationResources. It is recommended to rename the main class of any custom authentication module to <Something>AuthenticationProvider accordingly.
- #1542 KafkaConfig now has a new parameter, topicsExistenceValidationConfig. When topicsExistenceValidationConfig.enabled = true, Kafka sources/sinks fail validation if the provided topic does not exist and the cluster is configured with auto.create.topics.enable=false.
- #1416 OAuth2Service has changed. You can still use your old implementation by importing OAuth2OldService with an alias. The OAuth2ServiceFactory.create method now accepts implicit parameters for an ExecutionContext and sttp.HttpBackend. You can ignore them to maintain the previous behaviour, but it is always better to use them instead of locally defined ones.
- #1346 AggregatorFunction now takes the type of stored state, which can be immutable.SortedMap (previous behaviour) or java.util.Map (using Flink's serialization), and a validatedStoredType parameter for providing better TypeInformation for aggregated values.
- #1343 FirstAggregator changed its serialized state, which is not compatible; the Aggregator trait has a new method, computeStoredType.
- #1352 and #1568 The AvroStringSettings class has been introduced, which allows controlling whether the Avro type string is represented by java.lang.String (also at runtime) or java.lang.CharSequence (implemented at runtime by org.apache.avro.util.Utf8). This setting is available through the environment variable AVRO_USE_STRING_FOR_STRING_TYPE; the default is true. Please mind that this setting is global: it applies to all processes running on Flink, and changing the value requires restarting the TaskManager.
- #1361 Lazy variables are removed; you should use standard enrichers for those cases. Their handling has been a source of many problems, and they made it harder to reason about the execution of a process.
- #1373 Creating ClassLoaderModelData directly is not allowed; use ModelData.apply with plain config. Wrapping it with ModelConfigToLoad yourself is not needed.
- #1406 ServiceReturningType is deprecated in favour of EagerService.
- #1445 RecordFormatter now handles TestDataSplit for Kafka sources. It is required in KafkaSource creation, instead of TestDataSplit.
- #1433 Pass DeploymentData to the process via JobData; additional parameters to deployment methods are needed. Separated ExternalDeploymentId from DeploymentId (generated by NK).
- #1466 ProcessManager.deploy can return ExternalDeploymentId.
- Slight change of the API of StringKeyedValueMapper
- Change of semantics of some parameters of AggregatorFunction and AggregatorFunctionMixin (storedAggregateType becomes aggregateElementType)
- #1405 'KafkaAvroSink' requires the more generic 'AvroSinkValue' as its value parameter.
- Change of the FlinkSource API: sourceStream produces a stream of initialized Context (DataStream[Context]). This initialization step was previously performed within FlinkProcessRegistrar.registerSourcePart; now it happens explicitly within the Flink source.
  - FlinkIntermediateRawSource is used as an extension to Flink sources; it prepares the source with typical stream transformations (add source function, set uid, assign timestamp, initialize Context)
  - FlinkContextInitializer is used to initialize Context. It provides a map function that transforms a raw event (produced by the Flink source function) into a Context variable. The default implementation of FlinkContextInitializer, BasicFlinkContextInitializer, sets the raw event value to a single "input" variable
  - For sources based on GenericNodeTransformation, it allows initializing Context with more than one variable. The default implementation of the initializer, BasicFlinkGenericContextInitializer, provides a default definition of variables as a ValidationContext with a single "input" variable. The implementation requires the type of the "input" variable (TypingResult) to be defined separately. See GenericSourceWithCustomVariablesSample
  - To enable "test source" functionality, a source needs to be extended with SourceTestSupport
  - Flink sources that use TestDataParserProvider should switch to FlinkSourceTestSupport (which is used to provide "test source" functionality for Flink sources)
  - The old TestDataParserProvider is renamed to SourceTestSupport
  - To enable the test data generator for "test source", a source needs to be extended with both SourceTestSupport and TestDataGenerator. Everything related to "test source" functionality that was obsolete in FlinkSource has been moved to FlinkSourceTestSupport
  - FlinkCustomNodeContext has access to TypeInformationDetection, which allows getting TypeInformation for the node stream mapping from ValidationContext
  - For Kafka sources, RecordFormatter parses raw test data to ConsumerRecord, which fits into the deserializer (instead of ProducerRecord, which required another transformation)
  - Definitions of names of common Context variables are moved to VariableConstants (instead of Interpreter)
- #1497 Changes in PeriodicProcessManager: PeriodicProperty changed to ScheduleProperty.
- trait KafkaAvroDeserializationSchemaFactory uses both key and value ClassTags and schemas (instead of value-only); check the order of parameters.
  - ClassTag is provided in params in the Avro key-value deserialization schema factory: KafkaAvroKeyValueDeserializationSchemaFactory
  - BaseKafkaAvroSourceFactory is able to read both key and value schema determiners to build a proper DeserializationSchema (support for keys is not fully introduced in this change)
- #1514 ExecutionConfigPreparer has a different method parameter, JobData, which carries more information than the previous parameters.
- #1532 TypedObjectTypingResult#fields now uses scala.collection.immutable.ListMap to keep the order of fields.
- #1546 StandaloneCustomTransformer now takes a list of Context objects, to process them in one go.
- #1557 Some classes from the standalone engine were moved to the standalone API to remove the engine to (model) utils dependency: StandaloneContext, StandaloneContextLifecycle, MetricsProvider.
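The reason for switching to ListMap in #1532 is iteration order. In this sketch, field name -> type name strings stand in for real TypingResults, so it is an illustration of the collection behaviour only.

```scala
import scala.collection.immutable.ListMap

// Declared field order, e.g. as written in a record definition (illustrative data).
val declaredFields: List[(String, String)] = List(
  "id" -> "Long",
  "name" -> "String",
  "amount" -> "BigDecimal",
  "currency" -> "String",
  "createdAt" -> "Instant"
)

// ListMap iterates in insertion order, so the declared order is preserved.
val orderedFields: ListMap[String, String] = ListMap(declaredFields: _*)

// A default immutable Map with more than four entries is hash-based,
// so its iteration order is not guaranteed to match the declaration.
val unorderedFields: Map[String, String] = Map(declaredFields: _*)
```

Both maps hold the same entries; only ListMap guarantees the order in which they are listed.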
- #1558 FlinkProcessRegistrar takes configuration directly from FlinkProcessCompiler (this can affect some test setups).
- #1631 Introduced the nussknacker.config.locations property and dropped use of the standard config.file property. Model configuration no longer has direct access to the root UI config.
- Replaced KafkaSourceFactory with a source based on GenericNodeTransformation, which gives access to the setup of ValidationContext and Context initialization. To migrate KafkaSourceFactory:
  - provide a deserializer factory (the source factory requires deserialization to ConsumerRecord):
    - use ConsumerRecordDeserializationSchemaFactory with the current DeserializationSchema as the value deserializer and add a key deserializer (e.g. org.apache.kafka.common.serialization.StringDeserializer)
    - or use FixedValueDeserializationSchemaFactory with a simple key-as-string deserializer
  - provide a RecordFormatterFactory:
    - use ConsumerRecordToJsonFormatterFactory for whole key-value-and-metadata serialization
    - or, for the value-only-and-without-metadata scenario, use the current RecordFormatter wrapped in FixedRecordFormatterFactoryWrapper
  - provide a timestampAssigner that is able to extract time from ConsumerRecord[K, V]
- Removed BaseKafkaSourceFactory with multiple topics support: use KafkaSourceFactory instead (see the "source with two input topics" test case).
- Removed SingleTopicKafkaSourceFactory: use KafkaSourceFactory with custom prepareInitialParameters, contextTransformation and extractTopics to alter the parameter list and provide a constant topic value.
- TypingResultAwareTypeInformationCustomisation is moved to the package pl.touk.nussknacker.engine.flink.api.typeinformation

Example of a source with value-only deserialization and a custom timestampAssigner:

import org.apache.kafka.clients.consumer.ConsumerRecord

// provide a new deserializer factory with the old schema definition for the event's value
val oldSchema = new EspDeserializationSchema[SampleValue](bytes => io.circe.parser.decode[SampleValue](new String(bytes)).fold(error => throw error, identity))
val schemaFactory: KafkaDeserializationSchemaFactory[ConsumerRecord[String, SampleValue]] = new FixedValueDeserializationSchemaFactory(oldSchema)
// ... provide a timestampAssigner that extracts the timestamp from SampleValue.customTimestampField
// ... or uses the event's metadata: record.timestamp()
def timestampExtractor(record: ConsumerRecord[String, SampleValue]): Long = record.value().customTimestampField
val watermarkHandler = StandardTimestampWatermarkHandler.boundedOutOfOrderness[ConsumerRecord[String, SampleValue]](timestampExtractor, java.time.Duration.ofMinutes(10L))
val timestampAssigner: Option[TimestampWatermarkHandler[ConsumerRecord[String, SampleValue]]] = Some(watermarkHandler)
// ... provide a RecordFormatterFactory that allows generating and parsing test data with key, headers and other metadata
val formatterFactory: RecordFormatterFactory = new ConsumerRecordToJsonFormatterFactory[String, SampleValue]
// ... and finally (dummyProcessObjectDependencies is assumed to come from your setup)
val sourceFactory = new KafkaSourceFactory[String, SampleValue](schemaFactory, timestampAssigner, formatterFactory, dummyProcessObjectDependencies)
- #1651 KafkaAvroSourceFactory provides an additional #inputMeta variable with the event's metadata.
  - That source now has key and value type parameters. These parameters are relevant for sources handling SpecificRecords. For GenericRecords, use KafkaAvroSourceFactory[Any, Any] explicitly
  - SpecificRecordKafkaAvroSourceFactory extends the whole KafkaAvroSourceFactory with context validation and initialization
  - New flag in KafkaConfig: useStringForKey determines whether the event's key should be interpreted as an ordinary String (which is the default scenario). It is used in deserialization and for generating/parsing test data
  - SchemaRegistryProvider now provides factories to produce SchemaRegistryClient and RecordFormatter
  - For ConfluentSchemaRegistryProvider, KafkaConfig and ProcessObjectDependencies (which contain KafkaConfig data) are no longer required. That configuration is required by the factories at the moment the requested objects are created, which happens in KafkaAvroSourceFactory (so all objects within KafkaAvroSourceFactory see the same Kafka configuration)
  - Removed:
    - BaseKafkaAvroSourceFactory; the class is incorporated into KafkaAvroSourceFactory to provide a flexible approach to creating a KafkaSource with ReturningType for generic types (this is defined by ValidationContext; see also KafkaContextInitializer, which allows returning more than one variable)
    - KafkaAvroValueDeserializationSchemaFactory (the source requires deserialization to ConsumerRecord[K, V]; only deserializers based on KafkaAvroKeyValueDeserializationSchemaFactory remain)
    - ConfluentKafkaAvroDeserializationSchemaFactory; use ConfluentKeyValueKafkaAvroDeserializationFactory
    - TupleAvroKeyValueKafkaAvroDeserializerSchemaFactory; this approach is deprecated due to the #inputMeta variable, which contains key data
  To migrate KafkaAvroSourceFactory:
  - Provide KafkaConfig with the correct useStringForKey flag value. By default we want to handle keys as ordinary Strings, and all topics related to such a config require only value schema definitions (key schemas are ignored). For the specific scenario where a complex key with its own schema is provided, this flag is false and all topics related to this config require both key and value schema definitions. Example of overriding the default KafkaConfig:
    override protected def prepareKafkaConfig: KafkaConfig = super.prepareKafkaConfig.copy(useStringForKey = false)
  - provide your own SchemaRegistryProvider (or use ConfluentSchemaRegistryProvider)
  - a custom RecordFormatter can be wrapped in FixedRecordFormatterFactoryWrapper (or keep ConfluentAvroToJsonFormatterFactory)
  - provide a timestampAssigner that is able to extract time from ConsumerRecord[K, V] (see the example above)
- #1741 Minor changes in KafkaUtils; NonTransientException uses Instant instead of LocalDateTime.
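After #1741, code that still builds LocalDateTime values for NonTransientException has to convert them to Instant, and that conversion needs a time zone. This is a minimal sketch with a hypothetical helper toInstant. Which zone is correct depends on how the old values were produced; the system-default fallback is only an assumption.

```scala
import java.time.{Instant, LocalDateTime, ZoneId, ZoneOffset}

// Interprets a zone-less LocalDateTime in the given zone and converts it to an Instant.
def toInstant(localDateTime: LocalDateTime, zone: ZoneId = ZoneId.systemDefault()): Instant =
  localDateTime.atZone(zone).toInstant

// With an explicit UTC zone the conversion is unambiguous.
val example: Instant = toInstant(LocalDateTime.of(2021, 6, 1, 12, 0), ZoneOffset.UTC)
```

For newly created values, Instant.now() avoids the conversion entirely.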
- #1806 Removed the old, deprecated API:
  - EvictableState, RichEvictableState: use EvictableStateFunction
  - checkpointInterval: use checkpointConfig.checkpointInterval
  - old versions of sampleTransformers: use the newer ones
  - MiniClusterExecutionEnvironment.runningJobs(): use flinkMiniClusterHolder.runningJobs()
- #1807 Removed jdbcServer; please use Postgres for production-ready setups.
- RecordFormatterFactory uses two type parameters, K and V, instead of one
- ConfluentAvroToJsonFormatter is produced by ConfluentAvroToJsonFormatterFactory
- ConfluentAvroToJsonFormatter produces test data in valid JSON format and does not use Separator
- ConfluentAvroMessageFormatter has an asJson method instead of writeTo
- ConfluentAvroMessageReader has a readJson method instead of readMessage

Example test data object:
{"keySchemaId":null,"valueSchemaId":1,"consumerRecord":{"key":null,"value":{"first":"Jan","last":"Kowalski"},"topic":"testAvroRecordTopic1","partition":0,"offset":0,"timestamp":1624279687756,"timestampType":"CreateTime","headers":{},"leaderEpoch":0}}
- #1663 Default FlinkExceptionHandler implementations are deprecated; use ConfigurableExceptionHandler instead.
- #1731 The RocksDB config flag incrementalCheckpoints is turned on by default.
- #1825 The default dashboard was renamed from flink-esp to nussknacker-scenario.
- #1836 Changed the default kafka.consumerGroupNamingStrategy to processId-nodeId.
- #1357 Run mode added to nodes. The ServiceInvoker interface was extended with a new, implicit runMode parameter.
- #1886 aggregate-sliding with emitWhenEventLeft = true, aggregate-tumbling and aggregate-session components no longer emit the full context of variables that were present before the node (for performance reasons, and because it wasn't obvious which context would be emitted). If you want to emit information other than the aggregated value and key (available via the new #key variable), you should use the #AGG.map expression in aggregateBy.
- #1910 processTypes was renamed to scenarioTypes. You can still use the old processTypes configuration; it will be removed in version 0.5.0.
- Various naming changes:
In version 0.3.0
- #1313 Kafka Avro API passes KafkaConfig when determining TypeInformation
- #1305 Kafka Avro API passes RuntimeSchemaData instead of Schema in various places
- #1304 SerializerWithSpecifiedClass was moved to the flink-api module.
- #1044 Upgrade to Flink 1.11. The current watermark/timestamp mechanisms are deprecated in Flink 1.11; a new API, TimestampWatermarkHandler, is introduced, with LegacyTimestampWatermarkHandler as a wrapper for the previous mechanisms.
- #1244 Parameter has a new parameter 'variablesToHide' with a Set of variable names that will be hidden before the parameter's evaluation
- #1159 #1170 Changes in GenericNodeTransformation API:
  - Now implementation takes an additional parameter with the final state value determined during contextTransformation
  - DefinedLazyParameter and DefinedEagerParameter hold expression: TypedExpression instead of returnType: TypingResult
  - DefinedLazyBranchParameter and DefinedEagerBranchParameter hold expressionByBranchId: Map[String, TypedExpression] instead of returnTypeByBranchId: Map[String, TypingResult]
- Now SimpleSlidingAggregateTransformerV2 and SlidingAggregateTransformer are deprecated in favour of SlidingAggregateTransformerV2
- Now SimpleTumblingAggregateTransformer is deprecated in favour of TumblingAggregateTransformer
- Now SumAggregator, MaxAggregator and MinAggregator don't change the type of the aggregated value (previously it was changed to Double)
- Now SumAggregator, MaxAggregator and MinAggregator return null instead of 0D/Double.MaxValue/Double.MinValue when no element was added before getResult
- #1149 FlinkProcessRegistrar refactor (can affect test code)
- #1166 model.conf should be renamed to defaultModelConfig.conf
- #1218 FlinkProcessManager is no longer bundled in the UI uber-jar. In docker/tgz distribution
- #1255 Moved displaying of the Metrics tab to customTabs
- #1257 Improvements: Flink test util package
  - Added methods: cancelJob, submitJob, listJobs, runningJobs to FlinkMiniClusterHolder
  - Deprecated: runningJobs from MiniClusterExecutionEnvironment
  - Removed: getClusterClient from the FlinkMiniClusterHolder interface, because of compatibility with Flink 1.9
  - Renamed: FlinkStreamingProcessRegistrar to FlinkProcessManager
- #1303 TypedObjectTypingResult has a new field: additionalInfo
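If your own tests assert on the output of SumAggregator, MaxAggregator or MinAggregator, the two aggregator changes listed above mean expecting the input type (not Double) and null for an empty aggregate. The sketch below models that contract with a hypothetical stand-in class, SumLike, which is not the Nussknacker implementation; it only illustrates the documented behaviour.

```scala
object AggregatorContractSketch {
  // Hypothetical stand-in, NOT Nussknacker's SumAggregator: it only mirrors the
  // documented contract (the type of the aggregated value is kept, null when empty).
  final class SumLike[T](implicit num: Numeric[T]) {
    private var acc: Option[T] = None

    // Adds a value without widening it, e.g. Int stays Int (previously: Double).
    def add(value: T): Unit =
      acc = Some(acc.fold(value)(num.plus(_, value)))

    // Returns null instead of 0D when nothing was added before reading the result.
    def result: Any = acc.getOrElse(null)
  }
}
```

For example, a SumLike[Int] yields null before any add call and the Int 5 after adding 2 and 3.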
In version 0.2.0
- #1104 Creation of FlinkMiniCluster is now extracted from StoppableExecutionEnvironment. You should create it using e.g.:
    val flinkMiniClusterHolder = FlinkMiniClusterHolder(FlinkTestConfiguration.configuration(parallelism))
    flinkMiniClusterHolder.start()
  and then create an environment using flinkMiniClusterHolder.createExecutionEnvironment(). At the end you should clean up flinkMiniClusterHolder with flinkMiniClusterHolder.stop().
  FlinkMiniClusterHolder should be created once per test class: it is thread-safe and resource-expensive. MiniClusterExecutionEnvironment, on the other hand, should be created for each process. It is not thread-safe because the underlying StreamExecutionEnvironment is not. You can use FlinkSpec to achieve that.
- pl.touk.nussknacker.engine.queryablestate.QueryableClient was moved from the queryableState module to the pl.touk.nussknacker.engine.api.queryablestate package in the api module
- pl.touk.nussknacker.engine.queryablestate.QueryableState was moved to pl.touk.nussknacker.engine.api.queryablestate
- CustomTransformers from pl.touk.nussknacker.engine.flink.util.transformer in the flinkUtil module were moved to the new flinkModelUtil module.
- pl.touk.nussknacker.engine.testing.EmptyProcessConfigCreator was moved from the interpreter module to the pl.touk.nussknacker.engine.util.process package in the util module
- #1039 The generic parameter of LazyParameter[T] now has an upper bound of AnyRef to avoid problems with incorrect type extraction. This changed Any to AnyRef in a few places, mainly FlinkLazyParameterFunctionHelper and FlinkCustomStreamTransformation.
- #1039 FlinkStreamingProcessRegistrar.apply has a new parameter of type ExecutionConfigPreparer. In production code you should pass ExecutionConfigPreparer.defaultChain() there, and in test code ExecutionConfigPreparer.unOptimizedChain(). See the scaladocs for more info. If you have already set up Flink's ExecutionConfig before registering the process, consider creating your own chain using ExecutionConfigPreparer.chain().
- #1039 FlinkSourceFactory no longer takes a TypeInformation type class as a generic parameter; it takes a ClassTag instead. TypeInformation is determined during source creation. The typeInformation[T] method was moved from BasicFlinkSource to FlinkSource, because there still has to be some place to determine it for test purposes.
- #965 The 'aggregate' node in the generic model was renamed to 'aggregate-sliding'
- #922 The HealthCheck API has a new structure, naming and JSON responses:
  - old /healthCheck is moved to /healthCheck/process/deployment
  - old /sanityCheck is moved to /healthCheck/process/validation
  - top-level /healthCheck indicates the general "app-is-running" state
- #879 Metrics use variables by default; see the docs to enable the old mode, suitable for the Graphite protocol. To use the old way of sending:
  - put globalParameters.useLegacyMetrics = true in each model configuration (to configure metrics sending in Flink)
  - put:
      countsSettings {
        user: ...
        password: ...
        influxUrl: ...
        metricsConfig {
          nodeCountMetric: "nodeCount.count"
          sourceCountMetric: "source.count"
          nodeIdTag: "action"
          countField: "value"
        }
      }
- Introduction of the KafkaAvro API: #871, #881, #903, #981, #989, #998, #1007, #1014, #1034, #1041
  The API for KafkaAvroSourceFactory was changed.
  KafkaAvroSourceFactory, old way:
    val clientFactory = new SchemaRegistryClientFactory
    val source = new KafkaAvroSourceFactory(
      new AvroDeserializationSchemaFactory[GenericData.Record](clientFactory, useSpecificAvroReader = false),
      clientFactory,
      None,
      processObjectDependencies = processObjectDependencies
    )
  KafkaAvroSourceFactory, new way:
    val schemaRegistryProvider = ConfluentSchemaRegistryProvider(processObjectDependencies)
    val source = new KafkaAvroSourceFactory(schemaRegistryProvider, processObjectDependencies, None)
  New API provided for the Kafka Avro sink:
    val kafkaAvroSinkFactory = new KafkaAvroSinkFactory(schemaRegistryProvider, processObjectDependencies)
  Additional changes:
  - Bumped the Confluent package to 5.5.0
  - (Refactor Kafka API) Moved KafkaSourceFactory to the pl.touk.nussknacker.engine.kafka.source package
  - (Refactor Kafka API) Changed BaseKafkaSourceFactory; it now requires deserializationSchemaFactory: KafkaDeserializationSchemaFactory[T]
  - (Refactor Kafka API) Moved KafkaSinkFactory to the pl.touk.nussknacker.engine.kafka.sink package
  - (Refactor Kafka API) Renamed SerializationSchemaFactory to KafkaSerializationSchemaFactory
  - (Refactor Kafka API) Renamed DeserializationSchemaFactory to KafkaDeserializationSchemaFactory
  - (Refactor Kafka API) Renamed FixedDeserializationSchemaFactory to FixedKafkaDeserializationSchemaFactory
  - (Refactor Kafka API) Renamed FixedSerializationSchemaFactory to FixedKafkaSerializationSchemaFactory
  - (Refactor Kafka API) Removed KafkaSerializationSchemaFactoryBase
  - (Refactor Kafka API) Replaced KafkaKeyValueSerializationSchemaFactoryBase by KafkaAvroKeyValueSerializationSchemaFactory (it handles only the Avro case now)
  - (Refactor Kafka API) Removed KafkaDeserializationSchemaFactoryBase
  - (Refactor Kafka API) Replaced KafkaKeyValueDeserializationSchemaFactoryBase by KafkaAvroKeyValueDeserializationSchemaFactory (it handles only the Avro case now)
  - (Refactor KafkaAvro API) Renamed AvroDeserializationSchemaFactory to ConfluentKafkaAvroDeserializationSchemaFactory and moved it to the avro.schemaregistry.confluent package
  - (Refactor KafkaAvro API) Renamed AvroKeyValueDeserializationSchemaFactory to ConfluentKafkaAvroDeserializationSchemaFactory and moved it to the avro.schemaregistry.confluent package
  - (Refactor KafkaAvro API) Renamed AvroSerializationSchemaFactory to ConfluentAvroSerializationSchemaFactory and moved it to the avro.schemaregistry.confluent package
  - (Refactor KafkaAvro API) Renamed AvroKeyValueSerializationSchemaFactory to ConfluentAvroKeyValueSerializationSchemaFactory and moved it to the avro.schemaregistry.confluent package
  - (Refactor KafkaAvro API) Removed FixedKafkaAvroSourceFactory and FixedKafkaAvroSinkFactory (fixed schemas are no longer supported)
  - (Refactor Kafka API) Replaced topics: List[String] by List[PreparedKafkaTopic] and removed processObjectDependencies in KafkaSource
  Be aware that we use Avro 1.9.2 instead of Flink's default 1.8.2 (for Java time logical type conversions).
- #1013 Expression evaluation is synchronous now. It shouldn't cause any problems (all languages were synchronous anyway), but some internal code may have to change.
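The #1039 upper bound on LazyParameter[T] is why signatures had to switch from Any to AnyRef: a type parameter bounded by AnyRef rejects Scala value types such as Long, so boxed Java types are used instead. A minimal sketch with a hypothetical stand-in, LazyParam, that copies only the bound (it is not the Nussknacker class):

```scala
object LazyParameterBoundSketch {
  // Hypothetical stand-in for LazyParameter[T]; only the AnyRef upper bound mirrors #1039.
  final case class LazyParam[T <: AnyRef](evaluate: () => T)

  // Helpers that are generic over the parameter type must now use AnyRef
  // (or a subtype) instead of Any.
  def resultClassName(param: LazyParam[_ <: AnyRef]): String =
    param.evaluate().getClass.getSimpleName

  // LazyParam[Long] would not compile, because scala.Long is not an AnyRef;
  // the boxed java.lang.Long satisfies the bound.
  val counter: LazyParam[java.lang.Long] = LazyParam(() => java.lang.Long.valueOf(42L))
}
```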
In version 0.1.2
- #957 The custom node aggregate from the generic model has changed its parameter from windowLengthInSeconds to windowLength, with human-friendly duration input. If you have used it in a process, you need to enter the correct value again.
- #954 TypedMap is no longer a case class wrapping a Scala Map. If you have done pattern matching on it, you should use case typedMap: TypedMap => typedMap.asScala instead.
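Migrating a pattern match for #954 means matching on the type instead of deconstructing a case class. A sketch, assuming TypedMap is a java.util.HashMap subclass; the class below is a hypothetical local stand-in so the example compiles on its own, and it uses the Scala 2.13+ converters from scala.jdk.CollectionConverters:

```scala
import scala.jdk.CollectionConverters._

object TypedMapMatchSketch {
  // Hypothetical local stand-in: TypedMap modelled as a Java HashMap subclass (an assumption).
  final class TypedMap extends java.util.HashMap[String, Any]

  // Before #954 a match such as `case TypedMap(fields) => fields` was possible;
  // now match on the type and convert the Java map view to a Scala Map.
  def fieldsOf(value: Any): Option[Map[String, Any]] = value match {
    case typedMap: TypedMap => Some(typedMap.asScala.toMap)
    case _                  => None
  }
}
```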
In version 0.1.1
- #930 DeeplyCheckingExceptionExtractor was moved from the nussknacker-flink-util module to the nussknacker-util module.
- #919 The KafkaSource constructor no longer takes consumerGroup. Instead, it computes consumerGroup on its own based on kafka.consumerGroupNamingStrategy in modelConfig (default set to processId). You can also override it with the optional overriddenConsumerGroup parameter. Due to this change, KafkaConfig has a new, optional parameter consumerGroupNamingStrategy.
- #920 The KafkaSource constructor now takes KafkaConfig instead of using the one parsed by BaseKafkaSourceFactory.kafkaConfig. Also, if you parse Typesafe Config to KafkaSource on your own, you should now use the dedicated method KafkaConfig.parseConfig, to avoid problems when the parsing strategy changes.
- #914 pl.touk.nussknacker.engine.api.definition.Parameter has deprecated the main factory method with the runtimeClass parameter; isLazyParameter should be passed instead. runtimeClass was also removed from the variants of factory methods prepared for easy testing (optional method and so on).
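The consumer group resolution described in #919 (whose default #1836 later changed to processId-nodeId) can be pictured as below. This is a hypothetical sketch, not KafkaSource's code: the NamingStrategy type and the "-" separator used for processId-nodeId are assumptions. Because Kafka commits offsets per consumer group, a different resolved group name does not see offsets committed under the old one.

```scala
object ConsumerGroupSketch {
  // Hypothetical stand-in for the kafka.consumerGroupNamingStrategy values named in this guide.
  sealed trait NamingStrategy
  case object ProcessId extends NamingStrategy
  case object ProcessIdNodeId extends NamingStrategy

  // An explicit overriddenConsumerGroup wins; otherwise the name is derived from the strategy.
  // Assumption: processId-nodeId joins both ids with "-".
  def consumerGroup(
      strategy: NamingStrategy,
      processId: String,
      nodeId: String,
      overriddenConsumerGroup: Option[String] = None
  ): String =
    overriddenConsumerGroup.getOrElse(strategy match {
      case ProcessId       => processId
      case ProcessIdNodeId => s"$processId-$nodeId"
    })
}
```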
In version 0.1.0
- #755 The default async execution context no longer depends on parallelism. asyncExecutionConfig.parallelismMultiplier has been deprecated and should be replaced with asyncExecutionConfig.workers; 8 should be a sane default value.
- #722 The old way of configuring Flink and the model (via flinkConfig and processConfig) is removed; the processTypes configuration should be used from now on. Example:
    flinkConfig {...}
    processConfig {...}
  becomes:
    processTypes {
      "type e.g. streaming" {
        deploymentConfig {
          type: "flinkStreaming"
          PUT HERE PROPERTIES OF flinkConfig FROM OLD CONFIG
        }
        modelConfig {
          classPath: PUT HERE VALUE OF flinkConfig.classPath FROM OLD CONFIG
          PUT HERE PROPERTIES OF processConfig FROM OLD CONFIG
        }
      }
    }
- #763 Some API traits (ProcessManager, DictRegistry, DictQueryService, CountsReporter) now extend java.lang.AutoCloseable.
- The old way of configuring additional properties should be replaced by the new one, which is now mapped to Map[String, AdditionalPropertyConfig]. Example in your config:
    additionalFieldsConfig: {
      mySelectProperty {
        label: "Description"
        type: "select"
        isRequired: false
        values: ["true", "false"]
      }
    }
  becomes:
    additionalPropertiesConfig {
      mySelectProperty {
        label: "Description"
        defaultValue: "false"
        editor: {
          type: "FixedValuesParameterEditor",
          possibleValues: [
            {"label": "Yes", "expression": "true"},
            {"label": "No", "expression": "false"}
          ]
        }
      }
    }
- #588 #882 FlinkSource API changed; the current implementation is now BasicFlinkSource
- #839 #882 FlinkSink API changed; the current implementation is now BasicFlinkSink
- #841 ProcessConfigCreator API changed; note that currently all process objects are invoked with ProcessObjectDependencies as a parameter. The APIs of KafkaSinkFactory, KafkaSourceFactory, and all their implementations were changed. Config is available as a property of the ProcessObjectDependencies instance.
- #863 restUrl in deploymentConfig needs to be preceded by a protocol; host with port only is no longer allowed.
- Rename grafanaSettings to metricsSettings in configuration.
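Before upgrading past #863, existing deploymentConfig values can be checked for a protocol. A minimal sketch of such a pre-upgrade check; hasProtocol is a hypothetical helper, not part of Nussknacker, and it assumes only http and https are relevant for the Flink REST endpoint:

```scala
import java.net.URI
import scala.util.Try

object RestUrlCheckSketch {
  private val allowedSchemes = Set("http", "https")

  // Hypothetical helper: true only when restUrl carries an http(s) protocol and a host.
  // "jobmanager:8081" parses as scheme "jobmanager", so it is rejected like any
  // host-with-port-only value; unparseable strings are rejected via Try.
  def hasProtocol(restUrl: String): Boolean =
    Try(new URI(restUrl)).toOption.exists { uri =>
      allowedSchemes.contains(uri.getScheme) && uri.getHost != null
    }
}
```

A value such as jobmanager:8081 would need to become http://jobmanager:8081 (or the https equivalent) before the upgrade.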
In version 0.0.12
- Upgrade to Flink 1.7
- Refactor of custom transformations, dictionaries, unions, please look at samples in example or generic to see API changes
- Considerable changes to authorization configuration, please look at sample config to see changes
- Circe is now used by default instead of Argonaut, but Argonaut can still be used in Displayable
In version 0.0.11
- Changes in CustomStreamTransformer implementation, LazyInterpreter became LazyParameter, please look at samples to see changes in API
In version 0.0.8
- Upgrade to Flink 1.4
- Change of format of Flink cluster configuration
- Parameters of sources and sinks are expressions now - automatic update of DB is available
- Change of configuration of Grafana dashboards
- Custom processes are defined in main configuration file now