Skip to content

Commit

Permalink
fixup! fixup! add support for registering custom circuit breaker
Browse files Browse the repository at this point in the history
  • Loading branch information
seut committed Jan 12, 2015
1 parent 3f51352 commit 358bb9b
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
*/
public interface CircuitBreaker {

public static final String PARENT = "PARENT";
public static final String FIELDDATA = "FIELDDATA";
public static final String REQUEST = "REQUEST";
public static final String PARENT = "parent";
public static final String FIELDDATA = "fielddata";
public static final String REQUEST = "request";

public static enum Type {
// A regular or child MemoryCircuitBreaker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import org.elasticsearch.node.settings.NodeSettingsService;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import static com.google.common.collect.Lists.newArrayList;
Expand All @@ -42,7 +42,7 @@
*/
public class HierarchyCircuitBreakerService extends CircuitBreakerService {

private final Map<String, CircuitBreaker> breakers = new ConcurrentHashMap();
private final ConcurrentMap<String, CircuitBreaker> breakers = new ConcurrentHashMap();

// Old pre-1.4.0 backwards compatible settings
public static final String OLD_CIRCUIT_BREAKER_MAX_BYTES_SETTING = "indices.fielddata.breaker.limit";
Expand Down Expand Up @@ -220,7 +220,7 @@ public void checkParentLimit(String label) throws CircuitBreakingException {
long parentLimit = this.parentSettings.getLimit();
if (totalUsed > parentLimit) {
this.parentTripCount.incrementAndGet();
throw new CircuitBreakingException("[PARENT] Data too large, data for [" +
throw new CircuitBreakingException("[parent] Data too large, data for [" +
label + "] would be larger than limit of [" +
parentLimit + "/" + new ByteSizeValue(parentLimit) + "]",
totalUsed, parentLimit);
Expand All @@ -238,15 +238,27 @@ public void registerBreaker(BreakerSettings breakerSettings) {
// Validate the settings
validateSettings(new BreakerSettings[] {breakerSettings});

CircuitBreaker breaker;
if (breakerSettings.getType() == CircuitBreaker.Type.NOOP) {
breaker = new NoopCircuitBreaker(breakerSettings.getName());
CircuitBreaker breaker = new NoopCircuitBreaker(breakerSettings.getName());
breakers.put(breakerSettings.getName(), breaker);
} else {
CircuitBreaker oldBreaker = breakers.get(breakerSettings.getName());
breaker = new ChildMemoryCircuitBreaker(breakerSettings,
(ChildMemoryCircuitBreaker)oldBreaker, logger, this, breakerSettings.getName());
CircuitBreaker oldBreaker;
CircuitBreaker breaker = new ChildMemoryCircuitBreaker(breakerSettings,
logger, this, breakerSettings.getName());

for (;;) {
oldBreaker = breakers.putIfAbsent(breakerSettings.getName(), breaker);
if (oldBreaker == null) {
return;
}
breaker = new ChildMemoryCircuitBreaker(breakerSettings,
(ChildMemoryCircuitBreaker)oldBreaker, logger, this, breakerSettings.getName());

if (breakers.replace(breakerSettings.getName(), oldBreaker, breaker)) {
return;
}
}
}

breakers.put(breakerSettings.getName(), breaker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.indices.breaker.BreakerSettings;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerStats;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -236,7 +239,7 @@ public void testParentChecking() throws Exception {
client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get();
fail("should have thrown an exception");
} catch (Exception e) {
String errMsg = "[FIELDDATA] Data too large, data for [test] would be larger than limit of [10/10b]";
String errMsg = "[fielddata] Data too large, data for [test] would be larger than limit of [10/10b]";
assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException",
ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true));
}
Expand All @@ -258,7 +261,7 @@ public void testParentChecking() throws Exception {
client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get();
fail("should have thrown an exception");
} catch (Exception e) {
String errMsg = "[PARENT] Data too large, data for [test] would be larger than limit of [15/15b]";
String errMsg = "[parent] Data too large, data for [test] would be larger than limit of [15/15b]";
assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException",
ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true));
}
Expand Down Expand Up @@ -292,7 +295,7 @@ public void testRequestBreaker() throws Exception {
client.prepareSearch("cb-test").setQuery(matchAllQuery()).addAggregation(cardinality("card").field("test")).get();
fail("aggregation should have tripped the breaker");
} catch (Exception e) {
String errMsg = "CircuitBreakingException[[REQUEST] Data too large, data for [<reused_arrays>] would be larger than limit of [10/10b]]";
String errMsg = "CircuitBreakingException[[request] Data too large, data for [<reused_arrays>] would be larger than limit of [10/10b]]";
assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException",
ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true));
}
Expand All @@ -314,4 +317,34 @@ public void run() {
}
}, 30, TimeUnit.SECONDS);
}

@Test
public void testCustomCircuitBreakerRegistration() throws Exception {
Iterable<CircuitBreakerService> serviceIter = internalCluster().getInstances(CircuitBreakerService.class);

final String breakerName = "customBreaker";
BreakerSettings breakerSettings = new BreakerSettings(breakerName, 8, 1.03);
CircuitBreaker breaker = null;

for (CircuitBreakerService s : serviceIter) {
s.registerBreaker(breakerSettings);
breaker = s.getBreaker(breakerSettings.getName());
}

if (breaker != null) {
try {
breaker.addEstimateBytesAndMaybeBreak(16, "test");
} catch (CircuitBreakingException e) {
// ignore, we forced a circuit break
}
}

NodesStatsResponse stats = client().admin().cluster().prepareNodesStats().clear().setBreaker(true).get();
int breaks = 0;
for (NodeStats stat : stats.getNodes()) {
CircuitBreakerStats breakerStats = stat.getBreaker().getStats(breakerName);
breaks += breakerStats.getTrippedCount();
}
assertThat(breaks, greaterThanOrEqualTo(1));
}
}

0 comments on commit 358bb9b

Please sign in to comment.