Sources
Source nodes ingest data into your pipeline from external systems.

Kafka Consumer

Consumes records from a Kafka topic.

| Property | Type | Required | Description |
|---|---|---|---|
| topic | string | Yes | Kafka topic to consume from |
| schemaType | enum | Yes | JSON, AVRO_SR, or PROTOBUF_SR |
| schemaId | number | No | Schema Registry schema ID (for Avro/Protobuf) |
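As a sketch, a Kafka Consumer's properties could be filled in as follows. The surrounding JSON layout, the node type label, and the concrete values (the orders topic, schema ID 42) are illustrative assumptions, not the exact pipeline definition format:

```json
{
  "type": "kafka-consumer",
  "topic": "orders",
  "schemaType": "AVRO_SR",
  "schemaId": 42
}
```

schemaId is optional and only applies to the Schema Registry formats (AVRO_SR, PROTOBUF_SR).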
Processors
Processor nodes transform, route, filter, or enrich data as it flows through your pipeline.

Transform

Transforms records using JSONata expressions.

| Property | Type | Required | Description |
|---|---|---|---|
| mapping | string | Yes | JSONata expression defining the transformation |
| schema | object | No | Output schema (auto-inferred if not provided) |
Output: Transformed record based on the JSONata expression

The Transform node is the main component of most Kanal pipelines. Use it to:
- Map between schemas
- Reshape data structures
- Compute derived fields
- Filter out unwanted fields
- Combine multiple fields
JSONata Examples

Common patterns include renaming fields, flattening nested objects, applying conditional logic, and aggregating arrays.
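The sketches below illustrate each of these patterns. The field names (firstName, address, items, and so on) are illustrative assumptions, not part of any required schema.

Rename fields:

```jsonata
{
  "fullName": firstName & " " & lastName
}
```

Flatten nested objects:

```jsonata
{
  "city": address.city,
  "postalCode": address.postalCode
}
```

Conditional logic:

```jsonata
{
  "tier": total > 1000 ? "premium" : "standard"
}
```

Aggregate arrays:

```jsonata
{
  "orderTotal": $sum(items.(quantity * price))
}
```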
Branch
Routes records to different outputs based on conditions.

| Property | Type | Required | Description |
|---|---|---|---|
| branches | array | Yes | List of conditions and their output labels |

Each branch defines:
- id — Unique identifier for the branch
- label — Display name
- condition — JSONata expression that returns true or false

Output: Multiple outputs—one per branch condition, plus a default output
Records are evaluated against each condition in order. The first matching condition routes the record to that output. Records matching no conditions go to default.
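As an illustration, a branches list could look like the following. The JSON layout and the concrete ids, labels, and conditions are assumptions; only the id/label/condition structure and the use of JSONata conditions come from the description above:

```json
[
  { "id": "high-value", "label": "High value", "condition": "total > 1000" },
  { "id": "eu-orders", "label": "EU orders", "condition": "region = 'EU'" }
]
```

A record with total 1500 goes to the High value output regardless of region, because conditions are evaluated in order; a record matching neither condition goes to the default output.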
Explode
Expands array fields into individual records (flatMap operation).

| Property | Type | Required | Description |
|---|---|---|---|
| arrayToFlatMap | string | Yes | Name of the array field to explode |

Output: One record per array element, with the array replaced by its individual items

Use Explode when you need to process array elements individually. For example, if an order contains multiple line items, Explode creates a separate record for each item.
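Continuing the order example, with arrayToFlatMap set to items, an input record like this (field names illustrative):

```json
{ "orderId": 42, "items": [ { "sku": "A-1", "qty": 2 }, { "sku": "B-7", "qty": 1 } ] }
```

is expanded into two records, each carrying a single element in place of the array (shown one record per line):

```json
{ "orderId": 42, "items": { "sku": "A-1", "qty": 2 } }
{ "orderId": 42, "items": { "sku": "B-7", "qty": 1 } }
```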
Lookup
Enriches records by joining with reference data.

| Property | Type | Required | Description |
|---|---|---|---|
| fieldName | string | Yes | Name for the enriched field in the output |
| lookupFailureBehavior | enum | Yes | REJECT or CONTINUE |

Inputs:
- input — Main data stream (left side)
- reference — Reference data stream (top, cached in memory)

Outputs:
- output — Enriched records
- reject — Records with no match (when behavior is REJECT)
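As a rough illustration, suppose the main stream carries { "orderId": 42, "customerId": "C100" } and the reference stream carries { "customerId": "C100", "name": "Acme Corp" }. With fieldName set to customer, the enriched output record could look like the following; the join key configuration and the exact nesting are assumptions here:

```json
{
  "orderId": 42,
  "customerId": "C100",
  "customer": { "customerId": "C100", "name": "Acme Corp" }
}
```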
Choose CONTINUE if missing reference data is acceptable—the lookup field will be null. Choose REJECT to route unmatched records to a separate output for handling.

Peek
Observes records without modifying them (for debugging).

| Property | Type | Required | Description |
|---|---|---|---|
| logLevel | enum | No | Log level for output (DEBUG, INFO, WARN) |
Output: Same record, unmodified

Use Peek to:
- Debug pipeline behavior during development
- Log records at specific points in the pipeline
- Inspect data flow without affecting processing
Sinks
Sink nodes output data from your pipeline to external systems.

Kafka Producer

Produces records to a Kafka topic.

| Property | Type | Required | Description |
|---|---|---|---|
| topic | string | Yes | Kafka topic to produce to |
| keyExpression | string | No | JSONata expression for the record key |
Output: None (terminal node)
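For example, a keyExpression can build the record key from one or more fields of the record (the field names here are illustrative):

```jsonata
customerId & "-" & region
```

Records produced with this expression are keyed by customer and region, which in turn determines how they are partitioned across the topic.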
JDBC Sink
Writes records to a relational database table.

| Property | Type | Required | Description |
|---|---|---|---|
| database | string | Yes | Database connection name (from configuration) |
| table | string | Yes | Target table name |
| writeMode | enum | No | INSERT, UPSERT, or UPDATE |

Output: None (terminal node)

The JDBC Sink maps record fields to table columns by name. Ensure your upstream transformation produces a schema compatible with the target table.
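For instance, writing to a hypothetical orders table with columns order_id, customer_id, and total works when the upstream record is shaped to match those column names:

```json
{ "order_id": 42, "customer_id": "C100", "total": 159.90 }
```

A Transform node upstream is a convenient place to rename fields so they line up with the target columns.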
Generic Connect Sink
Uses a Kafka Connect sink connector for output.

| Property | Type | Required | Description |
|---|---|---|---|
| connectorClass | string | Yes | Fully qualified connector class name |
| config | object | Yes | Connector-specific configuration |

Output: None (terminal node)

The Generic Connect Sink allows you to use any Kafka Connect sink connector. Place the connector JAR in the libs/ directory and configure it here.
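As a sketch, the two properties might be filled in for Kafka's built-in FileStreamSinkConnector as follows. The surrounding JSON layout and the file path are assumptions, and other connectors take their own connector-specific keys under config:

```json
{
  "connectorClass": "org.apache.kafka.connect.file.FileStreamSinkConnector",
  "config": {
    "file": "/tmp/pipeline-output.txt"
  }
}
```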
Node Categories Summary
| Category | Nodes | Purpose |
|---|---|---|
| Sources | Kafka Consumer | Ingest data into pipeline |
| Processors | Transform, Branch, Explode, Lookup, Peek | Process and route data |
| Sinks | Kafka Producer, JDBC Sink, Generic Connect Sink | Output data to external systems |