
Nginx Ingress Controller Integration

<div class="condensed-table">

| | |
| --- | --- |
| Version | 0.0.3 [beta] |
| Compatible Kibana version(s) | 8.16.0 or higher |
| Supported Serverless project types | Security, Observability |
| Subscription level | Basic |
| Level of support | Elastic |

</div>
This integration collects and parses logs from Nginx Ingress Controller instances. It can parse the access and error logs created by the controller.

The integration was tested with Nginx Ingress Controller v0.30.0 and v0.40.2. The log format is described in the ingress-nginx documentation: https://github.com/kubernetes/ingress-nginx/blob/nginx-0.30.0/docs/user-guide/nginx-configuration/log-format.md
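
For reference, access and error log lines look roughly like the following (illustrative values, not captured from a real cluster):

```
192.168.10.21 - - [15/Dec/2024:10:30:00 +0000] "GET /healthz HTTP/1.1" 200 612 "-" "kube-probe/1.28" 345 0.002 [default-my-service-80] [] 10.0.1.5:8080 612 0.002 200 5f4c8a2e9d1b3f7a0c6e4d2b8a9f1e3c
E1215 10:30:00.123456       7 controller.go:195] Error obtaining Endpoints for Service "default/my-service": no object matching key "default/my-service" in local store
```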

EDOT collector supported versions: 8.16.0

OpenTelemetry collector components:

  • Filelog receiver v0.112.0+
  • Transform processor v0.112.0+
  • Resource detector processor v0.112.0+
  • (Optional) GeoIP processor v0.112.0+
  • Elasticsearch exporter v0.112.0+
  • Filestorage extension v0.112.0+
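
The following example OpenTelemetry Collector configuration collects these logs, parses them, and sends them to Elasticsearch: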
```yaml
extensions:
  file_storage:

receivers:
  filelog:
    include_file_path: true
    include: [/var/log/pods/*nginx-ingress-nginx-controller*/controller/*.log]
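    # This glob assumes the default pod naming of the ingress-nginx Helm chart
    # (release name "nginx"); adjust it if your controller pods are named differently.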
    operators:
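      # The container operator parses Docker, containerd, and CRI-O log formats
      # and recombines partial log lines into complete entries.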
      - id: container-parser
        type: container

processors:
  transform/parse_nginx_ingress_error/log:
    error_mode: ignore
    log_statements:
      - context: log
        conditions:
            # ^[EWF]: Matches logs starting with E (Error), W (Warning), or F (Fatal).
            # \d{4}: Matches the four digits after the log level (representing the date, like 1215 for December 15).
            # .+: Matches the rest of the log line (the message part, without needing specific timestamp or file format).
          - IsMatch(body, "^[EWF]\\d{4} .+")
        statements:
          - set(body, ExtractGrokPatterns(body, "%{LOG_LEVEL:log.level}%{MONTHNUM}%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}\\.%{MICROS}%{SPACE}%{NUMBER:thread_id} %{SOURCE_FILE:source.file.name}:%{NUMBER:source.line_number}\\\] %{GREEDYMULTILINE:message}", true, ["LOG_LEVEL=[A-Z]", "MONTHNUM=(0[1-9]|1[0-2])", "MONTHDAY=(0[1-9]|[12][0-9]|3[01])", "HOUR=([01][0-9]|2[0-3])", "MINUTE=[0-5][0-9]", "SECOND=[0-5][0-9]", "MICROS=[0-9]{6}", "SOURCE_FILE=[^:]+", "GREEDYMULTILINE=(.|\\n)*"]))

          - set(attributes["data_stream.dataset"], "nginx_ingress_controller.error")

          # LogRecord event: https://github.com/open-telemetry/semantic-conventions/pull/982
          - set(attributes["event.name"], "nginx_ingress_controller.error")

  transform/parse_nginx_ingress_access/log:
    error_mode: ignore
    log_statements:
      - context: log
        conditions:
            # ^([0-9a-fA-F:.]+): Matches the remote address (IPv4 or IPv6 format).
            # [^ ]+: Matches the remote user (including the hyphen for missing user).
            # .*[0-9a-fA-F]+$: Ensures the log line ends with a hexadecimal string (request ID).
          - IsMatch(body, "^([0-9a-fA-F:.]+) - [^ ]+ .*[0-9a-fA-F]+$")
        statements:
          # Log format: https://github.com/kubernetes/ingress-nginx/blob/nginx-0.30.0/docs/user-guide/nginx-configuration/log-format.md
          # Based on https://github.com/elastic/integrations/blob/main/packages/nginx_ingress_controller/data_stream/access/elasticsearch/ingest_pipeline/default.yml
          - set(body, ExtractGrokPatterns(body, "(%{NGINX_HOST} )?\"?(?:%{NGINX_ADDRESS_LIST:nginx_ingress_controller.access.remote_ip_list}|%{NOTSPACE:source.address}) - (-|%{DATA:user.name}) \\\[%{HTTPDATE:nginx_ingress_controller.access.time}\\\] \"%{DATA:nginx_ingress_controller.access.info}\" %{NUMBER:http.response.status_code:long} %{NUMBER:http.response.body.size:long} \"(-|%{DATA:http.request.referrer})\" \"(-|%{DATA:user_agent.original})\" %{NUMBER:http.request.size:long} %{NUMBER:http.request.time:double} \\\[%{DATA:upstream.name}\\\] \\\[%{DATA:upstream.alternative_name}\\\] (%{UPSTREAM_ADDRESS_LIST:upstream.address}|-) (%{UPSTREAM_RESPONSE_SIZE_LIST:upstream.response.size_list}|-) (%{UPSTREAM_RESPONSE_TIME_LIST:upstream.response.time_list}|-) (%{UPSTREAM_RESPONSE_STATUS_CODE_LIST:upstream.response.status_code_list}|-) %{GREEDYDATA:http.request.id}", true, ["NGINX_HOST=(?:%{IP:destination.ip}|%{NGINX_NOTSEPARATOR:destination.domain})(:%{NUMBER:destination.port})?", "NGINX_NOTSEPARATOR=[^\t ,:]+", "NGINX_ADDRESS_LIST=(?:%{IP}|%{WORD}) (\"?,?\\s*(?:%{IP}|%{WORD}))*", "UPSTREAM_ADDRESS_LIST=(?:%{IP}(:%{NUMBER})?)(\"?,?\\s*(?:%{IP}(:%{NUMBER})?))*", "UPSTREAM_RESPONSE_SIZE_LIST=(?:%{NUMBER})(\"?,?\\s*(?:%{NUMBER}))*", "UPSTREAM_RESPONSE_TIME_LIST=(?:%{NUMBER})(\"?,?\\s*(?:%{NUMBER}))*", "UPSTREAM_RESPONSE_STATUS_CODE_LIST=(?:%{NUMBER})(\"?,?\\s*(?:%{NUMBER}))*", "IP=(?:\\\[?%{IPV6}\\\]?|%{IPV4})"]))
          - merge_maps(body, ExtractGrokPatterns(body["nginx_ingress_controller.access.info"], "%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}", true), "upsert")
          - delete_key(body, "nginx_ingress_controller.access.info")

          # Extra URL parsing
          - merge_maps(body, URL(body["url.original"]), "upsert")
          - set(body["url.domain"], body["destination.domain"])

          # set source.address as attribute for GeoIP processor
          - set(attributes["source.address"], body["source.address"])

          - set(attributes["data_stream.dataset"], "nginx_ingress_controller.access")

          # LogRecord event: https://github.com/open-telemetry/semantic-conventions/pull/982
          - set(attributes["event.name"], "nginx_ingress_controller.access")
          - set(attributes["event.timestamp"], String(Time(body["nginx_ingress_controller.access.time"], "%d/%b/%Y:%H:%M:%S %z")))

          - delete_key(body, "nginx_ingress_controller.access.time")

      - context: log
        conditions:
            # Extract user agent when not empty
          - body["user_agent.original"] != nil
        statements:
          # Extract UserAgent
          # TODO: UserAgent OTTL function does not provide os specific metadata yet: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35458
          - merge_maps(body, UserAgent(body["user_agent.original"]), "upsert")

      - context: log
        conditions:
          - body["upstream.response.time_list"] != nil
        statements:
          # Extract comma separated list
          # TODO: We would like to get the sum over all upstream.response.time_list values instead of providing a slice with all the values
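          # e.g. a value of "0.004, 0.002" becomes ["0.004", " 0.002"] (note the
          # leading space on subsequent items, since the separator is "," only)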
          - set(body["upstream.response.time"], Split(body["upstream.response.time_list"], ","))
          - delete_key(body, "upstream.response.time_list")

      - context: log
        conditions:
          - body["upstream.response.size_list"] != nil
        statements:
          # Extract comma separated list
          # TODO: We would like to get the Last upstream.response.size_list value instead of providing a slice with all the values
          # See: https://github.com/elastic/integrations/blob/main/packages/nginx_ingress_controller/data_stream/access/elasticsearch/ingest_pipeline/default.yml#L94b
          - set(body["upstream.response.size"], Split(body["upstream.response.size_list"], ","))
          - delete_key(body, "upstream.response.size_list")

      - context: log
        conditions:
          - body["upstream.response.status_code_list"] != nil
        statements:
          # Extract comma separated list
          # TODO: We would like to get the Last upstream.response.status_code_list value instead of providing a slice with all the values
          - set(body["upstream.response.status_code"], Split(body["upstream.response.status_code_list"], ","))
          - delete_key(body, "upstream.response.status_code_list")

  # TODO: add other detectors
  resourcedetection/system:
    detectors: ["system"]
    system:
      hostname_sources: [ "os" ]
      resource_attributes:
        host.name:
          enabled: true
        host.id:
          enabled: false
        host.arch:
          enabled: true

  # geoip:
  #   context: record
  #   providers:
  #     maxmind:
  #       database_path: /tmp/GeoLite2-City.mmdb

exporters:
  elasticsearch:
    endpoints:
    - YOUR_ELASTICSEARCH_ENDPOINT
    api_key: YOUR_ELASTICSEARCH_API_KEY
    logs_dynamic_index:
      enabled: true
    mapping:
      mode: otel
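  # The debug exporter prints received telemetry to the collector's own logs;
  # it is useful while validating the setup and can be removed in production.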
  debug:
    verbosity: detailed

service:
  extensions: [file_storage]
  pipelines:
    logs:
      receivers: [filelog]
      processors: [transform/parse_nginx_ingress_access/log, transform/parse_nginx_ingress_error/log, resourcedetection/system]
      # Uncomment the following line if geoip is configured
      # processors: [transform/parse_nginx_ingress_access/log, transform/parse_nginx_ingress_error/log, geoip, resourcedetection/system]
      exporters: [debug, elasticsearch]
```

Don’t forget to replace:

  • YOUR_ELASTICSEARCH_ENDPOINT: your Elasticsearch endpoint, including the https:// prefix (for example: https://1234567.us-west2.gcp.elastic-cloud.com:443).
  • YOUR_ELASTICSEARCH_API_KEY: your Elasticsearch API key.

Geographical IP metadata for incoming Nginx Ingress Controller requests is disabled by default. To enable it, provide a local GeoIP database path in the processor configuration:

  1. Uncomment the GeoIP processor configuration:

```yaml
geoip:
  context: record
  providers:
    maxmind:
      database_path: /tmp/GeoLite2-City.mmdb
```

  2. Include the processor in the logs pipeline:

```yaml
processors: [transform/parse_nginx_ingress_access/log, transform/parse_nginx_ingress_error/log, geoip, resourcedetection/system]
```
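
Note that the GeoLite2-City.mmdb database is not shipped with the collector; it can be downloaded from MaxMind (a free GeoLite2 account is required), and database_path should point at its location on the collector's filesystem.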

The access data stream collects the Nginx Ingress Controller access logs.

The error data stream collects the Nginx Ingress Controller error logs.
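
Once the pipeline is running, you can check that documents are arriving by searching the resulting data streams, for example from Kibana Dev Tools. This is a minimal sketch: the data stream name assumes the exporter's default dynamic routing with the OTel mapping mode (dataset nginx_ingress_controller.access, namespace default) and may differ in your deployment.

```
GET logs-nginx_ingress_controller.access.otel-default/_search?size=1
```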