This page documents every node available in Streemlined. Nodes are the building blocks of a pipeline and fall into four categories.
| Category | Nodes | Purpose |
|---|---|---|
| Sources | Kafka Consumer, CSV Source, JDBC Source, Generic Connect Source | Ingest data into the pipeline |
| Processors | Transform, Branch, Merge, Explode, Lookup, JDBC Request-Reply, Peek, Data Masking, Transform SMT, Code Transform | Transform, route, enrich, and inspect data |
| Sinks | Kafka Producer, JDBC Sink, Generic Connect Sink | Output data to external systems |
| Utility | Comment | Annotate the canvas |

Sources

Source nodes ingest data into your pipeline from external systems. Every source has a single output port.

Kafka Consumer

Consumes records from one or more partitions of a Kafka topic.
| Property | Type | Required | Description |
|---|---|---|---|
| cluster | string | Yes | Kafka cluster name (defined in configuration) |
| topic | string | Yes | Kafka topic to consume from |
| schemaType | enum | Yes | JSON, AVRO_SR, JSON_SR, or PROTO_SR |
| schemaId | number | Conditional | Schema Registry ID; required for AVRO_SR, JSON_SR, and PROTO_SR |
| propertiesText | string | No | Extra consumer properties (key=value, one per line) |
| stubbed | boolean | No | When true, runs as a stub during interactive testing |
Output: One record per Kafka message, with its schema inferred or fetched from Schema Registry.
When using Schema Registry, Streemlined automatically fetches and displays the schema in the editor so downstream nodes can offer auto-complete.
kafka-consumer-1:
  type: kafka-consumer
  cluster: default
  topic: orders
  schemaType: AVRO_SR
  schemaId: 8
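Several nodes accept extra settings through propertiesText, written as key=value pairs, one per line. The sketch below shows one plausible way to read that format into a dictionary. It assumes Java-properties-like conventions that this page does not spell out (blank lines and lines starting with # are ignored, and the first = separates key from value); parse_properties_text is a hypothetical helper, not part of Streemlined.

```python
def parse_properties_text(text: str) -> dict[str, str]:
    """Parse key=value lines into a dict (hypothetical helper, assumed conventions)."""
    props: dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        # Assumption: blank lines and '#' comments are skipped, as in .properties files.
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only, so values such as JDBC URLs may contain '='.
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got {raw_line!r}")
        props[key.strip()] = value.strip()
    return props


example = """
max.poll.records=500
auto.offset.reset=earliest
"""
print(parse_properties_text(example))
# {'max.poll.records': '500', 'auto.offset.reset': 'earliest'}
```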

CSV Source

Reads records from a local CSV file — useful for development, testing, and seeding reference data.
| Property | Type | Required | Description |
|---|---|---|---|
| filePath | string | Yes | Path to the CSV file |
| separator | string | No | Field separator (default ",") |
| skipHeader | boolean | No | Treat the first row as a header (default true) |
Output: One record per CSV row, with field names taken from the header (or positional indices if skipHeader is false).
csv-source-1:
  type: csv-source
  filePath: data/customers.csv
  separator: ","
  skipHeader: true
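The header handling described above can be approximated with Python's csv module. This illustrates the documented behavior (header names when skipHeader is true, positional indices otherwise); it is not Streemlined's reader. read_csv_records is a hypothetical name, and representing positional indices as the string keys "0", "1", ... is an assumption.

```python
import csv
import io
from typing import Iterator


def read_csv_records(stream, separator: str = ",", skip_header: bool = True) -> Iterator[dict]:
    """Yield one dict per CSV row, mimicking the separator and skipHeader options."""
    reader = csv.reader(stream, delimiter=separator)
    if skip_header:
        # The first row supplies the field names.
        header = next(reader, [])
        for row in reader:
            yield dict(zip(header, row))
    else:
        # No header: key each value by its column position (assumed string keys).
        for row in reader:
            yield {str(i): value for i, value in enumerate(row)}


sample = io.StringIO("id,name\n1,Ada\n2,Linus\n")
for record in read_csv_records(sample):
    print(record)
# {'id': '1', 'name': 'Ada'}
# {'id': '2', 'name': 'Linus'}
```

For a real file, open it with open(path, newline="") before passing it in, as the csv module recommends.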
CSV Source is primarily intended for local development and testing. For production ingestion, prefer Kafka Consumer or a dedicated connector.

JDBC Source

Polls rows from a relational database table and ingests them into the pipeline using the JDBC Source Connector.
| Property | Type | Required | Description |
|---|---|---|---|
| database | string | Yes | Database connection name (defined in configuration) |
| table | string | Yes | Table (or schema-qualified table) to poll |
| mode | enum | Yes | bulk, incrementing, timestamp, or timestamp+incrementing |
| incrementingColumn | string | Conditional | Numeric, monotonically increasing column; required for incrementing and timestamp+incrementing |
| timestampColumns | array | Conditional | Timestamp column(s); required for timestamp and timestamp+incrementing. When two columns are provided, COALESCE is applied so the first non-null value is used |
| cluster | string | Yes | Kafka cluster for offset storage |
| topic | string | Yes | Kafka topic used to store connector offsets |
| propertiesText | string | No | Extra connector properties (key=value, one per line) |
| mode | Behavior |
|---|---|
| bulk | Polls the full table on every interval; suitable only for small tables |
| incrementing | Appends a WHERE clause on an always-growing numeric column to capture new rows |
| timestamp | Appends a WHERE clause on one or more timestamp columns to capture new or updated rows |
| timestamp+incrementing | Combines timestamp and incrementing modes for robust change detection |
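To make the four modes concrete, the sketch below builds the kind of polling query each mode implies. It is a simplified approximation of how the Confluent JDBC Source Connector filters rows, not its exact SQL: the real connector also bounds timestamps by the current database time and quotes identifiers. build_poll_query and the parameter names last_id and last_ts (the offsets remembered from the previous poll) are hypothetical.

```python
def build_poll_query(table, mode, incrementing_column=None, timestamp_columns=()):
    """Return (sql, parameter_names) for one poll; simplified, hypothetical helper."""
    if mode in ("incrementing", "timestamp+incrementing") and not incrementing_column:
        raise ValueError(f"{mode} mode requires incrementing_column")
    if mode in ("timestamp", "timestamp+incrementing") and not timestamp_columns:
        raise ValueError(f"{mode} mode requires timestamp_columns")

    if len(timestamp_columns) > 1:
        # Several timestamp columns: COALESCE uses the first non-null value per row.
        ts = f"COALESCE({', '.join(timestamp_columns)})"
    else:
        ts = timestamp_columns[0] if timestamp_columns else None

    # Identifiers come from trusted node configuration in this sketch.
    base = f"SELECT * FROM {table}"
    if mode == "bulk":
        # No filter: the whole table is read on every poll.
        return base, []
    if mode == "incrementing":
        return (f"{base} WHERE {incrementing_column} > ? ORDER BY {incrementing_column} ASC",
                ["last_id"])
    if mode == "timestamp":
        return f"{base} WHERE {ts} > ? ORDER BY {ts} ASC", ["last_ts"]
    if mode == "timestamp+incrementing":
        # Same timestamp as last time: only rows with a higher id are new.
        # Later timestamp: the row is new or updated regardless of its id.
        return (f"{base} WHERE ({ts} = ? AND {incrementing_column} > ?) OR {ts} > ? "
                f"ORDER BY {ts}, {incrementing_column} ASC",
                ["last_ts", "last_id", "last_ts"])
    raise ValueError(f"unknown mode: {mode}")


print(build_poll_query("customers", "incrementing", incrementing_column="id")[0])
# SELECT * FROM customers WHERE id > ? ORDER BY id ASC
```

The timestamp+incrementing query shows why that mode is more robust: rows that share a timestamp with the last processed row are still picked up, as long as their id is higher.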
Output: One record per row returned by the connector, with schema fetched from the table or configured in the editor.
jdbc-source-1:
  type: jdbc-source
  database: default
  table: customers
  mode: incrementing
  incrementingColumn: id
  cluster: default
  topic: streemlined-offsets
  propertiesText: |
    poll.interval.ms=5000
    batch.max.rows=100
Use the Fetch Schema button in the editor to auto-populate the output schema from the selected table.

Generic Connect Source

Uses any Kafka Connect source connector to ingest data from external systems.
| Property | Type | Required | Description |
|---|---|---|---|
| connectorClass | string | Yes | Fully qualified connector class name |
| propertiesText | string | Yes | Connector configuration (key=value, one per line) |
| cluster | string | Yes | Kafka cluster for offset storage |
| topic | string | Yes | Kafka topic used to store connector offsets |
| stubbed | boolean | No | When true, runs as a stub during interactive testing |
| errorHandling | enum | No | FAIL_PIPELINE (default), LOG_SKIP, or SEND_TO_ERROR_PORT |
Output: One record per message emitted by the connector, with schema inferred or configured in the editor.
Place the connector JAR in the libs/ directory, then set connectorClass and propertiesText on this node.
generic-source-1:
  type: generic-source
  connectorClass: io.confluent.connect.jdbc.JdbcSourceConnector
  cluster: default
  topic: streemlined-offsets
  propertiesText: |
    connection.url=jdbc:postgresql://localhost:5432/mydb
    connection.user=postgres
    connection.password=secret
    table.whitelist=orders
    mode=incrementing
    incrementing.column.name=id
Use Fetch Schema in the editor to contact the connector with the current settings and infer an output schema. See Connector Plugins for details on installing connectors.

Processors

Processor nodes transform, route, filter, or enrich data as it flows through the pipeline. Each processor has at least one input and one output port.

Transform

Transforms records using JSONata expressions.
| Property | Type | Required | Description |
|---|---|---|---|
| mapping | string | Yes | JSONata expression defining the transformation |
Input: Any record
Output: Transformed record based on the JSONata expression
The Transform node is the workhorse of most pipelines. Use it to:
  • Map between schemas
  • Reshape data structures
  • Compute derived fields
  • Filter out unwanted fields
  • Combine multiple fields
See Transformations to learn JSONata syntax and best practices.
transform-1:
  type: transform
  mapping: |
    {
      "order_id": id,
      "customer_name": customer.name,
      "total": items.price ~> $sum(),
      "processed_at": $now()
    }
Rename fields:
{
  "userId": user_id,
  "userName": user_name
}
Flatten nested objects:
{
  "id": order.id,
  "customerEmail": order.customer.email,
  "itemCount": $count(order.items)
}
Conditional logic:
{
  "tier": totalSpent > 1000 ? "gold" : totalSpent > 500 ? "silver" : "bronze"
}
Aggregate arrays:
{
  "totalPrice": items.price ~> $sum(),
  "avgPrice": $average(items.price),
  "maxPrice": $max(items.price)
}
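For readers more familiar with Python than JSONata, the sketch below is a rough plain-Python equivalent of the Conditional logic and Aggregate arrays examples. It is only an approximation: JSONata's handling of missing fields differs from Python's (a path that matches nothing yields no value rather than raising an error), and Streemlined evaluates the mapping with JSONata, not Python. The functions tier and aggregate are hypothetical names.

```python
from statistics import mean


def tier(record: dict) -> dict:
    # Mirrors: totalSpent > 1000 ? "gold" : totalSpent > 500 ? "silver" : "bronze"
    spent = record["totalSpent"]
    return {"tier": "gold" if spent > 1000 else "silver" if spent > 500 else "bronze"}


def aggregate(record: dict) -> dict:
    # items.price collects the price field of every element of items.
    prices = [item["price"] for item in record["items"]]
    return {
        "totalPrice": sum(prices),  # items.price ~> $sum()
        "avgPrice": mean(prices),   # $average(items.price)
        "maxPrice": max(prices),    # $max(items.price)
    }


print(tier({"totalSpent": 750}))
# {'tier': 'silver'}
print(aggregate({"items": [{"price": 10}, {"price": 20}, {"price": 30}]}))
```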

Branch

Routes records to different outputs based on conditions.
| Property | Type | Required | Description |
|---|---|---|---|
| branches | array | Yes | List of { id, label, condition } objects |
Each branch contains:
  • id — Unique identifier
  • label — Display name
  • condition — JSONata expression that returns true or false
Input: Any record
Output: One output per branch, plus a default output
Records are evaluated against each condition in order. The first matching condition routes the record to that branch’s output. Records that match no condition go to the default output.
branch-1:
  type: branch
  branches:
    - id: high-value
      label: High Value
      condition: total > 1000
    - id: priority
      label: Priority
      condition: customer.tier = "gold"
Conditions are evaluated in order. Place more specific conditions before general ones to avoid unexpected routing.
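The first-match routing rule fits in a few lines. In this sketch, Python callables stand in for the JSONata conditions and route is a hypothetical function; it illustrates the documented ordering and default behavior, not Streemlined's implementation.

```python
from typing import Callable

Branch = tuple[str, Callable[[dict], bool]]  # (branch id, condition)


def route(record: dict, branches: list[Branch]) -> str:
    """Return the id of the first branch whose condition holds, else 'default'."""
    for branch_id, condition in branches:
        if condition(record):
            return branch_id  # later branches are not evaluated
    return "default"


branches = [
    ("high-value", lambda r: r["total"] > 1000),
    ("priority", lambda r: r["customer"]["tier"] == "gold"),
]

print(route({"total": 1500, "customer": {"tier": "gold"}}, branches))    # high-value
print(route({"total": 200, "customer": {"tier": "gold"}}, branches))     # priority
print(route({"total": 200, "customer": {"tier": "bronze"}}, branches))   # default
```

The first record satisfies both conditions but is routed only to high-value, which is why ordering matters.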

Merge

Combines multiple parallel input streams into a single output stream (fan-in).
| Property | Type | Required | Description |
|---|---|---|---|
| inputCount | number | No | Number of input ports (default 2, range 2–10) |
Inputs: input-0 through input-(n-1), where n is inputCount
Output: One merged stream on output
Input 0 defines the schema for all inputs and the output. Connect the primary stream to input 0; additional inputs must match that schema.
merge-1:
  type: merge
  inputCount: 3
Use Merge after a Branch node to reunite split streams, or to combine records from independent sources that share the same schema.
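A fan-in like Merge interleaves records from several inputs into one stream. The sketch below shows the documented rule that input 0 defines the schema. It makes two simplifying assumptions: schemas are modeled as frozensets of field names (real schemas also carry types), and records are interleaved round-robin, whereas a streaming engine emits them in arrival order. merge_streams is a hypothetical name.

```python
from itertools import chain, zip_longest

_GAP = object()  # filler for inputs that run out of records


def merge_streams(inputs):
    """Merge (schema, records) pairs into one list; input 0's schema is authoritative."""
    primary_schema = inputs[0][0]
    for index, (schema, _) in enumerate(inputs[1:], start=1):
        if schema != primary_schema:
            raise ValueError(f"input-{index} does not match the schema of input-0")
    # Round-robin interleaving stands in for arrival-order interleaving.
    rounds = zip_longest(*(records for _, records in inputs), fillvalue=_GAP)
    return [record for record in chain.from_iterable(rounds) if record is not _GAP]


schema = frozenset({"id"})
print(merge_streams([(schema, [{"id": 1}, {"id": 3}]), (schema, [{"id": 2}])]))
# [{'id': 1}, {'id': 2}, {'id': 3}]
```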

Explode

Expands an array field into individual records (a flatMap operation).
| Property | Type | Required | Description |
|---|---|---|---|
| arrayToFlatMap | string | Yes | Name of the array field to explode |
Input: Record containing an array field
Output: One record per array element, with the array replaced by its individual items
Use Explode when you need to process array elements individually. For example, if an order contains multiple line items, Explode creates a separate record for each item.
explode-1:
  type: flatmap
  arrayToFlatMap: items
Before — one record:
{
  "orderId": 123,
  "items": [
    { "sku": "A", "qty": 2 },
    { "sku": "B", "qty": 1 }
  ]
}
After — two records:
{ "orderId": 123, "item": { "sku": "A", "qty": 2 } }
{ "orderId": 123, "item": { "sku": "B", "qty": 1 } }
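The flatMap behavior in the example can be sketched as follows. The element_field argument is a hypothetical parameter used only to reproduce the example's output, where each element appears under item; this page does not document how Explode names that field, so check the node's output schema in the editor. Producing no records for an empty or missing array is also an assumption of this sketch.

```python
from typing import Iterator


def explode(record: dict, array_field: str, element_field: str) -> Iterator[dict]:
    """Yield one copy of record per element of record[array_field] (flatMap)."""
    # Every field except the array is copied unchanged into each output record.
    rest = {key: value for key, value in record.items() if key != array_field}
    # Assumption: an empty or missing array yields no records.
    for element in record.get(array_field) or []:
        yield {**rest, element_field: element}


order = {"orderId": 123, "items": [{"sku": "A", "qty": 2}, {"sku": "B", "qty": 1}]}
for out in explode(order, "items", "item"):
    print(out)
# {'orderId': 123, 'item': {'sku': 'A', 'qty': 2}}
# {'orderId': 123, 'item': {'sku': 'B', 'qty': 1}}
```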

Lookup

Enriches records by joining with cached reference data (stream-table join).
| Property | Type | Required | Description |
|---|---|---|---|
| lookupKey | string | Yes | Expression evaluated on the incoming record to produce the join key |
| cacheKey | string | Yes | Expression evaluated on the reference data to produce the cache key |
| fieldName | string | Yes | Name for the enriched field added to the output |
| lookupFailureBehavior | enum | No | REJECT (default) or CONTINUE |
Inputs:
  • input (left) — Main data stream
  • reference (top) — Reference data, cached in memory
Outputs:
  • output — Enriched records
  • reject — Records with no match (only when behavior is REJECT)
The reference data is loaded into an in-memory cache keyed by cacheKey. For each incoming record, lookupKey is evaluated and matched against the cache.
lookup-1:
  type: lookup
  lookupKey: customer_id
  cacheKey: id
  fieldName: customer_details
  lookupFailureBehavior: REJECT
Choose CONTINUE if missing reference data is acceptable — the lookup field will be null. Choose REJECT to route unmatched records to a separate output for error handling or dead-letter queues.
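The cache-then-match flow can be sketched as a small class. Python callables stand in for the lookupKey and cacheKey expressions, and LookupCache is a hypothetical name. How Streemlined handles duplicate reference keys is not documented here; this sketch assumes a later record with the same key replaces the earlier one.

```python
from typing import Callable, Iterable, Tuple

KeyFn = Callable[[dict], object]


class LookupCache:
    """Sketch of a stream-table join against in-memory reference data."""

    def __init__(self, cache_key: KeyFn):
        self._cache_key = cache_key
        self._table: dict = {}

    def load(self, reference_records: Iterable[dict]) -> None:
        # Index reference data by cacheKey (assumption: last write wins).
        for ref in reference_records:
            self._table[self._cache_key(ref)] = ref

    def enrich(self, record: dict, lookup_key: KeyFn, field_name: str,
               behavior: str = "REJECT") -> Tuple[str, dict]:
        """Return (port, record), where port is 'output' or 'reject'."""
        match = self._table.get(lookup_key(record))
        if match is None and behavior == "REJECT":
            return "reject", record  # unmatched records go to the reject port
        # On a match, or with CONTINUE, attach the reference record (None if missing).
        return "output", {**record, field_name: match}


def by_customer(record: dict) -> object:
    return record["customer_id"]


cache = LookupCache(cache_key=lambda ref: ref["id"])
cache.load([{"id": 7, "name": "Ada"}])
print(cache.enrich({"customer_id": 7}, by_customer, "customer_details"))
# ('output', {'customer_id': 7, 'customer_details': {'id': 7, 'name': 'Ada'}})
print(cache.enrich({"customer_id": 9}, by_customer, "customer_details"))
# ('reject', {'customer_id': 9})
```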

JDBC Request-Reply

Enriches each record by executing a parameterized SQL query against a relational database.
| Property | Type | Required | Description |
|---|---|---|---|
| query | string | Yes | SQL query with ? placeholders |
| parameters | array | No | List of { expression, reconcileColumn } objects mapping record fields to query placeholders |
| fieldName | string | No | Name for the result field (default jdbc_result) |
| lookupFailureBehavior | enum | No | REJECT (default) or CONTINUE |
| batchSupport | boolean | No | Enable batched query execution to improve throughput |
Input: Any record
Outputs:
  • output — Record enriched with the query result in fieldName
  • reject — Records where the query returned no rows (only when behavior is REJECT)
Unlike Lookup, which joins against a pre-cached dataset, JDBC Request-Reply executes a live query per record (or per batch). This is ideal when reference data is too large to cache or changes frequently.
jdbc-rr-1:
  type: jdbc-request-reply
  query: "SELECT name, credit_limit FROM customers WHERE id = ?"
  parameters:
    - expression: customer_id
      reconcileColumn: id
  fieldName: customer
  lookupFailureBehavior: CONTINUE
  batchSupport: true
Enable batchSupport when enriching high-throughput streams — queries are grouped into batches, significantly reducing round-trips to the database.
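Python's sqlite3 module uses the same ? placeholder style, so it can illustrate both per-record and batched enrichment. This is a sketch under stated assumptions, not Streemlined's implementation: parameter expressions are modeled as plain field names, the first returned row is attached as the result, and the batched variant (one IN (...) query whose rows are matched back to records through reconcileColumn) is one plausible reading of how batching could work. enrich_one and enrich_batch are hypothetical names.

```python
import sqlite3


def enrich_one(conn, record, query, param_fields, field_name="jdbc_result", behavior="REJECT"):
    """Run the parameterized query for one record; return (port, record)."""
    params = [record[f] for f in param_fields]  # one value per ? placeholder, in order
    cursor = conn.execute(query, params)
    columns = [d[0] for d in cursor.description]
    rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
    if not rows and behavior == "REJECT":
        return "reject", record
    # Assumption: attach the first row; None when nothing matched (CONTINUE).
    return "output", {**record, field_name: rows[0] if rows else None}


def enrich_batch(conn, records, select_sql, key_field, reconcile_column, field_name="jdbc_result"):
    """One IN (...) query per batch; rows are matched to records via reconcile_column."""
    if not records:
        return []
    keys = [r[key_field] for r in records]
    placeholders = ", ".join("?" * len(keys))
    cursor = conn.execute(f"{select_sql} WHERE {reconcile_column} IN ({placeholders})", keys)
    columns = [d[0] for d in cursor.description]
    by_key = {}
    for row in cursor.fetchall():
        row_dict = dict(zip(columns, row))
        by_key[row_dict[reconcile_column]] = row_dict
    # Records without a matching row get None (CONTINUE-style behavior).
    return [{**r, field_name: by_key.get(r[key_field])} for r in records]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, credit_limit REAL)")
conn.executemany("INSERT INTO customers VALUES (?, ?, ?)", [(1, "Ada", 5000.0), (2, "Linus", 2500.0)])

query = "SELECT name, credit_limit FROM customers WHERE id = ?"
print(enrich_one(conn, {"customer_id": 1}, query, ["customer_id"], field_name="customer"))
# ('output', {'customer_id': 1, 'customer': {'name': 'Ada', 'credit_limit': 5000.0}})

batch = [{"customer_id": 2}, {"customer_id": 3}]
print(enrich_batch(conn, batch, "SELECT id, name, credit_limit FROM customers",
                   "customer_id", "id", field_name="customer"))
```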

Peek

Observes records without modifying them — useful for debugging and monitoring.
| Property | Type | Required | Description |
|---|---|---|---|
| logLevel | enum | No | DEBUG, INFO, or WARN |
Input: Any record
Output: Same record, unmodified
Use Peek to:
  • Debug pipeline behavior during development
  • Log records at specific points in the pipeline
  • Inspect data shapes between processing steps
peek-1:
  type: peek
  logLevel: INFO
Peek output appears in the console panel in the editor. Use it liberally during development, then reduce log levels or remove Peek nodes before deploying to production.
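Conceptually, Peek is a pass-through with a logging side effect. The sketch uses Python's standard logging module as a stand-in for the editor's console panel; the logger name peek-1 and the message format are illustrative assumptions.

```python
import logging
from typing import Iterable, Iterator

logger = logging.getLogger("peek-1")


def peek(records: Iterable[dict], level: int = logging.INFO) -> Iterator[dict]:
    """Log each record at the given level and pass it through unchanged."""
    for record in records:
        logger.log(level, "record: %s", record)
        yield record  # the same object is forwarded, never modified


logging.basicConfig(level=logging.INFO)
forwarded = list(peek([{"id": 1}, {"id": 2}]))
```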

Data Masking

Masks sensitive fields in each record’s value — redaction, nulling, numeric jitter, or synthetic replacement via Datafaker expressions.
| Property | Type | Required | Description |
|---|---|---|---|
| maskingRules | array | Yes | List of rules. Each rule has field, mask, and optionally varianceRange (for VARIANCE) or fakerExpression (for FAKER) |
| schema | string | Conditional | JSON string of the output Connect/JSON schema. The editor includes this when exporting so the runner can attach the correct valueSchema |
Input: Any record with a JSON object value
Output: Same record shape with masked fields applied. If there are no rules, or the value is missing, the record passes through unchanged.
Field paths use dot notation for nested structs. Append [] to a segment to apply the rule to every element of an array at that path (for example, items[].email masks email inside each item).
| mask | Behavior |
|---|---|
| REDACT | Replace the value with *** |
| NULL | Set the field to JSON null |
| VARIANCE | Add a random delta in [-varianceRange, +varianceRange] to numeric fields (integers or floats). If varianceRange is omitted or null, it is treated as 0 |
| FAKER | Replace with the result of the Datafaker expression. A blank or missing expression, or a failed evaluation, falls back to *** |
mask values are matched case-insensitively at runtime.
data-masking-1:
  type: data-masking
  maskingRules:
    - field: customer.email
      mask: REDACT
    - field: age
      mask: VARIANCE
      varianceRange: 2
    - field: display_name
      mask: FAKER
      fakerExpression: "#{Name.fullName}"
    - field: order_lines[].internal_id
      mask: NULL
  schema: |
    {"type":"struct","fields":[
      {"field":"customer","type":"struct","fields":[
        {"field":"email","type":"string"}
      ]},
      {"field":"age","type":"int32"},
      {"field":"display_name","type":"string"},
      {"field":"order_lines","type":"array","items":{"type":"struct","fields":[
        {"field":"internal_id","type":"string","optional":true}
      ]}}
    ]}
Unknown mask values are logged and the field is left unchanged. VARIANCE on non-numeric fields is skipped with a warning.
Configure rules in the node editor from the incoming schema. The editor derives the output schema for downstream nodes (for example, REDACT / FAKER may widen types to string; NULL may mark fields optional).
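The path and mask rules above can be sketched in Python to show how dot paths, [] segments, and each mask interact. This is an illustration, not the runner's code. Because Datafaker is a Java library, the sketch always takes the documented *** fallback for FAKER; drawing an integer delta for integer fields is an assumption, and mask_record is a hypothetical name.

```python
import copy
import logging
import random

log = logging.getLogger("data-masking")


def _apply(value, mask, variance_range):
    """Return the masked replacement for a single leaf value."""
    if mask == "REDACT":
        return "***"
    if mask == "NULL":
        return None
    if mask == "VARIANCE":
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            log.warning("VARIANCE skipped for non-numeric value %r", value)
            return value
        spread = abs(variance_range or 0)  # omitted or None is treated as 0
        if isinstance(value, int):
            return value + random.randint(-int(spread), int(spread))  # assumed integer delta
        return value + random.uniform(-spread, spread)
    if mask == "FAKER":
        return "***"  # sketch only: no Datafaker in Python, so use the fallback
    log.warning("unknown mask %r; field left unchanged", mask)
    return value


def _mask_path(node, segments, mask, variance_range):
    """Walk dot-separated segments; a segment ending in [] fans out over a list."""
    if not isinstance(node, dict) or not segments:
        return
    head, rest = segments[0], segments[1:]
    is_array = head.endswith("[]")
    key = head[:-2] if is_array else head
    if key not in node:
        return
    if is_array:
        items = node[key] if isinstance(node[key], list) else []
        for i, item in enumerate(items):
            if rest:
                _mask_path(item, rest, mask, variance_range)
            else:
                items[i] = _apply(item, mask, variance_range)
    elif rest:
        _mask_path(node[key], rest, mask, variance_range)
    else:
        node[key] = _apply(node[key], mask, variance_range)


def mask_record(value, rules):
    """Apply masking rules to a copy of a JSON-object record value."""
    if value is None or not rules:
        return value  # passes through unchanged
    masked = copy.deepcopy(value)
    for rule in rules:
        # mask names are matched case-insensitively
        _mask_path(masked, rule["field"].split("."), rule["mask"].upper(), rule.get("varianceRange"))
    return masked
```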

Transform SMT

Applies a Kafka Connect Single Message Transform (SMT) to reshape records in flight.
| Property | Type | Required | Description |
|---|---|---|---|
| className | string | Yes | Fully qualified SMT class name |
| propertiesText | string | No | SMT configuration properties (key=value, one per line) |
| stubbed | boolean | No | When true, runs as a stub during interactive testing |
Input: Any record
Output: Record transformed by the configured SMT
Use Transform SMT when you need standard Kafka Connect transforms, such as ExtractField, ReplaceField, or Cast, without writing custom code. Configure both input and output schemas in the editor so downstream nodes can validate the result.
smt-transform-1:
  type: smt-transform
  className: org.apache.kafka.connect.transforms.ExtractField$Value
  propertiesText: |
    field=customer_id
Press Ctrl+Space in the properties editor for autocomplete suggestions based on the selected SMT class.
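For schemaless (map) values, ExtractField$Value replaces the entire record value with the value of the configured field. The Python function below mimics that effect to show what downstream nodes receive from the example above; Streemlined runs the actual Kafka Connect Java class, not this code. Returning None for a missing field or a null value reflects Kafka's schemaless handling as we understand it and should be treated as an assumption; extract_field_value is a hypothetical name.

```python
def extract_field_value(value, field: str):
    """Mimic ExtractField$Value on a schemaless (dict) record value."""
    if value is None:
        return None  # null values (tombstones) stay null
    # The whole value is replaced by the one field; everything else is dropped.
    return value.get(field)


print(extract_field_value({"customer_id": 42, "total": 99.5}, "customer_id"))  # 42
```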

Code Transform

Transforms records using Python or JavaScript code.
| Property | Type | Required | Description |
|---|---|---|---|
| language | enum | Yes | python or js |
| code | string | Yes | Transform code evaluated per record |
Input: Any record
Output: Transformed record based on the code
Each record's value is passed to your code as input. The last expression, or the assigned output variable, becomes the transformed value. Use the built-in code editor to write, test, and debug transforms with sample data.
| language | Runtime |
|---|---|
| python | Python (GraalPy) |
| js | JavaScript (GraalJS) |
code-transform-1:
  type: code-transform
  language: python
  code: |
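    from datetime import datetime, timezone
    # Build the output record; processed_at is an ISO 8601 UTC timestamp
    output = {
      "order_id": input["id"],
      "customer_name": input["customer"]["name"].upper(),
      "processed_at": datetime.now(timezone.utc).isoformat()
    }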
Open the Code Editor from the node properties panel for syntax highlighting, sample input, live execution, and an AI assistant.
Configure input and output schemas in the editor. The output schema defines the shape downstream nodes expect.
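To try the Python example outside the editor, a small harness can bind input, run the snippet, and read back output. This harness is a hypothetical stand-in: it uses CPython's exec rather than GraalPy, supports only the assigned-output convention (not the last-expression form), and the sample record is invented for illustration.

```python
from textwrap import dedent

CODE = dedent('''
    from datetime import datetime, timezone
    output = {
        "order_id": input["id"],
        "customer_name": input["customer"]["name"].upper(),
        "processed_at": datetime.now(timezone.utc).isoformat(),
    }
''')


def run_transform(code: str, record_value: dict) -> dict:
    """Execute transform code with `input` bound to the record value; return `output`."""
    namespace = {"input": record_value}  # shadows the builtin input() inside the snippet
    exec(code, namespace)
    return namespace["output"]


result = run_transform(CODE, {"id": 17, "customer": {"name": "ada"}})
print(result["order_id"], result["customer_name"])  # 17 ADA
```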

Sinks

Sink nodes write data from your pipeline to external systems. Every sink has a single input port and no outputs (terminal nodes).

Kafka Producer

Produces records to a Kafka topic.
| Property | Type | Required | Description |
|---|---|---|---|
| cluster | string | Yes | Kafka cluster name (defined in configuration) |
| topic | string | Yes | Kafka topic to produce to |
| schemaType | enum | Yes | JSON, AVRO_SR, JSON_SR, or PROTO_SR |
| schemaId | number | Conditional | Schema Registry ID; required for AVRO_SR, JSON_SR, and PROTO_SR |
| keyExpression | string | No | JSONata expression for the record key |
| propertiesText | string | No | Extra producer properties (key=value, one per line) |
| stubbed | boolean | No | When true, runs as a stub during interactive testing |
Input: Any record
Output: None (terminal node)
kafka-producer-1:
  type: kafka-producer
  cluster: default
  topic: processed-orders
  schemaType: JSON
  keyExpression: order_id

JDBC Sink

Writes records to a relational database table.
| Property | Type | Required | Description |
|---|---|---|---|
| table | string | Yes | Target table name |
| mode | enum | No | INSERT, UPSERT, or UPDATE |
| propertiesText | string | No | Extra connection properties (key=value, one per line) |
| stubbed | boolean | No | When true, runs as a stub during interactive testing |
Input: Any record (fields must match table columns)
Output: None (terminal node)
The JDBC Sink maps record fields to table columns by name. Ensure your upstream transformation produces a schema compatible with the target table.
jdbc-sink-1:
  type: jdbc-sink
  table: orders
  mode: UPSERT
Column names are case-sensitive. Ensure your field names exactly match your database column names.
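The three modes differ in what happens when a row with the same key already exists. The sketch below expresses them as SQLite statements through Python's sqlite3 module. The key column pk is a hypothetical parameter, since this page does not say how the sink identifies keys, and the upsert relies on SQLite's ON CONFLICT clause (SQLite 3.24 or later); other databases use their own syntax, such as MERGE. write_record is a hypothetical name.

```python
import sqlite3


def write_record(conn, table, record, mode, pk="id"):
    """Write one record using INSERT, UPSERT, or UPDATE semantics (sketch)."""
    columns = list(record)  # record fields map to columns by name
    column_list = ", ".join(columns)
    placeholders = ", ".join("?" for _ in columns)
    values = [record[c] for c in columns]
    if mode == "INSERT":
        # Fails with IntegrityError if a row with the same key already exists.
        sql = f"INSERT INTO {table} ({column_list}) VALUES ({placeholders})"
    elif mode == "UPSERT":
        # Insert, or overwrite the non-key columns of the existing row.
        updates = ", ".join(f"{c} = excluded.{c}" for c in columns if c != pk)
        action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
        sql = f"INSERT INTO {table} ({column_list}) VALUES ({placeholders}) ON CONFLICT({pk}) {action}"
    elif mode == "UPDATE":
        # Only touches existing rows; a record with no matching row changes nothing.
        updates = ", ".join(f"{c} = ?" for c in columns if c != pk)
        sql = f"UPDATE {table} SET {updates} WHERE {pk} = ?"
        values = [record[c] for c in columns if c != pk] + [record[pk]]
    else:
        raise ValueError(f"unknown mode: {mode}")
    conn.execute(sql, values)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")
write_record(conn, "orders", {"id": 1, "status": "new"}, "INSERT")
write_record(conn, "orders", {"id": 1, "status": "paid"}, "UPSERT")
write_record(conn, "orders", {"id": 2, "status": "lost"}, "UPDATE")  # no row 2: no effect
print(conn.execute("SELECT id, status FROM orders").fetchall())  # [(1, 'paid')]
```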

Generic Connect Sink

Uses any Kafka Connect sink connector for output.
| Property | Type | Required | Description |
|---|---|---|---|
| connectorClass | string | Yes | Fully qualified connector class name |
| propertiesText | string | Yes | Connector configuration (key=value, one per line) |
| stubbed | boolean | No | When true, runs as a stub during interactive testing |
Input: Any record
Output: None (terminal node)
Place the connector JAR in the libs/ directory, then set connectorClass and propertiesText on this node.
connect-sink-1:
  type: generic-sink
  connectorClass: io.aiven.kafka.connect.http.HttpSinkConnector
  propertiesText: |
    http.url=https://api.example.com/webhook
    http.authorization.type=none
    batching.enabled=true
Check the connector documentation for available configuration options. Streemlined passes configuration directly to the connector. See Connector Plugins for details on installing connectors.

Utility

Utility nodes help organize and document your pipeline but do not affect data processing.

Comment

Adds a text annotation to the pipeline canvas.
| Property | Type | Required | Description |
|---|---|---|---|
| text | string | No | Comment text displayed on the canvas |
| width | number | No | Box width in pixels (default 200) |
| height | number | No | Box height in pixels (default 100) |
Comment nodes are visual-only — they have no ports and are ignored at runtime. Use them to:
  • Document the purpose of a pipeline section
  • Leave notes for teammates
  • Mark areas that need future work
Comments are saved as part of the pipeline definition but have no effect on execution.

Next Steps

  • Transformations: Learn JSONata syntax and best practices
  • Configuration: Configure Kafka clusters, databases, and more
  • Interactive Testing: Test your pipeline with stubbed sources and sinks
  • Core Concepts: Understand schemas, ports, and data flow