Common Expression Language input
Use the cel input to read messages from a file path or HTTP API with a variety of payloads using the Common Expression Language (CEL) and the mito CEL extension libraries.
CEL is a non-Turing-complete language that can evaluate expressions over inputs. With the mito extension library, those inputs can include files and API endpoints. The cel input periodically runs a CEL program in a user-configurable execution environment and publishes the set of events that result from the program evaluation. Optionally, the CEL program may return cursor states that will be provided to the next execution of the CEL program. The cursor states may be used to control the behaviour of the program.
This input supports:
- Auth
  - Basic
  - Digest
  - OAuth2
- Retrieval at a configurable interval
- Pagination
- Retries
- Rate limiting
- Proxying
Example configurations:
filebeat.inputs:
# Fetch your public IP every minute.
- type: cel
  interval: 1m
  resource.url: https://api.ipify.org/?format=json
  program: |
    bytes(get(state.url).Body).as(body, {
        "events": [body.decode_json()]
    })
Or, equivalently, using the text format from ipify.org:
filebeat.inputs:
# Fetch your public IP every minute.
- type: cel
  interval: 1m
  resource.url: https://api.ipify.org/?format=text
  program: |
    {
        "events": [{"ip": string(get(state.url).Body)}]
    }
The following example pages through Elasticsearch search results using the scroll API, keeping the scroll ID in the cursor:
filebeat.inputs:
- type: cel
  resource.url: http://localhost:9200/_search
  state:
    scroll: 5m
  program: |
    (
        !has(state.cursor) || !has(state.cursor.scroll_id) ?
            post(state.url+"?scroll=5m", "", "")
        :
            post(
                state.url+"/scroll?"+{"scroll_id": [state.cursor.scroll_id]}.format_query(),
                "application/json",
                {"scroll": state.scroll}.encode_json()
            )
    ).as(resp, bytes(resp.Body).decode_json().as(body, {
        "events": body.hits.hits,
        "cursor": {"scroll_id": body._scroll_id},
    }))
The execution environment provided for the input includes the functions, macros, and global variables provided by the mito library. A single JSON object is provided as an input accessible through a state variable. state contains a string url field and may contain arbitrary other fields configured via the input’s state configuration. If the CEL program saves cursor states between executions of the program, the configured state.cursor value will be replaced by the saved cursor prior to execution.
On start, the state will be something like this:
{
    "url": <resource address>,
    "cursor": { ... },
    ...
}
The state.url field will be present and may be an HTTP end-point or a file path. It is the responsibility of the CEL program to handle removing the scheme from a file URL if it is present. The state.url field may be mutated during execution of the program, but the mutated state will not be persisted between restarts. The state.url field must be present in the returned value to ensure that it is available in the next evaluation, unless the program has the resource address hard-coded or it is available from the cursor.
Additional fields may be present at the root of the object and, if the program tolerates it, the cursor value may be absent. Only the cursor is persisted over restarts, but all fields in state are retained between iterations of the processing loop except for the produced events array, as described below.
If the cursor is present, the program should make and process requests based on its value. If the cursor is not present, the program must have alternative logic to determine what requests to make.
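The sketch below shows these rules in one configuration. It keeps a user-defined state field and the url available across evaluations, and it falls back to that field when no cursor has been saved yet. The endpoint https://api.example.com/v1/events, its since query parameter, the JSON-array response shape and the initial_since field name are all hypothetical.

```yaml
filebeat.inputs:
- type: cel
  interval: 5m
  # Hypothetical endpoint assumed to return a JSON array of event objects.
  resource.url: https://api.example.com/v1/events
  state:
    # Hypothetical user-defined field, used only when no cursor has been saved.
    initial_since: "2024-01-01T00:00:00Z"
  program: |
    (
        has(state.cursor) && has(state.cursor.since) ?
            state.cursor.since
        :
            state.initial_since
    ).as(since,
        get(state.url + "?" + {"since": [since]}.format_query())
    ).as(resp, {
        "events": bytes(resp.Body).decode_json(),
        // Return url and initial_since so the next evaluation still sees them.
        "url": state.url,
        "initial_since": state.initial_since,
        // A single cursor object, retained once all events are published.
        "cursor": {"since": string(now)},
    })
```

Error handling for non-2xx responses is omitted to keep the sketch short. Using now as the cursor value is a simplification: a real integration would more likely take the timestamp from the last event received.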
After a program finishes executing, it should return a single object with a structure like this:
{
    "events": [
        {...},
        ...
    ],
    "cursor": [
        {...},
        ...
    ],
    "url": <resource address>,
    "status_code": <HTTP request status code if a network request>,
    "header": <HTTP response headers if a network request>,
    "rate_limit": <HTTP rate limit map if required by API>,
    "want_more": false
}
- The events field must be present, but may be empty or null. If it is not empty, it must only have objects as elements. The field should be an array. In the case of an error condition in the CEL program, it is acceptable to return a single object instead of an array; this will be wrapped as an array for publication and an error will be logged. If the single object contains a key, "error", the error value will be used to update the status of the input to report to Elastic Agent. This can be used to respond more rapidly to API failures.
- If cursor is present, it must be either a single object or an array with the same length as events. Each element i of the cursor will be the details for obtaining the events at and beyond event i in the events array. If the cursor is a single object, it will be the details for obtaining events after the last event in the events array, and it will only be retained on successful publication of all the events in the events array.
- If rate_limit is present, it must be a map with numeric fields rate and burst. The rate_limit field may also have a string error field and other fields, which will be logged. If it has an error field, the rate and burst will not be used to set rate limit behavior. The Limit extension, Okta Rate Limit policy, and Draft Rate Limit policy documentation show how to construct this field.
- The evaluation is repeated with the new state, after removing the events field, if the "want_more" field is present and true and a non-empty events array is returned. If the "want_more" field is present after a failed evaluation, it is set to false.
The status_code, header and rate_limit values may be omitted if the program is not interacting with an HTTP API end-point, since they will not be needed to contribute to program control.
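The sketch below returns status_code, header and rate_limit alongside the events, and it reports a single error object when the request fails. The endpoint is hypothetical. The call rate_limit(resp.Header, "okta", duration("1m")) is an assumption about the Limit extension's signature: it takes the headers, a policy name as registered by this input, and a window duration. Check the mito Limit documentation before relying on it.

```yaml
filebeat.inputs:
- type: cel
  interval: 1m
  # Hypothetical endpoint assumed to return a JSON array of event objects.
  resource.url: https://api.example.com/v1/events
  program: |
    get(state.url).as(resp, {
        "url": state.url,
        "status_code": resp.StatusCode,
        "header": resp.Header,
        // Assumed signature: rate_limit(<headers>, <policy name>, <window>).
        "rate_limit": rate_limit(resp.Header, "okta", duration("1m")),
        "events": resp.StatusCode == 200 ?
            bytes(resp.Body).decode_json()
        :
            // A single object with an "error" key is logged and used to
            // update the input status reported to Elastic Agent.
            {
                "error": {
                    "code": string(resp.StatusCode),
                    "message": string(resp.Body),
                },
            },
    })
```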
The CEL input will log the complete state after evaluation when logging at the DEBUG level. This will include any sensitive or secret information kept in the state object, so DEBUG level logging should not be used in production when sensitive information is retained in the state object. See the redact configuration parameters for settings to exclude sensitive fields from DEBUG logs.
As noted above, the cel input provides functions, macros, and global variables to extend the language.
- File: the file extension is initialized with MIME handlers for "application/gzip", "application/x-ndjson", "application/zip", "text/csv; header=absent", and "text/csv; header=present".
- XML: the XML extension is initialized with XML schema definitions provided via the xsd configuration option.
- Limit: the rate limit extension is initialized with Okta (as "okta") and the Draft Rate Limit (as "draft") policies.
- MIME: the MIME extension is initialized with MIME handlers for "application/gzip", "application/x-ndjson", "application/zip", "text/csv; header=absent", and "text/csv; header=present".
- Regexp: the regular expression extension is initialized with the patterns specified in the user input configuration via the regexp field.
- Debug: the debug handler registers a logger with the name extension cel_debug, and calls to the CEL debug function are emitted to that logger.
In addition to the extensions provided in the packages listed above, a global variable useragent is also provided. It gives the user CEL program access to the Filebeat user-agent string. By default, this value is assigned to all requests' user-agent headers unless the CEL program has already set the user-agent header value. Programs that should not send a user-agent should set this header to the empty string, "".
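For example, a program can set the header itself by building the request with mito's request, with and do_request functions. The product token example-integration/1.0 is hypothetical. Setting the value to [""] instead would send no user-agent.

```yaml
filebeat.inputs:
- type: cel
  interval: 1m
  resource.url: https://api.ipify.org/?format=json
  program: |
    request("GET", state.url).with({
        "Header": {
            // Extend the Filebeat user-agent; use [""] to send none.
            "User-Agent": [useragent + " example-integration/1.0"],
        },
    }).do_request().as(resp, {
        "events": [bytes(resp.Body).decode_json()],
        "url": state.url,
    })
```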
Host environment variables are made available via the global map env. Only environment variables that have been allowlisted via the allowed_environment configuration list are visible to the CEL program.
The CEL environment enables a specific version of the CEL optional types library.
Additionally, it supports authentication via Basic Authentication, Digest Authentication or OAuth2.
Example configurations with authentication:
filebeat.inputs:
- type: cel
  auth.basic:
    user: user@domain.tld
    password: P@$$W0₹D
  resource.url: http://localhost

filebeat.inputs:
- type: cel
  auth.digest:
    user: user@domain.tld
    password: P@$$W0₹D
  resource.url: http://localhost

filebeat.inputs:
- type: cel
  auth.oauth2:
    client.id: 12345678901234567890abcdef
    client.secret: abcdef12345678901234567890
    token_url: http://localhost/oauth2/token
  resource.url: http://localhost

filebeat.inputs:
- type: cel
  auth.oauth2:
    client.id: 12345678901234567890abcdef
    client.secret: abcdef12345678901234567890
    token_url: http://localhost/oauth2/token
    user: user@domain.tld
    password: P@$$W0₹D
  resource.url: http://localhost
The cel input keeps a runtime state between requests. This state can be accessed by the CEL program and may contain arbitrary objects.
The state must contain a url string and may contain any object the user wishes to store in it.
All objects are kept only at runtime, except cursor, whose values are persisted between restarts.
The cel input supports the following configuration options plus the Common options described later.
interval is the duration between repeated requests. At each interval, the input may make additional pagination requests in response to the initial request if pagination is enabled. Default: 60s.
program is the CEL program that is executed each polling period. This field is required.
max_executions is the maximum number of times a CEL program can request to be re-run with a want_more field. This is used to ensure that accidental infinite loops do not halt processing. When the execution budget is exceeded, execution will be restarted at the next interval and a warning will be written into the logs. Default: 1000.
state is an optional object that is passed to the CEL program on the first execution. It is available to the executing program as the state variable. During the life of the input, it is made available to subsequent executions of the program as the returned value of the previous execution, but with the state.events field removed. Except for the state.cursor field, state does not persist over restarts.
The cursor is an object available as state.cursor where arbitrary values may be stored. Cursor state is kept between input restarts and updated after each event of a request has been published. When a cursor is used, the CEL program must either create a cursor state for each event that it returns, or a single cursor that reflects the cursor for completion of the full set of events.
filebeat.inputs:
# Fetch your public IP every minute and note when the last request was made.
- type: cel
  interval: 1m
  resource.url: https://api.ipify.org/?format=json
  program: |
    bytes(get(state.url).Body).as(body, {
        "events": [body.decode_json().with({
            "last_requested_at": has(state.cursor) && has(state.cursor.last_requested_at) ?
                state.cursor.last_requested_at
            :
                now
        })],
        "cursor": {"last_requested_at": now}
    })
allowed_environment is a list of host environment variables that will be made visible to the CEL execution environment. By default, no environment variables are visible.
filebeat.inputs:
# Publish the list of files in $PATH every minute.
- type: cel
  interval: 1m
  resource.url: ""
  allowed_environment:
    - PATH
  program: |
    {
        "events": [{
            "message": env.?PATH.orValue("").split(":")
                .map(p, try(dir(p)))
                .filter(d, type(d) != type(""))
                .flatten()
                .collate("name")
        }]
    }
regexp is a set of named regular expressions that may be used during a CEL program’s execution using the regexp extension library. The syntax used for the regular expressions is RE2.
filebeat.inputs:
- type: cel
  # Define two regular expressions, 'products' and 'solutions' for use during CEL execution.
  regexp:
    products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)'
    solutions: '(?i)(Search|Observability|Security)'
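A program could then refer to these patterns by name. The sketch below assumes that the mito regexp functions take the subject string as the receiver and the configured pattern name as the argument. Under that assumption, <string>.re_match(<name>) returns a bool and <string>.re_find(<name>) returns the first match. The text state field is hypothetical.

```yaml
filebeat.inputs:
- type: cel
  interval: 1m
  resource.url: ""
  regexp:
    products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)'
  state:
    # Hypothetical input text to classify.
    text: "Shipping logs with Beats into Elasticsearch"
  program: |
    {
        "events": [{
            "message": state.text,
            // Assumed: true if the 'products' pattern matches the text.
            "mentions_product": state.text.re_match("products"),
            // Assumed: the first substring matching 'products'.
            "first_product": state.text.re_find("products"),
        }],
        // Return text so it is still in state on the next evaluation.
        "text": state.text,
    }
```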
XML documents may require additional type information to enable correct parsing and ingestion. This information can be provided as XML Schema Definitions (XSD) for XML documents using the xsd option. The key under which an XSD is provided is the name used to refer to it from the decode_xml CEL extension function.
filebeat.inputs:
- type: cel
  # Provide an XSD, 'order', for use during CEL execution (truncated for example).
  xsd:
    order: |
       <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
         <xs:element name="order">
           <xs:complexType>
             <xs:sequence>
               <xs:element name="sender" type="xs:string"/>
               <xs:element name="address">
                 <xs:complexType>
                   <xs:sequence>
                     <xs:element name="name" type="xs:string"/>
                     <xs:element name="company" type="xs:string"/>
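A program then passes the XSD key to decode_xml. This is a minimal sketch, assuming the XML extension's <bytes>.decode_xml(<xsd name>) form and a hypothetical endpoint that serves a single order document. Only the program side is shown; the 'order' schema is assumed to be configured under xsd as in the example above.

```yaml
filebeat.inputs:
- type: cel
  interval: 1m
  # Hypothetical endpoint serving one XML <order> document.
  resource.url: https://api.example.com/v1/order.xml
  # The 'order' XSD is assumed to be configured under xsd as shown above.
  program: |
    {
        // Decode the body using the XSD registered under the 'order' key.
        "events": [bytes(get(state.url).Body).decode_xml("order")],
        "url": state.url,
    }
```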
Setting auth.basic.enabled to false disables the basic auth configuration. Default: true.
Basic auth settings are disabled if either enabled is set to false or the auth.basic section is missing.
auth.basic.user is the user to authenticate with.
auth.basic.password is the password to use.
Setting auth.digest.enabled to false disables the digest auth configuration. Default: true.
Digest auth settings are disabled if either enabled is set to false or the auth.digest section is missing.
auth.digest.user is the user to authenticate with.
auth.digest.password is the password to use.
When auth.digest.no_reuse is set to true, Digest Authentication challenges are not reused.
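For example, to turn off challenge reuse while keeping the placeholder digest credentials from the earlier example (this assumes the option key is no_reuse under auth.digest):

```yaml
filebeat.inputs:
- type: cel
  auth.digest:
    user: user@domain.tld
    password: P@$$W0₹D
    # Request a fresh challenge instead of reusing a previous one.
    no_reuse: true
  resource.url: http://localhost
```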
Setting auth.oauth2.enabled to false disables the OAuth2 configuration. Default: true.
OAuth2 settings are disabled if either enabled is set to false or the auth.oauth2 section is missing.
auth.oauth2.provider is used to configure supported OAuth2 providers. Each supported provider will require specific settings. It is not set by default. Supported providers are: azure, google, okta.
auth.oauth2.client.id is the client ID used as part of the authentication flow. It is always required except when using google as provider. Required for providers: default, azure, okta.
auth.oauth2.client.secret is the client secret used as part of the authentication flow. It is always required except when using google or okta as provider. Required for providers: default, azure.
auth.oauth2.user is the user used as part of the authentication flow. It is required for authentication with the password grant type. It is only available for provider default.
auth.oauth2.password is the password used as part of the authentication flow. It is required for authentication with the password grant type. It is only available for provider default.
user and password are required for the password grant type. If user and password are not used, the input automatically uses the token_url and the client credentials method.
auth.oauth2.scopes is a list of scopes that will be requested during the OAuth2 flow. It is optional for all providers.
auth.oauth2.token_url is the endpoint that will be used to generate the tokens during the OAuth2 flow. It is required if no provider is specified.
For the azure provider, either token_url or azure.tenant_id is required.
auth.oauth2.endpoint_params is a set of values that will be sent on each request to the token_url. Each param key can have multiple values. It can be set for all providers except google.
- type: cel
  auth.oauth2:
    endpoint_params:
      Param1:
        - ValueA
        - ValueB
      Param2:
        - Value
auth.oauth2.azure.tenant_id is used for authentication when using the azure provider. Since it is used in the process to generate the token_url, it can’t be used in combination with it. It is not required.
For information about where to find it, you can refer to https://docs.microsoft.com/en-us/azure/active-directory/develop/howto-create-service-principal-portal.
auth.oauth2.azure.resource is the accessed WebAPI resource when using the azure provider. It is not required.
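Here is an Azure configuration that derives the token URL from a tenant ID instead of setting token_url. The tenant ID, scope and resource URL are hypothetical placeholders, and the client credentials reuse the placeholder values from the earlier examples.

```yaml
filebeat.inputs:
- type: cel
  auth.oauth2:
    provider: azure
    client.id: 12345678901234567890abcdef
    client.secret: abcdef12345678901234567890
    # Used to build the token URL, so token_url is not set.
    azure.tenant_id: 00000000-0000-0000-0000-000000000000
    # Hypothetical scope for the target API.
    scopes:
      - https://api.example.com/.default
  resource.url: https://api.example.com/v1/events
```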
auth.oauth2.google.credentials_file is the credentials file for Google.
auth.oauth2.google.credentials_json is your credentials information as raw JSON.
auth.oauth2.google.jwt_file is the JWT Account Key file for Google.
auth.oauth2.google.jwt_json is the JWT Account Key file as raw JSON.
Only one of these Google credentials settings can be set at once. If none is provided, the input attempts to load default credentials from the environment via Application Default Credentials (ADC). For more information about how to provide Google credentials, please refer to https://cloud.google.com/docs/authentication.
auth.oauth2.google.delegated_account is the email of the delegated account used to create the credentials (usually an admin). It is used in combination with auth.oauth2.google.jwt_file or auth.oauth2.google.jwt_json.
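For Google, a JWT account key file can be combined with a delegated account. In this sketch the key path, delegated account, example scope and resource URL are hypothetical placeholders. client.id and client.secret are omitted because they are not required for the google provider.

```yaml
filebeat.inputs:
- type: cel
  auth.oauth2:
    provider: google
    # Hypothetical path to a JWT account key file.
    google.jwt_file: /etc/filebeat/google-service-account.json
    # Account to impersonate; used with jwt_file or jwt_json.
    google.delegated_account: admin@example.com
    # Example scope; use the scopes your target API requires.
    scopes:
      - https://www.googleapis.com/auth/admin.reports.audit.readonly
  resource.url: https://api.example.com/v1/events
```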
auth.oauth2.okta.jwk_file is the RSA JWK private key file for your Okta Service App, which is used for interacting with the Okta Org Auth Server to mint tokens with okta.* scopes.
auth.oauth2.okta.jwk_json is the RSA JWK private key JSON for your Okta Service App, used in the same way.
auth.oauth2.okta.jwk_pem is the RSA JWK private key PEM block for your Okta Service App, used in the same way.
Only one of these Okta credentials settings can be set at once. For more information, please refer to https://developer.okta.com/docs/guides/implement-oauth-for-okta-serviceapp/main/
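Here is an Okta service app configured with a JWK file. The Okta domain, client ID, key path and scope are hypothetical placeholders. token_url is set explicitly, since no Okta-specific way of deriving it is described above.

```yaml
filebeat.inputs:
- type: cel
  auth.oauth2:
    provider: okta
    client.id: 0oa1example2client3id
    # Hypothetical Okta org token endpoint.
    token_url: https://example.okta.com/oauth2/v1/token
    # Hypothetical path to the service app's RSA JWK private key.
    okta.jwk_file: /etc/filebeat/okta-service-app-jwk.json
    scopes:
      - okta.users.read
  resource.url: https://example.okta.com/api/v1/users
```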
resource.url is the URL of the HTTP API. Required.
The API endpoint may be accessed via Unix sockets and Windows named pipes by adding +unix or +npipe to the URL scheme, for example, http+unix:///var/socket/.
resource.timeout is the duration before declaring that the HTTP client connection has timed out. Valid time units are ns, us, ms, s, m, h. Default: 30s.
resource.ssl specifies the SSL/TLS configuration. If the ssl section is missing, the host’s CAs are used for HTTPS connections. See SSL for more information.
resource.keep_alive.disable specifies whether to disable keep-alives for HTTP end-points. Default: true.
resource.keep_alive.max_idle_connections is the maximum number of idle connections across all hosts. Zero means no limit. Default: 0.
resource.keep_alive.max_idle_connections_per_host is the maximum number of idle connections to keep per host. If zero, defaults to two. Default: 0.
resource.keep_alive.idle_connection_timeout is the maximum amount of time an idle connection will remain idle before closing itself. Valid time units are ns, us, ms, s, m, h. Zero means no limit. Default: 0s.
resource.retry.max_attempts is the maximum number of retries for the HTTP client. Default: 5.
resource.retry.wait_min is the minimum time to wait before a retry is attempted. Default: 1s.
resource.retry.wait_max is the maximum time to wait before a retry is attempted. Default: 60s.
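When resource.redirect.forward_headers is set to true, request headers are forwarded in case of a redirect. Default: false.
resource.redirect.headers_ban_list lists headers that are not forwarded: when resource.redirect.forward_headers is set to true, all headers except the ones defined in this list will be forwarded. Default: [].
resource.redirect.max_redirects is the maximum number of redirects to follow for a request. Default: 10.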
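The connection, retry and redirect settings above can be combined as in the sketch below. The values are illustrative, not recommendations. The endpoint is hypothetical, and program is omitted for brevity.

```yaml
filebeat.inputs:
- type: cel
  resource.url: https://api.example.com/v1/events
  resource.timeout: 60s
  # Keep-alives are disabled by default; enable them to reuse connections.
  resource.keep_alive.disable: false
  resource.keep_alive.max_idle_connections_per_host: 4
  resource.keep_alive.idle_connection_timeout: 90s
  # Allow up to 3 retries, waiting between 2s and 30s before each one.
  resource.retry.max_attempts: 3
  resource.retry.wait_min: 2s
  resource.retry.wait_max: 30s
  # Forward headers on redirect, except Authorization.
  resource.redirect.forward_headers: true
  resource.redirect.headers_ban_list:
    - Authorization
  resource.redirect.max_redirects: 5
```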
resource.rate_limit.limit specifies the maximum overall resource request rate.
resource.rate_limit.burst is the maximum burst size. Burst is the maximum number of resource requests that can be made above the overall rate limit.
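Here is a static rate limit with illustrative numbers; choose real values from the API's own published limits. As described earlier, a rate_limit map returned by the program can also set rate limit behavior.

```yaml
filebeat.inputs:
- type: cel
  resource.url: https://api.example.com/v1/events
  # Illustrative values only.
  resource.rate_limit.limit: 5
  resource.rate_limit.burst: 10
```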
It is possible to log HTTP requests and responses in a CEL program to a local file-system for debugging configurations. This option is enabled by setting resource.tracer.enabled to true and setting the resource.tracer.filename value. Additional options are available to tune log rotation behavior. To delete existing logs, set resource.tracer.enabled to false without unsetting the filename option.
Enabling this option compromises security and should only be used for debugging.
To differentiate the trace files generated from different input instances, a placeholder * can be added to the filename and will be replaced with the input instance id. For example, http-request-trace-*.ndjson. Setting resource.tracer.filename with resource.tracer.enabled set to false will cause any existing trace logs matching the filename option to be deleted.
resource.tracer.maxsize sets the maximum size, in megabytes, the log file will reach before it is rotated. By default, logs are allowed to reach 1MB before rotation. Individual request/response bodies will be truncated to 10% of this size.
resource.tracer.maxage specifies the number of days to retain rotated log files. If it is not set, log files are retained indefinitely.
resource.tracer.maxbackups is the number of old logs to retain. If it is not set, all old logs are retained subject to the resource.tracer.maxage setting.
resource.tracer.localtime determines whether to use the host’s local time rather than UTC for timestamping rotated log file names.
resource.tracer.compress determines whether rotated logs should be gzip compressed.
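Here are the tracer options together, for debugging only. The rotation values are illustrative, and the endpoint is hypothetical.

```yaml
filebeat.inputs:
- type: cel
  resource.url: https://api.example.com/v1/events
  resource.tracer:
    enabled: true
    # '*' is replaced with the input instance id.
    filename: http-request-trace-*.ndjson
    maxsize: 5      # rotate at 5 MB; bodies are truncated to 10% of this
    maxage: 1       # keep rotated files for one day
    maxbackups: 3   # keep at most three rotated files
    localtime: false
    compress: true
```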
During debug level logging, the state object and the resulting evaluation result are included in logs. This may leak secrets. To prevent this, fields may be redacted or deleted from the logged state. The redact configuration allows users to configure this field redaction behaviour. For safety reasons, if the redact configuration is missing, a warning is logged.
If no redaction is required, an empty redact.fields configuration should be used to silence the logged warning.
- type: cel
  redact:
    fields: ~
As an example, if a user-constructed Basic Authentication request is used in a CEL program, the password can be redacted like so:
filebeat.inputs:
- type: cel
  resource.url: http://localhost:9200/_search
  state:
    user: user@domain.tld
    password: P@$$W0₹D
  redact:
    fields:
      - password
    delete: true
Note that fields under the auth configuration hierarchy are not exposed to the state and so do not need to be redacted. For this reason, where possible, it is preferable to use them for authentication rather than the request construction shown above.
redact.fields specifies fields in the state to be redacted prior to debug logging. Fields listed in this array will be either replaced with a * or deleted entirely from messages sent to debug logs.
redact.delete specifies whether fields should be replaced with a * or deleted entirely from messages sent to debug logs. If delete is true, fields will be deleted rather than replaced.
It is possible to log CEL program evaluation failures to a local file-system for debugging configurations. This option is enabled by setting failure_dump.enabled to true and setting the failure_dump.filename value. To delete existing failure dumps, set failure_dump.enabled to false without unsetting the filename option.
Enabling this option compromises security and should only be used for debugging.
failure_dump.filename specifies a directory path to write failure dumps to. If it is not empty and a CEL program evaluation fails, the complete set of states for the CEL program’s evaluation will be written as a JSON file, along with the error that was reported. This option should only be used when debugging a failure. It imposes a significant performance impact on the input and may use large quantities of memory to hold the full set of states. If a failure dump is configured, it is recommended that data input sizes be reduced to avoid excessive memory consumption and dumps that are intractable to analysis.
record_coverage specifies that CEL code evaluation coverage should be recorded and logged in debug logs. This is a developer-only option.
This input exposes metrics under the HTTP monitoring endpoint. These metrics are exposed under the /inputs path. They can be used to observe the activity of the input.
Metric | Description |
---|---|
resource | URL or path of the input resource. |
cel_executions | Number of times the CEL program has been executed. |
batches_received_total | Number of event arrays received. |
events_received_total | Number of events received. |
batches_published_total | Number of event arrays published. |
events_published_total | Number of events published. |
cel_processing_time | Histogram of the elapsed successful CEL program processing times in nanoseconds. |
batch_processing_time | Histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches). |
http_request_total | Total number of processed requests. |
http_request_errors_total | Total number of request errors. |
http_request_delete_total | Total number of DELETE requests. |
http_request_get_total | Total number of GET requests. |
http_request_head_total | Total number of HEAD requests. |
http_request_options_total | Total number of OPTIONS requests. |
http_request_patch_total | Total number of PATCH requests. |
http_request_post_total | Total number of POST requests. |
http_request_put_total | Total number of PUT requests. |
http_request_body_bytes_total | Total size of request bodies, in bytes. |
http_request_body_bytes | Histogram of request body sizes. |
http_response_total | Total number of responses received. |
http_response_errors_total | Total number of response errors. |
http_response_1xx_total | Total number of 1xx responses. |
http_response_2xx_total | Total number of 2xx responses. |
http_response_3xx_total | Total number of 3xx responses. |
http_response_4xx_total | Total number of 4xx responses. |
http_response_5xx_total | Total number of 5xx responses. |
http_response_body_bytes_total | Total size of response bodies, in bytes. |
http_response_body_bytes | Histogram of response body sizes. |
http_round_trip_time | Histogram of the round trip time. |
A stand-alone CEL environment that implements the majority of the CEL input’s Common Expression Language functionality is available in the Elastic Mito repository. This tool may be used to help develop CEL programs to be used by the input. It can be installed from source by running go install github.com/elastic/mito/cmd/mito@latest, which requires a Go toolchain.
The following configuration options are supported by all inputs.
Use the enabled option to enable and disable inputs. By default, enabled is set to true.
tags is a list of tags that Filebeat includes in the tags field of each published event. Tags make it easy to select specific events in Kibana or apply conditional filtering in Logstash. These tags will be appended to the list of tags specified in the general configuration.
Example:
filebeat.inputs:
- type: cel
  . . .
  tags: ["json"]
fields are optional fields that you can specify to add additional information to the output. For example, you might add fields that you can use for filtering log data. Fields can be scalar values, arrays, dictionaries, or any nested combination of these. By default, the fields that you specify here will be grouped under a fields sub-dictionary in the output document. To store the custom fields as top-level fields, set the fields_under_root option to true. If a duplicate field is declared in the general configuration, its value will be overwritten by the value declared here.
filebeat.inputs:
- type: cel
  . . .
  fields:
    app_id: query_engine_12
If fields_under_root is set to true, the custom fields are stored as top-level fields in the output document instead of being grouped under a fields sub-dictionary. If the custom field names conflict with other field names added by Filebeat, the custom fields overwrite the other fields.
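For example, with fields_under_root enabled, app_id is published as a top-level field rather than as fields.app_id:

```yaml
filebeat.inputs:
- type: cel
  . . .
  fields:
    app_id: query_engine_12
  # Store app_id at the root of the event instead of under fields.
  fields_under_root: true
```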
processors is a list of processors to apply to the input data.
See Processors for information about specifying processors in your config.
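Here is a processor list that uses the add_fields processor; the target and field values are illustrative.

```yaml
filebeat.inputs:
- type: cel
  . . .
  processors:
    # Add project.name to every event from this input.
    - add_fields:
        target: project
        fields:
          name: myproject
```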
pipeline is the ingest pipeline ID to set for the events generated by this input.
The pipeline ID can also be configured in the Elasticsearch output, but this option usually results in simpler configuration files. If the pipeline is configured both in the input and output, the option from the input is used.
The pipeline is always lowercased. If pipeline: Foo-Bar, then the pipeline name in Elasticsearch needs to be defined as foo-bar.
If keep_null is set to true, fields with null values will be published in the output document. By default, keep_null is set to false.
If present, the index formatted string overrides the index for events from this input (for elasticsearch outputs), or sets the raw_index field of the event’s metadata (for other outputs). This string can only refer to the agent name and version and the event timestamp; for access to dynamic fields, use output.elasticsearch.index or a processor.
Example value: "%{[agent.name]}-myindex-%{+yyyy.MM.dd}" might expand to "filebeat-myindex-2019.11.01".
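The pipeline and index options can be set per input as follows. The pipeline ID is a hypothetical ingest pipeline that must already exist in Elasticsearch under a lowercase name.

```yaml
filebeat.inputs:
- type: cel
  . . .
  # Hypothetical ingest pipeline ID (lowercase, since it is lowercased anyway).
  pipeline: my-cel-pipeline
  index: "%{[agent.name]}-myindex-%{+yyyy.MM.dd}"
```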
By default, all events contain host.name. The publisher_pipeline.disable_host option can be set to true to disable the addition of this field to all events. The default value is false.