From 970af0236b359a5410b22c9f637976af36869b3f Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Wed, 12 Oct 2022 20:27:07 -0500 Subject: [PATCH] Fix PipelineConnector to duplicate the events (#1897) (#1918) * Fix string mutate processors to duplicate the events Signed-off-by: Krishna Kondaka * Fix string mutate processors to duplicate the events - made changes as per David's suggestions Signed-off-by: Krishna Kondaka * Removed unnecessary changes leftover from 1st commit Signed-off-by: Krishna Kondaka * Modified PipelineConnector to duplicate JacksonSpan type events too. Added testcases in PipelineConnectorTest Signed-off-by: Krishna Kondaka * Addressed review comments Signed-off-by: Krishna Kondaka * Addressed review comment and added a new testcase for JacksonSpan withData() Signed-off-by: Krishna Kondaka * Addressed review comment and added parallel pipeline test to github/workflows Signed-off-by: Krishna Kondaka * fixed workflow failure Signed-off-by: Krishna Kondaka Signed-off-by: Krishna Kondaka Co-authored-by: Krishna Kondaka (cherry picked from commit 5bd7a31b02f6e316c865b49b24cd719555ab51f0) Co-authored-by: kkondaka <41027584+kkondaka@users.noreply.github.com> Signed-off-by: kkondaka <41027584+kkondaka@users.noreply.github.com> --- ...per-log-analytics-basic-grok-e2e-tests.yml | 3 +- .../dataprepper/model/trace/JacksonSpan.java | 13 +- .../model/trace/JacksonSpanTest.java | 46 ++++++ .../pipeline/PipelineConnector.java | 30 ++++ .../pipeline/PipelineConnectorTest.java | 154 +++++++++++++++++- e2e-test/log/build.gradle | 40 ++++- .../ParallelGrokStringSubstituteLogTest.java | 147 +++++++++++++++++ .../parallel-grok-substitute-e2e-pipeline.yml | 40 +++++ 8 files changed, 464 insertions(+), 9 deletions(-) create mode 100644 e2e-test/log/src/integrationTest/java/org/opensearch/dataprepper/integration/log/ParallelGrokStringSubstituteLogTest.java create mode 100644 e2e-test/log/src/integrationTest/resources/parallel-grok-substitute-e2e-pipeline.yml diff --git a/.github/workflows/data-prepper-log-analytics-basic-grok-e2e-tests.yml b/.github/workflows/data-prepper-log-analytics-basic-grok-e2e-tests.yml index 11791763ab..ac4ec06a24 100644 --- a/.github/workflows/data-prepper-log-analytics-basic-grok-e2e-tests.yml +++ b/.github/workflows/data-prepper-log-analytics-basic-grok-e2e-tests.yml @@ -14,6 +14,7 @@ jobs: strategy: matrix: java: [11, 17] + test: ['basicLogEndToEndTest', 'parallelGrokStringSubstituteTest'] fail-fast: false runs-on: ubuntu-latest @@ -26,4 +27,4 @@ jobs: - name: Checkout Data-Prepper uses: actions/checkout@v2 - name: Run basic grok end-to-end tests with Gradle - run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:log:basicLogEndToEndTest \ No newline at end of file + run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:log:${{ matrix.test }} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/trace/JacksonSpan.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/trace/JacksonSpan.java index 875251c977..342d9284d5 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/trace/JacksonSpan.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/trace/JacksonSpan.java @@ -219,6 +219,17 @@ public Builder withJsonData(final String data) { return this; } + /** + * Sets the data of the event. 
+ * @param data the data + * @since 2.0 + */ + @Override + public Builder withData(final Object data) { + this.data.putAll(mapper.convertValue(data, Map.class)); + return this; + } + /** * Sets the metadata. * @param eventMetadata the metadata @@ -419,7 +430,7 @@ public Builder withServiceName(final String serviceName) { public JacksonSpan build() { validateParameters(); checkAndSetDefaultValues(); - this.withData(data); + super.withData(data); this.withEventType(EventType.TRACE.toString()); return new JacksonSpan(this); } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/trace/JacksonSpanTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/trace/JacksonSpanTest.java index a6157e8e09..1783ef9dee 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/trace/JacksonSpanTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/trace/JacksonSpanTest.java @@ -640,5 +640,51 @@ void testBuilder_withEventMetadata_with_event_invalid_event_metadata_should_thro assertThrows(IllegalArgumentException.class, builder::build); } + + @Test + void testBuilder_withData_with_event_valid_data() { + final Map data = new HashMap(); + final String traceId = "414243"; + final String kind = "SPAN_KIND_INTERNAL"; + final String traceGroup = "FRUITSGroup"; + final String traceGroupFields = "{\"endTime\":\"1970-01-01T00:00:00Z\",\"durationInNanos\": 0,\"statusCode\": 0}"; + final String spanId = "313030"; + final String name = "FRUITS"; + final String startTime = "1970-01-01T00:00:00Z"; + final String endTime = "1970-01-02T00:00:00Z"; + final String durationInNanos = "100"; + data.put("traceId", traceId); + data.put("kind", kind); + data.put("traceGroup", traceGroup); + data.put("traceGroupFields", traceGroupFields); + data.put("spanId", spanId); + data.put("name", name); + data.put("startTime", startTime); + data.put("endTime", endTime); + data.put("durationInNanos", durationInNanos); + + EventMetadata eventMetadata = mock(EventMetadata.class); + final Instant now = Instant.now(); + when(eventMetadata.getEventType()).thenReturn(String.valueOf(EventType.TRACE)); + when(eventMetadata.getTimeReceived()).thenReturn(now); + + + final JacksonSpan jacksonSpan = JacksonSpan.builder() + .withData(data) + .withEventMetadata(eventMetadata) + .build(); + + assertThat(jacksonSpan, is(notNullValue())); + assertThat(jacksonSpan.getMetadata(), is(notNullValue())); + assertThat(jacksonSpan.getMetadata().getTimeReceived(), equalTo(now)); + assertThat(jacksonSpan.toMap().get("traceId"), equalTo(traceId)); + assertThat(jacksonSpan.toMap().get("kind"), equalTo(kind)); + assertThat(jacksonSpan.toMap().get("traceGroup"), equalTo(traceGroup)); + assertThat(jacksonSpan.toMap().get("spanId"), equalTo(spanId)); + assertThat(jacksonSpan.toMap().get("name"), equalTo(name)); + assertThat(jacksonSpan.toMap().get("startTime"), equalTo(startTime)); + assertThat(jacksonSpan.toMap().get("endTime"), equalTo(endTime)); + assertThat(jacksonSpan.toMap().get("durationInNanos"), equalTo(durationInNanos)); + } } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelineConnector.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelineConnector.java index 6fb839dd8b..55fe4c4b7e 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelineConnector.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelineConnector.java @@ -9,6 +9,10 @@ import 
org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.trace.Span; +import org.opensearch.dataprepper.model.trace.JacksonSpan; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +58,32 @@ public void stop() { public void output(final Collection records) { if (buffer != null && !isStopRequested.get()) { for (T record : records) { + if(record.getData() instanceof JacksonSpan) { + try { + final Span spanEvent = (Span)record.getData(); + Span newSpanEvent = JacksonSpan.builder() + .withData(spanEvent.toMap()) + .withEventMetadata(spanEvent.getMetadata()) + .build(); + record = (T) (new Record<>(newSpanEvent)); + } catch (Exception ex) { + LOG.error("PipelineConnector [{}-{}]: exception while duplicating the event [{}]", + sinkPipelineName, sourcePipelineName, ex); + } + } else if(record.getData() instanceof Event) { + try { + final Event recordEvent = (Event)record.getData(); + Event newRecordEvent = JacksonEvent.builder() + .withData(recordEvent.toMap()) + .withEventMetadata(recordEvent.getMetadata()) + .build(); + record = (T) (new Record<>(newRecordEvent)); + } catch (Exception ex) { + LOG.error("PipelineConnector [{}-{}]: exception while duplicating the event [{}]", + sinkPipelineName, sourcePipelineName, ex); + } + } + while (true) { try { buffer.write(record, DEFAULT_WRITE_TIMEOUT); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineConnectorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineConnectorTest.java index fc6f9d4c2e..ca47f9f6be 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineConnectorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineConnectorTest.java @@ -5,19 +5,46 @@ package org.opensearch.dataprepper.pipeline; +import org.opensearch.dataprepper.model.CheckpointState; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.plugins.buffer.TestBuffer; +import org.opensearch.dataprepper.model.trace.JacksonSpan; +import org.opensearch.dataprepper.model.trace.DefaultTraceGroupFields; +import org.opensearch.dataprepper.model.trace.DefaultLink; +import org.opensearch.dataprepper.model.trace.DefaultSpanEvent; + + +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.Assert.assertThat; + import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; + +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.UUID; +import java.util.Queue; +import java.util.Map; +import java.util.Date; +import java.util.LinkedList; import java.util.concurrent.TimeoutException; +import com.google.common.collect.ImmutableMap; + import static org.junit.Assert.assertTrue; +import org.junit.jupiter.api.Assertions; import static 
org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.eq;
@@ -25,24 +52,108 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+
 @RunWith(MockitoJUnitRunner.class)
 public class PipelineConnectorTest {
     private static final String RECORD_DATA = "RECORD_DATA";
     private static final Record<String> RECORD = new Record<>(RECORD_DATA);
+    private static Record<Event> EVENT_RECORD;
+    private static Record<JacksonSpan> SPAN_RECORD;
+    private static final String SINK_PIPELINE_NAME = "SINK_PIPELINE_NAME";
+    private final String testKey = UUID.randomUUID().toString();
+    private final String testValue = UUID.randomUUID().toString();
+    private static final String TEST_TRACE_ID = UUID.randomUUID().toString();
+    private static final String TEST_SPAN_ID = UUID.randomUUID().toString();
+    private static final String TEST_TRACE_STATE = UUID.randomUUID().toString();
+    private static final String TEST_PARENT_SPAN_ID = UUID.randomUUID().toString();
+    private static final String TEST_NAME = UUID.randomUUID().toString();
+    private static final String TEST_KIND = UUID.randomUUID().toString();
+    private static final String TEST_START_TIME = UUID.randomUUID().toString();
+    private static final String TEST_END_TIME = UUID.randomUUID().toString();
+    private static final Map<String, Object> TEST_ATTRIBUTES = ImmutableMap.of("key1", new Date().getTime(), "key2", UUID.randomUUID().toString());
+    private static final Integer TEST_DROPPED_ATTRIBUTES_COUNT = 8;
+    private static final Integer TEST_DROPPED_EVENTS_COUNT = 45;
+    private static final Integer TEST_DROPPED_LINKS_COUNT = 21;
+    private static final String TEST_TRACE_GROUP = UUID.randomUUID().toString();
+    private static final Long TEST_DURATION_IN_NANOS = 537L;
+    private static final String TEST_SERVICE_NAME = UUID.randomUUID().toString();
     @Mock
     private Buffer<Record<String>> buffer;
-    private List<Record<String>> recordList;
-    private PipelineConnector<Record<String>> sut;
+    private TestBuffer eventBuffer;
+    private List<Record<Event>> eventRecordList;
+    private PipelineConnector<Record<Event>> eut;
+
+    private BlockingBuffer<Record<JacksonSpan>> spanBuffer;
+    private List<Record<JacksonSpan>> spanRecordList;
+    private DefaultTraceGroupFields defaultTraceGroupFields;
+    private PipelineConnector<Record<JacksonSpan>> sput;
+    private DefaultLink defaultLink;
+    private DefaultSpanEvent defaultSpanEvent;
+
+
+
     @Before
     public void setup() {
         recordList = Collections.singletonList(RECORD);
-        sut = new PipelineConnector<>();
+
+        final Event event = JacksonEvent.builder()
+                .withEventType("event")
+                .withData(Collections.singletonMap(testKey, testValue))
+                .build();
+        EVENT_RECORD = new Record<>(event);
+        eventRecordList = Collections.singletonList(EVENT_RECORD);
+        final Queue<Record<Event>> bufferQueue = new LinkedList<>();
+        eventBuffer = new TestBuffer(bufferQueue, 1);
+        eut = new PipelineConnector<>();
+
+        defaultSpanEvent = DefaultSpanEvent.builder()
+                .withName(UUID.randomUUID().toString())
+                .withTime(UUID.randomUUID().toString())
+                .build();
+
+        defaultLink = DefaultLink.builder()
+                .withTraceId(UUID.randomUUID().toString())
+                .withSpanId(UUID.randomUUID().toString())
+                .withTraceState(UUID.randomUUID().toString())
+                .build();
+
+        defaultTraceGroupFields = DefaultTraceGroupFields.builder()
+                .withDurationInNanos(123L)
+                .withStatusCode(201)
+                .withEndTime("the End")
+                .build();
+        final JacksonSpan span = JacksonSpan.builder()
+                .withSpanId(TEST_SPAN_ID)
+                .withTraceId(TEST_TRACE_ID)
+                .withTraceState(TEST_TRACE_STATE)
+                .withParentSpanId(TEST_PARENT_SPAN_ID)
+                .withName(TEST_NAME)
+                .withServiceName(TEST_SERVICE_NAME)
+                .withKind(TEST_KIND)
+                .withStartTime(TEST_START_TIME)
+                .withEndTime(TEST_END_TIME)
+                .withAttributes(TEST_ATTRIBUTES)
+                .withDroppedAttributesCount(TEST_DROPPED_ATTRIBUTES_COUNT)
+                .withEvents(Arrays.asList(defaultSpanEvent))
+                .withDroppedEventsCount(TEST_DROPPED_EVENTS_COUNT)
+                .withLinks(Arrays.asList(defaultLink))
+                .withDroppedLinksCount(TEST_DROPPED_LINKS_COUNT)
+                .withTraceGroup(TEST_TRACE_GROUP)
+                .withDurationInNanos(TEST_DURATION_IN_NANOS)
+                .withTraceGroupFields(defaultTraceGroupFields)
+                .build();
+
+        SPAN_RECORD = new Record<>(span);
+        spanRecordList = Collections.singletonList(SPAN_RECORD);
+        spanBuffer = new BlockingBuffer<>("Pipeline1");
+        sput = new PipelineConnector<>();
+
     }
 
     @Test(expected = RuntimeException.class)
@@ -78,6 +189,43 @@ public void testOutputSuccess() throws Exception {
         verify(buffer).write(eq(RECORD), anyInt());
     }
+    @Test
+    public void testEventBufferOutputSuccess() throws Exception {
+        eut.start(eventBuffer);
+
+        eut.output(eventRecordList);
+
+        Map.Entry<Collection<Record<Event>>, CheckpointState> ent = eventBuffer.read(1);
+        ArrayList<Record<Event>> records = new ArrayList<>(ent.getKey());
+        // Make sure the records are different
+        assertThat(eventRecordList.get(0), not(sameInstance(records.get(0))));
+        // Make sure the events are different
+        Event event1 = eventRecordList.get(0).getData();
+        Event event2 = records.get(0).getData();
+        assertThat(event1, not(sameInstance(event2)));
+        event1.toMap().forEach((k, v)-> Assertions.assertEquals(event2.get(k, String.class), v));
+        event1.toMap().forEach((k, v)-> Assertions.assertEquals(k, testKey));
+        event1.toMap().forEach((k, v)-> Assertions.assertEquals(v, testValue));
+
+    }
+
+    @Test
+    public void testSpanBufferOutputSuccess() throws Exception {
+        sput.start(spanBuffer);
+
+        sput.output(spanRecordList);
+
+        Map.Entry<Collection<Record<JacksonSpan>>, CheckpointState> ent = spanBuffer.doRead(10000);
+        ArrayList<Record<JacksonSpan>> records = new ArrayList<>(ent.getKey());
+        // Make sure the records are different
+        assertThat(spanRecordList.get(0), not(sameInstance(records.get(0))));
+        // Make sure the spans are different
+        JacksonSpan span1 = spanRecordList.get(0).getData();
+        JacksonSpan span2 = records.get(0).getData();
+        assertThat(span1, not(sameInstance(span2)));
+        span1.toMap().forEach((k, v) -> Assertions.assertEquals(span2.toMap().get(k), v));
+    }
+
     @Test
     public void testSetSinkPipelineName() {
         sut.setSinkPipelineName(SINK_PIPELINE_NAME);
diff --git a/e2e-test/log/build.gradle b/e2e-test/log/build.gradle
index 3ef8ecccbe..1ce8246ecf 100644
--- a/e2e-test/log/build.gradle
+++ b/e2e-test/log/build.gradle
@@ -31,6 +31,8 @@ task removeDataPrepperNetwork(type: DockerRemoveNetwork) {
 }
 
 def BASIC_GROK_PIPELINE_YAML = "basic-grok-e2e-pipeline.yml"
+def PARALLEL_GROK_SUBSTITUTE_PIPELINE_YAML = "parallel-grok-substitute-e2e-pipeline.yml"
+def DATA_PREPPER_CONFIG_YAML = "data_prepper.yml"
 
 /**
  * DataPrepper Docker tasks
 */
 task createDataPrepperDockerFile(type: Dockerfile) {
     from(dataPrepperBaseImage)
     workingDir("/app/data-prepper")
     copyFile("${dataPrepperJarFilepath}", "/app/data-prepper/lib")
-    copyFile("src/integrationTest/resources/${BASIC_GROK_PIPELINE_YAML}", "/app/data-prepper/pipelines/pipelines.yaml")
-    copyFile("src/integrationTest/resources/data_prepper.yml", "/app/data-prepper/config/data-prepper-config.yaml")
     defaultCommand('java', '-Ddata-prepper.dir=/app/data-prepper', '-cp', '/app/data-prepper/lib/*', 'org.opensearch.dataprepper.DataPrepperExecute')
 }
 
@@ -54,13 +54,16 @@ task buildDataPrepperDockerImage(type: DockerBuildImage) {
 }
 
 def createDataPrepperDockerContainer(final String
taskBaseName, final String dataPrepperName, final int sourcePort, - final int serverPort) { + final int serverPort, final String pipelineConfigYAML, final String dataPrepperConfigYAML) { return tasks.create("create${taskBaseName}", DockerCreateContainer) { dependsOn buildDataPrepperDockerImage dependsOn createDataPrepperNetwork containerName = dataPrepperName exposePorts("tcp", [2021, 4900]) hostConfig.portBindings = [String.format('%d:2021', sourcePort), String.format('%d:4900', serverPort)] + hostConfig.binds = [(project.file("src/integrationTest/resources/${pipelineConfigYAML}").toString()):"/app/data-prepper/pipelines/pipelines.yaml", + (project.file("/tmp/AAA.out").toString()):"/tmp/AAA.out", + (project.file("src/integrationTest/resources/${dataPrepperConfigYAML}").toString()):"/app/data-prepper/config/data-prepper-config.yaml"] hostConfig.network = createDataPrepperNetwork.getNetworkName() cmd = ['java', '-Ddata-prepper.dir=/app/data-prepper', '-cp', '/app/data-prepper/lib/*', 'org.opensearch.dataprepper.DataPrepperExecute'] targetImageId buildDataPrepperDockerImage.getImageId() @@ -129,7 +132,7 @@ task basicLogEndToEndTest(type: Test) { dependsOn build dependsOn startOpenSearchDockerContainer def createDataPrepperTask = createDataPrepperDockerContainer( - "basicLogDataPrepper", "dataprepper", 2021, 4900) + "basicLogDataPrepper", "dataprepper", 2021, 4900, "${BASIC_GROK_PIPELINE_YAML}", "${DATA_PREPPER_CONFIG_YAML}") def startDataPrepperTask = startDataPrepperDockerContainer(createDataPrepperTask as DockerCreateContainer) dependsOn startDataPrepperTask startDataPrepperTask.mustRunAfter 'startOpenSearchDockerContainer' @@ -154,6 +157,35 @@ task basicLogEndToEndTest(type: Test) { finalizedBy removeDataPrepperNetwork } +task parallelGrokStringSubstituteTest(type: Test) { + dependsOn build + dependsOn startOpenSearchDockerContainer + def createDataPrepperTask = createDataPrepperDockerContainer( + "ParallelGrokSubstLogDataPrepper", "dataprepper-pgsts-test", 2021, 4900, "${PARALLEL_GROK_SUBSTITUTE_PIPELINE_YAML}", "${DATA_PREPPER_CONFIG_YAML}") + def startDataPrepperTask = startDataPrepperDockerContainer(createDataPrepperTask as DockerCreateContainer) + dependsOn startDataPrepperTask + startDataPrepperTask.mustRunAfter 'startOpenSearchDockerContainer' + // wait for data-preppers to be ready + doFirst { + sleep(15*1000) + } + + description = 'Runs the parallel grok and string substitute end-to-end test.' 
+ group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + classpath = sourceSets.integrationTest.runtimeClasspath + + filter { + includeTestsMatching "org.opensearch.dataprepper.integration.log.ParallelGrokStringSubstituteLogTest.testPipelineEndToEnd*" + } + + finalizedBy stopOpenSearchDockerContainer + def stopDataPrepperTask = stopDataPrepperDockerContainer(startDataPrepperTask as DockerStartContainer) + finalizedBy stopDataPrepperTask + finalizedBy removeDataPrepperDockerContainer(stopDataPrepperTask as DockerStopContainer) + finalizedBy removeDataPrepperNetwork +} + dependencies { integrationTestImplementation project(':data-prepper-api') integrationTestImplementation project(':data-prepper-plugins:common') diff --git a/e2e-test/log/src/integrationTest/java/org/opensearch/dataprepper/integration/log/ParallelGrokStringSubstituteLogTest.java b/e2e-test/log/src/integrationTest/java/org/opensearch/dataprepper/integration/log/ParallelGrokStringSubstituteLogTest.java new file mode 100644 index 0000000000..dfc7b80fc5 --- /dev/null +++ b/e2e-test/log/src/integrationTest/java/org/opensearch/dataprepper/integration/log/ParallelGrokStringSubstituteLogTest.java @@ -0,0 +1,147 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.integration.log; + +import org.opensearch.dataprepper.plugins.sink.opensearch.ConnectionConfiguration; +import org.opensearch.dataprepper.plugins.source.loggenerator.ApacheLogFaker; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.HttpData; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.common.RequestHeaders; +import com.linecorp.armeria.common.SessionProtocol; +import io.netty.util.AsciiString; +import org.junit.Assert; +import org.junit.Test; +import org.opensearch.action.admin.indices.refresh.RefreshRequest; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.search.SearchHits; +import org.opensearch.search.builder.SearchSourceBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.not; + +public class ParallelGrokStringSubstituteLogTest { + private static final int HTTP_SOURCE_PORT = 2021; + private static final String GROK_INDEX_NAME = "test-grok-index"; + private static final String SUBSTITUTE_INDEX_NAME = "test-substitute-index"; + private static final String testString = "firstword secondword thirdword"; + + private final ApacheLogFaker apacheLogFaker = new ApacheLogFaker(); + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Test + public void testPipelineEndToEnd() throws JsonProcessingException { + // Send data to http source + sendHttpRequestToSource(HTTP_SOURCE_PORT, 
generateRandomApacheLogHttpData());
+        // Verify data in OpenSearch backend
+        final RestHighLevelClient restHighLevelClient = prepareOpenSearchRestHighLevelClient();
+        final List<Map<String, Object>> retrievedDocs = new ArrayList<>();
+        // Wait for data to flow through pipeline and be indexed by ES
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(
+                () -> {
+                    refreshIndices(restHighLevelClient);
+                    final SearchRequest grokRequest = new SearchRequest(GROK_INDEX_NAME);
+                    final SearchRequest substRequest = new SearchRequest(SUBSTITUTE_INDEX_NAME);
+                    grokRequest.source(
+                            SearchSourceBuilder.searchSource().size(100)
+                    );
+                    substRequest.source(
+                            SearchSourceBuilder.searchSource().size(100)
+                    );
+                    final SearchResponse grokResponse = restHighLevelClient.search(grokRequest, RequestOptions.DEFAULT);
+                    final List<Map<String, Object>> grokSources = getSourcesFromSearchHits(grokResponse.getHits());
+                    Assert.assertEquals(1, grokSources.size());
+                    Map<String, Object> grokSource = grokSources.get(0);
+                    Assert.assertEquals(4, grokSource.size());
+                    Assert.assertEquals(grokSource.get("message"), testString);
+                    String[] words = testString.split(" ");
+                    Assert.assertEquals(grokSource.get("word1"), words[0]);
+                    Assert.assertEquals(grokSource.get("word2"), words[1]);
+                    Assert.assertEquals(grokSource.get("word3"), words[2]);
+
+                    final SearchResponse substResponse = restHighLevelClient.search(substRequest, RequestOptions.DEFAULT);
+                    final List<Map<String, Object>> substSources = getSourcesFromSearchHits(substResponse.getHits());
+                    Assert.assertEquals(1, substSources.size());
+                    Map<String, Object> substSource = substSources.get(0);
+                    Assert.assertEquals(1, substSource.size());
+                    String expectedString = testString.replace("word", "WORD");
+                    Assert.assertEquals(substSource.get("message"), expectedString);
+                }
+        );
+    }
+
+    private RestHighLevelClient prepareOpenSearchRestHighLevelClient() {
+        final ConnectionConfiguration.Builder builder = new ConnectionConfiguration.Builder(
+                Collections.singletonList("https://127.0.0.1:9200"));
+        builder.withUsername("admin");
+        builder.withPassword("admin");
+        return builder.build().createClient();
+    }
+
+    private void sendHttpRequestToSource(final int port, final HttpData httpData) {
+        WebClient.of().execute(RequestHeaders.builder()
+                        .scheme(SessionProtocol.HTTP)
+                        .authority(String.format("127.0.0.1:%d", port))
+                        .method(HttpMethod.POST)
+                        .path("/log/ingest")
+                        .contentType(MediaType.JSON_UTF_8)
+                        .build(),
+                httpData)
+                .aggregate()
+                .whenComplete((i, ex) -> {
+                    assertThat(i.status(), is(HttpStatus.OK));
+                    final List<String> headerKeys = i.headers()
+                            .stream()
+                            .map(Map.Entry::getKey)
+                            .map(AsciiString::toString)
+                            .collect(Collectors.toList());
+                    assertThat("Response Header Keys", headerKeys, not(contains("server")));
+                }).join();
+    }
+
+    private List<Map<String, Object>> getSourcesFromSearchHits(final SearchHits searchHits) {
+        final List<Map<String, Object>> sources = new ArrayList<>();
+        searchHits.forEach(hit -> {
+            Map<String, Object> source = hit.getSourceAsMap();
+            sources.add(source);
+        });
+        return sources;
+    }
+
+    private void refreshIndices(final RestHighLevelClient restHighLevelClient) throws IOException {
+        final RefreshRequest requestAll = new RefreshRequest();
+        restHighLevelClient.indices().refresh(requestAll, RequestOptions.DEFAULT);
+    }
+
+    private HttpData generateRandomApacheLogHttpData() throws JsonProcessingException {
+        final List<Map<String, Object>> jsonArray = new ArrayList<>();
+        final Map<String, Object> logObj = new HashMap<>() {{
+            put("message", testString);
+        }};
+        jsonArray.add(logObj);
+        final String jsonData = objectMapper.writeValueAsString(jsonArray);
+        return HttpData.ofUtf8(jsonData);
+    }
+}
diff --git a/e2e-test/log/src/integrationTest/resources/parallel-grok-substitute-e2e-pipeline.yml b/e2e-test/log/src/integrationTest/resources/parallel-grok-substitute-e2e-pipeline.yml
new file mode 100644
index 0000000000..fe1e053aeb
--- /dev/null
+++ b/e2e-test/log/src/integrationTest/resources/parallel-grok-substitute-e2e-pipeline.yml
@@ -0,0 +1,40 @@
+pipeline1:
+  source:
+    http:
+  sink:
+    - pipeline:
+        name: "pipeline2"
+    - pipeline:
+        name: "pipeline3"
+
+pipeline2:
+  source:
+    pipeline:
+      name: "pipeline1"
+  processor:
+    - substitute_string:
+        entries:
+          - source: "message"
+            from: "word"
+            to: "WORD"
+  sink:
+    - opensearch:
+        hosts: [ "https://node-0.example.com:9200" ]
+        username: "admin"
+        password: "admin"
+        index: "test-substitute-index"
+
+pipeline3:
+  source:
+    pipeline:
+      name: "pipeline1"
+  processor:
+    - grok:
+        match:
+          message: ['%{WORD:word1} %{WORD:word2} %{WORD:word3}']
+  sink:
+    - opensearch:
+        hosts: [ "https://node-0.example.com:9200" ]
+        username: "admin"
+        password: "admin"
+        index: "test-grok-index"
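
Local verification (a sketch based on the workflow matrix and Gradle task added in this patch; it assumes a local Docker daemon is available for the OpenSearch and Data Prepper containers that e2e-test/log/build.gradle starts):

    # one cell of the CI matrix: Java 11 plus the new parallel grok/substitute task
    ./gradlew -PendToEndJavaVersion=11 :e2e-test:log:parallelGrokStringSubstituteTest

The task name, module path, and -PendToEndJavaVersion property come from the data-prepper-log-analytics-basic-grok-e2e-tests.yml and e2e-test/log/build.gradle changes above; 17 is the other Java version in the matrix, and the Docker requirement is inferred from the DockerCreateContainer/DockerStartContainer tasks the test depends on.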