Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify EventHandle to be created for every event and support internal and external origination times #3546

Merged
merged 8 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import java.lang.ref.WeakReference;

import java.time.Instant;
import java.io.Serializable;

public class DefaultEventHandle implements EventHandle, Serializable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this package protected? I'd really like to avoid this getting used as it will be hard to clean-up in the future.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By "package protected" you mean make this as
protected class DefaultEventHandle implements EventHandle, Serializable?

private Instant externalOriginationTime;
private final Instant internalOriginationTime;
private WeakReference<AcknowledgementSet> acknowledgementSetRef;

public DefaultEventHandle(final Instant internalOriginationTime) {
this.acknowledgementSetRef = null;
this.externalOriginationTime = null;
this.internalOriginationTime = internalOriginationTime;
}

@Override
public void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet) {
this.acknowledgementSetRef = new WeakReference<>(acknowledgementSet);
}

@Override
public void setExternalOriginationTime(final Instant externalOriginationTime) {
this.externalOriginationTime = externalOriginationTime;
}

public AcknowledgementSet getAcknowledgementSet() {
if (acknowledgementSetRef == null) {
return null;
}
return acknowledgementSetRef.get();
}

@Override
public Instant getInternalOriginationTime() {
return this.internalOriginationTime;
}

@Override
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the next PR let's scope down and first only implement internal end to end latency metric. For ddb use case we could potentially use this value for external (https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_StreamRecord.html#DDB-Type-streams_StreamRecord-ApproximateCreationDateTime), so we may not need to make any hasty decisions on users configuring this time. @dlvenable What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand your proposal, the Source will set this value on the Event. We don't need to let the user configure this in the pipeline. But, we could in the future. Right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Will only implement internal latency for now in the OpenSearchSink and S3 Sink

public Instant getExternalOriginationTime() {
return this.externalOriginationTime;
}

