Skip to content

Commit

Permalink
Disable the circuit breaker for buffers that write data off-heap only…
Browse files Browse the repository at this point in the history
…. This is currently only the Kafka buffer. Resolves opensearch-project#3616

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable committed Nov 9, 2023
1 parent 621de7b commit 7c8908f
Show file tree
Hide file tree
Showing 13 changed files with 182 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ default Duration getDrainTimeout() {
return Duration.ZERO;
}

/**
* Indicates if writes to this buffer are also in some way written
* onto the JVM heap. If writes do go on heap, this should <b>false</b>
* which is the default.
*
* @return True if this buffer does not write to the JVM heap.
*/
default boolean isWrittenOffHeapOnly() {
return false;
}

/**
* shuts down the buffer
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public Duration getDrainTimeout() {
return delegateBuffer.getDrainTimeout();
}

@Override
public boolean isWrittenOffHeapOnly() {
return delegateBuffer.isWrittenOffHeapOnly();
}

@Override
public void shutdown() {
delegateBuffer.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,15 @@ void getDrainTimeout_returns_inner_getDrainTimeout() {
equalTo(drainTimeout));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void isWrittenOffHeapOnly_returns_inner_isWrittenOffHeapOnly(final boolean isWrittenOffHeapOnly) {
when(innerBuffer.isWrittenOffHeapOnly()).thenReturn(isWrittenOffHeapOnly);

assertThat(createObjectUnderTest().isWrittenOffHeapOnly(),
equalTo(isWrittenOffHeapOnly));
}

@Test
void shutdown_calls_inner_shutdown() {
createObjectUnderTest().shutdown();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.parser;

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.buffer.DelegatingBuffer;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
* Buffer decorator created for pipelines that make use of multiple buffers, such as PeerForwarder-enabled pipelines. The decorator
* acts as a pass-through to the primary buffer for most methods except those that rely on a combination of the primary
* and second. For example, isEmpty depends on all buffers being empty.
*
* @since 2.0
*/
class MultiBufferDecorator<T extends Record<?>> extends DelegatingBuffer<T> implements Buffer<T> {
private final List<Buffer> allBuffers;

MultiBufferDecorator(final Buffer primaryBuffer, final List<Buffer> secondaryBuffers) {
super(primaryBuffer);
allBuffers = new ArrayList<>(1 + secondaryBuffers.size());
allBuffers.add(primaryBuffer);
allBuffers.addAll(secondaryBuffers);
}

@Override
public boolean isEmpty() {
return allBuffers.stream().allMatch(Buffer::isEmpty);
}

@Override
public Duration getDrainTimeout() {
return allBuffers.stream()
.map(Buffer::getDrainTimeout)
.reduce(Duration.ZERO, Duration::plus);
}

@Override
public void shutdown() {
allBuffers.forEach(Buffer::shutdown);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.opensearch.dataprepper.pipeline.PipelineConnector;
import org.opensearch.dataprepper.pipeline.router.Router;
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
import org.opensearch.dataprepper.plugins.MultiBufferDecorator;
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -147,15 +146,7 @@ private void buildPipelineFromConfiguration(
final MultiBufferDecorator multiBufferDecorator = new MultiBufferDecorator(pipelineDefinedBuffer, secondaryBuffers);


final Buffer buffer;
if(source instanceof PipelineConnector) {
buffer = multiBufferDecorator;
} else {
buffer = circuitBreakerManager.getGlobalCircuitBreaker()
.map(circuitBreaker -> new CircuitBreakingBuffer<>(multiBufferDecorator, circuitBreaker))
.map(b -> (Buffer)b)
.orElseGet(() -> multiBufferDecorator);
}
final Buffer buffer = applyCircuitBreakerToBuffer(source, multiBufferDecorator);

final Router router = routerFactory.createRouter(pipelineConfiguration.getRoutes());

Expand Down Expand Up @@ -313,4 +304,17 @@ List<Buffer> getSecondaryBuffers() {
.map(innerEntry -> innerEntry.getValue())
.collect(Collectors.toList());
}

private Buffer applyCircuitBreakerToBuffer(final Source source, final Buffer buffer) {
if (source instanceof PipelineConnector)
return buffer;

if(buffer.isWrittenOffHeapOnly())
return buffer;

return circuitBreakerManager.getGlobalCircuitBreaker()
.map(circuitBreaker -> new CircuitBreakingBuffer<>(buffer, circuitBreaker))
.map(b -> (Buffer) b)
.orElseGet(() -> buffer);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@

package org.opensearch.dataprepper;

import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.configuration.SinkModel;
import org.opensearch.dataprepper.parser.model.PipelineConfiguration;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.configuration.SinkModel;
import org.opensearch.dataprepper.parser.model.PipelineConfiguration;

import java.io.File;
import java.io.IOException;
Expand All @@ -38,6 +38,7 @@ public class TestDataProvider {
public static final String VALID_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/valid_multiple_pipeline_configuration.yml";
public static final String VALID_PIPELINE_CONFIG_FILE_WITH_EXTENSIONS = "src/test/resources/valid_pipeline_configuration_with_extensions.yml";
public static final String VALID_SINGLE_PIPELINE_EMPTY_SOURCE_PLUGIN_FILE = "src/test/resources/single_pipeline_valid_empty_source_plugin_settings.yml";
public static final String VALID_OFF_HEAP_FILE = "src/test/resources/single_pipeline_valid_off_heap_buffer.yml";
public static final String CONNECTED_PIPELINE_ROOT_SOURCE_INCORRECT = "src/test/resources/connected_pipeline_incorrect_root_source.yml";
public static final String CONNECTED_PIPELINE_CHILD_PIPELINE_INCORRECT = "src/test/resources/connected_pipeline_incorrect_child_pipeline.yml";
public static final String CYCLE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/cyclic_multiple_pipeline_configuration.yml";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins;
package org.opensearch.dataprepper.parser;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -303,6 +304,27 @@ void parseConfiguration_uses_CircuitBreaking_buffer_when_circuit_breakers_applie
verify(dataPrepperConfiguration).getPipelineExtensions();
}

@Test
void parseConfiguration_uses_unwrapped_buffer_when_circuit_breakers_applied_but_Buffer_is_off_heap() {
final PipelineTransformer objectUnderTest =
createObjectUnderTest(TestDataProvider.VALID_OFF_HEAP_FILE);

final Map<String, Pipeline> pipelineMap = objectUnderTest.transformConfiguration();

assertThat(pipelineMap.size(), equalTo(1));
assertThat(pipelineMap, hasKey("test-pipeline-1"));
final Pipeline pipeline = pipelineMap.get("test-pipeline-1");
assertThat(pipeline, notNullValue());
assertThat(pipeline.getBuffer(), notNullValue());
assertThat(pipeline.getBuffer(), CoreMatchers.not(instanceOf(CircuitBreakingBuffer.class)));

verifyNoInteractions(circuitBreakerManager);
verify(dataPrepperConfiguration).getProcessorShutdownTimeout();
verify(dataPrepperConfiguration).getSinkShutdownTimeout();
verify(dataPrepperConfiguration).getPeerForwarderConfiguration();
verify(dataPrepperConfiguration).getPipelineExtensions();
}

@Test
void parseConfiguration_uses_unwrapped_buffer_when_no_circuit_breakers_are_applied() {
when(circuitBreakerManager.getGlobalCircuitBreaker())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins;

import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@DataPrepperPlugin(name = "test_off_heap", pluginType = Buffer.class)
public class TestOffHeapBuffer implements Buffer {
@Override
public void write(Record record, int timeoutInMillis) throws TimeoutException {

}

@Override
public void writeAll(Collection records, int timeoutInMillis) throws Exception {

}

@Override
public Map.Entry<Collection, CheckpointState> read(int timeoutInMillis) {
return null;
}

@Override
public void checkpoint(CheckpointState checkpointState) {

}

@Override
public boolean isWrittenOffHeapOnly() {
return true;
}

@Override
public boolean isEmpty() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
test-pipeline-1:
source:
stdin:
buffer:
test_off_heap:
sink:
- stdout:
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ public Duration getDrainTimeout() {
return drainTimeout;
}

@Override
public boolean isWrittenOffHeapOnly() {
return true;
}

@Override
public void shutdown() {
shutdownInProgress.set(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,12 @@ void test_kafkaBuffer_getDrainTimeout() {
verify(bufferConfig).getDrainTimeout();
}

@Test
void isWrittenOffHeapOnly_returns_true() {
assertThat(createObjectUnderTest().isWrittenOffHeapOnly(),
equalTo(true));
}

@Test
public void testShutdown_Successful() throws InterruptedException {
kafkaBuffer = createObjectUnderTest();
Expand Down

0 comments on commit 7c8908f

Please sign in to comment.