From 26db9dbb55ecbe86397276cd98e495275d4e8e32 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Tue, 24 Oct 2023 22:59:50 +0000 Subject: [PATCH 1/8] Modify EventHandle to be created for every event and support internal and external origination times Signed-off-by: Krishna Kondaka --- .../model/event/DefaultEventHandle.java | 59 ++++++++++++++++++ .../dataprepper/model/event/EventHandle.java | 45 ++++++++++++++ .../dataprepper/model/event/JacksonEvent.java | 7 ++- .../model/event/DefaultEventHandleTests.java | 60 +++++++++++++++++++ .../model/event/JacksonEventTest.java | 27 +++------ .../AcknowledgementSetMonitor.java | 3 +- .../DefaultAcknowledgementSet.java | 5 +- .../dataprepper/event/DefaultEventHandle.java | 29 --------- .../dataprepper/pipeline/ProcessWorker.java | 2 +- .../router/RouterCopyRecordStrategy.java | 6 +- .../AcknowledgementSetMonitorTests.java | 2 +- ...DefaultAcknowledgementSetManagerTests.java | 20 ++----- .../DefaultAcknowledgementSetTests.java | 38 +++++++++--- .../event/DefaultEventHandleTests.java | 32 ---------- .../dataprepper/pipeline/PipelineTests.java | 4 ++ .../router/RouterCopyRecordStrategyTests.java | 9 ++- .../client/CloudWatchLogsService.java | 2 +- .../sink/http/service/HttpSinkService.java | 6 +- .../kafka/producer/KafkaCustomProducer.java | 2 +- .../producer/KafkaCustomProducerTest.java | 2 +- .../plugins/sink/s3/S3SinkService.java | 2 +- .../plugins/sink/sns/SnsSinkService.java | 2 +- 22 files changed, 236 insertions(+), 128 deletions(-) create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java create mode 100644 data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java delete mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/event/DefaultEventHandle.java delete mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/event/DefaultEventHandleTests.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java new file mode 100644 index 0000000000..0082c75ea8 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.event; + +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import java.lang.ref.WeakReference; + +import java.time.Instant; +import java.io.Serializable; + +public class DefaultEventHandle implements EventHandle, Serializable { + private Instant externalOriginationTime; + private final Instant internalOriginationTime; + private WeakReference acknowledgementSetRef; + + public DefaultEventHandle(final Instant internalOriginationTime) { + this.acknowledgementSetRef = null; + this.externalOriginationTime = null; + this.internalOriginationTime = internalOriginationTime; + } + + @Override + public void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet) { + this.acknowledgementSetRef = new WeakReference<>(acknowledgementSet); + } + + @Override + public void setExternalOriginationTime(final Instant externalOriginationTime) { + this.externalOriginationTime = externalOriginationTime; + } + + public AcknowledgementSet getAcknowledgementSet() { + if (acknowledgementSetRef == null) { + return null; + } + return acknowledgementSetRef.get(); + } + + @Override + public Instant getInternalOriginationTime() { + return this.internalOriginationTime; + } + + @Override + public Instant getExternalOriginationTime() { + return this.externalOriginationTime; + } + + @Override + public void release(boolean result) { + AcknowledgementSet acknowledgementSet = getAcknowledgementSet(); + if (acknowledgementSet != null) { + acknowledgementSet.release(this, result); + } + } +} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java index 26f2d29e5f..92416dee67 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java @@ -5,6 +5,9 @@ package org.opensearch.dataprepper.model.event; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import java.time.Instant; + public interface EventHandle { /** * releases event handle @@ -14,4 +17,46 @@ public interface EventHandle { * @since 2.2 */ void release(boolean result); + + /** + * sets acknowledgement set + * + * @param acknowledgementSet acknowledgementSet to be set in the event handle + * @since 2.6 + */ + void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet); + + /** + * gets acknowledgement set + * + * @return returns acknowledgementSet from the event handle + * @since 2.6 + */ + AcknowledgementSet getAcknowledgementSet(); + + /** + * sets external origination time + * + * @param externalOriginationTime externalOriginationTime to be set in the event handle + * @since 2.6 + */ + void setExternalOriginationTime(final Instant externalOriginationTime); + + /** + * gets external origination time + * + * @return returns externalOriginationTime from the event handle. This can be null if it is never set. + * @since 2.6 + */ + Instant getExternalOriginationTime(); + + /** + * gets internal origination time + * + * @return returns internalOriginationTime from the event handle. + * @since 2.6 + */ + Instant getInternalOriginationTime(); + + } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 686ee1d59f..6470904be6 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -91,18 +91,21 @@ protected JacksonEvent(final Builder builder) { } this.jsonNode = getInitialJsonNode(builder.data); + this.eventHandle = new DefaultEventHandle(eventMetadata.getTimeReceived()); } protected JacksonEvent(final JacksonEvent otherEvent) { this.jsonNode = otherEvent.jsonNode.deepCopy(); this.eventMetadata = DefaultEventMetadata.fromEventMetadata(otherEvent.eventMetadata); + this.eventHandle = new DefaultEventHandle(eventMetadata.getTimeReceived()); } public static Event fromMessage(String message) { - return JacksonEvent.builder() + JacksonEvent event = JacksonEvent.builder() .withEventType(EVENT_TYPE) .withData(Collections.singletonMap(MESSAGE_KEY, message)) .build(); + return event; } private JsonNode getInitialJsonNode(final Object data) { @@ -152,7 +155,7 @@ public void put(final String key, final Object value) { } } - public void setEventHandle(EventHandle handle) { + private void setEventHandle(EventHandle handle) { this.eventHandle = handle; } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java new file mode 100644 index 0000000000..6b1130adc3 --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.event; + +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import org.junit.jupiter.api.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.ArgumentMatchers.any; +import org.mockito.Mock; + +import java.time.Instant; + +class DefaultEventHandleTests { + @Mock + private AcknowledgementSet acknowledgementSet; + + @Test + void testBasic() { + Instant now = Instant.now(); + DefaultEventHandle eventHandle = new DefaultEventHandle(now); + assertThat(eventHandle.getAcknowledgementSet(), equalTo(null)); + assertThat(eventHandle.getInternalOriginationTime(), equalTo(now)); + assertThat(eventHandle.getExternalOriginationTime(), equalTo(null)); + eventHandle.release(true); + } + + @Test + void testWithAcknowledgementSet() { + acknowledgementSet = mock(AcknowledgementSet.class); + when(acknowledgementSet.release(any(EventHandle.class), any(Boolean.class))).thenReturn(true); + Instant now = Instant.now(); + DefaultEventHandle eventHandle = new DefaultEventHandle(now); + assertThat(eventHandle.getAcknowledgementSet(), equalTo(null)); + assertThat(eventHandle.getInternalOriginationTime(), equalTo(now)); + assertThat(eventHandle.getExternalOriginationTime(), equalTo(null)); + eventHandle.setAcknowledgementSet(acknowledgementSet); + eventHandle.release(true); + verify(acknowledgementSet).release(eventHandle, true); + } + + @Test + void testWithExternalOriginationTime() { + Instant now = Instant.now(); + DefaultEventHandle eventHandle = new DefaultEventHandle(now); + assertThat(eventHandle.getAcknowledgementSet(), equalTo(null)); + assertThat(eventHandle.getInternalOriginationTime(), equalTo(now)); + assertThat(eventHandle.getExternalOriginationTime(), equalTo(null)); + eventHandle.setExternalOriginationTime(now.minusSeconds(60)); + assertThat(eventHandle.getExternalOriginationTime(), equalTo(now.minusSeconds(60))); + eventHandle.release(true); + } +} diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java index 62ce3dc48d..deb68f83bd 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java @@ -42,12 +42,6 @@ public class JacksonEventTest { - class TestEventHandle implements EventHandle { - @Override - public void release(boolean result) { - } - } - private Event event; private String eventType; @@ -398,6 +392,8 @@ public void testBuild_withEventType() { .build(); assertThat(event.getMetadata().getEventType(), is(equalTo(eventType))); + assertThat(event.getEventHandle(), is(notNullValue())); + assertThat(event.getEventHandle().getInternalOriginationTime(), is(notNullValue())); } @Test @@ -411,6 +407,8 @@ public void testBuild_withTimeReceived() { .build(); assertThat(event.getMetadata().getTimeReceived(), is(equalTo(now))); + assertThat(event.getEventHandle(), is(notNullValue())); + assertThat(event.getEventHandle().getInternalOriginationTime(), is(equalTo(now))); } @Test @@ -422,6 +420,8 @@ public void testBuild_withMessageValue() { assertThat(event, is(notNullValue())); assertThat(event.get("message", String.class), is(equalTo(message))); + assertThat(event.getEventHandle(), is(notNullValue())); + assertThat(event.getEventHandle().getInternalOriginationTime(), is(notNullValue())); } @Test @@ -678,6 +678,8 @@ void fromEvent_with_a_JacksonEvent() { assertThat(createdEvent, notNullValue()); assertThat(createdEvent, not(sameInstance(originalEvent))); + assertThat(event.getEventHandle(), is(notNullValue())); + assertThat(event.getEventHandle().getInternalOriginationTime(), is(notNullValue())); assertThat(createdEvent.toMap(), equalTo(dataObject)); assertThat(createdEvent.getJsonNode(), not(sameInstance(originalEvent.getJsonNode()))); @@ -707,19 +709,6 @@ void fromEvent_with_a_non_JacksonEvent() { assertThat(createdEvent.getMetadata(), equalTo(eventMetadata)); } - @Test - void testEventHandleGetAndSet() { - EventHandle testEventHandle = new TestEventHandle(); - final String jsonString = "{\"foo\": \"bar\"}"; - - final JacksonEvent event = JacksonEvent.builder() - .withEventType(eventType) - .withData(jsonString) - .build(); - event.setEventHandle(testEventHandle); - assertThat(event.getEventHandle(), equalTo(testEventHandle)); - } - @Test void testJsonStringBuilder() { final String jsonString = "{\"foo\":\"bar\"}"; diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java index 1057418876..bf9f021aa7 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java @@ -6,7 +6,6 @@ package org.opensearch.dataprepper.acknowledgements; import org.opensearch.dataprepper.model.event.EventHandle; -import org.opensearch.dataprepper.event.DefaultEventHandle; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import java.util.concurrent.locks.ReentrantLock; @@ -33,7 +32,7 @@ class AcknowledgementSetMonitor implements Runnable { private final AtomicInteger numNullHandles; private DefaultAcknowledgementSet getAcknowledgementSet(final EventHandle eventHandle) { - return (DefaultAcknowledgementSet)((DefaultEventHandle)eventHandle).getAcknowledgementSet(); + return (DefaultAcknowledgementSet)eventHandle.getAcknowledgementSet(); } public AcknowledgementSetMonitor() { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java index 3c8fe12159..3c145fe99c 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java @@ -5,7 +5,6 @@ package org.opensearch.dataprepper.acknowledgements; -import org.opensearch.dataprepper.event.DefaultEventHandle; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; @@ -53,8 +52,8 @@ public void add(Event event) { lock.lock(); try { if (event instanceof JacksonEvent) { - EventHandle eventHandle = new DefaultEventHandle(this); - ((JacksonEvent) event).setEventHandle(eventHandle); + EventHandle eventHandle = event.getEventHandle(); + eventHandle.setAcknowledgementSet(this); pendingAcknowledgments.put(eventHandle, new AtomicInteger(1)); } } finally { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/event/DefaultEventHandle.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/event/DefaultEventHandle.java deleted file mode 100644 index 025b130bf6..0000000000 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/event/DefaultEventHandle.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.event; - -import org.opensearch.dataprepper.model.event.EventHandle; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; -import java.lang.ref.WeakReference; - -public class DefaultEventHandle implements EventHandle { - private final WeakReference acknowledgementSetRef; - public DefaultEventHandle(AcknowledgementSet acknowledgementSet) { - this.acknowledgementSetRef = new WeakReference<>(acknowledgementSet); - } - - public AcknowledgementSet getAcknowledgementSet() { - return acknowledgementSetRef.get(); - } - - @Override - public void release(boolean result) { - AcknowledgementSet acknowledgementSet = getAcknowledgementSet(); - if (acknowledgementSet != null) { - acknowledgementSet.release(this, result); - } - } -} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java index 56d81ba68b..9c0ec69a8b 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java @@ -97,7 +97,7 @@ private void processAcknowledgements(List inputEvents, Collection outputR // For each event in the input events list that is not present in the output events, send positive acknowledgement, if acknowledgements are enabled for it inputEvents.forEach(event -> { EventHandle eventHandle = event.getEventHandle(); - if (Objects.nonNull(eventHandle) && !outputEventsSet.contains(event)) { + if (Objects.nonNull(eventHandle) && eventHandle.getAcknowledgementSet() != null && !outputEventsSet.contains(event)) { eventHandle.release(true); } else if (acknowledgementsEnabled && Objects.isNull(eventHandle)) { invalidEventHandlesCounter.increment(); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategy.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategy.java index df7316b981..9c090cbf58 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategy.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategy.java @@ -18,7 +18,7 @@ import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.event.EventMetadata; -import org.opensearch.dataprepper.event.DefaultEventHandle; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.acknowledgements.InactiveAcknowledgementSetManager; @@ -65,7 +65,7 @@ private void acquireEventReference(final Record record) { } if (referencedRecords.contains(record) || ((routedRecords != null) && routedRecords.contains(record))) { EventHandle eventHandle = ((JacksonEvent)record.getData()).getEventHandle(); - if (eventHandle != null) { + if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) { acknowledgementSetManager.acquireEventReference(eventHandle); } } else if (!referencedRecords.contains(record)) { @@ -97,7 +97,7 @@ public Record getRecord(final Record record) { JacksonEvent newRecordEvent; Record newRecord; DefaultEventHandle eventHandle = (DefaultEventHandle)recordEvent.getEventHandle(); - if (eventHandle != null) { + if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) { final EventMetadata eventMetadata = recordEvent.getMetadata(); final EventBuilder eventBuilder = (EventBuilder) eventFactory.eventBuilder(EventBuilder.class).withEventMetadata(eventMetadata).withData(recordEvent.toMap()); newRecordEvent = (JacksonEvent) eventBuilder.build(); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitorTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitorTests.java index 8c9d065704..6c85b5c4de 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitorTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitorTests.java @@ -5,7 +5,7 @@ package org.opensearch.dataprepper.acknowledgements; -import org.opensearch.dataprepper.event.DefaultEventHandle; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java index 486617e9a0..012e823ec5 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java @@ -18,8 +18,6 @@ import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.ArgumentMatchers.any; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -49,23 +47,19 @@ class DefaultAcknowledgementSetManagerTests { void setup() { callbackExecutor = Executors.newFixedThreadPool(2); event1 = mock(JacksonEvent.class); - doAnswer((i) -> { - eventHandle1 = i.getArgument(0); - return null; - }).when(event1).setEventHandle(any()); + eventHandle1 = mock(EventHandle.class); lenient().when(event1.getEventHandle()).thenReturn(eventHandle1); event2 = mock(JacksonEvent.class); - doAnswer((i) -> { - eventHandle2 = i.getArgument(0); - return null; - }).when(event2).setEventHandle(any()); + eventHandle2 = mock(EventHandle.class); lenient().when(event2.getEventHandle()).thenReturn(eventHandle2); acknowledgementSetManager = createObjectUnderTest(); AcknowledgementSet acknowledgementSet1 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT); acknowledgementSet1.add(event1); acknowledgementSet1.add(event2); + lenient().when(eventHandle1.getAcknowledgementSet()).thenReturn(acknowledgementSet1); + lenient().when(eventHandle2.getAcknowledgementSet()).thenReturn(acknowledgementSet1); acknowledgementSet1.complete(); } @@ -97,14 +91,12 @@ void testExpirations() throws InterruptedException { @Test void testMultipleAcknowledgementSets() { event3 = mock(JacksonEvent.class); - doAnswer((i) -> { - eventHandle3 = i.getArgument(0); - return null; - }).when(event3).setEventHandle(any()); + eventHandle3 = mock(EventHandle.class); lenient().when(event3.getEventHandle()).thenReturn(eventHandle3); AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT); acknowledgementSet2.add(event3); + lenient().when(eventHandle3.getAcknowledgementSet()).thenReturn(acknowledgementSet2); acknowledgementSet2.complete(); acknowledgementSetManager.releaseEventReference(eventHandle2, true); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java index c4403e4b2f..8a4aa1485a 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetTests.java @@ -11,8 +11,9 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.event.DefaultEventHandle; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import java.time.Duration; import java.util.concurrent.ExecutorService; @@ -24,7 +25,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; @@ -34,7 +34,12 @@ class DefaultAcknowledgementSetTests { private DefaultAcknowledgementSet defaultAcknowledgementSet; @Mock private JacksonEvent event; + @Mock + private JacksonEvent event2; + @Mock private DefaultEventHandle handle; + @Mock + private DefaultEventHandle handle2; private ExecutorService executor; private Boolean acknowledgementSetResult; @@ -78,12 +83,21 @@ void setupEvent() { callbackInterrupted = new AtomicBoolean(false); event = mock(JacksonEvent.class); - - doAnswer((i) -> { - handle = i.getArgument(0); + handle = mock(DefaultEventHandle.class); + lenient().doAnswer(a -> { + AcknowledgementSet acknowledgementSet = a.getArgument(0); + lenient().when(handle.getAcknowledgementSet()).thenReturn(acknowledgementSet); return null; - }).when(event).setEventHandle(any()); + }).when(handle).setAcknowledgementSet(any(AcknowledgementSet.class)); lenient().when(event.getEventHandle()).thenReturn(handle); + event2 = mock(JacksonEvent.class); + lenient().doAnswer(a -> { + AcknowledgementSet acknowledgementSet = a.getArgument(0); + lenient().when(handle2.getAcknowledgementSet()).thenReturn(acknowledgementSet); + return null; + }).when(handle2).setAcknowledgementSet(any(AcknowledgementSet.class)); + handle2 = mock(DefaultEventHandle.class); + lenient().when(event2.getEventHandle()).thenReturn(handle2); } @Test @@ -115,7 +129,6 @@ void testDefaultAcknowledgementInvalidAcquire() { defaultAcknowledgementSet.add(event); defaultAcknowledgementSet.complete(); DefaultAcknowledgementSet secondAcknowledgementSet = createObjectUnderTest(); - DefaultEventHandle handle2 = new DefaultEventHandle(secondAcknowledgementSet); defaultAcknowledgementSet.acquire(handle2); assertThat(invalidAcquiresCounter, equalTo(1)); } @@ -125,7 +138,6 @@ void testDefaultAcknowledgementInvalidRelease() { defaultAcknowledgementSet.add(event); defaultAcknowledgementSet.complete(); DefaultAcknowledgementSet secondAcknowledgementSet = createObjectUnderTest(); - DefaultEventHandle handle2 = new DefaultEventHandle(secondAcknowledgementSet); assertThat(defaultAcknowledgementSet.release(handle2, true), equalTo(false)); assertThat(invalidReleasesCounter, equalTo(1)); } @@ -170,6 +182,11 @@ void testDefaultAcknowledgementSetNegativeAcknowledgements() throws Exception { defaultAcknowledgementSet.add(event); defaultAcknowledgementSet.complete(); assertThat(handle, not(equalTo(null))); + lenient().doAnswer(a -> { + AcknowledgementSet acknowledgementSet = a.getArgument(0); + lenient().when(handle.getAcknowledgementSet()).thenReturn(acknowledgementSet); + return null; + }).when(handle).setAcknowledgementSet(any(AcknowledgementSet.class)); assertThat(handle.getAcknowledgementSet(), equalTo(defaultAcknowledgementSet)); defaultAcknowledgementSet.acquire(handle); assertThat(defaultAcknowledgementSet.release(handle, true), equalTo(false)); @@ -198,6 +215,11 @@ void testDefaultAcknowledgementSetExpirations() throws Exception { ); defaultAcknowledgementSet.add(event); defaultAcknowledgementSet.complete(); + lenient().doAnswer(a -> { + AcknowledgementSet acknowledgementSet = a.getArgument(0); + lenient().when(handle.getAcknowledgementSet()).thenReturn(acknowledgementSet); + return null; + }).when(handle).setAcknowledgementSet(any(AcknowledgementSet.class)); assertThat(handle, not(equalTo(null))); assertThat(handle.getAcknowledgementSet(), equalTo(defaultAcknowledgementSet)); assertThat(defaultAcknowledgementSet.release(handle, true), equalTo(true)); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/event/DefaultEventHandleTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/event/DefaultEventHandleTests.java deleted file mode 100644 index 2b0895ad6c..0000000000 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/event/DefaultEventHandleTests.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.event; - -import org.opensearch.dataprepper.model.event.EventHandle; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import org.junit.jupiter.api.Test; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.verify; -import static org.mockito.ArgumentMatchers.any; -import org.mockito.Mock; - -class DefaultEventHandleTests { - @Mock - private AcknowledgementSet acknowledgementSet; - - @Test - void testBasic() { - acknowledgementSet = mock(AcknowledgementSet.class); - when(acknowledgementSet.release(any(EventHandle.class), any(Boolean.class))).thenReturn(true); - DefaultEventHandle eventHandle = new DefaultEventHandle(acknowledgementSet); - assertThat(eventHandle.getAcknowledgementSet(), equalTo(acknowledgementSet)); - eventHandle.release(true); - verify(acknowledgementSet).release(eventHandle, true); - } -} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java index 2fc86b80eb..0fef88268e 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java @@ -10,6 +10,7 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.EventFactory; @@ -391,6 +392,7 @@ class PublishToSink { private List records; private List> dataFlowComponents; private Source mockSource; + private AcknowledgementSet acknowledgementSet; @BeforeEach void setUp() { @@ -427,7 +429,9 @@ void publishToSinks_calls_route_with_Events_and_Sinks_verify_AcknowledgementSetM Record rec = records.get(0); event = mock(JacksonEvent.class); eventHandle = mock(EventHandle.class); + acknowledgementSet = mock(AcknowledgementSet.class); when(event.getEventHandle()).thenReturn(eventHandle); + when(eventHandle.getAcknowledgementSet()).thenReturn(acknowledgementSet); when(rec.getData()).thenReturn(event); routerCopyRecordStrategy.getRecord(rec); routerCopyRecordStrategy.getRecord(rec); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategyTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategyTests.java index a9c030793b..6d7e3bb545 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategyTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategyTests.java @@ -45,7 +45,6 @@ import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.event.EventMetadata; -import org.opensearch.dataprepper.event.DefaultEventHandle; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; @@ -103,8 +102,8 @@ private void attachEventHandlesToRecordsIn(List eventHandles) { Iterator iter = recordsIn.iterator(); while (iter.hasNext()) { Record r = (Record) iter.next(); - DefaultEventHandle handle = new DefaultEventHandle(acknowledgementSet1); - ((JacksonEvent)r.getData()).setEventHandle(handle); + EventHandle handle = ((JacksonEvent)r.getData()).getEventHandle(); + handle.setAcknowledgementSet(acknowledgementSet1); eventHandles.add(handle); } } @@ -243,7 +242,7 @@ void test_one_record_with_acknowledgements_and_multi_components() { try { doAnswer((i) -> { JacksonEvent e1 = (JacksonEvent) i.getArgument(0); - e1.setEventHandle(new DefaultEventHandle(acknowledgementSet1)); + e1.getEventHandle().setAcknowledgementSet(acknowledgementSet1); return null; }).when(acknowledgementSet1).add(any(JacksonEvent.class)); } catch (Exception e){} @@ -281,7 +280,7 @@ void test_multiple_records_with_acknowledgements_and_multi_components() { try { doAnswer((i) -> { JacksonEvent e1 = (JacksonEvent) i.getArgument(0); - e1.setEventHandle(new DefaultEventHandle(acknowledgementSet1)); + e1.getEventHandle().setAcknowledgementSet(acknowledgementSet1); return null; }).when(acknowledgementSet1).add(any(JacksonEvent.class)); } catch (Exception e){} diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java index a27d9ea4cb..3b42ee4973 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java @@ -104,7 +104,7 @@ private void stageLogEvents() { } private void addToBuffer(final Record log, final String logString) { - if (log.getData().getEventHandle() != null) { + if (log.getData().getEventHandle().getAcknowledgementSet() != null) { bufferedEventHandles.add(log.getData().getEventHandle()); } buffer.writeEvent(logString.getBytes(StandardCharsets.UTF_8)); diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/service/HttpSinkService.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/service/HttpSinkService.java index 480886d52b..c392bf4ae0 100644 --- a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/service/HttpSinkService.java +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/service/HttpSinkService.java @@ -179,9 +179,7 @@ public void output(Collection> records) { int count = currentBuffer.getEventCount() +1; currentBuffer.setEventCount(count); - if(event.getEventHandle() != null) { - bufferedEventHandles.add(event.getEventHandle()); - } + bufferedEventHandles.add(event.getEventHandle()); if (ThresholdValidator.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) { codec.complete(outputStream); final HttpEndPointResponse failedHttpEndPointResponses = pushToEndPoint(getCurrentBufferData(currentBuffer)); @@ -379,4 +377,4 @@ private void accessTokenIfExpired(final Integer tokenExpired,final String url){ } } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java index ea0d6f6d59..782313f3fe 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java @@ -96,7 +96,7 @@ public void produceRawData(final byte[] bytes, final String key) { } public void produceRecords(final Record record) { - if (record.getData().getEventHandle() != null) { + if (record.getData().getEventHandle().getAcknowledgementSet() != null) { bufferedEventHandles.add(record.getData().getEventHandle()); } Event event = getEvent(record); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java index 5216537f9f..1c4347ed06 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java @@ -21,10 +21,10 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import org.opensearch.dataprepper.event.DefaultEventHandle; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java index 65de8225b8..8a5227cd6d 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java @@ -127,7 +127,7 @@ void output(Collection> records) { int count = currentBuffer.getEventCount() + 1; currentBuffer.setEventCount(count); - if (event.getEventHandle() != null) { + if (event.getEventHandle().getAcknowledgementSet() != null) { bufferedEventHandles.add(event.getEventHandle()); } } catch (Exception ex) { diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java index ed42d3ab4f..0ab2d68235 100644 --- a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java +++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java @@ -121,7 +121,7 @@ void output(Collection> records) { for (Record record : records) { final Event event = record.getData(); processRecordsList.add(event); - if (event.getEventHandle() != null) { + if (event.getEventHandle().getAcknowledgementSet() != null) { bufferedEventHandles.add(event.getEventHandle()); } if (snsSinkConfig.getBatchSize() == processRecordsList.size()) { From 3cbef880ea2ac1f7ddc17e152fba0bf419ce78fa Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 25 Oct 2023 16:35:38 +0000 Subject: [PATCH 2/8] Fixed build failures Signed-off-by: Krishna Kondaka --- .../org/opensearch/dataprepper/model/event/JacksonEvent.java | 4 ---- .../sink/cloudwatch_logs/client/CloudWatchLogsService.java | 4 +--- .../plugins/kafka/producer/KafkaCustomProducer.java | 4 +--- .../plugins/kafka/producer/KafkaCustomProducerTest.java | 2 -- .../dataprepper/plugins/sink/s3/S3SinkService.java | 5 +---- .../dataprepper/plugins/sink/sns/SnsSinkService.java | 4 +--- 6 files changed, 4 insertions(+), 19 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 6470904be6..3c6e4e7969 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -155,10 +155,6 @@ public void put(final String key, final Object value) { } } - private void setEventHandle(EventHandle handle) { - this.eventHandle = handle; - } - @Override public EventHandle getEventHandle() { return eventHandle; diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java index 3b42ee4973..fc9963ab46 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java @@ -104,9 +104,7 @@ private void stageLogEvents() { } private void addToBuffer(final Record log, final String logString) { - if (log.getData().getEventHandle().getAcknowledgementSet() != null) { - bufferedEventHandles.add(log.getData().getEventHandle()); - } + bufferedEventHandles.add(log.getData().getEventHandle()); buffer.writeEvent(logString.getBytes(StandardCharsets.UTF_8)); } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java index 782313f3fe..ddca58ae2e 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java @@ -96,9 +96,7 @@ public void produceRawData(final byte[] bytes, final String key) { } public void produceRecords(final Record record) { - if (record.getData().getEventHandle().getAcknowledgementSet() != null) { - bufferedEventHandles.add(record.getData().getEventHandle()); - } + bufferedEventHandles.add(record.getData().getEventHandle()); Event event = getEvent(record); final String key = event.formatString(kafkaProducerConfig.getPartitionKey(), expressionEvaluator); try { diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java index 1c4347ed06..1ab935eb91 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java @@ -67,8 +67,6 @@ public class KafkaCustomProducerTest { @BeforeEach public void setUp() { event = (JacksonEvent) JacksonEvent.fromMessage(UUID.randomUUID().toString()); - DefaultEventHandle defaultEventHandle = mock(DefaultEventHandle.class); - event.setEventHandle(defaultEventHandle); record = new Record<>(event); final TopicConfig topicConfig = new TopicConfig(); topicConfig.setName("test-topic"); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java index 8a5227cd6d..9028b8863e 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java @@ -127,9 +127,7 @@ void output(Collection> records) { int count = currentBuffer.getEventCount() + 1; currentBuffer.setEventCount(count); - if (event.getEventHandle().getAcknowledgementSet() != null) { - bufferedEventHandles.add(event.getEventHandle()); - } + bufferedEventHandles.add(event.getEventHandle()); } catch (Exception ex) { if(sampleException == null) { sampleException = ex; @@ -149,7 +147,6 @@ void output(Collection> records) { failedEvents .stream() .map(Event::getEventHandle) - .filter(Objects::nonNull) .forEach(eventHandle -> eventHandle.release(false)); LOG.error("Unable to add {} events to buffer. Dropping these events. Sample exception provided.", failedEvents.size(), sampleException); } diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java index 0ab2d68235..da29d71e17 100644 --- a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java +++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java @@ -121,9 +121,7 @@ void output(Collection> records) { for (Record record : records) { final Event event = record.getData(); processRecordsList.add(event); - if (event.getEventHandle().getAcknowledgementSet() != null) { - bufferedEventHandles.add(event.getEventHandle()); - } + bufferedEventHandles.add(event.getEventHandle()); if (snsSinkConfig.getBatchSize() == processRecordsList.size()) { processRecords(); processRecordsList.clear(); From 5e144bd59d75c30191441a70410d63160774dfbb Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 25 Oct 2023 17:14:49 +0000 Subject: [PATCH 3/8] Fixed build failures Signed-off-by: Krishna Kondaka --- .../dataprepper/model/event/DefaultEventHandleTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java index 6b1130adc3..3f1af4fb26 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java @@ -5,7 +5,6 @@ package org.opensearch.dataprepper.model.event; -import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; From 680bf65753fb0558a208fa79c141c73b3837cb81 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 25 Oct 2023 23:31:35 +0000 Subject: [PATCH 4/8] fixed failing checkstyle error Signed-off-by: Krishna Kondaka --- .../plugins/kafka/producer/KafkaCustomProducerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java index 1ab935eb91..6e91c3b8aa 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java @@ -24,7 +24,6 @@ import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; From eeedfb89e42ad664714ee946ea1db5c878073077 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Fri, 27 Oct 2023 00:01:57 +0000 Subject: [PATCH 5/8] Fixed build errors Signed-off-by: Krishna Kondaka --- .../model/event/DefaultEventHandle.java | 1 + .../client/CloudWatchLogsServiceTest.java | 9 -- .../plugins/processor/GeoIPProcessorTest.java | 4 +- .../plugins/sink/http/HttpSinkServiceIT.java | 2 - .../sink/opensearch/OpenSearchSinkIT.java | 14 +-- .../dataprepper/plugins/sink/s3/S3SinkIT.java | 2 - .../plugins/sink/s3/S3SinkServiceIT.java | 5 +- .../plugins/sink/s3/S3SinkService.java | 1 - .../plugins/sink/s3/S3SinkServiceTest.java | 104 ++++++++++++------ .../plugins/sink/sns/SnsSinkServiceIT.java | 4 +- 10 files changed, 74 insertions(+), 72 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java index 0082c75ea8..f6fd46b0c1 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java @@ -51,6 +51,7 @@ public Instant getExternalOriginationTime() { @Override public void release(boolean result) { + System.out.println("======release called==="+result); AcknowledgementSet acknowledgementSet = getAcknowledgementSet(); if (acknowledgementSet != null) { acknowledgementSet.release(this, result); diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java index e8e416d53e..8e7fcebc4e 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java @@ -8,7 +8,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.Buffer; @@ -62,8 +61,6 @@ Collection> getSampleRecordsCollectionSmall() { final ArrayList> returnCollection = new ArrayList<>(); for (int i = 0; i < 5; i++) { JacksonEvent mockJacksonEvent = (JacksonEvent) JacksonEvent.fromMessage("testMessage"); - final EventHandle mockEventHandle = mock(EventHandle.class); - mockJacksonEvent.setEventHandle(mockEventHandle); returnCollection.add(new Record<>(mockJacksonEvent)); } @@ -74,8 +71,6 @@ Collection> getSampleRecordsCollection() { final ArrayList> returnCollection = new ArrayList<>(); for (int i = 0; i < thresholdConfig.getBatchSize(); i++) { JacksonEvent mockJacksonEvent = (JacksonEvent) JacksonEvent.fromMessage("testMessage"); - final EventHandle mockEventHandle = mock(EventHandle.class); - mockJacksonEvent.setEventHandle(mockEventHandle); returnCollection.add(new Record<>(mockJacksonEvent)); } @@ -86,8 +81,6 @@ Collection> getSampleRecordsOfLargerSize() { final ArrayList> returnCollection = new ArrayList<>(); for (int i = 0; i < thresholdConfig.getBatchSize() * 2; i++) { JacksonEvent mockJacksonEvent = (JacksonEvent) JacksonEvent.fromMessage("a".repeat((int) (thresholdConfig.getMaxRequestSizeBytes()/24))); - final EventHandle mockEventHandle = mock(EventHandle.class); - mockJacksonEvent.setEventHandle(mockEventHandle); returnCollection.add(new Record<>(mockJacksonEvent)); } @@ -98,8 +91,6 @@ Collection> getSampleRecordsOfLimitSize() { final ArrayList> returnCollection = new ArrayList<>(); for (int i = 0; i < thresholdConfig.getBatchSize(); i++) { JacksonEvent mockJacksonEvent = (JacksonEvent) JacksonEvent.fromMessage("testMessage".repeat((int) thresholdConfig.getMaxEventSizeBytes())); - final EventHandle mockEventHandle = mock(EventHandle.class); - mockJacksonEvent.setEventHandle(mockEventHandle); returnCollection.add(new Record<>(mockJacksonEvent)); } diff --git a/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessorTest.java b/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessorTest.java index 64e7ad23a4..77243d7c12 100644 --- a/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessorTest.java +++ b/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessorTest.java @@ -13,7 +13,6 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; @@ -185,7 +184,6 @@ private Collection> setEventQueue() { private static Record createRecord() { String json = "{\"peer\": {\"ip\": \"136.226.242.205\", \"host\": \"example.org\" }, \"status\": \"success\"}"; final JacksonEvent event = JacksonLog.builder().withData(json).build(); - event.setEventHandle(mock(EventHandle.class)); return new Record<>(event); } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/http-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/http/HttpSinkServiceIT.java b/data-prepper-plugins/http-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/http/HttpSinkServiceIT.java index 47a9a18f85..2095edcb48 100644 --- a/data-prepper-plugins/http-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/http/HttpSinkServiceIT.java +++ b/data-prepper-plugins/http-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/http/HttpSinkServiceIT.java @@ -20,7 +20,6 @@ import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.plugin.PluginFactory; @@ -139,7 +138,6 @@ private Collection> setEventQueue(final int records) { private static Record createRecord() { final JacksonEvent event = JacksonLog.builder().withData("{\"name\":\""+ UUID.randomUUID() +"\"}").build(); - event.setEventHandle(mock(EventHandle.class)); return new Record<>(event); } diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index dff10c608c..dcdf4200ac 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -39,7 +39,6 @@ import org.opensearch.dataprepper.metrics.MetricsTestUtil; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.EventType; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; @@ -89,7 +88,6 @@ import static org.hamcrest.Matchers.closeTo; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createContentParser; @@ -115,7 +113,6 @@ public class OpenSearchSinkIT { private static final String TRACE_INGESTION_TEST_DISABLED_REASON = "Trace ingestion is not supported for ES 6"; private RestClient client; - private EventHandle eventHandle; private SinkContext sinkContext; private String testTagsTargetKey; @@ -152,10 +149,11 @@ public void setup() { expressionEvaluator = mock(ExpressionEvaluator.class); when(expressionEvaluator.isValidExpressionStatement(any(String.class))).thenReturn(false); - eventHandle = mock(EventHandle.class); + /* lenient().doAnswer(a -> { return null; }).when(eventHandle).release(any(Boolean.class)); + */ } @BeforeEach @@ -890,7 +888,6 @@ public void testEventOutputWithTags() throws IOException, InterruptedException { .withData("{\"log\": \"foobar\"}") .withEventType("event") .build(); - ((JacksonEvent)testEvent).setEventHandle(eventHandle); List tagsList = List.of("tag1", "tag2"); testEvent.getMetadata().addTags(tagsList); @@ -920,7 +917,6 @@ public void testEventOutput() throws IOException, InterruptedException { .withData("{\"log\": \"foobar\"}") .withEventType("event") .build(); - ((JacksonEvent) testEvent).setEventHandle(eventHandle); final List> testRecords = Collections.singletonList(new Record<>(testEvent)); @@ -948,7 +944,6 @@ public void testOpenSearchDocumentId(final String testDocumentIdField) throws IO .withData(Map.of("arbitrary_data", UUID.randomUUID().toString())) .withEventType("event") .build(); - ((JacksonEvent) testEvent).setEventHandle(eventHandle); testEvent.put(testDocumentIdField, expectedId); final List> testRecords = Collections.singletonList(new Record<>(testEvent)); @@ -974,7 +969,6 @@ public void testOpenSearchRoutingField(final String testRoutingField) throws IOE .withData(Map.of("arbitrary_data", UUID.randomUUID().toString())) .withEventType("event") .build(); - ((JacksonEvent) testEvent).setEventHandle(eventHandle); testEvent.put(testRoutingField, expectedRoutingField); final List> testRecords = Collections.singletonList(new Record<>(testEvent)); @@ -1003,7 +997,6 @@ public void testOpenSearchDynamicIndex(final String testIndex) throws IOExceptio .withData(dataMap) .withEventType("event") .build(); - ((JacksonEvent) testEvent).setEventHandle(eventHandle); testEvent.put(testIndex, testIndexName); Map expectedMap = testEvent.toMap(); @@ -1037,7 +1030,6 @@ public void testOpenSearchDynamicIndexWithDate(final String testIndex, final Str .withData(dataMap) .withEventType("event") .build(); - ((JacksonEvent) testEvent).setEventHandle(eventHandle); testEvent.put(testIndex, testIndexName); Map expectedMap = testEvent.toMap(); @@ -1067,7 +1059,6 @@ public void testOpenSearchIndexWithDate(final String testDatePattern) throws IOE .withData(dataMap) .withEventType("event") .build(); - ((JacksonEvent) testEvent).setEventHandle(eventHandle); Map expectedMap = testEvent.toMap(); @@ -1370,7 +1361,6 @@ private Record jsonStringToRecord(final String jsonString) { .withEventType(EventType.TRACE.toString()) .withData(objectMapper.readValue(jsonString, Map.class)).build()); JacksonEvent event = (JacksonEvent) record.getData(); - event.setEventHandle(eventHandle); return record; } catch (final JsonProcessingException e) { throw new RuntimeException(e); diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java index a83a5be9db..7032104bb3 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java @@ -23,7 +23,6 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.DefaultEventMetadata; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.event.EventType; import org.opensearch.dataprepper.model.event.JacksonEvent; @@ -262,7 +261,6 @@ private Event generateTestEvent(final Map eventData) { .withEventType(EventType.LOG.toString()) .build(); final JacksonEvent event = JacksonLog.builder().withData(eventData).withEventMetadata(defaultEventMetadata).build(); - event.setEventHandle(mock(EventHandle.class)); return JacksonEvent.builder() .withData(eventData) .withEventMetadata(defaultEventMetadata) diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java index 9d67a4bf5b..951225937d 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java @@ -38,7 +38,6 @@ import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.DefaultEventMetadata; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.event.EventType; import org.opensearch.dataprepper.model.event.JacksonEvent; @@ -92,7 +91,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -296,7 +294,6 @@ private static Record createRecord() { withTags(testTags).build(); Map json = generateJson(); final JacksonEvent event = JacksonLog.builder().withData(json).withEventMetadata(defaultEventMetadata).build(); - event.setEventHandle(mock(EventHandle.class)); return new Record<>(event); } @@ -413,4 +410,4 @@ private List> createParquetRecordsList(final InputStream private MessageType createdParquetSchema(ParquetMetadata parquetMetadata) { return parquetMetadata.getFileMetaData().getSchema(); } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java index 9028b8863e..879854b546 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java @@ -28,7 +28,6 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; -import java.util.Objects; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java index b154d30526..9952fde732 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java @@ -22,6 +22,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions; @@ -84,6 +85,7 @@ class S3SinkServiceTest { private DistributionSummary s3ObjectSizeSummary; private Random random; private String tagsTargetKey; + private AcknowledgementSet acknowledgementSet; @BeforeEach void setUp() { @@ -91,6 +93,7 @@ void setUp() { random = new Random(); tagsTargetKey = RandomStringUtils.randomAlphabetic(5); s3SinkConfig = mock(S3SinkConfig.class); + acknowledgementSet = mock(AcknowledgementSet.class); codecContext = new OutputCodecContext(tagsTargetKey, Collections.emptyList(), Collections.emptyList()); s3Client = mock(S3Client.class); ThresholdOptions thresholdOptions = mock(ThresholdOptions.class); @@ -377,13 +380,18 @@ void output_will_release_all_handles_since_a_flush() throws IOException { doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); + final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); + for (EventHandle eventHandle : eventHandles) { + eventHandle.setAcknowledgementSet(acknowledgementSet); + } s3SinkService.output(records); - final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); + //final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); for (EventHandle eventHandle : eventHandles) { - verify(eventHandle).release(true); + verify(acknowledgementSet).release(eventHandle, true); } + } @Test @@ -400,21 +408,29 @@ void output_will_skip_releasing_events_without_EventHandle_objects() throws IOEx doNothing().when(codec).writeEvent(event1, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); - records.stream() - .map(Record::getData) - .map(event -> (JacksonEvent) event) - .forEach(event -> event.setEventHandle(null)); + final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); + for (EventHandle eventHandle : eventHandles) { + eventHandle.setAcknowledgementSet(acknowledgementSet); + } s3SinkService.output(records); + for (EventHandle eventHandle : eventHandles) { + verify(acknowledgementSet).release(eventHandle, true); + } final Collection> records2 = generateRandomStringEventRecord(); - s3SinkService.output(records2); - final List eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); for (EventHandle eventHandle : eventHandles2) { - verify(eventHandle).release(true); + eventHandle.setAcknowledgementSet(acknowledgementSet); } + + s3SinkService.output(records2); + + for (EventHandle eventHandle : eventHandles2) { + verify(acknowledgementSet).release(eventHandle, true); + } + } @Test @@ -433,12 +449,17 @@ void output_will_release_all_handles_since_a_flush_when_S3_fails() throws IOExce doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final List> records = generateEventRecords(1); - s3SinkService.output(records); - final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); for (EventHandle eventHandle : eventHandles) { - verify(eventHandle).release(false); + System.out.println("==1====EventHandle=="+eventHandle+"==="+acknowledgementSet); + eventHandle.setAcknowledgementSet(acknowledgementSet); + } + s3SinkService.output(records); + + for (EventHandle eventHandle : eventHandles) { + System.out.println("==2====EventHandle=="+eventHandle+"==="+acknowledgementSet); + verify(acknowledgementSet).release(eventHandle, false); } } @@ -456,21 +477,24 @@ void output_will_release_only_new_handles_since_a_flush() throws IOException { doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); + final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); + for (EventHandle eventHandle : eventHandles) { + eventHandle.setAcknowledgementSet(acknowledgementSet); + } s3SinkService.output(records); - final Collection> records2 = generateRandomStringEventRecord(); - s3SinkService.output(records2); - - final List eventHandles1 = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); - - for (EventHandle eventHandle : eventHandles1) { - verify(eventHandle).release(true); + for (EventHandle eventHandle : eventHandles) { + verify(acknowledgementSet).release(eventHandle, true); } - + final Collection> records2 = generateRandomStringEventRecord(); final List eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); - for (EventHandle eventHandle : eventHandles2) { - verify(eventHandle).release(true); + eventHandle.setAcknowledgementSet(acknowledgementSet); + } + s3SinkService.output(records2); + for (EventHandle eventHandle : eventHandles2) { + verify(acknowledgementSet).release(eventHandle, true); } + } @Test @@ -489,6 +513,10 @@ void output_will_skip_and_drop_failed_records() throws IOException { List> records = generateEventRecords(2); Event event1 = records.get(0).getData(); Event event2 = records.get(1).getData(); + EventHandle eventHandle1 = event1.getEventHandle(); + EventHandle eventHandle2 = event2.getEventHandle(); + eventHandle1.setAcknowledgementSet(acknowledgementSet); + eventHandle2.setAcknowledgementSet(acknowledgementSet); doThrow(RuntimeException.class).when(codec).writeEvent(event1, outputStream); @@ -499,10 +527,10 @@ void output_will_skip_and_drop_failed_records() throws IOException { inOrder.verify(codec).writeEvent(event1, outputStream); inOrder.verify(codec).writeEvent(event2, outputStream); - verify(event1.getEventHandle()).release(false); - verify(event1.getEventHandle(), never()).release(true); - verify(event2.getEventHandle()).release(true); - verify(event2.getEventHandle(), never()).release(false); + verify(acknowledgementSet).release(eventHandle1, false); + verify(acknowledgementSet, never()).release(eventHandle1, true); + verify(acknowledgementSet).release(eventHandle2, true); + verify(acknowledgementSet, never()).release(eventHandle2, false); } @Test @@ -521,20 +549,26 @@ void output_will_release_only_new_handles_since_a_flush_when_S3_fails() throws I doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final List> records = generateEventRecords(1); - s3SinkService.output(records); - final List> records2 = generateEventRecords(1); - s3SinkService.output(records2); - final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); - for (EventHandle eventHandle : eventHandles) { - verify(eventHandle).release(false); + eventHandle.setAcknowledgementSet(acknowledgementSet); + } + s3SinkService.output(records); + for (EventHandle eventHandle : eventHandles) { + verify(acknowledgementSet).release(eventHandle, false); } + + final List> records2 = generateEventRecords(1); final List eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); for (EventHandle eventHandle : eventHandles2) { - verify(eventHandle).release(false); + eventHandle.setAcknowledgementSet(acknowledgementSet); + } + s3SinkService.output(records2); + for (EventHandle eventHandle : eventHandles2) { + verify(acknowledgementSet).release(eventHandle, false); } + } private Collection> generateRandomStringEventRecord() { @@ -549,8 +583,6 @@ private List> generateEventRecords(final int numberOfRecords) { List> records = new ArrayList<>(); for (int i = 0; i < numberOfRecords; i++) { final JacksonEvent event = (JacksonEvent) JacksonEvent.fromMessage(UUID.randomUUID().toString()); - final EventHandle eventHandle = mock(EventHandle.class); - event.setEventHandle(eventHandle); records.add(new Record<>(event)); } return records; @@ -563,4 +595,4 @@ private byte[] generateByteArray() { } return bytes; } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/sns-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceIT.java b/data-prepper-plugins/sns-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceIT.java index b3c235dab1..0ce6705ff0 100644 --- a/data-prepper-plugins/sns-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceIT.java +++ b/data-prepper-plugins/sns-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceIT.java @@ -17,7 +17,6 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.plugin.PluginFactory; @@ -129,7 +128,6 @@ private Collection> setEventQueue(final int records) { private static Record createRecord() { final JacksonEvent event = JacksonLog.builder().withData("[{\"name\":\"test\"}]").build(); - event.setEventHandle(mock(EventHandle.class)); return new Record<>(event); } @@ -226,4 +224,4 @@ public void sns_sink_service_test_fail_to_push(final int recordCount) throws IOE final Map map = mapper.readValue(new String(Files.readAllBytes(Path.of(dlqFilePath))).replaceAll("(\\r|\\n)", ""), Map.class); assertThat(map.get("topic"),equalTo(topic)); } -} \ No newline at end of file +} From a9c3ebd9c7a3b963cb5240b648b231c10a38cd50 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Tue, 31 Oct 2023 21:25:09 +0000 Subject: [PATCH 6/8] Addressed review comments by adding InternalEventHandle Signed-off-by: Krishna Kondaka --- .../model/event/DefaultEventHandle.java | 3 +- .../model/event/DefaultEventMetadata.java | 14 +++++++++ .../dataprepper/model/event/EventHandle.java | 16 ---------- .../model/event/EventMetadata.java | 14 +++++++++ .../model/event/InternalEventHandle.java | 29 ++++++++++++++++++ .../dataprepper/model/event/JacksonEvent.java | 3 +- .../model/event/DefaultEventMetadataTest.java | 11 +++++++ .../AcknowledgementSetMonitor.java | 9 +++++- .../DefaultAcknowledgementSet.java | 9 ++++-- .../dataprepper/pipeline/ProcessWorker.java | 15 +++++++--- .../router/RouterCopyRecordStrategy.java | 2 +- ...DefaultAcknowledgementSetManagerTests.java | 14 ++++----- .../dataprepper/pipeline/PipelineTests.java | 10 +++---- .../router/RouterCopyRecordStrategyTests.java | 30 +++++++++---------- .../sink/opensearch/OpenSearchSinkIT.java | 5 ---- .../plugins/sink/s3/S3SinkServiceTest.java | 4 --- 16 files changed, 124 insertions(+), 64 deletions(-) create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java index f6fd46b0c1..86278bea74 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java @@ -11,7 +11,7 @@ import java.time.Instant; import java.io.Serializable; -public class DefaultEventHandle implements EventHandle, Serializable { +public class DefaultEventHandle implements EventHandle, InternalEventHandle, Serializable { private Instant externalOriginationTime; private final Instant internalOriginationTime; private WeakReference acknowledgementSetRef; @@ -51,7 +51,6 @@ public Instant getExternalOriginationTime() { @Override public void release(boolean result) { - System.out.println("======release called==="+result); AcknowledgementSet acknowledgementSet = getAcknowledgementSet(); if (acknowledgementSet != null) { acknowledgementSet.release(this, result); diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java index e2ce55caa2..883297d567 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java @@ -27,6 +27,8 @@ public class DefaultEventMetadata implements EventMetadata { private final Instant timeReceived; + private Instant externalOriginationTime; + private Map attributes; private Set tags; @@ -43,6 +45,7 @@ private DefaultEventMetadata(final Builder builder) { this.attributes = builder.attributes == null ? new HashMap<>() : new HashMap<>(builder.attributes); this.tags = builder.tags == null ? new HashSet<>() : new HashSet(builder.tags); + this.externalOriginationTime = null; } private DefaultEventMetadata(final EventMetadata eventMetadata) { @@ -50,6 +53,7 @@ private DefaultEventMetadata(final EventMetadata eventMetadata) { this.timeReceived = eventMetadata.getTimeReceived(); this.attributes = new HashMap<>(eventMetadata.getAttributes()); this.tags = new HashSet<>(eventMetadata.getTags()); + this.externalOriginationTime = null; } @Override @@ -62,6 +66,16 @@ public Instant getTimeReceived() { return timeReceived; } + @Override + public Instant getExternalOriginationTime() { + return externalOriginationTime; + } + + @Override + public void setExternalOriginationTime(Instant externalOriginationTime) { + this.externalOriginationTime = externalOriginationTime; + } + @Override public Map getAttributes() { return attributes; diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java index 92416dee67..9bb0ede315 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java @@ -18,22 +18,6 @@ public interface EventHandle { */ void release(boolean result); - /** - * sets acknowledgement set - * - * @param acknowledgementSet acknowledgementSet to be set in the event handle - * @since 2.6 - */ - void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet); - - /** - * gets acknowledgement set - * - * @return returns acknowledgementSet from the event handle - * @since 2.6 - */ - AcknowledgementSet getAcknowledgementSet(); - /** * sets external origination time * diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventMetadata.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventMetadata.java index 511a87c1fa..5db56ba85c 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventMetadata.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventMetadata.java @@ -31,6 +31,20 @@ public interface EventMetadata extends Serializable { */ Instant getTimeReceived(); + /** + * Returns the external origination time of the event + * @return the external origination time + * @since 2.6 + */ + Instant getExternalOriginationTime(); + + /** + * Sets the external origination time of the event + * @param externalOriginationTime the external origination time + * @since 2.6 + */ + void setExternalOriginationTime(Instant externalOriginationTime); + /** * Returns the attributes * @return a map of attributes diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java new file mode 100644 index 0000000000..0975fe9823 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.event; + +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import java.time.Instant; + +public interface InternalEventHandle { + /** + * sets acknowledgement set + * + * @param acknowledgementSet acknowledgementSet to be set in the event handle + * @since 2.6 + */ + void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet); + + /** + * gets acknowledgement set + * + * @return returns acknowledgementSet from the event handle + * @since 2.6 + */ + AcknowledgementSet getAcknowledgementSet(); + +} + diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 3c6e4e7969..846c4c3011 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -101,11 +101,10 @@ protected JacksonEvent(final JacksonEvent otherEvent) { } public static Event fromMessage(String message) { - JacksonEvent event = JacksonEvent.builder() + return JacksonEvent.builder() .withEventType(EVENT_TYPE) .withData(Collections.singletonMap(MESSAGE_KEY, message)) .build(); - return event; } private JsonNode getInitialJsonNode(final Object data) { diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java index fa624a9e2b..479e7be0c2 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java @@ -24,6 +24,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.anEmptyMap; @@ -80,6 +81,16 @@ public void testGetTimeReceived() { assertThat(timeReceived, is(equalTo(testTimeReceived))); } + @Test + public void testExternalOriginationTime() { + Instant externalOriginationTime = eventMetadata.getExternalOriginationTime(); + assertThat(externalOriginationTime, is(nullValue())); + Instant now = Instant.now(); + eventMetadata.setExternalOriginationTime(now); + externalOriginationTime = eventMetadata.getExternalOriginationTime(); + assertThat(externalOriginationTime, is(equalTo(now))); + } + @Test public void testGetAttributes() { final Map attributes = eventMetadata.getAttributes(); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java index bf9f021aa7..af9860cc9a 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementSetMonitor.java @@ -6,6 +6,8 @@ package org.opensearch.dataprepper.acknowledgements; import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; +import org.opensearch.dataprepper.model.event.InternalEventHandle; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import java.util.concurrent.locks.ReentrantLock; @@ -32,7 +34,12 @@ class AcknowledgementSetMonitor implements Runnable { private final AtomicInteger numNullHandles; private DefaultAcknowledgementSet getAcknowledgementSet(final EventHandle eventHandle) { - return (DefaultAcknowledgementSet)eventHandle.getAcknowledgementSet(); + if (eventHandle instanceof DefaultEventHandle) { + InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle; + return (DefaultAcknowledgementSet)internalEventHandle.getAcknowledgementSet(); + } else { + throw new RuntimeException("Unsupported event handle"); + } } public AcknowledgementSetMonitor() { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java index 3c145fe99c..741f08939d 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java @@ -8,6 +8,8 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.InternalEventHandle; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,8 +55,11 @@ public void add(Event event) { try { if (event instanceof JacksonEvent) { EventHandle eventHandle = event.getEventHandle(); - eventHandle.setAcknowledgementSet(this); - pendingAcknowledgments.put(eventHandle, new AtomicInteger(1)); + if (eventHandle instanceof DefaultEventHandle) { + InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle; + internalEventHandle.setAcknowledgementSet(this); + pendingAcknowledgments.put(eventHandle, new AtomicInteger(1)); + } } } finally { lock.unlock(); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java index 9c0ec69a8b..565c0b9075 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java @@ -13,6 +13,8 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; +import org.opensearch.dataprepper.model.event.InternalEventHandle; import org.opensearch.dataprepper.pipeline.common.FutureHelper; import org.opensearch.dataprepper.pipeline.common.FutureHelperResult; import org.slf4j.Logger; @@ -97,10 +99,15 @@ private void processAcknowledgements(List inputEvents, Collection outputR // For each event in the input events list that is not present in the output events, send positive acknowledgement, if acknowledgements are enabled for it inputEvents.forEach(event -> { EventHandle eventHandle = event.getEventHandle(); - if (Objects.nonNull(eventHandle) && eventHandle.getAcknowledgementSet() != null && !outputEventsSet.contains(event)) { - eventHandle.release(true); - } else if (acknowledgementsEnabled && Objects.isNull(eventHandle)) { - invalidEventHandlesCounter.increment(); + if (eventHandle != null && eventHandle instanceof DefaultEventHandle) { + InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle; + if (internalEventHandle.getAcknowledgementSet() != null && !outputEventsSet.contains(event)) { + eventHandle.release(true); + } else if (acknowledgementsEnabled) { + invalidEventHandlesCounter.increment(); + } + } else if (eventHandle != null) { + throw new RuntimeException("Unexpected EventHandle"); } }); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategy.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategy.java index 9c090cbf58..1bd2944c2e 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategy.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategy.java @@ -65,7 +65,7 @@ private void acquireEventReference(final Record record) { } if (referencedRecords.contains(record) || ((routedRecords != null) && routedRecords.contains(record))) { EventHandle eventHandle = ((JacksonEvent)record.getData()).getEventHandle(); - if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) { + if (eventHandle != null && eventHandle instanceof DefaultEventHandle) { acknowledgementSetManager.acquireEventReference(eventHandle); } } else if (!referencedRecords.contains(record)) { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java index 012e823ec5..c9fd556214 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManagerTests.java @@ -6,7 +6,7 @@ package org.opensearch.dataprepper.acknowledgements; import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.junit.jupiter.api.BeforeEach; @@ -38,20 +38,20 @@ class DefaultAcknowledgementSetManagerTests { @Mock JacksonEvent event3; - EventHandle eventHandle1; - EventHandle eventHandle2; - EventHandle eventHandle3; + DefaultEventHandle eventHandle1; + DefaultEventHandle eventHandle2; + DefaultEventHandle eventHandle3; Boolean result; @BeforeEach void setup() { callbackExecutor = Executors.newFixedThreadPool(2); event1 = mock(JacksonEvent.class); - eventHandle1 = mock(EventHandle.class); + eventHandle1 = mock(DefaultEventHandle.class); lenient().when(event1.getEventHandle()).thenReturn(eventHandle1); event2 = mock(JacksonEvent.class); - eventHandle2 = mock(EventHandle.class); + eventHandle2 = mock(DefaultEventHandle.class); lenient().when(event2.getEventHandle()).thenReturn(eventHandle2); acknowledgementSetManager = createObjectUnderTest(); @@ -91,7 +91,7 @@ void testExpirations() throws InterruptedException { @Test void testMultipleAcknowledgementSets() { event3 = mock(JacksonEvent.class); - eventHandle3 = mock(EventHandle.class); + eventHandle3 = mock(DefaultEventHandle.class); lenient().when(event3.getEventHandle()).thenReturn(eventHandle3); AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java index 0fef88268e..75c1154baa 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java @@ -14,7 +14,7 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.EventFactory; -import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -84,7 +84,7 @@ class PipelineTests { private Duration peerForwarderDrainTimeout; private EventFactory eventFactory; private JacksonEvent event; - private EventHandle eventHandle; + private DefaultEventHandle eventHandle; private AcknowledgementSetManager acknowledgementSetManager; @BeforeEach @@ -428,7 +428,7 @@ void publishToSinks_calls_route_with_Events_and_Sinks_verify_AcknowledgementSetM RouterCopyRecordStrategy routerCopyRecordStrategy = (RouterCopyRecordStrategy)a.getArgument(2); Record rec = records.get(0); event = mock(JacksonEvent.class); - eventHandle = mock(EventHandle.class); + eventHandle = mock(DefaultEventHandle.class); acknowledgementSet = mock(AcknowledgementSet.class); when(event.getEventHandle()).thenReturn(eventHandle); when(eventHandle.getAcknowledgementSet()).thenReturn(acknowledgementSet); @@ -441,7 +441,7 @@ void publishToSinks_calls_route_with_Events_and_Sinks_verify_AcknowledgementSetM Pipeline pipeline = createObjectUnderTest(); when(mockSource.areAcknowledgementsEnabled()).thenReturn(true); pipeline.publishToSinks(records); - verify(acknowledgementSetManager).acquireEventReference(any(EventHandle.class)); + verify(acknowledgementSetManager).acquireEventReference(any(DefaultEventHandle.class)); verify(router) .route(anyCollection(), eq(dataFlowComponents), any(RouterGetRecordStrategy.class), any(BiConsumer.class)); @@ -455,7 +455,7 @@ void publishToSinks_calls_route_with_Events_and_Sinks_verify_InactiveAcknowledge RouterCopyRecordStrategy routerCopyRecordStrategy = (RouterCopyRecordStrategy)a.getArgument(2); Record rec = records.get(0); event = mock(JacksonEvent.class); - eventHandle = mock(EventHandle.class); + eventHandle = mock(DefaultEventHandle.class); when(event.getEventHandle()).thenReturn(null); when(rec.getData()).thenReturn(event); routerCopyRecordStrategy.getRecord(rec); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategyTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategyTests.java index 6d7e3bb545..4c56113323 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategyTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/RouterCopyRecordStrategyTests.java @@ -42,7 +42,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.event.EventFactory; -import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; @@ -66,7 +66,7 @@ public class RouterCopyRecordStrategyTests { private JacksonEvent event; - private Map handleRefCount; + private Map handleRefCount; private static class TestComponent { } @@ -79,11 +79,11 @@ void setUp() { acknowledgementSet1 = mock(AcknowledgementSet.class); try { lenient().doAnswer((i) -> { - EventHandle handle = (EventHandle) i.getArgument(0); + DefaultEventHandle handle = (DefaultEventHandle) i.getArgument(0); int v = handleRefCount.getOrDefault(handle, 0); handleRefCount.put(handle, v+1); return null; - }).when(acknowledgementSetManager).acquireEventReference(any(EventHandle.class)); + }).when(acknowledgementSetManager).acquireEventReference(any(DefaultEventHandle.class)); } catch (Exception e){} mockRecordsIn = IntStream.range(0, 10) .mapToObj(i -> mock(Record.class)) @@ -98,11 +98,11 @@ void setUp() { .collect(Collectors.toList()); } - private void attachEventHandlesToRecordsIn(List eventHandles) { + private void attachEventHandlesToRecordsIn(List eventHandles) { Iterator iter = recordsIn.iterator(); while (iter.hasNext()) { Record r = (Record) iter.next(); - EventHandle handle = ((JacksonEvent)r.getData()).getEventHandle(); + DefaultEventHandle handle = (DefaultEventHandle)((JacksonEvent)r.getData()).getEventHandle(); handle.setAcknowledgementSet(acknowledgementSet1); eventHandles.add(handle); } @@ -186,10 +186,10 @@ void test_one_record_with_acknowledgements() { = Collections.singletonList(dataFlowComponent); final RouterCopyRecordStrategy getRecordStrategy = createObjectUnderTest(dataFlowComponents); - List eventHandles = new ArrayList<>(); + List eventHandles = new ArrayList<>(); attachEventHandlesToRecordsIn(eventHandles); Record firstRecord = recordsIn.iterator().next(); - EventHandle firstHandle = ((Event)firstRecord.getData()).getEventHandle(); + DefaultEventHandle firstHandle = (DefaultEventHandle)((Event)firstRecord.getData()).getEventHandle(); Record recordOut = getRecordStrategy.getRecord(firstRecord); assertThat(recordOut, sameInstance(firstRecord)); assertTrue(getRecordStrategy.getReferencedRecords().contains(firstRecord)); @@ -208,7 +208,7 @@ void test_multiple_records_with_acknowledgements() { = Collections.singletonList(dataFlowComponent); final RouterCopyRecordStrategy getRecordStrategy = createObjectUnderTest(dataFlowComponents); - List eventHandles = new ArrayList<>(); + List eventHandles = new ArrayList<>(); attachEventHandlesToRecordsIn(eventHandles); Collection recordsOut = getRecordStrategy.getAllRecords(recordsIn); assertThat(recordsOut.size(), equalTo(recordsIn.size())); @@ -237,12 +237,12 @@ void test_one_record_with_acknowledgements_and_multi_components() { } final RouterCopyRecordStrategy getRecordStrategy = createObjectUnderTest(dataFlowComponents); - List eventHandles = new ArrayList<>(); + List eventHandles = new ArrayList<>(); attachEventHandlesToRecordsIn(eventHandles); try { doAnswer((i) -> { JacksonEvent e1 = (JacksonEvent) i.getArgument(0); - e1.getEventHandle().setAcknowledgementSet(acknowledgementSet1); + ((DefaultEventHandle)e1.getEventHandle()).setAcknowledgementSet(acknowledgementSet1); return null; }).when(acknowledgementSet1).add(any(JacksonEvent.class)); } catch (Exception e){} @@ -253,14 +253,14 @@ void test_one_record_with_acknowledgements_and_multi_components() { when(eventBuilder.build()).thenReturn(JacksonEvent.fromEvent(event)); when(eventFactory.eventBuilder(EventBuilder.class)).thenReturn(eventBuilder); Record firstRecord = recordsIn.iterator().next(); - EventHandle firstHandle = ((Event)firstRecord.getData()).getEventHandle(); + DefaultEventHandle firstHandle = (DefaultEventHandle)((Event)firstRecord.getData()).getEventHandle(); Record recordOut = getRecordStrategy.getRecord(firstRecord); assertThat(recordOut, sameInstance(firstRecord)); assertTrue(getRecordStrategy.getReferencedRecords().contains(firstRecord)); recordOut = getRecordStrategy.getRecord(firstRecord); assertThat(recordOut, not(sameInstance(firstRecord))); assertFalse(handleRefCount.containsKey(firstHandle)); - EventHandle newHandle = ((JacksonEvent)recordOut.getData()).getEventHandle(); + DefaultEventHandle newHandle = (DefaultEventHandle)((JacksonEvent)recordOut.getData()).getEventHandle(); assertTrue(getRecordStrategy.getReferencedRecords().contains(recordOut)); assertThat(newHandle, not(equalTo(null))); assertFalse(handleRefCount.containsKey(newHandle)); @@ -275,12 +275,12 @@ void test_multiple_records_with_acknowledgements_and_multi_components() { } final RouterCopyRecordStrategy getRecordStrategy = createObjectUnderTest(dataFlowComponents); - List eventHandles = new ArrayList<>(); + List eventHandles = new ArrayList<>(); attachEventHandlesToRecordsIn(eventHandles); try { doAnswer((i) -> { JacksonEvent e1 = (JacksonEvent) i.getArgument(0); - e1.getEventHandle().setAcknowledgementSet(acknowledgementSet1); + ((DefaultEventHandle)e1.getEventHandle()).setAcknowledgementSet(acknowledgementSet1); return null; }).when(acknowledgementSet1).add(any(JacksonEvent.class)); } catch (Exception e){} diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index dcdf4200ac..755ccb5283 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -149,11 +149,6 @@ public void setup() { expressionEvaluator = mock(ExpressionEvaluator.class); when(expressionEvaluator.isValidExpressionStatement(any(String.class))).thenReturn(false); - /* - lenient().doAnswer(a -> { - return null; - }).when(eventHandle).release(any(Boolean.class)); - */ } @BeforeEach diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java index 9952fde732..8610bfbf85 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java @@ -386,8 +386,6 @@ void output_will_release_all_handles_since_a_flush() throws IOException { } s3SinkService.output(records); - //final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); - for (EventHandle eventHandle : eventHandles) { verify(acknowledgementSet).release(eventHandle, true); } @@ -452,13 +450,11 @@ void output_will_release_all_handles_since_a_flush_when_S3_fails() throws IOExce final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); for (EventHandle eventHandle : eventHandles) { - System.out.println("==1====EventHandle=="+eventHandle+"==="+acknowledgementSet); eventHandle.setAcknowledgementSet(acknowledgementSet); } s3SinkService.output(records); for (EventHandle eventHandle : eventHandles) { - System.out.println("==2====EventHandle=="+eventHandle+"==="+acknowledgementSet); verify(acknowledgementSet).release(eventHandle, false); } } From 53b17870fb024012b8a28192e73d1404deee8b8c Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 1 Nov 2023 18:12:18 +0000 Subject: [PATCH 7/8] Fixed checkstyle errors Signed-off-by: Krishna Kondaka --- .../java/org/opensearch/dataprepper/model/event/EventHandle.java | 1 - .../opensearch/dataprepper/model/event/InternalEventHandle.java | 1 - .../java/org/opensearch/dataprepper/pipeline/ProcessWorker.java | 1 - 3 files changed, 3 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java index 9bb0ede315..64ef3be574 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java @@ -5,7 +5,6 @@ package org.opensearch.dataprepper.model.event; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import java.time.Instant; public interface EventHandle { diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java index 0975fe9823..3817365f17 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java @@ -6,7 +6,6 @@ package org.opensearch.dataprepper.model.event; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; -import java.time.Instant; public interface InternalEventHandle { /** diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java index 565c0b9075..fb4effb413 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.ArrayList; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.Future; import java.util.stream.Collectors; From df4470420f87f34b252e1b56d247ff527fa7d3b8 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 1 Nov 2023 18:55:30 +0000 Subject: [PATCH 8/8] Fixed build errors Signed-off-by: Krishna Kondaka --- .../plugins/sink/s3/S3SinkServiceTest.java | 41 +++++++++++-------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java index 8610bfbf85..7160660137 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java @@ -17,6 +17,7 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; @@ -137,6 +138,10 @@ void setUp() { lenient().when(pluginMetrics.summary(S3SinkService.S3_OBJECTS_SIZE)).thenReturn(s3ObjectSizeSummary); } + private DefaultEventHandle castToDefaultHandle(EventHandle eventHandle) { + return (DefaultEventHandle)eventHandle; + } + private S3SinkService createObjectUnderTest() { return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, keyGenerator, Duration.ofMillis(100), pluginMetrics); } @@ -380,8 +385,8 @@ void output_will_release_all_handles_since_a_flush() throws IOException { doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); - final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); - for (EventHandle eventHandle : eventHandles) { + final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList()); + for (DefaultEventHandle eventHandle : eventHandles) { eventHandle.setAcknowledgementSet(acknowledgementSet); } s3SinkService.output(records); @@ -406,8 +411,8 @@ void output_will_skip_releasing_events_without_EventHandle_objects() throws IOEx doNothing().when(codec).writeEvent(event1, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); - final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); - for (EventHandle eventHandle : eventHandles) { + final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList()); + for (DefaultEventHandle eventHandle : eventHandles) { eventHandle.setAcknowledgementSet(acknowledgementSet); } @@ -417,9 +422,9 @@ void output_will_skip_releasing_events_without_EventHandle_objects() throws IOEx } final Collection> records2 = generateRandomStringEventRecord(); - final List eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); + final List eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList()); - for (EventHandle eventHandle : eventHandles2) { + for (DefaultEventHandle eventHandle : eventHandles2) { eventHandle.setAcknowledgementSet(acknowledgementSet); } @@ -447,9 +452,9 @@ void output_will_release_all_handles_since_a_flush_when_S3_fails() throws IOExce doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final List> records = generateEventRecords(1); - final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); + final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList()); - for (EventHandle eventHandle : eventHandles) { + for (DefaultEventHandle eventHandle : eventHandles) { eventHandle.setAcknowledgementSet(acknowledgementSet); } s3SinkService.output(records); @@ -473,8 +478,8 @@ void output_will_release_only_new_handles_since_a_flush() throws IOException { doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); - final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); - for (EventHandle eventHandle : eventHandles) { + final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList()); + for (DefaultEventHandle eventHandle : eventHandles) { eventHandle.setAcknowledgementSet(acknowledgementSet); } s3SinkService.output(records); @@ -482,8 +487,8 @@ void output_will_release_only_new_handles_since_a_flush() throws IOException { verify(acknowledgementSet).release(eventHandle, true); } final Collection> records2 = generateRandomStringEventRecord(); - final List eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); - for (EventHandle eventHandle : eventHandles2) { + final List eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList()); + for (DefaultEventHandle eventHandle : eventHandles2) { eventHandle.setAcknowledgementSet(acknowledgementSet); } s3SinkService.output(records2); @@ -509,8 +514,8 @@ void output_will_skip_and_drop_failed_records() throws IOException { List> records = generateEventRecords(2); Event event1 = records.get(0).getData(); Event event2 = records.get(1).getData(); - EventHandle eventHandle1 = event1.getEventHandle(); - EventHandle eventHandle2 = event2.getEventHandle(); + DefaultEventHandle eventHandle1 = (DefaultEventHandle)event1.getEventHandle(); + DefaultEventHandle eventHandle2 = (DefaultEventHandle)event2.getEventHandle(); eventHandle1.setAcknowledgementSet(acknowledgementSet); eventHandle2.setAcknowledgementSet(acknowledgementSet); @@ -545,8 +550,8 @@ void output_will_release_only_new_handles_since_a_flush_when_S3_fails() throws I doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final List> records = generateEventRecords(1); - final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); - for (EventHandle eventHandle : eventHandles) { + final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList()); + for (DefaultEventHandle eventHandle : eventHandles) { eventHandle.setAcknowledgementSet(acknowledgementSet); } s3SinkService.output(records); @@ -555,9 +560,9 @@ void output_will_release_only_new_handles_since_a_flush_when_S3_fails() throws I } final List> records2 = generateEventRecords(1); - final List eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); + final List eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList()); - for (EventHandle eventHandle : eventHandles2) { + for (DefaultEventHandle eventHandle : eventHandles2) { eventHandle.setAcknowledgementSet(acknowledgementSet); } s3SinkService.output(records2);