Kafka
Kafka topics are a common streaming data input to Nussknacker and output from Nussknacker scenarios. This document provides details of Nussknacker's integration with Kafka clusters and Schema Registry, and an overview of the integration configuration. You can also check the Step By Step Tutorials for detailed instructions on how to configure the integration with your chosen provider.
Once configured, the following components will be generated:
- Kafka Source
- Kafka Sink
Prerequisites
To better understand how Nussknacker works with Kafka, it's recommended to read the following first:
Available integrations
Nussknacker integrates with several Kafka and Kafka-compatible providers; follow the links for step-by-step instructions on how to configure the integration for your provider:
- Aiven
- Azure Event Hubs
- Confluent Cloud
- Kafka (self hosted or generic implementation) - this document
- Redpanda
TLDR - minimal integration (Kafka) properties for most use cases
In most self-managed Kafka deployments, especially when running outside of cloud-managed services, you only need to configure a small set of core parameters to get up and running reliably. This section highlights the essential settings that cover around 80% of common use cases, without getting lost in the hundreds of available options. If the minimal Kafka configuration is not sufficient, refer to the Configuration section.
```json
{
  "bootstrap.servers": "kafka.some.domain-1:9092,kafka.some.domain-2:9092,kafka.some.domain-3:9092",
  "security.protocol": "SASL_SSL",
  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";",
  "sasl.mechanism": "PLAIN",
  "ssl.truststore.type": "PEM",
  "ssl.truststore.location": "/certs/client.truststore",
  "schema.registry.url": "https://some.schema.registry.host:8080",
  "basic.auth.credentials.source": "USER_INFO",
  "basic.auth.user.info": "usernameToSR:passwordToSR"
}
```
Schemas and Schema Registry
A schema defines the format of data. Nussknacker expects that messages in topics are described by a schema, and it uses the information contained in schemas for code completion and validation of messages. The schema of a message can be described in the Avro Schema format or the JSON Schema format (Confluent Schema Registry only).
Schemas are managed by Schema Registry - Confluent Schema Registry and Azure Schema Registry are supported.
To preview schemas or add a new version, you can use tools available on your cloud platform or tools like AKHQ.
If you use the plain Kafka integration, you may choose not to provide Schema Registry connection information. In that case, Nussknacker will not be able to help with field names or data type related validation of SpEL expressions. If Kafka messages are transmitted in plain text and are in the JSON format, you can still use dynamic field access (for example `#input["someField"]`) to reach fields of the Kafka messages.
Association between schema and topic
To properly present information about topics and schema versions, and to recognize which schema is assigned to which version, Nussknacker follows these conventions:
- For the Confluent-based implementation it uses the TopicNameStrategy for subject names. This means it looks for schemas available at the `<topic-name>-key` and `<topic-name>-value` subjects. For example, for the topic `transactions`, it looks for schemas at the `transactions-key` subject for the key and at the `transactions-value` subject for the value.
- In the Azure Schema Registry, the subject concept doesn't exist - schemas are grouped by schema name. Because of that, Nussknacker introduces its own convention for the association between schema and topic: the schema name should be in the format `CamelCasedTopicNameKey` for keys and `CamelCasedTopicNameValue` for values. For example, for the `input-events` (or `input.events`) topic, the schema should be named `InputEventsKey` for the key or `InputEventsValue` for the value. Be aware that this may require changing the schema name not only in the Azure portal but also inside the schema content - those names must be the same for serialization to work correctly.
Message payload
By default, Nussknacker supports two combinations of schema type and payload type:
- Avro schema + Avro payload (binary format)
- JSON schema + JSON payload (human readable, text format)
Avro payloads are more concise because messages contain only the values and the schema id, without information about the message structure such as field names.
The Avro payload is compatible with the standard Kafka serializers and deserializers delivered by Schema Registry providers. Thanks to that, you should be able to send messages to Nussknacker and read messages produced by Nussknacker using the standard tooling available around those Schema Registries. To see how those formats differ, take a look at the Schema registries comparison section.
In some situations it might be helpful to use a JSON payload with an Avro schema, especially when your Schema Registry doesn't support JSON schemas. You can do that by enabling the avroAsJsonSerialization configuration setting.
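A minimal sketch of enabling it, assuming the setting is a boolean flag placed in the Kafka integration configuration - the exact location depends on your deployment, so check the configuration reference for where the setting belongs:

```json
{
  "avroAsJsonSerialization": true
}
```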
Schema ID
Each topic can contain messages written using different schema versions. Schema versions are identified by a Schema ID. Nussknacker needs to know which schema was used when a message was written to make message validation and schema evolution possible. Because of that, Nussknacker needs to extract the Schema ID from the message.
Additionally, in sources and sinks you can choose which schema version should be used during reading/writing. Thanks to the schema evolution mechanism, a message in its original format will be evolved to the desired format. This desired schema will be used in code completion and validation.
At runtime Nussknacker determines the schema version of a message value and key in the following way:
- It checks the `key.schemaId`, `value.schemaId` and Azure-specific `content-type` headers;
- If no such headers are provided, it looks for the magic byte (0x00) and a schema id in the message, in the format used by Confluent;
- If the magic byte is not found, it assumes the schema version chosen by the user in the scenario.
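For illustration, a JSON-payload message might carry schema ids in headers like these (the header values are hypothetical; they are the schema ids assigned by the registry):

```json
{
  "key.schemaId": "3",
  "value.schemaId": "17"
}
```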
Kafka integration configuration
Properties
Here, you should provide the Kafka properties required by the Kafka client to connect to your Kafka cluster, and by the Schema Registry client to connect to the Schema Registry server.
Configuration should be provided in JSON format.
To configure the connection to Kafka, you need to set at least the bootstrap.servers property. It should contain a comma-separated list of host:port addresses of Kafka brokers.
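For example, a minimal sketch for an unsecured cluster (the broker addresses are placeholders):

```json
{
  "bootstrap.servers": "broker-1:9092,broker-2:9092,broker-3:9092"
}
```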
Authentication
A Kafka cluster can be configured with multiple authentication options. Take a look at the Kafka security documentation for detailed examples of how those options translate into properties.
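As an illustration, a sketch of SASL/SCRAM authentication over TLS (the mechanism, username and password are placeholders; your cluster may require a different mechanism):

```json
{
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "SCRAM-SHA-512",
  "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";"
}
```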
Keystore
If your Kafka cluster is configured to use TLS/SSL encryption and therefore requires a keystore to connect, you can paste it here in PEM format. Please remember to include both the key and the certificates (see e.g. this blog for the details).
At runtime the keystore file will be available at the /certs/client.keystore location, so in the Properties config set the ssl.keystore.location key to that path (see the combined sketch after the Truststore section).
If your Kafka configuration does not require a keystore, leave this field empty.
Truststore
If your Kafka cluster requires a truststore to connect, you can provide the truststore file here in PEM format.
At runtime the truststore file will be available at the /certs/client.truststore location, so in the Properties config set the ssl.truststore.location key to that path.
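Putting the two sections together, a minimal sketch of mutual TLS properties using the runtime locations mentioned above:

```json
{
  "security.protocol": "SSL",
  "ssl.keystore.type": "PEM",
  "ssl.keystore.location": "/certs/client.keystore",
  "ssl.truststore.type": "PEM",
  "ssl.truststore.location": "/certs/client.truststore"
}
```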
Frequently used Kafka configuration options
The table below contains a list of commonly used Kafka configuration options. Please note that the list is not intended to be complete and that some of the configuration options are specific to particular Kafka providers - Aiven, Azure, Confluent.
| Name | Importance | Type | Default value | Description |
|---|---|---|---|---|
| bootstrap.servers | High | string | | Comma-separated list of bootstrap servers |
| schema.registry.url | High | string | | Comma-separated list of schema registry urls |
| basic.auth.credentials.source | High | string | | (Confluent-only) Source of credentials, e.g. USER_INFO |
| basic.auth.user.info | High | string | | (Confluent-only) User and password, e.g. some_user:some_password |
| schema.group | High | string | | (Azure-only) Schema group containing all schemas used by Nussknacker |
| azure.tenant.id | High | string | | (Azure-only) Azure tenant id |
| azure.client.id | High | string | | (Azure-only) Azure client id |
| azure.client.secret | High | string | | (Azure-only) Azure client secret |
| transaction.timeout.ms | Medium | number | 600000 | Transaction timeout in millis for the transactional producer, see [transaction.timeout.ms](https://kafka.apache.org/documentation/#producerconfigs_transaction.timeout.ms) |
| isolation.level | High | string | read_uncommitted | Controls how to read messages written transactionally, see [isolation.level](https://kafka.apache.org/documentation/#consumerconfigs_isolation.level) |
Schema registry related properties
Currently, Nussknacker supports two implementations of Schema Registries: based on Confluent Schema Registry and based on Azure Schema Registry.
To configure the connection to Schema Registry, you need to set at least schema.registry.url. It should contain a comma-separated list of Schema Registry urls.
For a single-node installation, it will be just one url. Be aware that, contrary to Kafka brokers, Schema Registry urls should start with https:// or http://.
It's possible to use Kafka without a schema registry; in this case you should not provide the schema.registry.url property. Without a schema registry you can use only JSON Kafka topics, and values read from them will be typed as Unknown.
Nussknacker determines which registry implementation (Confluent or Azure) is used from the schema.registry.url property.
If the URL ends with .servicebus.windows.net, Nussknacker assumes that the Azure schema registry is used; otherwise, a Confluent schema registry is assumed.
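For instance, with a hypothetical hostname, the following url would be detected as an Azure schema registry:

```json
{
  "schema.registry.url": "https://my-namespace.servicebus.windows.net"
}
```

Any other url, e.g. https://schema-registry.my-company.net:8081, would be treated as a Confluent schema registry.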
Confluent-based schema registry - connection and authentication
For the Confluent-based implementation you should provide at least schema.registry.url. If your schema registry is secured with a user and password, you should additionally provide the "basic.auth.credentials.source": "USER_INFO" and "basic.auth.user.info": "some_user:some_password" entries. To read more, see the Schema registry documentation.
To verify that your configuration is correct, you can test it with the kafka-avro-console-consumer and kafka-avro-console-producer tools available in the Confluent Schema Registry distribution. Once you have a properly working set of properties, just copy them into Nussknacker's configuration.
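A sketch of a Confluent schema registry connection secured with basic auth (the url and credentials are placeholders):

```json
{
  "schema.registry.url": "https://some.schema.registry.host:8081",
  "basic.auth.credentials.source": "USER_INFO",
  "basic.auth.user.info": "some_user:some_password"
}
```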
Azure-based schema registry - connection and authentication
For the Azure-based implementation, first provide the schema.registry.url and schema.group properties. The first one should be the https://<event-hubs-namespace>.servicebus.windows.net url; the second one should be the name of the schema group where all schemas used by Nussknacker will be located.
Regarding authentication, a couple of options are available: you can provide credentials via the azure.tenant.id, azure.client.id and azure.client.secret properties, or use one of the other methods handled by Azure's DefaultAzureCredential, for example the Azure CLI or Azure PowerShell.
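A sketch of service principal authentication against an Azure schema registry (the namespace, group name and credentials are placeholders):

```json
{
  "schema.registry.url": "https://my-namespace.servicebus.windows.net",
  "schema.group": "my-schema-group",
  "azure.tenant.id": "00000000-0000-0000-0000-000000000000",
  "azure.client.id": "11111111-1111-1111-1111-111111111111",
  "azure.client.secret": "client-secret-value"
}
```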
Schema registries comparison
Below is a quick comparison of how the supported schema registry types are handled:
| Schema registry type | What is used for association between schema and topic | Convention | How the schema id is passed in a message | Payload content |
|---|---|---|---|---|
| Confluent | Subjects | Subject = `<topic-name>-(key or value)` | For Avro: in the payload, in the format: 0x00 magic byte, 4-byte schema id, Avro payload | 0x00, 4-byte schema id, Avro payload |
| | | | For JSON: in `key.schemaId` or `value.schemaId` headers | JSON payload |
| Azure | Schema names | Schema name = `<CamelCasedTopicName>(Key or Value)` | For Avro: in the `content-type: avro/binary+schemaId` header | Avro payload |
| | | | For JSON: in `key.schemaId` or `value.schemaId` headers (only when the avroAsJsonSerialization option is enabled) | JSON payload |