Kafka
Overview
Kafka topics are a common streaming data input to Nussknacker scenarios and output from them. This document provides details of Nussknacker's integration with Kafka clusters and Schema Registry, and an overview of the integration configuration. If you use Nussknacker Cloud, you can also check the Step By Step Tutorials for detailed instructions on how to configure the integration with your chosen provider.
Once configured, the following components will be generated:
- Kafka Source
- Kafka Sink
The functionality that allows defining integrations in the UI rather than in config files is available in the Cloud version only. If you use the OSS edition, check the Open Source version integrations documentation for details on how to configure integrations.
Prerequisites
To better understand how Nussknacker works with Kafka, it's recommended to read the following first:
Available integrations
Both the Cloud and OSS editions support the same set of Kafka-compatible brokers. Additionally, the Cloud version provides a dedicated integration set-up mechanism, which allows configuring the connection to the selected Kafka providers in the UI. If you use the OSS edition, consult the configuration documentation for additional guidance.
The following specialized integration set-up forms are available in the Nussknacker Cloud version:
- generic Kafka - this document
Integration parameters
| Parameter | Description |
|---|---|
| Integration name | Unique name for the integration |
| Properties | Properties in a JSON format (you can paste configuration also in the standard Java properties format) |
| CA Certificate | Trusted certificate authority used to verify the Kafka broker's certificate; it can be found in the truststore file. Fill in only when your Kafka cluster requires a CA certificate |
| Access Key | Private key of the client, used for mutual TLS authentication; it can be found in the keystore file or is supplied by your cloud Kafka provider. Fill in only when your Kafka cluster requires an access key |
| Access Certificate | Public certificate of the client, used for mutual TLS authentication; it can be found in the keystore file or is supplied by your cloud Kafka provider. Fill in only when your Kafka cluster requires an access certificate |
Schemas and Schema Registry
A schema defines the format of data. Nussknacker expects that messages in topics are described by a schema, and it uses the information contained in schemas for code completion and validation of messages. The schema of a message can be described in the Avro Schema format or the JSON Schema format (Confluent Schema Registry only).
Schemas are managed by Schema Registry - Confluent Schema Registry and Azure Schema Registry are supported.
To preview schemas or add a new version, you can use tools available on your cloud platform or tools like AKHQ.
If you use the plain Kafka integration, you may choose not to provide Schema Registry connection information. In that case, Nussknacker will not be able to help with field names or data-type-related validation of SpEL expressions. However, if Kafka messages are transmitted as plain text in the JSON format, you can still use dynamic field access to read fields of the Kafka messages.
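For illustration, assuming an incoming JSON message with a field named `amount` (a hypothetical field name), the difference between typed and dynamic access in a SpEL expression looks roughly like this; dynamic property access may need to be enabled in your installation:

```
#input.amount       // typed access - validated against the schema, unavailable without Schema Registry
#input["amount"]    // dynamic access - works even when the value is typed as Unknown
```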
Association between schema and topic
To properly present information about topics and their versions, and to recognize which schema is assigned to which version, Nussknacker follows these conventions:
- For the Confluent-based implementation it uses `TopicNameStrategy` for subject names. This means it looks for schemas under the `<topic-name>-(key or value)` subject. For example, for the topic `transactions`, it looks for schemas under the `transactions-key` subject for the key and the `transactions-value` subject for the value.
- In the Azure Schema Registry, the subject concept doesn't exist - schemas are grouped by schema name. Because of that, Nussknacker introduces its own convention for associating a schema with a topic: the schema name should be in the format `CamelCasedTopicNameKey` for keys and `CamelCasedTopicNameValue` for values. For example, for the `input-events` (or `input.events`) topic, the schema should be named `InputEventsKey` for the key or `InputEventsValue` for the value. Be aware that this may require changing the schema name not only in the Azure portal but also inside the schema content - those names must be the same for serialization to work correctly.
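The Azure naming convention above can be sketched in a few lines. The function below is ours, not part of Nussknacker's API, and only illustrates how a topic name maps to the expected schema name:

```python
import re

def azure_schema_name(topic: str, part: str) -> str:
    """Map a Kafka topic name to the Azure schema name Nussknacker expects.

    part should be "key" or "value".
    """
    # Split the topic name on non-alphanumeric separators such as '-' or '.'
    words = re.split(r"[^A-Za-z0-9]+", topic)
    # CamelCase the topic and append "Key" or "Value"
    return "".join(w.capitalize() for w in words if w) + part.capitalize()

print(azure_schema_name("input-events", "value"))  # InputEventsValue
print(azure_schema_name("input.events", "key"))    # InputEventsKey
```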
Message payload
By default, Nussknacker supports two combinations of schema type and payload type:
- Avro schema + Avro payload (binary format)
- JSON schema + JSON payload (human readable, text format)
Avro payloads are more concise, because messages contain only values and a schema id - without information about the message structure, such as field names.
The Avro payload is compatible with the standard Kafka serializers and deserializers delivered by Schema Registry providers. Thanks to that, you should be able to send messages to Nussknacker and read messages produced by Nussknacker using the standard tooling available around those Schema Registries. To see how those formats differ, take a look at the Schema Registry comparison section.
In some situations it might be helpful to use a JSON payload with an Avro schema, especially when your Schema Registry doesn't support JSON schemas. You can do that by enabling the avroAsJsonSerialization configuration setting.
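In the Open Source configuration this is a Kafka component setting; a minimal sketch in HOCON, assuming the exact placement in your configuration tree may differ per deployment:

```
kafka {
  avroAsJsonSerialization: true
}
```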
Schema ID
Each topic can contain messages written using different schema versions. Schema versions are identified by a Schema ID. Nussknacker needs to know which schema was used when the message was written, to make message validation and schema evolution possible. Because of that, Nussknacker needs to extract the Schema ID from the message.
Additionally, in sources and sinks, you can choose which schema version should be used during reading/writing. Thanks to the schema evolution mechanism, a message in the original format will be evolved to the desired format. This desired schema will be used in code completion and validation.
At runtime Nussknacker determines the schema version of a message value and key in the following way:
- It checks the `key.schemaId`, `value.schemaId` and Azure-specific `content-type` headers;
- If no such headers are provided, it looks for the magic byte (0x00) and a schema id in the message, in the format used by Confluent;
- If the magic byte is not found, it assumes the schema version chosen by the user in the scenario.
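The Confluent part of the lookup above can be sketched as follows. This helper only illustrates the Confluent wire format (a 0x00 magic byte followed by a 4-byte big-endian schema id); it is not Nussknacker code:

```python
import struct
from typing import Optional

def extract_confluent_schema_id(message: bytes) -> Optional[int]:
    # Confluent wire format: 1 magic byte (0x00) + 4-byte big-endian schema id + payload
    if len(message) >= 5 and message[0] == 0x00:
        return struct.unpack(">I", message[1:5])[0]
    # No magic byte: fall back to headers or the schema version chosen in the scenario
    return None

# A message written with schema id 42
msg = b"\x00" + (42).to_bytes(4, "big") + b"avro-payload"
print(extract_confluent_schema_id(msg))   # 42
print(extract_confluent_schema_id(b"{}")) # None (plain JSON, no magic byte)
```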
Kafka integration configuration
Properties
Here, you should provide the Kafka properties required to connect to your Kafka cluster and to the Schema Registry server.
Configuration should be provided in JSON format.
To configure the connection to Kafka, you need to set at least the bootstrap.servers property. It should contain a comma-separated list of URLs of Kafka brokers.
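For example, a minimal set of properties in JSON format might look like this (all addresses are placeholders):

```json
{
  "bootstrap.servers": "broker-1.example.com:9092,broker-2.example.com:9092",
  "schema.registry.url": "https://schema-registry.example.com:8081"
}
```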
Frequently used Kafka configuration options
The table below contains a list of commonly used Kafka configuration options. Please note that the list is not intended to be complete and that some of the configuration options are specific to Kafka providers - Aiven, Azure, Confluent.
| Name | Importance | Type | Default value | Description |
|---|---|---|---|---|
| bootstrap.servers | High | string | | Comma-separated list of bootstrap servers |
| schema.registry.url | High | string | | Comma-separated list of schema registry URLs |
| basic.auth.credentials.source | High | string | | (Confluent-only) Source of credentials, e.g. USER_INFO |
| basic.auth.user.info | High | string | | (Confluent-only) User and password, e.g. some_user:some_password |
| schema.group | High | string | | (Azure-only) Schema group containing all available schemas |
| azure.tenant.id | High | string | | (Azure-only) Azure tenant id |
| azure.client.id | High | string | | (Azure-only) Azure client id |
| azure.client.secret | High | string | | (Azure-only) Azure client secret |
| transaction.timeout.ms | Medium | number | 600000 | Transaction timeout in millis for a transactional producer: [transaction.timeout.ms](https://kafka.apache.org/documentation/#producerconfigs_transaction.timeout.ms) |
| isolation.level | High | string | | Controls how to read messages written transactionally: [isolation.level](https://kafka.apache.org/documentation/#consumerconfigs_isolation.level) |
Schema registry related properties
Currently, Nussknacker supports two implementations of Schema Registries: based on Confluent Schema Registry and based on Azure Schema Registry.
To configure the connection to the Schema Registry, you need to set at least schema.registry.url. It should contain a comma-separated list of Schema Registry URLs.
For a single-node installation, it will be just one URL. Be aware that, contrary to Kafka brokers, Schema Registry URLs should start with https:// or http://.
It's possible to use Kafka without a schema registry; in this case, do not provide the schema.registry.url property. Without a schema registry you can use only JSON Kafka topics, and values read from them will be typed as Unknown.
Nussknacker determines which registry implementation (Confluent or Azure) is used from the schema.registry.url property.
If the URL ends with .servicebus.windows.net, Nussknacker assumes that an Azure schema registry is used; if not, a Confluent schema registry is assumed.
Confluent-based schema registry - connection and authentication
For the Confluent-based implementation you should provide at least schema.registry.url. If your schema registry is secured with a user and password, you should additionally provide the "basic.auth.credentials.source": "USER_INFO" and "basic.auth.user.info": "some_user:some_password" entries.
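Putting the entries above together, a hypothetical Confluent setup might look like this (all addresses and credentials are placeholders; any broker-side authentication such as SASL is omitted):

```json
{
  "bootstrap.servers": "kafka.example.com:9092",
  "schema.registry.url": "https://schema-registry.example.com",
  "basic.auth.credentials.source": "USER_INFO",
  "basic.auth.user.info": "some_user:some_password"
}
```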
To read more, see the Schema Registry documentation.
To make sure your configuration is correct, you can test it with kafka-avro-console-consumer and kafka-avro-console-producer, available
in the Confluent Schema Registry distribution. Once you have a properly working set of properties, just copy it to Nussknacker's configuration.
Azure-based schema registry - connection and authentication
For the Azure-based implementation, you should first provide the schema.registry.url and schema.group properties. The first should be the
https://<event-hubs-namespace>.servicebus.windows.net URL; the second should be the name of the schema group where all schemas used by Nussknacker will be located.
Regarding authentication, a couple of options are available: you can provide credentials via the
azure.tenant.id, azure.client.id and azure.client.secret properties, or you can use one of the other methods handled
by Azure's DefaultAzureCredential, for example the Azure CLI or Azure PowerShell.
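A hypothetical Azure Event Hubs setup using the service-principal properties above might look like this (all values are placeholders):

```json
{
  "bootstrap.servers": "my-namespace.servicebus.windows.net:9093",
  "schema.registry.url": "https://my-namespace.servicebus.windows.net",
  "schema.group": "nussknacker-schemas",
  "azure.tenant.id": "00000000-0000-0000-0000-000000000000",
  "azure.client.id": "00000000-0000-0000-0000-000000000000",
  "azure.client.secret": "<client-secret>"
}
```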
Schema registries comparison
Below you can find a quick comparison of how given schema registry types are handled:
| Schema registry type | What is used for association between schema and topic | Convention | How the schema id is passed in a message | Payload content |
|---|---|---|---|---|
| Confluent | Subjects | Subject = `<topic-name>-(key or value)` | For Avro: in the payload, in the format: 0x00, 4-byte schema id, Avro payload. For JSON: in the `key.schemaId` or `value.schemaId` headers | For Avro: 0x00, 4-byte schema id, Avro payload. For JSON: JSON payload |
| Azure | Schema names | Schema name = `<CamelCasedTopicName>(Key or Value)` | For Avro: in the `content-type: avro/binary+schemaId` header. For JSON: in the `key.schemaId` or `value.schemaId` headers (only when the avroAsJsonSerialization option is enabled) | For Avro: Avro payload. For JSON: JSON payload |