Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Circuit-break based on real memory usage #31767

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.indices.breaker;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.concurrent.TimeUnit;

@Fork(3)
@Warmup(iterations = 10)
@Measurement(iterations = 10)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
@SuppressWarnings("unused") //invoked by benchmarking framework
public class MemoryStatsBenchmark {
private static final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();

@Param({"0", "16", "256", "4096"})
private int tokens;

@Benchmark
public void baseline() {
Blackhole.consumeCPU(tokens);
}

@Benchmark
@Threads(1)
public long getMemoryStats_01() {
Blackhole.consumeCPU(tokens);
return memoryMXBean.getHeapMemoryUsage().getUsed();
}

@Benchmark
@Threads(2)
public long getMemoryStats_02() {
Blackhole.consumeCPU(tokens);
return memoryMXBean.getHeapMemoryUsage().getUsed();
}

@Benchmark
@Threads(4)
public long getMemoryStats_04() {
Blackhole.consumeCPU(tokens);
return memoryMXBean.getHeapMemoryUsage().getUsed();
}

@Benchmark
@Threads(8)
public long getMemoryStats_08() {
Blackhole.consumeCPU(tokens);
return memoryMXBean.getHeapMemoryUsage().getUsed();
}

@Benchmark
@Threads(16)
public long getMemoryStats_16() {
Blackhole.consumeCPU(tokens);
return memoryMXBean.getHeapMemoryUsage().getUsed();
}

@Benchmark
@Threads(32)
public long getMemoryStats_32() {
Blackhole.consumeCPU(tokens);
return memoryMXBean.getHeapMemoryUsage().getUsed();
}

@Benchmark
@Threads(64)
public long getMemoryStats_64() {
Blackhole.consumeCPU(tokens);
return memoryMXBean.getHeapMemoryUsage().getUsed();
}
}

10 changes: 9 additions & 1 deletion docs/reference/migration/migrate_7_0/indices.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,12 @@ The following previously deprecated url parameter have been removed:

Previously the in flight requests circuit breaker considered only the raw byte representation.
By bumping the value of `network.breaker.inflight_requests.overhead` from 1 to 2, this circuit
breaker considers now also the memory overhead of representing the request as a structured object.
breaker considers now also the memory overhead of representing the request as a structured object.

==== Parent circuit breaker changes

The parent circuit breaker defines a new setting `indices.breaker.total.use_real_memory` which is
`true` by default if the JVM heap size is smaller than 1GB. This means that the parent circuit
breaker will trip based on currently used heap memory instead of only considering the reserved
memory by child circuit breakers. When this setting is `true`, the default parent breaker limit
also changes from 70% to 95% of the JVM heap size.
12 changes: 10 additions & 2 deletions docs/reference/modules/indices/circuit_breaker.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,19 @@ These settings can be dynamically updated on a live cluster with the
[float]
==== Parent circuit breaker

The parent-level breaker can be configured with the following setting:
The parent-level breaker can be configured with the following settings:

`indices.breaker.total.use_real_memory`::

Whether the parent breaker should take real memory usage into account (`true`) or only
consider the amount that is reserved by child circuit breakers (`false`). Defaults to `true`
if the JVM heap size is smaller than 1GB, otherwise `false`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we talked about today, I think we may want to think about why this defaults to off and put it in the documentation (or at least mention it in this PR). Right now it comes off as more of "it's off by default for > 1gb heaps for no reason in particular" which seems strange given that we're trying to be as safe as possible by default

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am happy to change the default to the real memory circuit breaker for all cases but initially I wanted to be a bit more conservative. I felt that for larger deployments it is ok to stick with the current implementation. I also thought that it might be one thing less to consider in a major version upgrade if we keep the default for now for those deployments.

My idea behind changing it only for lower heap sizes below 1GB deviation between real memory usage and actively tracked memory usage starts to matter more and we want to ensure we push back accordingly. I did not choose this heap size by coincidence but based on our benchmarks. With 1GB heap, Elasticsearch can handle our macrobenchmark suite but it starts to struggle below that point (e.g. for 768MB).

I could ask how the rest of the team feels turning it on in all cases. Wdyt?


`indices.breaker.total.limit`::

Starting limit for overall parent breaker, defaults to 70% of JVM heap.
Starting limit for overall parent breaker, defaults to 70% of JVM heap if
`indices.breaker.total.use_real_memory` is `false`. If `indices.breaker.total.use_real_memory`
is `true`, defaults to 95% of the JVM heap.

[[fielddata-circuit-breaker]]
[float]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws Cir

