TST: basic grok e2e test #536

Merged · 18 commits · Nov 10, 2021
@@ -0,0 +1,30 @@
# This workflow will build a Java project with Gradle
# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-gradle

name: Data Prepper Log Analytics Basic Grok End-to-end test with Gradle

on:
push:
branches: [ main ]
pull_request:
workflow_dispatch:

jobs:
build:
strategy:
matrix:
java: [14]

runs-on: ubuntu-latest

steps:
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v1
with:
java-version: ${{ matrix.java }}
- name: Checkout Data-Prepper
uses: actions/checkout@v2
- name: Grant execute permission for gradlew
run: chmod +x gradlew
- name: Run basic grok end-to-end tests with Gradle
run: ./gradlew :e2e-test:log:basicLogEndToEndTest
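Because the workflow declares a `workflow_dispatch` trigger, it can also be started manually, for example with the GitHub CLI (assuming `gh` is installed and authorized for the repository):
```
gh workflow run "Data Prepper Log Analytics Basic Grok End-to-end test with Gradle"
```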
14 changes: 14 additions & 0 deletions e2e-test/log/README.md
@@ -0,0 +1,14 @@
# Log Data Ingestion End-to-end Tests

This module includes end-to-end tests for log data ingestion supported by Data Prepper. The tests spin up OpenSearch and Data Prepper Docker containers, so a running Docker daemon is required.

## Basic Grok Ingestion Pipeline End-to-end test

Run from the current directory:
```
./gradlew :basicLogEndToEndTest
```
or from the project root directory:
```
./gradlew :e2e-test:log:basicLogEndToEndTest
```
157 changes: 157 additions & 0 deletions e2e-test/log/build.gradle
@@ -0,0 +1,157 @@
apply plugin: DockerRemoteApiPlugin


import com.bmuschko.gradle.docker.DockerRemoteApiPlugin
import com.bmuschko.gradle.docker.tasks.container.DockerCreateContainer
import com.bmuschko.gradle.docker.tasks.container.DockerStartContainer
import com.bmuschko.gradle.docker.tasks.container.DockerStopContainer
import com.bmuschko.gradle.docker.tasks.container.DockerRemoveContainer
import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
import com.bmuschko.gradle.docker.tasks.image.DockerPullImage
import com.bmuschko.gradle.docker.tasks.image.Dockerfile
import com.bmuschko.gradle.docker.tasks.network.DockerCreateNetwork
import com.bmuschko.gradle.docker.tasks.network.DockerRemoveNetwork

/**
* End-to-end test docker network
*/
task createDataPrepperNetwork(type: DockerCreateNetwork) {
networkName = "data_prepper_network"
}

task removeDataPrepperNetwork(type: DockerRemoveNetwork) {
dependsOn createDataPrepperNetwork
networkId = createDataPrepperNetwork.getNetworkId()
}

def BASIC_GROK_PIPELINE_YAML = "basic-grok-e2e-pipeline.yml"

/**
* DataPrepper Docker tasks
*/
task createDataPrepperDockerFile(type: Dockerfile) {
dependsOn copyDataPrepperJar
destFile = project.file('build/docker/Dockerfile')
from("adoptopenjdk/openjdk14:jre-14.0.1_7-alpine")
workingDir("/app")
copyFile("${dataPrepperJarFilepath}", "/app/data-prepper.jar")
copyFile("src/integrationTest/resources/${BASIC_GROK_PIPELINE_YAML}", "/app/${BASIC_GROK_PIPELINE_YAML}")
copyFile("src/integrationTest/resources/data_prepper.yml", "/app/data_prepper.yml")
defaultCommand("java", "-jar", "data-prepper.jar", "/app/${BASIC_GROK_PIPELINE_YAML}", "/app/data_prepper.yml")
}

task buildDataPrepperDockerImage(type: DockerBuildImage) {
dependsOn createDataPrepperDockerFile
inputDir = file(".")
dockerFile = file("build/docker/Dockerfile")
images.add("e2e-test-log-pipeline-image")
}

def createDataPrepperDockerContainer(final String taskBaseName, final String dataPrepperName, final int sourcePort,
final int serverPort, final String pipelineConfigYAML) {
return tasks.create("create${taskBaseName}", DockerCreateContainer) {
dependsOn buildDataPrepperDockerImage
dependsOn createDataPrepperNetwork
containerName = dataPrepperName
exposePorts("tcp", [2021, 4900])
hostConfig.portBindings = [String.format('%d:2021', sourcePort), String.format('%d:4900', serverPort)]
hostConfig.network = createDataPrepperNetwork.getNetworkName()
cmd = ["java", "-jar", "data-prepper.jar", pipelineConfigYAML, "/app/data_prepper.yml"]
targetImageId buildDataPrepperDockerImage.getImageId()
}
}

def startDataPrepperDockerContainer(final DockerCreateContainer createDataPrepperDockerContainerTask) {
return tasks.create("start${createDataPrepperDockerContainerTask.getName()}", DockerStartContainer) {
dependsOn createDataPrepperDockerContainerTask
targetContainerId createDataPrepperDockerContainerTask.getContainerId()
}
}

def stopDataPrepperDockerContainer(final DockerStartContainer startDataPrepperDockerContainerTask) {
return tasks.create("stop${startDataPrepperDockerContainerTask.getName()}", DockerStopContainer) {
targetContainerId startDataPrepperDockerContainerTask.getContainerId()
}
}

def removeDataPrepperDockerContainer(final DockerStopContainer stopDataPrepperDockerContainerTask) {
return tasks.create("remove${stopDataPrepperDockerContainerTask.getName()}", DockerRemoveContainer) {
targetContainerId stopDataPrepperDockerContainerTask.getContainerId()
}
}

/**
* OpenSearch Docker tasks
*/
task pullOpenSearchDockerImage(type: DockerPullImage) {
image = "opensearchproject/opensearch:${versionMap.opensearchVersion}"
}

task createOpenSearchDockerContainer(type: DockerCreateContainer) {
Member commented:

I am just curious: is it possible to set up the end-to-end tests by creating a docker-compose.yaml? It seems like it would be a lot cleaner to initialize the tests that way rather than having to write all this code to set up the containers. Maybe there are downsides to doing this that I'm unaware of?

Collaborator (author) replied:

Thanks for calling this out. There is definitely an option: https://bmuschko.com/blog/gradle-docker-compose/

From the doc above, it looks like all we need to do is hardcode a docker-compose.yml. It might greatly simplify the Gradle setup, as you mentioned. But I think it is worth a bit more exploration to tell whether it fits well into both the trace-analytics and log-analytics e2e tests. I will open an issue for that and reserve it for a future enhancement PR on our infra.
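For illustration, a minimal docker-compose.yml along those lines might look like the following. This is only a sketch: the Data Prepper image name matches the one built by the Gradle tasks above, but the OpenSearch tag and service names are assumptions, not part of this PR.
```yaml
version: "3.7"
services:
  opensearch:
    image: opensearchproject/opensearch:1.1.0  # tag assumed; build.gradle reads it from versionMap
    container_name: node-0.example.com
    environment:
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9600:9600"
  data-prepper:
    image: e2e-test-log-pipeline-image  # image built by the Gradle tasks above
    container_name: dataprepper
    command: ["java", "-jar", "data-prepper.jar", "/app/basic-grok-e2e-pipeline.yml", "/app/data_prepper.yml"]
    ports:
      - "2021:2021"
      - "4900:4900"
    depends_on:
      - opensearch
networks:
  default:
    name: data_prepper_network
```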

Member replied:

Great! I do think running the end-to-end tests through docker-compose would be nice down the line. It would definitely be the easiest way for external contributors to write their own end-to-end tests.

dependsOn createDataPrepperNetwork
dependsOn pullOpenSearchDockerImage
targetImageId pullOpenSearchDockerImage.image
containerName = "node-0.example.com"
hostConfig.portBindings = ['9200:9200', '9600:9600']
hostConfig.autoRemove = true
hostConfig.network = createDataPrepperNetwork.getNetworkName()
envVars = ['discovery.type':'single-node']
}

task startOpenSearchDockerContainer(type: DockerStartContainer) {
dependsOn createOpenSearchDockerContainer
targetContainerId createOpenSearchDockerContainer.getContainerId()

doLast {
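        // give the single-node OpenSearch cluster time to start up before the pipeline connects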
sleep(90*1000)
}
}

task stopOpenSearchDockerContainer(type: DockerStopContainer) {
targetContainerId createOpenSearchDockerContainer.getContainerId()

doLast {
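        // brief pause so the container has fully stopped before cleanup tasks run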
sleep(5*1000)
}
}

/**
* End to end test. Spins up OpenSearch and DataPrepper docker containers, then runs the integ test
* Stops the docker containers when finished
*/
task basicLogEndToEndTest(type: Test) {
dependsOn build
dependsOn startOpenSearchDockerContainer
def createDataPrepperTask = createDataPrepperDockerContainer(
"basicLogDataPrepper", "dataprepper", 2021, 4900, "/app/${BASIC_GROK_PIPELINE_YAML}")
def startDataPrepperTask = startDataPrepperDockerContainer(createDataPrepperTask as DockerCreateContainer)
dependsOn startDataPrepperTask
startDataPrepperTask.mustRunAfter 'startOpenSearchDockerContainer'
// wait for the Data Prepper container to be ready
doFirst {
sleep(10*1000)
}

description = 'Runs the basic grok end-to-end test.'
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs
classpath = sourceSets.integrationTest.runtimeClasspath

filter {
includeTestsMatching "com.amazon.dataprepper.integration.log.EndToEndBasicLogTest.testPipelineEndToEnd*"
}

finalizedBy stopOpenSearchDockerContainer
def stopDataPrepperTask = stopDataPrepperDockerContainer(startDataPrepperTask as DockerStartContainer)
finalizedBy stopDataPrepperTask
finalizedBy removeDataPrepperDockerContainer(stopDataPrepperTask as DockerStopContainer)
finalizedBy removeDataPrepperNetwork
}

dependencies {
implementation "com.github.javafaker:javafaker:1.0.2"
integrationTestCompile project(':data-prepper-plugins:opensearch')
integrationTestImplementation "com.linecorp.armeria:armeria:1.0.0"
integrationTestImplementation "org.awaitility:awaitility:4.0.3"
integrationTestImplementation "org.opensearch.client:opensearch-rest-high-level-client:${versionMap.opensearchVersion}"
}
135 changes: 135 additions & 0 deletions e2e-test/log/src/integrationTest/java/com/amazon/dataprepper/integration/log/EndToEndBasicLogTest.java
@@ -0,0 +1,135 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package com.amazon.dataprepper.integration.log;

import com.amazon.dataprepper.plugins.sink.opensearch.ConnectionConfiguration;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.SessionProtocol;
import org.junit.Assert;
import org.junit.Test;
import org.opensearch.action.admin.indices.refresh.RefreshRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasKey;

public class EndToEndBasicLogTest {
private static final int HTTP_SOURCE_PORT = 2021;
private static final String TEST_INDEX_NAME = "test-grok-index";

private final ApacheLogFaker apacheLogFaker = new ApacheLogFaker();
private final ObjectMapper objectMapper = new ObjectMapper();

@Test
public void testPipelineEndToEnd() throws JsonProcessingException {
// Send data to http source
sendHttpRequestToSource(HTTP_SOURCE_PORT, generateRandomApacheLogHttpData(2));
sendHttpRequestToSource(HTTP_SOURCE_PORT, generateRandomApacheLogHttpData(3));

// Verify data in OpenSearch backend
final RestHighLevelClient restHighLevelClient = prepareOpenSearchRestHighLevelClient();
final List<Map<String, Object>> retrievedDocs = new ArrayList<>();
// Wait for data to flow through the pipeline and be indexed by OpenSearch
await().atMost(10, TimeUnit.SECONDS).untilAsserted(
() -> {
refreshIndices(restHighLevelClient);
final SearchRequest searchRequest = new SearchRequest(TEST_INDEX_NAME);
searchRequest.source(
SearchSourceBuilder.searchSource().size(100)
);
final SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
final List<Map<String, Object>> foundSources = getSourcesFromSearchHits(searchResponse.getHits());
Assert.assertEquals(5, foundSources.size());
retrievedDocs.addAll(foundSources);
}
);
// Verify original and grokked keys from retrieved docs
final List<String> expectedDocKeys = Arrays.asList(
"date", "log", "clientip", "ident", "auth", "timestamp", "verb", "request", "httpversion", "response", "bytes");
retrievedDocs.forEach(expectedDoc -> {
for (String key: expectedDocKeys) {
assertThat(expectedDoc, hasKey(key));
}
});
}

private RestHighLevelClient prepareOpenSearchRestHighLevelClient() {
final ConnectionConfiguration.Builder builder = new ConnectionConfiguration.Builder(
Collections.singletonList("https://127.0.0.1:9200"));
builder.withUsername("admin");
builder.withPassword("admin");
return builder.build().createClient();
}

private void sendHttpRequestToSource(final int port, final HttpData httpData) {
WebClient.of().execute(RequestHeaders.builder()
.scheme(SessionProtocol.HTTP)
.authority(String.format("127.0.0.1:%d", port))
.method(HttpMethod.POST)
.path("/log/ingest")
.contentType(MediaType.JSON_UTF_8)
.build(),
httpData)
.aggregate()
.whenComplete((i, ex) -> assertThat(i.status(), is(HttpStatus.OK))).join();
}

private List<Map<String, Object>> getSourcesFromSearchHits(final SearchHits searchHits) {
final List<Map<String, Object>> sources = new ArrayList<>();
searchHits.forEach(hit -> {
Map<String, Object> source = hit.getSourceAsMap();
sources.add(source);
});
return sources;
}

private void refreshIndices(final RestHighLevelClient restHighLevelClient) throws IOException {
final RefreshRequest requestAll = new RefreshRequest();
restHighLevelClient.indices().refresh(requestAll, RequestOptions.DEFAULT);
}

private HttpData generateRandomApacheLogHttpData(final int numLogs) throws JsonProcessingException {
final List<Map<String, Object>> jsonArray = new ArrayList<>();
for (int i = 0; i < numLogs; i++) {
final Map<String, Object> logObj = new HashMap<String, Object>() {{
put("date", System.currentTimeMillis());
put("log", apacheLogFaker.generateRandomApacheLog());
}};
jsonArray.add(logObj);
}
final String jsonData = objectMapper.writeValueAsString(jsonArray);
return HttpData.ofUtf8(jsonData);
}
}
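The test relies on an `ApacheLogFaker` helper that is added by this PR but not shown in this diff. Below is a hypothetical sketch of such a helper, using the `javafaker` dependency declared in build.gradle; all method choices here are assumptions, not the PR's actual implementation.
```java
package com.amazon.dataprepper.integration.log;

import com.github.javafaker.Faker;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

/**
 * Hypothetical sketch only; the real ApacheLogFaker ships with this PR
 * but does not appear in this diff.
 */
public class ApacheLogFaker {
    private static final String[] HTTP_METHODS = {"GET", "POST", "PUT", "DELETE"};

    private final Faker faker = new Faker();
    private final SimpleDateFormat timestampFormat =
            new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

    public String generateRandomApacheLog() {
        // Produces a line matching %{COMMONAPACHELOG}:
        // clientip ident auth [timestamp] "verb request HTTP/httpversion" response bytes
        return String.format("%s - %s [%s] \"%s /%s HTTP/1.1\" %d %d",
                faker.internet().ipV4Address(),
                faker.name().username(),
                timestampFormat.format(new Date()),
                HTTP_METHODS[faker.random().nextInt(HTTP_METHODS.length)],
                faker.lorem().word(),
                faker.number().numberBetween(200, 599),
                faker.number().numberBetween(100, 10000));
    }
}
```
Any generator suffices as long as each line matches `%{COMMONAPACHELOG}`; otherwise the grokked keys asserted in the test would be missing.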
13 changes: 13 additions & 0 deletions e2e-test/log/src/integrationTest/resources/basic-grok-e2e-pipeline.yml
@@ -0,0 +1,13 @@
grok-pipeline:
source:
http:
prepper:
- grok:
match:
log: [ "%{COMMONAPACHELOG}" ]
sink:
- opensearch:
hosts: [ "https://node-0.example.com:9200" ]
username: "admin"
password: "admin"
index: "test-grok-index"
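For context, the `%{COMMONAPACHELOG}` pattern matches standard Apache common-log lines, which is where the grokked keys asserted in EndToEndBasicLogTest come from. An illustrative input and the fields the stock pattern extracts:
```
# sample "log" value
127.0.0.1 - admin [10/Nov/2021:14:32:01 -0500] "GET /index.html HTTP/1.1" 200 1043

# fields grok adds to the document (alongside the original "date" and "log" keys)
clientip=127.0.0.1   ident=-   auth=admin
timestamp=10/Nov/2021:14:32:01 -0500
verb=GET   request=/index.html   httpversion=1.1
response=200   bytes=1043
```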
1 change: 1 addition & 0 deletions e2e-test/log/src/integrationTest/resources/data_prepper.yml
@@ -0,0 +1 @@
ssl: false