Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented a heap-based circuit breaker #2155

Merged
merged 9 commits into from
Jan 23, 2023
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.types;

import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* Class for representing a count or size of bytes.
*
* @since 2.1
*/
public class ByteCount {
private static final Pattern BYTE_PATTERN = Pattern.compile("^(?<value>\\d+\\.?\\d*)(?<unit>[a-z]+)?\\z");
private final long bytes;

private ByteCount(final long bytes) {
this.bytes = bytes;
}

/**
* Gets the value as bytes.
*
* @return A long representation of the bytes.
* @since 2.1
*/
public long getBytes() {
return bytes;
}

private enum Unit {
BYTE("b", 1),
KILOBYTE("kb", 1024),
MEGABYTE("mb", KILOBYTE.multiplier * 1024),
GIGABYTE("gb", MEGABYTE.multiplier * 1024);

private final String unitString;
private final long multiplier;

private static final Map<String, Unit> UNIT_MAP = Arrays.stream(Unit.values())
.collect(Collectors.toMap(unit -> unit.unitString, Function.identity()));

Unit(final String unitString, final long multiplier) {
this.unitString = unitString;
this.multiplier = multiplier;
}

static Optional<Unit> fromString(final String unitString) {
return Optional.ofNullable(UNIT_MAP.get(unitString));
}
}

/**
* Parses a byte string to get the byte count.
*
* @param string A valid string representation of the bytes
* @return The parsed {@link ByteCount}
* @throws ByteCountParseException thrown if unable to parse the input string for the expected format
* @throws ByteCountInvalidInputException thrown if the input is parsable but the units or value is invalid
*/
public static ByteCount parse(final String string) {
final Matcher matcher = BYTE_PATTERN.matcher(string);
if(!matcher.find()) {
throw new ByteCountParseException("Unable to parse bytes provided by '" + string + "'");
}

final String valueString = matcher.group("value");
final String unitString = matcher.group("unit");

if(unitString == null) {
throw new ByteCountInvalidInputException("Byte counts must have a unit.");
}

final Unit unit = Unit.fromString(unitString)
.orElseThrow(() -> new ByteCountInvalidInputException("Invalid byte unit: '" + unitString + "'"));

final BigDecimal valueBigDecimal = new BigDecimal(valueString);

final BigDecimal byteCount = scaleToBytes(valueBigDecimal, unit);

if(unit == Unit.BYTE && isFractional(byteCount)) {
throw new ByteCountInvalidInputException("The byte value '" + string + "' is explicitly declared as a fractional byte which is not allowed.");
}

return new ByteCount(byteCount.longValue());
}

private static BigDecimal scaleToBytes(final BigDecimal value, final Unit unit) {
return value.multiply(BigDecimal.valueOf(unit.multiplier));
}

private static boolean isFractional(final BigDecimal value) {
return value.remainder(BigDecimal.ONE).compareTo(BigDecimal.ZERO) != 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.types;

public class ByteCountInvalidInputException extends RuntimeException {
public ByteCountInvalidInputException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.types;

public class ByteCountParseException extends RuntimeException {
public ByteCountParseException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.types;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;

class ByteCountTest {
@ParameterizedTest
@ValueSource(strings = {
".1b",
".1kb",
".1mb",
".1gb",
".12b",
".0b",
"b",
"kb",
"mb",
"gb",
"1 b",
"1 kb",
"1 mb",
"1 gb",
".b",
".kb",
".mb",
".gb",
"a",
"badinput",
"1b ",
"1b trailing",
"1kb ",
"1kb trailing",
"a1b",
"1b!",
"-0b",
"-1b",
"-1kb",
"-1mb",
"-1gb",
})
void parse_throws_exception_for_invalid_format(final String byteString) {
final ByteCountParseException actualException = assertThrows(ByteCountParseException.class, () -> ByteCount.parse(byteString));

assertThat(actualException.getMessage(), notNullValue());
}

@ParameterizedTest
@ValueSource(strings = {
"0",
"1",
"1024",
"1.5",
})
void parse_throws_exception_for_missing_unit(final String byteString) {
final ByteCountInvalidInputException actualException = assertThrows(ByteCountInvalidInputException.class, () -> ByteCount.parse(byteString));

assertThat(actualException.getMessage(), notNullValue());
assertThat(actualException.getMessage(), containsString("Byte counts must have a unit"));
}

@ParameterizedTest
@ValueSource(strings = {
"0byte",
"0bytes",
"1bytes",
"1nothing",
})
void parse_throws_exception_for_invalid_unit(final String byteString) {
final ByteCountInvalidInputException actualException = assertThrows(ByteCountInvalidInputException.class, () -> ByteCount.parse(byteString));

assertThat(actualException.getMessage(), notNullValue());
assertThat(actualException.getMessage(), containsString("Invalid byte unit"));
}

@ParameterizedTest
@CsvSource({
"0b, 0",
"0kb, 0",
"0mb, 0",
"0gb, 0",
"1b, 1",
"8b, 8",
"1024b, 1024",
"2048b, 2048",
"0.25kb, 256",
"0.5kb, 512",
"1kb, 1024",
"2kb, 2048",
"1.25kb, 1280",
"1.5kb, 1536",
"1024kb, 1048576",
"2048kb, 2097152",
"0.5mb, 524288",
"1mb, 1048576",
"2mb, 2097152",
"5mb, 5242880",
"1024mb, 1073741824",
"0.5gb, 536870912",
"1gb, 1073741824",
"1.5gb, 1610612736",
"2gb, 2147483648",
"200gb, 214748364800"
})
void parse_returns_expected_byte_value(final String byteString, final long expectedBytes) {
final ByteCount byteCount = ByteCount.parse(byteString);
assertThat(byteCount, notNullValue());
assertThat(byteCount.getBytes(), equalTo(expectedBytes));
}

@ParameterizedTest
@ValueSource(strings = {
"0.1b",
"0.5b",
"1.1b",
"1.9b",
"20.1b"
})
void parse_throws_exception_for_explicit_fractional_bytes(final String byteString) {
final ByteCountInvalidInputException actualException = assertThrows(ByteCountInvalidInputException.class, () -> ByteCount.parse(byteString));

assertThat(actualException.getMessage(), notNullValue());
assertThat(actualException.getMessage(), containsString("fractional"));
}

@ParameterizedTest
@CsvSource({
"1.1kb, 1126",
"1.1mb, 1153433",
"0.49mb, 513802",
})
void parse_returns_rounded_bytes_for_implicit_fractional_bytes(final String byteString, final long expectedBytes) {
final ByteCount byteCount = ByteCount.parse(byteString);
assertThat(byteCount, notNullValue());
assertThat(byteCount.getBytes(), equalTo(expectedBytes));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.breaker;

/**
* Represents a circuit breaker in Data Prepper.
*
* @since 2.1
*/
public interface CircuitBreaker {
/**
* Checks a circuit breaker. If open, then the circuit breaker has
* been tripped.
*
* @return true if open; false if closed.
* @since 2.1
*/
boolean isOpen();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.breaker;

import org.opensearch.dataprepper.parser.model.CircuitBreakerConfig;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
* The application config for circuit breakers. Used for wiring beans
* related to circuit breakers.
*
* @since 2.1
*/
@Configuration
public class CircuitBreakerAppConfig {
@Bean
public CircuitBreakerManager circuitBreakerService(final List<InnerCircuitBreaker> circuitBreakers) {
return new CircuitBreakerManager(circuitBreakers);
}

@Bean
InnerCircuitBreaker heapCircuitBreaker(final DataPrepperConfiguration dataPrepperConfiguration) {
final CircuitBreakerConfig circuitBreakerConfig = dataPrepperConfiguration.getCircuitBreakerConfig();
if(circuitBreakerConfig != null && circuitBreakerConfig.getHeapConfig() != null) {
return new HeapCircuitBreaker(circuitBreakerConfig.getHeapConfig());
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.breaker;

import java.util.List;
import java.util.Optional;

/**
* Class for managing circuit breakers.
*
* @since 2.1
*/
public class CircuitBreakerManager {
private final CircuitBreaker globalCircuitBreaker;

CircuitBreakerManager(final List<InnerCircuitBreaker> circuitBreakers) {
if(circuitBreakers.isEmpty()) {
globalCircuitBreaker = null;
} else {
globalCircuitBreaker = new GlobalCircuitBreaker(circuitBreakers);
}
}

/**
* Returns a circuit breaker representing all circuit breakers. This is open
* if and only if at least one circuit breaker is open.
*
* @return The global circuit breaker.
*/
public Optional<CircuitBreaker> getGlobalCircuitBreaker() {
return Optional.ofNullable(globalCircuitBreaker);
}

private static class GlobalCircuitBreaker implements CircuitBreaker {
private final List<InnerCircuitBreaker> circuitBreakers;

public GlobalCircuitBreaker(final List<InnerCircuitBreaker> circuitBreakers) {
this.circuitBreakers = circuitBreakers;
}

@Override
public boolean isOpen() {
return circuitBreakers.stream().anyMatch(CircuitBreaker::isOpen);
}
}
}
Loading