
Kafka

Kafka topics are a common streaming data input to and output from Nussknacker scenarios. This document provides details of Nussknacker's integration with Kafka clusters and Schema Registry, and an overview of the integration configuration. You can also check the Step By Step Tutorials for detailed instructions on how to configure the integration with your chosen provider.

Once configured, the following components will be generated:

  • Kafka Source
  • Kafka Sink

Prerequisites

To better understand how Nussknacker works with Kafka, it's recommended to read the following first:

Available integrations

Nussknacker integrates with several Kafka and Kafka-compatible providers; follow the links for step-by-step instructions on how to configure the integration for your provider:

TL;DR - minimal Kafka integration properties for most use cases

In most self-managed Kafka deployments, especially when running outside of cloud-managed services, you only need to configure a small set of core parameters to get up and running reliably. This section highlights the essential settings that cover around 80% of common use cases, without getting lost in the hundreds of available options. If the minimal Kafka configuration is not sufficient, refer to the Configuration section.

{
  "bootstrap.servers": "kafka.some.domain-1:9092,kafka.some.domain-2:9092,kafka.some.domain-3:9092",
  "security.protocol": "SASL_SSL",
  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";",
  "sasl.mechanism": "PLAIN",
  "ssl.truststore.type": "PEM",
  "ssl.truststore.location": "/certs/client.truststore",
  "schema.registry.url": "https://some.schema.registry.host:8080",
  "basic.auth.credentials.source": "USER_INFO",
  "basic.auth.user.info": "usernameToSR:passwordToSR"
}
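
Before pasting these values into Nussknacker, you may want to smoke-test them with a standalone client. The sketch below is illustrative only and assumes the confluent-kafka Python package; note that librdkafka-based clients expect sasl.username/sasl.password and ssl.ca.location in place of the Java-style sasl.jaas.config and ssl.truststore.* properties.

# Illustrative smoke test of the properties above (not part of Nussknacker).
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient

broker_conf = {
    "bootstrap.servers": "kafka.some.domain-1:9092,kafka.some.domain-2:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "username",                     # librdkafka's form of sasl.jaas.config
    "sasl.password": "password",
    "ssl.ca.location": "/certs/client.truststore",   # PEM trust material
}
registry_conf = {
    "url": "https://some.schema.registry.host:8080",
    "basic.auth.user.info": "usernameToSR:passwordToSR",
}

# Both calls fail fast on wrong endpoints or credentials.
print(sorted(Producer(broker_conf).list_topics(timeout=10).topics))
print(SchemaRegistryClient(registry_conf).get_subjects())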

Schemas and Schema Registry

A schema defines the format of data. Nussknacker expects that messages in topics are described by a schema, and it uses the information contained in schemas for code completion and validation of messages. A message schema can be described in the Avro Schema format or the JSON Schema format (the latter with Confluent Schema Registry only).

Schemas are managed by a Schema Registry; Confluent Schema Registry and Azure Schema Registry are supported.

To preview schemas or add a new version, you can use tools available on your cloud platform or tools like AKHQ.

If you use the plain Kafka integration, you may choose not to provide Schema Registry connection information. In that case, Nussknacker will not be able to help with field names or data-type-related validation of SpEL expressions. However, if Kafka messages are transmitted in plain text in the JSON format, you can still use dynamic field access (for example #input["someField"] instead of #input.someField) to access fields of the Kafka messages.

Association between schema and topic

To properly present information about topics and schema versions, and to recognize which schema is assigned to which version, Nussknacker follows these conventions:

  • For the Confluent-based implementation, Nussknacker uses TopicNameStrategy for subject names. This means it looks for schemas at the <topic-name>-(key or value) subject. For example, for the topic transactions, it looks for the key schema at the transactions-key subject and for the value schema at the transactions-value subject (see the sketch after this list).
  • In the Azure Schema Registry, the subject concept doesn't exist - schemas are grouped by schema name. Because of that, Nussknacker introduces its own convention for associating a schema with a topic: the schema name should be in the format CamelCasedTopicNameKey for keys and CamelCasedTopicNameValue for values. For example, for the input-events (or input.events) topic, the schema should be named InputEventsKey for the key or InputEventsValue for the value. Be aware that this may require changing the schema name not only in the Azure portal but also inside the schema content - both names must be the same for serialization to work correctly.
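
As an illustration of the Confluent convention, the hedged sketch below registers a value schema for the transactions topic under the transactions-value subject, using the confluent-kafka Python client; the schema itself is a made-up example.

# Register a value schema under the TopicNameStrategy subject "transactions-value".
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

client = SchemaRegistryClient({"url": "https://some.schema.registry.host:8080"})

transaction_avro = """
{
  "type": "record",
  "name": "Transaction",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}
"""

# The subject name, not the schema name, ties the schema to the topic here.
print(client.register_schema("transactions-value", Schema(transaction_avro, "AVRO")))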

Message payload

By default, Nussknacker supports two combinations of schema type and payload type:

  • Avro schema + Avro payload (binary format)
  • JSON schema + JSON payload (human readable, text format)

Avro payloads are more concise because messages contain only values and a schema id, without information about the message structure such as field names.

The Avro payload format is compatible with the standard Kafka serializers and deserializers delivered by Schema Registry providers. Thanks to that, you should be able to send messages to Nussknacker and read messages produced by Nussknacker using the standard tooling available around those Schema Registries. To see how those formats differ, take a look at the Schema registries comparison section.
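
To make the difference concrete, the sketch below (with an assumed schema id of 42) contrasts what each payload carries on the wire.

import json
import struct

record = {"amount": 12.5}

# JSON payload: human-readable text with field names on the wire.
json_payload = json.dumps(record).encode()           # b'{"amount": 12.5}'

# Confluent Avro payload: magic byte 0x00, a 4-byte big-endian schema id,
# then the Avro binary encoding of the values only (no field names).
confluent_prefix = b"\x00" + struct.pack(">I", 42)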

In some situations it might be helpful to use a JSON payload with an Avro schema, especially when your Schema Registry doesn't support JSON schemas. You can do that by enabling the avroAsJsonSerialization configuration setting.

Schema ID

Each topic can contain messages written using different schema versions. Schema versions are identified by a Schema ID. Nussknacker needs to know which schema was used when a message was written to make message validation and schema evolution possible; because of that, it needs to extract the Schema ID from the message.

Additionally, in sources and sinks you can choose which schema version should be used for reading or writing. Thanks to the schema evolution mechanism, a message in the original format will be evolved to the desired format. This desired schema will be used in code completion and validation.
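
As a hedged illustration of what this evolution means for Avro (Nussknacker performs it internally; the schemas below are made up), a record written with schema v1 can be read with schema v2 as long as new fields declare defaults:

import io
import avro.schema
from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter

# v2 adds a field with a default, so v1 data can be evolved to it.
v1 = avro.schema.parse(
    '{"type": "record", "name": "Event", "fields": [{"name": "id", "type": "string"}]}')
v2 = avro.schema.parse(
    '{"type": "record", "name": "Event", "fields": ['
    '{"name": "id", "type": "string"}, '
    '{"name": "source", "type": "string", "default": "unknown"}]}')

buf = io.BytesIO()
DatumWriter(v1).write({"id": "42"}, BinaryEncoder(buf))   # written with v1
buf.seek(0)
evolved = DatumReader(v1, v2).read(BinaryDecoder(buf))    # read as v2
assert evolved == {"id": "42", "source": "unknown"}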

At runtime, Nussknacker determines the schema version of a message value and key in the following way (a sketch of this lookup follows the list):

  1. It checks the key.schemaId, value.schemaId and Azure-specific content-type headers;
  2. If no such headers are provided, it looks for the magic byte (0x00) and a schema id in the message, in the format used by Confluent;
  3. If the magic byte is not found, it assumes the schema version chosen by the user in the scenario.
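
A minimal sketch of that lookup, for a message value (the helper and its fallback argument are hypothetical; the header name comes from this document):

import struct

def schema_id_of(headers: dict, payload: bytes, scenario_schema_id: int) -> int:
    if "value.schemaId" in headers:                   # 1. explicit header
        return int(headers["value.schemaId"])
    if payload[:1] == b"\x00":                        # 2. Confluent wire format:
        return struct.unpack(">I", payload[1:5])[0]   #    magic byte + 4-byte schema id
    return scenario_schema_id                         # 3. version chosen in the scenario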

Kafka integration configuration

Properties

Here, you should provide the Kafka properties required by Kafka consumers and producers to connect to your Kafka cluster, and by the Schema Registry client to connect to the Schema Registry server.

Configuration should be provided in JSON format.

To configure the connection to Kafka, you need to set at least the bootstrap.servers property. It should contain a comma-separated list of URLs of Kafka brokers.

Authentication

A Kafka cluster can be configured with multiple authentication options. Take a look at the Kafka security documentation for detailed examples of how those options translate into properties.
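
For example, one common translation (with placeholder credentials) is SASL/SCRAM over TLS; here is a sketch of the resulting properties, written as a Python dict mirroring the JSON used elsewhere in this document:

# Placeholder credentials; SCRAM-SHA-512 over TLS as one possible setup.
scram_properties = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.jaas.config": (
        'org.apache.kafka.common.security.scram.ScramLoginModule '
        'required username="user" password="pass";'
    ),
}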

Keystore

If your Kafka cluster is configured to use TLS/SSL encryption and therefore requires a keystore to connect, you can paste it here in the PEM format. Remember to include both the key and the certificates (see e.g. this blog for the details).
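
For illustration, a PEM keystore containing both pieces typically looks like this (contents elided):

-----BEGIN PRIVATE KEY-----
(base64-encoded private key)
-----END PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
(base64-encoded client certificate, followed by any intermediate certificates)
-----END CERTIFICATE-----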

At runtime the keystore file will be available at the /certs/client.keystore location, so in the Properties config set the ssl.keystore.location key to that path.

If your Kafka configuration does not require a keystore, leave this field empty.

Truststore

If your Kafka cluster requires a truststore to connect, here you can provide the truststore file in PEM format.

At runtime the truststore file will be available at the /certs/client.truststore location, so in the Properties config set the ssl.truststore.location key to that path.

Frequently used Kafka configuration options

The table below contains a list of commonly used Kafka configuration options. Please note that the list is not intended to be complete and that some of the configuration options are specific to particular Kafka providers - Aiven, Azure, Confluent.

Name | Importance | Type | Default value | Description
bootstrap.servers | High | string | | Comma-separated list of bootstrap servers
schema.registry.url | High | string | | Comma-separated list of schema registry URLs
basic.auth.credentials.source | High | string | | (Confluent-only) Source of credentials, e.g. USER_INFO
basic.auth.user.info | High | string | | (Confluent-only) User and password, e.g. some_user:some_password
schema.group | High | string | | (Azure-only) Schema group with all available schemas
azure.tenant.id | High | string | | (Azure-only) Azure's tenant id
azure.client.id | High | string | | (Azure-only) Azure's client id
azure.client.secret | High | string | | (Azure-only) Azure's client secret
transaction.timeout.ms | Medium | number | 600000 | Transaction timeout in millis for the transactional producer, see https://kafka.apache.org/documentation/#producerconfigs_transaction.timeout.ms
isolation.level | High | string | | Controls how to read messages written transactionally, see isolation.level

Currently, Nussknacker supports two implementations of Schema Registries: based on Confluent Schema Registry and based on Azure Schema Registry.

To configure the connection to Schema Registry, you need to set at least schema.registry.url. It should contain a comma-separated list of Schema Registry URLs; for a single-node installation, it will be just one URL. Be aware that, contrary to Kafka brokers, Schema Registry URLs should start with https:// or http://.

It's possible to use Kafka without a schema registry; in this case you should not provide the schema.registry.url property. Without a schema registry you can use only JSON Kafka topics, and values read from them will be typed as Unknown.

Nussknacker determines which registry implementation (Confluent or Azure) is used from the schema.registry.url property. If the URL ends with .servicebus.windows.net, Nussknacker assumes that the Azure schema registry is used; otherwise the Confluent schema registry is assumed.

Confluent-based schema registry - connection and authentication

For the Confluent-based implementation you should provide at least schema.registry.url. If your schema registry is secured by user and password, you should additionally provide the "basic.auth.credentials.source": "USER_INFO" and "basic.auth.user.info": "some_user:some_password" entries. To read more, see the Schema registry documentation.

To verify that your configuration is correct, you can test it with kafka-avro-console-consumer and kafka-avro-console-producer, available in the Confluent Schema Registry distribution. Once you have a properly working set of properties, just copy it to Nussknacker's configuration.

Azure-based schema registry - connection and authentication

For the Azure-based implementation, you should first provide the schema.registry.url and schema.group properties. The first one should be the https://<event-hubs-namespace>.servicebus.windows.net URL; the second one should be the name of the schema group where all schemas used by Nussknacker will be located.

Regarding authentication, a couple of options can be used - you can provide credentials via the azure.tenant.id, azure.client.id and azure.client.secret properties, or you can use one of the other methods handled by Azure's DefaultAzureCredential, for example the Azure CLI or Azure PowerShell.
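
As a hedged illustration of that credential chain, here is how the azure-identity Python client resolves credentials (this is not Nussknacker's own code, and the Event Hubs scope below is an assumption about your setup):

# DefaultAzureCredential tries environment variables (AZURE_TENANT_ID,
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET), managed identity, Azure CLI, etc.
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# The scope that Event Hubs / Schema Registry clients typically request.
token = credential.get_token("https://eventhubs.azure.net/.default")
print(token.expires_on)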


Schema registries comparison

Below you can find a quick comparison of how the supported schema registry types are handled:

Schema registry type | Association between schema and topic | Convention | How the schema id is passed in the message | Payload content
Confluent | Subjects | Subject = <topic-name>-(key or value) | For Avro: in the payload, as 0x00 followed by a 4-byte schema id | 0x00, 4-byte schema id, Avro payload
Confluent | | | For JSON: in the key.schemaId or value.schemaId headers | JSON payload
Azure | Schema names | Schema name = <CamelCasedTopicName>(Key or Value) | For Avro: in the content-type: avro/binary+schemaId header | Avro payload
Azure | | | For JSON: in the key.schemaId or value.schemaId headers (only when the avroAsJsonSerialization option is enabled) | JSON payload