[RFC] Automatic routing for bulk #9219
Comments
Similar issue #4984
@Bukhtawar yeah, the requirement in that issue is similar to this one, but our solution is quite simple: just generate a random routing string for each bulk on the server side, to route all of the indexing requests in one bulk to one shard. One user of AWS OpenSearch Service reported that they currently generate the random routing in their client, and see improved indexing performance and lower write rejections than before, so they want OpenSearch to implement random routing for bulk on the server side so that this functionality is much easier to use.
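The client-side workaround described above can be sketched as follows. This is a minimal illustration, assuming the standard `_bulk` NDJSON format with a `routing` field on each action line; the function name and index name are illustrative only:

```python
import json
import secrets

def build_bulk_body(index, docs):
    # one random routing value shared by every action in this bulk,
    # so the whole bulk is routed to a single shard
    routing = secrets.token_hex(8)
    lines = []
    for doc in docs:
        # each action line carries the same routing value
        lines.append(json.dumps({"index": {"_index": index, "routing": routing}}))
        lines.append(json.dumps(doc))
    # the _bulk API requires a trailing newline
    return "\n".join(lines) + "\n", routing

body, routing = build_bulk_body("logs", [{"msg": "a"}, {"msg": "b"}])
```

The returned body would then be sent to the `_bulk` endpoint; because every action shares one routing value, the coordinating node forwards the whole batch to a single shard.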
@gaobinlong we have been prototyping a very different approach to bulk indexing that changes the way the data goes in and out, moving from batching to streaming; there are still opportunities to improve the coordinator role in how it dispatches the bulk requests internally (from one or many clients).
I see a few other problems with this approach (besides the ones you mentioned; please correct me if I am wrong):
[1] #3000
@reta for the streaming API, can we stream the payload directly to the data nodes and ensure we pass the routing information back in the response, so that it is transparent to the end user?
@Bukhtawar potentially at some point, please see #9065 (comment) (tl;dr: having streaming at the transport level); the first stage would target the clients <-> coordinators interaction
@gaobinlong This solution makes updates inefficient. Also, the get API will not work. Users have an alternative: implement this on their end by passing in the routing per request. I think OpenSearch should solve this problem, though not necessarily with the same solution that you described. I have recommended some solutions here. Would you be interested in working on a design proposal considering all options and following it up with a PR? @reta With the streaming API as well, the fan-out is what makes indexing inefficient. Additionally, having the ability to decouple the routing from the id (especially when our system generates the id, e.g. data streams) makes it simple for us to implement adaptive shard selection on indexing in the future.
@reta For indexing the same document using the index API or bulk API, I think automatic routing will not impact the search results: without custom routing, a search API call executes the query on every shard and then collects the results, so the results are the same; and if the user specifies a custom routing, automatic routing takes no effect, so the results are again the same. For the streaming index API, I've read the issue carefully and it's really a cool feature, but I think we have a long way to go before most ingestion tools such as Logstash, Beats, Fluent-bit and other third-party tools support that API, so if the bulk API will not be deprecated in the future, the automatic routing feature for bulk is still valuable, right?
@itiyama, I think automatic routing targets the append-only write use case, commonly seen in log analytics scenarios. The get API and update API are rarely used there, and users do not need to care about routing on their side; what they want is simply to reduce indexing rejections and improve performance, and automatic routing is a simple way to do that. I've seen your solutions and will consider them carefully to make a better design proposal, then follow up with a PR, thanks!
Thanks @gaobinlong
👍
That's where I see the potential problem: the fetch phase would go and fetch documents, the
Yes, we have a long way to go, and yes, we won't be removing the old API in the foreseeable future
@gaobinlong From the discussion here, it looks like there will be an impact on the
@reta, @sohami, for the search API, this feature will not impact it, because the result of the query phase contains the shard id, so in the fetch phase we get the target shard id directly from the query result: OpenSearch/server/src/main/java/org/opensearch/action/search/FetchSearchPhase.java Line 183 in b3049fb
But for the get API, I'm thinking of transforming the get document request into a search request when this feature is enabled for the target index. Even though the performance is less efficient, this feature targets use cases like log analytics, observability, and security analytics, which write more and read less; the get/update/delete document APIs are rarely used there, so it could be acceptable to users (we should document this negative effect). What do you think about this? @sohami I think this feature may not conflict with other alternatives. It mainly aims to reduce the impact of a long-tail shard or stuck node, reduce indexing rejections and node CPU usage, and improve indexing performance as a whole, but it will not improve indexing performance at the document or shard level; so if other alternatives can improve indexing performance at the document or shard level, they can be combined for great benefit.
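The get-to-search fallback described above amounts to a simple request rewrite. A minimal sketch, assuming the standard `ids` query DSL; the function name and the `size` cap are illustrative, not an existing OpenSearch API:

```python
def get_as_search(doc_id):
    # rewrite GET /{index}/_doc/{id} into a search request body;
    # the ids query matches by _id on every shard, so no routing
    # value is required (at the cost of fanning out to all shards)
    return {"query": {"ids": {"values": [doc_id]}}, "size": 1}

body = get_as_search("my-doc-id")
```

The trade-off noted in the comment is visible here: a routed GET touches one shard, while this rewritten search fans out to every shard of the index.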
@gaobinlong I think with respect to observability in particular, the get API is super useful and used quite often (find trace by
Requirement
In the log analytics use case of OpenSearch, users may ingest large numbers of documents into the cluster, so the cluster may be large, with hundreds of nodes and thousands of shards distributed across different nodes. When using the bulk API to ingest documents, the coordinating node dispatches the documents to shards on many different nodes; if one of the nodes gets stuck in a JVM full GC, or one of the shards has a performance issue, the overall processing time of the bulk operation increases, which may cause the queue of the write thread pool to hold too many pending indexing requests and then cause indexing rejections.
Instead of distributing the indexing requests across different nodes, some users add a random routing value to each of the indexing requests in a bulk operation, routing all of them to a single shard. This reduces the impact of a long-tail shard or stuck node, reduces network round trips, lowers indexing latency and CPU usage, and thereby improves indexing performance.
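Why a shared routing value lands everything on one shard can be shown with a simplified model of routing-based shard selection. This is a sketch: OpenSearch actually uses a murmur3 hash together with a routing factor, and `md5` here is only a stand-in to keep the example self-contained:

```python
import hashlib

def shard_for(routing, num_primary_shards):
    # deterministic hash of the routing value, modulo the shard count;
    # a stand-in for OpenSearch's murmur3-based routing formula
    h = int.from_bytes(hashlib.md5(routing.encode()).digest()[:4], "big")
    return h % num_primary_shards

# all 50 requests in this "bulk" reuse one routing value,
# so they all map to the same shard out of 100 primaries
shards = {shard_for("bulk-7f3a", 100) for _ in range(50)}
```

Because the mapping is deterministic in the routing value, every request in the bulk resolves to the same shard, which is exactly the effect the users above rely on.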
Even though routing can be added to the indexing requests on the client side, some commonly used ingest tools such as Logstash, Filebeat, and Fluent-bit do not support setting a random routing value for a bulk operation, so their users cannot benefit. The only options are to build their own ingest tools, or to use an OpenSearch client and write code to implement automatic routing for bulk, which is not user-friendly.
Proposal
So my proposal is to implement automatic routing for bulk on the OpenSearch server side: generate a random routing value for each bulk, so that all of the indexing requests in the bulk share the same routing value and are written to the same shard, improving the performance of the cluster. We can add an index-level setting to enable or disable automatic bulk routing per index; this setting can be updated dynamically, which makes it convenient to use.
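The proposed index-level toggle might look like the following settings-update body. Note that the setting name `index.bulk_routing.enabled` is hypothetical, invented here for illustration; the RFC does not name the setting:

```python
import json

# Hypothetical dynamic index setting enabling automatic bulk routing;
# "index.bulk_routing.enabled" is illustrative only and is NOT an
# existing OpenSearch setting.
settings_update = {"index": {"bulk_routing": {"enabled": True}}}
body = json.dumps(settings_update)
```

Since the setting is dynamic, it could be flipped on an existing index via the settings-update endpoint without a reindex, which is what makes the feature convenient to adopt.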
Here is a diagram showing the difference between bulk request without routing and bulk request with automatic routing.
Negative effects
However, automatic routing for bulk can introduce some small issues. One is data skew between shards: because the number of indexing requests in each bulk may differ, storage across shards may be unbalanced for a short time, but with continuous ingestion the storage balances out eventually. Moreover, in most cases ingest tools are used to write documents to OpenSearch, and because configuration like bulk size in these tools is fixed, the number of indexing requests in each bulk is usually the same, so data skew may not be a problem in practice. Another issue is that after enabling this feature, users are unable to use the GET API to fetch a document by doc id without specifying the routing value; but in log analytics scenarios it's not common to use the GET API, and users can use the IDs query instead to get a document by its id.
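The IDs-query alternative mentioned above uses the standard `ids` query DSL, which matches documents by `_id` without needing a routing value (the document id shown is a placeholder):

```python
import json

# ids query: fetch a document by _id without supplying a routing value;
# unlike a routed GET, it fans out to all shards of the index
ids_query = {"query": {"ids": {"values": ["my-doc-id"]}}, "size": 1}
body = json.dumps(ids_query)
```

This body would be sent to the index's `_search` endpoint in place of a GET-by-id call.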
Overall, automatic routing for bulk is recommended for log analytics and other use cases that write more and read less, where the OpenSearch cluster is large, the ingest load is high, and the indices being written to have hundreds of primary shards or more. Users can benefit from this feature through reduced indexing latency and fewer indexing rejections, improving indexing performance.