
Threaded input plugins can drop records in high load situations #7846

Closed
danlenar opened this issue Aug 21, 2023 · 2 comments

Bug Report

Threaded input plugins can drop records if there is a high rate of ingested logs.
Running the same config with threaded turned off doesn't lead to any dropped records.
With threaded turned on, I average about a 10-30% drop rate from a file that is 100 MB and contains half a million log lines.
So while the non-threaded input might be slower to parse logs, it doesn't result in any dropped records.

This is my config:

env:
  KAFKA_BROKERS: kafka-kafka-brokers:9093

service:
  flush: 1
  log_level: info
  scheduler.base: 2
  scheduler.cap: 300
  
pipeline:
  inputs:
    - name: tail
      db: /var/log/fluent-bit/app.db
      mem_buf_limit: 0
      offset_key: log.offset
      path: /var/log/app.log
      read_from_head: true
      rotate_wait: 600
      tag: application
      threaded: true
  outputs:
    - name: kafka
      match: "*"
      brokers: ${KAFKA_BROKERS}
      format: msgpack
      rdkafka.log.connection.close: false
      rdkafka.security.protocol: ssl
      rdkafka.ssl.ca.location: /etc/fluent-bit/kafka/ca.crt
      rdkafka.ssl.certificate.location: /etc/fluent-bit/kafka/user.crt
      rdkafka.ssl.key.location: /etc/fluent-bit/kafka/user.key
      retry_limit: false
      timestamp_key: _time
      topics: logging
      processors:
        logs:
          - name: modify
            add: host.name ${HOSTNAME}
          - name: lua
            call: kafka
            code: |
              function kafka(tag, timestamp, record)
                local new_record = record
                new_record["_time"] = string.format("%d.%09d", timestamp["sec"], timestamp["nsec"])
                new_record["_tag"] = tag
                return 2, timestamp, new_record
              end
            time_as_table: true

There are two scenarios where a threaded input plugin can drop records.

The first is that you can fill up the ring buffer, which has a maximum size of 1024 chunks.
If an average chunk for the tail input plugin is around 32 KB, the ring buffer can hold roughly 32 MB of data in total. In my testing I am using a 100 MB file, so I am able to replicate this issue fairly easily.

The first scenario does require that no mem_buf_limit is configured.

[2023/08/21 20:34:57] [ info] [sp] stream processor started
[tail.0] failed buffer write, retries=0
[tail.0] failed buffer write, retries=1
[tail.0] failed buffer write, retries=2
[tail.0] failed buffer write, retries=3
[tail.0] failed buffer write, retries=4
[tail.0] failed buffer write, retries=5
[tail.0] failed buffer write, retries=6
[tail.0] failed buffer write, retries=7
[tail.0] failed buffer write, retries=8
[tail.0] failed buffer write, retries=9
[2023/08/21 20:35:03] [error] [input:tail:tail.0] could not enqueue records into the ring buffer
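
To illustrate the first scenario, here is a rough sketch of the failure mode (RB_SLOTS, CHUNK_SIZE and produce_chunk are made up for illustration, not Fluent Bit's actual code): a fixed-capacity ring buffer rejects the write once every slot is taken, and after a bounded number of retries the chunk is simply lost.

/* Hypothetical illustration only: a fixed-capacity ring buffer whose
 * enqueue fails once every slot is taken. Names and constants are
 * assumptions for this sketch, not Fluent Bit internals. */
#include <stdio.h>

#define RB_SLOTS    1024            /* assumed maximum number of chunk slots */
#define CHUNK_SIZE  (32 * 1024)     /* assumed average tail chunk size (~32 KB) */
#define MAX_RETRIES 10

struct ring_buffer {
    void  *slots[RB_SLOTS];
    size_t head;                    /* next slot to read */
    size_t tail;                    /* next slot to write */
    size_t count;                   /* occupied slots */
};

/* Returns 0 on success, -1 if the buffer is full. */
static int rb_enqueue(struct ring_buffer *rb, void *chunk)
{
    if (rb->count == RB_SLOTS) {
        return -1;                  /* no free slot */
    }
    rb->slots[rb->tail] = chunk;
    rb->tail = (rb->tail + 1) % RB_SLOTS;
    rb->count++;
    return 0;
}

/* Producer side: retry a few times, then give up and lose the chunk,
 * mirroring the "failed buffer write, retries=N" messages above. */
static int produce_chunk(struct ring_buffer *rb, void *chunk)
{
    for (int retries = 0; retries < MAX_RETRIES; retries++) {
        if (rb_enqueue(rb, chunk) == 0) {
            return 0;
        }
        fprintf(stderr, "[tail.0] failed buffer write, retries=%d\n", retries);
        /* a real implementation would sleep or yield here */
    }
    fprintf(stderr, "[error] could not enqueue records into the ring buffer\n");
    return -1;                      /* every record in this chunk is lost */
}

int main(void)
{
    static struct ring_buffer rb;
    static char chunk[CHUNK_SIZE];

    /* A 100 MB file split into ~32 KB chunks needs ~3200 slots, but only
     * 1024 fit; if the consumer cannot keep up, later enqueues fail. */
    size_t total = (100 * 1024 * 1024) / CHUNK_SIZE;
    size_t dropped = 0;
    for (size_t i = 0; i < total; i++) {
        if (produce_chunk(&rb, chunk) != 0) {
            dropped++;
        }
    }
    printf("dropped %zu of %zu chunks\n", dropped, total);
    return 0;
}

1024 slots at roughly 32 KB each is where the ~32 MB figure above comes from; anything the consumer has not drained beyond that has nowhere to go.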

To get around the ring buffering issue, I am setting mem_buf_limit to prevent the ring buffer from getting full.
With mem_buf_limit set to 5 MB, I get the errors below.
It doesn't appear that back pressure works correctly with threaded input plugins.

[2023/08/21 20:35:35] [ info] [task]   task_id=36 still running on route(s): kafka/kafka.0
[2023/08/21 19:55:57] [debug] [input chunk] tail.0 is paused, cannot append records
[2023/08/21 19:55:57] [debug] [input chunk] tail.0 is paused, cannot append records
[2023/08/21 19:55:57] [debug] [input chunk] tail.0 is paused, cannot append records
[2023/08/21 19:55:57] [debug] [input chunk] tail.0 is paused, cannot append records
[2023/08/21 19:55:57] [debug] [input chunk] tail.0 is paused, cannot append records
[2023/08/21 19:55:57] [debug] [input chunk] tail.0 is paused, cannot append records
[2023/08/21 19:55:57] [debug] [input chunk] tail.0 is paused, cannot append records
[2023/08/21 19:55:57] [debug] [input chunk] tail.0 is paused, cannot append records
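
To illustrate why back pressure doesn't help here, a rough sketch of the gap as I understand it (input_chunk_append and the struct fields are made up for illustration, not the real Fluent Bit API): the pause is only checked when the chunk is appended, so a threaded reader that keeps reading has its appends rejected and those records are gone.

#include <stdbool.h>
#include <stdio.h>

/* Hypothetical illustration of the second scenario: the engine pauses the
 * input once mem_buf_limit is reached, but a threaded reader that keeps
 * appending has its records rejected rather than being blocked. */
struct input_instance {
    bool   paused;          /* set once buffered memory reaches the limit */
    size_t mem_bytes;       /* bytes currently buffered                   */
    size_t mem_buf_limit;   /* e.g. 5 MB                                  */
};

/* Called whenever buffered memory grows; flushing would lower mem_bytes
 * and clear the flag again (omitted here). */
static void check_backpressure(struct input_instance *in)
{
    if (in->mem_bytes >= in->mem_buf_limit) {
        in->paused = true;
    }
}

/* Append path: while paused, the data is simply refused. A non-threaded
 * input is not scheduled again until it is resumed, so nothing reaches
 * this point; a threaded reader that ignores the flag loses the records. */
static int input_chunk_append(struct input_instance *in,
                              const void *buf, size_t size)
{
    (void) buf;
    if (in->paused) {
        fprintf(stderr,
                "[input chunk] tail.0 is paused, cannot append records\n");
        return -1;          /* the records are discarded, not retried */
    }
    in->mem_bytes += size;
    check_backpressure(in);
    return 0;
}

int main(void)
{
    struct input_instance in = { .paused = false,
                                 .mem_bytes = 0,
                                 .mem_buf_limit = 5 * 1024 * 1024 };
    char line[4096];

    /* Threaded reader loop (simplified): it should wait while in.paused is
     * true before reading more input; because it does not, every append
     * made after the limit is hit fails and that data is dropped. */
    while (fgets(line, sizeof(line), stdin)) {
        input_chunk_append(&in, line, sizeof(line));
    }
    return 0;
}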

I am preparing a PR to fix the second scenario, but the first scenario probably requires increasing the ring buffer size or implementing some type of back pressure so that no records are dropped.
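
For the "some type of back pressure" idea, one option would be to block the producer until the consumer frees a slot instead of giving up after a fixed number of retries. This is just a sketch of the idea (blocking_rb and the function names are made up, and this is not a proposed patch; blocking the input thread has its own trade-offs):

#include <pthread.h>
#include <stddef.h>

#define RB_SLOTS 1024

/* Hypothetical bounded queue that blocks the producer instead of dropping
 * data when it is full. The mutex and condition variables would be set up
 * with PTHREAD_MUTEX_INITIALIZER / PTHREAD_COND_INITIALIZER or
 * pthread_mutex_init() / pthread_cond_init(). */
struct blocking_rb {
    void           *slots[RB_SLOTS];
    size_t          head, tail, count;
    pthread_mutex_t lock;
    pthread_cond_t  not_full;
    pthread_cond_t  not_empty;
};

/* Producer: waits while the buffer is full, so no chunk is ever lost. */
void rb_enqueue_blocking(struct blocking_rb *rb, void *chunk)
{
    pthread_mutex_lock(&rb->lock);
    while (rb->count == RB_SLOTS) {
        pthread_cond_wait(&rb->not_full, &rb->lock);    /* back pressure */
    }
    rb->slots[rb->tail] = chunk;
    rb->tail = (rb->tail + 1) % RB_SLOTS;
    rb->count++;
    pthread_cond_signal(&rb->not_empty);
    pthread_mutex_unlock(&rb->lock);
}

/* Consumer: frees a slot and wakes a waiting producer. */
void *rb_dequeue_blocking(struct blocking_rb *rb)
{
    void *chunk;

    pthread_mutex_lock(&rb->lock);
    while (rb->count == 0) {
        pthread_cond_wait(&rb->not_empty, &rb->lock);
    }
    chunk = rb->slots[rb->head];
    rb->head = (rb->head + 1) % RB_SLOTS;
    rb->count--;
    pthread_cond_signal(&rb->not_full);
    pthread_mutex_unlock(&rb->lock);
    return chunk;
}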

Your Environment
v2.1.8 (custom build that includes #7812)

Additional context

This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 5 days. Maintainers can add the exempt-stale label.

github-actions bot added the Stale label Dec 12, 2023

This issue was closed because it has been stalled for 5 days with no activity.

github-actions bot closed this as not planned Dec 18, 2023