Skip to content

Commit

Permalink
Modify EventHandle to be created for every event and support internal…
Browse files Browse the repository at this point in the history
… and external origination times (#3546)

* Modify EventHandle to be created for every event and support internal and external origination times

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed build failures

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed build failures

Signed-off-by: Krishna Kondaka <[email protected]>

* fixed failing checkstyle error

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed build errors

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments by adding InternalEventHandle

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed checkstyle errors

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed build errors

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Nov 1, 2023
1 parent d868229 commit 7869eb7
Show file tree
Hide file tree
Showing 34 changed files with 399 additions and 244 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import java.lang.ref.WeakReference;

import java.time.Instant;
import java.io.Serializable;

public class DefaultEventHandle implements EventHandle, InternalEventHandle, Serializable {
private Instant externalOriginationTime;
private final Instant internalOriginationTime;
private WeakReference<AcknowledgementSet> acknowledgementSetRef;

public DefaultEventHandle(final Instant internalOriginationTime) {
this.acknowledgementSetRef = null;
this.externalOriginationTime = null;
this.internalOriginationTime = internalOriginationTime;
}

@Override
public void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet) {
this.acknowledgementSetRef = new WeakReference<>(acknowledgementSet);
}

@Override
public void setExternalOriginationTime(final Instant externalOriginationTime) {
this.externalOriginationTime = externalOriginationTime;
}

public AcknowledgementSet getAcknowledgementSet() {
if (acknowledgementSetRef == null) {
return null;
}
return acknowledgementSetRef.get();
}

@Override
public Instant getInternalOriginationTime() {
return this.internalOriginationTime;
}

@Override
public Instant getExternalOriginationTime() {
return this.externalOriginationTime;
}

@Override
public void release(boolean result) {
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
if (acknowledgementSet != null) {
acknowledgementSet.release(this, result);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class DefaultEventMetadata implements EventMetadata {

private final Instant timeReceived;

private Instant externalOriginationTime;

private Map<String, Object> attributes;

private Set<String> tags;
Expand All @@ -43,13 +45,15 @@ private DefaultEventMetadata(final Builder builder) {
this.attributes = builder.attributes == null ? new HashMap<>() : new HashMap<>(builder.attributes);

this.tags = builder.tags == null ? new HashSet<>() : new HashSet(builder.tags);
this.externalOriginationTime = null;
}

private DefaultEventMetadata(final EventMetadata eventMetadata) {
this.eventType = eventMetadata.getEventType();
this.timeReceived = eventMetadata.getTimeReceived();
this.attributes = new HashMap<>(eventMetadata.getAttributes());
this.tags = new HashSet<>(eventMetadata.getTags());
this.externalOriginationTime = null;
}

@Override
Expand All @@ -62,6 +66,16 @@ public Instant getTimeReceived() {
return timeReceived;
}

@Override
public Instant getExternalOriginationTime() {
return externalOriginationTime;
}

@Override
public void setExternalOriginationTime(Instant externalOriginationTime) {
this.externalOriginationTime = externalOriginationTime;
}

@Override
public Map<String, Object> getAttributes() {
return attributes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.model.event;

import java.time.Instant;

public interface EventHandle {
/**
* releases event handle
Expand All @@ -14,4 +16,30 @@ public interface EventHandle {
* @since 2.2
*/
void release(boolean result);

/**
* sets external origination time
*
* @param externalOriginationTime externalOriginationTime to be set in the event handle
* @since 2.6
*/
void setExternalOriginationTime(final Instant externalOriginationTime);

/**
* gets external origination time
*
* @return returns externalOriginationTime from the event handle. This can be null if it is never set.
* @since 2.6
*/
Instant getExternalOriginationTime();

/**
* gets internal origination time
*
* @return returns internalOriginationTime from the event handle.
* @since 2.6
*/
Instant getInternalOriginationTime();


}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ public interface EventMetadata extends Serializable {
*/
Instant getTimeReceived();

/**
* Returns the external origination time of the event
* @return the external origination time
* @since 2.6
*/
Instant getExternalOriginationTime();

/**
* Sets the external origination time of the event
* @param externalOriginationTime the external origination time
* @since 2.6
*/
void setExternalOriginationTime(Instant externalOriginationTime);

/**
* Returns the attributes
* @return a map of attributes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;

public interface InternalEventHandle {
/**
* sets acknowledgement set
*
* @param acknowledgementSet acknowledgementSet to be set in the event handle
* @since 2.6
*/
void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet);

/**
* gets acknowledgement set
*
* @return returns acknowledgementSet from the event handle
* @since 2.6
*/
AcknowledgementSet getAcknowledgementSet();

}

Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,13 @@ protected JacksonEvent(final Builder builder) {
}

this.jsonNode = getInitialJsonNode(builder.data);
this.eventHandle = new DefaultEventHandle(eventMetadata.getTimeReceived());
}

protected JacksonEvent(final JacksonEvent otherEvent) {
this.jsonNode = otherEvent.jsonNode.deepCopy();
this.eventMetadata = DefaultEventMetadata.fromEventMetadata(otherEvent.eventMetadata);
this.eventHandle = new DefaultEventHandle(eventMetadata.getTimeReceived());
}

public static Event fromMessage(String message) {
Expand Down Expand Up @@ -152,10 +154,6 @@ public void put(final String key, final Object value) {
}
}

public void setEventHandle(EventHandle handle) {
this.eventHandle = handle;
}

@Override
public EventHandle getEventHandle() {
return eventHandle;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import org.junit.jupiter.api.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.ArgumentMatchers.any;
import org.mockito.Mock;

import java.time.Instant;

class DefaultEventHandleTests {
@Mock
private AcknowledgementSet acknowledgementSet;

@Test
void testBasic() {
Instant now = Instant.now();
DefaultEventHandle eventHandle = new DefaultEventHandle(now);
assertThat(eventHandle.getAcknowledgementSet(), equalTo(null));
assertThat(eventHandle.getInternalOriginationTime(), equalTo(now));
assertThat(eventHandle.getExternalOriginationTime(), equalTo(null));
eventHandle.release(true);
}

@Test
void testWithAcknowledgementSet() {
acknowledgementSet = mock(AcknowledgementSet.class);
when(acknowledgementSet.release(any(EventHandle.class), any(Boolean.class))).thenReturn(true);
Instant now = Instant.now();
DefaultEventHandle eventHandle = new DefaultEventHandle(now);
assertThat(eventHandle.getAcknowledgementSet(), equalTo(null));
assertThat(eventHandle.getInternalOriginationTime(), equalTo(now));
assertThat(eventHandle.getExternalOriginationTime(), equalTo(null));
eventHandle.setAcknowledgementSet(acknowledgementSet);
eventHandle.release(true);
verify(acknowledgementSet).release(eventHandle, true);
}

@Test
void testWithExternalOriginationTime() {
Instant now = Instant.now();
DefaultEventHandle eventHandle = new DefaultEventHandle(now);
assertThat(eventHandle.getAcknowledgementSet(), equalTo(null));
assertThat(eventHandle.getInternalOriginationTime(), equalTo(now));
assertThat(eventHandle.getExternalOriginationTime(), equalTo(null));
eventHandle.setExternalOriginationTime(now.minusSeconds(60));
assertThat(eventHandle.getExternalOriginationTime(), equalTo(now.minusSeconds(60)));
eventHandle.release(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anEmptyMap;
Expand Down Expand Up @@ -80,6 +81,16 @@ public void testGetTimeReceived() {
assertThat(timeReceived, is(equalTo(testTimeReceived)));
}

@Test
public void testExternalOriginationTime() {
Instant externalOriginationTime = eventMetadata.getExternalOriginationTime();
assertThat(externalOriginationTime, is(nullValue()));
Instant now = Instant.now();
eventMetadata.setExternalOriginationTime(now);
externalOriginationTime = eventMetadata.getExternalOriginationTime();
assertThat(externalOriginationTime, is(equalTo(now)));
}

@Test
public void testGetAttributes() {
final Map<String, Object> attributes = eventMetadata.getAttributes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@

public class JacksonEventTest {

class TestEventHandle implements EventHandle {
@Override
public void release(boolean result) {
}
}

private Event event;

private String eventType;
Expand Down Expand Up @@ -398,6 +392,8 @@ public void testBuild_withEventType() {
.build();

assertThat(event.getMetadata().getEventType(), is(equalTo(eventType)));
assertThat(event.getEventHandle(), is(notNullValue()));
assertThat(event.getEventHandle().getInternalOriginationTime(), is(notNullValue()));
}

@Test
Expand All @@ -411,6 +407,8 @@ public void testBuild_withTimeReceived() {
.build();

assertThat(event.getMetadata().getTimeReceived(), is(equalTo(now)));
assertThat(event.getEventHandle(), is(notNullValue()));
assertThat(event.getEventHandle().getInternalOriginationTime(), is(equalTo(now)));
}

@Test
Expand All @@ -422,6 +420,8 @@ public void testBuild_withMessageValue() {

assertThat(event, is(notNullValue()));
assertThat(event.get("message", String.class), is(equalTo(message)));
assertThat(event.getEventHandle(), is(notNullValue()));
assertThat(event.getEventHandle().getInternalOriginationTime(), is(notNullValue()));
}

@Test
Expand Down Expand Up @@ -678,6 +678,8 @@ void fromEvent_with_a_JacksonEvent() {

assertThat(createdEvent, notNullValue());
assertThat(createdEvent, not(sameInstance(originalEvent)));
assertThat(event.getEventHandle(), is(notNullValue()));
assertThat(event.getEventHandle().getInternalOriginationTime(), is(notNullValue()));

assertThat(createdEvent.toMap(), equalTo(dataObject));
assertThat(createdEvent.getJsonNode(), not(sameInstance(originalEvent.getJsonNode())));
Expand Down Expand Up @@ -707,19 +709,6 @@ void fromEvent_with_a_non_JacksonEvent() {
assertThat(createdEvent.getMetadata(), equalTo(eventMetadata));
}

@Test
void testEventHandleGetAndSet() {
EventHandle testEventHandle = new TestEventHandle();
final String jsonString = "{\"foo\": \"bar\"}";

final JacksonEvent event = JacksonEvent.builder()
.withEventType(eventType)
.withData(jsonString)
.build();
event.setEventHandle(testEventHandle);
assertThat(event.getEventHandle(), equalTo(testEventHandle));
}

@Test
void testJsonStringBuilder() {
final String jsonString = "{\"foo\":\"bar\"}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
package org.opensearch.dataprepper.acknowledgements;

import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;

import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -33,7 +34,12 @@ class AcknowledgementSetMonitor implements Runnable {
private final AtomicInteger numNullHandles;

private DefaultAcknowledgementSet getAcknowledgementSet(final EventHandle eventHandle) {
return (DefaultAcknowledgementSet)((DefaultEventHandle)eventHandle).getAcknowledgementSet();
if (eventHandle instanceof DefaultEventHandle) {
InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle;
return (DefaultAcknowledgementSet)internalEventHandle.getAcknowledgementSet();
} else {
throw new RuntimeException("Unsupported event handle");
}
}

public AcknowledgementSetMonitor() {
Expand Down
Loading

0 comments on commit 7869eb7

Please sign in to comment.