From 358bb9bd7585502b269a3cbd4efb60ad7d3d238d Mon Sep 17 00:00:00 2001 From: Sebastian Utz Date: Mon, 12 Jan 2015 10:32:14 +0100 Subject: [PATCH] fixup! fixup! add support for registering custom circuit breaker --- .../common/breaker/CircuitBreaker.java | 6 +-- .../HierarchyCircuitBreakerService.java | 30 +++++++++----- .../breaker/CircuitBreakerServiceTests.java | 39 +++++++++++++++++-- 3 files changed, 60 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java b/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java index 2fde06ecdce54..6c099cfe01492 100644 --- a/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java +++ b/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java @@ -29,9 +29,9 @@ */ public interface CircuitBreaker { - public static final String PARENT = "PARENT"; - public static final String FIELDDATA = "FIELDDATA"; - public static final String REQUEST = "REQUEST"; + public static final String PARENT = "parent"; + public static final String FIELDDATA = "fielddata"; + public static final String REQUEST = "request"; public static enum Type { // A regular or child MemoryCircuitBreaker diff --git a/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java b/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java index df875c7809be6..db9717356ac0e 100644 --- a/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java +++ b/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java @@ -30,8 +30,8 @@ import org.elasticsearch.node.settings.NodeSettingsService; import java.util.List; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import static com.google.common.collect.Lists.newArrayList; @@ -42,7 +42,7 @@ */ public class HierarchyCircuitBreakerService extends CircuitBreakerService { - private final Map breakers = new ConcurrentHashMap(); + private final ConcurrentMap breakers = new ConcurrentHashMap(); // Old pre-1.4.0 backwards compatible settings public static final String OLD_CIRCUIT_BREAKER_MAX_BYTES_SETTING = "indices.fielddata.breaker.limit"; @@ -220,7 +220,7 @@ public void checkParentLimit(String label) throws CircuitBreakingException { long parentLimit = this.parentSettings.getLimit(); if (totalUsed > parentLimit) { this.parentTripCount.incrementAndGet(); - throw new CircuitBreakingException("[PARENT] Data too large, data for [" + + throw new CircuitBreakingException("[parent] Data too large, data for [" + label + "] would be larger than limit of [" + parentLimit + "/" + new ByteSizeValue(parentLimit) + "]", totalUsed, parentLimit); @@ -238,15 +238,27 @@ public void registerBreaker(BreakerSettings breakerSettings) { // Validate the settings validateSettings(new BreakerSettings[] {breakerSettings}); - CircuitBreaker breaker; if (breakerSettings.getType() == CircuitBreaker.Type.NOOP) { - breaker = new NoopCircuitBreaker(breakerSettings.getName()); + CircuitBreaker breaker = new NoopCircuitBreaker(breakerSettings.getName()); + breakers.put(breakerSettings.getName(), breaker); } else { - CircuitBreaker oldBreaker = breakers.get(breakerSettings.getName()); - breaker = new ChildMemoryCircuitBreaker(breakerSettings, - (ChildMemoryCircuitBreaker)oldBreaker, logger, this, breakerSettings.getName()); + CircuitBreaker oldBreaker; + CircuitBreaker breaker = new ChildMemoryCircuitBreaker(breakerSettings, + logger, this, breakerSettings.getName()); + + for (;;) { + oldBreaker = breakers.putIfAbsent(breakerSettings.getName(), breaker); + if (oldBreaker == null) { + return; + } + breaker = new ChildMemoryCircuitBreaker(breakerSettings, + (ChildMemoryCircuitBreaker)oldBreaker, logger, this, breakerSettings.getName()); + + if (breakers.replace(breakerSettings.getName(), oldBreaker, breaker)) { + return; + } + } } - breakers.put(breakerSettings.getName(), breaker); } } diff --git a/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java b/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java index 7fa39a4b7c714..6e573ce3ec411 100644 --- a/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java +++ b/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceTests.java @@ -27,8 +27,11 @@ import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.indices.breaker.BreakerSettings; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerStats; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.rest.RestStatus; @@ -236,7 +239,7 @@ public void testParentChecking() throws Exception { client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get(); fail("should have thrown an exception"); } catch (Exception e) { - String errMsg = "[FIELDDATA] Data too large, data for [test] would be larger than limit of [10/10b]"; + String errMsg = "[fielddata] Data too large, data for [test] would be larger than limit of [10/10b]"; assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException", ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true)); } @@ -258,7 +261,7 @@ public void testParentChecking() throws Exception { client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get(); fail("should have thrown an exception"); } catch (Exception e) { - String errMsg = "[PARENT] Data too large, data for [test] would be larger than limit of [15/15b]"; + String errMsg = "[parent] Data too large, data for [test] would be larger than limit of [15/15b]"; assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException", ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true)); } @@ -292,7 +295,7 @@ public void testRequestBreaker() throws Exception { client.prepareSearch("cb-test").setQuery(matchAllQuery()).addAggregation(cardinality("card").field("test")).get(); fail("aggregation should have tripped the breaker"); } catch (Exception e) { - String errMsg = "CircuitBreakingException[[REQUEST] Data too large, data for [] would be larger than limit of [10/10b]]"; + String errMsg = "CircuitBreakingException[[request] Data too large, data for [] would be larger than limit of [10/10b]]"; assertThat("Exception: " + ExceptionsHelper.unwrapCause(e) + " should contain a CircuitBreakingException", ExceptionsHelper.unwrapCause(e).getMessage().contains(errMsg), equalTo(true)); } @@ -314,4 +317,34 @@ public void run() { } }, 30, TimeUnit.SECONDS); } + + @Test + public void testCustomCircuitBreakerRegistration() throws Exception { + Iterable serviceIter = internalCluster().getInstances(CircuitBreakerService.class); + + final String breakerName = "customBreaker"; + BreakerSettings breakerSettings = new BreakerSettings(breakerName, 8, 1.03); + CircuitBreaker breaker = null; + + for (CircuitBreakerService s : serviceIter) { + s.registerBreaker(breakerSettings); + breaker = s.getBreaker(breakerSettings.getName()); + } + + if (breaker != null) { + try { + breaker.addEstimateBytesAndMaybeBreak(16, "test"); + } catch (CircuitBreakingException e) { + // ignore, we forced a circuit break + } + } + + NodesStatsResponse stats = client().admin().cluster().prepareNodesStats().clear().setBreaker(true).get(); + int breaks = 0; + for (NodeStats stat : stats.getNodes()) { + CircuitBreakerStats breakerStats = stat.getBreaker().getStats(breakerName); + breaks += breakerStats.getTrippedCount(); + } + assertThat(breaks, greaterThanOrEqualTo(1)); + } }