---
title: Kafka output settings
description: Specify these settings to send data over a secure connection to Kafka. In the Fleet Output settings, make sure that the Kafka output type is selected...
url: https://www.elastic.co/elastic/docs-builder/docs/3028/reference/fleet/kafka-output-settings
products:
  - Elastic Agent
  - Fleet
applies_to:
  - Elastic Cloud Serverless: Generally available
  - Elastic Stack: Generally available
---

# Kafka output settings
<note>
  If you plan to use Logstash to modify Elastic Agent output data before it’s sent to Kafka, refer to our [guidance](#kafka-output-settings-ls-warning) later on this page.
</note>

Specify these settings to send data over a secure connection to Kafka. In the Fleet [Output settings](/elastic/docs-builder/docs/3028/reference/fleet/fleet-settings#output-settings), make sure that the Kafka output type is selected.
<admonition title="Kafka timestamps and Elastic Agent">
  - Kafka 3.6+ introduces stricter timestamp validation with the introduction of two new broker/topic-level properties: [log.message.timestamp.before.max.ms](https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#message-timestamp-before-max-ms) and
    [log.message.timestamp.after.max.ms](https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#message-timestamp-after-max-ms).
    These properties limit the time difference between the message timestamp (from Elastic Agent) and the Kafka broker receive time.
    Messages can be rejected if the values are exceeded and `log.message.timestamp.type=CreateTime` is set.
    These checks are ignored if `log.message.timestamp.type=LogAppendTime` is set.
  - For Kafka version 0.10.0.0+ the message creation timestamp is set by Elastic Agent and equals the initial timestamp of the event. This behavior affects the retention policy in Kafka. For example, if an Elastic Agent event was created 2 weeks ago, the retention policy is set to 7 days and the message from Elastic Agent arrives to Kafka today, it is immediately discarded because the timestamp value is before the last 7 days.
    You can change this behavior by setting timestamps on message arrival instead.
    The message is not discarded but kept for 7 more days. Set `log.message.timestamp.type` to `LogAppendTime` (default `CreateTime`) in the Kafka configuration.
</admonition>


### General settings

<definitions>
  <definition term="Kafka version">
    The Kafka protocol version that Elastic Agent will request when connecting. Defaults to `1.0.0`. Kafka versions from `0.8.2.0` to `2.6.0` are currently supported; however, the latest Kafka version (`3.x.x`) is expected to be compatible when version `2.6.0` is selected. When using Kafka 4.0 or newer, the version must be set to at least `2.1.0`.
  </definition>
  <definition term="Hosts">
    The addresses your Elastic Agents will use to connect to one or more Kafka brokers. Use the format `host:port` (without any protocol `http://`). Click **Add row** to specify additional addresses.
    **Examples:**
    - `localhost:9092`
    - `mykafkahost:9092`
    Refer to the [Fleet Server](https://www.elastic.co/elastic/docs-builder/docs/3028/reference/fleet/fleet-server) documentation for default ports and other configuration details.
  </definition>
</definitions>
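For orientation, here is a minimal sketch of how these general settings might map onto the `outputs` section of a standalone agent policy. The key names (`type`, `hosts`, `version`) follow the standalone Kafka output configuration and are shown as an illustration rather than an exact rendering of what Fleet generates:

```yaml
outputs:
  default:
    type: kafka
    # Broker addresses in host:port form, with no protocol prefix
    hosts:
      - "mykafkahost1:9092"
      - "mykafkahost2:9092"
    # Kafka protocol version that Elastic Agent requests when connecting
    version: "2.6.0"
```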


### Authentication settings

Select the mechanism that Elastic Agent uses to authenticate with Kafka.
<definitions>
  <definition term="None">
    No authentication is used between Elastic Agent and Kafka. This is the default option. In production, it’s recommended to select an authentication method.
    <definitions>
      <definition term="Plaintext">
        Set this option for traffic between Elastic Agent and Kafka to be sent as plaintext, without any transport layer security. This is the default option when no authentication is set.
      </definition>
      <definition term="Encryption">
        Set this option for traffic between Elastic Agent and Kafka to use transport layer security. When **Encryption** is selected, the **Server SSL certificate authorities** and **Verification mode** options become available.
      </definition>
    </definitions>
  </definition>
  <definition term="Username / Password">
    Connect to Kafka with a username and password.
    Provide your username and password, and select a SASL (Simple Authentication and Security Layer) mechanism for your login credentials.
    When SCRAM is enabled, Elastic Agent uses the [SCRAM](https://en.wikipedia.org/wiki/Salted_Challenge_Response_Authentication_Mechanism) mechanism to authenticate the user credentials. SCRAM is based on the IETF RFC 5802 standard, which describes a challenge-response mechanism for authenticating users.
    - Plain - SCRAM is not used to authenticate
    - SCRAM-SHA-256 - uses the SHA-256 hashing function
    - SCRAM-SHA-512 - uses the SHA-512 hashing function
    To prevent unauthorized access, your Kafka password is stored as a secret value. While secret storage is recommended, you can choose to override this setting and store the password as plain text in the agent policy definition. Secret storage requires Fleet Server version 8.12 or higher.
    Note that this setting can also be stored as a secret value or as plain text for preconfigured outputs. See [Preconfiguration settings](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3028/reference/kibana/configuration-reference/fleet-settings#_preconfiguration_settings_for_advanced_use_cases) in the Kibana Guide to learn more.
  </definition>
  <definition term="SSL">
    Authenticate using the Secure Sockets Layer (SSL) protocol. Provide the following details for your SSL certificate:
    <definitions>
      <definition term="Client SSL certificate">
        The certificate generated for the client. Copy and paste in the full contents of the certificate. This is the certificate that all the agents will use to connect to Kafka.
        In cases where each client has a unique certificate, the local path to that certificate can be placed here. The agents will pick up the certificate in that location when establishing a connection to Kafka.
      </definition>
      <definition term="Client SSL certificate key">
        The private key generated for the client. This must be in PKCS 8 format. Copy and paste in the full contents of the certificate key. This is the certificate key that all the agents will use to connect to Kafka.
        In cases where each client has a unique certificate key, the local path to that certificate key can be placed here. The agents will pick up the certificate key in that location when establishing a connection to Kafka.
      </definition>
    </definitions>
    To prevent unauthorized access, the certificate key is stored as a secret value. While secret storage is recommended, you can choose to override this setting and store the key as plain text in the agent policy definition. Secret storage requires Fleet Server version 8.12 or higher.
    Note that this setting can also be stored as a secret value or as plain text for preconfigured outputs. See [Preconfiguration settings](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3028/reference/kibana/configuration-reference/fleet-settings#_preconfiguration_settings_for_advanced_use_cases) in the Kibana Guide to learn more.
  </definition>
  <definition term="Server SSL certificate authorities">
    The CA certificate to use to connect to Kafka. This is the CA used to generate the certificate and key for Kafka. Copy and paste in the full contents for the CA certificate.
    This setting is optional. It is not available when the `None` and `Plaintext` authentication options are selected.
    Click **Add row** to specify additional certificate authorities.
  </definition>
  <definition term="Verification mode">
    Controls the verification of server certificates. Valid values are:
    <definitions>
      <definition term="Full">
        Verifies that the provided certificate is signed by a trusted authority (CA) and also verifies that the server’s hostname (or IP address) matches the names identified within the certificate.
      </definition>
      <definition term="None">
        Performs *no verification* of the server’s certificate. This mode disables many of the security benefits of SSL/TLS and should only be used after cautious consideration. It is primarily intended as a temporary diagnostic mechanism when attempting to resolve TLS errors; its use in production environments is strongly discouraged.
      </definition>
      <definition term="Strict">
        Verifies that the provided certificate is signed by a trusted authority (CA) and also verifies that the server’s hostname (or IP address) matches the names identified within the certificate. If the Subject Alternative Name is empty, it returns an error.
      </definition>
      <definition term="Certificate">
        Verifies that the provided certificate is signed by a trusted authority (CA), but does not perform any hostname verification.
      </definition>
    </definitions>
    The default value is `Full`. This setting is not available when the `None` and `Plaintext` authentication options are selected.
  </definition>
</definitions>
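As a rough, hypothetical sketch of how username/password authentication with SCRAM and a trusted CA might look in the underlying output configuration (the key names follow the Kafka output that Elastic Agent builds on and are assumptions here, as is the `${KAFKA_PASSWORD}` variable reference):

```yaml
outputs:
  default:
    type: kafka
    hosts: ["mykafkahost:9092"]
    # SASL credentials; SCRAM-SHA-256 uses a challenge-response exchange
    username: "kafka-agent-user"
    password: "${KAFKA_PASSWORD}"
    sasl.mechanism: SCRAM-SHA-256
    # CA certificate used to verify the Kafka brokers, plus full hostname checks
    ssl.certificate_authorities:
      - |
        -----BEGIN CERTIFICATE-----
        ...
        -----END CERTIFICATE-----
    ssl.verification_mode: full
```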


### Partitioning settings

The number of partitions created is set automatically by the Kafka broker based on the list of topics. Records are then published to partitions either randomly, in round-robin order, or according to a calculated hash.
<definitions>
  <definition term="Random">
    Publish records to Kafka partitions randomly. Specify the number of events to publish to the same partition before the partitioner selects a new partition.
  </definition>
  <definition term="Round robin">
    Publish records to Kafka partitions in round-robin order. Specify the number of events to publish to the same partition before the partitioner selects a new partition.
  </definition>
  <definition term="Hash">
    Publish records to Kafka partitions based on a hash computed from the specified list of fields. If a field is not specified, the Kafka event key value is used.
  </definition>
</definitions>
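For example, the round-robin strategy with a group size of one event, or a hash over selected fields, would look roughly like this in the underlying output configuration (the `partition.*` key names come from the Kafka output that Elastic Agent builds on and should be treated as a sketch):

```yaml
# Rotate to a new partition after each event
partition.round_robin:
  group_events: 1

# Alternatively, choose the partition from a hash of specific fields
# partition.hash:
#   hash: ["agent.id", "event.dataset"]
```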


### Topics settings

Use this option to set the Kafka topic for each Elastic Agent event.
<definitions>
  <definition term="Default topic">
    Set the default Kafka topic used for events sent by Elastic Agent.
    You can set a static topic, for example `elastic-agent`, or you can choose to set a topic dynamically based on an [Elastic Common Schema (ECS)](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3028/reference/ecs) field. Available fields include:
    - `data_stream.type`
    - `data_stream.dataset`
    - `data_stream.namespace`
    - `@timestamp`
    - `event.dataset`
    You can also set a custom field. This is useful if you need to construct a more complex or structured topic name. For example, you can use the `fields.kafka_topic` custom field to set a dynamic topic for each event.
    To set a dynamic topic value for outputting Elastic Agent data to Kafka, you can add the [`add_fields` processor](https://www.elastic.co/elastic/docs-builder/docs/3028/reference/fleet/add_fields-processor) to any integration policies on your Fleet-managed Elastic Agents.
    For example, the following `add_fields` processor creates a dynamic topic value for the `fields.kafka_topic` field by interpolating multiple [data stream fields](https://docs-v3-preview.elastic.dev/elastic/docs-builder/docs/3028/reference/ecs/ecs-data_stream):
    ```yaml
    - add_fields:
        target: ''
        fields: 
          kafka_topic: '%{[data_stream.type]}-%{[data_stream.dataset]}-%{[data_stream.namespace]}' 
    ```
    For more information, refer to [Agent processors](https://www.elastic.co/elastic/docs-builder/docs/3028/reference/fleet/agent-processors).
  </definition>
</definitions>
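To make the interpolation concrete, with the `add_fields` processor above in place, an event carrying the hypothetical data stream values below would resolve to the following topic name:

```yaml
# Hypothetical event field values
data_stream.type: logs
data_stream.dataset: system.syslog
data_stream.namespace: default

# Resulting value of fields.kafka_topic used as the topic
kafka_topic: logs-system.syslog-default
```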


### Header settings

A header is a key-value pair, and multiple headers can be included with the same key. Only string values are supported. These headers will be included in each produced Kafka message.
<definitions>
  <definition term="Key">
    The key to set in the Kafka header.
  </definition>
  <definition term="Value">
    The value to set in the Kafka header.
    Click **Add header** to configure additional headers to be included in each Kafka message.
  </definition>
  <definition term="Client ID">
    The configurable ClientID used for logging, debugging, and auditing purposes. The default is `Elastic`. The Client ID is part of the protocol to identify where the messages are coming from.
  </definition>
</definitions>
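In the underlying output configuration, these settings correspond roughly to a `headers` list and a `client_id` value. The sketch below uses hypothetical header names; the key layout follows the Kafka output that Elastic Agent builds on:

```yaml
headers:
  - key: "environment"      # hypothetical header key
    value: "production"     # only string values are supported
  - key: "team"
    value: "observability"
client_id: "Elastic"
```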


### Compression settings

You can enable compression to reduce the volume of Kafka output.
<definitions>
  <definition term="Codec">
    Select a compression codec to use. Supported codecs are `snappy`, `lz4` and `gzip`.
  </definition>
  <definition term="Level">
    For the `gzip` codec you can choose a compression level. The level must be in the range of `1` (best speed) to `9` (best compression).
    Increasing the compression level reduces the network usage but increases the CPU usage. The default value is 4.
  </definition>
</definitions>
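Sketched as the equivalent underlying configuration (key names assumed from the Kafka output that Elastic Agent builds on), `gzip` at the default level would look like this:

```yaml
compression: gzip
# 1 favors speed, 9 favors smaller payloads; 4 is the default trade-off
compression_level: 4
```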


### Broker settings

Configure timeout and buffer size values for the Kafka brokers.
<definitions>
  <definition term="Broker timeout">
    The maximum length of time a Kafka broker waits for the required number of ACKs before timing out (see the `ACK reliability` setting below). The default is 10 seconds.
  </definition>
  <definition term="Broker reachability timeout">
    The maximum length of time that an Elastic Agent waits for a response from a Kafka broker before timing out. The default is 30 seconds.
  </definition>
  <definition term="ACK reliability">
    The ACK reliability level required from the broker. Options are:
    - Wait for local commit
    - Wait for all replicas to commit
    - Do not wait
    The default is `Wait for local commit`.
    Note that if ACK reliability is set to `Do not wait`, no ACKs are returned by Kafka. Messages might be lost silently in the event of an error.
  </definition>
</definitions>
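As a rough sketch of the equivalent underlying configuration (key names assumed from the Kafka output that Elastic Agent builds on), the defaults described above correspond to:

```yaml
broker_timeout: 10s   # maximum wait for the required number of ACKs
timeout: 30s          # maximum wait for any response from the broker
required_acks: 1      # 1 = wait for local commit, -1 = all replicas, 0 = do not wait
```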


### Other settings

<definitions>
  <definition term="Key">
    An optional formatted string specifying the Kafka event key. If configured, the event key can be extracted from the event using a format string.
    See the [Kafka documentation](https://kafka.apache.org/intro#intro_topics) for the implications of a particular choice of key; by default, the key is chosen by the Kafka cluster.
  </definition>
  <definition term="Proxy">
    Select a proxy URL for Elastic Agent to connect to Kafka. To learn about proxy configuration, refer to [Using a proxy server with Elastic Agent and Fleet](https://www.elastic.co/elastic/docs-builder/docs/3028/reference/fleet/fleet-agent-proxy-support).
  </definition>
  <definition term="Advanced YAML configuration">
    YAML settings that will be added to the Kafka output section of each policy that uses this output. Make sure you specify valid YAML. The UI does not currently provide validation.
    See [Advanced YAML configuration](#kafka-output-settings-yaml-config) for descriptions of the available settings.
  </definition>
  <definition term="Make this output the default for agent integrations">
    When this setting is on, Elastic Agents use this output to send data if no other output is set in the [agent policy](https://www.elastic.co/elastic/docs-builder/docs/3028/reference/fleet/agent-policy).
  </definition>
  <definition term="Make this output the default for agent monitoring">
    When this setting is on, Elastic Agents use this output to send [agent monitoring data](https://www.elastic.co/elastic/docs-builder/docs/3028/reference/fleet/monitor-elastic-agent) if no other output is set in the [agent policy](https://www.elastic.co/elastic/docs-builder/docs/3028/reference/fleet/agent-policy).
  </definition>
</definitions>
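For example, a formatted event key that routes all events from the same agent to the same partition might look like the following. The `key` format string shown is a hypothetical choice, not a recommended default:

```yaml
key: '%{[agent.id]}'
```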


## Advanced YAML configuration

<definitions>
  <definition term="backoff.init">
    (string) The number of seconds to wait before trying to reconnect to Kafka after a network error. After waiting `backoff.init` seconds, Elastic Agent tries to reconnect. If the attempt fails, the backoff timer is increased exponentially up to `backoff.max`. After a successful connection, the backoff timer is reset.
    **Default:** `1s`
  </definition>
  <definition term="backoff.max">
    (string) The maximum number of seconds to wait before attempting to connect to Kafka after a network error.
    **Default:** `60s`
  </definition>
  <definition term="bulk_max_size">
    (int) The maximum number of events to bulk in a single Kafka request.
    **Default:** `2048`
  </definition>
  <definition term="bulk_flush_frequency">
    (int) Duration to wait before sending a bulk Kafka request. `0` is no delay.
    **Default:** `0`
  </definition>
  <definition term="channel_buffer_size">
    (int) The number of messages buffered in the output pipeline, per Kafka broker.
    **Default:** `256`
  </definition>
  <definition term="client_id">
    (string) The configurable ClientID used for logging, debugging, and auditing purposes.
    **Default:** `Elastic Agent`
  </definition>
  <definition term="codec">
    Output codec configuration. You can specify either the `json` or `format` codec. By default the `json` codec is used.
    **`json.pretty`**: If `pretty` is set to true, events will be nicely formatted. The default is false.
    **`json.escape_html`**: If `escape_html` is set to true, HTML symbols will be escaped in strings. The default is false.
    Example configuration that uses the `json` codec with pretty printing enabled to write events to the console:
    ```yaml
    output.console:
      codec.json:
        pretty: true
        escape_html: false
    ```
    **`format.string`**: Configurable format string used to create a custom formatted message.
    Example configuration that uses the `format` codec to print the event's timestamp and message field to the console:
    ```yaml
    output.console:
      codec.format:
        string: '%{[@timestamp]} %{[message]}'
    ```
    **Default:** `json`
  </definition>
  <definition term="keep_alive">
    (string) The keep-alive period for an active network connection. If `0s`, keep-alives are disabled.
    **Default:** `0s`
  </definition>
  <definition term="max_message_bytes">
    (int) The maximum permitted size of JSON-encoded messages. Bigger messages will be dropped. This value should be equal to or less than the broker’s `message.max.bytes`.
    **Default:** `1000000` (bytes)
  </definition>
  <definition term="metadata">
    Kafka metadata update settings. The metadata contains information about brokers, topics, partitions, and active leaders to use for publishing.
    <definitions>
      <definition term="refresh_frequency">
        Metadata refresh interval. Defaults to 10 minutes.
      </definition>
      <definition term="full">
        Strategy to use when fetching metadata. When this option is `true`, the client will maintain a full set of metadata for all the available topics. When set to `false` it will only refresh the metadata for the configured topics. The default is false.
      </definition>
      <definition term="retry.max">
        Total number of metadata update retries. The default is 3.
      </definition>
      <definition term="retry.backoff">
        Waiting time between retries. The default is 250ms.
      </definition>
    </definitions>
  </definition>
  <definition term="queue.mem.events">
    The number of events the queue can store. This value should be evenly divisible by the smaller of `queue.mem.flush.min_events` or `bulk_max_size` to avoid sending partial batches to the output.
    **Default:** `3200 events`
  </definition>
  <definition term="queue.mem.flush.min_events">
    `flush.min_events` is a legacy parameter, and new configurations should prefer to control batch size with `bulk_max_size`. As of 8.13, there is never a performance advantage to limiting batch size with `flush.min_events` instead of `bulk_max_size`.
    **Default:** `1600 events`
  </definition>
  <definition term="queue.mem.flush.timeout">
    (int) The maximum wait time for `queue.mem.flush.min_events` to be fulfilled. If set to 0s, events are available to the output immediately.
    **Default:** `10s`
  </definition>
</definitions>
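Putting several of these settings together, the snippet below shows the kind of YAML you might paste into the **Advanced YAML configuration** field. The values are illustrative only; include only the settings you actually need to override:

```yaml
backoff.init: 1s
backoff.max: 60s
bulk_max_size: 2048
channel_buffer_size: 256
keep_alive: 30s
max_message_bytes: 1000000
metadata:
  refresh_frequency: 10m
  full: false
queue.mem.events: 3200
queue.mem.flush.timeout: 10s
```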


## Kafka output and using Logstash to index data to Elasticsearch

If you are considering using Logstash to ship the data from `kafka` to Elasticsearch, be aware that the structure of the documents sent from Elastic Agent to `kafka` must not be modified by Logstash. We suggest disabling `ecs_compatibility` on both the `kafka` input and the `json` codec in order to make sure the input doesn’t edit the fields and their contents.
The data streams set up by the integrations expect to receive events with the same structure and field names as if they had been sent directly from an Elastic Agent.
Refer to the [Logstash output for Elastic Agent](https://www.elastic.co/elastic/docs-builder/docs/3028/reference/fleet/ls-output-settings) documentation for more details.
```yaml
input {
  kafka {
    ...
    ecs_compatibility => "disabled"
    codec => json { ecs_compatibility => "disabled" }
    ...
  }
}
...
```