This page documents every node available in Streemlined. Nodes are the building blocks of a pipeline and fall into four categories.
Category | Nodes | Purpose
Sources | Kafka Consumer, CSV Source | Ingest data into the pipeline
Processors | Transform, Branch, Explode, Lookup, JDBC Request-Reply, Peek, Data Masking | Transform, route, enrich, and inspect data
Sinks | Kafka Producer, JDBC Sink, Generic Connect Sink | Output data to external systems
Utility | Comment | Annotate the canvas

Sources

Source nodes ingest data into your pipeline from external systems. Every source has a single output port.

Kafka Consumer

Consumes records from one or more partitions of a Kafka topic.
Property | Type | Required | Description
cluster | string | Yes | Kafka cluster name (defined in configuration)
topic | string | Yes | Kafka topic to consume from
schemaType | enum | Yes | JSON, AVRO_SR, JSON_SR, or PROTO_SR
schemaId | number | Conditional | Schema Registry ID; required for AVRO_SR, JSON_SR, and PROTO_SR
propertiesText | string | No | Extra consumer properties (key=value, one per line)
stubbed | boolean | No | When true, runs as a stub during interactive testing
Output: One record per Kafka message, with its schema inferred or fetched from Schema Registry.
When using Schema Registry, Streemlined automatically fetches and displays the schema in the editor so downstream nodes can offer auto-complete.
kafka-consumer-1:
  type: kafka-consumer
  cluster: default
  topic: orders
  schemaType: AVRO_SR
  schemaId: 8
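The optional properties combine in the same block. A sketch (the propertiesText entries are standard Kafka consumer settings, shown purely as an illustration):

```yaml
kafka-consumer-2:
  type: kafka-consumer
  cluster: default
  topic: orders
  schemaType: JSON
  propertiesText: |
    auto.offset.reset=earliest
    max.poll.records=500
  stubbed: true
```

With schemaType: JSON, no schemaId is needed.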

CSV Source

Reads records from a local CSV file — useful for development, testing, and seeding reference data.
Property | Type | Required | Description
filePath | string | Yes | Path to the CSV file
separator | string | No | Field separator (default ,)
skipHeader | boolean | No | Treat the first row as a header (default true)
Output: One record per CSV row, with field names taken from the header (or positional indices if skipHeader is false).
csv-source-1:
  type: csv-source
  filePath: data/customers.csv
  separator: ","
  skipHeader: true
CSV Source is primarily intended for local development and testing. For production ingestion, prefer Kafka Consumer or a dedicated connector.
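A sketch for a tab-separated file without a header row (the file path is illustrative):

```yaml
csv-source-2:
  type: csv-source
  filePath: data/products.tsv
  separator: "\t"
  skipHeader: false
```

With skipHeader: false, fields are addressed by positional index downstream.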

Processors

Processor nodes transform, route, filter, or enrich data as it flows through the pipeline. Each processor has at least one input and one output port.

Transform

Transforms records using JSONata expressions.
Property | Type | Required | Description
mapping | string | Yes | JSONata expression defining the transformation
Input: Any record
Output: Transformed record based on the JSONata expression
The Transform node is the workhorse of most pipelines. Use it to:
  • Map between schemas
  • Reshape data structures
  • Compute derived fields
  • Filter out unwanted fields
  • Combine multiple fields
See Transformations to learn JSONata syntax and best practices.
transform-1:
  type: transform
  mapping: |
    {
      "order_id": id,
      "customer_name": customer.name,
      "total": items.price ~> $sum(),
      "processed_at": $now()
    }
Rename fields:
{
  "userId": user_id,
  "userName": user_name
}
Flatten nested objects:
{
  "id": order.id,
  "customerEmail": order.customer.email,
  "itemCount": $count(order.items)
}
Conditional logic:
{
  "tier": totalSpent > 1000 ? "gold" : totalSpent > 500 ? "silver" : "bronze"
}
Aggregate arrays:
{
  "totalPrice": items.price ~> $sum(),
  "avgPrice": $average(items.price),
  "maxPrice": $max(items.price)
}
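The "combine multiple fields" case from the list above can be sketched the same way (firstName, lastName, street, and city are assumed input fields):

```jsonata
{
  "fullName": firstName & " " & lastName,
  "address": street & ", " & city
}
```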

Branch

Routes records to different outputs based on conditions.
Property | Type | Required | Description
branches | array | Yes | List of { id, label, condition } objects
Each branch contains:
  • id — Unique identifier
  • label — Display name
  • condition — JSONata expression that returns true or false
Input: Any record
Outputs: Multiple outputs, one per branch, plus a default output
Records are evaluated against each condition in order. The first matching condition routes the record to that branch’s output. Records that match no condition go to default.
branch-1:
  type: branch
  branches:
    - id: high-value
      label: High Value
      condition: total > 1000
    - id: priority
      label: Priority
      condition: customer.tier = "gold"
Conditions are evaluated in order. Place more specific conditions before general ones to avoid unexpected routing.
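As a sketch of that ordering rule, the narrower gold condition below must precede the broader total check, or gold orders over 1000 would never reach their own branch (field names follow the example above):

```yaml
branch-2:
  type: branch
  branches:
    - id: gold-high-value
      label: Gold High Value
      condition: customer.tier = "gold" and total > 1000
    - id: high-value
      label: High Value
      condition: total > 1000
```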

Explode

Expands an array field into individual records (a flatMap operation).
Property | Type | Required | Description
arrayToFlatMap | string | Yes | Name of the array field to explode
Input: Record containing an array field
Output: One record per array element, with the array replaced by its individual items
Use Explode when you need to process array elements individually. For example, if an order contains multiple line items, Explode creates a separate record for each item.
explode-1:
  type: flatmap
  arrayToFlatMap: items
Before — one record:
{
  "orderId": 123,
  "items": [
    { "sku": "A", "qty": 2 },
    { "sku": "B", "qty": 1 }
  ]
}
After — two records:
{ "orderId": 123, "item": { "sku": "A", "qty": 2 } }
{ "orderId": 123, "item": { "sku": "B", "qty": 1 } }
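Downstream nodes then process one element at a time. A sketch of a Transform placed after the Explode above, reading the singular item field:

```yaml
transform-per-item:
  type: transform
  mapping: |
    {
      "orderId": orderId,
      "sku": item.sku,
      "qty": item.qty
    }
```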

Lookup

Enriches records by joining with cached reference data (stream-table join).
Property | Type | Required | Description
lookupKey | string | Yes | Expression evaluated on the incoming record to produce the join key
cacheKey | string | Yes | Expression evaluated on the reference data to produce the cache key
fieldName | string | Yes | Name for the enriched field added to the output
lookupFailureBehavior | enum | No | REJECT (default) or CONTINUE
Inputs:
  • input (left) — Main data stream
  • reference (top) — Reference data, cached in memory
Outputs:
  • output — Enriched records
  • reject — Records with no match (only when behavior is REJECT)
The reference data is loaded into an in-memory cache keyed by cacheKey. For each incoming record, lookupKey is evaluated and matched against the cache.
lookup-1:
  type: lookup
  lookupKey: customer_id
  cacheKey: id
  fieldName: customer_details
  lookupFailureBehavior: REJECT
Choose CONTINUE if missing reference data is acceptable — the lookup field will be null. Choose REJECT to route unmatched records to a separate output for error handling or dead-letter queues.
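A CONTINUE variant of the same lookup; unmatched records pass through with customer_details set to null, and no reject output is used:

```yaml
lookup-2:
  type: lookup
  lookupKey: customer_id
  cacheKey: id
  fieldName: customer_details
  lookupFailureBehavior: CONTINUE
```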

JDBC Request-Reply

Enriches each record by executing a parameterized SQL query against a relational database.
Property | Type | Required | Description
query | string | Yes | SQL query with ? placeholders
parameters | array | No | List of { expression, reconcileColumn } objects mapping record fields to query placeholders
fieldName | string | No | Name for the result field (default jdbc_result)
lookupFailureBehavior | enum | No | REJECT (default) or CONTINUE
batchSupport | boolean | No | Enable batched query execution for throughput
Input: Any record
Outputs:
  • output — Record enriched with the query result in fieldName
  • reject — Records where the query returned no rows (only when behavior is REJECT)
Unlike Lookup, which joins against a pre-cached dataset, JDBC Request-Reply executes a live query per record (or per batch). This is ideal when reference data is too large to cache or changes frequently.
jdbc-rr-1:
  type: jdbc-request-reply
  query: "SELECT name, credit_limit FROM customers WHERE id = ?"
  parameters:
    - expression: customer_id
      reconcileColumn: id
  fieldName: customer
  lookupFailureBehavior: CONTINUE
  batchSupport: true
Enable batchSupport when enriching high-throughput streams — queries are grouped into batches, significantly reducing round-trips to the database.

Peek

Observes records without modifying them — useful for debugging and monitoring.
Property | Type | Required | Description
logLevel | enum | No | DEBUG, INFO, or WARN
Input: Any record
Output: Same record, unmodified
Use Peek to:
  • Debug pipeline behavior during development
  • Log records at specific points in the pipeline
  • Inspect data shapes between processing steps
peek-1:
  type: peek
  logLevel: INFO
Peek output appears in the console panel in the editor. Use it liberally during development, then reduce log levels or remove Peek nodes before deploying to production.

Data Masking

Masks sensitive fields in each record’s value — redaction, nulling, numeric jitter, or synthetic replacement via Datafaker expressions.
Property | Type | Required | Description
maskingRules | array | Yes | List of rules. Each rule has field, mask, and optionally varianceRange (for VARIANCE) or fakerExpression (for FAKER)
schema | string | Conditional | JSON string of the output Connect/JSON schema; the editor includes this when exporting so the runner can attach the correct valueSchema
Input: Any record with a JSON object value
Output: Same record shape with masked fields applied; if there are no rules, or the value is missing, the record passes through unchanged.
Field paths use dot notation for nested structs. Append [] to a segment to apply the rule to every element of an array at that path (for example, items[].email masks email inside each item).
mask | Behavior
REDACT | Replace the value with ***
NULL | Set the field to JSON null
VARIANCE | Add a random delta in [-varianceRange, +varianceRange] to numeric fields (integers or floats). If varianceRange is omitted or null, it is treated as 0
FAKER | Replace with the result of the Datafaker expression. A blank or missing expression, or a failed evaluation, falls back to ***
mask values are matched case-insensitively at runtime.
data-masking-1:
  type: data-masking
  maskingRules:
    - field: customer.email
      mask: REDACT
    - field: age
      mask: VARIANCE
      varianceRange: 2
    - field: display_name
      mask: FAKER
      fakerExpression: "#{Name.fullName}"
    - field: order_lines[].internal_id
      mask: NULL
  schema: |
    {"type":"struct","fields":[
      {"field":"customer","type":"struct","fields":[
        {"field":"email","type":"string"}
      ]},
      {"field":"age","type":"int32"},
      {"field":"display_name","type":"string"},
      {"field":"order_lines","type":"array","items":{"type":"struct","fields":[
        {"field":"internal_id","type":"string","optional":true}
      ]}}
    ]}
Unknown mask values are logged and the field is left unchanged. VARIANCE on non-numeric fields is skipped with a warning.
Configure rules in the node editor from the incoming schema. The editor derives the output schema for downstream nodes (for example, REDACT / FAKER may widen types to string; NULL may mark fields optional).

Sinks

Sink nodes write data from your pipeline to external systems. Every sink has a single input port and no outputs (terminal nodes).

Kafka Producer

Produces records to a Kafka topic.
Property | Type | Required | Description
cluster | string | Yes | Kafka cluster name (defined in configuration)
topic | string | Yes | Kafka topic to produce to
schemaType | enum | Yes | JSON, AVRO_SR, JSON_SR, or PROTO_SR
schemaId | number | Conditional | Schema Registry ID; required for AVRO_SR, JSON_SR, and PROTO_SR
keyExpression | string | No | JSONata expression for the record key
propertiesText | string | No | Extra producer properties (key=value, one per line)
stubbed | boolean | No | When true, runs as a stub during interactive testing
Input: Any record
Output: None (terminal node)
kafka-producer-1:
  type: kafka-producer
  cluster: default
  topic: processed-orders
  schemaType: JSON
  keyExpression: order_id
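Because keyExpression is JSONata, the key can be computed rather than copied from a single field. A sketch assuming the record also carries a region field:

```yaml
kafka-producer-2:
  type: kafka-producer
  cluster: default
  topic: processed-orders
  schemaType: JSON
  keyExpression: region & "-" & order_id
```

JSONata’s & operator concatenates its operands, casting them to strings.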

JDBC Sink

Writes records to a relational database table.
Property | Type | Required | Description
table | string | Yes | Target table name
mode | enum | No | INSERT, UPSERT, or UPDATE
propertiesText | string | No | Extra connection properties (key=value, one per line)
stubbed | boolean | No | When true, runs as a stub during interactive testing
Input: Any record (fields must match table columns)
Output: None (terminal node)
The JDBC Sink maps record fields to table columns by name. Ensure your upstream transformation produces a schema compatible with the target table.
jdbc-sink-1:
  type: jdbc-sink
  table: orders
  mode: UPSERT
Column names are case-sensitive. Ensure your field names exactly match your database column names.

Generic Connect Sink

Uses any Kafka Connect sink connector for output.
Property | Type | Required | Description
connectorClass | string | Yes | Fully qualified connector class name
propertiesText | string | Yes | Connector configuration (key=value, one per line)
stubbed | boolean | No | When true, runs as a stub during interactive testing
Input: Any record
Output: None (terminal node)
The Generic Connect Sink lets you use any Kafka Connect sink connector. Place the connector JAR in the libs/ directory and configure it here.
connect-sink-1:
  type: generic-sink
  connectorClass: io.aiven.kafka.connect.http.HttpSinkConnector
  propertiesText: |
    http.url=https://api.example.com/webhook
    http.authorization.type=none
    batching.enabled=true
Check the connector documentation for available configuration options. Streemlined passes configuration directly to the connector. See Connector Plugins for details on installing connectors.

Utility

Utility nodes help organize and document your pipeline but do not affect data processing.

Comment

Adds a text annotation to the pipeline canvas.
Property | Type | Required | Description
text | string | No | Comment text displayed on the canvas
width | number | No | Box width in pixels (default 200)
height | number | No | Box height in pixels (default 100)
Comment nodes are visual-only — they have no ports and are ignored at runtime. Use them to:
  • Document the purpose of a pipeline section
  • Leave notes for teammates
  • Mark areas that need future work
Comments are saved as part of the pipeline definition but have no effect on execution.

Next Steps

Transformations

Learn JSONata syntax and best practices

Configuration

Configure Kafka clusters, databases, and more

Interactive Testing

Test your pipeline with stubbed sources and sinks

Core Concepts

Understand schemas, ports, and data flow