Skip to content

Commit

Permalink
Implemented a heap-based circuit breaker (opensearch-project#2155)
Browse files Browse the repository at this point in the history
Implemented a heap-based circuit breaker. This circuit breaker will prevent entry buffers from accepting events after the heap usage reaches a specified value. This checks for heap usage in a background thread and updates the state, which the buffer will then use to determine if the circuit breaker is open or closed. This also signals to the JVM to start a GC when the threshold is reached. Resolves opensearch-project#2150.

Signed-off-by: David Venable <[email protected]>
Signed-off-by: mahesh724 <[email protected]>
  • Loading branch information
dlvenable authored and mahesh724 committed Mar 3, 2023
1 parent cc53731 commit 5d9401e
Show file tree
Hide file tree
Showing 31 changed files with 1,672 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.types;

import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* Class for representing a count or size of bytes.
*
* @since 2.1
*/
public class ByteCount {
private static final Pattern BYTE_PATTERN = Pattern.compile("^(?<value>\\d+\\.?\\d*)(?<unit>[a-z]+)?\\z");
private final long bytes;

private ByteCount(final long bytes) {
this.bytes = bytes;
}

/**
* Gets the value as bytes.
*
* @return A long representation of the bytes.
* @since 2.1
*/
public long getBytes() {
return bytes;
}

private enum Unit {
BYTE("b", 1),
KILOBYTE("kb", 1024),
MEGABYTE("mb", KILOBYTE.multiplier * 1024),
GIGABYTE("gb", MEGABYTE.multiplier * 1024);

private final String unitString;
private final long multiplier;

private static final Map<String, Unit> UNIT_MAP = Arrays.stream(Unit.values())
.collect(Collectors.toMap(unit -> unit.unitString, Function.identity()));

Unit(final String unitString, final long multiplier) {
this.unitString = unitString;
this.multiplier = multiplier;
}

static Optional<Unit> fromString(final String unitString) {
return Optional.ofNullable(UNIT_MAP.get(unitString));
}
}

/**
* Parses a byte string to get the byte count.
*
* @param string A valid string representation of the bytes
* @return The parsed {@link ByteCount}
* @throws ByteCountParseException thrown if unable to parse the input string for the expected format
* @throws ByteCountInvalidInputException thrown if the input is parsable but the units or value is invalid
*/
public static ByteCount parse(final String string) {
final Matcher matcher = BYTE_PATTERN.matcher(string);
if(!matcher.find()) {
throw new ByteCountParseException("Unable to parse bytes provided by '" + string + "'");
}

final String valueString = matcher.group("value");
final String unitString = matcher.group("unit");

if(unitString == null) {
throw new ByteCountInvalidInputException("Byte counts must have a unit.");
}

final Unit unit = Unit.fromString(unitString)
.orElseThrow(() -> new ByteCountInvalidInputException("Invalid byte unit: '" + unitString + "'"));

final BigDecimal valueBigDecimal = new BigDecimal(valueString);

final BigDecimal byteCount = scaleToBytes(valueBigDecimal, unit);

if(unit == Unit.BYTE && isFractional(byteCount)) {
throw new ByteCountInvalidInputException("The byte value '" + string + "' is explicitly declared as a fractional byte which is not allowed.");
}

return new ByteCount(byteCount.longValue());
}

private static BigDecimal scaleToBytes(final BigDecimal value, final Unit unit) {
return value.multiply(BigDecimal.valueOf(unit.multiplier));
}

private static boolean isFractional(final BigDecimal value) {
return value.remainder(BigDecimal.ONE).compareTo(BigDecimal.ZERO) != 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.types;

public class ByteCountInvalidInputException extends RuntimeException {
public ByteCountInvalidInputException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.types;

public class ByteCountParseException extends RuntimeException {
public ByteCountParseException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.types;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;

class ByteCountTest {
@ParameterizedTest
@ValueSource(strings = {
".1b",
".1kb",
".1mb",
".1gb",
".12b",
".0b",
"b",
"kb",
"mb",
"gb",
"1 b",
"1 kb",
"1 mb",
"1 gb",
".b",
".kb",
".mb",
".gb",
"a",
"badinput",
"1b ",
"1b trailing",
"1kb ",
"1kb trailing",
"a1b",
"1b!",
"-0b",
"-1b",
"-1kb",
"-1mb",
"-1gb",
})
void parse_throws_exception_for_invalid_format(final String byteString) {
final ByteCountParseException actualException = assertThrows(ByteCountParseException.class, () -> ByteCount.parse(byteString));

assertThat(actualException.getMessage(), notNullValue());
}

@ParameterizedTest
@ValueSource(strings = {
"0",
"1",
"1024",
"1.5",
})
void parse_throws_exception_for_missing_unit(final String byteString) {
final ByteCountInvalidInputException actualException = assertThrows(ByteCountInvalidInputException.class, () -> ByteCount.parse(byteString));

assertThat(actualException.getMessage(), notNullValue());
assertThat(actualException.getMessage(), containsString("Byte counts must have a unit"));
}

@ParameterizedTest
@ValueSource(strings = {
"0byte",
"0bytes",
"1bytes",
"1nothing",
})
void parse_throws_exception_for_invalid_unit(final String byteString) {
final ByteCountInvalidInputException actualException = assertThrows(ByteCountInvalidInputException.class, () -> ByteCount.parse(byteString));

assertThat(actualException.getMessage(), notNullValue());
assertThat(actualException.getMessage(), containsString("Invalid byte unit"));
}

@ParameterizedTest
@CsvSource({
"0b, 0",
"0kb, 0",
"0mb, 0",
"0gb, 0",
"1b, 1",
"8b, 8",
"1024b, 1024",
"2048b, 2048",
"0.25kb, 256",
"0.5kb, 512",
"1kb, 1024",
"2kb, 2048",
"1.25kb, 1280",
"1.5kb, 1536",
"1024kb, 1048576",
"2048kb, 2097152",
"0.5mb, 524288",
"1mb, 1048576",
"2mb, 2097152",
"5mb, 5242880",
"1024mb, 1073741824",
"0.5gb, 536870912",
"1gb, 1073741824",
"1.5gb, 1610612736",
"2gb, 2147483648",
"200gb, 214748364800"
})
void parse_returns_expected_byte_value(final String byteString, final long expectedBytes) {
final ByteCount byteCount = ByteCount.parse(byteString);
assertThat(byteCount, notNullValue());
assertThat(byteCount.getBytes(), equalTo(expectedBytes));
}

@ParameterizedTest
@ValueSource(strings = {
"0.1b",
"0.5b",
"1.1b",
"1.9b",
"20.1b"
})
void parse_throws_exception_for_explicit_fractional_bytes(final String byteString) {
final ByteCountInvalidInputException actualException = assertThrows(ByteCountInvalidInputException.class, () -> ByteCount.parse(byteString));

assertThat(actualException.getMessage(), notNullValue());
assertThat(actualException.getMessage(), containsString("fractional"));
}

@ParameterizedTest
@CsvSource({
"1.1kb, 1126",
"1.1mb, 1153433",
"0.49mb, 513802",
})
void parse_returns_rounded_bytes_for_implicit_fractional_bytes(final String byteString, final long expectedBytes) {
final ByteCount byteCount = ByteCount.parse(byteString);
assertThat(byteCount, notNullValue());
assertThat(byteCount.getBytes(), equalTo(expectedBytes));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.breaker;

/**
* Represents a circuit breaker in Data Prepper.
*
* @since 2.1
*/
public interface CircuitBreaker {
/**
* Checks a circuit breaker. If open, then the circuit breaker has
* been tripped.
*
* @return true if open; false if closed.
* @since 2.1
*/
boolean isOpen();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.breaker;

import org.opensearch.dataprepper.parser.model.CircuitBreakerConfig;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
* The application config for circuit breakers. Used for wiring beans
* related to circuit breakers.
*
* @since 2.1
*/
@Configuration
public class CircuitBreakerAppConfig {
@Bean
public CircuitBreakerManager circuitBreakerService(final List<InnerCircuitBreaker> circuitBreakers) {
return new CircuitBreakerManager(circuitBreakers);
}

@Bean
InnerCircuitBreaker heapCircuitBreaker(final DataPrepperConfiguration dataPrepperConfiguration) {
final CircuitBreakerConfig circuitBreakerConfig = dataPrepperConfiguration.getCircuitBreakerConfig();
if(circuitBreakerConfig != null && circuitBreakerConfig.getHeapConfig() != null) {
return new HeapCircuitBreaker(circuitBreakerConfig.getHeapConfig());
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.breaker;

import java.util.List;
import java.util.Optional;

/**
* Class for managing circuit breakers.
*
* @since 2.1
*/
public class CircuitBreakerManager {
private final CircuitBreaker globalCircuitBreaker;

CircuitBreakerManager(final List<InnerCircuitBreaker> circuitBreakers) {
if(circuitBreakers.isEmpty()) {
globalCircuitBreaker = null;
} else {
globalCircuitBreaker = new GlobalCircuitBreaker(circuitBreakers);
}
}

/**
* Returns a circuit breaker representing all circuit breakers. This is open
* if and only if at least one circuit breaker is open.
*
* @return The global circuit breaker.
*/
public Optional<CircuitBreaker> getGlobalCircuitBreaker() {
return Optional.ofNullable(globalCircuitBreaker);
}

private static class GlobalCircuitBreaker implements CircuitBreaker {
private final List<InnerCircuitBreaker> circuitBreakers;

public GlobalCircuitBreaker(final List<InnerCircuitBreaker> circuitBreakers) {
this.circuitBreakers = circuitBreakers;
}

@Override
public boolean isOpen() {
return circuitBreakers.stream().anyMatch(CircuitBreaker::isOpen);
}
}
}
Loading

0 comments on commit 5d9401e

Please sign in to comment.