From 35e37c671ccc4b736dbebea5fcfbfd95f6ef590d Mon Sep 17 00:00:00 2001 From: David Venable Date: Mon, 23 Jan 2023 17:11:16 -0600 Subject: [PATCH] Implemented a heap-based circuit breaker (#2155) Implemented a heap-based circuit breaker. This circuit breaker will prevent entry buffers from accepting events after the heap usage reaches a specified value. This checks for heap usage in a background thread and updates the state, which the buffer will then use to determine if the circuit breaker is open or closed. This also signals to the JVM to start a GC when the threshold is reached. Resolves #2150. Signed-off-by: David Venable --- .../dataprepper/model/types/ByteCount.java | 104 ++++++++ .../types/ByteCountInvalidInputException.java | 12 + .../model/types/ByteCountParseException.java | 12 + .../model/types/ByteCountTest.java | 148 +++++++++++ .../dataprepper/breaker/CircuitBreaker.java | 22 ++ .../breaker/CircuitBreakerAppConfig.java | 37 +++ .../breaker/CircuitBreakerManager.java | 49 ++++ .../breaker/HeapCircuitBreaker.java | 110 ++++++++ .../breaker/InnerCircuitBreaker.java | 13 + .../parser/ByteCountDeserializer.java | 40 +++ .../parser/CircuitBreakingBuffer.java | 74 ++++++ .../dataprepper/parser/PipelineParser.java | 23 +- .../config/DataPrepperAppConfiguration.java | 6 +- .../config/PipelineParserConfiguration.java | 7 +- .../parser/model/CircuitBreakerConfig.java | 28 ++ .../model/DataPrepperConfiguration.java | 17 +- .../model/HeapCircuitBreakerConfig.java | 61 +++++ .../breaker/CircuitBreakerAppConfigTest.java | 65 +++++ .../dataprepper/breaker/CircuitBreakerIT.java | 89 +++++++ .../breaker/CircuitBreakerManagerTest.java | 127 +++++++++ .../breaker/HeapCircuitBreakerTest.java | 240 ++++++++++++++++++ .../parser/ByteCountDeserializerTest.java | 53 ++++ .../parser/CircuitBreakingBufferTest.java | 145 +++++++++++ .../parser/PipelineParserTests.java | 104 +++++++- .../PipelineParserConfigurationTest.java | 6 +- .../model/DataPrepperConfigurationTests.java | 16 +- .../model/HeapCircuitBreakerConfigTest.java | 63 +++++ .../parser/model/heap_with_reset.yaml | 2 + .../parser/model/heap_without_reset.yaml | 1 + ...epper_config_with_heap_circuit_breaker.yml | 6 + docs/configuration.md | 22 ++ 31 files changed, 1672 insertions(+), 30 deletions(-) create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCountInvalidInputException.java create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCountParseException.java create mode 100644 data-prepper-api/src/test/java/org/opensearch/dataprepper/model/types/ByteCountTest.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/CircuitBreaker.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/CircuitBreakerAppConfig.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/CircuitBreakerManager.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/HeapCircuitBreaker.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/InnerCircuitBreaker.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/ByteCountDeserializer.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/CircuitBreakingBuffer.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/CircuitBreakerConfig.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/HeapCircuitBreakerConfig.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/breaker/CircuitBreakerAppConfigTest.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/breaker/CircuitBreakerIT.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/breaker/CircuitBreakerManagerTest.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/breaker/HeapCircuitBreakerTest.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/ByteCountDeserializerTest.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/CircuitBreakingBufferTest.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/HeapCircuitBreakerConfigTest.java create mode 100644 data-prepper-core/src/test/resources/org/opensearch/dataprepper/parser/model/heap_with_reset.yaml create mode 100644 data-prepper-core/src/test/resources/org/opensearch/dataprepper/parser/model/heap_without_reset.yaml create mode 100644 data-prepper-core/src/test/resources/valid_data_prepper_config_with_heap_circuit_breaker.yml diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java new file mode 100644 index 0000000000..ae09e953ff --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java @@ -0,0 +1,104 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.types; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Class for representing a count or size of bytes. + * + * @since 2.1 + */ +public class ByteCount { + private static final Pattern BYTE_PATTERN = Pattern.compile("^(?\\d+\\.?\\d*)(?[a-z]+)?\\z"); + private final long bytes; + + private ByteCount(final long bytes) { + this.bytes = bytes; + } + + /** + * Gets the value as bytes. + * + * @return A long representation of the bytes. + * @since 2.1 + */ + public long getBytes() { + return bytes; + } + + private enum Unit { + BYTE("b", 1), + KILOBYTE("kb", 1024), + MEGABYTE("mb", KILOBYTE.multiplier * 1024), + GIGABYTE("gb", MEGABYTE.multiplier * 1024); + + private final String unitString; + private final long multiplier; + + private static final Map UNIT_MAP = Arrays.stream(Unit.values()) + .collect(Collectors.toMap(unit -> unit.unitString, Function.identity())); + + Unit(final String unitString, final long multiplier) { + this.unitString = unitString; + this.multiplier = multiplier; + } + + static Optional fromString(final String unitString) { + return Optional.ofNullable(UNIT_MAP.get(unitString)); + } + } + + /** + * Parses a byte string to get the byte count. + * + * @param string A valid string representation of the bytes + * @return The parsed {@link ByteCount} + * @throws ByteCountParseException thrown if unable to parse the input string for the expected format + * @throws ByteCountInvalidInputException thrown if the input is parsable but the units or value is invalid + */ + public static ByteCount parse(final String string) { + final Matcher matcher = BYTE_PATTERN.matcher(string); + if(!matcher.find()) { + throw new ByteCountParseException("Unable to parse bytes provided by '" + string + "'"); + } + + final String valueString = matcher.group("value"); + final String unitString = matcher.group("unit"); + + if(unitString == null) { + throw new ByteCountInvalidInputException("Byte counts must have a unit."); + } + + final Unit unit = Unit.fromString(unitString) + .orElseThrow(() -> new ByteCountInvalidInputException("Invalid byte unit: '" + unitString + "'")); + + final BigDecimal valueBigDecimal = new BigDecimal(valueString); + + final BigDecimal byteCount = scaleToBytes(valueBigDecimal, unit); + + if(unit == Unit.BYTE && isFractional(byteCount)) { + throw new ByteCountInvalidInputException("The byte value '" + string + "' is explicitly declared as a fractional byte which is not allowed."); + } + + return new ByteCount(byteCount.longValue()); + } + + private static BigDecimal scaleToBytes(final BigDecimal value, final Unit unit) { + return value.multiply(BigDecimal.valueOf(unit.multiplier)); + } + + private static boolean isFractional(final BigDecimal value) { + return value.remainder(BigDecimal.ONE).compareTo(BigDecimal.ZERO) != 0; + } +} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCountInvalidInputException.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCountInvalidInputException.java new file mode 100644 index 0000000000..f87c3d2a38 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCountInvalidInputException.java @@ -0,0 +1,12 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.types; + +public class ByteCountInvalidInputException extends RuntimeException { + public ByteCountInvalidInputException(final String message) { + super(message); + } +} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCountParseException.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCountParseException.java new file mode 100644 index 0000000000..aac4d89ca5 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCountParseException.java @@ -0,0 +1,12 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.types; + +public class ByteCountParseException extends RuntimeException { + public ByteCountParseException(final String message) { + super(message); + } +} diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/types/ByteCountTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/types/ByteCountTest.java new file mode 100644 index 0000000000..eda34eae69 --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/types/ByteCountTest.java @@ -0,0 +1,148 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.types; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class ByteCountTest { + @ParameterizedTest + @ValueSource(strings = { + ".1b", + ".1kb", + ".1mb", + ".1gb", + ".12b", + ".0b", + "b", + "kb", + "mb", + "gb", + "1 b", + "1 kb", + "1 mb", + "1 gb", + ".b", + ".kb", + ".mb", + ".gb", + "a", + "badinput", + "1b ", + "1b trailing", + "1kb ", + "1kb trailing", + "a1b", + "1b!", + "-0b", + "-1b", + "-1kb", + "-1mb", + "-1gb", + }) + void parse_throws_exception_for_invalid_format(final String byteString) { + final ByteCountParseException actualException = assertThrows(ByteCountParseException.class, () -> ByteCount.parse(byteString)); + + assertThat(actualException.getMessage(), notNullValue()); + } + + @ParameterizedTest + @ValueSource(strings = { + "0", + "1", + "1024", + "1.5", + }) + void parse_throws_exception_for_missing_unit(final String byteString) { + final ByteCountInvalidInputException actualException = assertThrows(ByteCountInvalidInputException.class, () -> ByteCount.parse(byteString)); + + assertThat(actualException.getMessage(), notNullValue()); + assertThat(actualException.getMessage(), containsString("Byte counts must have a unit")); + } + + @ParameterizedTest + @ValueSource(strings = { + "0byte", + "0bytes", + "1bytes", + "1nothing", + }) + void parse_throws_exception_for_invalid_unit(final String byteString) { + final ByteCountInvalidInputException actualException = assertThrows(ByteCountInvalidInputException.class, () -> ByteCount.parse(byteString)); + + assertThat(actualException.getMessage(), notNullValue()); + assertThat(actualException.getMessage(), containsString("Invalid byte unit")); + } + + @ParameterizedTest + @CsvSource({ + "0b, 0", + "0kb, 0", + "0mb, 0", + "0gb, 0", + "1b, 1", + "8b, 8", + "1024b, 1024", + "2048b, 2048", + "0.25kb, 256", + "0.5kb, 512", + "1kb, 1024", + "2kb, 2048", + "1.25kb, 1280", + "1.5kb, 1536", + "1024kb, 1048576", + "2048kb, 2097152", + "0.5mb, 524288", + "1mb, 1048576", + "2mb, 2097152", + "5mb, 5242880", + "1024mb, 1073741824", + "0.5gb, 536870912", + "1gb, 1073741824", + "1.5gb, 1610612736", + "2gb, 2147483648", + "200gb, 214748364800" + }) + void parse_returns_expected_byte_value(final String byteString, final long expectedBytes) { + final ByteCount byteCount = ByteCount.parse(byteString); + assertThat(byteCount, notNullValue()); + assertThat(byteCount.getBytes(), equalTo(expectedBytes)); + } + + @ParameterizedTest + @ValueSource(strings = { + "0.1b", + "0.5b", + "1.1b", + "1.9b", + "20.1b" + }) + void parse_throws_exception_for_explicit_fractional_bytes(final String byteString) { + final ByteCountInvalidInputException actualException = assertThrows(ByteCountInvalidInputException.class, () -> ByteCount.parse(byteString)); + + assertThat(actualException.getMessage(), notNullValue()); + assertThat(actualException.getMessage(), containsString("fractional")); + } + + @ParameterizedTest + @CsvSource({ + "1.1kb, 1126", + "1.1mb, 1153433", + "0.49mb, 513802", + }) + void parse_returns_rounded_bytes_for_implicit_fractional_bytes(final String byteString, final long expectedBytes) { + final ByteCount byteCount = ByteCount.parse(byteString); + assertThat(byteCount, notNullValue()); + assertThat(byteCount.getBytes(), equalTo(expectedBytes)); + } +} \ No newline at end of file diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/CircuitBreaker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/CircuitBreaker.java new file mode 100644 index 0000000000..9774648b59 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/CircuitBreaker.java @@ -0,0 +1,22 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.breaker; + +/** + * Represents a circuit breaker in Data Prepper. + * + * @since 2.1 + */ +public interface CircuitBreaker { + /** + * Checks a circuit breaker. If open, then the circuit breaker has + * been tripped. + * + * @return true if open; false if closed. + * @since 2.1 + */ + boolean isOpen(); +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/CircuitBreakerAppConfig.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/CircuitBreakerAppConfig.java new file mode 100644 index 0000000000..19fb683dcd --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/CircuitBreakerAppConfig.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.breaker; + +import org.opensearch.dataprepper.parser.model.CircuitBreakerConfig; +import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.List; + +/** + * The application config for circuit breakers. Used for wiring beans + * related to circuit breakers. + * + * @since 2.1 + */ +@Configuration +public class CircuitBreakerAppConfig { + @Bean + public CircuitBreakerManager circuitBreakerService(final List circuitBreakers) { + return new CircuitBreakerManager(circuitBreakers); + } + + @Bean + InnerCircuitBreaker heapCircuitBreaker(final DataPrepperConfiguration dataPrepperConfiguration) { + final CircuitBreakerConfig circuitBreakerConfig = dataPrepperConfiguration.getCircuitBreakerConfig(); + if(circuitBreakerConfig != null && circuitBreakerConfig.getHeapConfig() != null) { + return new HeapCircuitBreaker(circuitBreakerConfig.getHeapConfig()); + } else { + return null; + } + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/CircuitBreakerManager.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/CircuitBreakerManager.java new file mode 100644 index 0000000000..23360ab04c --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/CircuitBreakerManager.java @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.breaker; + +import java.util.List; +import java.util.Optional; + +/** + * Class for managing circuit breakers. + * + * @since 2.1 + */ +public class CircuitBreakerManager { + private final CircuitBreaker globalCircuitBreaker; + + CircuitBreakerManager(final List circuitBreakers) { + if(circuitBreakers.isEmpty()) { + globalCircuitBreaker = null; + } else { + globalCircuitBreaker = new GlobalCircuitBreaker(circuitBreakers); + } + } + + /** + * Returns a circuit breaker representing all circuit breakers. This is open + * if and only if at least one circuit breaker is open. + * + * @return The global circuit breaker. + */ + public Optional getGlobalCircuitBreaker() { + return Optional.ofNullable(globalCircuitBreaker); + } + + private static class GlobalCircuitBreaker implements CircuitBreaker { + private final List circuitBreakers; + + public GlobalCircuitBreaker(final List circuitBreakers) { + this.circuitBreakers = circuitBreakers; + } + + @Override + public boolean isOpen() { + return circuitBreakers.stream().anyMatch(CircuitBreaker::isOpen); + } + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/HeapCircuitBreaker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/HeapCircuitBreaker.java new file mode 100644 index 0000000000..bd16179ebf --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/HeapCircuitBreaker.java @@ -0,0 +1,110 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.breaker; + +import io.micrometer.core.instrument.Metrics; +import org.opensearch.dataprepper.parser.model.HeapCircuitBreakerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.time.Duration; +import java.time.Instant; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * An implementation of {@link CircuitBreaker} which checks against heap usage. + * + * @since 2.1 + */ +class HeapCircuitBreaker implements InnerCircuitBreaker, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(HeapCircuitBreaker.class); + public static final int OPEN_METRIC_VALUE = 1; + public static final int CLOSED_METRIC_VALUE = 0; + private final MemoryMXBean memoryMXBean; + private final long usageBytes; + private final Duration resetPeriod; + private final Lock lock; + private final AtomicInteger openGauge; + private final ScheduledExecutorService scheduledExecutorService; + private volatile boolean open; + private Instant resetTime; + + HeapCircuitBreaker(final HeapCircuitBreakerConfig circuitBreakerConfig) { + this(circuitBreakerConfig, ManagementFactory.getMemoryMXBean()); + } + + HeapCircuitBreaker(final HeapCircuitBreakerConfig circuitBreakerConfig, final MemoryMXBean memoryMXBean) { + Objects.requireNonNull(circuitBreakerConfig); + Objects.requireNonNull(circuitBreakerConfig.getUsage()); + + usageBytes = circuitBreakerConfig.getUsage().getBytes(); + if(usageBytes <= 0) + throw new IllegalArgumentException("Bytes usage must be positive."); + + resetPeriod = Objects.requireNonNull(circuitBreakerConfig.getReset()); + this.memoryMXBean = memoryMXBean; + open = false; + lock = new ReentrantLock(); + resetTime = Instant.MIN; + + Metrics.gauge("core.circuitBreakers.heap.memoryUsage", this, cb -> getUsedMemoryBytes()); + openGauge = Metrics.gauge("core.circuitBreakers.heap.open", new AtomicInteger(0)); + + final Duration checkInterval = Objects.requireNonNull(circuitBreakerConfig.getCheckInterval()); + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + scheduledExecutorService + .scheduleAtFixedRate(this::checkMemory, 0L, checkInterval.toMillis(), TimeUnit.MILLISECONDS); + + LOG.info("Heap circuit breaker with usage of {} bytes.", usageBytes); + } + + @Override + public boolean isOpen() { + return open; + } + + private void checkMemory() { + final boolean previousOpen = open; + + if(previousOpen && Instant.now().compareTo(resetTime) < 0) { + return; + } + + final long usedMemoryBytes = getUsedMemoryBytes(); + if(usedMemoryBytes > usageBytes) { + open = true; + if(!previousOpen) { + System.gc(); + resetTime = Instant.now().plus(resetPeriod); + openGauge.set(OPEN_METRIC_VALUE); + LOG.info("Circuit breaker tripped and open. {} used > {} configured", usedMemoryBytes, usageBytes); + } + } else { + open = false; + if(previousOpen) { + openGauge.set(CLOSED_METRIC_VALUE); + LOG.info("Circuit breaker closed. {} used <= {} configured", usedMemoryBytes, usageBytes); + } + } + } + + private long getUsedMemoryBytes() { + return memoryMXBean.getHeapMemoryUsage().getUsed(); + } + + @Override + public void close() throws Exception { + scheduledExecutorService.shutdown(); + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/InnerCircuitBreaker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/InnerCircuitBreaker.java new file mode 100644 index 0000000000..50fb2450cb --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/InnerCircuitBreaker.java @@ -0,0 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.breaker; + +/** + * Interface to signal that this {@link CircuitBreaker} to prevent + * access beyond the {@link CircuitBreakerManager}. + */ +interface InnerCircuitBreaker extends CircuitBreaker { +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/ByteCountDeserializer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/ByteCountDeserializer.java new file mode 100644 index 0000000000..ab6e2b32a1 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/ByteCountDeserializer.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.parser; + +import com.fasterxml.jackson.core.JacksonException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import org.opensearch.dataprepper.model.types.ByteCount; + +import java.io.IOException; + +/** + * Deserializes {@link ByteCount} values using Jackson. + * + * @since 2.1 + */ +public class ByteCountDeserializer extends StdDeserializer { + public ByteCountDeserializer() { + this(ByteCount.class); + } + + protected ByteCountDeserializer(final Class valueClass) { + super(valueClass); + } + + @Override + public ByteCount deserialize(final JsonParser parser, final DeserializationContext context) throws IOException, JacksonException { + final String byteString = parser.getValueAsString(); + + try { + return ByteCount.parse(byteString); + } catch (final Exception ex) { + throw new IllegalArgumentException(ex); + } + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/CircuitBreakingBuffer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/CircuitBreakingBuffer.java new file mode 100644 index 0000000000..c546bce5f8 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/CircuitBreakingBuffer.java @@ -0,0 +1,74 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.parser; + +import org.opensearch.dataprepper.breaker.CircuitBreaker; +import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.record.Record; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +import static java.util.Objects.requireNonNull; + +/** + * Decorator for {@link Buffer} which checks a {@link CircuitBreaker} + * before writing records. + * + * @param The type of record. + * @since 2.1 + */ +class CircuitBreakingBuffer> implements Buffer { + private final Buffer buffer; + private final CircuitBreaker circuitBreaker; + + /** + * Constructor + * + * @param buffer The inner buffer which is being decorated + * @param circuitBreaker The circuit breaker to check + */ + public CircuitBreakingBuffer(final Buffer buffer, final CircuitBreaker circuitBreaker) { + this.buffer = requireNonNull(buffer); + this.circuitBreaker = requireNonNull(circuitBreaker); + } + + @Override + public void write(final T record, final int timeoutInMillis) throws TimeoutException { + checkBreaker(); + + buffer.write(record, timeoutInMillis); + } + + @Override + public void writeAll(final Collection records, final int timeoutInMillis) throws Exception { + checkBreaker(); + + buffer.writeAll(records, timeoutInMillis); + } + + private void checkBreaker() throws TimeoutException { + if(circuitBreaker.isOpen()) + throw new TimeoutException("Circuit breaker is open. Unable to write to buffer."); + } + + @Override + public Map.Entry, CheckpointState> read(final int timeoutInMillis) { + return buffer.read(timeoutInMillis); + } + + @Override + public void checkpoint(final CheckpointState checkpointState) { + buffer.checkpoint(checkpointState); + } + + @Override + public boolean isEmpty() { + return buffer.isEmpty(); + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineParser.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineParser.java index aa792f8a77..dd941263f4 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineParser.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineParser.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.parser; +import org.opensearch.dataprepper.breaker.CircuitBreakerManager; import org.opensearch.dataprepper.model.annotations.SingleThread; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; @@ -61,6 +62,7 @@ public class PipelineParser { private final String pipelineConfigurationFileLocation; private final RouterFactory routerFactory; private final DataPrepperConfiguration dataPrepperConfiguration; + private final CircuitBreakerManager circuitBreakerManager; private final Map sourceConnectorMap = new HashMap<>(); //TODO Remove this and rely only on pipelineMap private final PluginFactory pluginFactory; private final PeerForwarderProvider peerForwarderProvider; @@ -69,12 +71,14 @@ public PipelineParser(final String pipelineConfigurationFileLocation, final PluginFactory pluginFactory, final PeerForwarderProvider peerForwarderProvider, final RouterFactory routerFactory, - final DataPrepperConfiguration dataPrepperConfiguration) { + final DataPrepperConfiguration dataPrepperConfiguration, + final CircuitBreakerManager circuitBreakerManager) { this.pipelineConfigurationFileLocation = pipelineConfigurationFileLocation; this.pluginFactory = Objects.requireNonNull(pluginFactory); this.peerForwarderProvider = Objects.requireNonNull(peerForwarderProvider); this.routerFactory = routerFactory; this.dataPrepperConfiguration = Objects.requireNonNull(dataPrepperConfiguration); + this.circuitBreakerManager = circuitBreakerManager; } /** @@ -158,7 +162,7 @@ private void buildPipelineFromConfiguration( pluginFactory.loadPlugin(Source.class, sourceSetting)); LOG.info("Building buffer for the pipeline [{}]", pipelineName); - final Buffer buffer = pluginFactory.loadPlugin(Buffer.class, pipelineConfiguration.getBufferPluginSetting()); + final Buffer pipelineDefinedBuffer = pluginFactory.loadPlugin(Buffer.class, pipelineConfiguration.getBufferPluginSetting()); LOG.info("Building processors for the pipeline [{}]", pipelineName); final int processorThreads = pipelineConfiguration.getWorkers(); @@ -187,11 +191,22 @@ private void buildPipelineFromConfiguration( final List secondaryBuffers = getSecondaryBuffers(); LOG.info("Constructing MultiBufferDecorator with [{}] secondary buffers for pipeline [{}]", secondaryBuffers.size(), pipelineName); - final MultiBufferDecorator multiBufferDecorator = new MultiBufferDecorator(buffer, secondaryBuffers); + final MultiBufferDecorator multiBufferDecorator = new MultiBufferDecorator(pipelineDefinedBuffer, secondaryBuffers); + + + final Buffer buffer; + if(source instanceof PipelineConnector) { + buffer = multiBufferDecorator; + } else { + buffer = circuitBreakerManager.getGlobalCircuitBreaker() + .map(circuitBreaker -> new CircuitBreakingBuffer<>(multiBufferDecorator, circuitBreaker)) + .map(b -> (Buffer)b) + .orElseGet(() -> multiBufferDecorator); + } final Router router = routerFactory.createRouter(pipelineConfiguration.getRoutes()); - final Pipeline pipeline = new Pipeline(pipelineName, source, multiBufferDecorator, decoratedProcessorSets, sinks, router, processorThreads, readBatchDelay, + final Pipeline pipeline = new Pipeline(pipelineName, source, buffer, decoratedProcessorSets, sinks, router, processorThreads, readBatchDelay, dataPrepperConfiguration.getProcessorShutdownTimeout(), dataPrepperConfiguration.getSinkShutdownTimeout(), getPeerForwarderDrainTimeout(dataPrepperConfiguration)); pipelineMap.put(pipelineName, pipeline); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/DataPrepperAppConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/DataPrepperAppConfiguration.java index a832f50e59..19a5229e6e 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/DataPrepperAppConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/DataPrepperAppConfiguration.java @@ -8,6 +8,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.parser.ByteCountDeserializer; import org.opensearch.dataprepper.parser.DataPrepperDurationDeserializer; import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; import org.springframework.context.annotation.Bean; @@ -29,7 +31,9 @@ public DataPrepperConfiguration dataPrepperConfiguration( if (dataPrepperConfigFileLocation != null) { final File configurationFile = new File(dataPrepperConfigFileLocation); try { - final SimpleModule simpleModule = new SimpleModule().addDeserializer(Duration.class, new DataPrepperDurationDeserializer()); + final SimpleModule simpleModule = new SimpleModule() + .addDeserializer(Duration.class, new DataPrepperDurationDeserializer()) + .addDeserializer(ByteCount.class, new ByteCountDeserializer()); objectMapper.registerModule(simpleModule); return objectMapper.readValue(configurationFile, DataPrepperConfiguration.class); } catch (final IOException e) { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java index 239494590b..b7f1e189e5 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.parser.config; +import org.opensearch.dataprepper.breaker.CircuitBreakerManager; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; import org.opensearch.dataprepper.parser.PipelineParser; @@ -22,12 +23,14 @@ public PipelineParser pipelineParser( final PluginFactory pluginFactory, final PeerForwarderProvider peerForwarderProvider, final RouterFactory routerFactory, - final DataPrepperConfiguration dataPrepperConfiguration + final DataPrepperConfiguration dataPrepperConfiguration, + final CircuitBreakerManager circuitBreakerManager ) { return new PipelineParser(fileStructurePathProvider.getPipelineConfigFileLocation(), pluginFactory, peerForwarderProvider, routerFactory, - dataPrepperConfiguration); + dataPrepperConfiguration, + circuitBreakerManager); } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/CircuitBreakerConfig.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/CircuitBreakerConfig.java new file mode 100644 index 0000000000..99c9c45406 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/CircuitBreakerConfig.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.parser.model; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * The Data Prepper configuration for circuit breakers. + * + * @since 2.1 + */ +public class CircuitBreakerConfig { + @JsonProperty("heap") + private HeapCircuitBreakerConfig heapConfig; + + /** + * Gets the configuration for the heap. + * + * @return The heap circuit breaker configuration + * @since 2.1 + */ + public HeapCircuitBreakerConfig getHeapConfig() { + return heapConfig; + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/DataPrepperConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/DataPrepperConfiguration.java index c65764c490..dcb73fd9fa 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/DataPrepperConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/DataPrepperConfiguration.java @@ -5,12 +5,10 @@ package org.opensearch.dataprepper.parser.model; -import org.opensearch.dataprepper.model.configuration.PluginModel; -import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration; import java.time.Duration; import java.util.Collections; @@ -33,13 +31,12 @@ public class DataPrepperConfiguration { private String privateKeyPassword = ""; private List metricRegistries = DEFAULT_METRIC_REGISTRY_TYPE; private PluginModel authentication; + private CircuitBreakerConfig circuitBreakerConfig; private Map metricTags = new HashMap<>(); private PeerForwarderConfiguration peerForwarderConfiguration; private Duration processorShutdownTimeout; private Duration sinkShutdownTimeout; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory()); - public static final DataPrepperConfiguration DEFAULT_CONFIG = new DataPrepperConfiguration(); public DataPrepperConfiguration() {} @@ -57,9 +54,11 @@ public DataPrepperConfiguration( @JsonProperty("metricTags") final Map metricTags, @JsonProperty("peer_forwarder") final PeerForwarderConfiguration peerForwarderConfiguration, @JsonProperty("processorShutdownTimeout") final Duration processorShutdownTimeout, - @JsonProperty("sinkShutdownTimeout") final Duration sinkShutdownTimeout + @JsonProperty("sinkShutdownTimeout") final Duration sinkShutdownTimeout, + @JsonProperty("circuit_breakers") final CircuitBreakerConfig circuitBreakerConfig ) { this.authentication = authentication; + this.circuitBreakerConfig = circuitBreakerConfig; setSsl(ssl); this.keyStoreFilePath = keyStoreFilePath != null ? keyStoreFilePath : ""; this.keyStorePassword = keyStorePassword != null ? keyStorePassword : ""; @@ -152,4 +151,8 @@ public Duration getProcessorShutdownTimeout() { public Duration getSinkShutdownTimeout() { return sinkShutdownTimeout; } + + public CircuitBreakerConfig getCircuitBreakerConfig() { + return circuitBreakerConfig; + } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/HeapCircuitBreakerConfig.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/HeapCircuitBreakerConfig.java new file mode 100644 index 0000000000..1248177a91 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/HeapCircuitBreakerConfig.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.parser.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; +import org.opensearch.dataprepper.model.types.ByteCount; + +import java.time.Duration; + +/** + * Configuration for the heap circuit breaker. + */ +public class HeapCircuitBreakerConfig { + public static final Duration DEFAULT_RESET = Duration.ofSeconds(1); + private static final Duration DEFAULT_CHECK_INTERVAL = Duration.ofMillis(500); + @NotNull + @JsonProperty("usage") + private ByteCount usage; + + @JsonProperty("reset") + private Duration reset = DEFAULT_RESET; + + @JsonProperty("check_interval") + private Duration checkInterval = DEFAULT_CHECK_INTERVAL; + + /** + * Gets the usage as a {@link ByteCount}. If the current Java heap usage + * exceeds this value then the circuit breaker will be open. + * + * @return Usage threshold + * @since 2.1 + */ + public ByteCount getUsage() { + return usage; + } + + /** + * Gets the reset timeout. After tripping the circuit breaker, no new + * checks until after this time has passed. + * + * @return The duration + * @since 2.1 + */ + public Duration getReset() { + return reset; + } + + /** + * Gets the check interval. This is the time between checks of the heap size. + * + * @return The check interval as a duration + * @since 2.1 + */ + public Duration getCheckInterval() { + return checkInterval; + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/breaker/CircuitBreakerAppConfigTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/breaker/CircuitBreakerAppConfigTest.java new file mode 100644 index 0000000000..a4f952d189 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/breaker/CircuitBreakerAppConfigTest.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.breaker; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.parser.model.CircuitBreakerConfig; +import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; +import org.opensearch.dataprepper.parser.model.HeapCircuitBreakerConfig; + +import java.time.Duration; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class CircuitBreakerAppConfigTest { + @Mock + private DataPrepperConfiguration dataPrepperConfiguration; + + private CircuitBreakerAppConfig createObjectUnderTest() { + return new CircuitBreakerAppConfig(); + } + + @Test + void heapCircuitBreaker_returns_null_if_CircuitBreakerConfig_is_null() { + assertThat(createObjectUnderTest().heapCircuitBreaker(dataPrepperConfiguration), + nullValue()); + } + + @Test + void heapCircuitBreaker_returns_null_if_HeapCircuitBreakerConfig_is_null() { + final CircuitBreakerConfig circuitBreakerConfig = mock(CircuitBreakerConfig.class); + when(dataPrepperConfiguration.getCircuitBreakerConfig()) + .thenReturn(circuitBreakerConfig); + + assertThat(createObjectUnderTest().heapCircuitBreaker(dataPrepperConfiguration), + nullValue()); + } + + @Test + void heapCircuitBreaker_returns_HeapCircuitBreaker_if_HeapCircuitBreakerConfig_is_present() { + final ByteCount byteCount = mock(ByteCount.class); + when(byteCount.getBytes()).thenReturn(1L); + final HeapCircuitBreakerConfig heapCircuitBreakerConfig = mock(HeapCircuitBreakerConfig.class); + when(heapCircuitBreakerConfig.getUsage()).thenReturn(byteCount); + when(heapCircuitBreakerConfig.getCheckInterval()).thenReturn(Duration.ofSeconds(1)); + final CircuitBreakerConfig circuitBreakerConfig = mock(CircuitBreakerConfig.class); + when(circuitBreakerConfig.getHeapConfig()).thenReturn(heapCircuitBreakerConfig); + when(dataPrepperConfiguration.getCircuitBreakerConfig()) + .thenReturn(circuitBreakerConfig); + + assertThat(createObjectUnderTest().heapCircuitBreaker(dataPrepperConfiguration), + instanceOf(HeapCircuitBreaker.class)); + } +} \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/breaker/CircuitBreakerIT.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/breaker/CircuitBreakerIT.java new file mode 100644 index 0000000000..ff14013db3 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/breaker/CircuitBreakerIT.java @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.breaker; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.parser.model.CircuitBreakerConfig; +import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; +import org.opensearch.dataprepper.parser.model.HeapCircuitBreakerConfig; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; + +import java.time.Duration; +import java.util.Optional; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class CircuitBreakerIT { + private static final Duration SMALL_SLEEP_INTERVAL = Duration.ofMillis(50); + @Mock + private DataPrepperConfiguration dataPrepperConfiguration; + @Mock + private CircuitBreakerConfig circuitBreakerConfig; + @BeforeEach + void setUp() { + when(dataPrepperConfiguration.getCircuitBreakerConfig()).thenReturn(circuitBreakerConfig); + } + + private CircuitBreakerManager createObjectUnderTest() { + final AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(); + applicationContext.registerBean("dataPrepperConfig", DataPrepperConfiguration.class, () -> dataPrepperConfiguration); + applicationContext.scan(CircuitBreakerAppConfig.class.getPackage().getName()); + applicationContext.register(CircuitBreakerAppConfig.class); + applicationContext.refresh(); + + return applicationContext.getBean(CircuitBreakerManager.class); + } + + @Test + void globalCircuitBreaker_returns_empty_when_no_circuit_breakers_active() { + final Optional globalCircuitBreaker = createObjectUnderTest().getGlobalCircuitBreaker(); + assertThat(globalCircuitBreaker, notNullValue()); + assertThat(globalCircuitBreaker.isPresent(), equalTo(false)); + } + + @Nested + class HeapCircuitBreaker { + @Mock + private HeapCircuitBreakerConfig heapCircuitBreakerConfig; + + @BeforeEach + void setUp() { + when(circuitBreakerConfig.getHeapConfig()).thenReturn(heapCircuitBreakerConfig); + when(heapCircuitBreakerConfig.getCheckInterval()).thenReturn(SMALL_SLEEP_INTERVAL); + } + + @ParameterizedTest + @CsvSource({ + "1000000gb, false", + "8b, true" + }) + void globalCircuitBreaker_returns_expected_value_based_on_heap(final String byteCount, final boolean expectedIsOpen) throws InterruptedException { + when(heapCircuitBreakerConfig.getUsage()).thenReturn(ByteCount.parse(byteCount)); + final Optional globalCircuitBreaker = createObjectUnderTest().getGlobalCircuitBreaker(); + + assertThat(globalCircuitBreaker, notNullValue()); + assertThat(globalCircuitBreaker.isPresent(), equalTo(true)); + final CircuitBreaker circuitBreaker = globalCircuitBreaker.get(); + assertThat(circuitBreaker, notNullValue()); + + Thread.sleep(SMALL_SLEEP_INTERVAL.toMillis()); + + assertThat(circuitBreaker.isOpen(), equalTo(expectedIsOpen)); + } + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/breaker/CircuitBreakerManagerTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/breaker/CircuitBreakerManagerTest.java new file mode 100644 index 0000000000..a57a83caa1 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/breaker/CircuitBreakerManagerTest.java @@ -0,0 +1,127 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.breaker; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class CircuitBreakerManagerTest { + private List innerCircuitBreakers; + + @BeforeEach + void setUp() { + innerCircuitBreakers = Collections.emptyList(); + } + + private CircuitBreakerManager createObjectUnderTest() { + return new CircuitBreakerManager(innerCircuitBreakers); + } + + @Test + void constructor_throws_if_null_InnerCircuitBreakers() { + innerCircuitBreakers = null; + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void getGlobalCircuitBreaker_returns_empty_if_list_is_empty() { + final Optional optionalCircuitBreaker = createObjectUnderTest().getGlobalCircuitBreaker(); + + assertThat(optionalCircuitBreaker, notNullValue()); + assertThat(optionalCircuitBreaker.isPresent(), equalTo(false)); + } + + @Nested + class SingleCircuitBreaker { + @Mock + private InnerCircuitBreaker circuitBreaker; + + @BeforeEach + void setUp() { + innerCircuitBreakers = Collections.singletonList(circuitBreaker); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void getGlobalCircuitBreaker_returns_CircuitBreaker_where_isOpen_is_equal_to_single_isOpen(final boolean innerIsOpen) { + final Optional optionalCircuitBreaker = createObjectUnderTest().getGlobalCircuitBreaker(); + + assertThat(optionalCircuitBreaker, notNullValue()); + assertThat(optionalCircuitBreaker.isPresent(), equalTo(true)); + assertThat(optionalCircuitBreaker.get(), notNullValue()); + + when(circuitBreaker.isOpen()).thenReturn(innerIsOpen); + final CircuitBreaker actualBreaker = optionalCircuitBreaker.get(); + assertThat(actualBreaker.isOpen(), equalTo(innerIsOpen)); + } + } + + @Nested + class MultipleCircuitBreakers { + @BeforeEach + void setUp() { + innerCircuitBreakers = IntStream.range(0, 3) + .mapToObj(i -> mock(InnerCircuitBreaker.class)) + .collect(Collectors.toList()); + } + + @Test + void getGlobalCircuitBreaker_returns_CircuitBreaker_where_isOpen_is_equal_to_false_if_all_are_false() { + for (InnerCircuitBreaker innerCircuitBreaker : innerCircuitBreakers) { + when(innerCircuitBreaker.isOpen()).thenReturn(false); + } + + final Optional optionalCircuitBreaker = createObjectUnderTest().getGlobalCircuitBreaker(); + + assertThat(optionalCircuitBreaker, notNullValue()); + assertThat(optionalCircuitBreaker.isPresent(), equalTo(true)); + assertThat(optionalCircuitBreaker.get(), notNullValue()); + + final CircuitBreaker actualBreaker = optionalCircuitBreaker.get(); + assertThat(actualBreaker.isOpen(), equalTo(false)); + } + + @ParameterizedTest + @ValueSource(ints = {0, 1, 2}) + void getGlobalCircuitBreaker_returns_CircuitBreaker_where_isOpen_is_equal_to_true_if_any_are_true(final int openIndex) { + for (int i = 0; i < 3; i++) { + final InnerCircuitBreaker innerCircuitBreaker = innerCircuitBreakers.get(i); + if(i == openIndex) { + when(innerCircuitBreaker.isOpen()).thenReturn(true); + } + } + + final Optional optionalCircuitBreaker = createObjectUnderTest().getGlobalCircuitBreaker(); + + assertThat(optionalCircuitBreaker, notNullValue()); + assertThat(optionalCircuitBreaker.isPresent(), equalTo(true)); + assertThat(optionalCircuitBreaker.get(), notNullValue()); + + final CircuitBreaker actualBreaker = optionalCircuitBreaker.get(); + assertThat(actualBreaker.isOpen(), equalTo(true)); + } + } +} \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/breaker/HeapCircuitBreakerTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/breaker/HeapCircuitBreakerTest.java new file mode 100644 index 0000000000..64dbbd69ff --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/breaker/HeapCircuitBreakerTest.java @@ -0,0 +1,240 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.breaker; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.parser.model.HeapCircuitBreakerConfig; + +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.time.Duration; +import java.util.Random; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class HeapCircuitBreakerTest { + private static final Duration VERY_LARGE_RESET_PERIOD = Duration.ofDays(1); + private static final Duration SMALL_RESET_PERIOD = Duration.ofMillis(50); + private static final Duration SMALL_CHECK_INTERVAL = SMALL_RESET_PERIOD; + private static final long SLEEP_MILLIS = SMALL_CHECK_INTERVAL.plusMillis(50).toMillis(); + @Mock + private HeapCircuitBreakerConfig config; + + @Mock + private MemoryMXBean memoryMXBean; + + private Random random; + private long byteUsage; + private MemoryUsage memoryUsage; + + private HeapCircuitBreaker objectUnderTest; + + @BeforeEach + void setUp() { + random = new Random(); + } + + @AfterEach + void tearDown() throws Exception { + if(objectUnderTest != null) { + objectUnderTest.close(); + objectUnderTest = null; + } + } + + private HeapCircuitBreaker createObjectUnderTest() { + return new HeapCircuitBreaker(config, memoryMXBean); + } + + @Test + void constructor_throws_if_config_is_null() { + config = null; + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void constructor_throws_if_usage_is_null() { + lenient().when(config.getUsage()).thenReturn(null); + lenient().when(config.getReset()).thenReturn(Duration.ofSeconds(1)); + lenient().when(config.getCheckInterval()).thenReturn(Duration.ofSeconds(1)); + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void constructor_throws_if_reset_is_null() { + lenient().when(config.getCheckInterval()).thenReturn(Duration.ofSeconds(1)); + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void constructor_throws_if_checkInterval_is_null() { + lenient().when(config.getReset()).thenReturn(Duration.ofSeconds(1)); + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @ParameterizedTest + @ValueSource(longs = {0, -1}) + void constructor_throws_if_usage_is_non_positive(final long bytes) { + final ByteCount byteCount = mock(ByteCount.class); + when(byteCount.getBytes()).thenReturn(bytes); + when(config.getUsage()).thenReturn(byteCount); + assertThrows(IllegalArgumentException.class, this::createObjectUnderTest); + } + + @Nested + class ValidConfig { + @BeforeEach + void setUp() { + byteUsage = random.nextInt(1024) + 1024 * 1024; + final ByteCount usageByteCount = mock(ByteCount.class); + when(usageByteCount.getBytes()).thenReturn(byteUsage); + when(config.getUsage()).thenReturn(usageByteCount); + when(config.getCheckInterval()).thenReturn(SMALL_CHECK_INTERVAL); + + memoryUsage = mock(MemoryUsage.class); + when(memoryMXBean.getHeapMemoryUsage()).thenReturn(memoryUsage); + } + + @Test + void object_checks_memory_even_when_not_calling_isOpen() throws InterruptedException { + objectUnderTest = createObjectUnderTest(); + + Thread.sleep(SLEEP_MILLIS); + + verify(memoryMXBean, atLeastOnce()).getHeapMemoryUsage(); + } + + @ParameterizedTest + @ValueSource(longs = {1, 2, 1024}) + void isOpen_returns_false_if_used_bytes_less_than_configured_bytes(final long bytesDifference) throws InterruptedException { + when(memoryUsage.getUsed()).thenReturn(byteUsage - bytesDifference); + + objectUnderTest = createObjectUnderTest(); + Thread.sleep(SLEEP_MILLIS); + assertThat(objectUnderTest.isOpen(), equalTo(false)); + } + + @Test + void isOpen_returns_false_if_used_bytes_equal_to_configured_bytes() throws InterruptedException { + when(memoryUsage.getUsed()).thenReturn(byteUsage); + + objectUnderTest = createObjectUnderTest(); + Thread.sleep(SLEEP_MILLIS); + assertThat(objectUnderTest.isOpen(), equalTo(false)); + } + + @ParameterizedTest + @ValueSource(longs = {1, 2, 1024, 1024 * 1024}) + void isOpen_returns_true_if_used_bytes_greater_than_configured_bytes(final long bytesGreater) throws InterruptedException { + when(memoryUsage.getUsed()).thenReturn(byteUsage + bytesGreater); + + objectUnderTest = createObjectUnderTest(); + Thread.sleep(SLEEP_MILLIS); + assertThat(objectUnderTest.isOpen(), equalTo(true)); + } + + @Test + void will_not_check_within_reset_period() throws InterruptedException { + when(config.getReset()).thenReturn(VERY_LARGE_RESET_PERIOD); + + when(memoryUsage.getUsed()).thenReturn(byteUsage + 1); + + objectUnderTest = createObjectUnderTest(); + + Thread.sleep(SLEEP_MILLIS); + assertThat(objectUnderTest.isOpen(), equalTo(true)); + + reset(memoryUsage); + lenient().when(memoryUsage.getUsed()).thenReturn(byteUsage - 1); + for(int i = 0; i < 3; i++) { + Thread.sleep(SLEEP_MILLIS); + } + + assertThat(objectUnderTest.isOpen(), equalTo(true)); + } + + @Test + void will_check_after_reset_period() throws InterruptedException { + when(config.getReset()).thenReturn(SMALL_RESET_PERIOD); + + when(memoryUsage.getUsed()).thenReturn(byteUsage + 1); + + objectUnderTest = createObjectUnderTest(); + + Thread.sleep(SLEEP_MILLIS); + assertThat(objectUnderTest.isOpen(), equalTo(true)); + + reset(memoryUsage); + when(memoryUsage.getUsed()).thenReturn(byteUsage - 1); + for(int i = 0; i < 3; i++) { + Thread.sleep(SLEEP_MILLIS); + } + + assertThat(objectUnderTest.isOpen(), equalTo(false)); + } + + @Test + void isOpen_transition_from_false_to_true() throws InterruptedException { + when(config.getReset()).thenReturn(SMALL_RESET_PERIOD); + when(memoryUsage.getUsed()).thenReturn(byteUsage - 1); + + objectUnderTest = createObjectUnderTest(); + + Thread.sleep(SLEEP_MILLIS); + assertThat(objectUnderTest.isOpen(), equalTo(false)); + + reset(memoryUsage); + when(memoryUsage.getUsed()).thenReturn(byteUsage + 1); + Thread.sleep(SLEEP_MILLIS); + assertThat(objectUnderTest.isOpen(), equalTo(true)); + } + + @Test + void isOpen_transition_from_true_to_false() throws InterruptedException { + when(config.getReset()).thenReturn(SMALL_RESET_PERIOD); + + when(memoryUsage.getUsed()).thenReturn(byteUsage + 1); + objectUnderTest = createObjectUnderTest(); + + Thread.sleep(SLEEP_MILLIS); + assertThat(objectUnderTest.isOpen(), equalTo(true)); + + reset(memoryUsage); + when(memoryUsage.getUsed()).thenReturn(byteUsage - 1); + Thread.sleep(SLEEP_MILLIS); + assertThat(objectUnderTest.isOpen(), equalTo(false)); + } + + @Test + void isOpen_returns_false_if_MemoryMXBean_throws_on_first_call() throws InterruptedException { + reset(memoryMXBean); + when(memoryMXBean.getHeapMemoryUsage()).thenThrow(RuntimeException.class); + + objectUnderTest = createObjectUnderTest(); + Thread.sleep(SLEEP_MILLIS); + + assertThat(objectUnderTest.isOpen(), equalTo(false)); + } + } +} \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/ByteCountDeserializerTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/ByteCountDeserializerTest.java new file mode 100644 index 0000000000..9845b42ab1 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/ByteCountDeserializerTest.java @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.parser; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.dataprepper.model.types.ByteCount; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class ByteCountDeserializerTest { + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + objectMapper = new ObjectMapper(); + + final SimpleModule simpleModule = new SimpleModule(); + simpleModule.addDeserializer(ByteCount.class, new ByteCountDeserializer()); + objectMapper.registerModule(simpleModule); + } + + @ParameterizedTest + @ValueSource(strings = {"1", "1b 2b", "1vb", "bad"}) + void convert_with_invalid_values_throws(final String invalidByteString) { + assertThrows(IllegalArgumentException.class, () -> objectMapper.convertValue(invalidByteString, ByteCount.class)); + } + + @ParameterizedTest + @CsvSource({ + "0b, 0", + "1b, 1", + "2b, 2", + "2kb, 2048", + "1mb, 1048576" + }) + void convert_with_valid_values_returns_expected_bytes(final String byteString, final long expectedValue) { + final ByteCount byteCount = objectMapper.convertValue(byteString, ByteCount.class); + + assertThat(byteCount, notNullValue()); + assertThat(byteCount.getBytes(), equalTo(expectedValue)); + } +} \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/CircuitBreakingBufferTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/CircuitBreakingBufferTest.java new file mode 100644 index 0000000000..0b9df89428 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/CircuitBreakingBufferTest.java @@ -0,0 +1,145 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.parser; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.breaker.CircuitBreaker; +import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.record.Record; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class CircuitBreakingBufferTest { + @Mock + private Buffer> buffer; + + @Mock + private CircuitBreaker circuitBreaker; + + private int timeoutMillis; + + @BeforeEach + void setUp() { + timeoutMillis = 1000; + } + + private CircuitBreakingBuffer> createObjectUnderTest() { + return new CircuitBreakingBuffer<>(buffer, circuitBreaker); + } + + @Test + void constructor_should_throw_with_null_buffer() { + buffer = null; + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void constructor_should_throw_with_null_circuitBreaker() { + circuitBreaker = null; + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Nested + class NoCircuitBreakerChecks { + @AfterEach + void verifyNoCircuitBreakerChecks() { + verifyNoInteractions(circuitBreaker); + } + + @Test + void read_should_return_inner_read() { + final Map.Entry>, CheckpointState> readResponse = mock(Map.Entry.class); + when(buffer.read(timeoutMillis)).thenReturn(readResponse); + createObjectUnderTest().read(timeoutMillis); + } + + @Test + void checkpoint_should_call_inner_checkpoint() { + final CheckpointState checkpointState = mock(CheckpointState.class); + createObjectUnderTest().checkpoint(checkpointState); + + verify(buffer).checkpoint(checkpointState); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void isEmpty_should_return_inner_isEmpty(final boolean empty) { + when(buffer.isEmpty()).thenReturn(empty); + + assertThat(createObjectUnderTest().isEmpty(), equalTo(empty)); + } + } + + @Nested + class WithRecords { + @Mock + private Record record; + + @Test + void write_should_check_CircuitBreaker_and_call_inner_write_if_not_open() throws TimeoutException { + when(circuitBreaker.isOpen()).thenReturn(false); + + createObjectUnderTest().write(record, timeoutMillis); + + verify(buffer).write(record, timeoutMillis); + verify(circuitBreaker).isOpen(); + } + + @Test + void write_should_check_CircuitBreaker_and_throw_if_open() { + when(circuitBreaker.isOpen()).thenReturn(true); + + assertThrows(TimeoutException.class, () -> createObjectUnderTest().write(record, timeoutMillis)); + + verifyNoInteractions(buffer); + verify(circuitBreaker).isOpen(); + } + + @Test + void writeAll_should_check_CircuitBreaker_and_call_inner_write_if_not_open() throws Exception { + when(circuitBreaker.isOpen()).thenReturn(false); + + final List> records = Collections.singletonList(record); + createObjectUnderTest().writeAll(records, timeoutMillis); + + verify(buffer).writeAll(records, timeoutMillis); + verify(circuitBreaker).isOpen(); + } + + @Test + void writeAll_should_check_CircuitBreaker_and_throw_if_open() { + when(circuitBreaker.isOpen()).thenReturn(true); + + final List> records = Collections.singletonList(record); + assertThrows(TimeoutException.class, () -> createObjectUnderTest().writeAll(records, timeoutMillis)); + + verifyNoInteractions(buffer); + verify(circuitBreaker).isOpen(); + } + } +} \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineParserTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineParserTests.java index 1ad8f85907..a8fb73c7be 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineParserTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineParserTests.java @@ -5,47 +5,53 @@ package org.opensearch.dataprepper.parser; +import org.hamcrest.CoreMatchers; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.TestDataProvider; +import org.opensearch.dataprepper.breaker.CircuitBreaker; +import org.opensearch.dataprepper.breaker.CircuitBreakerManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; -import org.opensearch.dataprepper.TestDataProvider; import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration; import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider; import org.opensearch.dataprepper.peerforwarder.PeerForwarderReceiveBuffer; import org.opensearch.dataprepper.pipeline.Pipeline; import org.opensearch.dataprepper.pipeline.router.RouterFactory; import org.opensearch.dataprepper.plugin.DefaultPluginFactory; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.Mock; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import java.time.Duration; -import java.util.List; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; -import java.util.stream.Stream; import java.util.UUID; +import java.util.stream.Stream; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.times; @ExtendWith(MockitoExtension.class) class PipelineParserTests { @@ -60,6 +66,8 @@ class PipelineParserTests { private PeerForwarderConfiguration peerForwarderConfiguration; @Mock private PeerForwarderReceiveBuffer buffer; + @Mock + private CircuitBreakerManager circuitBreakerManager; private PluginFactory pluginFactory; @@ -82,7 +90,7 @@ void tearDown() { } private PipelineParser createObjectUnderTest(final String pipelineConfigurationFileLocation) { - return new PipelineParser(pipelineConfigurationFileLocation, pluginFactory, peerForwarderProvider, routerFactory, dataPrepperConfiguration); + return new PipelineParser(pipelineConfigurationFileLocation, pluginFactory, peerForwarderProvider, routerFactory, dataPrepperConfiguration, circuitBreakerManager); } @Test @@ -256,6 +264,74 @@ void getPeerForwarderDrainDuration_IsSet() { verify(peerForwarderConfiguration).getDrainTimeout(); } + @Test + void parseConfiguration_uses_CircuitBreaking_buffer_when_circuit_breakers_applied() { + final CircuitBreaker circuitBreaker = mock(CircuitBreaker.class); + when(circuitBreakerManager.getGlobalCircuitBreaker()) + .thenReturn(Optional.of(circuitBreaker)); + final PipelineParser objectUnderTest = + createObjectUnderTest(TestDataProvider.VALID_SINGLE_PIPELINE_EMPTY_SOURCE_PLUGIN_FILE); + + final Map pipelineMap = objectUnderTest.parseConfiguration(); + + assertThat(pipelineMap.size(), equalTo(1)); + assertThat(pipelineMap, hasKey("test-pipeline-1")); + final Pipeline pipeline = pipelineMap.get("test-pipeline-1"); + assertThat(pipeline, notNullValue()); + assertThat(pipeline.getBuffer(), instanceOf(CircuitBreakingBuffer.class)); + + verify(dataPrepperConfiguration).getProcessorShutdownTimeout(); + verify(dataPrepperConfiguration).getSinkShutdownTimeout(); + verify(dataPrepperConfiguration).getPeerForwarderConfiguration(); + } + + @Test + void parseConfiguration_uses_unwrapped_buffer_when_no_circuit_breakers_are_applied() { + when(circuitBreakerManager.getGlobalCircuitBreaker()) + .thenReturn(Optional.empty()); + final PipelineParser objectUnderTest = + createObjectUnderTest(TestDataProvider.VALID_SINGLE_PIPELINE_EMPTY_SOURCE_PLUGIN_FILE); + + final Map pipelineMap = objectUnderTest.parseConfiguration(); + + assertThat(pipelineMap.size(), equalTo(1)); + assertThat(pipelineMap, hasKey("test-pipeline-1")); + final Pipeline pipeline = pipelineMap.get("test-pipeline-1"); + assertThat(pipeline, notNullValue()); + assertThat(pipeline.getBuffer(), notNullValue()); + assertThat(pipeline.getBuffer(), CoreMatchers.not(instanceOf(CircuitBreakingBuffer.class))); + + verify(dataPrepperConfiguration).getProcessorShutdownTimeout(); + verify(dataPrepperConfiguration).getSinkShutdownTimeout(); + verify(dataPrepperConfiguration).getPeerForwarderConfiguration(); + } + + @Test + void parseConfiguration_uses_unwrapped_buffer_for_pipeline_connectors() { + final CircuitBreaker circuitBreaker = mock(CircuitBreaker.class); + when(circuitBreakerManager.getGlobalCircuitBreaker()) + .thenReturn(Optional.of(circuitBreaker)); + final PipelineParser objectUnderTest = + createObjectUnderTest(TestDataProvider.VALID_MULTIPLE_PIPELINE_CONFIG_FILE); + + final Map pipelineMap = objectUnderTest.parseConfiguration(); + + assertThat(pipelineMap, hasKey("test-pipeline-1")); + final Pipeline entryPipeline = pipelineMap.get("test-pipeline-1"); + assertThat(entryPipeline, notNullValue()); + assertThat(entryPipeline.getBuffer(), instanceOf(CircuitBreakingBuffer.class)); + + assertThat(pipelineMap, hasKey("test-pipeline-2")); + final Pipeline connectedPipeline = pipelineMap.get("test-pipeline-2"); + assertThat(connectedPipeline, notNullValue()); + assertThat(connectedPipeline.getBuffer(), notNullValue()); + assertThat(connectedPipeline.getBuffer(), CoreMatchers.not(instanceOf(CircuitBreakingBuffer.class))); + + verify(dataPrepperConfiguration, times(3)).getProcessorShutdownTimeout(); + verify(dataPrepperConfiguration, times(3)).getSinkShutdownTimeout(); + verify(dataPrepperConfiguration, times(3)).getPeerForwarderConfiguration(); + } + private void mockDataPrepperConfigurationAccesses() { when(dataPrepperConfiguration.getProcessorShutdownTimeout()).thenReturn(Duration.ofSeconds(Math.abs(new Random().nextInt()))); when(dataPrepperConfiguration.getSinkShutdownTimeout()).thenReturn(Duration.ofSeconds(Math.abs(new Random().nextInt()))); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java index b34050abf0..be8152976c 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.parser.config; +import org.opensearch.dataprepper.breaker.CircuitBreakerManager; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; import org.junit.jupiter.api.Test; @@ -40,6 +41,9 @@ class PipelineParserConfigurationTest { @Mock private DataPrepperConfiguration dataPrepperConfiguration; + @Mock + private CircuitBreakerManager circuitBreakerManager; + @Test void pipelineParser() { final String pipelineConfigFileLocation = "hot soup"; @@ -47,7 +51,7 @@ void pipelineParser() { .thenReturn(pipelineConfigFileLocation); final PipelineParser pipelineParser = pipelineParserConfiguration.pipelineParser( - fileStructurePathProvider, pluginFactory, peerForwarderProvider, routerFactory, dataPrepperConfiguration); + fileStructurePathProvider, pluginFactory, peerForwarderProvider, routerFactory, dataPrepperConfiguration, circuitBreakerManager); assertThat(pipelineParser, is(notNullValue())); verify(fileStructurePathProvider).getPipelineConfigFileLocation(); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/DataPrepperConfigurationTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/DataPrepperConfigurationTests.java index 06cc81cfb7..c74cc144e9 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/DataPrepperConfigurationTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/DataPrepperConfigurationTests.java @@ -6,6 +6,8 @@ package org.opensearch.dataprepper.parser.model; import org.opensearch.dataprepper.TestDataProvider; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.parser.ByteCountDeserializer; import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration; import org.opensearch.dataprepper.parser.DataPrepperDurationDeserializer; import com.fasterxml.jackson.databind.module.SimpleModule; @@ -33,7 +35,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows; public class DataPrepperConfigurationTests { - private static SimpleModule simpleModule = new SimpleModule().addDeserializer(Duration.class, new DataPrepperDurationDeserializer()); + private static SimpleModule simpleModule = new SimpleModule() + .addDeserializer(Duration.class, new DataPrepperDurationDeserializer()) + .addDeserializer(ByteCount.class, new ByteCountDeserializer()); private static ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory()).registerModule(simpleModule); private static DataPrepperConfiguration makeConfig(String filePath) throws IOException { @@ -182,4 +186,14 @@ void testConfigWithInValidShutdownTimeout(final String configFile) { void testConfigWithNegativeShutdownTimeout(final String configFile) { assertThrows(ValueInstantiationException.class, () -> makeConfig(configFile)); } + + @Test + void testConfigWithHeapCircuitBreaker() throws IOException { + final DataPrepperConfiguration config = makeConfig("src/test/resources/valid_data_prepper_config_with_heap_circuit_breaker.yml"); + assertThat(config, notNullValue()); + assertThat(config.getCircuitBreakerConfig(), notNullValue()); + assertThat(config.getCircuitBreakerConfig().getHeapConfig(), notNullValue()); + assertThat(config.getCircuitBreakerConfig().getHeapConfig().getUsage(), notNullValue()); + assertThat(config.getCircuitBreakerConfig().getHeapConfig().getUsage().getBytes(), Matchers.equalTo(2_684_354_560L)); + } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/HeapCircuitBreakerConfigTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/HeapCircuitBreakerConfigTest.java new file mode 100644 index 0000000000..5d0057a753 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/HeapCircuitBreakerConfigTest.java @@ -0,0 +1,63 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.parser.model; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.parser.ByteCountDeserializer; +import org.opensearch.dataprepper.parser.DataPrepperDurationDeserializer; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +class HeapCircuitBreakerConfigTest { + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + objectMapper = new ObjectMapper(new YAMLFactory()); + + final SimpleModule simpleModule = new SimpleModule() + .addDeserializer(ByteCount.class, new ByteCountDeserializer()) + .addDeserializer(Duration.class, new DataPrepperDurationDeserializer()); + objectMapper.registerModule(simpleModule); + } + + @Test + void deserialize_heap_without_reset() throws IOException { + final InputStream resourceStream = this.getClass().getResourceAsStream("heap_with_reset.yaml"); + + final HeapCircuitBreakerConfig config = objectMapper.readValue(resourceStream, HeapCircuitBreakerConfig.class); + + assertThat(config, notNullValue()); + assertThat(config.getUsage(), notNullValue()); + assertThat(config.getUsage().getBytes(), equalTo(24L)); + assertThat(config.getReset(), notNullValue()); + assertThat(config.getReset(), equalTo(Duration.ofSeconds(3))); + } + + @Test + void deserialize_heap_without_reset_configured() throws IOException { + final InputStream resourceStream = this.getClass().getResourceAsStream("heap_without_reset.yaml"); + + final HeapCircuitBreakerConfig config = objectMapper.readValue(resourceStream, HeapCircuitBreakerConfig.class); + + assertThat(config, notNullValue()); + assertThat(config.getUsage(), notNullValue()); + assertThat(config.getUsage().getBytes(), equalTo(24L)); + assertThat(config.getReset(), notNullValue()); + assertThat(config.getReset(), equalTo(HeapCircuitBreakerConfig.DEFAULT_RESET)); + } +} \ No newline at end of file diff --git a/data-prepper-core/src/test/resources/org/opensearch/dataprepper/parser/model/heap_with_reset.yaml b/data-prepper-core/src/test/resources/org/opensearch/dataprepper/parser/model/heap_with_reset.yaml new file mode 100644 index 0000000000..d020ea2c57 --- /dev/null +++ b/data-prepper-core/src/test/resources/org/opensearch/dataprepper/parser/model/heap_with_reset.yaml @@ -0,0 +1,2 @@ +usage: 24b +reset: 3s \ No newline at end of file diff --git a/data-prepper-core/src/test/resources/org/opensearch/dataprepper/parser/model/heap_without_reset.yaml b/data-prepper-core/src/test/resources/org/opensearch/dataprepper/parser/model/heap_without_reset.yaml new file mode 100644 index 0000000000..e3fd6d2159 --- /dev/null +++ b/data-prepper-core/src/test/resources/org/opensearch/dataprepper/parser/model/heap_without_reset.yaml @@ -0,0 +1 @@ +usage: 24b \ No newline at end of file diff --git a/data-prepper-core/src/test/resources/valid_data_prepper_config_with_heap_circuit_breaker.yml b/data-prepper-core/src/test/resources/valid_data_prepper_config_with_heap_circuit_breaker.yml new file mode 100644 index 0000000000..d32173d4d0 --- /dev/null +++ b/data-prepper-core/src/test/resources/valid_data_prepper_config_with_heap_circuit_breaker.yml @@ -0,0 +1,6 @@ +serverPort: 1234 +ssl: true + +circuit_breakers: + heap: + usage: 2.5gb diff --git a/docs/configuration.md b/docs/configuration.md index bf4ff08860..539001bbe9 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -146,6 +146,28 @@ To do so, add the argument below to the `docker run` command. -v /full/path/to/keystore.p12:/usr/share/data-prepper/keystore.p12 ``` +## Circuit Breakers + +Data Prepper supports circuit breakers which will interrupt adding objects +to the buffer when certain conditions are met. + +### Heap + +Heap circuit breaker: When the JVM heap usage reaches a configurable size stop accepting requests to buffers. + +Configuration + +```yaml +circuit_breakers: + heap: + usage: 6.5gb + reset: 2s +``` + +* `usage` - float - The absolute value of JVM memory which will trip the circuit breaker. This can be defined with bytes (`b`), kilobytes (`kb`), megabytes (`mb`), or gigabytes (`gb`). +* `reset` - Duration - The time between when the circuit is tripped and the next attempt to validate will occur. Defaults to 1s. +* `check_interval` - Duration - The time between checks of the heap usage. Defaults to 500ms. + ## Deprecated Pipeline Configuration Support Starting in Data Prepper 1.3.0, Prepper plugins were renamed to Processors. The use of the prepper or processor name in pipeline configuration files is still supported. However, the use of both processor and prepper in the same configuration file is **not** supported.