Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify EventHandle to be created for every event and support internal and external origination times #3546

Merged
merged 8 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import java.lang.ref.WeakReference;

import java.time.Instant;
import java.io.Serializable;

public class DefaultEventHandle implements EventHandle, InternalEventHandle, Serializable {
private Instant externalOriginationTime;
private final Instant internalOriginationTime;
private WeakReference<AcknowledgementSet> acknowledgementSetRef;

public DefaultEventHandle(final Instant internalOriginationTime) {
this.acknowledgementSetRef = null;
this.externalOriginationTime = null;
this.internalOriginationTime = internalOriginationTime;
}

@Override
public void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet) {
this.acknowledgementSetRef = new WeakReference<>(acknowledgementSet);
}

@Override
public void setExternalOriginationTime(final Instant externalOriginationTime) {
this.externalOriginationTime = externalOriginationTime;
}

public AcknowledgementSet getAcknowledgementSet() {
if (acknowledgementSetRef == null) {
return null;
}
return acknowledgementSetRef.get();
}

@Override
public Instant getInternalOriginationTime() {
return this.internalOriginationTime;
}

@Override
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the next PR let's scope down and first only implement internal end to end latency metric. For ddb use case we could potentially use this value for external (https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_StreamRecord.html#DDB-Type-streams_StreamRecord-ApproximateCreationDateTime), so we may not need to make any hasty decisions on users configuring this time. @dlvenable What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand your proposal, the Source will set this value on the Event. We don't need to let the user configure this in the pipeline. But, we could in the future. Right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Will only implement internal latency for now in the OpenSearchSink and S3 Sink

public Instant getExternalOriginationTime() {
return this.externalOriginationTime;
}

