Skip to content

Commit

Permalink
Send acknowledgements to source when events are forwarded to remote p…
Browse files Browse the repository at this point in the history
…eer (#4305)

Send acknowledgements to source when events are forwarded to remote peer

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Mar 21, 2024
1 parent 5596c57 commit 2a02080
Show file tree
Hide file tree
Showing 16 changed files with 155 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ default boolean isByteBuffer() {
return false;
}

/**
* Checks if the buffer enables acknowledgements for the pipeline
*
* @return true if the buffer supports raw bytes, false otherwise
*/
default boolean areAcknowledgementsEnabled() {
return false;
}

/**
* Returns buffer's drain timeout as duration
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public boolean isByteBuffer() {
return delegateBuffer.isByteBuffer();
}

@Override
public boolean areAcknowledgementsEnabled() {
return delegateBuffer.areAcknowledgementsEnabled();
}

@Override
public Duration getDrainTimeout() {
return delegateBuffer.getDrainTimeout();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ void testIsByteBuffer() {
final Buffer<Record<Event>> buffer = createObjectUnderTest();

assertEquals(false, buffer.isByteBuffer());
assertEquals(false, buffer.areAcknowledgementsEnabled());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,15 @@ void isByteBuffer_returns_inner_isByteBuffer(final boolean isByteBuffer) {
equalTo(isByteBuffer));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void areAcksEnabled_returns_inner_areAcksEnabled(final boolean ackEnabled) {
when(innerBuffer.areAcknowledgementsEnabled()).thenReturn(ackEnabled);

assertThat(createObjectUnderTest().areAcknowledgementsEnabled(),
equalTo(ackEnabled));
}

@Test
void getDrainTimeout_returns_inner_getDrainTimeout() {
final Duration drainTimeout = Duration.ofSeconds(random.nextInt(10_000) + 100);
Expand All @@ -208,4 +217,4 @@ void shutdown_calls_inner_shutdown() {

verify(innerBuffer).shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private Optional<Source> getSourceIfPipelineType(
Pipeline sourcePipeline = pipelineMap.get(connectedPipeline);
final PipelineConnector pipelineConnector = sourceConnectorMap.get(sourcePipelineName);
pipelineConnector.setSourcePipelineName(pipelineNameOptional.get());
if (sourcePipeline.getSource().areAcknowledgementsEnabled()) {
if (sourcePipeline.getSource().areAcknowledgementsEnabled() || sourcePipeline.getBuffer().areAcknowledgementsEnabled()) {
pipelineConnector.enableAcknowledgements();
}
return Optional.of(pipelineConnector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ private Map<CompletableFuture<AggregatedHttpResponse>, List<Record<Event>>> forw
final CompletableFuture<AggregatedHttpResponse> responseFuture =
peerForwarderClient.serializeRecordsAndSendHttpRequest(recordsToForward, destinationIp, pluginId, pipelineName);
forwardingRequestsMap.put(responseFuture, recordsToForward);
for (Record<Event> record: recordsToForward) {
Event event = record.getData();
event.getEventHandle().release(true);
}
} catch (final Exception e) {
LOG.warn("Unable to submit request for forwarding, processing locally.", e);
processFailedRequestsLocally(null, recordsToForward);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ List<Future<Void>> publishToSinks(final Collection<Record> records) {

final RouterGetRecordStrategy getRecordStrategy =
new RouterCopyRecordStrategy(eventFactory,
(source.areAcknowledgementsEnabled() || buffer.isByteBuffer()) ?
(source.areAcknowledgementsEnabled() || buffer.areAcknowledgementsEnabled()) ?
acknowledgementSetManager :
InactiveAcknowledgementSetManager.getInstance(),
sinks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public ProcessWorker(
this.pipeline = pipeline;
this.pluginMetrics = PluginMetrics.fromNames("ProcessWorker", pipeline.getName());
this.invalidEventHandlesCounter = pluginMetrics.counter(INVALID_EVENT_HANDLES);
this.acknowledgementsEnabled = pipeline.getSource().areAcknowledgementsEnabled();
this.acknowledgementsEnabled = pipeline.getSource().areAcknowledgementsEnabled() || readBuffer.areAcknowledgementsEnabled();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public class TestDataProvider {
public static final String VALID_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/valid_multiple_pipeline_configuration.yml";
public static final String VALID_SINGLE_PIPELINE_EMPTY_SOURCE_PLUGIN_FILE = "src/test/resources/single_pipeline_valid_empty_source_plugin_settings.yml";
public static final String VALID_OFF_HEAP_FILE = "src/test/resources/single_pipeline_valid_off_heap_buffer.yml";
public static final String VALID_OFF_HEAP_FILE_WITH_ACKS = "src/test/resources/multiple_pipeline_valid_off_heap_buffer_with_acks.yml";
public static final String DISCONNECTED_VALID_OFF_HEAP_FILE_WITH_ACKS = "src/test/resources/multiple_disconnected_pipeline_valid_off_heap_buffer_with_acks.yml";
public static final String CONNECTED_PIPELINE_ROOT_SOURCE_INCORRECT = "src/test/resources/connected_pipeline_incorrect_root_source.yml";
public static final String CONNECTED_PIPELINE_CHILD_PIPELINE_INCORRECT = "src/test/resources/connected_pipeline_incorrect_child_pipeline.yml";
public static final String CYCLE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/cyclic_multiple_pipeline_configuration.yml";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ void isByteBuffer_returns_value_of_inner_buffer(boolean innerIsByteBuffer) {
assertThat(createObjectUnderTest().isByteBuffer(), equalTo(innerIsByteBuffer));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void areAcknowledgementsEnabled_returns_value_of_inner_buffer(boolean ackEnabled) {
when(buffer.areAcknowledgementsEnabled()).thenReturn(ackEnabled);

assertThat(createObjectUnderTest().areAcknowledgementsEnabled(), equalTo(ackEnabled));
}

@Nested
class NoCircuitBreakerChecks {
@AfterEach
Expand Down Expand Up @@ -202,4 +210,4 @@ void writeBytes_should_check_CircuitBreaker_and_throw_if_open() {
verify(circuitBreaker).isOpen();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,57 @@ void parseConfiguration_with_multiple_valid_pipelines_creates_the_correct_pipeli
verify(dataPrepperConfiguration).getPipelineExtensions();
}

@Test
void parseConfiguration_with_multiple_valid_pipelines_creates_the_correct_pipelineMap_with_acks() {
mockDataPrepperConfigurationAccesses();
final PipelineTransformer pipelineTransformer =
createObjectUnderTest(TestDataProvider.VALID_OFF_HEAP_FILE_WITH_ACKS);
final Map<String, Pipeline> actualPipelineMap = pipelineTransformer.transformConfiguration();
assertThat(actualPipelineMap.keySet(), equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES));
verifyDataPrepperConfigurationAccesses(actualPipelineMap.keySet().size());
verify(dataPrepperConfiguration).getPipelineExtensions();

assertThat(actualPipelineMap, hasKey("test-pipeline-1"));
assertThat(actualPipelineMap, hasKey("test-pipeline-2"));
assertThat(actualPipelineMap, hasKey("test-pipeline-3"));
Pipeline pipeline = actualPipelineMap.get("test-pipeline-1");
assertThat(pipeline, notNullValue());
assertThat(pipeline.getBuffer(), CoreMatchers.not(instanceOf(CircuitBreakingBuffer.class)));
assertThat(pipeline.getBuffer().areAcknowledgementsEnabled(),equalTo(true));
pipeline = actualPipelineMap.get("test-pipeline-2");
assertThat(pipeline, notNullValue());
assertThat(pipeline.getSource().areAcknowledgementsEnabled(),equalTo(true));
pipeline = actualPipelineMap.get("test-pipeline-3");
assertThat(pipeline, notNullValue());
assertThat(pipeline.getSource().areAcknowledgementsEnabled(),equalTo(true));
}

@Test
void parseConfiguration_with_multiple_disconnected_valid_pipelines_creates_the_correct_pipelineMap_with_acks() {
mockDataPrepperConfigurationAccesses();
final PipelineTransformer pipelineTransformer =
createObjectUnderTest(TestDataProvider.DISCONNECTED_VALID_OFF_HEAP_FILE_WITH_ACKS);
final Map<String, Pipeline> actualPipelineMap = pipelineTransformer.transformConfiguration();
assertThat(actualPipelineMap.keySet(), equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES));
verifyDataPrepperConfigurationAccesses(actualPipelineMap.keySet().size());
verify(dataPrepperConfiguration).getPipelineExtensions();

assertThat(actualPipelineMap, hasKey("test-pipeline-1"));
assertThat(actualPipelineMap, hasKey("test-pipeline-2"));
assertThat(actualPipelineMap, hasKey("test-pipeline-3"));
Pipeline pipeline = actualPipelineMap.get("test-pipeline-1");
assertThat(pipeline, notNullValue());
assertThat(pipeline.getBuffer(), CoreMatchers.not(instanceOf(CircuitBreakingBuffer.class)));
assertThat(pipeline.getBuffer().areAcknowledgementsEnabled(),equalTo(true));
pipeline = actualPipelineMap.get("test-pipeline-2");
assertThat(pipeline, notNullValue());
assertThat(pipeline.getSource().areAcknowledgementsEnabled(),equalTo(true));
pipeline = actualPipelineMap.get("test-pipeline-3");
assertThat(pipeline, notNullValue());
assertThat(pipeline.getSource().areAcknowledgementsEnabled(),equalTo(false));
}


@Test
void parseConfiguration_with_invalid_root_pipeline_creates_empty_pipelinesMap() {
final PipelineTransformer pipelineTransformer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public boolean isWrittenOffHeapOnly() {
return true;
}

@Override
public boolean areAcknowledgementsEnabled() {
return true;
}

@Override
public boolean isEmpty() {
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# this configuration file is solely for testing formatting
test-pipeline-1:
source:
file:
path: "/tmp/file-source.tmp"
buffer:
test_off_heap:
sink:
- pipeline:
name: "test-pipeline-2"
test-pipeline-2:
source:
pipeline:
name: "test-pipeline-1"
sink:
- file:
path: "/tmp/todelete2.txt"
test-pipeline-3:
source:
file:
path: "/tmp/file-source2.tmp"
sink:
- file:
path: "/tmp/todelete.txt"

Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# this configuration file is solely for testing formatting
test-pipeline-1:
source:
file:
path: "/tmp/file-source.tmp"
buffer:
test_off_heap:
sink:
- pipeline:
name: "test-pipeline-2"
test-pipeline-2:
source:
pipeline:
name: "test-pipeline-1"
sink:
- pipeline:
name: "test-pipeline-3"
test-pipeline-3:
source:
pipeline:
name: "test-pipeline-2"
sink:
- file:
path: "/tmp/todelete.txt"
4 changes: 2 additions & 2 deletions data-prepper-plugins/aggregate-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ While not necessary, a great way to set up the Aggregate Processor [identificati
### <a name="when"></a>
* `when` (Optional): A `String` that represents a condition that must be evaluated to true for the aggregation to be applied on the event. Events that do not evaluate to true on the condition are skipped. Default is no condition which means all events are included in the aggregation.

### <a name="local_only"></a>
* `local_only` (Optional): A `Boolean` indicating if the aggregation should be done local to node instead of forwarding to remote peers.
### <a name="local_mode"></a>
* `local_mode` (Optional): A `Boolean` indicating if the aggregation should be done local to node instead of forwarding to remote peers.

## Available Aggregate Actions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ public boolean isByteBuffer() {
return true;
}

@Override
public boolean areAcknowledgementsEnabled() {
return true;
}

@Override
public void doWriteAll(Collection<Record<Event>> records, int timeoutInMillis) throws Exception {
for (Record<Event> record : records) {
Expand Down

0 comments on commit 2a02080

Please sign in to comment.