This page documents all available nodes in Kanal, organized by category.

Sources

Source nodes ingest data into your pipeline from external systems.

Kafka Consumer

Consumes records from a Kafka topic.
Property    Type    Required  Description
topic       string  Yes       Kafka topic to consume from
schemaType  enum    Yes       JSON, AVRO_SR, or PROTOBUF_SR
schemaId    number  No        Schema Registry schema ID (for Avro/Protobuf)
Output: One record per Kafka message, with schema inferred or fetched from Schema Registry.
When using Schema Registry, Kanal automatically fetches and displays the schema in the editor for downstream transformations.
kafka-consumer-1:
  type: kafka-consumer
  topic: orders
  schemaType: AVRO_SR
  schemaId: 8
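If the topic carries plain JSON without Schema Registry, schemaId can be omitted. A minimal sketch (the topic name events is illustrative):
kafka-consumer-2:
  type: kafka-consumer
  topic: events
  schemaType: JSON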

Processors

Processor nodes transform, route, filter, or enrich data as it flows through your pipeline.

Transform

Transforms records using JSONata expressions.
Property  Type    Required  Description
mapping   string  Yes       JSONata expression defining the transformation
schema    object  No        Output schema (auto-inferred if not provided)
Input: Any record
Output: Transformed record based on JSONata expression
The Transform node is the main component of most Kanal pipelines. Use it to:
  • Map between schemas
  • Reshape data structures
  • Compute derived fields
  • Filter out unwanted fields
  • Combine multiple fields
See Transformations to learn JSONata syntax and best practices.
transform-1:
  type: transform
  mapping: |
    {
      "order_id": id,
      "customer_name": customer.name,
      "total": items.price ~> $sum(),
      "processed_at": $now()
    }
Rename fields:
{
  "userId": user_id,
  "userName": user_name
}
Flatten nested objects:
{
  "id": order.id,
  "customerEmail": order.customer.email,
  "itemCount": $count(order.items)
}
Conditional logic:
{
  "tier": totalSpent > 1000 ? "gold" : totalSpent > 500 ? "silver" : "bronze"
}
Aggregate arrays:
{
  "totalPrice": items.price ~> $sum(),
  "avgPrice": $average(items.price),
  "maxPrice": $max(items.price)
}
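Combine multiple fields (a sketch assuming firstName, lastName, city, and country fields; JSONata's & operator concatenates strings):
{
  "fullName": firstName & " " & lastName,
  "location": city & ", " & country
}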

Branch

Routes records to different outputs based on conditions.
Property  Type   Required  Description
branches  array  Yes       List of conditions and their output labels
Each branch has:
  • id — Unique identifier for the branch
  • label — Display name
  • condition — JSONata expression that returns true or false
Input: Any record
Output: Multiple outputs—one per branch condition, plus a default output
Records are evaluated against each condition in order. The first matching condition routes the record to that output. Records matching no conditions go to default.
branch-1:
  type: branch
  branches:
    - id: high-value
      label: High Value
      condition: total > 1000
    - id: priority
      label: Priority
      condition: customer.tier = "gold"
Conditions are evaluated in order. Place more specific conditions before general ones to avoid unexpected routing.
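For example, the configuration above could be reordered so a more specific combined condition is checked first (a sketch reusing the same field names; the gold-high-value branch is illustrative):
branch-2:
  type: branch
  branches:
    - id: gold-high-value
      label: Gold High Value
      condition: customer.tier = "gold" and total > 1000
    - id: high-value
      label: High Value
      condition: total > 1000
    - id: priority
      label: Priority
      condition: customer.tier = "gold"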

Explode

Expands array fields into individual records (flatMap operation).
Property        Type    Required  Description
arrayToFlatMap  string  Yes       Name of the array field to explode
Input: Record containing an array field
Output: One record per array element, with the array replaced by its individual items
Use Explode when you need to process array elements individually. For example, if an order contains multiple line items, Explode creates a separate record for each item.
explode-1:
  type: flatmap
  arrayToFlatMap: items
Before:
{
  "orderId": 123,
  "items": [
    {"sku": "A", "qty": 2},
    {"sku": "B", "qty": 1}
  ]
}
After (2 records):
{"orderId": 123, "item": {"sku": "A", "qty": 2}}
{"orderId": 123, "item": {"sku": "B", "qty": 1}}

Lookup

Enriches records by joining with reference data.
Property               Type    Required  Description
fieldName              string  Yes       Name for the enriched field in the output
lookupFailureBehavior  enum    Yes       REJECT or CONTINUE
Inputs:
  • input — Main data stream (left side)
  • reference — Reference data stream (top, cached in memory)
Outputs:
  • output — Enriched records
  • reject — Records with no match (when behavior is REJECT)
The Lookup node performs a stream-table join. Reference data is cached in memory, so it must fit in the available heap.
lookup-1:
  type: lookup
  fieldName: customer_details
  lookupFailureBehavior: REJECT
Choose CONTINUE if missing reference data is acceptable—the lookup field will be null. Choose REJECT to route unmatched records to a separate output for handling.
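A minimal sketch using CONTINUE (reusing the customer_details field name from the example above):
lookup-2:
  type: lookup
  fieldName: customer_details
  lookupFailureBehavior: CONTINUE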

Peek

Observes records without modifying them (for debugging).
Property  Type  Required  Description
logLevel  enum  No        Log level for output (DEBUG, INFO, WARN)
Input: Any record
Output: Same record, unmodified
Use Peek to:
  • Debug pipeline behavior during development
  • Log records at specific points in the pipeline
  • Inspect data flow without affecting processing
peek-1:
  type: peek
  logLevel: INFO
Peek output appears in the console log panel in the editor. Use it liberally during development, but consider removing or reducing log levels in production.

Sinks

Sink nodes output data from your pipeline to external systems.

Kafka Producer

Produces records to a Kafka topic.
Property       Type    Required  Description
topic          string  Yes       Kafka topic to produce to
keyExpression  string  No        JSONata expression for the record key
Input: Any record
Output: None (terminal node)
kafka-producer-1:
  type: kafka-producer
  topic: processed-orders
  keyExpression: order_id
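Because keyExpression is a JSONata expression, it can also derive a composite key. A sketch assuming the record has customer_id and order_id fields:
kafka-producer-2:
  type: kafka-producer
  topic: processed-orders
  keyExpression: 'customer_id & "-" & order_id'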

JDBC Sink

Writes records to a relational database table.
Property   Type    Required  Description
database   string  Yes       Database connection name (from configuration)
table      string  Yes       Target table name
writeMode  enum    No        INSERT, UPSERT, or UPDATE
Input: Any record (fields must match table columns)
Output: None (terminal node)
The JDBC Sink maps record fields to table columns by name. Ensure your upstream transformation produces a schema compatible with the target table.
jdbc-sink-1:
  type: jdbc-sink
  database: default
  table: orders
  writeMode: UPSERT
Column names are case-sensitive. Ensure your field names exactly match your database column names.
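If upstream field names don't match, add a Transform before the sink to rename them. A sketch assuming the target table has order_id and customer_name columns:
{
  "order_id": orderId,
  "customer_name": customer.name
}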

Generic Connect Sink

Uses a Kafka Connect sink connector for output.
Property        Type    Required  Description
connectorClass  string  Yes       Fully qualified connector class name
config          object  Yes       Connector-specific configuration
Input: Any record
Output: None (terminal node)
The Generic Connect Sink allows you to use any Kafka Connect sink connector. Place the connector JAR in the libs/ directory and configure it here.
connect-sink-1:
  type: generic-connect-sink
  connectorClass: io.aiven.kafka.connect.http.HttpSinkConnector
  config:
    http.url: https://api.example.com/webhook
    http.authorization.type: none
    batching.enabled: true
Check the connector documentation for available configuration options. Kanal passes configuration directly to the connector.

Node Categories Summary

Category    Nodes                                            Purpose
Sources     Kafka Consumer                                   Ingest data into pipeline
Processors  Transform, Branch, Explode, Lookup, Peek         Process and route data
Sinks       Kafka Producer, JDBC Sink, Generic Connect Sink  Output data to external systems

Next Steps