Skip to content

Commit

Permalink
Test: basic grok e2e test (#536)
Browse files Browse the repository at this point in the history
* REF: refactoring existing trace e2e tests

Signed-off-by: qchea <[email protected]>

* MAINT: github workflow

Signed-off-by: qchea <[email protected]>

* ADD: README

Signed-off-by: qchea <[email protected]>

* FIX: spotless

Signed-off-by: qchea <[email protected]>

* STY: spotless for markdown

Signed-off-by: qchea <[email protected]>

* MAINT: address PR comments

Signed-off-by: qchea <[email protected]>

* MAINT: directory name

Signed-off-by: qchea <[email protected]>

* MAINT: build directory reference

Signed-off-by: qchea <[email protected]>

* TST: e2e grok test logic

Signed-off-by: qchea <[email protected]>

* MAINT: add e2e basic grok github workflow

Signed-off-by: qchea <[email protected]>

* MAINT: expose ports at create container

Signed-off-by: qchea <[email protected]>

* doc: README

Signed-off-by: qchea <[email protected]>

* MAINT: use hasKey

Signed-off-by: qchea <[email protected]>

* MAINT: remove wildcard imports

Signed-off-by: qchea <[email protected]>

* REF: ApacheLogFaker and its test

Signed-off-by: qchea <[email protected]>

* RNM: basic grok -> basic log

Signed-off-by: qchea <[email protected]>
  • Loading branch information
chenqi0805 authored Nov 10, 2021
1 parent cce0fdd commit 5f7fbdd
Show file tree
Hide file tree
Showing 10 changed files with 445 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# This workflow will build a Java project with Gradle
# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-gradle

name: Data Prepper Log Analytics Basic Grok End-to-end test with Gradle

on:
  push:
    branches: [ main ]
  pull_request:
  workflow_dispatch:

jobs:
  build:
    strategy:
      matrix:
        java: [14]

    runs-on: ubuntu-latest

    steps:
      - name: Set up JDK ${{ matrix.java }}
        uses: actions/setup-java@v1
        with:
          java-version: ${{ matrix.java }}
      - name: Checkout Data-Prepper
        uses: actions/checkout@v2
      - name: Grant execute permission for gradlew
        run: chmod +x gradlew
      - name: Run basic grok end-to-end tests with Gradle
        run: ./gradlew :e2e-test:log:basicLogEndToEndTest
14 changes: 14 additions & 0 deletions e2e-test/log/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Log Data Ingestion End-to-end Tests

This module includes end-to-end (e2e) tests for the log data ingestion pipelines supported by Data Prepper.

## Basic Grok Ingestion Pipeline End-to-end test

Run from the current directory:
```
./gradlew :basicLogEndToEndTest
```
or from project root directory
```
./gradlew :e2e-test:log:basicLogEndToEndTest
```
157 changes: 157 additions & 0 deletions e2e-test/log/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
apply plugin: DockerRemoteApiPlugin


import com.bmuschko.gradle.docker.DockerRemoteApiPlugin
import com.bmuschko.gradle.docker.tasks.container.DockerCreateContainer
import com.bmuschko.gradle.docker.tasks.container.DockerStartContainer
import com.bmuschko.gradle.docker.tasks.container.DockerStopContainer
import com.bmuschko.gradle.docker.tasks.container.DockerRemoveContainer
import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
import com.bmuschko.gradle.docker.tasks.image.DockerPullImage
import com.bmuschko.gradle.docker.tasks.image.Dockerfile
import com.bmuschko.gradle.docker.tasks.network.DockerCreateNetwork
import com.bmuschko.gradle.docker.tasks.network.DockerRemoveNetwork

/**
 * End-to-end test docker network shared by the Data Prepper and OpenSearch
 * containers so they can reach each other by container name.
 */
task createDataPrepperNetwork(type: DockerCreateNetwork) {
    networkName = "data_prepper_network"
}

task removeDataPrepperNetwork(type: DockerRemoveNetwork) {
    // NOTE(review): depending on the create task makes getNetworkId() below
    // resolvable; this task is only meant to run as a finalizer of the e2e
    // test task -- confirm the intended wiring.
    dependsOn createDataPrepperNetwork
    networkId = createDataPrepperNetwork.getNetworkId()
}

// Pipeline configuration file name: baked into the image below and also passed
// to each container at startup.
def BASIC_GROK_PIPELINE_YAML = "basic-grok-e2e-pipeline.yml"

/**
 * DataPrepper Docker tasks
 */
// Generates a Dockerfile that packages the Data Prepper jar together with the
// pipeline and server configuration files under /app.
task createDataPrepperDockerFile(type: Dockerfile) {
    dependsOn copyDataPrepperJar
    destFile = project.file('build/docker/Dockerfile')
    from("adoptopenjdk/openjdk14:jre-14.0.1_7-alpine")
    workingDir("/app")
    copyFile("${dataPrepperJarFilepath}", "/app/data-prepper.jar")
    copyFile("src/integrationTest/resources/${BASIC_GROK_PIPELINE_YAML}", "/app/${BASIC_GROK_PIPELINE_YAML}")
    copyFile("src/integrationTest/resources/data_prepper.yml", "/app/data_prepper.yml")
    defaultCommand("java", "-jar", "data-prepper.jar", "/app/${BASIC_GROK_PIPELINE_YAML}", "/app/data_prepper.yml")
}

// Builds the Data Prepper image used by every container this test creates.
task buildDataPrepperDockerImage(type: DockerBuildImage) {
    dependsOn createDataPrepperDockerFile
    inputDir = file(".")
    dockerFile = file("build/docker/Dockerfile")
    images.add("e2e-test-log-pipeline-image")
}

/**
 * Registers a DockerCreateContainer task for a Data Prepper container.
 *
 * @param taskBaseName suffix for the generated task name ("create<baseName>")
 * @param dataPrepperName docker container name
 * @param sourcePort host port bound to the container's http source port 2021
 * @param serverPort host port bound to the container's server API port 4900
 * @param pipelineConfigYAML in-container path of the pipeline config to run
 */
def createDataPrepperDockerContainer(final String taskBaseName, final String dataPrepperName, final int sourcePort,
                                     final int serverPort, final String pipelineConfigYAML) {
    return tasks.create("create${taskBaseName}", DockerCreateContainer) {
        dependsOn buildDataPrepperDockerImage
        dependsOn createDataPrepperNetwork
        containerName = dataPrepperName
        // Expose the http source (2021) and server API (4900) ports, then map
        // them to the caller-chosen host ports.
        exposePorts("tcp", [2021, 4900])
        hostConfig.portBindings = [String.format('%d:2021', sourcePort), String.format('%d:4900', serverPort)]
        hostConfig.network = createDataPrepperNetwork.getNetworkName()
        cmd = ["java", "-jar", "data-prepper.jar", pipelineConfigYAML, "/app/data_prepper.yml"]
        targetImageId buildDataPrepperDockerImage.getImageId()
    }
}

// Registers a task that starts the container produced by the given create task.
def startDataPrepperDockerContainer(final DockerCreateContainer createDataPrepperDockerContainerTask) {
    return tasks.create("start${createDataPrepperDockerContainerTask.getName()}", DockerStartContainer) {
        dependsOn createDataPrepperDockerContainerTask
        targetContainerId createDataPrepperDockerContainerTask.getContainerId()
    }
}

// Registers a task that stops the container started by the given start task.
def stopDataPrepperDockerContainer(final DockerStartContainer startDataPrepperDockerContainerTask) {
    return tasks.create("stop${startDataPrepperDockerContainerTask.getName()}", DockerStopContainer) {
        targetContainerId startDataPrepperDockerContainerTask.getContainerId()
    }
}

// Registers a task that removes the container stopped by the given stop task.
def removeDataPrepperDockerContainer(final DockerStopContainer stopDataPrepperDockerContainerTask) {
    return tasks.create("remove${stopDataPrepperDockerContainerTask.getName()}", DockerRemoveContainer) {
        targetContainerId stopDataPrepperDockerContainerTask.getContainerId()
    }
}

/**
 * OpenSearch Docker tasks
 */
task pullOpenSearchDockerImage(type: DockerPullImage) {
    image = "opensearchproject/opensearch:${versionMap.opensearchVersion}"
}

task createOpenSearchDockerContainer(type: DockerCreateContainer) {
    dependsOn createDataPrepperNetwork
    dependsOn pullOpenSearchDockerImage
    targetImageId pullOpenSearchDockerImage.image
    // Container name doubles as the hostname referenced by the pipeline's
    // opensearch sink configuration.
    containerName = "node-0.example.com"
    hostConfig.portBindings = ['9200:9200', '9600:9600']
    hostConfig.autoRemove = true
    hostConfig.network = createDataPrepperNetwork.getNetworkName()
    envVars = ['discovery.type':'single-node']
}

task startOpenSearchDockerContainer(type: DockerStartContainer) {
    dependsOn createOpenSearchDockerContainer
    targetContainerId createOpenSearchDockerContainer.getContainerId()

    doLast {
        // Fixed wait for the single-node cluster to finish bootstrapping
        // before the pipeline's sink tries to connect.
        sleep(90*1000)
    }
}

task stopOpenSearchDockerContainer(type: DockerStopContainer) {
    targetContainerId createOpenSearchDockerContainer.getContainerId()

    doLast {
        // Brief pause to let the container shut down cleanly.
        sleep(5*1000)
    }
}

/**
 * End to end test. Spins up OpenSearch and DataPrepper docker containers, then runs the integ test
 * Stops the docker containers when finished
 */
task basicLogEndToEndTest(type: Test) {
    dependsOn build
    dependsOn startOpenSearchDockerContainer
    // Register this test's Data Prepper container tasks and chain them in.
    def createDataPrepperTask = createDataPrepperDockerContainer(
            "basicLogDataPrepper", "dataprepper", 2021, 4900, "/app/${BASIC_GROK_PIPELINE_YAML}")
    def startDataPrepperTask = startDataPrepperDockerContainer(createDataPrepperTask as DockerCreateContainer)
    dependsOn startDataPrepperTask
    // OpenSearch must be up first: the pipeline's sink connects on startup.
    startDataPrepperTask.mustRunAfter 'startOpenSearchDockerContainer'
    // wait for data-preppers to be ready
    doFirst {
        sleep(10*1000)
    }

    description = 'Runs the basic grok end-to-end test.'
    group = 'verification'
    testClassesDirs = sourceSets.integrationTest.output.classesDirs
    classpath = sourceSets.integrationTest.runtimeClasspath

    // Run only the end-to-end test class from the integrationTest source set.
    filter {
        includeTestsMatching "com.amazon.dataprepper.integration.log.EndToEndBasicLogTest.testPipelineEndToEnd*"
    }

    // Tear everything down (containers, then network) regardless of outcome.
    finalizedBy stopOpenSearchDockerContainer
    def stopDataPrepperTask = stopDataPrepperDockerContainer(startDataPrepperTask as DockerStartContainer)
    finalizedBy stopDataPrepperTask
    finalizedBy removeDataPrepperDockerContainer(stopDataPrepperTask as DockerStopContainer)
    finalizedBy removeDataPrepperNetwork
}

dependencies {
    implementation "com.github.javafaker:javafaker:1.0.2"
    // Use integrationTestImplementation consistently; the *Compile
    // configurations are deprecated and removed in Gradle 7+.
    integrationTestImplementation project(':data-prepper-plugins:opensearch')
    integrationTestImplementation "com.linecorp.armeria:armeria:1.0.0"
    integrationTestImplementation "org.awaitility:awaitility:4.0.3"
    integrationTestImplementation "org.opensearch.client:opensearch-rest-high-level-client:${versionMap.opensearchVersion}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package com.amazon.dataprepper.integration.log;

import com.amazon.dataprepper.plugins.sink.opensearch.ConnectionConfiguration;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.SessionProtocol;
import org.junit.Assert;
import org.junit.Test;
import org.opensearch.action.admin.indices.refresh.RefreshRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasKey;

/**
 * End-to-end test for the basic grok log-ingestion pipeline: posts fake Apache
 * access logs to the Data Prepper http source and verifies that the grok
 * processor extracts the expected fields into documents indexed in OpenSearch.
 *
 * <p>Assumes the docker containers wired up by the Gradle build are running:
 * a Data Prepper instance listening on localhost:2021 and an OpenSearch node
 * on https://127.0.0.1:9200 with default admin credentials.
 */
public class EndToEndBasicLogTest {
    private static final int HTTP_SOURCE_PORT = 2021;
    private static final String TEST_INDEX_NAME = "test-grok-index";

    private final ApacheLogFaker apacheLogFaker = new ApacheLogFaker();
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Test
    public void testPipelineEndToEnd() throws JsonProcessingException {
        // Send two batches (2 + 3 = 5 records) to the http source.
        sendHttpRequestToSource(HTTP_SOURCE_PORT, generateRandomApacheLogHttpData(2));
        sendHttpRequestToSource(HTTP_SOURCE_PORT, generateRandomApacheLogHttpData(3));

        // Verify data in OpenSearch backend.
        final RestHighLevelClient restHighLevelClient = prepareOpenSearchRestHighLevelClient();
        final List<Map<String, Object>> retrievedDocs = new ArrayList<>();
        // Wait for the records to flow through the pipeline and be indexed.
        await().atMost(10, TimeUnit.SECONDS).untilAsserted(
                () -> {
                    refreshIndices(restHighLevelClient);
                    final SearchRequest searchRequest = new SearchRequest(TEST_INDEX_NAME);
                    searchRequest.source(
                            SearchSourceBuilder.searchSource().size(100)
                    );
                    final SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
                    final List<Map<String, Object>> foundSources = getSourcesFromSearchHits(searchResponse.getHits());
                    Assert.assertEquals(5, foundSources.size());
                    retrievedDocs.addAll(foundSources);
                }
        );
        // Every document should retain the original keys ("date", "log") and
        // gain the keys extracted by the %{COMMONAPACHELOG} grok pattern.
        final List<String> expectedDocKeys = Arrays.asList(
                "date", "log", "clientip", "ident", "auth", "timestamp", "verb", "request", "httpversion", "response", "bytes");
        retrievedDocs.forEach(retrievedDoc -> {
            for (final String key : expectedDocKeys) {
                assertThat(retrievedDoc, hasKey(key));
            }
        });
    }

    /**
     * Builds a REST client against the local OpenSearch container using the
     * default admin credentials.
     */
    private RestHighLevelClient prepareOpenSearchRestHighLevelClient() {
        final ConnectionConfiguration.Builder builder = new ConnectionConfiguration.Builder(
                Collections.singletonList("https://127.0.0.1:9200"));
        builder.withUsername("admin");
        builder.withPassword("admin");
        return builder.build().createClient();
    }

    /**
     * Posts the given JSON payload to the Data Prepper http source and asserts
     * a 200 response. Joining on the aggregated response (rather than asserting
     * inside a whenComplete callback, where an exceptional completion leaves
     * the response null) surfaces connection failures as real test failures
     * instead of NullPointerExceptions.
     */
    private void sendHttpRequestToSource(final int port, final HttpData httpData) {
        final AggregatedHttpResponse response = WebClient.of().execute(
                RequestHeaders.builder()
                        .scheme(SessionProtocol.HTTP)
                        .authority(String.format("127.0.0.1:%d", port))
                        .method(HttpMethod.POST)
                        .path("/log/ingest")
                        .contentType(MediaType.JSON_UTF_8)
                        .build(),
                httpData)
                .aggregate()
                .join();
        assertThat(response.status(), is(HttpStatus.OK));
    }

    /** Extracts each hit's _source map from the search response. */
    private List<Map<String, Object>> getSourcesFromSearchHits(final SearchHits searchHits) {
        final List<Map<String, Object>> sources = new ArrayList<>();
        searchHits.forEach(hit -> sources.add(hit.getSourceAsMap()));
        return sources;
    }

    /** Forces a refresh of all indices so newly indexed docs become searchable. */
    private void refreshIndices(final RestHighLevelClient restHighLevelClient) throws IOException {
        final RefreshRequest requestAll = new RefreshRequest();
        restHighLevelClient.indices().refresh(requestAll, RequestOptions.DEFAULT);
    }

    /**
     * Builds a JSON array payload of {@code numLogs} objects, each carrying the
     * current timestamp and a randomly generated Apache access-log line.
     * Plain HashMap population avoids the double-brace-initialization
     * anti-pattern (anonymous inner class retaining an outer reference).
     */
    private HttpData generateRandomApacheLogHttpData(final int numLogs) throws JsonProcessingException {
        final List<Map<String, Object>> jsonArray = new ArrayList<>(numLogs);
        for (int i = 0; i < numLogs; i++) {
            final Map<String, Object> logObj = new HashMap<>();
            logObj.put("date", System.currentTimeMillis());
            logObj.put("log", apacheLogFaker.generateRandomApacheLog());
            jsonArray.add(logObj);
        }
        final String jsonData = objectMapper.writeValueAsString(jsonArray);
        return HttpData.ofUtf8(jsonData);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
grok-pipeline:
  source:
    http:
  prepper:
    - grok:
        match:
          log: [ "%{COMMONAPACHELOG}" ]
  sink:
    - opensearch:
        hosts: [ "https://node-0.example.com:9200" ]
        username: "admin"
        password: "admin"
        index: "test-grok-index"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Disable TLS on the Data Prepper core server API so the e2e test can reach it over plain HTTP.
ssl: false
Loading

0 comments on commit 5f7fbdd

Please sign in to comment.