From 3ac286d884f636c0ae7b18fff14c18bfa308aec7 Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Mon, 28 Oct 2024 14:37:23 +0100 Subject: [PATCH] feat: override body instead of attributes --- kubernetes/elastic-helm/daemonset.yaml | 46 ++++++++++++-------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/kubernetes/elastic-helm/daemonset.yaml b/kubernetes/elastic-helm/daemonset.yaml index 07672f44a6..bee20a50a9 100644 --- a/kubernetes/elastic-helm/daemonset.yaml +++ b/kubernetes/elastic-helm/daemonset.yaml @@ -208,7 +208,7 @@ config: # Match on namespace for the demo - resource.attributes["k8s.namespace.name"] == "ingress-nginx" statements: - - merge_maps(attributes, ExtractGrokPatterns(body, "%{LOG_LEVEL:log.level}%{MONTHNUM}%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}\\.%{MICROS}%{SPACE}%{NUMBER:nginx_ingress_controller.error.thread_id} %{SOURCE_FILE:nginx_ingress_controller.error.source.file}:%{NUMBER:nginx_ingress_controller.error.source.line_number}\\] %{GREEDYMULTILINE:message}", true, ["LOG_LEVEL=[A-Z]", "MONTHNUM=(0[1-9]|1[0-2])", "MONTHDAY=(0[1-9]|[12][0-9]|3[01])", "HOUR=([01][0-9]|2[0-3])", "MINUTE=[0-5][0-9]", "SECOND=[0-5][0-9]", "MICROS=[0-9]{6}", "SOURCE_FILE=[^:]+", "GREEDYMULTILINE=(.|\\n)*"]), "upsert") + - set(body, ExtractGrokPatterns(body, "%{LOG_LEVEL:log.level}%{MONTHNUM}%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}\\.%{MICROS}%{SPACE}%{NUMBER:error.thread_id} %{SOURCE_FILE:error.source.file}:%{NUMBER:error.source.line_number}\\] %{GREEDYMULTILINE:message}", true, ["LOG_LEVEL=[A-Z]", "MONTHNUM=(0[1-9]|1[0-2])", "MONTHDAY=(0[1-9]|[12][0-9]|3[01])", "HOUR=([01][0-9]|2[0-3])", "MINUTE=[0-5][0-9]", "SECOND=[0-5][0-9]", "MICROS=[0-9]{6}", "SOURCE_FILE=[^:]+", "GREEDYMULTILINE=(.|\\n)*"])) - set(attributes["data_stream.dataset"], "nginx_ingress_controller.error") @@ -225,63 +225,61 @@ config: statements: # Log format: https://github.com/kubernetes/ingress-nginx/blob/nginx-0.30.0/docs/user-guide/nginx-configuration/log-format.md # Based on https://github.com/elastic/integrations/blob/main/packages/nginx_ingress_controller/data_stream/access/elasticsearch/ingest_pipeline/default.yml - - merge_maps(attributes, ExtractGrokPatterns(body, "(%{NGINX_HOST} )?\"?(?:%{NGINX_ADDRESS_LIST:nginx_ingress_controller.access.remote_ip_list}|%{NOTSPACE:source.address}) - (-|%{DATA:user.name}) \\[%{HTTPDATE:nginx_ingress_controller.access.time}\\] \"%{DATA:nginx_ingress_controller.access.info}\" %{NUMBER:http.response.status_code:long} %{NUMBER:http.response.body.size:long} \"(-|%{DATA:http.request.referrer})\" \"(-|%{DATA:user_agent.original})\" %{NUMBER:http.request.size:long} %{NUMBER:http.request.time:double} \\[%{DATA:upstream.name}\\] \\[%{DATA:upstream.alternative_name}\\] (%{UPSTREAM_ADDRESS_LIST:upstream.address}|-) (%{UPSTREAM_RESPONSE_SIZE_LIST:upstream.response.size_list}|-) (%{UPSTREAM_RESPONSE_TIME_LIST:upstream.response.time_list}|-) (%{UPSTREAM_RESPONSE_STATUS_CODE_LIST:upstream.response.status_code_list}|-) %{GREEDYDATA:http.request.id}", true, ["NGINX_HOST=(?:%{IP:destination.ip}|%{NGINX_NOTSEPARATOR:destination.domain})(:%{NUMBER:destination.port})?", "NGINX_NOTSEPARATOR=[^\t ,:]+", "NGINX_ADDRESS_LIST=(?:%{IP}|%{WORD}) (\"?,?\\s*(?:%{IP}|%{WORD}))*", "UPSTREAM_ADDRESS_LIST=(?:%{IP}(:%{NUMBER})?)(\"?,?\\s*(?:%{IP}(:%{NUMBER})?))*", "UPSTREAM_RESPONSE_SIZE_LIST=(?:%{NUMBER})(\"?,?\\s*(?:%{NUMBER}))*", "UPSTREAM_RESPONSE_TIME_LIST=(?:%{NUMBER})(\"?,?\\s*(?:%{NUMBER}))*", "UPSTREAM_RESPONSE_STATUS_CODE_LIST=(?:%{NUMBER})(\"?,?\\s*(?:%{NUMBER}))*", "IP=(?:\\[?%{IPV6}\\]?|%{IPV4})"]), "upsert") - - - - merge_maps(attributes, ExtractGrokPatterns(attributes["nginx_ingress_controller.access.info"], "%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}", true), "upsert") - - delete_key(attributes, "nginx_ingress_controller.access.info") + - set(body, ExtractGrokPatterns(body, "(%{NGINX_HOST} )?\"?(?:%{NGINX_ADDRESS_LIST:nginx_ingress_controller.access.remote_ip_list}|%{NOTSPACE:source.address}) - (-|%{DATA:user.name}) \\[%{HTTPDATE:nginx_ingress_controller.access.time}\\] \"%{DATA:nginx_ingress_controller.access.info}\" %{NUMBER:http.response.status_code:long} %{NUMBER:http.response.body.size:long} \"(-|%{DATA:http.request.referrer})\" \"(-|%{DATA:user_agent.original})\" %{NUMBER:http.request.size:long} %{NUMBER:http.request.time:double} \\[%{DATA:upstream.name}\\] \\[%{DATA:upstream.alternative_name}\\] (%{UPSTREAM_ADDRESS_LIST:upstream.address}|-) (%{UPSTREAM_RESPONSE_SIZE_LIST:upstream.response.size_list}|-) (%{UPSTREAM_RESPONSE_TIME_LIST:upstream.response.time_list}|-) (%{UPSTREAM_RESPONSE_STATUS_CODE_LIST:upstream.response.status_code_list}|-) %{GREEDYDATA:http.request.id}", true, ["NGINX_HOST=(?:%{IP:destination.ip}|%{NGINX_NOTSEPARATOR:destination.domain})(:%{NUMBER:destination.port})?", "NGINX_NOTSEPARATOR=[^\t ,:]+", "NGINX_ADDRESS_LIST=(?:%{IP}|%{WORD}) (\"?,?\\s*(?:%{IP}|%{WORD}))*", "UPSTREAM_ADDRESS_LIST=(?:%{IP}(:%{NUMBER})?)(\"?,?\\s*(?:%{IP}(:%{NUMBER})?))*", "UPSTREAM_RESPONSE_SIZE_LIST=(?:%{NUMBER})(\"?,?\\s*(?:%{NUMBER}))*", "UPSTREAM_RESPONSE_TIME_LIST=(?:%{NUMBER})(\"?,?\\s*(?:%{NUMBER}))*", "UPSTREAM_RESPONSE_STATUS_CODE_LIST=(?:%{NUMBER})(\"?,?\\s*(?:%{NUMBER}))*", "IP=(?:\\[?%{IPV6}\\]?|%{IPV4})"])) + - merge_maps(body, ExtractGrokPatterns(body["nginx_ingress_controller.access.info"], "%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}", true), "upsert") + - delete_key(body, "nginx_ingress_controller.access.info") # Extra URL parsing - - merge_maps(attributes, URL(attributes["url.original"]), "upsert") - - set(attributes["url.domain"], attributes["destination.domain"]) + - merge_maps(body, URL(body["url.original"]), "upsert") + - set(body["url.domain"], body["destination.domain"]) - # set protocol name - - set(attributes["network.protocol.name"], "http") + # set source.address as attribute for GeoIP processor + - set(attributes["source.address"], body["source.address"]) - # tmp - set(attributes["data_stream.dataset"], "nginx_ingress_controller.access") # LogRecord event: https://github.com/open-telemetry/semantic-conventions/pull/982 - set(attributes["event.name"], "nginx.ingress.controller.access") - - set(attributes["event.timestamp"], attributes["nginx_ingress_controller.access.time"]) - - delete_key(attributes, "nginx_ingress_controller.access.time") + - set(attributes["event.timestamp"], String(Time(body["nginx_ingress_controller.access.time"], "%d/%b/%Y:%H:%M:%S %z"))) + + - delete_key(body, "nginx_ingress_controller.access.time") - context: log conditions: # Extract user agent when not empty - - attributes["user_agent.original"] != nil + - body["user_agent.original"] != nil statements: # Extract UserAgent # TODO: UserAgent OTTL function does not provide os specific metadata yet: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35458 - - merge_maps(attributes, UserAgent(attributes["user_agent.original"]), "upsert") + - merge_maps(body, UserAgent(body["user_agent.original"]), "upsert") - context: log conditions: - - attributes["upstream.response.time_list"] != nil + - body["upstream.response.time_list"] != nil statements: # Extract comma separated list # TODO: We would like to get the sum over all upstream.response.time_list values instead of providing a slice with all the values - - set(attributes["upstream.response.time"], Split(attributes["upstream.response.time_list"], ",")) - - delete_key(attributes, "upstream.response.time_list") + - set(body["upstream.response.time"], Split(body["upstream.response.time_list"], ",")) + - delete_key(body, "upstream.response.time_list") - context: log conditions: - - attributes["upstream.response.size_list"] != nil + - body["upstream.response.size_list"] != nil statements: # Extract comma separated list # TODO: We would like to get the Last upstream.response.size_list value instead of providing a slice with all the values # See: https://github.com/elastic/integrations/blob/main/packages/nginx_ingress_controller/data_stream/access/elasticsearch/ingest_pipeline/default.yml#L94b - - set(attributes["upstream.response.size"], Split(attributes["upstream.response.size_list"], ",")) - - delete_key(attributes, "upstream.response.size_list") + - set(body["upstream.response.size"], Split(body["upstream.response.size_list"], ",")) + - delete_key(body, "upstream.response.size_list") - context: log conditions: - - attributes["upstream.response.status_code_list"] != nil + - body["upstream.response.status_code_list"] != nil statements: # Extract comma separated list # TODO: We would like to get the Last upstream.response.status_code_list value instead of providing a slice with all the values - - set(attributes["upstream.response.status_code"], Split(attributes["upstream.response.status_code_list"], ",")) - - delete_key(attributes, "upstream.response.status_code_list") + - set(body["upstream.response.status_code"], Split(body["upstream.response.status_code_list"], ",")) + - delete_key(body, "upstream.response.status_code_list") receivers: otlp: null jaeger: null