Uses any Kafka Connect source connector to ingest data from external systems.
| Property | Type | Required | Description |
| --- | --- | --- | --- |
| connectorClass | string | Yes | Fully qualified connector class name |
| propertiesText | string | Yes | Connector configuration (key=value, one per line) |
| cluster | string | Yes | Kafka cluster for offset storage |
| topic | string | Yes | Kafka topic used to store connector offsets |
| stubbed | boolean | No | When true, runs as a stub during interactive testing |
| errorHandling | enum | No | FAIL_PIPELINE (default), LOG_SKIP, or SEND_TO_ERROR_PORT |
Output: One record per message emitted by the connector, with schema inferred or configured in the editor.
The Generic Connect Source lets you use any Kafka Connect source connector. Place the connector JAR in the libs/ directory and configure it here.
Use Fetch Schema in the editor to contact the connector with the current settings and infer an output schema. See Connector Plugins for details on installing connectors.
Routes records to different outputs based on conditions.
| Property | Type | Required | Description |
| --- | --- | --- | --- |
| branches | array | Yes | List of { id, label, condition } objects |
Each branch contains:
id — Unique identifier
label — Display name
condition — JSONata expression that returns true or false
Input: Any record
Output: Multiple outputs, one per branch plus a default output
Records are evaluated against each condition in order. The first matching condition routes the record to that branch’s output. Records that match no condition go to default.
    branch-1:
      type: branch
      branches:
        - id: high-value
          label: High Value
          condition: total > 1000
        - id: priority
          label: Priority
          condition: customer.tier = "gold"
Conditions are evaluated in order. Place more specific conditions before general ones to avoid unexpected routing.
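The first-match rule can be sketched in plain Python. This is only an illustration of the ordering semantics described above, not how the node is implemented: the lambdas are hypothetical stand-ins for the two JSONata conditions in the example, and `route` is a name invented for this sketch.

```python
from typing import Any, Callable

Record = dict[str, Any]

# Hypothetical Python stand-ins for the JSONata conditions in the example,
# kept in the same order as the branches list.
BRANCHES: list[tuple[str, Callable[[Record], bool]]] = [
    ("high-value", lambda r: r.get("total", 0) > 1000),
    ("priority", lambda r: r.get("customer", {}).get("tier") == "gold"),
]


def route(record: Record) -> str:
    """Return the id of the first branch whose condition holds, else 'default'."""
    for branch_id, condition in BRANCHES:
        if condition(record):
            return branch_id
    return "default"


orders = [
    {"total": 1500, "customer": {"tier": "gold"}},  # matches both; first branch wins
    {"total": 200, "customer": {"tier": "gold"}},   # only the second condition holds
    {"total": 50, "customer": {"tier": "silver"}},  # no match, goes to default
]
routed = [route(order) for order in orders]
```

The first order satisfies both conditions but is routed only to high-value, which is why the order of the branches list matters.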
Combines multiple parallel input streams into a single output stream (fan-in).
| Property | Type | Required | Description |
| --- | --- | --- | --- |
| inputCount | number | No | Number of input ports (default 2, range 2–10) |
Inputs: Multiple inputs, input-0 through input-(n-1)
Output: One merged stream on output
Input 0 defines the schema for all inputs and the output. Connect the primary stream to input 0; additional inputs must match that schema.
    merge-1:
      type: merge
      inputCount: 3
Use Merge after a Branch node to reunite split streams, or to combine records from independent sources that share the same schema.
Expands an array field into individual records (a flatMap operation).
| Property | Type | Required | Description |
| --- | --- | --- | --- |
| arrayToFlatMap | string | Yes | Name of the array field to explode |
Input: Record containing an array field
Output: One record per array element, with the array replaced by its individual items
Use Explode when you need to process array elements individually. For example, if an order contains multiple line items, Explode creates a separate record for each item.
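As a rough illustration of the flatMap behavior, the following Python sketch expands one order into one record per line item. The `explode` function and the sample field names are hypothetical, and emitting nothing for a missing or empty array is an assumption of this sketch, not documented behavior.

```python
from typing import Any, Iterator


def explode(record: dict[str, Any], array_field: str) -> Iterator[dict[str, Any]]:
    """Yield one copy of the record per array element.

    In each copy the array field holds a single element instead of the array.
    Assumption: a missing or empty array yields no records.
    """
    for item in record.get(array_field) or []:
        yield {**record, array_field: item}


order = {
    "order_id": "A-1",
    "items": [{"sku": "X", "qty": 2}, {"sku": "Y", "qty": 1}],
}
exploded = list(explode(order, "items"))
```

Each output record keeps the other fields of the order (here order_id) and carries exactly one line item under items.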
Enriches records by joining with cached reference data (stream-table join).
| Property | Type | Required | Description |
| --- | --- | --- | --- |
| lookupKey | string | Yes | Expression evaluated on the incoming record to produce the join key |
| cacheKey | string | Yes | Expression evaluated on the reference data to produce the cache key |
| fieldName | string | Yes | Name for the enriched field added to the output |
| lookupFailureBehavior | enum | No | REJECT (default) or CONTINUE |
Inputs:
input (left) — Main data stream
reference (top) — Reference data, cached in memory
Outputs:
output — Enriched records
reject — Records with no match (only when behavior is REJECT)
The reference data is loaded into an in-memory cache keyed by cacheKey. For each incoming record, lookupKey is evaluated and matched against the cache.
Choose CONTINUE if missing reference data is acceptable — the lookup field will be null. Choose REJECT to route unmatched records to a separate output for error handling or dead-letter queues.
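The join and the two failure behaviors can be sketched in Python as follows. This is a simplified model under stated assumptions: `build_cache` and `lookup` are names invented here, and the key expressions are plain Python callables standing in for the configured lookupKey and cacheKey expressions.

```python
from typing import Any, Callable, Iterable

Record = dict[str, Any]
KeyFn = Callable[[Record], Any]


def build_cache(reference: Iterable[Record], cache_key: KeyFn) -> dict[Any, Record]:
    """Index the reference rows by the value cacheKey produces for each row."""
    return {cache_key(row): row for row in reference}


def lookup(
    record: Record,
    cache: dict[Any, Record],
    lookup_key: KeyFn,
    field_name: str,
    behavior: str = "REJECT",
) -> tuple[str, Record]:
    """Return (port, record): 'output' with the enriched record, or 'reject'."""
    match = cache.get(lookup_key(record))
    if match is None and behavior == "REJECT":
        return "reject", record
    # With CONTINUE, an unmatched record is still emitted and the field is None.
    return "output", {**record, field_name: match}


customers = [{"id": 1, "name": "Acme"}, {"id": 2, "name": "Globex"}]
cache = build_cache(customers, cache_key=lambda row: row["id"])


def by_customer(rec: Record) -> Any:
    return rec["customer_id"]


hit = lookup({"order": "A", "customer_id": 2}, cache, by_customer, "customer")
miss_reject = lookup({"order": "B", "customer_id": 9}, cache, by_customer, "customer")
miss_continue = lookup(
    {"order": "B", "customer_id": 9}, cache, by_customer, "customer", "CONTINUE"
)
```

The same unmatched record ends up on reject under the default behavior and on output with a null customer field under CONTINUE.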
Enriches each record by executing a parameterized SQL query against a relational database.
| Property | Type | Required | Description |
| --- | --- | --- | --- |
| query | string | Yes | SQL query with ? placeholders |
| parameters | array | No | List of { expression, reconcileColumn } objects mapping record fields to query placeholders |
| fieldName | string | No | Name for the result field (default jdbc_result) |
| lookupFailureBehavior | enum | No | REJECT (default) or CONTINUE |
| batchSupport | boolean | No | Enable batched query execution for throughput |
Input: Any record
Outputs:
output — Record enriched with the query result in fieldName
reject — Records where the query returned no rows (only when behavior is REJECT)
Unlike Lookup, which joins against a pre-cached dataset, JDBC Request-Reply executes a live query per record (or per batch). This is ideal when reference data is too large to cache or changes frequently.
    jdbc-rr-1:
      type: jdbc-request-reply
      query: "SELECT name, credit_limit FROM customers WHERE id = ?"
      parameters:
        - expression: customer_id
          reconcileColumn: id
      fieldName: customer
      lookupFailureBehavior: CONTINUE
      batchSupport: true
Enable batchSupport when enriching high-throughput streams — queries are grouped into batches, significantly reducing round-trips to the database.
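The per-record request-reply pattern can be approximated in Python with the standard sqlite3 module, which also uses ? placeholders. This is a sketch only: an in-memory SQLite database stands in for the JDBC target, the table contents are made up, `enrich` is a hypothetical name, and the shape of the result (a single row as a dict, or null when CONTINUE finds no rows) is an assumption rather than documented behavior.

```python
import sqlite3
from typing import Any

# In-memory SQLite stands in for the relational database (assumption).
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, credit_limit REAL)"
)
conn.execute("INSERT INTO customers VALUES (?, ?, ?)", (7, "Acme", 5000.0))

QUERY = "SELECT name, credit_limit FROM customers WHERE id = ?"


def enrich(
    record: dict[str, Any],
    field_name: str = "jdbc_result",
    behavior: str = "REJECT",
) -> tuple[str, dict[str, Any]]:
    """Run the query for one record, binding customer_id to the ? placeholder."""
    row = conn.execute(QUERY, (record["customer_id"],)).fetchone()
    if row is None and behavior == "REJECT":
        return "reject", record
    # Assumed result shape: the first row as a dict, or None when nothing matched.
    result = dict(row) if row is not None else None
    return "output", {**record, field_name: result}


found = enrich({"customer_id": 7}, field_name="customer", behavior="CONTINUE")
rejected = enrich({"customer_id": 99})
continued = enrich({"customer_id": 99}, field_name="customer", behavior="CONTINUE")
```

Binding values through placeholders, rather than formatting them into the SQL string, keeps record data from being interpreted as SQL.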
Observes records without modifying them — useful for debugging and monitoring.
| Property | Type | Required | Description |
| --- | --- | --- | --- |
| logLevel | enum | No | DEBUG, INFO, or WARN |
Input: Any record
Output: Same record, unmodified
Use Peek to:
Debug pipeline behavior during development
Log records at specific points in the pipeline
Inspect data shapes between processing steps
    peek-1:
      type: peek
      logLevel: INFO
Peek output appears in the console panel in the editor. Use it liberally during development, then reduce log levels or remove Peek nodes before deploying to production.
Masks sensitive fields in each record’s value — redaction, nulling, numeric jitter, or synthetic replacement via Datafaker expressions.
| Property | Type | Required | Description |
| --- | --- | --- | --- |
| maskingRules | array | Yes | List of rules. Each rule has field, mask, and optionally varianceRange (for VARIANCE) or fakerExpression (for FAKER) |
| schema | string | Conditional | JSON string of the output Connect/JSON schema; the editor includes this when exporting so the runner can attach the correct valueSchema |
Input: Any record with a JSON object value
Output: Same record shape with masked fields applied; if there are no rules, or the value is missing, the record passes through unchanged.
Field paths use dot notation for nested structs. Append [] to a segment to apply the rule to every element of an array at that path (for example, items[].email masks email inside each item).
| mask | Behavior |
| --- | --- |
| REDACT | Replace the value with *** |
| NULL | Set the field to JSON null |
| VARIANCE | Add a random delta in [-varianceRange, +varianceRange] to numeric fields (integers or floats). If varianceRange is omitted or null, it is treated as 0 |
| FAKER | Replace with the result of the Datafaker expression. A blank or missing expression, or a failed evaluation, falls back to *** |
mask values are matched case-insensitively at runtime.
Unknown mask values are logged and the field is left unchanged. VARIANCE on non-numeric fields is skipped with a warning.
Configure rules in the node editor from the incoming schema. The editor derives the output schema for downstream nodes (for example, REDACT / FAKER may widen types to string; NULL may mark fields optional).
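The rule semantics described above (dot-notation paths, the [] array suffix, case-insensitive mask names, and skipping VARIANCE on non-numeric fields) can be sketched in Python. This is a simplified model, not the runner's code: `mask_record`, `apply_rule`, and `mask_value` are hypothetical names, Datafaker is not available here so FAKER always takes the documented *** fallback, and keeping integers as integers under VARIANCE is an assumption.

```python
import copy
import random
from typing import Any


def mask_value(value: Any, rule: dict[str, Any], rng: random.Random) -> Any:
    mask = str(rule.get("mask", "")).upper()  # mask names match case-insensitively
    if mask == "REDACT":
        return "***"
    if mask == "NULL":
        return None
    if mask == "VARIANCE":
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return value  # non-numeric field: skipped
        spread = rule.get("varianceRange") or 0  # omitted or null counts as 0
        delta = rng.uniform(-spread, spread)
        # Assumption: integer fields stay integers.
        return value + (round(delta) if isinstance(value, int) else delta)
    if mask == "FAKER":
        return "***"  # no Datafaker in this sketch, so use the documented fallback
    return value  # unknown mask: field left unchanged


def apply_rule(node: Any, segments: list[str], rule: dict[str, Any], rng: random.Random) -> None:
    """Walk the path segments; a segment ending in [] fans out over an array."""
    if not isinstance(node, dict):
        return
    head, rest = segments[0], segments[1:]
    is_array = head.endswith("[]")
    key = head[:-2] if is_array else head
    if key not in node:
        return
    if is_array:
        if isinstance(node[key], list):
            if rest:
                for element in node[key]:
                    apply_rule(element, rest, rule, rng)
            else:
                node[key] = [mask_value(v, rule, rng) for v in node[key]]
    elif rest:
        apply_rule(node[key], rest, rule, rng)
    else:
        node[key] = mask_value(node[key], rule, rng)


def mask_record(value: Any, rules: list[dict[str, Any]], rng: random.Random) -> Any:
    if not rules or not isinstance(value, dict):
        return value  # no rules or no object value: pass through unchanged
    masked = copy.deepcopy(value)
    for rule in rules:
        apply_rule(masked, rule["field"].split("."), rule, rng)
    return masked


record = {
    "ssn": "123-45-6789",
    "salary": 1000,
    "note": "hello",
    "items": [{"email": "a@example.com"}, {"email": "b@example.com"}],
}
rules = [
    {"field": "ssn", "mask": "redact"},
    {"field": "items[].email", "mask": "NULL"},
    {"field": "salary", "mask": "VARIANCE", "varianceRange": 50},
    {"field": "note", "mask": "VARIANCE", "varianceRange": 50},
]
masked = mask_record(record, rules, random.Random(0))
```

The exact masked salary depends on the random seed; the sketch only guarantees it stays within 50 of the original value.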
Applies a Kafka Connect Single Message Transform (SMT) to reshape records in flight.
| Property | Type | Required | Description |
| --- | --- | --- | --- |
| className | string | Yes | Fully qualified SMT class name |
| propertiesText | string | No | SMT configuration properties (key=value, one per line) |
| stubbed | boolean | No | When true, runs as a stub during interactive testing |
Input: Any record
Output: Record transformed by the configured SMT
Use Transform SMT when you need standard Kafka Connect transforms, such as ExtractField, ReplaceField, or Cast, without writing custom code. Configure both input and output schemas in the editor so downstream nodes can validate the result.
Transforms records using Python or JavaScript code.
| Property | Type | Required | Description |
| --- | --- | --- | --- |
| language | enum | Yes | python or js |
| code | string | Yes | Transform code evaluated per record |
Input: Any record
Output: Transformed record based on the code
Each record is passed to your code as input (the record value). The last expression or assigned output variable becomes the transformed value. Use the built-in code editor to write, test, and debug transforms with sample data.
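A Python transform body might look like the following sketch. It assumes, based on the description above, that the record value is bound to a variable named input and that the result is read from a variable named output; the field names are made up, and the sample input is assigned here only so the snippet runs outside the node.

```python
# In the node, the runner is assumed to supply `input` (the record value).
# It is assigned here only so the sketch is runnable on its own.
input = {"first_name": "Ada", "last_name": "Lovelace", "total": "42.50"}

# Build the transformed value: join the name parts and parse the total.
output = {
    "full_name": f'{input["first_name"]} {input["last_name"]}',
    "total": float(input["total"]),
}
```

Testing the code against sample data in the built-in editor is the quickest way to confirm how input and output are actually bound.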
Extra connection properties (key=value, one per line)
stubbed
boolean
No
When true, runs as a stub during interactive testing
Input: Any record (fields must match table columns)
Output: None (terminal node)
The JDBC Sink maps record fields to table columns by name. Ensure your upstream transformation produces a schema compatible with the target table.
When true, runs as a stub during interactive testing
Input: Any record
Output: None (terminal node)
The Generic Connect Sink lets you use any Kafka Connect sink connector. Place the connector JAR in the libs/ directory and configure it here.
Check the connector documentation for available configuration options. Streemlined passes configuration directly to the connector. See Connector Plugins for details on installing connectors.