WebSocket (Source)
Description
WebSocket (Source) connects to a WebSocket endpoint and turns incoming messages into a stream of events. Each message received from the WebSocket becomes a single event in the scenario.
On deploy, Nussknacker opens a WebSocket connection to the given URL and forwards incoming messages to a dynamically created Kafka topic. Scenario reads from that topic. When the scenario is stopped, the connection is closed automatically.
Parameters and configuration
| Name | Description |
|---|---|
| URL | WebSocket URL to connect to, including the scheme (e.g. wss://example.com/stream). |
| Initial messages | Optional JSON array of messages to send to the WebSocket right after the connection is established. Useful for subscription-based protocols that require a handshake message before data starts flowing. E.g. [{"type": "subscribe", "channel": "trades"}]. If no initial messages are needed, leave as []. |
| Content type | Format of the incoming WebSocket messages. Choose JSON for structured JSON messages or PLAIN for raw text messages. |
Advanced parameters
| Name | Description |
|---|---|
| Data sample | Available when Content type is JSON. An example JSON object representing a typical message from the WebSocket. Nussknacker analyzes its structure to determine field types and enable type-aware field access in subsequent nodes. Does not affect runtime behavior — only used in the designer for type inference. |
| Event time | Expression which evaluates to the time when the event was created. For example, if the incoming message contains a timestamp field you can use #input.timestamp. When not specified, processing time is used. |
| Max out-of-orderness | The maximum amount of time an element is allowed to be late before being ignored when computing the result for time-based stream transformations: aggregates in time windows and joins. To read more about this mechanism see Flink documentation. |
| Idleness | The time period after which the source is marked as idle if no events are received. To read more about this mechanism see Flink documentation. |
Additional considerations
- When Content type is
PLAIN:#inputis aStringcontaining the raw message text. - When Content type is
JSON:#inputis a JSON object. If a Data sample was provided, field types are inferred from the sample and named fields can be accessed directly (e.g.#input.price). Without a data sample, dynamic navigation can still be used (e.g.#input?.["price"].). - A new, uniquely named Kafka topic is created automatically for each scenario deployment.
- The Data sample is only used during scenario editing to provide type hints in the Designer. It has no effect on how messages are deserialized at runtime — all incoming JSON messages are processed in a schema-less manner regardless of the sample.
- If the WebSocket server requires authentication, it should be encoded in the URL (e.g. as a query parameter token) or provided in initial messages.