
Fluent-Bit does not handle 201-Created with 409-Conflicts errors from ElasticSearch correctly #6341

Closed
simbou2000 opened this issue Nov 3, 2022 · 6 comments · Fixed by #9236

Comments

@simbou2000

Bug Report

Describe the bug
The ElasticSearch server returns a bulk response that Fluent-Bit parses to check whether any of the data it just transmitted ran into problems.
Fluent-Bit already handles the "error" case where ElasticSearch reports "conflicts": it silently treats those as a successful transmission (https://github.com/fluent/fluent-bit/blob/master/plugins/out_es/es.c#L773-L776).
However, this check fails if the last bulk contained both conflicts and created entries, because the status code for a created entry is 201, which Fluent-Bit considers an error.
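For context, the check at the linked lines looks roughly like the following (a paraphrase of the linked code, not an exact copy); any per-item status other than 409 flags the whole chunk as failed:

/* inside the loop over the bulk response "items" array */
if (item_val.via.i64 != 409) {
    /* anything that is not a conflict, including a 201 created
     * entry, marks the chunk as failed and triggers a retry */
    check = FLB_TRUE;
    goto done;
}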

I have included an example of a chunk being transmitted 5 times, along with the ElasticSearch responses, in the Screenshots section.
1st attempt: errors=true; an overload rejection (429), but 2 entries from the bulk were created.
2nd attempt: errors=true; an overload rejection again, plus 2 conflicts for the entries created on the 1st attempt.
3rd attempt: Same as the 2nd attempt.
4th attempt: errors=true; the last entry was created, along with the same 2 conflicts as before.
5th attempt: errors=true; 3 conflicts, which Fluent-Bit handles as a success.

The 4th attempt should already have been treated as a success: even though ElasticSearch indicated an error (errors=true), Fluent-Bit treats conflicts as a non-issue, but a 201 (created) status code was mixed in among those items, so the response was treated as an actual error.

This causes Fluent-Bit to resend the same bulk more times than necessary, putting extra stress on the ElasticSearch server.
I have also seen cases where the following attempts were rejected with 429 even though the bulk had already been ingested correctly.

To Reproduce

  • Steps to reproduce the problem:
    Use the ElasticSearch output with any kind of data against a server that is under heavy load (or simulate an overloaded server).

Expected behavior
Fluent-Bit should also exclude code 201 in this check: https://github.com/fluent/fluent-bit/blob/master/plugins/out_es/es.c#L773-L776
For example:

if (item_val.via.i64 != 409 && item_val.via.i64 != 201) {
    /* neither a conflict nor a created entry: a real error */
    check = FLB_TRUE;
    goto done;
}

Possibly other codes or any 2xx status codes should be excluded too.
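A more general variant, just a minimal sketch of that idea, would be to accept the whole 2xx range plus 409 instead of listing individual codes:

/* hypothetical generalization: any 2xx success status and 409
 * (version conflict) are fine; everything else is a real error */
int64_t status = item_val.via.i64;
if (!((status >= 200 && status < 300) || status == 409)) {
    check = FLB_TRUE;
    goto done;
}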

Screenshots

[2022/10/26 15:46:36] [error] [output:es:es.1] error: Output
{
 "took": 15022,
 "errors": true,
 "items": [
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "f52b988e-ee72-79b4-7854-863f517883d4",
    "status": 429,
    "error": {
     "type": "es_rejected_execution_exception",
     "reason": "rejected execution of org.elasticsearch.action.support.replication.TransportWriteAction$1/WrappedActionListener{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6662/0x0000000801b00b08@52f3939d}{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6666/0x0000000801b013a8@1a3c5f59} on EsThreadPoolExecutor[name = elasticsearch-es-default-resized-0/write, queue capacity = 10000, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@7675026a[Running, pool size = 4, active threads = 4, queued tasks = 10134, completed tasks = 10934096]]"
    }
   }
  },
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "5cb12418-c53d-bcc0-4d8e-25ca508c238c",
    "_version": 1,
    "result": "created",
    "_shards": {
     "total": 2,
     "successful": 2,
     "failed": 0
    },
    "_seq_no": 801595,
    "_primary_term": 1,
    "status": 201
   }
  },
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "196d923e-0e07-10f0-b091-ae5cf142a7a7",
    "_version": 1,
    "result": "created",
    "_shards": {
     "total": 2,
     "successful": 2,
     "failed": 0
    },
    "_seq_no": 806215,
    "_primary_term": 1,
    "status": 201
   }
  }
 ]
}
[2022/10/26 15:46:36] [ warn] [engine] failed to flush chunk '444271-1666813576.646583913.flb', retry in 11 seconds: task_id=9, input=tail.0 > output=es.1 (out_id=1)

...


[2022/10/26 15:47:11] [error] [output:es:es.1] error: Output
{
 "took": 24131,
 "errors": true,
 "items": [
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "f52b988e-ee72-79b4-7854-863f517883d4",
    "status": 429,
    "error": {
     "type": "es_rejected_execution_exception",
     "reason": "rejected execution of org.elasticsearch.action.support.replication.TransportWriteAction$1/WrappedActionListener{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6662/0x0000000801b00b08@7a1057e7}{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6666/0x0000000801b013a8@757b6fa9} on EsThreadPoolExecutor[name = elasticsearch-es-default-resized-0/write, queue capacity = 10000, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@7675026a[Running, pool size = 4, active threads = 4, queued tasks = 10489, completed tasks = 10947558]]"
    }
   }
  },
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "5cb12418-c53d-bcc0-4d8e-25ca508c238c",
    "status": 409,
    "error": {
     "type": "version_conflict_engine_exception",
     "reason": "[5cb12418-c53d-bcc0-4d8e-25ca508c238c]: version conflict, document already exists (current version [1])",
     "index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
     "shard": "4",
     "index": "platform-filebeat-2022.10.26-000671"
    }
   }
  },
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "196d923e-0e07-10f0-b091-ae5cf142a7a7",
    "status": 409,
    "error": {
     "type": "version_conflict_engine_exception",
     "reason": "[196d923e-0e07-10f0-b091-ae5cf142a7a7]: version conflict, document already exists (current version [1])",
     "index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
     "shard": "1",
     "index": "platform-filebeat-2022.10.26-000671"
    }
   }
  }
 ]
}
[2022/10/26 15:47:11] [ warn] [engine] failed to flush chunk '444271-1666813576.646583913.flb', retry in 16 seconds: task_id=9, input=tail.0 > output=es.1 (out_id=1)

...


[2022/10/26 15:47:40] [error] [output:es:es.1] error: Output
{
 "took": 13507,
 "errors": true,
 "items": [
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "f52b988e-ee72-79b4-7854-863f517883d4",
    "status": 429,
    "error": {
     "type": "es_rejected_execution_exception",
     "reason": "rejected execution of org.elasticsearch.action.support.replication.TransportWriteAction$1/WrappedActionListener{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6662/0x0000000801b00b08@7dc88a53}{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6666/0x0000000801b013a8@4b6357aa} on EsThreadPoolExecutor[name = elasticsearch-es-default-resized-0/write, queue capacity = 10000, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@7675026a[Running, pool size = 4, active threads = 4, queued tasks = 10512, completed tasks = 10964755]]"
    }
   }
  },
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "5cb12418-c53d-bcc0-4d8e-25ca508c238c",
    "status": 409,
    "error": {
     "type": "version_conflict_engine_exception",
     "reason": "[5cb12418-c53d-bcc0-4d8e-25ca508c238c]: version conflict, document already exists (current version [1])",
     "index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
     "shard": "4",
     "index": "platform-filebeat-2022.10.26-000671"
    }
   }
  },
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "196d923e-0e07-10f0-b091-ae5cf142a7a7",
    "status": 409,
    "error": {
     "type": "version_conflict_engine_exception",
     "reason": "[196d923e-0e07-10f0-b091-ae5cf142a7a7]: version conflict, document already exists (current version [1])",
     "index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
     "shard": "1",
     "index": "platform-filebeat-2022.10.26-000671"
    }
   }
  }
 ]
}
[2022/10/26 15:47:40] [ warn] [engine] failed to flush chunk '444271-1666813576.646583913.flb', retry in 16 seconds: task_id=9, input=tail.0 > output=es.1 (out_id=1)

...

[2022/10/26 15:48:10] [error] [output:es:es.1] error: Output
{
 "took": 13759,
 "errors": true,
 "items": [
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "f52b988e-ee72-79b4-7854-863f517883d4",
    "_version": 1,
    "result": "created",
    "_shards": {
     "total": 2,
     "successful": 2,
     "failed": 0
    },
    "_seq_no": 892046,
    "_primary_term": 1,
    "status": 201
   }
  },
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "5cb12418-c53d-bcc0-4d8e-25ca508c238c",
    "status": 409,
    "error": {
     "type": "version_conflict_engine_exception",
     "reason": "[5cb12418-c53d-bcc0-4d8e-25ca508c238c]: version conflict, document already exists (current version [1])",
     "index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
     "shard": "4",
     "index": "platform-filebeat-2022.10.26-000671"
    }
   }
  },
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "196d923e-0e07-10f0-b091-ae5cf142a7a7",
    "status": 409,
    "error": {
     "type": "version_conflict_engine_exception",
     "reason": "[196d923e-0e07-10f0-b091-ae5cf142a7a7]: version conflict, document already exists (current version [1])",
     "index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
     "shard": "1",
     "index": "platform-filebeat-2022.10.26-000671"
    }
   }
  }
 ]
}
[2022/10/26 15:48:10] [ warn] [engine] failed to flush chunk '444271-1666813576.646583913.flb', retry in 74 seconds: task_id=9, input=tail.0 > output=es.1 (out_id=1)

...


[2022/10/26 15:49:43] [ info] [engine] flush chunk '444271-1666813576.646583913.flb' succeeded at retry 4: task_id=9, input=tail.0 > output=es.1 (out_id=1)

Your Environment

  • Version used: td-agent-bit 1.9.3
  • Configuration:
  • Environment name and version (e.g. Kubernetes? What version?): RHEL 8 VM
  • Server type and version:
  • Operating System and version:
  • Filters and plugins:

Additional context
Wrongly detecting a problem when an ElasticSearch bulk contains a mix of conflicts and created entries only puts more stress on the ElasticSearch server, because the whole bulk is resent.

@github-actions
Contributor

github-actions bot commented Feb 2, 2023

This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 5 days. Maintainers can add the exempt-stale label.

@github-actions github-actions bot added the Stale label Feb 2, 2023
@acastong

acastong commented Feb 2, 2023

This is a one-liner fix that reduces unnecessary requests to the Elasticsearch server when it is overloaded. The issue needs to remain open.

@github-actions github-actions bot removed the Stale label Feb 3, 2023
@github-actions
Contributor

github-actions bot commented May 5, 2023

This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 5 days. Maintainers can add the exempt-stale label.

@github-actions github-actions bot added the Stale label May 5, 2023
@github-actions
Contributor

This issue was closed because it has been stalled for 5 days with no activity.

@dsteininger86

@simbou2000 We currently have exactly this problem. But I wonder why you didn't open this as a pull request in the first place?

@cosmo0920
Contributor

Hi, I tried to write another candidate to fix this issue: #9236
The formerly proposed solution does not cover the (failure, success, success, failure) case.
We shouldn't need to eliminate success information during the error checking.
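A minimal sketch of that idea, classifying each item instead of special-casing success codes, might look like this (hypothetical, and not necessarily what #9236 implements):

/* hypothetical per-item classification: skip items that succeeded
 * (2xx) or conflicted (409) and only flag genuine failures */
int64_t status = item_val.via.i64;
if (status == 409 || (status >= 200 && status < 300)) {
    continue;        /* nothing to retry for this item */
}
check = FLB_TRUE;    /* a real failure; the chunk must be retried */
goto done;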