// Additionally, we need to check that we haven't exceeded the parent's limit
try {
parent.checkParentLimit(label);
parent.checkParentLimit((long) (bytes * overheadConstant), label);
} catch (CircuitBreakingException e) {
// If the parent breaker is tripped, this breaker has to be
// adjusted back down because the allocation is "blocked" but the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ public void apply(Settings value, Settings current, Settings previous) {
HttpTransportSettings.SETTING_HTTP_TCP_REUSE_ADDRESS,
HttpTransportSettings.SETTING_HTTP_TCP_SEND_BUFFER_SIZE,
HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE,
HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING,
HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -44,10 +47,24 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {

private static final String CHILD_LOGGER_PREFIX = "org.elasticsearch.indices.breaker.";

private static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();

private final ConcurrentMap<String, CircuitBreaker> breakers = new ConcurrentHashMap<>();

public static final Setting<Boolean> USE_REAL_MEMORY_USAGE_SETTING =
Setting.boolSetting("indices.breaker.total.use_real_memory", settings -> {
ByteSizeValue maxHeapSize = new ByteSizeValue(MEMORY_MX_BEAN.getHeapMemoryUsage().getMax());
return Boolean.toString(maxHeapSize.compareTo(new ByteSizeValue(1, ByteSizeUnit.GB)) < 0);
}, Property.NodeScope);

public static final Setting<ByteSizeValue> TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING =
Setting.memorySizeSetting("indices.breaker.total.limit", "70%", Property.Dynamic, Property.NodeScope);
Setting.memorySizeSetting("indices.breaker.total.limit", settings -> {
if (USE_REAL_MEMORY_USAGE_SETTING.get(settings)) {
return "95%";
} else {
return "70%";
}
}, Property.Dynamic, Property.NodeScope);

public static final Setting<ByteSizeValue> FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING =
Setting.memorySizeSetting("indices.breaker.fielddata.limit", "60%", Property.Dynamic, Property.NodeScope);
Expand Down Expand Up @@ -77,6 +94,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
public static final Setting<CircuitBreaker.Type> IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_TYPE_SETTING =
new Setting<>("network.breaker.inflight_requests.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope);

private final boolean trackRealMemoryUsage;
private volatile BreakerSettings parentSettings;
private volatile BreakerSettings fielddataSettings;
private volatile BreakerSettings inFlightRequestsSettings;
Expand Down Expand Up @@ -120,6 +138,8 @@ public HierarchyCircuitBreakerService(Settings settings, ClusterSettings cluster
logger.trace("parent circuit breaker with settings {}", this.parentSettings);
}

this.trackRealMemoryUsage = USE_REAL_MEMORY_USAGE_SETTING.get(settings);

registerBreaker(this.requestSettings);
registerBreaker(this.fielddataSettings);
registerBreaker(this.inFlightRequestsSettings);
Expand Down Expand Up @@ -191,17 +211,15 @@ public CircuitBreaker getBreaker(String name) {

@Override
public AllCircuitBreakerStats stats() {
long parentEstimated = 0;
List<CircuitBreakerStats> allStats = new ArrayList<>(this.breakers.size());
// Gather the "estimated" count for the parent breaker by adding the
// estimations for each individual breaker
for (CircuitBreaker breaker : this.breakers.values()) {
allStats.add(stats(breaker.getName()));
parentEstimated += breaker.getUsed();
}
// Manually add the parent breaker settings since they aren't part of the breaker map
allStats.add(new CircuitBreakerStats(CircuitBreaker.PARENT, parentSettings.getLimit(),
parentEstimated, 1.0, parentTripCount.get()));
parentUsed(0L), 1.0, parentTripCount.get()));
return new AllCircuitBreakerStats(allStats.toArray(new CircuitBreakerStats[allStats.size()]));
}

Expand All @@ -211,15 +229,28 @@ public CircuitBreakerStats stats(String name) {
return new CircuitBreakerStats(breaker.getName(), breaker.getLimit(), breaker.getUsed(), breaker.getOverhead(), breaker.getTrippedCount());
}

private long parentUsed(long newBytesReserved) {
if (this.trackRealMemoryUsage) {
return currentMemoryUsage() + newBytesReserved;
} else {
long parentEstimated = 0;
for (CircuitBreaker breaker : this.breakers.values()) {
parentEstimated += breaker.getUsed() * breaker.getOverhead();
}
return parentEstimated;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be parentEstimated + newBytesReserved? We add it to the real memory usage if tracking memory

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because this is for the current strategy which sums up the total memory reserved by all child circuit breakers. As the corresponding child circuit breaker accounts for that amount of memory already, we do not need to do that again in the parent breaker.

For the new strategy which is based on real memory usage, we do not rely on the child memory circuit breakers but rather only on current memory usage. Hence, we need to consider the amount of memory that is about to be reserved here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
}

//package private to allow overriding it in tests
long currentMemoryUsage() {
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
}

/**
* Checks whether the parent breaker has been tripped
*/
public void checkParentLimit(String label) throws CircuitBreakingException {
long totalUsed = 0;
for (CircuitBreaker breaker : this.breakers.values()) {
totalUsed += (breaker.getUsed() * breaker.getOverhead());
}

public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException {
long totalUsed = parentUsed(newBytesReserved);
long parentLimit = this.parentSettings.getLimit();
if (totalUsed > parentLimit) {
this.parentTripCount.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.common.settings;

import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.IndexingMemoryController;
Expand Down Expand Up @@ -57,8 +58,15 @@ public void testIndicesRequestCacheSetting() {
}

public void testCircuitBreakerSettings() {
// default is chosen based on actual heap size
double defaultTotalPercentage;
if (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() < new ByteSizeValue(1, ByteSizeUnit.GB).getBytes()) {
defaultTotalPercentage = 0.95d;
} else {
defaultTotalPercentage = 0.7d;
}
assertMemorySizeSetting(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, "indices.breaker.total.limit",
new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.7)));
new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * defaultTotalPercentage)));
assertMemorySizeSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "indices.breaker.fielddata.limit",
new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.6)));
assertMemorySizeSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, "indices.breaker.request.limit",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.containsString;
Expand All @@ -56,7 +57,7 @@ public CircuitBreaker getBreaker(String name) {
}

