Custom Kafka Integration
| Version | 2.0.0 |
|---|---|
| Subscription level | Basic |
| Developed by | Elastic |
| Minimum Kibana version(s) | 9.0.0, 8.13.0 |
The Custom Kafka integration is an input package for Elastic Agent. It runs the Filebeat Kafka input so agents can consume records from Apache Kafka topics and ship them to Elasticsearch. Use it when applications or pipelines already publish logs or events to Kafka and you want Elastic to read from those topics without an intermediate forwarder.
This integration is intended for Kafka clusters where brokers speak the standard Kafka protocol. It is tested and supported for Kafka broker versions roughly between 0.11 and 2.8.0. Earlier or later brokers can work but are not guaranteed.
Elastic Agent connects to your cluster using the bootstrap hosts you configure, joins a consumer group (`group_id`), and subscribes to one or more topics. Messages are read by the agent, enriched with Kafka metadata (for example topic, partition, offset), and written to Elasticsearch using the dataset name you choose (default `kafka_log.generic`). Optional SASL, Kerberos, and TLS settings secure the connection to the brokers. Optional parsers and processors adjust the payload on the agent before ingest.
The integration collects events derived from Kafka messages:
- Message payload: Typically stored in the `message` field (format depends on your producers: plain text, JSON, syslog, and so on).
- Kafka metadata: Fields such as `kafka.topic`, `kafka.partition`, `kafka.offset`, `kafka.key`, and `kafka.headers` where applicable.
- Routing fields: `data_stream.dataset`, `data_stream.type`, and `data_stream.namespace` follow your Fleet policy and dataset name.
The default dataset is `kafka_log.generic`. Changing the Dataset name in the policy sends data to a different backing data stream. Dataset names must follow Elasticsearch naming rules (no `-` in the dataset segment).
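The routing described above can be illustrated with a small sketch: a backing data stream name is composed as `{type}-{dataset}-{namespace}`. The helper below is illustrative, not part of the integration, and only enforces the no-hyphen rule called out above rather than every Elasticsearch naming restriction:

```python
def backing_data_stream(dataset: str, namespace: str = "default",
                        ds_type: str = "logs") -> str:
    """Compose the backing data stream name used for routing.

    Only the hyphen rule mentioned in the text is checked here;
    Elasticsearch applies further naming rules not shown in this sketch.
    """
    if "-" in dataset:
        raise ValueError("dataset segment must not contain '-'")
    return f"{ds_type}-{dataset}-{namespace}"

# The default dataset in the default namespace routes to
# logs-kafka_log.generic-default:
print(backing_data_stream("kafka_log.generic"))
```

Changing the Dataset name in the policy changes only the middle segment, which is why a mismatched Discover filter silently shows no documents.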
- Ingest logs or telemetry already landed on Kafka by microservices or stream processors.
- Centralize topic data for search and observability in Kibana without maintaining a separate log shipper per producer.
- Apply a custom Ingest Pipeline in Elasticsearch when you need parsing or ECS normalization beyond agent-side parsers.
- Network reachability from the host running Elastic Agent to each configured bootstrap broker (hostnames and ports).
- Topic access: ACLs or permissions that allow your consumer group to read the configured topics.
- Authentication details if the cluster uses SASL (PLAIN, SCRAM), Kerberos, or TLS client certificates—match these to your broker configuration.
- A stack version that satisfies the integration’s Kibana requirement (refer to the integration manifest in Kibana or this package’s `manifest.yml`).
Elastic Agent runs the Kafka input and forwards events to Elasticsearch. Install the agent on a host that can reach your Kafka brokers (same VPC or routed network, firewall rules allowing outbound connections to broker listeners).
- Identify bootstrap broker addresses (for example `kafka1:9092`) and the topic names to consume.
- Choose a unique consumer group ID (`group_id`) for this policy integration; duplicate group membership affects partition assignment when multiple agents share the same group.
- If the cluster uses TLS or SASL, gather certificates, credentials, or Kerberos configuration paths before editing the integration.
- Go to Management → Integrations.
- Search for Custom Kafka Logs and open it.
- Click Add Custom Kafka Logs (or add the integration to an existing policy).
- Configure the main options:
- Hosts: Bootstrap servers for the Kafka cluster.
- Topics: Topics to subscribe to.
- Group ID: Consumer group for this input.
- Dataset name: Target dataset (default `kafka_log.generic`).
- Client ID, Kafka protocol version, initial offset, fetch/rebalance tuning: expand Advanced options when needed.
- Configure SSL, SASL, or Kerberos under advanced sections if your brokers require them.
- Optionally set Parsers (for example NDJSON) or Processors, and Tags.
- Optionally set Ingest Pipeline to an Elasticsearch pipeline ID for server-side processing.
- Save the policy and confirm the agent receives the updated configuration.
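For orientation, the policy options above map onto the underlying Kafka input roughly as in the following sketch. This is a hypothetical standalone-agent fragment, not Fleet output: hosts, topic, group, and credentials are placeholders, and key names should be verified against the Kafka input reference for your agent version.

```yaml
inputs:
  - type: kafka
    hosts:
      - "kafka1:9092"            # bootstrap servers (placeholder)
    topics: ["app-logs"]         # topics to subscribe to (placeholder)
    group_id: "elastic-agent"    # unique consumer group for this policy
    initial_offset: "oldest"     # or "newest"; applies only to new groups
    data_stream:
      dataset: kafka_log.generic # target dataset (no hyphens)
    # Optional broker security, matching your cluster:
    # username: "consumer"
    # password: "${KAFKA_PASSWORD}"
    # sasl.mechanism: "SCRAM-SHA-512"
    # ssl.enabled: true
```

In Fleet-managed setups you do not edit this YAML directly; the integration policy UI generates the equivalent configuration.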
- Produce a test message to one of the configured topics (use your usual producer tooling or `kafka-console-producer`).
- In Kibana, open Analytics → Discover and select a logs-related data view (for example `logs-*`).
- Filter with KQL, for example: `data_stream.dataset : "kafka_log.generic"`; adjust to match your configured Dataset name.
- Confirm fields such as `message`, `kafka.topic`, `input.type` (`kafka`), and timestamps look correct.
For help with Elastic ingest tools, refer to Common problems.
- Connection or timeout errors: Verify broker addresses, ports, TLS (`ssl.enabled`), and that firewalls allow outbound traffic from the agent host to every bootstrap broker.
- Authentication failures: Confirm that the SASL mechanism, username/password, or Kerberos settings align with the broker configuration; check broker logs for `Authentication failed` or similar.
- No documents in Discover: Confirm the agent is healthy, the policy applied, and the Dataset name matches your Discover filter. Dataset names must not contain hyphens.
- Duplicate or competing consumers: Using the same `group_id` on many agents splits partitions across them by design; use distinct groups if you need full duplicate reads.
- Offset / replay behavior: `initial_offset` (for example `oldest` vs `newest`) affects where consumption starts for new groups. Changing `group_id` starts a new consumer group with fresh offset state.
- Parsing issues: If JSON or multiline payloads look wrong, review Parsers and consider an Elasticsearch ingest pipeline for complex structures.
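The offset behavior described above can be condensed into a pure function. This is a simplification for illustration, not the broker's actual logic; `committed`, `log_start`, and `log_end` are hypothetical inputs standing in for the group's committed offset and the partition's retained range:

```python
def starting_offset(committed, log_start, log_end, initial_offset="oldest"):
    """Where a consumer group begins reading a partition.

    committed is the last committed offset for this group_id, or None for
    a brand-new group. initial_offset only matters when no committed
    offset exists, which is why changing group_id restarts consumption.
    """
    if committed is not None:
        return committed  # an existing group resumes where it left off
    return log_start if initial_offset == "oldest" else log_end

# A new group with initial_offset=oldest replays the retained backlog:
assert starting_offset(None, 0, 500, "oldest") == 0
# The same new group with newest skips straight to the log end:
assert starting_offset(None, 0, 500, "newest") == 500
# An established group ignores initial_offset entirely:
assert starting_offset(42, 0, 500, "newest") == 42
```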
For guidance on scaling ingest, refer to Ingest Architectures.
- Throughput: Kafka throughput scales with partitions and consumer parallelism; multiple agents with the same `group_id` share partitions (one consumer per partition per group).
- Fetch settings: Tune fetch sizes and `max_wait_time` in advanced options if you need higher batching or lower latency; balance broker load against agent memory.
- Multiple integrations: Separate policies or dataset names help isolate indices and retention for different topic groups.
- Elasticsearch: Size your cluster for the volume of documents and consider ingest pipelines and index lifecycle policies for hot/warm tiers.
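The one-consumer-per-partition rule above can be illustrated with a simplified round-robin assignment. This is a sketch of the idea only, not Kafka's actual assignor:

```python
def assign_partitions(num_partitions: int, consumers: list) -> dict:
    """Spread partitions across consumers in a single group, round-robin.

    With more consumers than partitions, the extras sit idle; to have
    every agent read every message, use distinct group_ids instead.
    """
    assignment = {c: [] for c in consumers}
    for p in range(num_partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment

# Six partitions shared by two agents in the same group:
print(assign_partitions(6, ["agent-a", "agent-b"]))
# {'agent-a': [0, 2, 4], 'agent-b': [1, 3, 5]}
```

Adding a third agent to this group would trigger a rebalance and shrink each agent's share, which is why raising throughput usually means adding partitions as well as agents.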
Refer to the ECS field reference for ECS fields.
Additional documentation:
Changelog
| Version | Details | Minimum Kibana version |
|---|---|---|
| 2.0.0 | Enhancement (View pull request) Convert kafka_log to an input package. | 9.0.0, 8.13.0 |
| 1.9.1 | Bug fix (View pull request) Updated the version field description to clarify default protocol versions. | 9.0.0, 8.13.0 |
| 1.9.0 | Enhancement (View pull request) Add SASL mechanism configuration. | 9.0.0, 8.13.0 |
| 1.8.1 | Bug fix (View pull request) Updated SSL description to be uniform and to include links to documentation. | 9.0.0, 8.13.0 |
| 1.8.0 | Enhancement (View pull request) Add support for Kibana 9.0.0. | 9.0.0, 8.13.0 |
| 1.7.0 | Enhancement (View pull request) ECS version updated to 8.11.0. Update the kibana constraint to ^8.13.0. Modified the field definitions to remove ECS fields made redundant by the ecs@mappings component template. | 8.13.0 |
| 1.6.0 | Enhancement (View pull request) Enable secrets for sensitive fields. For more details, refer to https://www.elastic.co/guide/en/fleet/current/agent-policy.html#agent-policy-secret-values | 8.12.0 |
| 1.5.1 | Bug fix (View pull request) Disable secrets for older stack versions due to errors. | 8.0.0, 7.16.0 |
| 1.5.0 | Enhancement (View pull request) Enable 'secret' for the sensitive fields, supported from 8.12. | 8.0.0, 7.16.0 |
| 1.4.0 | Enhancement (View pull request) Update the package format_version to 3.0.0. | 8.0.0, 7.16.0 |
| 1.3.0 | Enhancement (View pull request) Add permissions to reroute events to logs-\*-\* for generic datastream | 8.0.0, 7.16.0 |
| 1.2.0 | Enhancement (View pull request) Rename ownership from obs-service-integrations to obs-infraobs-integrations | 8.0.0, 7.16.0 |
| 1.1.1 | Enhancement (View pull request) Added categories and/or subcategories. | 8.0.0, 7.16.0 |
| 1.1.0 | Enhancement (View pull request) Update ECS version to 8.5.1 | 8.0.0, 7.16.0 |
| 1.0.0 | Enhancement (View pull request) Initial Release | 8.0.0, 7.16.0 |