@Override
public void release(boolean result) {
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
if (acknowledgementSet != null) {
acknowledgementSet.release(this, result);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class DefaultEventMetadata implements EventMetadata {

private final Instant timeReceived;

private Instant externalOriginationTime;

private Map<String, Object> attributes;

private Set<String> tags;
Expand All @@ -43,13 +45,15 @@ private DefaultEventMetadata(final Builder builder) {
this.attributes = builder.attributes == null ? new HashMap<>() : new HashMap<>(builder.attributes);

this.tags = builder.tags == null ? new HashSet<>() : new HashSet(builder.tags);
this.externalOriginationTime = null;
}

private DefaultEventMetadata(final EventMetadata eventMetadata) {
this.eventType = eventMetadata.getEventType();
this.timeReceived = eventMetadata.getTimeReceived();
this.attributes = new HashMap<>(eventMetadata.getAttributes());
this.tags = new HashSet<>(eventMetadata.getTags());
this.externalOriginationTime = null;
}

@Override
Expand All @@ -62,6 +66,16 @@ public Instant getTimeReceived() {
return timeReceived;
}

@Override
public Instant getExternalOriginationTime() {
return externalOriginationTime;
}

@Override
public void setExternalOriginationTime(Instant externalOriginationTime) {
this.externalOriginationTime = externalOriginationTime;
}

@Override
public Map<String, Object> getAttributes() {
return attributes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.model.event;

import java.time.Instant;

public interface EventHandle {
/**
* releases event handle
Expand All @@ -14,4 +16,30 @@ public interface EventHandle {
* @since 2.2
*/
void release(boolean result);

/**
* sets external origination time
*
* @param externalOriginationTime externalOriginationTime to be set in the event handle
* @since 2.6
*/
void setExternalOriginationTime(final Instant externalOriginationTime);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should set the origination time on the EventMetadata. The EventHandle can then get this data from the EventMetadata.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internal origination time is set in EventMetadata and EventHandle gets it from it. Do you want the external origination time also set in the EventMetadata?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, add this to EventMetadata.

The EventHandle can also hold a reference to the time in there.


/**
* gets external origination time
*
* @return returns externalOriginationTime from the event handle. This can be null if it is never set.
* @since 2.6
*/
Instant getExternalOriginationTime();

/**
* gets internal origination time
*
* @return returns internalOriginationTime from the event handle.
* @since 2.6
*/
Instant getInternalOriginationTime();


}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ public interface EventMetadata extends Serializable {
*/
Instant getTimeReceived();

/**
* Returns the external origination time of the event
* @return the external origination time
* @since 2.6
*/
Instant getExternalOriginationTime();

/**
* Sets the external origination time of the event
* @param externalOriginationTime the external origination time
* @since 2.6
*/
void setExternalOriginationTime(Instant externalOriginationTime);

/**
* Returns the attributes
* @return a map of attributes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;

public interface InternalEventHandle {
/**
* sets acknowledgement set
*
* @param acknowledgementSet acknowledgementSet to be set in the event handle
* @since 2.6
*/
void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet);

/**
* gets acknowledgement set
*
* @return returns acknowledgementSet from the event handle
* @since 2.6
*/
AcknowledgementSet getAcknowledgementSet();

}

Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,13 @@ protected JacksonEvent(final Builder builder) {
}

this.jsonNode = getInitialJsonNode(builder.data);
this.eventHandle = new DefaultEventHandle(eventMetadata.getTimeReceived());
}

protected JacksonEvent(final JacksonEvent otherEvent) {
this.jsonNode = otherEvent.jsonNode.deepCopy();
this.eventMetadata = DefaultEventMetadata.fromEventMetadata(otherEvent.eventMetadata);
this.eventHandle = new DefaultEventHandle(eventMetadata.getTimeReceived());
}

public static Event fromMessage(String message) {
Expand Down Expand Up @@ -152,10 +154,6 @@ public void put(final String key, final Object value) {
}
}

public void setEventHandle(EventHandle handle) {
this.eventHandle = handle;
}

@Override
public EventHandle getEventHandle() {
return eventHandle;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import org.junit.jupiter.api.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.ArgumentMatchers.any;
import org.mockito.Mock;

import java.time.Instant;

class DefaultEventHandleTests {
@Mock
private AcknowledgementSet acknowledgementSet;

@Test
void testBasic() {
Instant now = Instant.now();
DefaultEventHandle eventHandle = new DefaultEventHandle(now);
assertThat(eventHandle.getAcknowledgementSet(), equalTo(null));
assertThat(eventHandle.getInternalOriginationTime(), equalTo(now));
assertThat(eventHandle.getExternalOriginationTime(), equalTo(null));
eventHandle.release(true);
}

@Test
void testWithAcknowledgementSet() {
acknowledgementSet = mock(AcknowledgementSet.class);
when(acknowledgementSet.release(any(EventHandle.class), any(Boolean.class))).thenReturn(true);
Instant now = Instant.now();
DefaultEventHandle eventHandle = new DefaultEventHandle(now);
assertThat(eventHandle.getAcknowledgementSet(), equalTo(null));
assertThat(eventHandle.getInternalOriginationTime(), equalTo(now));
assertThat(eventHandle.getExternalOriginationTime(), equalTo(null));
eventHandle.setAcknowledgementSet(acknowledgementSet);
eventHandle.release(true);
verify(acknowledgementSet).release(eventHandle, true);
}

@Test
void testWithExternalOriginationTime() {
Instant now = Instant.now();
DefaultEventHandle eventHandle = new DefaultEventHandle(now);
assertThat(eventHandle.getAcknowledgementSet(), equalTo(null));
assertThat(eventHandle.getInternalOriginationTime(), equalTo(now));
assertThat(eventHandle.getExternalOriginationTime(), equalTo(null));
eventHandle.setExternalOriginationTime(now.minusSeconds(60));
assertThat(eventHandle.getExternalOriginationTime(), equalTo(now.minusSeconds(60)));
eventHandle.release(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anEmptyMap;
Expand Down Expand Up @@ -80,6 +81,16 @@ public void testGetTimeReceived() {
assertThat(timeReceived, is(equalTo(testTimeReceived)));
}

@Test
public void testExternalOriginationTime() {
Instant externalOriginationTime = eventMetadata.getExternalOriginationTime();
assertThat(externalOriginationTime, is(nullValue()));
Instant now = Instant.now();
eventMetadata.setExternalOriginationTime(now);
externalOriginationTime = eventMetadata.getExternalOriginationTime();
assertThat(externalOriginationTime, is(equalTo(now)));
}

@Test
public void testGetAttributes() {
final Map<String, Object> attributes = eventMetadata.getAttributes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@

public class JacksonEventTest {

class TestEventHandle implements EventHandle {
@Override
public void release(boolean result) {
}
}

private Event event;

private String eventType;
Expand Down Expand Up @@ -398,6 +392,8 @@ public void testBuild_withEventType() {
.build();

assertThat(event.getMetadata().getEventType(), is(equalTo(eventType)));
assertThat(event.getEventHandle(), is(notNullValue()));
assertThat(event.getEventHandle().getInternalOriginationTime(), is(notNullValue()));
}

@Test
Expand All @@ -411,6 +407,8 @@ public void testBuild_withTimeReceived() {
.build();

assertThat(event.getMetadata().getTimeReceived(), is(equalTo(now)));
assertThat(event.getEventHandle(), is(notNullValue()));
assertThat(event.getEventHandle().getInternalOriginationTime(), is(equalTo(now)));
}

@Test
Expand All @@ -422,6 +420,8 @@ public void testBuild_withMessageValue() {

assertThat(event, is(notNullValue()));
assertThat(event.get("message", String.class), is(equalTo(message)));
assertThat(event.getEventHandle(), is(notNullValue()));
assertThat(event.getEventHandle().getInternalOriginationTime(), is(notNullValue()));
}

@Test
Expand Down Expand Up @@ -678,6 +678,8 @@ void fromEvent_with_a_JacksonEvent() {

assertThat(createdEvent, notNullValue());
assertThat(createdEvent, not(sameInstance(originalEvent)));
assertThat(event.getEventHandle(), is(notNullValue()));
assertThat(event.getEventHandle().getInternalOriginationTime(), is(notNullValue()));

assertThat(createdEvent.toMap(), equalTo(dataObject));
assertThat(createdEvent.getJsonNode(), not(sameInstance(originalEvent.getJsonNode())));
Expand Down Expand Up @@ -707,19 +709,6 @@ void fromEvent_with_a_non_JacksonEvent() {
assertThat(createdEvent.getMetadata(), equalTo(eventMetadata));
}

@Test
void testEventHandleGetAndSet() {
EventHandle testEventHandle = new TestEventHandle();
final String jsonString = "{\"foo\": \"bar\"}";

final JacksonEvent event = JacksonEvent.builder()
.withEventType(eventType)
.withData(jsonString)
.build();
event.setEventHandle(testEventHandle);
assertThat(event.getEventHandle(), equalTo(testEventHandle));
}

@Test
void testJsonStringBuilder() {
final String jsonString = "{\"foo\":\"bar\"}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
package org.opensearch.dataprepper.acknowledgements;

import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;

import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -33,7 +34,12 @@ class AcknowledgementSetMonitor implements Runnable {
private final AtomicInteger numNullHandles;

private DefaultAcknowledgementSet getAcknowledgementSet(final EventHandle eventHandle) {
return (DefaultAcknowledgementSet)((DefaultEventHandle)eventHandle).getAcknowledgementSet();
if (eventHandle instanceof DefaultEventHandle) {
InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle;
return (DefaultAcknowledgementSet)internalEventHandle.getAcknowledgementSet();
} else {
throw new RuntimeException("Unsupported event handle");
}
}

public AcknowledgementSetMonitor() {
Expand Down
Loading
Loading