
Closing add_kubernetes_metadata processor each time a filestream reader stops makes filebeat panic #32200

Closed
MichaelKatsoulis opened this issue Jul 5, 2022 · 4 comments
Labels
bug Team:Elastic-Agent Label for the Agent team

Comments

@MichaelKatsoulis
Contributor

Currently there are two ways to enable the add_kubernetes_metadata processor with the filestream input in order to collect container logs and enrich them with metadata.
The first is to define the processor as part of the input:

filebeat.yml: |-
    filebeat.inputs:
    - type: filestream
      id: my-filestream-id
      parsers:
        - container: ~
      prospector.scanner.symlinks: true
      paths:
        - /var/log/containers/*.log
      processors:
        - add_kubernetes_metadata:
            host: ${NODE_NAME}
            matchers:
            - logs_path:
                logs_path: "/var/log/containers/"

and the second is to define the processor outside the input, in a more central place:

filebeat.yml: |-
    filebeat.inputs:
    - type: filestream
      id: my-filestream-id
      parsers:
        - container: ~
      prospector.scanner.symlinks: true
      paths:
        - /var/log/containers/*.log

    processors:
      - add_cloud_metadata:
      - add_host_metadata:
      - add_kubernetes_metadata:
          host: ${NODE_NAME}
          matchers:
          - logs_path:
              logs_path: "/var/log/containers/"

In the first case,
each time a filestream reader closes (in the case of Kubernetes jobs that stop once their work is finished, the file goes idle and I guess at some point the reader closes), it stops the connected processors, which means:

  1. it stops the watcher for new pods
  2. it sends a done signal to the done channel of the cache used by the processor, which prevents its cleanup

The next time the same thing happens (a filestream reader stops), the same process takes place. But the watcher is already stopped (so nothing happens there) and the cache's done channel is already closed.
Closing that already closed channel a second time makes filebeat panic:

{"log.level":"debug","@timestamp":"2022-06-28T10:53:11.919Z","log.logger":"input.filestream","log.origin":{"file.name":"filestream/input.go","file.line":143},"message":"Closing reader of filestream","service.name":"filebeat","id":"8DDE18B49C8C362C","source":"filestream::.global::native::10637969-66305","path":"/var/log/containers/filebeat-prod-kxqkf_kube-system_filebeat-9f7ebdc5dbef23f6785804c9b07d0cdf301382e3cf5e7bd1c8e1c82a94250012.log","state-id":"native::10637969-66305","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-06-28T10:53:11.919Z","log.logger":"publisher","log.origin":{"file.name":"pipeline/client.go","file.line":158},"message":"client: closing acker","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-06-28T10:53:11.919Z","log.logger":"publisher","log.origin":{"file.name":"pipeline/client.go","file.line":163},"message":"client: done closing acker","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-06-28T10:53:11.919Z","log.logger":"publisher","log.origin":{"file.name":"pipeline/client.go","file.line":165},"message":"client: unlink from queue","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-06-28T10:53:11.919Z","log.logger":"publisher","log.origin":{"file.name":"pipeline/client.go","file.line":187},"message":"client: cancelled 0 events","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-06-28T10:53:11.919Z","log.logger":"publisher","log.origin":{"file.name":"pipeline/client.go","file.line":167},"message":"client: done unlink","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-06-28T10:53:11.919Z","log.logger":"publisher","log.origin":{"file.name":"pipeline/client.go","file.line":170},"message":"client: closing processors","service.name":"filebeat","ecs.version":"1.6.0"}
panic: close of closed channel

goroutine 214 [running]:
github.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata.(*cache).stop(...)
  /go/src/github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata/cache.go:97
github.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata.(*kubernetesAnnotator).Close(0xc000a72700)
  /go/src/github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata/kubernetes.go:311 +0x4f
github.com/elastic/beats/v7/libbeat/processors.Close(...)
  /go/src/github.com/elastic/beats/libbeat/processors/processor.go:57
github.com/elastic/beats/v7/libbeat/publisher/processing.(*group).Close(0x1)
  /go/src/github.com/elastic/beats/libbeat/publisher/processing/processors.go:94 +0x14b
github.com/elastic/beats/v7/libbeat/processors.Close(...)
  /go/src/github.com/elastic/beats/libbeat/processors/processor.go:57
github.com/elastic/beats/v7/libbeat/publisher/processing.(*group).Close(0x0)
  /go/src/github.com/elastic/beats/libbeat/publisher/processing/processors.go:94 +0x14b
github.com/elastic/beats/v7/libbeat/processors.Close(...)
  /go/src/github.com/elastic/beats/libbeat/processors/processor.go:57
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).Close.func1()
  /go/src/github.com/elastic/beats/libbeat/publisher/pipeline/client.go:171 +0x2d9
sync.(*Once).doSlow(0x0, 0xc00142dd78)
  /usr/local/go/src/sync/once.go:68 +0xd2
sync.(*Once).Do(...)
  /usr/local/go/src/sync/once.go:59
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).Close(0xc00186c000)
  /go/src/github.com/elastic/beats/libbeat/publisher/pipeline/client.go:152 +0x59
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*Pipeline).runSignalPropagation(0xc001392fb8)
  /go/src/github.com/elastic/beats/libbeat/publisher/pipeline/pipeline.go:394 +0x22b
created by github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*Pipeline).registerSignalPropagation.func1
  /go/src/github.com/elastic/beats/libbeat/publisher/pipeline/pipeline.go:347 +0x9b
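
The panic comes down to a double close of the cache's done channel. For illustration, here is a minimal, self-contained Go sketch (not the actual beats code) of the failure mode: two readers share one processor whose stop() closes the same channel.

package main

import "fmt"

type cache struct {
	done chan struct{}
}

func (c *cache) stop() {
	// add_kubernetes_metadata's cache.stop() closes its done channel;
	// closing a channel that is already closed panics in Go.
	close(c.done)
}

func main() {
	c := &cache{done: make(chan struct{})}

	c.stop() // first filestream reader finishes and closes the shared processor

	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r) // close of closed channel
		}
	}()
	c.stop() // second reader finishes and closes it again -> panic
}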

If we try to handle this error on the processor side, we have two options:

  1. If we set the cache to nil the first time it is closed, this leads to errors because other clients (from different filestream readers) still try to access the cache.

  2. If we check whether the cache channel is already closed before closing it, we avoid the panic. But it still doesn't make much sense: the watcher (for new pods) is stopped and the cache no longer gets updated, so all subsequent clients of the cache will get stale data. (A minimal sketch of such an idempotent stop follows this list.)
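
A hypothetical sketch of option 2, using sync.Once to make the cache's stop() idempotent so a second Close cannot panic. This is illustrative code, not the actual add_kubernetes_metadata implementation, and it does not solve the stale-cache problem described above.

package cache // hypothetical package, for illustration only

import "sync"

type cache struct {
	done     chan struct{}
	stopOnce sync.Once
}

func newCache() *cache {
	return &cache{done: make(chan struct{})}
}

// stop is safe to call multiple times: only the first call closes the channel.
func (c *cache) stop() {
	c.stopOnce.Do(func() {
		close(c.done)
	})
}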

This problem occurs because multiple clients use the same processor instance, and each one tries to close it when it finishes.

The problem does not occur in the second scenario, where add_kubernetes_metadata is configured outside the filestream input: there, the readers do not try to close the processor. It also does not occur when using the container input instead of filestream.

Is stopping the connected processors each time a filestream reader finishes the right approach?

As far as I understand, the different clients that use the same processors are part of a pipeline that the clients publish events into, and it looks like one client is created per file (not 100% sure).
All these clients share the same add_kubernetes_metadata processor instance. When a filestream reader gets closed, Close is called on its client.

We could check whether the list of clients still contains any clients before closing the processors. The problem is that the client's Close method is called from other places as well, like https://github.com/elastic/beats/blob/main/filebeat/beater/channels.go#L145.
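
A hypothetical sketch of that idea: the pipeline keeps a count of clients attached to a shared processor group and only closes the processors when the last client detaches. All names here are illustrative, not the actual beats types.

package pipeline // illustrative only

import "sync"

type sharedProcessors struct {
	mu      sync.Mutex
	clients int
	closeFn func() error // closes the underlying processors, e.g. add_kubernetes_metadata
}

func (s *sharedProcessors) addClient() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.clients++
}

// releaseClient is called from each client's Close; the processors are
// closed only when the final client has been released.
func (s *sharedProcessors) releaseClient() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.clients--
	if s.clients == 0 {
		return s.closeFn()
	}
	return nil
}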

Another, easier approach would be to not allow the add_kubernetes_metadata processor at the input level and only allow it to be configured outside the input. Or we could just document this limitation.

@elastic/elastic-agent-data-plane what do you think about this? Why is the filestream approach of closing processors different from the container input's?
Also, @elastic/obs-cloud-monitoring, have you faced the same problem (maybe try the same scenario out) with the add_cloud_metadata processor?

@MichaelKatsoulis MichaelKatsoulis added bug Team:Elastic-Agent-Control-Plane Label for the Agent Control Plane team labels Jul 5, 2022
@elasticmachine
Collaborator

Pinging @elastic/elastic-agent-control-plane (Team:Elastic-Agent-Control-Plane)

@MichaelKatsoulis MichaelKatsoulis changed the title Closing add_kubernetes_metadata processor each time a filestream reader stops makes filebeat panic Closing add_kubernetes_metadata processor each time a filestream reader stops makes filebeat panic Jul 5, 2022
@cmacknz
Member

cmacknz commented Jul 5, 2022

@elastic/elastic-agent-data-plane what do you think about this? Why is the filestream approach of closing processors different from the container input's?

The container input is just a wrapper around the log input. I'm not sure where the difference is coming from, but it is probably not intentional.

@gitbeyond

gitbeyond commented Dec 13, 2022

I also hit this problem.

  • filebeat version: filebeat-8.5.3
  • k8s version: v1.22.7

My config file:

    - type: filestream
      id: tomcat-backgroud-log-1
      paths:
        - /var/lib/kubelet/pods/*/volumes/kubernetes.io~empty-dir/log-dir/*.log
      fields_under_root: true
      ignore_inactive: since_last_start
      close.on_state_change.inactive: 5m
      prospector.scanner.check_interval: 1s
      message_max_bytes: 2097152
      processors:
        - add_kubernetes_metadata:
            host: ${NODE_NAME}
            add_resource_metadata:
              namespace:
                enabled: false
              node:
                enabled: false
              deployment: false
              cronjob: false
            default_indexers.enabled: false
            default_matchers.enabled: false
            indexers:
              - pod_uid:
            matchers:
              - logs_path:
                  logs_path: "/var/lib/kubelet/pods/"
                  resource_type: 'pod'

The error log:

{
  "log.level": "error",
  "@timestamp": "2022-12-13T18:00:17.129+0800",
  "log.logger": "input.filestream",
  "log.origin": {
    "file.name": "input-logfile/harvester.go",
    "file.line": 168
  },
  "message": "Harvester crashed with: harvester panic with: close of closed channel\ngoroutine 224 [running]:\nruntime/debug.Stack()\n\truntime/debug/stack.go:24 +0x65\ngithub.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile.startHarvester.func1.1()\n\tgithub.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile/harvester.go:167 +0x78\npanic({0x558ebc1660e0, 0x558ebc6ecbb0})\n\truntime/panic.go:844 +0x258\ngithub.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata.(*cache).stop(...)\n\tgithub.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata/cache.go:97\ngithub.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata.(*kubernetesAnnotator).Close(0xc0009e7600?)\n\tgithub.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata/kubernetes.go:311 +0x4f\ngithub.com/elastic/beats/v7/libbeat/processors.Close(...)\n\tgithub.com/elastic/beats/v7/libbeat/processors/processor.go:58\ngithub.com/elastic/beats/v7/libbeat/publisher/processing.(*group).Close(0x5?)\n\tgithub.com/elastic/beats/v7/libbeat/publisher/processing/processors.go:95 +0x159\ngithub.com/elastic/beats/v7/libbeat/processors.Close(...)\n\tgithub.com/elastic/beats/v7/libbeat/processors/processor.go:58\ngithub.com/elastic/beats/v7/libbeat/publisher/processing.(*group).Close(0x0?)\n\tgithub.com/elastic/beats/v7/libbeat/publisher/processing/processors.go:95 +0x159\ngithub.com/elastic/beats/v7/libbeat/processors.Close(...)\n\tgithub.com/elastic/beats/v7/libbeat/processors/processor.go:58\ngithub.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).Close.func1()\n\tgithub.com/elastic/beats/v7/libbeat/publisher/pipeline/client.go:167 +0x2df\nsync.(*Once).doSlow(0x0?, 0x0?)\n\tsync/once.go:68 +0xc2\nsync.(*Once).Do(...)\n\tsync/once.go:59\ngithub.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).Close(0x558eba364866?)\n\tgithub.com/elastic/beats/v7/libbeat/publisher/pipeline/client.go:148 +0x59\ngithub.com/elastic/beats/v7/filebeat/beater.(*countingClient).Close(0x558eba3647df?)\n\tgithub.com/elastic/beats/v7/filebeat/beater/channels.go:145 +0x22\ngithub.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile.startHarvester.func1({0x558ebc7305d0?, 0xc0006ce6c0})\n\tgithub.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile/harvester.go:219 +0x949\ngithub.com/elastic/go-concert/unison.(*TaskGroup).Go.func1()\n\tgithub.com/elastic/[email protected]/unison/taskgroup.go:163 +0xc3\ncreated by github.com/elastic/go-concert/unison.(*TaskGroup).Go\n\tgithub.com/elastic/[email protected]/unison/taskgroup.go:159 +0xca\n",
  "service.name": "filebeat",
  "id": "tomcat-backgroud-log-1",
  "source_file": "filestream::tomcat-backgroud-log-1::native::671566857-64768",
  "ecs.version": "1.6.0"
}

The final panic info:

panic: close of closed channel

goroutine 203 [running]:
github.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata.(*cache).stop(...)
	github.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata/cache.go:97
github.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata.(*kubernetesAnnotator).Close(0xc0009e7600?)
	github.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata/kubernetes.go:311 +0x4f
github.com/elastic/beats/v7/libbeat/processors.Close(...)
	github.com/elastic/beats/v7/libbeat/processors/processor.go:58
github.com/elastic/beats/v7/libbeat/publisher/processing.(*group).Close(0xc000cf7c28?)
	github.com/elastic/beats/v7/libbeat/publisher/processing/processors.go:95 +0x159
github.com/elastic/beats/v7/libbeat/processors.Close(...)
	github.com/elastic/beats/v7/libbeat/processors/processor.go:58
github.com/elastic/beats/v7/libbeat/publisher/processing.(*group).Close(0x0?)
	github.com/elastic/beats/v7/libbeat/publisher/processing/processors.go:95 +0x159
github.com/elastic/beats/v7/libbeat/processors.Close(...)
	github.com/elastic/beats/v7/libbeat/processors/processor.go:58
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).Close.func1()
	github.com/elastic/beats/v7/libbeat/publisher/pipeline/client.go:167 +0x2df
sync.(*Once).doSlow(0x558ebe4f52c8?, 0x0?)
	sync/once.go:68 +0xc2
sync.(*Once).Do(...)
	sync/once.go:59
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).Close(0xc00184e400?)
	github.com/elastic/beats/v7/libbeat/publisher/pipeline/client.go:148 +0x59
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*Pipeline).runSignalPropagation(0xc000dc37b8?)
	github.com/elastic/beats/v7/libbeat/publisher/pipeline/pipeline.go:363 +0x226
created by github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*Pipeline).registerSignalPropagation.func1
	github.com/elastic/beats/v7/libbeat/publisher/pipeline/pipeline.go:315 +0x96

@cmacknz cmacknz added Team:Elastic-Agent Label for the Agent team and removed Team:Elastic-Agent-Control-Plane Label for the Agent Control Plane team labels Dec 13, 2022
@belimawr
Contributor

belimawr commented Mar 1, 2023

Closing this issue as it's a duplicate of #34219 and has been resolved by #34647 (and its backports).

@belimawr belimawr closed this as completed Mar 1, 2023