Loading

Streamlang reference

Streamlang is a JSON DSL for defining stream processing and routing logic. You can write Streamlang directly using the YAML editing mode in the Processing tab, or use the interactive mode which generates Streamlang behind the scenes.

Streamlang provides a consistent processing interface that can be transpiled to multiple execution targets, including Elasticsearch ingest pipelines and ES|QL. This allows processing to run at ingest time or query time without rewriting rules.

A Streamlang configuration is a YAML document with a single top-level steps array. Each step is either a processor (an action block) or a condition block (a condition block with nested steps):

steps:
  - action: <processor_type>
    # processor-specific parameters
  - action: <processor_type>
    # processor-specific parameters
    where:
      # optional condition
  - condition:
      field: <field_path>
      eq: <value>
      steps:
        - action: <processor_type>
          # nested processor
		

Steps run in order. Each processor transforms the document, and the result is passed to the next step.

Processors are the building blocks of a Streamlang configuration. Each processor has an action field that specifies the type of operation to perform.

All processors support the following common options:

Option Type Description
description string A human-readable description of the processor.
ignore_failure boolean When true, document processing continues even if this processor fails.
where condition A condition that must be met for the processor to run.

The following table lists all available processors. Refer to the individual processor pages for YAML parameters and examples.

Action Description
append Adds values to an array field, or creates the field as an array if it doesn't exist.
concat Concatenates a mix of field values and literal strings into a single field.
convert Converts a field value to a different data type.
date Parses date strings into timestamps.
dissect Parses structured text using delimiter-based patterns.
drop_document Prevents a document from being indexed based on a condition.
grok Parses unstructured text using predefined or custom patterns.
join Concatenates the values of multiple fields with a delimiter.
lowercase Converts a string field to lowercase.
math Evaluates an arithmetic expression and stores the result.
network_direction Determines network traffic direction based on source and destination IP addresses.
redact Redacts sensitive data in a string field by matching patterns.
remove Removes a field from the document.
remove_by_prefix Removes a field and all nested fields matching a prefix.
rename Moves a field's value to a new field name and removes the original.
replace Replaces portions of a string field that match a regular expression.
set Assigns a value to a field, creating the field if it doesn't exist.
trim Removes leading and trailing whitespace from a string field.
uppercase Converts a string field to uppercase.

Conditions are Boolean expressions used to control when processors run and how data is routed. They appear in where clauses on processors, in condition blocks, and in stream partitioning.

Each comparison condition specifies a field and an operator with a value:

Operator Description Example value
eq Equals "active", 200
neq Not equals "error"
lt Less than 100
lte Less than or equal to 100
gt Greater than 0
gte Greater than or equal to 1
contains Field value contains the substring "error"
startsWith Field value starts with the string "/api"
endsWith Field value ends with the string ".log"
includes Multivalue field includes the value "admin"
where:
  field: attributes.status
  eq: active
		

Use range to match values within a numeric range. You can combine any of gt, gte, lt, and lte:

where:
  field: attributes.status_code
  range:
    gte: 200
    lt: 300
		

Use exists to check whether a field is present:

# Field must exist
where:
  field: attributes.user_id
  exists: true

# Field must not exist
where:
  field: attributes.temp
  exists: false
		

Combine conditions using and, or, and not:

# All conditions must be true
where:
  and:
    - field: attributes.env
      eq: production
    - field: attributes.level
      eq: error

# At least one condition must be true
where:
  or:
    - field: attributes.level
      eq: error
    - field: attributes.level
      eq: warn

# Negate a condition
where:
  not:
    field: attributes.path
    startsWith: "/internal"
		
Condition Description
always: {} Always evaluates to true.
never: {} Always evaluates to false.

Condition blocks group processors that should only run when a condition is met. Use a condition step with nested steps:

steps:
  - condition:
      field: attributes.env
      eq: production
      steps:
        - action: set
          to: attributes.is_prod
          value: true
        - action: remove
          from: attributes.debug_info
		

Condition blocks can be nested for complex logic:

steps:
  - condition:
      field: attributes.source
      eq: webserver
      steps:
        - action: grok
          from: body.message
          patterns:
            - "%{IP:attributes.client_ip} %{WORD:attributes.method} %{URIPATHPARAM:attributes.path} %{NUMBER:attributes.status}"
        - condition:
            field: attributes.status
            gte: 500
            steps:
              - action: set
                to: attributes.alert_level
                value: critical
		

For wired streams, fields must follow OTel-compatible namespacing. Custom fields must use one of these prefixes:

  • attributes.*
  • body.structured.*
  • resource.attributes.*
  • scope.attributes.*

The following special fields are allowed without a namespace prefix: @timestamp, observed_timestamp, trace_id, span_id, severity_text, severity_number, event_name, body, and body.text.

System-managed fields like stream.name are reserved and cannot be modified by processors.

steps:
  - action: grok
    from: body.message
    patterns:
      - "%{IP:attributes.client_ip} - %{DATA:attributes.user} \\[%{HTTPDATE:attributes.timestamp}\\] \"%{WORD:attributes.method} %{URIPATHPARAM:attributes.path} HTTP/%{NUMBER:attributes.http_version}\" %{NUMBER:attributes.status} %{NUMBER:attributes.bytes}"
  - action: date
    from: attributes.timestamp
    formats:
      - "dd/MMM/yyyy:HH:mm:ss Z"
  - action: convert
    from: attributes.status
    type: integer
  - action: convert
    from: attributes.bytes
    type: long
  - action: remove
    from: attributes.timestamp
    ignore_missing: true
		
steps:
  - condition:
      field: attributes.level
      eq: DEBUG
      steps:
        - action: drop_document
  - condition:
      and:
        - field: attributes.level
          eq: ERROR
        - field: attributes.service
          eq: payments
      steps:
        - action: append
          to: attributes.tags
          value:
            - critical
            - pager
        - action: set
          to: attributes.priority
          value: 1
		
steps:
  - action: redact
    from: body.message
    patterns:
      - "%{EMAILADDRESS:email}"
      - "%{IP:ip_address}"
  - action: replace
    from: attributes.auth_header
    pattern: "Bearer .+"
    replacement: "Bearer [REDACTED]"