
Kafka

Overview

Kafka topics are a common streaming data input to and output from Nussknacker scenarios. This document provides details of Nussknacker's integration with Kafka clusters and Schema Registry, and an overview of the integration configuration. If you use Nussknacker Cloud, you can also check the Step By Step Tutorials for detailed instructions on how to configure the integration with your chosen provider.

Once configured, the following components will be generated:

  • Kafka Source
  • Kafka Sink
note

The functionality of defining integrations using the UI rather than config files is available in the Cloud version only. If you use the OSS edition, check the Open Source version integrations documentation for details on how to configure integrations.

Prerequisites

To better understand how Nussknacker works with Kafka, it's recommended to read the following first:

Available integrations

Both Cloud and OSS editions support the same set of Kafka-compatible brokers. Additionally, the Cloud version provides a dedicated integration set-up mechanism, which allows you to configure the connection to selected Kafka providers using the UI. If you use the OSS edition, consult the configuration documentation for additional guidance.

The following integration set-up forms are available in the Nussknacker Cloud version:

  • specialized forms for selected Kafka providers, and
  • generic Kafka - this document

Integration parameters

| Parameter | Description |
|---|---|
| Integration name | Unique name for the integration |
| Properties | Properties in JSON format (you can also paste the configuration in the standard Java properties format) |
| CA Certificate | Trusted certificate authority used to verify the Kafka broker's certificate. Can be found in the truststore file. Fill in only when your Kafka cluster requires a CA certificate |
| Access Key | Private key of the client, used for mutual TLS authentication. Can be found in the keystore file or is supplied by your cloud Kafka provider. Fill in only when your Kafka cluster requires an access key |
| Access Certificate | Public certificate of the client, used for mutual TLS authentication. Can be found in the keystore file or is supplied by your cloud Kafka provider. Fill in only when your Kafka cluster requires an access certificate |

Schemas and Schema Registry

A schema defines the format of data. Nussknacker expects that messages in topics are described by a schema and uses the information contained in schemas for code completion and validation of messages. The schema of a message can be described in the Avro Schema format or the JSON Schema format (the latter with Confluent Schema Registry only).

Schemas are managed by Schema Registry - Confluent Schema Registry and Azure Schema Registry are supported.

To preview schemas or add a new version, you can use tools available on your cloud platform or tools like AKHQ.

If you use the plain Kafka integration, you may choose not to provide Schema Registry connection information. In that case, Nussknacker will not be able to help with field names or data-type-related validation of SpEL expressions; however, if the Kafka messages are transmitted in plain text in the JSON format, you can still use dynamic field access to read fields of the messages.

Association between schema and topic

To properly present information about topics and schema versions, and to recognize which schema is assigned to which version, Nussknacker follows these conventions:

  • For the Confluent-based implementation, Nussknacker uses TopicNameStrategy for subject names. This means it looks for schemas at the <topic-name>-(key or value) subject. For example, for the topic transactions it looks for key schemas at the transactions-key subject and for value schemas at the transactions-value subject
  • In the Azure Schema Registry the subject concept doesn't exist - schemas are grouped by schema name. Because of that, Nussknacker introduces its own convention for associating a schema with a topic: the schema name should be in the format CamelCasedTopicNameKey for keys and CamelCasedTopicNameValue for values. For example, for the input-events (or input.events) topic, the schema should be named InputEventsKey for the key or InputEventsValue for the value. Be aware that this may require changing the schema name not only in the Azure portal but also inside the schema content - those names must be the same for serialization to work correctly
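The Azure naming convention above can be sketched as a small helper (illustrative only - `azure_schema_name` is a hypothetical function, not part of Nussknacker's API):

```python
import re

def azure_schema_name(topic: str, part: str = "Value") -> str:
    """Map a Kafka topic name to the Azure Schema Registry naming
    convention described above: split the topic on '-', '.' and '_',
    capitalize each segment, and append 'Key' or 'Value'."""
    segments = re.split(r"[-._]", topic)
    camel = "".join(s[:1].upper() + s[1:] for s in segments if s)
    return camel + part

# Both spellings of the topic map to the same schema name:
print(azure_schema_name("input-events"))         # InputEventsValue
print(azure_schema_name("input.events", "Key"))  # InputEventsKey
```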

Message payload

By default, Nussknacker supports two combinations of schema type and payload type:

  • Avro schema + Avro payload (binary format)
  • JSON schema + JSON payload (human readable, text format)

Avro payloads are more concise, because messages contain only the values and a schema id - without information about the message structure, such as field names.

The Avro payload format is compatible with the standard Kafka serializers and deserializers delivered by Schema Registry providers. Thanks to that, you should be able to send messages to Nussknacker and read messages produced by Nussknacker using the standard tooling available around those Schema Registries. To see how the formats differ, take a look at the Schema registries comparison section.

In some situations it might be helpful to use a JSON payload with an Avro schema, especially when your Schema Registry doesn't support JSON schemas. You can do that by enabling the avroAsJsonSerialization configuration setting.
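For illustration, enabling the setting could look like the fragment below; where exactly this flag is placed depends on your edition and configuration layout, so treat it as a sketch and consult the configuration documentation:

```json
{
  "avroAsJsonSerialization": true
}
```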

Schema ID

Each topic can contain messages written using different schema versions. Schema versions are identified by a Schema ID. Nussknacker needs to know which schema was used during writing to make message validation and schema evolution possible. Because of that, Nussknacker needs to extract the Schema ID from the message.

Additionally, in sources and sinks you can choose which schema version should be used during reading/writing. Thanks to the schema evolution mechanism, a message in the original format will be evolved to the desired format. This desired schema will be used in code completion and validation.

At runtime Nussknacker determines the schema version of a message value and key in the following way:

  1. It checks the key.schemaId, value.schemaId and Azure-specific content-type headers;
  2. If no such headers are provided, it looks for the magic byte (0x00) and a schema id in the message, in the format used by Confluent;
  3. If the magic byte is not found, it assumes the schema version chosen by the user in the scenario.
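Steps 2 and 3 can be sketched as follows - an illustrative parser of the Confluent wire format, which prefixes the Avro data with the magic byte 0x00 and a 4-byte big-endian schema id (this is not Nussknacker's actual code):

```python
import struct

MAGIC_BYTE = 0x00

def confluent_schema_id(payload: bytes):
    """Return the schema id embedded in a Confluent wire-format message,
    or None when the magic byte is absent (in which case the schema
    version chosen by the user in the scenario would be assumed)."""
    if len(payload) >= 5 and payload[0] == MAGIC_BYTE:
        # Bytes 1..4 hold the schema id as a big-endian unsigned int.
        (schema_id,) = struct.unpack(">I", payload[1:5])
        return schema_id
    return None

print(confluent_schema_id(b"\x00\x00\x00\x00\x2a" + b"avro-data"))  # 42
print(confluent_schema_id(b'{"plain": "json"}'))                    # None
```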

Kafka integration configuration

Properties

Here you should provide the Kafka properties required by the Kafka client to connect to your Kafka cluster, and by the Schema Registry client to connect to the Schema Registry server.

Configuration should be provided in JSON format.

To configure the connection to Kafka, you need to set at least the bootstrap.servers property. It should contain a comma-separated list of URLs of the Kafka brokers.
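A minimal Properties value might look like this (hostnames below are placeholders):

```json
{
  "bootstrap.servers": "broker-1.example.com:9092,broker-2.example.com:9092"
}
```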

Frequently used Kafka configuration options

The table below contains a list of commonly used Kafka configuration options. Please note that the list is not intended to be complete and that some of the options are specific to particular Kafka providers - Aiven, Azure, Confluent.

| Name | Importance | Type | Default value | Description |
|---|---|---|---|---|
| bootstrap.servers | High | string | | Comma-separated list of bootstrap servers |
| schema.registry.url | High | string | | Comma-separated list of schema registry URLs |
| basic.auth.credentials.source | High | string | | (Confluent-only) Source of credentials, e.g. USER_INFO |
| basic.auth.user.info | High | string | | (Confluent-only) User and password, e.g. some_user:some_password |
| schema.group | High | string | | (Azure-only) Schema group containing all available schemas |
| azure.tenant.id | High | string | | (Azure-only) Azure tenant id |
| azure.client.id | High | string | | (Azure-only) Azure client id |
| azure.client.secret | High | string | | (Azure-only) Azure client secret |
| transaction.timeout.ms | Medium | number | 600000 | [Transaction timeout](https://kafka.apache.org/documentation/#producerconfigs_transaction.timeout.ms) in millis for the transactional producer |
| isolation.level | High | string | | Controls how to read messages written transactionally (isolation.level) |

Currently, Nussknacker supports two implementations of Schema Registries: based on Confluent Schema Registry and based on Azure Schema Registry.

To configure the connection to the Schema Registry, you need to set at least schema.registry.url. It should contain a comma-separated list of Schema Registry URLs; for a single-node installation it will be just one URL. Be aware that, contrary to Kafka brokers, Schema Registry URLs should start with https:// or http://.

It's possible to use Kafka without a schema registry; in that case you should not provide the schema.registry.url property. Without a schema registry you can use only JSON Kafka topics, and values read from them will be typed as Unknown.

Nussknacker determines which registry implementation (Confluent or Azure) is used from the schema.registry.url property. If the URL ends with .servicebus.windows.net, Nussknacker assumes that the Azure Schema Registry is used; otherwise, the Confluent Schema Registry is assumed.
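The detection rule can be sketched as follows (illustrative only; not Nussknacker's actual code):

```python
def registry_implementation(schema_registry_url: str) -> str:
    """Pick the Schema Registry implementation from the URL, following
    the rule described above: Azure namespaces live under
    servicebus.windows.net, everything else is treated as Confluent."""
    host = (schema_registry_url.rstrip("/")
            .removeprefix("https://")
            .removeprefix("http://"))
    return "azure" if host.endswith(".servicebus.windows.net") else "confluent"

print(registry_implementation("https://my-ns.servicebus.windows.net"))  # azure
print(registry_implementation("http://schema-registry:8081"))           # confluent
```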

Confluent-based schema registry - connection and authentication

For the Confluent-based implementation you should provide at least schema.registry.url. If your schema registry is secured by user and password, you should additionally provide the "basic.auth.credentials.source": "USER_INFO" and "basic.auth.user.info": "some_user:some_password" entries. To read more, see the Schema Registry documentation.
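Put together, a Confluent-style Properties value could look like the sketch below (hostnames and credentials are placeholders):

```json
{
  "bootstrap.servers": "my-cluster.example.com:9092",
  "schema.registry.url": "https://my-registry.example.com",
  "basic.auth.credentials.source": "USER_INFO",
  "basic.auth.user.info": "some_user:some_password"
}
```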

To verify that your configuration is correct, you can test it with the kafka-avro-console-consumer and kafka-avro-console-producer tools available in the Confluent Schema Registry distribution. Once you have a properly working set of properties, just copy it to Nussknacker's configuration.

Azure-based schema registry - connection and authentication

For the Azure-based implementation, you should first provide the schema.registry.url and schema.group properties. The first one should be the https://<event-hubs-namespace>.servicebus.windows.net URL, the second one the name of the schema group where all schemas used by Nussknacker will be located.

Regarding authentication, a couple of options are available - you can provide credentials via the azure.tenant.id, azure.client.id and azure.client.secret properties, or use one of the other methods handled by Azure's DefaultAzureCredential, for example Azure CLI or Azure PowerShell.
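An Azure-style Properties value could look like the sketch below (all values are placeholders; any additional security properties required by your Event Hubs namespace are omitted):

```json
{
  "bootstrap.servers": "my-namespace.servicebus.windows.net:9093",
  "schema.registry.url": "https://my-namespace.servicebus.windows.net",
  "schema.group": "nussknacker-schemas",
  "azure.tenant.id": "<tenant-id>",
  "azure.client.id": "<client-id>",
  "azure.client.secret": "<client-secret>"
}
```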


Schema registries comparison

Below you can find a quick comparison of how the supported schema registry types are handled:

| Schema registry type | Association between schema and topic | Convention | Payload type | How the schema id is passed | Payload content |
|---|---|---|---|---|---|
| Confluent | Subjects | Subject = <topic-name>-(key or value) | Avro | In the payload | 0x00, 4-byte schema id, Avro payload |
| Confluent | | | JSON | In the key.schemaId or value.schemaId headers | JSON payload |
| Azure | Schema names | Schema name = <CamelCasedTopicName>(Key or Value) | Avro | In the content-type: avro/binary+schemaId header | Avro payload |
| Azure | | | JSON | In the key.schemaId or value.schemaId headers (only when the avroAsJsonSerialization option is enabled) | JSON payload |