Routes records to different outputs based on conditions.
| Property | Type | Required | Description |
| --- | --- | --- | --- |
| branches | array | Yes | List of { id, label, condition } objects |
Each branch contains:
- id — Unique identifier
- label — Display name
- condition — JSONata expression that returns true or false
Input: Any record
Output: Multiple outputs — one per branch, plus a default output

Records are evaluated against each condition in order. The first matching condition routes the record to that branch's output. Records that match no condition go to default.
```yaml
branch-1:
  type: branch
  branches:
    - id: high-value
      label: High Value
      condition: total > 1000
    - id: priority
      label: Priority
      condition: customer.tier = "gold"
```
Conditions are evaluated in order. Place more specific conditions before general ones to avoid unexpected routing.
Expands an array field into individual records (a flatMap operation).
| Property | Type | Required | Description |
| --- | --- | --- | --- |
| arrayToFlatMap | string | Yes | Name of the array field to explode |
Input: Record containing an array field
Output: One record per array element, with the array replaced by its individual items

Use Explode when you need to process array elements individually. For example, if an order contains multiple line items, Explode creates a separate record for each item.
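A minimal configuration sketch, modeled on the other examples in this reference. The node type name (`explode`) and the field name `line_items` are assumptions, not confirmed by this page:

```yaml
# Assumed type name "explode"; line_items is an illustrative field name.
# An order with three entries in line_items would yield three records,
# each carrying a single item in place of the array.
explode-1:
  type: explode
  arrayToFlatMap: line_items
```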
Enriches records by joining with cached reference data (stream-table join).
| Property | Type | Required | Description |
| --- | --- | --- | --- |
| lookupKey | string | Yes | Expression evaluated on the incoming record to produce the join key |
| cacheKey | string | Yes | Expression evaluated on the reference data to produce the cache key |
| fieldName | string | Yes | Name for the enriched field added to the output |
| lookupFailureBehavior | enum | No | REJECT (default) or CONTINUE |
Inputs:
- input (left) — Main data stream
- reference (top) — Reference data, cached in memory

Outputs:
- output — Enriched records
- reject — Records with no match (only when behavior is REJECT)
The reference data is loaded into an in-memory cache keyed by cacheKey. For each incoming record, lookupKey is evaluated and matched against the cache.
Choose CONTINUE if missing reference data is acceptable — the lookup field will be null. Choose REJECT to route unmatched records to a separate output for error handling or dead-letter queues.
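A configuration sketch following the pattern of the other examples here. The node type name (`lookup`) and the key expressions are assumptions for illustration:

```yaml
# Assumed type name "lookup"; customer_id and id are illustrative expressions.
lookup-1:
  type: lookup
  lookupKey: customer_id          # evaluated on each incoming record
  cacheKey: id                    # evaluated on the cached reference data
  fieldName: customer             # enriched field added to the output
  lookupFailureBehavior: CONTINUE # unmatched records pass through with a null field
```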
Enriches each record by executing a parameterized SQL query against a relational database.
| Property | Type | Required | Description |
| --- | --- | --- | --- |
| query | string | Yes | SQL query with ? placeholders |
| parameters | array | No | List of { expression, reconcileColumn } objects mapping record fields to query placeholders |
| fieldName | string | No | Name for the result field (default jdbc_result) |
| lookupFailureBehavior | enum | No | REJECT (default) or CONTINUE |
| batchSupport | boolean | No | Enable batched query execution for throughput |
Input: Any record

Outputs:
- output — Record enriched with the query result in fieldName
- reject — Records where the query returned no rows (only when behavior is REJECT)
Unlike Lookup, which joins against a pre-cached dataset, JDBC Request-Reply executes a live query per record (or per batch). This is ideal when reference data is too large to cache or changes frequently.
```yaml
jdbc-rr-1:
  type: jdbc-request-reply
  query: "SELECT name, credit_limit FROM customers WHERE id = ?"
  parameters:
    - expression: customer_id
      reconcileColumn: id
  fieldName: customer
  lookupFailureBehavior: CONTINUE
  batchSupport: true
```
Enable batchSupport when enriching high-throughput streams — queries are grouped into batches, significantly reducing round-trips to the database.
Observes records without modifying them — useful for debugging and monitoring.
| Property | Type | Required | Description |
| --- | --- | --- | --- |
| logLevel | enum | No | DEBUG, INFO, or WARN |
Input: Any record
Output: Same record, unmodified

Use Peek to:
- Debug pipeline behavior during development
- Log records at specific points in the pipeline
- Inspect data shapes between processing steps
```yaml
peek-1:
  type: peek
  logLevel: INFO
```
Peek output appears in the console panel in the editor. Use it liberally during development, then reduce log levels or remove Peek nodes before deploying to production.
Masks sensitive fields in each record’s value — redaction, nulling, numeric jitter, or synthetic replacement via Datafaker expressions.
| Property | Type | Required | Description |
| --- | --- | --- | --- |
| maskingRules | array | Yes | List of rules. Each rule has field, mask, and optionally varianceRange (for VARIANCE) or fakerExpression (for FAKER) |
| schema | string | Conditional | JSON string of the output Connect/JSON schema — the editor includes this when exporting so the runner can attach the correct valueSchema |
Input: Any record with a JSON object value

Output: Same record shape with masked fields applied; if there are no rules, or the value is missing, the record passes through unchanged.

Field paths use dot notation for nested structs. Append [] to a segment to apply the rule to every element of an array at that path (for example, items[].email masks email inside each item).
| mask | Behavior |
| --- | --- |
| REDACT | Replace the value with *** |
| NULL | Set the field to JSON null |
| VARIANCE | Add a random delta in [-varianceRange, +varianceRange] to numeric fields (integers or floats). If varianceRange is omitted or null, it is treated as 0 |
| FAKER | Replace with the result of the Datafaker expression. A blank or missing expression, or a failed evaluation, falls back to *** |
mask values are matched case-insensitively at runtime.
Unknown mask values are logged and the field is left unchanged. VARIANCE on non-numeric fields is skipped with a warning.
Configure rules in the node editor from the incoming schema. The editor derives the output schema for downstream nodes (for example, REDACT / FAKER may widen types to string; NULL may mark fields optional).
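A configuration sketch covering each mask type, in the style of the other examples on this page. The node type name (`mask`) and the field paths are assumptions; the Datafaker expression syntax shown is standard Datafaker:

```yaml
# Assumed type name "mask"; field names are illustrative.
mask-1:
  type: mask
  maskingRules:
    - field: ssn
      mask: REDACT                # becomes ***
    - field: salary
      mask: VARIANCE
      varianceRange: 500          # random delta in [-500, +500]
    - field: items[].email        # applied to every element of the items array
      mask: FAKER
      fakerExpression: "#{internet.emailAddress}"
```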
Extra connection properties (key=value, one per line)
| Property | Type | Required | Description |
| --- | --- | --- | --- |
| stubbed | boolean | No | When true, runs as a stub during interactive testing |
Input: Any record (fields must match table columns)
Output: None (terminal node)

The JDBC Sink maps record fields to table columns by name. Ensure your upstream transformation produces a schema compatible with the target table.
stubbed — When true, runs as a stub during interactive testing
Input: Any record
Output: None (terminal node)

The Generic Connect Sink lets you use any Kafka Connect sink connector. Place the connector JAR in the libs/ directory and configure it here.
Check the connector documentation for available configuration options. Streemlined passes configuration directly to the connector. See Connector Plugins for details on installing connectors.
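A rough configuration sketch. The node type name and the exact layout for connector properties are assumptions here; the connector settings themselves (connector.class and its options) follow standard Kafka Connect conventions and are passed through to the connector:

```yaml
# Assumed type name "generic-connect-sink"; property layout is illustrative.
connect-sink-1:
  type: generic-connect-sink
  connector.class: org.apache.kafka.connect.file.FileStreamSinkConnector
  file: /tmp/output.txt
  stubbed: false
```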