﻿---
title: Kafka output
description: The Kafka output sends events to Apache Kafka. Compatibility: This output can connect to Kafka version 0.8.2.0 and later. Older versions might work as...
url: https://www.elastic.co/elastic/docs-builder/docs/3016/reference/fleet/kafka-output
products:
  - Elastic Agent
  - Fleet
applies_to:
  - Elastic Cloud Serverless: Generally available
  - Elastic Stack: Generally available
---

# Kafka output
The Kafka output sends events to Apache Kafka.
**Compatibility:** This output can connect to Kafka version 0.8.2.0 and later. Older versions might work as well, but are not supported.
<admonition title="Kafka timestamps and Elastic Agent">
  - Kafka 3.6+ introduces stricter timestamp validation with the introduction of two new broker/topic-level properties: [log.message.timestamp.before.max.ms](https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#message-timestamp-before-max-ms) and
    [log.message.timestamp.after.max.ms](https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#message-timestamp-after-max-ms).
    These properties limit the time difference between the message timestamp (from Elastic Agent) and the Kafka broker receive time.
    Messages can be rejected if the values are exceeded and `log.message.timestamp.type=CreateTime` is set.
    These checks are ignored if `log.message.timestamp.type=LogAppendTime` is set.
  - For Kafka version 0.10.0.0 and later, the message creation timestamp is set by Elastic Agent and equals the initial timestamp of the event. This behavior affects the retention policy in Kafka. For example, if an Elastic Agent event was created two weeks ago, the retention policy is set to 7 days, and the message from Elastic Agent arrives in Kafka today, the message is immediately discarded because its timestamp falls outside the last 7 days.
    You can change this behavior by setting `log.message.timestamp.type` to `LogAppendTime` (default `CreateTime`) in the Kafka configuration, so that timestamps are set on message arrival. The message is then not discarded, but kept for 7 more days.
</admonition>

This example configures a Kafka output called `kafka-output` in the Elastic Agent `elastic-agent.yml` file, with the settings described later in this topic:
```yaml
outputs:
  kafka-output:
    type: kafka
    hosts:
      - 'kafka1:9092'
      - 'kafka2:9092'
      - 'kafka3:9092'
    client_id: Elastic
    version: 1.0.0
    compression: gzip
    compression_level: 4
    username: <my-kafka-username>
    password: <my-kafka-password>
    sasl:
      mechanism: SCRAM-SHA-256
    partition:
      round_robin:
        group_events: 1
    topic: 'elastic-agent'
    headers:
    - key: "some-key"
      value: "some value"
    - key: "another-key"
      value: "another value"
    timeout: 30
    broker_timeout: 30
    required_acks: 1
    ssl:
      verification_mode: full
```


## Kafka output and using Logstash to index data to Elasticsearch

If you are considering using Logstash to ship the data from `kafka` to Elasticsearch, be aware that the structure of the documents sent from Elastic Agent to `kafka` must not be modified by Logstash. We suggest disabling `ecs_compatibility` on both the `kafka` input and the `json` codec to make sure the input doesn’t edit the fields and their contents.
The data streams set up by the integrations expect to receive events having the same structure and field names as they were sent directly from an Elastic Agent.
Refer to [Logstash output for Elastic Agent](https://www.elastic.co/elastic/docs-builder/docs/3016/reference/fleet/logstash-output) documentation for more details.
```yaml
input {
  kafka {
    ...
    ecs_compatibility => "disabled"
    codec => json { ecs_compatibility => "disabled" }
    ...
  }
}
...
```


## Kafka output configuration settings

The `kafka` output supports the following settings, grouped by category. Many of these settings have sensible defaults that allow you to run Elastic Agent with minimal configuration.
- [Commonly used settings](#output-kafka-commonly-used-settings)
- [Authentication settings](#output-kafka-authentication-settings)
- [Memory queue settings](#output-kafka-memory-queue-settings)
- [Topics settings](#output-kafka-topics-settings)
- [Partition settings](#output-kafka-partition-settings)
- [Header settings](#output-kafka-header-settings)
- [Other configuration settings](#output-kafka-configuration-settings)


## Commonly used settings

<definitions>
  <definition term="enabled">
    (boolean) Enables or disables the output. If set to `false`, the output is disabled.
  </definition>
  <definition term="hosts">
    The addresses your Elastic Agents will use to connect to one or more Kafka brokers.
    Following is an example `hosts` setting with three hosts defined:
    ```yml
    hosts:
      - 'localhost:9092'
      - 'mykafkahost01:9092'
      - 'mykafkahost02:9092'
    ```
  </definition>
  <definition term="version">
    Kafka protocol version that Elastic Agent will request when connecting. Defaults to 1.0.0.
    The protocol version controls the Kafka client features available to Elastic Agent; it does not prevent Elastic Agent from connecting to Kafka versions newer than the protocol version.
  </definition>
</definitions>


## Authentication settings

<definitions>
  <definition term="username">
    The username for connecting to Kafka. If username is configured, the password must be configured as well.
  </definition>
  <definition term="password">
    The password for connecting to Kafka.
  </definition>
  <definition term="sasl.mechanism">
    The SASL mechanism to use when connecting to Kafka. It can be one of:
    - `PLAIN` for SASL/PLAIN.
    - `SCRAM-SHA-256` for SCRAM-SHA-256.
    - `SCRAM-SHA-512` for SCRAM-SHA-512. If `sasl.mechanism` is not set, `PLAIN` is used if `username` and `password` are provided. Otherwise, SASL authentication is disabled.
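    For example, a minimal sketch of a SCRAM-SHA-512 configuration; the host and credential values are placeholders:
    ```yaml
    outputs:
      kafka-output:
        type: kafka
        hosts:
          - 'kafka1:9092'
        username: <my-kafka-username>
        password: <my-kafka-password>
        sasl:
          mechanism: SCRAM-SHA-512
    ```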
  </definition>
  <definition term="ssl">
    When sending data to a secured cluster through the `kafka` output, Elastic Agent can use SSL/TLS. For a list of available settings, refer to [SSL/TLS](https://www.elastic.co/elastic/docs-builder/docs/3016/reference/fleet/elastic-agent-ssl-configuration), specifically the settings under [Table 7, Common configuration options](/elastic/docs-builder/docs/3016/reference/fleet/elastic-agent-ssl-configuration#common-ssl-options) and [Table 8, Client configuration options](/elastic/docs-builder/docs/3016/reference/fleet/elastic-agent-ssl-configuration#client-ssl-options).
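    As a sketch, a configuration that verifies the broker certificate against a custom certificate authority and presents a client certificate might look like the following; the file paths are placeholders:
    ```yaml
    ssl:
      verification_mode: full
      certificate_authorities:
        - /path/to/ca.crt
      certificate: /path/to/client.crt
      key: /path/to/client.key
    ```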
  </definition>
</definitions>


## Memory queue settings

The memory queue keeps all events in memory.
The memory queue waits for the output to acknowledge or drop events. If the queue is full, no new events can be inserted into the memory queue. Only after the signal from the output will the queue free up space for more events to be accepted.
The memory queue is controlled by the parameters `flush.min_events` and `flush.timeout`. `flush.min_events` gives a limit on the number of events that can be included in a single batch, and `flush.timeout` specifies how long the queue should wait to completely fill an event request. If the output supports a `bulk_max_size` parameter, the maximum batch size will be the smaller of `bulk_max_size` and `flush.min_events`.
`flush.min_events` is a legacy parameter, and new configurations should prefer to control batch size with `bulk_max_size`. As of 8.13, there is never a performance advantage to limiting batch size with `flush.min_events` instead of `bulk_max_size`.
In synchronous mode, an event request is always filled as soon as events are available, even if there are not enough events to fill the requested batch. This is useful when latency must be minimized. To use synchronous mode, set `flush.timeout` to 0.
For backwards compatibility, synchronous mode can also be activated by setting `flush.min_events` to 0 or 1. In this case, batch size will be capped at 1/2 the queue capacity.
In asynchronous mode, an event request will wait up to the specified timeout to try and fill the requested batch completely. If the timeout expires, the queue returns a partial batch with all available events. To use asynchronous mode, set `flush.timeout` to a positive duration, for example 5s.
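For example, a minimal synchronous-mode sketch (the queue size is illustrative):
```yaml
  queue.mem.events: 4096
  queue.mem.flush.timeout: 0s
```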
This sample configuration forwards events to the output when there are enough events to fill the output’s request (usually controlled by `bulk_max_size`, and limited to at most 512 events by `flush.min_events`), or when events have been waiting for 5 seconds (the `flush.timeout`):
```yaml
  queue.mem.events: 4096
  queue.mem.flush.min_events: 512
  queue.mem.flush.timeout: 5s
```

<definitions>
  <definition term="queue.mem.events">
    The number of events the queue can store. This value should be evenly divisible by the smaller of `queue.mem.flush.min_events` or `bulk_max_size` to avoid sending partial batches to the output.
    **Default:** `3200 events`
  </definition>
  <definition term="queue.mem.flush.min_events">
    `flush.min_events` is a legacy parameter, and new configurations should prefer to control batch size with `bulk_max_size`. As of 8.13, there is never a performance advantage to limiting batch size with `flush.min_events` instead of `bulk_max_size`.
    **Default:** `1600 events`
  </definition>
  <definition term="queue.mem.flush.timeout">
    (int) The maximum wait time for `queue.mem.flush.min_events` to be fulfilled. If set to 0s, events are available to the output immediately.
    **Default:** `10s`
  </definition>
</definitions>


## Topics settings

Use these options to set the Kafka topic for each Elastic Agent event.
<definitions>
  <definition term="topic">
    The default Kafka topic used for produced events.
    You can set a static topic, for example `elastic-agent`, or you can use a format string to set a topic dynamically based on an [Elastic Common Schema (ECS)](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3016/reference/ecs) field. Available fields include:
    - `data_stream.type`
    - `data_stream.dataset`
    - `data_stream.namespace`
    - `@timestamp`
    - `event.dataset`
    For example:
    ```yaml
    topic: '%{[data_stream.type]}'
    ```
    You can also set a custom field. This is useful if you need to construct a more complex or structured topic name. For example, this configuration uses the `fields.kafka_topic` custom field to set the topic for each event:
    ```yaml
    topic: '%{[fields.kafka_topic]}'
    ```
    To set a dynamic topic value for outputting Elastic Agent data to Kafka, you can add the [`add_fields` processor](https://www.elastic.co/elastic/docs-builder/docs/3016/reference/fleet/add_fields-processor) to the input configuration settings of your standalone Elastic Agent.
    For example, the following `add_fields` processor creates a dynamic topic value for the `fields.kafka_topic` field by interpolating multiple [data stream fields](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3016/reference/ecs/ecs-data_stream):
    ```yaml
    - add_fields:
        target: ''
        fields: 
          kafka_topic: '%{[data_stream.type]}-%{[data_stream.dataset]}-%{[data_stream.namespace]}' 
    ```
    For more information, refer to [Agent processors](https://www.elastic.co/elastic/docs-builder/docs/3016/reference/fleet/agent-processors).
  </definition>
</definitions>


## Partition settings

The number of partitions created is set automatically by the Kafka broker based on the list of topics. Records are then published to partitions either randomly, in round-robin order, or according to a calculated hash. The default is the `hash` partitioner.
In the following example, after each event is published to a partition, the partitioner selects the next partition in round-robin fashion.
```yaml
    partition:
      round_robin:
        group_events: 1
```

<definitions>
  <definition term="random.group_events">
    (int) Sets the number of events to be published to the same partition before the partitioner selects a new partition at random. The default value is `1`, meaning a new partition is picked at random after each event.
  </definition>
  <definition term="round_robin.group_events">
    (int) Sets the number of events to be published to the same partition before the partitioner selects the next partition. The default value is `1`, meaning the next partition is selected after each event.
  </definition>
  <definition term="hash.hash">
    ([]string) The list of fields used to compute the partitioning hash value. If no field is configured, the event’s key value is used.
  </definition>
  <definition term="hash.random">
    (bool) Randomly distribute events if no hash or key value can be computed. The default value is `true`.
  </definition>
</definitions>
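For example, the following sketch configures the `hash` partitioner to group events by a field, falling back to random distribution when no hash can be computed; the field name is illustrative:
```yaml
    partition:
      hash:
        hash: ['event.dataset']
        random: true
```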


## Header settings

A header is a key-value pair, and multiple headers can be included with the same key. Only string values are supported. These headers will be included in each produced Kafka message.
<definitions>
  <definition term="key">
    The key to set in the Kafka header.
  </definition>
  <definition term="value">
    The value to set in the Kafka header.
  </definition>
  <definition term="client_id">
    The configurable ClientID used for logging, debugging, and auditing purposes. The default is `Elastic`. The Client ID is part of the protocol to identify where the messages are coming from.
  </definition>
</definitions>


## Other configuration settings

You can specify these various other options in the `kafka-output` section of the agent configuration file.
<definitions>
  <definition term="backoff.init">
    (string) The number of seconds to wait before trying to reconnect to Kafka after a network error. After waiting `backoff.init` seconds, Elastic Agent tries to reconnect. If the attempt fails, the backoff timer is increased exponentially up to `backoff.max`. After a successful connection, the backoff timer is reset.
    **Default:** `1s`
  </definition>
  <definition term="backoff.max">
    (string) The maximum number of seconds to wait before attempting to connect to Kafka after a network error.
    **Default:** `60s`
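    For example, a sketch that widens the backoff window after network errors (the values are illustrative):
    ```yaml
    backoff.init: 1s
    backoff.max: 120s
    ```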
  </definition>
  <definition term="broker_timeout">
    The maximum length of time a Kafka broker waits for the required number of ACKs before timing out (see the `required_acks` setting).
    **Default:** `10` (seconds)
  </definition>
  <definition term="bulk_flush_frequency">
    (int) The duration to wait before sending a bulk Kafka request. `0` means no delay.
    **Default:** `0`
  </definition>
  <definition term="bulk_max_size">
    (int) The maximum number of events to bulk in a single Kafka request.
    **Default:** `2048`
  </definition>
  <definition term="channel_buffer_size">
    (int) The number of messages buffered in the output pipeline, per Kafka broker.
    **Default:** `256`
  </definition>
  <definition term="codec">
    Output codec configuration. You can specify either the `json` or `format` codec. By default the `json` codec is used.
    **`json.pretty`**: If `pretty` is set to `true`, events will be nicely formatted. The default is `false`.
    **`json.escape_html`**: If `escape_html` is set to `true`, HTML symbols will be escaped in strings. The default is `false`.
    Example configuration that uses the `json` codec with pretty printing enabled to write events to the console:
    ```yml
    output.console:
      codec.json:
        pretty: true
        escape_html: false
    ```
    **`format.string`**: Configurable format string used to create a custom formatted message.
    Example configuration that uses the `format` codec to print the event’s timestamp and message field to the console:
    ```yml
    output.console:
      codec.format:
        string: '%{[@timestamp]} %{[message]}'
    ```
  </definition>
  <definition term="compression">
    Select a compression codec to use. Supported codecs are `snappy`, `lz4` and `gzip`.
  </definition>
  <definition term="compression_level">
    For the `gzip` codec you can choose a compression level. The level must be in the range of `1` (best speed) to `9` (best compression).
    Increasing the compression level reduces the network usage but increases the CPU usage.
    **Default:** `4`.
  </definition>
  <definition term="keep_alive">
    (string) The keep-alive period for an active network connection. If `0s`, keep-alives are disabled.
    **Default:** `0s`
  </definition>
  <definition term="max_message_bytes">
    (int) The maximum permitted size of JSON-encoded messages. Larger messages are dropped. This value should be equal to or less than the broker’s `message.max.bytes`.
    **Default:** `1000000` (bytes)
  </definition>
  <definition term="metadata">
    Kafka metadata update settings. The metadata contains information about brokers, topics, partitions, and active leaders to use for publishing.
    <definitions>
      <definition term="refresh_frequency">
        Metadata refresh interval. Defaults to 10 minutes.
      </definition>
      <definition term="full">
        Strategy to use when fetching metadata. When this option is `true`, the client will maintain a full set of metadata for all the available topics. When set to `false` it will only refresh the metadata for the configured topics. The default is false.
      </definition>
      <definition term="retry.max">
        Total number of metadata update retries. The default is 3.
      </definition>
      <definition term="retry.backoff">
        Waiting time between retries. The default is 250ms.
      </definition>
    </definitions>
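    For example, a sketch that refreshes metadata only for the configured topics every 5 minutes; the values are illustrative, and the dotted setting names are an assumption based on the terms above:
    ```yaml
    metadata.full: false
    metadata.refresh_frequency: 5m
    metadata.retry.max: 3
    metadata.retry.backoff: 250ms
    ```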
  </definition>
  <definition term="required_acks">
    The ACK reliability level required from the broker. `0` = no response, `1` = wait for local commit, `-1` = wait for all replicas to commit.
    Note: If set to `0`, no ACKs are returned by Kafka. Messages might be lost silently on error.
    **Default:** `1` (wait for local commit)
  </definition>
  <definition term="timeout">
    The number of seconds to wait for responses from the Kafka brokers before timing out.
    **Default:** `30` (seconds)
  </definition>
</definitions>