-
Notifications
You must be signed in to change notification settings - Fork 207
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implemented a heap-based circuit breaker (#2155)
Implemented a heap-based circuit breaker. This circuit breaker will prevent entry buffers from accepting events after the heap usage reaches a specified value. This checks for heap usage in a background thread and updates the state, which the buffer will then use to determine if the circuit breaker is open or closed. This also signals to the JVM to start a GC when the threshold is reached. Resolves #2150. Signed-off-by: David Venable <[email protected]>
- Loading branch information
Showing
31 changed files
with
1,672 additions
and
30 deletions.
There are no files selected for viewing
104 changes: 104 additions & 0 deletions
104
data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.model.types; | ||
|
||
import java.math.BigDecimal; | ||
import java.util.Arrays; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.function.Function; | ||
import java.util.regex.Matcher; | ||
import java.util.regex.Pattern; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* Class for representing a count or size of bytes. | ||
* | ||
* @since 2.1 | ||
*/ | ||
public class ByteCount { | ||
private static final Pattern BYTE_PATTERN = Pattern.compile("^(?<value>\\d+\\.?\\d*)(?<unit>[a-z]+)?\\z"); | ||
private final long bytes; | ||
|
||
private ByteCount(final long bytes) { | ||
this.bytes = bytes; | ||
} | ||
|
||
/** | ||
* Gets the value as bytes. | ||
* | ||
* @return A long representation of the bytes. | ||
* @since 2.1 | ||
*/ | ||
public long getBytes() { | ||
return bytes; | ||
} | ||
|
||
private enum Unit { | ||
BYTE("b", 1), | ||
KILOBYTE("kb", 1024), | ||
MEGABYTE("mb", KILOBYTE.multiplier * 1024), | ||
GIGABYTE("gb", MEGABYTE.multiplier * 1024); | ||
|
||
private final String unitString; | ||
private final long multiplier; | ||
|
||
private static final Map<String, Unit> UNIT_MAP = Arrays.stream(Unit.values()) | ||
.collect(Collectors.toMap(unit -> unit.unitString, Function.identity())); | ||
|
||
Unit(final String unitString, final long multiplier) { | ||
this.unitString = unitString; | ||
this.multiplier = multiplier; | ||
} | ||
|
||
static Optional<Unit> fromString(final String unitString) { | ||
return Optional.ofNullable(UNIT_MAP.get(unitString)); | ||
} | ||
} | ||
|
||
/** | ||
* Parses a byte string to get the byte count. | ||
* | ||
* @param string A valid string representation of the bytes | ||
* @return The parsed {@link ByteCount} | ||
* @throws ByteCountParseException thrown if unable to parse the input string for the expected format | ||
* @throws ByteCountInvalidInputException thrown if the input is parsable but the units or value is invalid | ||
*/ | ||
public static ByteCount parse(final String string) { | ||
final Matcher matcher = BYTE_PATTERN.matcher(string); | ||
if(!matcher.find()) { | ||
throw new ByteCountParseException("Unable to parse bytes provided by '" + string + "'"); | ||
} | ||
|
||
final String valueString = matcher.group("value"); | ||
final String unitString = matcher.group("unit"); | ||
|
||
if(unitString == null) { | ||
throw new ByteCountInvalidInputException("Byte counts must have a unit."); | ||
} | ||
|
||
final Unit unit = Unit.fromString(unitString) | ||
.orElseThrow(() -> new ByteCountInvalidInputException("Invalid byte unit: '" + unitString + "'")); | ||
|
||
final BigDecimal valueBigDecimal = new BigDecimal(valueString); | ||
|
||
final BigDecimal byteCount = scaleToBytes(valueBigDecimal, unit); | ||
|
||
if(unit == Unit.BYTE && isFractional(byteCount)) { | ||
throw new ByteCountInvalidInputException("The byte value '" + string + "' is explicitly declared as a fractional byte which is not allowed."); | ||
} | ||
|
||
return new ByteCount(byteCount.longValue()); | ||
} | ||
|
||
private static BigDecimal scaleToBytes(final BigDecimal value, final Unit unit) { | ||
return value.multiply(BigDecimal.valueOf(unit.multiplier)); | ||
} | ||
|
||
private static boolean isFractional(final BigDecimal value) { | ||
return value.remainder(BigDecimal.ONE).compareTo(BigDecimal.ZERO) != 0; | ||
} | ||
} |
12 changes: 12 additions & 0 deletions
12
.../src/main/java/org/opensearch/dataprepper/model/types/ByteCountInvalidInputException.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.model.types; | ||
|
||
public class ByteCountInvalidInputException extends RuntimeException { | ||
public ByteCountInvalidInputException(final String message) { | ||
super(message); | ||
} | ||
} |
12 changes: 12 additions & 0 deletions
12
...per-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCountParseException.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.model.types; | ||
|
||
public class ByteCountParseException extends RuntimeException { | ||
public ByteCountParseException(final String message) { | ||
super(message); | ||
} | ||
} |
148 changes: 148 additions & 0 deletions
148
data-prepper-api/src/test/java/org/opensearch/dataprepper/model/types/ByteCountTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.model.types; | ||
|
||
import org.junit.jupiter.params.ParameterizedTest; | ||
import org.junit.jupiter.params.provider.CsvSource; | ||
import org.junit.jupiter.params.provider.ValueSource; | ||
|
||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.containsString; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.notNullValue; | ||
import static org.junit.jupiter.api.Assertions.assertThrows; | ||
|
||
class ByteCountTest { | ||
@ParameterizedTest | ||
@ValueSource(strings = { | ||
".1b", | ||
".1kb", | ||
".1mb", | ||
".1gb", | ||
".12b", | ||
".0b", | ||
"b", | ||
"kb", | ||
"mb", | ||
"gb", | ||
"1 b", | ||
"1 kb", | ||
"1 mb", | ||
"1 gb", | ||
".b", | ||
".kb", | ||
".mb", | ||
".gb", | ||
"a", | ||
"badinput", | ||
"1b ", | ||
"1b trailing", | ||
"1kb ", | ||
"1kb trailing", | ||
"a1b", | ||
"1b!", | ||
"-0b", | ||
"-1b", | ||
"-1kb", | ||
"-1mb", | ||
"-1gb", | ||
}) | ||
void parse_throws_exception_for_invalid_format(final String byteString) { | ||
final ByteCountParseException actualException = assertThrows(ByteCountParseException.class, () -> ByteCount.parse(byteString)); | ||
|
||
assertThat(actualException.getMessage(), notNullValue()); | ||
} | ||
|
||
@ParameterizedTest | ||
@ValueSource(strings = { | ||
"0", | ||
"1", | ||
"1024", | ||
"1.5", | ||
}) | ||
void parse_throws_exception_for_missing_unit(final String byteString) { | ||
final ByteCountInvalidInputException actualException = assertThrows(ByteCountInvalidInputException.class, () -> ByteCount.parse(byteString)); | ||
|
||
assertThat(actualException.getMessage(), notNullValue()); | ||
assertThat(actualException.getMessage(), containsString("Byte counts must have a unit")); | ||
} | ||
|
||
@ParameterizedTest | ||
@ValueSource(strings = { | ||
"0byte", | ||
"0bytes", | ||
"1bytes", | ||
"1nothing", | ||
}) | ||
void parse_throws_exception_for_invalid_unit(final String byteString) { | ||
final ByteCountInvalidInputException actualException = assertThrows(ByteCountInvalidInputException.class, () -> ByteCount.parse(byteString)); | ||
|
||
assertThat(actualException.getMessage(), notNullValue()); | ||
assertThat(actualException.getMessage(), containsString("Invalid byte unit")); | ||
} | ||
|
||
@ParameterizedTest | ||
@CsvSource({ | ||
"0b, 0", | ||
"0kb, 0", | ||
"0mb, 0", | ||
"0gb, 0", | ||
"1b, 1", | ||
"8b, 8", | ||
"1024b, 1024", | ||
"2048b, 2048", | ||
"0.25kb, 256", | ||
"0.5kb, 512", | ||
"1kb, 1024", | ||
"2kb, 2048", | ||
"1.25kb, 1280", | ||
"1.5kb, 1536", | ||
"1024kb, 1048576", | ||
"2048kb, 2097152", | ||
"0.5mb, 524288", | ||
"1mb, 1048576", | ||
"2mb, 2097152", | ||
"5mb, 5242880", | ||
"1024mb, 1073741824", | ||
"0.5gb, 536870912", | ||
"1gb, 1073741824", | ||
"1.5gb, 1610612736", | ||
"2gb, 2147483648", | ||
"200gb, 214748364800" | ||
}) | ||
void parse_returns_expected_byte_value(final String byteString, final long expectedBytes) { | ||
final ByteCount byteCount = ByteCount.parse(byteString); | ||
assertThat(byteCount, notNullValue()); | ||
assertThat(byteCount.getBytes(), equalTo(expectedBytes)); | ||
} | ||
|
||
@ParameterizedTest | ||
@ValueSource(strings = { | ||
"0.1b", | ||
"0.5b", | ||
"1.1b", | ||
"1.9b", | ||
"20.1b" | ||
}) | ||
void parse_throws_exception_for_explicit_fractional_bytes(final String byteString) { | ||
final ByteCountInvalidInputException actualException = assertThrows(ByteCountInvalidInputException.class, () -> ByteCount.parse(byteString)); | ||
|
||
assertThat(actualException.getMessage(), notNullValue()); | ||
assertThat(actualException.getMessage(), containsString("fractional")); | ||
} | ||
|
||
@ParameterizedTest | ||
@CsvSource({ | ||
"1.1kb, 1126", | ||
"1.1mb, 1153433", | ||
"0.49mb, 513802", | ||
}) | ||
void parse_returns_rounded_bytes_for_implicit_fractional_bytes(final String byteString, final long expectedBytes) { | ||
final ByteCount byteCount = ByteCount.parse(byteString); | ||
assertThat(byteCount, notNullValue()); | ||
assertThat(byteCount.getBytes(), equalTo(expectedBytes)); | ||
} | ||
} |
22 changes: 22 additions & 0 deletions
22
data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/CircuitBreaker.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.breaker; | ||
|
||
/** | ||
* Represents a circuit breaker in Data Prepper. | ||
* | ||
* @since 2.1 | ||
*/ | ||
public interface CircuitBreaker { | ||
/** | ||
* Checks a circuit breaker. If open, then the circuit breaker has | ||
* been tripped. | ||
* | ||
* @return true if open; false if closed. | ||
* @since 2.1 | ||
*/ | ||
boolean isOpen(); | ||
} |
37 changes: 37 additions & 0 deletions
37
...repper-core/src/main/java/org/opensearch/dataprepper/breaker/CircuitBreakerAppConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.breaker; | ||
|
||
import org.opensearch.dataprepper.parser.model.CircuitBreakerConfig; | ||
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* The application config for circuit breakers. Used for wiring beans | ||
* related to circuit breakers. | ||
* | ||
* @since 2.1 | ||
*/ | ||
@Configuration | ||
public class CircuitBreakerAppConfig { | ||
@Bean | ||
public CircuitBreakerManager circuitBreakerService(final List<InnerCircuitBreaker> circuitBreakers) { | ||
return new CircuitBreakerManager(circuitBreakers); | ||
} | ||
|
||
@Bean | ||
InnerCircuitBreaker heapCircuitBreaker(final DataPrepperConfiguration dataPrepperConfiguration) { | ||
final CircuitBreakerConfig circuitBreakerConfig = dataPrepperConfiguration.getCircuitBreakerConfig(); | ||
if(circuitBreakerConfig != null && circuitBreakerConfig.getHeapConfig() != null) { | ||
return new HeapCircuitBreaker(circuitBreakerConfig.getHeapConfig()); | ||
} else { | ||
return null; | ||
} | ||
} | ||
} |
49 changes: 49 additions & 0 deletions
49
...-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/CircuitBreakerManager.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.breaker; | ||
|
||
import java.util.List; | ||
import java.util.Optional; | ||
|
||
/** | ||
* Class for managing circuit breakers. | ||
* | ||
* @since 2.1 | ||
*/ | ||
public class CircuitBreakerManager { | ||
private final CircuitBreaker globalCircuitBreaker; | ||
|
||
CircuitBreakerManager(final List<InnerCircuitBreaker> circuitBreakers) { | ||
if(circuitBreakers.isEmpty()) { | ||
globalCircuitBreaker = null; | ||
} else { | ||
globalCircuitBreaker = new GlobalCircuitBreaker(circuitBreakers); | ||
} | ||
} | ||
|
||
/** | ||
* Returns a circuit breaker representing all circuit breakers. This is open | ||
* if and only if at least one circuit breaker is open. | ||
* | ||
* @return The global circuit breaker. | ||
*/ | ||
public Optional<CircuitBreaker> getGlobalCircuitBreaker() { | ||
return Optional.ofNullable(globalCircuitBreaker); | ||
} | ||
|
||
private static class GlobalCircuitBreaker implements CircuitBreaker { | ||
private final List<InnerCircuitBreaker> circuitBreakers; | ||
|
||
public GlobalCircuitBreaker(final List<InnerCircuitBreaker> circuitBreakers) { | ||
this.circuitBreakers = circuitBreakers; | ||
} | ||
|
||
@Override | ||
public boolean isOpen() { | ||
return circuitBreakers.stream().anyMatch(CircuitBreaker::isOpen); | ||
} | ||
} | ||
} |
Oops, something went wrong.