Skip to content

Commit

Permalink
Fix PipelineConnector to duplicate the events (#1897) (#1918)
Browse files Browse the repository at this point in the history
* Fix string mutate processors to duplicate the events

Signed-off-by: Krishna Kondaka <[email protected]>

* Fix string mutate processors to duplicate the events - made changes as per David's suggestions

Signed-off-by: Krishna Kondaka <[email protected]>

* Removed unnecessary changes leftover from 1st commit

Signed-off-by: Krishna Kondaka <[email protected]>

* Modified PipelineConnector to duplicate JacksonSpan type events too. Added testcases in PipelineConnectorTest

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comment and added a new testcase for JacksonSpan withData()

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comment and added parallel pipeline test to github/workflows

Signed-off-by: Krishna Kondaka <[email protected]>

* fixed workflow failure

Signed-off-by: Krishna Kondaka <[email protected]>

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
(cherry picked from commit 5bd7a31)

Co-authored-by: kkondaka <[email protected]>
Signed-off-by: kkondaka <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and kkondaka authored Oct 13, 2022
1 parent a22456e commit 970af02
Show file tree
Hide file tree
Showing 8 changed files with 464 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ jobs:
strategy:
matrix:
java: [11, 17]
test: ['basicLogEndToEndTest', 'parallelGrokStringSubstituteTest']
fail-fast: false

runs-on: ubuntu-latest
Expand All @@ -26,4 +27,4 @@ jobs:
- name: Checkout Data-Prepper
uses: actions/checkout@v2
- name: Run basic grok end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:log:basicLogEndToEndTest
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:log:${{ matrix.test }}
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,17 @@ public Builder withJsonData(final String data) {
return this;
}

/**
* Sets the data of the event.
* @param data the data
* @since 2.0
*/
@Override
public Builder withData(final Object data) {
this.data.putAll(mapper.convertValue(data, Map.class));
return this;
}

/**
* Sets the metadata.
* @param eventMetadata the metadata
Expand Down Expand Up @@ -419,7 +430,7 @@ public Builder withServiceName(final String serviceName) {
public JacksonSpan build() {
validateParameters();
checkAndSetDefaultValues();
this.withData(data);
super.withData(data);
this.withEventType(EventType.TRACE.toString());
return new JacksonSpan(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,5 +640,51 @@ void testBuilder_withEventMetadata_with_event_invalid_event_metadata_should_thro

assertThrows(IllegalArgumentException.class, builder::build);
}

@Test
void testBuilder_withData_with_event_valid_data() {
final Map<String, Object> data = new HashMap<String, Object>();
final String traceId = "414243";
final String kind = "SPAN_KIND_INTERNAL";
final String traceGroup = "FRUITSGroup";
final String traceGroupFields = "{\"endTime\":\"1970-01-01T00:00:00Z\",\"durationInNanos\": 0,\"statusCode\": 0}";
final String spanId = "313030";
final String name = "FRUITS";
final String startTime = "1970-01-01T00:00:00Z";
final String endTime = "1970-01-02T00:00:00Z";
final String durationInNanos = "100";
data.put("traceId", traceId);
data.put("kind", kind);
data.put("traceGroup", traceGroup);
data.put("traceGroupFields", traceGroupFields);
data.put("spanId", spanId);
data.put("name", name);
data.put("startTime", startTime);
data.put("endTime", endTime);
data.put("durationInNanos", durationInNanos);

EventMetadata eventMetadata = mock(EventMetadata.class);
final Instant now = Instant.now();
when(eventMetadata.getEventType()).thenReturn(String.valueOf(EventType.TRACE));
when(eventMetadata.getTimeReceived()).thenReturn(now);


final JacksonSpan jacksonSpan = JacksonSpan.builder()
.withData(data)
.withEventMetadata(eventMetadata)
.build();

assertThat(jacksonSpan, is(notNullValue()));
assertThat(jacksonSpan.getMetadata(), is(notNullValue()));
assertThat(jacksonSpan.getMetadata().getTimeReceived(), equalTo(now));
assertThat(jacksonSpan.toMap().get("traceId"), equalTo(traceId));
assertThat(jacksonSpan.toMap().get("kind"), equalTo(kind));
assertThat(jacksonSpan.toMap().get("traceGroup"), equalTo(traceGroup));
assertThat(jacksonSpan.toMap().get("spanId"), equalTo(spanId));
assertThat(jacksonSpan.toMap().get("name"), equalTo(name));
assertThat(jacksonSpan.toMap().get("startTime"), equalTo(startTime));
assertThat(jacksonSpan.toMap().get("endTime"), equalTo(endTime));
assertThat(jacksonSpan.toMap().get("durationInNanos"), equalTo(durationInNanos));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.trace.Span;
import org.opensearch.dataprepper.model.trace.JacksonSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -54,6 +58,32 @@ public void stop() {
public void output(final Collection<T> records) {
if (buffer != null && !isStopRequested.get()) {
for (T record : records) {
if(record.getData() instanceof JacksonSpan) {
try {
final Span spanEvent = (Span)record.getData();
Span newSpanEvent = JacksonSpan.builder()
.withData(spanEvent.toMap())
.withEventMetadata(spanEvent.getMetadata())
.build();
record = (T) (new Record<>(newSpanEvent));
} catch (Exception ex) {
LOG.error("PipelineConnector [{}-{}]: exception while duplicating the event [{}]",
sinkPipelineName, sourcePipelineName, ex);
}
} else if(record.getData() instanceof Event) {
try {
final Event recordEvent = (Event)record.getData();
Event newRecordEvent = JacksonEvent.builder()
.withData(recordEvent.toMap())
.withEventMetadata(recordEvent.getMetadata())
.build();
record = (T) (new Record<>(newRecordEvent));
} catch (Exception ex) {
LOG.error("PipelineConnector [{}-{}]: exception while duplicating the event [{}]",
sinkPipelineName, sourcePipelineName, ex);
}
}

while (true) {
try {
buffer.write(record, DEFAULT_WRITE_TIMEOUT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,155 @@

package org.opensearch.dataprepper.pipeline;

import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.plugins.buffer.TestBuffer;
import org.opensearch.dataprepper.model.trace.JacksonSpan;
import org.opensearch.dataprepper.model.trace.DefaultTraceGroupFields;
import org.opensearch.dataprepper.model.trace.DefaultLink;
import org.opensearch.dataprepper.model.trace.DefaultSpanEvent;


import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertThat;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.Queue;
import java.util.Map;
import java.util.Date;
import java.util.LinkedList;
import java.util.concurrent.TimeoutException;
import com.google.common.collect.ImmutableMap;


import static org.junit.Assert.assertTrue;
import org.junit.jupiter.api.Assertions;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;


@RunWith(MockitoJUnitRunner.class)
public class PipelineConnectorTest {
private static final String RECORD_DATA = "RECORD_DATA";
private static final Record<String> RECORD = new Record<>(RECORD_DATA);
private static Record<Event> EVENT_RECORD;
private static Record<JacksonSpan> SPAN_RECORD;

private static final String SINK_PIPELINE_NAME = "SINK_PIPELINE_NAME";
private final String testKey = UUID.randomUUID().toString();
private final String testValue = UUID.randomUUID().toString();
private static final String TEST_TRACE_ID = UUID.randomUUID().toString();
private static final String TEST_SPAN_ID = UUID.randomUUID().toString();
private static final String TEST_TRACE_STATE = UUID.randomUUID().toString();
private static final String TEST_PARENT_SPAN_ID = UUID.randomUUID().toString();
private static final String TEST_NAME = UUID.randomUUID().toString();
private static final String TEST_KIND = UUID.randomUUID().toString();
private static final String TEST_START_TIME = UUID.randomUUID().toString();
private static final String TEST_END_TIME = UUID.randomUUID().toString();
private static final Map<String, Object> TEST_ATTRIBUTES = ImmutableMap.of("key1", new Date().getTime(), "key2", UUID.randomUUID().toString());
private static final Integer TEST_DROPPED_ATTRIBUTES_COUNT = 8;
private static final Integer TEST_DROPPED_EVENTS_COUNT = 45;
private static final Integer TEST_DROPPED_LINKS_COUNT = 21;
private static final String TEST_TRACE_GROUP = UUID.randomUUID().toString();
private static final Long TEST_DURATION_IN_NANOS = 537L;
private static final String TEST_SERVICE_NAME = UUID.randomUUID().toString();

@Mock
private Buffer<Record<String>> buffer;

private List<Record<String>> recordList;

private PipelineConnector<Record<String>> sut;

private TestBuffer eventBuffer;
private List<Record<Event>> eventRecordList;
private PipelineConnector<Record<Event>> eut;

private BlockingBuffer<Record<JacksonSpan>> spanBuffer;
private List<Record<JacksonSpan>> spanRecordList;
private DefaultTraceGroupFields defaultTraceGroupFields;
private PipelineConnector<Record<JacksonSpan>> sput;
private DefaultLink defaultLink;
private DefaultSpanEvent defaultSpanEvent;



@Before
public void setup() {
recordList = Collections.singletonList(RECORD);

sut = new PipelineConnector<>();

final Event event = JacksonEvent.builder()
.withEventType("event")
.withData(Collections.singletonMap(testKey, testValue))
.build();
EVENT_RECORD = new Record<>(event);
eventRecordList = Collections.singletonList(EVENT_RECORD);
final Queue<Record<Event>> bufferQueue = new LinkedList<>();
eventBuffer = new TestBuffer(bufferQueue, 1);
eut = new PipelineConnector<>();

defaultSpanEvent = DefaultSpanEvent.builder()
.withName(UUID.randomUUID().toString())
.withTime(UUID.randomUUID().toString())
.build();

defaultLink = DefaultLink.builder()
.withTraceId(UUID.randomUUID().toString())
.withSpanId(UUID.randomUUID().toString())
.withTraceState(UUID.randomUUID().toString())
.build();

defaultTraceGroupFields = DefaultTraceGroupFields.builder()
.withDurationInNanos(123L)
.withStatusCode(201)
.withEndTime("the End")
.build();
final JacksonSpan span = JacksonSpan.builder()
.withSpanId(TEST_SPAN_ID)
.withTraceId(TEST_TRACE_ID)
.withTraceState(TEST_TRACE_STATE)
.withParentSpanId(TEST_PARENT_SPAN_ID)
.withName(TEST_NAME)
.withServiceName(TEST_SERVICE_NAME)
.withKind(TEST_KIND)
.withStartTime(TEST_START_TIME)
.withEndTime(TEST_END_TIME)
.withAttributes(TEST_ATTRIBUTES)
.withDroppedAttributesCount(TEST_DROPPED_ATTRIBUTES_COUNT)
.withEvents(Arrays.asList(defaultSpanEvent))
.withDroppedEventsCount(TEST_DROPPED_EVENTS_COUNT)
.withLinks(Arrays.asList(defaultLink))
.withDroppedLinksCount(TEST_DROPPED_LINKS_COUNT)
.withTraceGroup(TEST_TRACE_GROUP)
.withDurationInNanos(TEST_DURATION_IN_NANOS)
.withTraceGroupFields(defaultTraceGroupFields)
.build();

SPAN_RECORD = new Record<>(span);
spanRecordList = Collections.singletonList(SPAN_RECORD);
spanBuffer = new BlockingBuffer<>("Pipeline1");
sput = new PipelineConnector<>();

}

@Test(expected = RuntimeException.class)
Expand Down Expand Up @@ -78,6 +189,43 @@ public void testOutputSuccess() throws Exception {
verify(buffer).write(eq(RECORD), anyInt());
}

@Test
public void testEventBufferOutputSuccess() throws Exception {
eut.start(eventBuffer);

eut.output(eventRecordList);

Map.Entry<Collection<Record<Event>>, CheckpointState> ent = eventBuffer.read(1);
ArrayList<Record<Event>> records = new ArrayList<>(ent.getKey());
// Make sure the records are different
assertThat(eventRecordList.get(0), not(sameInstance(records.get(0))));
// Make sure the events are different
Event event1 = eventRecordList.get(0).getData();
Event event2 = records.get(0).getData();
assertThat(event1, not(sameInstance(event2)));
event1.toMap().forEach((k, v)-> Assertions.assertEquals(event2.get(k, String.class), v));
event1.toMap().forEach((k, v)-> Assertions.assertEquals(k, testKey));
event1.toMap().forEach((k, v)-> Assertions.assertEquals(v, testValue));

}

@Test
public void testSpanBufferOutputSuccess() throws Exception {
sput.start(spanBuffer);

sput.output(spanRecordList);

Map.Entry<Collection<Record<JacksonSpan>>, CheckpointState> ent = spanBuffer.doRead(10000);
ArrayList<Record<JacksonSpan>> records = new ArrayList<>(ent.getKey());
// Make sure the records are different
assertThat(spanRecordList.get(0), not(sameInstance(records.get(0))));
// Make sure the spans are different
JacksonSpan span1 = spanRecordList.get(0).getData();
JacksonSpan span2 = records.get(0).getData();
assertThat(span1, not(sameInstance(span2)));
span1.toMap().forEach((k, v) -> Assertions.assertEquals(span2.toMap().get(k), v));
}

@Test
public void testSetSinkPipelineName() {
sut.setSinkPipelineName(SINK_PIPELINE_NAME);
Expand Down
Loading

0 comments on commit 970af02

Please sign in to comment.