Bucket aggregation circuit breaker optimization. #46751

Merged: 8 commits, Jan 31, 2020

Conversation

howardhuanghua
Contributor

Bucket aggregations can consume a lot of memory on the coordinating node when they produce a huge number of result buckets. The search.max_buckets setting limits the maximum number of buckets allowed in a single response. Users sometimes raise this setting to get more buckets, but that also increases the risk of an OOM, and it is hard to pick a suitable value for max_buckets.

With this PR we introduce a search.check_buckets_step_size setting: whenever search.check_buckets_step_size new buckets have been allocated, we run a parent circuit breaker check. This lets users control aggregation memory at a configurable granularity. We are also considering whether search.max_buckets could be deprecated in the future.
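The mechanics, as a minimal hypothetical sketch (not the PR's actual code; the class and field names below are illustrative only):

    import org.elasticsearch.common.breaker.CircuitBreaker;

    // Hypothetical, simplified consumer: every stepSize newly allocated buckets,
    // trigger a parent circuit breaker check without changing byte accounting.
    class StepCheckedBucketConsumer {
        private final CircuitBreaker breaker; // e.g. the "request" child breaker
        private final int stepSize;           // search.check_buckets_step_size
        private int count;

        StepCheckedBucketConsumer(CircuitBreaker breaker, int stepSize) {
            this.breaker = breaker;
            this.stepSize = stepSize;
        }

        void accept(int newBuckets) {
            count += newBuckets;
            if (newBuckets > 0 && stepSize > 0 && count % stepSize == 0) {
                // Adding 0 bytes keeps the accounting unchanged but still runs the
                // parent (real memory) limit check, which may throw CircuitBreakingException.
                breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
            }
        }
    }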

@howardhuanghua howardhuanghua force-pushed the optimize_breaker branch 2 times, most recently from bad4d7c to d173ffe on September 17, 2019 01:44
@howardhuanghua
Contributor Author

Ping @danielmitterdorfer @dakrone, could you please review this PR? Thanks.

@javanna javanna added the :Search/Search Search-related issues that do not fall into other categories label Sep 19, 2019
@elasticmachine
Collaborator

Pinging @elastic/es-search

@jimczi jimczi added :Analytics/Aggregations Aggregations and removed :Search/Search Search-related issues that do not fall into other categories labels Sep 19, 2019
@elasticmachine
Collaborator

Pinging @elastic/es-analytics-geo

@polyfractal
Contributor

Hi @howardhuanghua, thanks for the PR :)

Is this specifically to address #37182, i.e. dealing with memory usage on the coordinating node (before the final reduce is invoked)?

I'm asking because we do track memory usage with the request circuit breaker during the shard collections (as part of AggregatorBase), so this breaker seems redundant in that respect. But if it is to address #37182, then I understand the purpose.

I'm not sure a new breaker is the right approach though. It would probably be better to re-use the existing request breaker since that is well defined and semantically fits the job.

As an aside, I don't think deprecating max_buckets is likely. MaxBuckets exists because tracking memory overhead of buckets/metrics is very difficult, and so sometimes a hard limit is more robust than attempting to estimate memory usage.

@howardhuanghua
Contributor Author

howardhuanghua commented Sep 20, 2019

Hi @polyfractal, thanks for the comment. This PR checks real memory usage at a fixed step size of bucket allocation in MultiBucketConsumer. The purpose is to break an aggregation request that might cause a memory explosion as early as possible. It can be used on both the coordinating node and the data nodes.

For the aggregation bucket memory tracking issue, we currently have the following options:

  1. Add aggregation bucket memory usage to the request breaker.
    Since an aggregation is also a search request, it should be controlled by the request circuit breaker, as you suggested in your comment. However, bucket memory is difficult to track, so it is hard to use the request circuit breaker to limit it.

  2. Use max_buckets to hard-limit the number of result buckets (see the sketch after this list).
    This accurately controls the bucket count rather than memory usage, and it is not a memory circuit breaker. If a user wants to increase it to get more buckets, it is hard to pick a value that both avoids exploding memory and meets the user's requirement for the number of result buckets. Many of our users run data analysis on time series data, where the result may contain tens or even hundreds of thousands of buckets; if we simply set max_buckets to a high value for all aggregation requests, nodes would easily OOM. So tuning this setting is somewhat risky and not very flexible.

  3. Check the parent circuit breaker at a certain step/interval.
    That is what this PR introduces: check_buckets_step_size controls the memory-check step/interval. Instead of limiting the total number of result buckets, it uses the parent circuit breaker to control memory usage at a certain step size. This is accurate and flexible: a user can set a lower step size to control aggregation memory usage without capping the number of result buckets.
    If the purpose of max_buckets is only to control memory usage, then check_buckets_step_size seems good enough.
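For contrast, the hard limit in option 2 is a pure count check with no memory measurement involved; roughly this shape (a sketch, not the actual MultiBucketConsumer code):

    // Sketch: option 2 rejects on bucket count alone. In Elasticsearch the real
    // check lives in MultiBucketConsumerService.MultiBucketConsumer and throws
    // TooManyBucketsException; a plain IllegalStateException stands in here.
    void accept(int newBuckets) {
        count += newBuckets;
        if (maxBuckets >= 0 && count > maxBuckets) {
            throw new IllegalStateException("Trying to create too many buckets. Must be less than or equal to ["
                + maxBuckets + "] but was [" + count + "]");
        }
    }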

@polyfractal
Contributor

Thanks for the explanation, that helps me understand the purpose better. A few thoughts, mainly writing this so I have an overview of all the pieces:

Today, the parent breaker is checked any time we increment a breaker via ChildMemoryCircuitBreaker#addEstimateBytesAndMaybeBreak(). This is used when we create a new Aggregator (AggregatorBase#addRequestCircuitBreakerBytes()) or any time we resize a BigArrays (BigArrays#adjustBreaker()). BigArrays is the data structure that backs most of the aggregations.

So this means the real-memory breaker is being checked on most (but not all) shard-level aggregation operations today.

It's not being checked on:

  • shard-level reduction
  • while accumulating results on the coordinator / intermediate reductions
  • the final reduce on coordinator
  • response serialization

So I agree that we should add some parent breaker checks at those steps. I'm not sure we need check_buckets_step_size though. Checking the real-memory breaker is pretty cheap (a few hundred nanos), so I think we could just check every time we account for a new bucket. That would simplify things since we wouldn't need an extra setting.
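For reference, the shard-level accounting described above has roughly this shape (a simplified sketch of the BigArrays#adjustBreaker idea, not the actual implementation):

    // Sketch: when a breaker-backed array grows, only the size delta is charged,
    // and the charging call itself performs the parent (real memory) check.
    void adjustBreaker(long oldSizeInBytes, long newSizeInBytes) {
        long delta = newSizeInBytes - oldSizeInBytes;
        if (delta > 0) {
            // May throw CircuitBreakingException before the bigger array is allocated.
            breaker.addEstimateBytesAndMaybeBreak(delta, "<reused_arrays>");
        } else {
            breaker.addWithoutBreaking(delta); // shrinking never trips the breaker
        }
    }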

@danielmitterdorfer do you know if it is ok to call HierarchyCircuitBreakerService#checkParentLimit() directly? It's public, but also only used from within the service, and seems like maybe something that should only be called indirectly?

@danielmitterdorfer
Member

@danielmitterdorfer do you know if it is ok to call HierarchyCircuitBreakerService#checkParentLimit() directly? It's public, but also only used from within the service, and seems like maybe something that should only be called indirectly?

IMHO the CircuitBreakerService should only be used to retrieve the corresponding circuit breaker (via #getBreaker(String)). Users can call public API methods on the circuit breaker which in turn may call #checkParentLimit() (that depends on the implementation) but it is not intended that any other code calls #checkParentLimit() directly. I'd argue that #checkParentLimit() is only declared public because it needs to be called by ChildMemoryCircuitBreaker which is in a different package.
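In other words, the supported pattern looks like this (a sketch; numBytes and the label are placeholders):

    // Obtain a child breaker from the service and stay on its public API;
    // checkParentLimit() runs inside this call when the implementation
    // (e.g. ChildMemoryCircuitBreaker) delegates to it.
    CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST);
    breaker.addEstimateBytesAndMaybeBreak(numBytes, "my_label"); // may throw CircuitBreakingException
    // ... and release the reservation later:
    breaker.addWithoutBreaking(-numBytes);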

@howardhuanghua
Contributor Author

Thanks @danielmitterdorfer, @polyfractal. To avoid calling #checkParentLimit() directly, I now use the REQUEST child circuit breaker in MultiBucketConsumer and pass 0 bytes to breaker.addEstimateBytesAndMaybeBreak to trigger the parent limit check. Please take a look at the updated commit.

If we check the real-memory breaker every time we account for a new bucket, each check costs a few hundred nanos, so an aggregation result with hundreds of thousands of buckets would spend tens of milliseconds on parent limit checks. With the check_buckets_step_size setting we can reduce the check frequency and avoid the performance impact, small as it is. I set it to 1000 by default; in the case above, even with hundreds of thousands of buckets, the total checking cost would be under 1 ms.
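The back-of-the-envelope numbers behind that claim, using the figures quoted in this thread (assumed, not measured):

    long buckets = 300_000;      // "hundreds of thousands" of result buckets
    long nanosPerCheck = 300;    // "a few hundred nanos" per real-memory check
    long stepSize = 1_000;       // default check_buckets_step_size

    long perBucket = buckets * nanosPerCheck;            // 90,000,000 ns ~= 90 ms
    long stepped = (buckets / stepSize) * nanosPerCheck; // 300 checks -> 90,000 ns ~= 0.09 ms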

Meanwhile, I would like to confirm the unchecked stages you mentioned above:

  • shard-level reduction
    Do you mean AggregatorBase#buildAggregation? It seems this shard-level reduction already has circuit breaker checks; take StringTermsAggregator as an example.

  • while accumulating results on the coordinator / intermediate reductions
    We already have a proposal for this. For accumulating aggregation results on the coordinating node, we could estimate bucket memory usage from the incoming network stream's byte length before deserialization and add it to the request circuit breaker. I will open another PR describing this proposal later.

  • the final reduce on coordinator
    The current PR addresses the circuit breaker check for bucket reduction on the coordinating node.

  • response serialization
    Do you mean the search response building method below?

        public void sendSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
            listener.onResponse(buildSearchResponse(internalSearchResponse, scrollId));
        }

@howardhuanghua
Contributor Author

Hi @polyfractal, would you please take another look at the updated commit? Thanks a lot.

@polyfractal
Contributor

Hi @howardhuanghua, apologies for the delay. I became unexpectedly very busy this last week. I'll try to take a look at your new changes tomorrow!

@howardhuanghua
Contributor Author

Hi Adrien @jpountz , would you please help to review this PR? Thanks a lot.

Contributor

@jpountz left a comment


I like the idea a lot. I left some comments that might help simplify a bit.

    @@ -109,6 +129,11 @@ public void accept(int value) {
                        + "] but was [" + count + "]. This limit can be set by changing the [" +
                        MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
                }

                if (value > 0 && checkBucketsStepSizeLimit > 0 && count % checkBucketsStepSizeLimit == 0) {
                    CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST);

Instead of only getting the request circuit breaker here, can you take a CircuitBreaker instead of a CircuitBreakerService in the constructor?

    private volatile int maxBucket;
    public static final Setting<Integer> CHECK_BUCKETS_STEP_SIZE_SETTING =
        Setting.intSetting("search.check_buckets_step_size", DEFAULT_CHECK_BUCKETS_STEP_SIZE,
            -1, Setting.Property.NodeScope, Setting.Property.Dynamic);

I'd be in favor of not making it configurable at all and checking every 1000 buckets all the time. (Or maybe every 1024, so that the % 1000 can be replaced with a lighter & 0x3FF mask.)
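The mask works because 1024 is a power of two; for non-negative counts the low ten bits decide divisibility (sketch):

    // 0x3FF == 1023 == 1024 - 1, so (count & 0x3FF) keeps the low 10 bits;
    // they are all zero exactly when count is a multiple of 1024. The modulo's
    // division is replaced by a single AND.
    for (int count = 0; count <= 4096; count++) {
        assert (count % 1024 == 0) == ((count & 0x3FF) == 0);
    }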

        final Environment environment,
        final Collection<Class<? extends Plugin>> classpathPlugins,
        final boolean forbidPrivateIndexSettings) {

can you undo the above unrelated indentation changes?

    new MultiBucketConsumerService.MultiBucketConsumer(10000, 10000, service);

    long currentMemory = ((HierarchyCircuitBreakerService) service).currentMemoryUsage();
    if (currentMemory > parentLimitBytes) {

Can you maybe make the test a bit more predictable by calling addWithoutBreaking with a number of bytes that is greater than the limit?

            IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService,
            BigArrays bigArrays, FetchPhase fetchPhase, CircuitBreakerService circuitBreakerService) {
        super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null);

same here

        .numberOfReplicas(0)
        .creationDate(System.currentTimeMillis())
        .build(),
    Settings.EMPTY

same here

@howardhuanghua
Contributor Author

Hi @jpountz, thanks for your review. I have updated the code; would you please check again?
A few additional points:

  1. I removed the search.check_buckets_step_size setting I had added before; I agree that users should not need to configure this. Meanwhile, I am wondering: do we still need search.max_buckets at all? If we check the parent circuit breaker every 1024 buckets, it seems we no longer need to limit the maximum bucket count.

  2. In the testAllocationBucketsBreaker UT, I use addWithoutBreaking to make sure the used bytes exceed the total circuit breaker limit, and I also need to set indices.breaker.total.use_real_memory to false, since in the real-memory case the parent breaker does not sum all the child breakers (a minimal sketch of this test shape follows).
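A sketch of that test shape (assuming the merged constructor that takes a CircuitBreaker directly; parentLimitBytes and breakerService are placeholders for the test's setup, and expectThrows is the usual ESTestCase helper):

    // Silently push the breaker's accounted bytes past the configured limit,
    // then expect the next stepped bucket check to trip the breaker.
    CircuitBreaker requestBreaker = breakerService.getBreaker(CircuitBreaker.REQUEST);
    requestBreaker.addWithoutBreaking(parentLimitBytes + 1);

    MultiBucketConsumerService.MultiBucketConsumer consumer =
        new MultiBucketConsumerService.MultiBucketConsumer(10000, requestBreaker);
    expectThrows(CircuitBreakingException.class, () -> {
        for (int i = 0; i < 2048; i++) {
            consumer.accept(1); // parent check fires on every 1024th bucket
        }
    });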

@howardhuanghua
Contributor Author

Hi @jpountz, could you please review the changes again? Thank you.

@jpountz
Contributor

jpountz commented Jan 30, 2020

@elasticmachine update branch

@jpountz
Contributor

jpountz commented Jan 30, 2020

@elasticmachine ok to test

@jpountz
Contributor

jpountz commented Jan 30, 2020

I opened #51694 for the failure of elasticsearch-ci/1

@elasticmachine run elasticsearch-ci/1

@jpountz
Contributor

jpountz commented Jan 31, 2020

@howardhuanghua Thanks again for your contribution! I opened #51731 to discuss the deprecation of search.max_buckets that you raised.

jpountz added a commit that referenced this pull request Jan 31, 2020
@howardhuanghua
Contributor Author

@jpountz Thanks a lot for your help!

@polyfractal
Contributor

Thanks @howardhuanghua! ❤️

@liwanjie1020

@howardhuanghua I always thought that with this new parameter, requests that create too many buckets would be rejected. But I tested it today, making two requests that each created hundreds of thousands of buckets, and in the end the machine still went straight to OOM.
My ES version is 7.10.1. I tested with indices.breaker.total.use_real_memory set to false and also set to true, with max_buckets at its default value of 65535. The result was the same in both cases.
[screenshots omitted]
But when I set max_buckets extremely large (instead of the default value of 65535), the circuit breaker did come into play: the cluster did not crash and did not OOM.
I am very confused and hope you can help me. Thank you very much.

@C-300SilverMountain

If JVM memory usage is already at 60% and a large request comes in that will consume another 50% of memory, the circuit breaker will not be triggered, since usage at check time has not exceeded its default 60% threshold; the consequence is an OOM.