@Override
public void release(boolean result) {
System.out.println("======release called==="+result);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should remove this print statement

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops. Yes

AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
if (acknowledgementSet != null) {
acknowledgementSet.release(this, result);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import java.time.Instant;

public interface EventHandle {
/**
* releases event handle
Expand All @@ -14,4 +17,46 @@ public interface EventHandle {
* @since 2.2
*/
void release(boolean result);

/**
* sets acknowledgement set
*
* @param acknowledgementSet acknowledgementSet to be set in the event handle
* @since 2.6
*/
void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to expose all of this? It might make sense to have a different interface for internal use. Say InternalEventHandle. Then DefaultEventHandle can implement that.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand this suggestion. Are you suggesting we have InternalEventHandle and EventHandle separately?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we'd have two interfaces:

public interface InterfaceEventHandle {
...
// methods that data-prepper-core calls are here.
}

And we implement it as:

class DefaultEventHandle implements EventHandle, InternalEventHandle
{
...
}

In data-prepper, core we'd need a cast: InternalEventHandle internalEventHandle = (InternalEventHandle) eventHandle.

This would be a good indication that somebody is doing something they shouldn't if we see it outside of data-prepper-core.


/**
* gets acknowledgement set
*
* @return returns acknowledgementSet from the event handle
* @since 2.6
*/
AcknowledgementSet getAcknowledgementSet();

/**
* sets external origination time
*
* @param externalOriginationTime externalOriginationTime to be set in the event handle
* @since 2.6
*/
void setExternalOriginationTime(final Instant externalOriginationTime);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should set the origination time on the EventMetadata. The EventHandle can then get this data from the EventMetadata.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internal origination time is set in EventMetadata and EventHandle gets it from it. Do you want the external origination time also set in the EventMetadata?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, add this to EventMetadata.

The EventHandle can also hold a reference to the time in there.


/**
* gets external origination time
*
* @return returns externalOriginationTime from the event handle. This can be null if it is never set.
* @since 2.6
*/
Instant getExternalOriginationTime();

/**
* gets internal origination time
*
* @return returns internalOriginationTime from the event handle.
* @since 2.6
*/
Instant getInternalOriginationTime();


}
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,21 @@ protected JacksonEvent(final Builder builder) {
}

this.jsonNode = getInitialJsonNode(builder.data);
this.eventHandle = new DefaultEventHandle(eventMetadata.getTimeReceived());
}

protected JacksonEvent(final JacksonEvent otherEvent) {
this.jsonNode = otherEvent.jsonNode.deepCopy();
this.eventMetadata = DefaultEventMetadata.fromEventMetadata(otherEvent.eventMetadata);
this.eventHandle = new DefaultEventHandle(eventMetadata.getTimeReceived());
}

public static Event fromMessage(String message) {
return JacksonEvent.builder()
JacksonEvent event = JacksonEvent.builder()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this change accomplish?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed. I was trying something. undid the change. Thanks!

.withEventType(EVENT_TYPE)
.withData(Collections.singletonMap(MESSAGE_KEY, message))
.build();
return event;
}

private JsonNode getInitialJsonNode(final Object data) {
Expand Down Expand Up @@ -152,10 +155,6 @@ public void put(final String key, final Object value) {
}
}

public void setEventHandle(EventHandle handle) {
this.eventHandle = handle;
}

@Override
public EventHandle getEventHandle() {
return eventHandle;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import org.junit.jupiter.api.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.ArgumentMatchers.any;
import org.mockito.Mock;

import java.time.Instant;

class DefaultEventHandleTests {
@Mock
private AcknowledgementSet acknowledgementSet;

@Test
void testBasic() {
Instant now = Instant.now();
DefaultEventHandle eventHandle = new DefaultEventHandle(now);
assertThat(eventHandle.getAcknowledgementSet(), equalTo(null));
assertThat(eventHandle.getInternalOriginationTime(), equalTo(now));
assertThat(eventHandle.getExternalOriginationTime(), equalTo(null));
eventHandle.release(true);
}

@Test
void testWithAcknowledgementSet() {
acknowledgementSet = mock(AcknowledgementSet.class);
when(acknowledgementSet.release(any(EventHandle.class), any(Boolean.class))).thenReturn(true);
Instant now = Instant.now();
DefaultEventHandle eventHandle = new DefaultEventHandle(now);
assertThat(eventHandle.getAcknowledgementSet(), equalTo(null));
assertThat(eventHandle.getInternalOriginationTime(), equalTo(now));
assertThat(eventHandle.getExternalOriginationTime(), equalTo(null));
eventHandle.setAcknowledgementSet(acknowledgementSet);
eventHandle.release(true);
verify(acknowledgementSet).release(eventHandle, true);
}

@Test
void testWithExternalOriginationTime() {
Instant now = Instant.now();
DefaultEventHandle eventHandle = new DefaultEventHandle(now);
assertThat(eventHandle.getAcknowledgementSet(), equalTo(null));
assertThat(eventHandle.getInternalOriginationTime(), equalTo(now));
assertThat(eventHandle.getExternalOriginationTime(), equalTo(null));
eventHandle.setExternalOriginationTime(now.minusSeconds(60));
assertThat(eventHandle.getExternalOriginationTime(), equalTo(now.minusSeconds(60)));
eventHandle.release(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@

public class JacksonEventTest {

class TestEventHandle implements EventHandle {
@Override
public void release(boolean result) {
}
}

private Event event;

private String eventType;
Expand Down Expand Up @@ -398,6 +392,8 @@ public void testBuild_withEventType() {
.build();

assertThat(event.getMetadata().getEventType(), is(equalTo(eventType)));
assertThat(event.getEventHandle(), is(notNullValue()));
assertThat(event.getEventHandle().getInternalOriginationTime(), is(notNullValue()));
}

@Test
Expand All @@ -411,6 +407,8 @@ public void testBuild_withTimeReceived() {
.build();

assertThat(event.getMetadata().getTimeReceived(), is(equalTo(now)));
assertThat(event.getEventHandle(), is(notNullValue()));
assertThat(event.getEventHandle().getInternalOriginationTime(), is(equalTo(now)));
}

@Test
Expand All @@ -422,6 +420,8 @@ public void testBuild_withMessageValue() {

assertThat(event, is(notNullValue()));
assertThat(event.get("message", String.class), is(equalTo(message)));
assertThat(event.getEventHandle(), is(notNullValue()));
assertThat(event.getEventHandle().getInternalOriginationTime(), is(notNullValue()));
}

@Test
Expand Down Expand Up @@ -678,6 +678,8 @@ void fromEvent_with_a_JacksonEvent() {

assertThat(createdEvent, notNullValue());
assertThat(createdEvent, not(sameInstance(originalEvent)));
assertThat(event.getEventHandle(), is(notNullValue()));
assertThat(event.getEventHandle().getInternalOriginationTime(), is(notNullValue()));

assertThat(createdEvent.toMap(), equalTo(dataObject));
assertThat(createdEvent.getJsonNode(), not(sameInstance(originalEvent.getJsonNode())));
Expand Down Expand Up @@ -707,19 +709,6 @@ void fromEvent_with_a_non_JacksonEvent() {
assertThat(createdEvent.getMetadata(), equalTo(eventMetadata));
}

@Test
void testEventHandleGetAndSet() {
EventHandle testEventHandle = new TestEventHandle();
final String jsonString = "{\"foo\": \"bar\"}";

final JacksonEvent event = JacksonEvent.builder()
.withEventType(eventType)
.withData(jsonString)
.build();
event.setEventHandle(testEventHandle);
assertThat(event.getEventHandle(), equalTo(testEventHandle));
}

@Test
void testJsonStringBuilder() {
final String jsonString = "{\"foo\":\"bar\"}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.opensearch.dataprepper.acknowledgements;

import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;

import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -33,7 +32,7 @@ class AcknowledgementSetMonitor implements Runnable {
private final AtomicInteger numNullHandles;

private DefaultAcknowledgementSet getAcknowledgementSet(final EventHandle eventHandle) {
return (DefaultAcknowledgementSet)((DefaultEventHandle)eventHandle).getAcknowledgementSet();
return (DefaultAcknowledgementSet)eventHandle.getAcknowledgementSet();
}

public AcknowledgementSetMonitor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.dataprepper.acknowledgements;

import org.opensearch.dataprepper.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
Expand Down Expand Up @@ -53,8 +52,8 @@ public void add(Event event) {
lock.lock();
try {
if (event instanceof JacksonEvent) {
EventHandle eventHandle = new DefaultEventHandle(this);
((JacksonEvent) event).setEventHandle(eventHandle);
EventHandle eventHandle = event.getEventHandle();
eventHandle.setAcknowledgementSet(this);
pendingAcknowledgments.put(eventHandle, new AtomicInteger(1));
}
} finally {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private void processAcknowledgements(List<Event> inputEvents, Collection outputR
// For each event in the input events list that is not present in the output events, send positive acknowledgement, if acknowledgements are enabled for it
inputEvents.forEach(event -> {
EventHandle eventHandle = event.getEventHandle();
if (Objects.nonNull(eventHandle) && !outputEventsSet.contains(event)) {
if (Objects.nonNull(eventHandle) && eventHandle.getAcknowledgementSet() != null && !outputEventsSet.contains(event)) {
eventHandle.release(true);
} else if (acknowledgementsEnabled && Objects.isNull(eventHandle)) {
invalidEventHandlesCounter.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.EventBuilder;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.acknowledgements.InactiveAcknowledgementSetManager;

Expand Down Expand Up @@ -65,7 +65,7 @@ private void acquireEventReference(final Record record) {
}
if (referencedRecords.contains(record) || ((routedRecords != null) && routedRecords.contains(record))) {
EventHandle eventHandle = ((JacksonEvent)record.getData()).getEventHandle();
if (eventHandle != null) {
if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) {
acknowledgementSetManager.acquireEventReference(eventHandle);
}
} else if (!referencedRecords.contains(record)) {
Expand Down Expand Up @@ -97,7 +97,7 @@ public Record getRecord(final Record record) {
JacksonEvent newRecordEvent;
Record newRecord;
DefaultEventHandle eventHandle = (DefaultEventHandle)recordEvent.getEventHandle();
if (eventHandle != null) {
if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) {
final EventMetadata eventMetadata = recordEvent.getMetadata();
final EventBuilder eventBuilder = (EventBuilder) eventFactory.eventBuilder(EventBuilder.class).withEventMetadata(eventMetadata).withData(recordEvent.toMap());
newRecordEvent = (JacksonEvent) eventBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.dataprepper.acknowledgements;

import org.opensearch.dataprepper.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
Expand Down
Loading
Loading