@Override
public void checkParentLimit(String label) throws CircuitBreakingException {
public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException {
// never trip
}
};
Expand Down Expand Up @@ -114,7 +115,7 @@ public CircuitBreaker getBreaker(String name) {
}

@Override
public void checkParentLimit(String label) throws CircuitBreakingException {
public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException {
// Parent will trip right before regular breaker would trip
if (getBreaker(CircuitBreaker.REQUEST).getUsed() > parentLimit) {
parentTripped.incrementAndGet();
Expand Down Expand Up @@ -170,6 +171,7 @@ public void checkParentLimit(String label) throws CircuitBreakingException {
*/
public void testBorrowingSiblingBreakerMemory() throws Exception {
Settings clusterSettings = Settings.builder()
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false)
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "200mb")
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "150mb")
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "150mb")
Expand Down Expand Up @@ -199,4 +201,50 @@ public void testBorrowingSiblingBreakerMemory() throws Exception {
assertThat(exception.getMessage(), containsString("which is larger than the limit of [209715200/200mb]"));
}
}

public void testParentBreaksOnRealMemoryUsage() throws Exception {
Settings clusterSettings = Settings.builder()
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), Boolean.TRUE)
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "200b")
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "300b")
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.getKey(), 2)
.build();

AtomicLong memoryUsage = new AtomicLong();
final CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings,
new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) {
@Override
long currentMemoryUsage() {
return memoryUsage.get();
}
};
final CircuitBreaker requestBreaker = service.getBreaker(CircuitBreaker.REQUEST);

// anything below 100 bytes should work (overhead) - current memory usage is zero
requestBreaker.addEstimateBytesAndMaybeBreak(randomLongBetween(0, 99), "request");
assertEquals(0, requestBreaker.getTrippedCount());
// assume memory usage has increased to 150 bytes
memoryUsage.set(150);

// a reservation that bumps memory usage to less than 200 (150 bytes used + reservation < 200)
requestBreaker.addEstimateBytesAndMaybeBreak(randomLongBetween(0, 24), "request");
assertEquals(0, requestBreaker.getTrippedCount());
memoryUsage.set(181);

long reservationInBytes = randomLongBetween(10, 50);
// anything >= 20 bytes (10 bytes * 2 overhead) reservation breaks the parent but it must be low enough to avoid
// breaking the child breaker.
CircuitBreakingException exception = expectThrows(CircuitBreakingException.class, () -> requestBreaker
.addEstimateBytesAndMaybeBreak(reservationInBytes, "request"));
// it was the parent that rejected the reservation
assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [request] would be"));
assertThat(exception.getMessage(), containsString("which is larger than the limit of [200/200b]"));
assertEquals(0, requestBreaker.getTrippedCount());
assertEquals(1, service.stats().getStats(CircuitBreaker.PARENT).getTrippedCount());

// lower memory usage again - the same reservation should succeed
memoryUsage.set(100);
requestBreaker.addEstimateBytesAndMaybeBreak(reservationInBytes, "request");
assertEquals(0, requestBreaker.getTrippedCount());
}
}
Loading