Streamlang reference
Streamlang is a JSON DSL for defining stream processing and routing logic. You can write Streamlang directly using the YAML editing mode in the Processing tab, or use the interactive mode which generates Streamlang behind the scenes.
Streamlang provides a consistent processing interface that can be transpiled to multiple execution targets, including Elasticsearch ingest pipelines and ES|QL. This allows processing to run at ingest time or query time without rewriting rules.
A Streamlang configuration is a YAML document with a single top-level steps array. Each step is either a processor (an action block) or a condition block (a condition block with nested steps):
steps:
- action: <processor_type>
# processor-specific parameters
- action: <processor_type>
# processor-specific parameters
where:
# optional condition
- condition:
field: <field_path>
eq: <value>
steps:
- action: <processor_type>
# nested processor
Steps run in order. Each processor transforms the document, and the result is passed to the next step.
Processors are the building blocks of a Streamlang configuration. Each processor has an action field that specifies the type of operation to perform.
All processors support the following common options:
| Option | Type | Description |
|---|---|---|
description |
string | A human-readable description of the processor. |
ignore_failure |
boolean | When true, document processing continues even if this processor fails. |
where |
condition | A condition that must be met for the processor to run. |
The following table lists all available processors. Refer to the individual processor pages for YAML parameters and examples.
| Action | Description |
|---|---|
append |
Adds values to an array field, or creates the field as an array if it doesn't exist. |
concat |
Concatenates a mix of field values and literal strings into a single field. |
convert |
Converts a field value to a different data type. |
date |
Parses date strings into timestamps. |
dissect |
Parses structured text using delimiter-based patterns. |
drop_document |
Prevents a document from being indexed based on a condition. |
grok |
Parses unstructured text using predefined or custom patterns. |
join |
Concatenates the values of multiple fields with a delimiter. |
lowercase |
Converts a string field to lowercase. |
math |
Evaluates an arithmetic expression and stores the result. |
network_direction |
Determines network traffic direction based on source and destination IP addresses. |
redact |
Redacts sensitive data in a string field by matching patterns. |
remove |
Removes a field from the document. |
remove_by_prefix |
Removes a field and all nested fields matching a prefix. |
rename |
Moves a field's value to a new field name and removes the original. |
replace |
Replaces portions of a string field that match a regular expression. |
set |
Assigns a value to a field, creating the field if it doesn't exist. |
trim |
Removes leading and trailing whitespace from a string field. |
uppercase |
Converts a string field to uppercase. |
Conditions are Boolean expressions used to control when processors run and how data is routed. They appear in where clauses on processors, in condition blocks, and in stream partitioning.
Each comparison condition specifies a field and an operator with a value:
| Operator | Description | Example value |
|---|---|---|
eq |
Equals | "active", 200 |
neq |
Not equals | "error" |
lt |
Less than | 100 |
lte |
Less than or equal to | 100 |
gt |
Greater than | 0 |
gte |
Greater than or equal to | 1 |
contains |
Field value contains the substring | "error" |
startsWith |
Field value starts with the string | "/api" |
endsWith |
Field value ends with the string | ".log" |
includes |
Multivalue field includes the value | "admin" |
where:
field: attributes.status
eq: active
Use range to match values within a numeric range. You can combine any of gt, gte, lt, and lte:
where:
field: attributes.status_code
range:
gte: 200
lt: 300
Use exists to check whether a field is present:
# Field must exist
where:
field: attributes.user_id
exists: true
# Field must not exist
where:
field: attributes.temp
exists: false
Combine conditions using and, or, and not:
# All conditions must be true
where:
and:
- field: attributes.env
eq: production
- field: attributes.level
eq: error
# At least one condition must be true
where:
or:
- field: attributes.level
eq: error
- field: attributes.level
eq: warn
# Negate a condition
where:
not:
field: attributes.path
startsWith: "/internal"
| Condition | Description |
|---|---|
always: {} |
Always evaluates to true. |
never: {} |
Always evaluates to false. |
Condition blocks group processors that should only run when a condition is met. Use a condition step with nested steps:
steps:
- condition:
field: attributes.env
eq: production
steps:
- action: set
to: attributes.is_prod
value: true
- action: remove
from: attributes.debug_info
Condition blocks can be nested for complex logic:
steps:
- condition:
field: attributes.source
eq: webserver
steps:
- action: grok
from: body.message
patterns:
- "%{IP:attributes.client_ip} %{WORD:attributes.method} %{URIPATHPARAM:attributes.path} %{NUMBER:attributes.status}"
- condition:
field: attributes.status
gte: 500
steps:
- action: set
to: attributes.alert_level
value: critical
For wired streams, fields must follow OTel-compatible namespacing. Custom fields must use one of these prefixes:
attributes.*body.structured.*resource.attributes.*scope.attributes.*
The following special fields are allowed without a namespace prefix: @timestamp, observed_timestamp, trace_id, span_id, severity_text, severity_number, event_name, body, and body.text.
System-managed fields like stream.name are reserved and cannot be modified by processors.
steps:
- action: grok
from: body.message
patterns:
- "%{IP:attributes.client_ip} - %{DATA:attributes.user} \\[%{HTTPDATE:attributes.timestamp}\\] \"%{WORD:attributes.method} %{URIPATHPARAM:attributes.path} HTTP/%{NUMBER:attributes.http_version}\" %{NUMBER:attributes.status} %{NUMBER:attributes.bytes}"
- action: date
from: attributes.timestamp
formats:
- "dd/MMM/yyyy:HH:mm:ss Z"
- action: convert
from: attributes.status
type: integer
- action: convert
from: attributes.bytes
type: long
- action: remove
from: attributes.timestamp
ignore_missing: true
steps:
- condition:
field: attributes.level
eq: DEBUG
steps:
- action: drop_document
- condition:
and:
- field: attributes.level
eq: ERROR
- field: attributes.service
eq: payments
steps:
- action: append
to: attributes.tags
value:
- critical
- pager
- action: set
to: attributes.priority
value: 1
steps:
- action: redact
from: body.message
patterns:
- "%{EMAILADDRESS:email}"
- "%{IP:ip_address}"
- action: replace
from: attributes.auth_header
pattern: "Bearer .+"
replacement: "Bearer [REDACTED]"