diff --git a/.github/workflows/data-prepper-log-analytics-basic-grok-e2e-tests.yml b/.github/workflows/data-prepper-log-analytics-basic-grok-e2e-tests.yml new file mode 100644 index 0000000000..748eba4d57 --- /dev/null +++ b/.github/workflows/data-prepper-log-analytics-basic-grok-e2e-tests.yml @@ -0,0 +1,30 @@ +# This workflow will build a Java project with Gradle +# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-gradle + +name: Data Prepper Log Analytics Basic Grok End-to-end test with Gradle + +on: + push: + branches: [ main ] + pull_request: + workflow_dispatch: + +jobs: + build: + strategy: + matrix: + java: [14] + + runs-on: ubuntu-latest + + steps: + - name: Set up JDK ${{ matrix.java }} + uses: actions/setup-java@v1 + with: + java-version: ${{ matrix.java }} + - name: Checkout Data-Prepper + uses: actions/checkout@v2 + - name: Grant execute permission for gradlew + run: chmod +x gradlew + - name: Run basic grok end-to-end tests with Gradle + run: ./gradlew :e2e-test:log:basicLogEndToEndTest \ No newline at end of file diff --git a/e2e-test/log/README.md b/e2e-test/log/README.md new file mode 100644 index 0000000000..127b9a1af0 --- /dev/null +++ b/e2e-test/log/README.md @@ -0,0 +1,14 @@ +# Log Data Ingestion End-to-end Tests + +This module includes e2e tests for log data ingestion supported by data-prepper. 
+ +## Basic Grok Ingestion Pipeline End-to-end test + +Run from current directory +``` +./gradlew :basicLogEndToEndTest +``` +or from project root directory +``` +./gradlew :e2e-test:log:basicLogEndToEndTest +``` diff --git a/e2e-test/log/build.gradle b/e2e-test/log/build.gradle new file mode 100644 index 0000000000..16f2f06ce9 --- /dev/null +++ b/e2e-test/log/build.gradle @@ -0,0 +1,157 @@ +apply plugin: DockerRemoteApiPlugin + + +import com.bmuschko.gradle.docker.DockerRemoteApiPlugin +import com.bmuschko.gradle.docker.tasks.container.DockerCreateContainer +import com.bmuschko.gradle.docker.tasks.container.DockerStartContainer +import com.bmuschko.gradle.docker.tasks.container.DockerStopContainer +import com.bmuschko.gradle.docker.tasks.container.DockerRemoveContainer +import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage +import com.bmuschko.gradle.docker.tasks.image.DockerPullImage +import com.bmuschko.gradle.docker.tasks.image.Dockerfile +import com.bmuschko.gradle.docker.tasks.network.DockerCreateNetwork +import com.bmuschko.gradle.docker.tasks.network.DockerRemoveNetwork + +/** + * End-to-end test docker network + */ +task createDataPrepperNetwork(type: DockerCreateNetwork) { + networkName = "data_prepper_network" +} + +task removeDataPrepperNetwork(type: DockerRemoveNetwork) { + dependsOn createDataPrepperNetwork + networkId = createDataPrepperNetwork.getNetworkId() +} + +def BASIC_GROK_PIPELINE_YAML = "basic-grok-e2e-pipeline.yml" + +/** + * DataPrepper Docker tasks + */ +task createDataPrepperDockerFile(type: Dockerfile) { + dependsOn copyDataPrepperJar + destFile = project.file('build/docker/Dockerfile') + from("adoptopenjdk/openjdk14:jre-14.0.1_7-alpine") + workingDir("/app") + copyFile("${dataPrepperJarFilepath}", "/app/data-prepper.jar") + copyFile("src/integrationTest/resources/${BASIC_GROK_PIPELINE_YAML}", "/app/${BASIC_GROK_PIPELINE_YAML}") + copyFile("src/integrationTest/resources/data_prepper.yml", "/app/data_prepper.yml") + 
defaultCommand("java", "-jar", "data-prepper.jar", "/app/${BASIC_GROK_PIPELINE_YAML}", "/app/data_prepper.yml") +} + +task buildDataPrepperDockerImage(type: DockerBuildImage) { + dependsOn createDataPrepperDockerFile + inputDir = file(".") + dockerFile = file("build/docker/Dockerfile") + images.add("e2e-test-log-pipeline-image") +} + +def createDataPrepperDockerContainer(final String taskBaseName, final String dataPrepperName, final int sourcePort, + final int serverPort, final String pipelineConfigYAML) { + return tasks.create("create${taskBaseName}", DockerCreateContainer) { + dependsOn buildDataPrepperDockerImage + dependsOn createDataPrepperNetwork + containerName = dataPrepperName + exposePorts("tcp", [2021, 4900]) + hostConfig.portBindings = [String.format('%d:2021', sourcePort), String.format('%d:4900', serverPort)] + hostConfig.network = createDataPrepperNetwork.getNetworkName() + cmd = ["java", "-jar", "data-prepper.jar", pipelineConfigYAML, "/app/data_prepper.yml"] + targetImageId buildDataPrepperDockerImage.getImageId() + } +} + +def startDataPrepperDockerContainer(final DockerCreateContainer createDataPrepperDockerContainerTask) { + return tasks.create("start${createDataPrepperDockerContainerTask.getName()}", DockerStartContainer) { + dependsOn createDataPrepperDockerContainerTask + targetContainerId createDataPrepperDockerContainerTask.getContainerId() + } +} + +def stopDataPrepperDockerContainer(final DockerStartContainer startDataPrepperDockerContainerTask) { + return tasks.create("stop${startDataPrepperDockerContainerTask.getName()}", DockerStopContainer) { + targetContainerId startDataPrepperDockerContainerTask.getContainerId() + } +} + +def removeDataPrepperDockerContainer(final DockerStopContainer stopDataPrepperDockerContainerTask) { + return tasks.create("remove${stopDataPrepperDockerContainerTask.getName()}", DockerRemoveContainer) { + targetContainerId stopDataPrepperDockerContainerTask.getContainerId() + } +} + +/** + * OpenSearch Docker 
tasks + */ +task pullOpenSearchDockerImage(type: DockerPullImage) { + image = "opensearchproject/opensearch:${versionMap.opensearchVersion}" +} + +task createOpenSearchDockerContainer(type: DockerCreateContainer) { + dependsOn createDataPrepperNetwork + dependsOn pullOpenSearchDockerImage + targetImageId pullOpenSearchDockerImage.image + containerName = "node-0.example.com" + hostConfig.portBindings = ['9200:9200', '9600:9600'] + hostConfig.autoRemove = true + hostConfig.network = createDataPrepperNetwork.getNetworkName() + envVars = ['discovery.type':'single-node'] +} + +task startOpenSearchDockerContainer(type: DockerStartContainer) { + dependsOn createOpenSearchDockerContainer + targetContainerId createOpenSearchDockerContainer.getContainerId() + + doLast { + sleep(90*1000) + } +} + +task stopOpenSearchDockerContainer(type: DockerStopContainer) { + targetContainerId createOpenSearchDockerContainer.getContainerId() + + doLast { + sleep(5*1000) + } +} + +/** + * End to end test. Spins up OpenSearch and DataPrepper docker containers, then runs the integ test + * Stops the docker containers when finished + */ +task basicLogEndToEndTest(type: Test) { + dependsOn build + dependsOn startOpenSearchDockerContainer + def createDataPrepperTask = createDataPrepperDockerContainer( + "basicLogDataPrepper", "dataprepper", 2021, 4900, "/app/${BASIC_GROK_PIPELINE_YAML}") + def startDataPrepperTask = startDataPrepperDockerContainer(createDataPrepperTask as DockerCreateContainer) + dependsOn startDataPrepperTask + startDataPrepperTask.mustRunAfter 'startOpenSearchDockerContainer' + // wait for data-preppers to be ready + doFirst { + sleep(10*1000) + } + + description = 'Runs the basic grok end-to-end test.' 
+ group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + classpath = sourceSets.integrationTest.runtimeClasspath + + filter { + includeTestsMatching "com.amazon.dataprepper.integration.log.EndToEndBasicLogTest.testPipelineEndToEnd*" + } + + finalizedBy stopOpenSearchDockerContainer + def stopDataPrepperTask = stopDataPrepperDockerContainer(startDataPrepperTask as DockerStartContainer) + finalizedBy stopDataPrepperTask + finalizedBy removeDataPrepperDockerContainer(stopDataPrepperTask as DockerStopContainer) + finalizedBy removeDataPrepperNetwork +} + +dependencies { + implementation "com.github.javafaker:javafaker:1.0.2" + integrationTestCompile project(':data-prepper-plugins:opensearch') + integrationTestImplementation "com.linecorp.armeria:armeria:1.0.0" + integrationTestImplementation "org.awaitility:awaitility:4.0.3" + integrationTestImplementation "org.opensearch.client:opensearch-rest-high-level-client:${versionMap.opensearchVersion}" +} \ No newline at end of file diff --git a/e2e-test/log/src/integrationTest/java/com/amazon/dataprepper/integration/log/EndToEndBasicLogTest.java b/e2e-test/log/src/integrationTest/java/com/amazon/dataprepper/integration/log/EndToEndBasicLogTest.java new file mode 100644 index 0000000000..b739ff02ba --- /dev/null +++ b/e2e-test/log/src/integrationTest/java/com/amazon/dataprepper/integration/log/EndToEndBasicLogTest.java @@ -0,0 +1,135 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + +package com.amazon.dataprepper.integration.log; + +import com.amazon.dataprepper.plugins.sink.opensearch.ConnectionConfiguration; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.HttpData; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.common.RequestHeaders; +import com.linecorp.armeria.common.SessionProtocol; +import org.junit.Assert; +import org.junit.Test; +import org.opensearch.action.admin.indices.refresh.RefreshRequest; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.search.SearchHits; +import org.opensearch.search.builder.SearchSourceBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasKey; + +public class EndToEndBasicLogTest { + private static final int HTTP_SOURCE_PORT = 2021; + private static final String TEST_INDEX_NAME = "test-grok-index"; + + private final ApacheLogFaker apacheLogFaker = new ApacheLogFaker(); + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Test + public void testPipelineEndToEnd() throws JsonProcessingException { + // Send data to http source + sendHttpRequestToSource(HTTP_SOURCE_PORT, generateRandomApacheLogHttpData(2)); + sendHttpRequestToSource(HTTP_SOURCE_PORT, 
generateRandomApacheLogHttpData(3)); + + // Verify data in OpenSearch backend + final RestHighLevelClient restHighLevelClient = prepareOpenSearchRestHighLevelClient(); + final List> retrievedDocs = new ArrayList<>(); + // Wait for data to flow through pipeline and be indexed by ES + await().atMost(10, TimeUnit.SECONDS).untilAsserted( + () -> { + refreshIndices(restHighLevelClient); + final SearchRequest searchRequest = new SearchRequest(TEST_INDEX_NAME); + searchRequest.source( + SearchSourceBuilder.searchSource().size(100) + ); + final SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); + final List> foundSources = getSourcesFromSearchHits(searchResponse.getHits()); + Assert.assertEquals(5, foundSources.size()); + retrievedDocs.addAll(foundSources); + } + ); + // Verify original and grokked keys from retrieved docs + final List expectedDocKeys = Arrays.asList( + "date", "log", "clientip", "ident", "auth", "timestamp", "verb", "request", "httpversion", "response", "bytes"); + retrievedDocs.forEach(expectedDoc -> { + for (String key: expectedDocKeys) { + assertThat(expectedDoc, hasKey(key)); + } + }); + } + + private RestHighLevelClient prepareOpenSearchRestHighLevelClient() { + final ConnectionConfiguration.Builder builder = new ConnectionConfiguration.Builder( + Collections.singletonList("https://127.0.0.1:9200")); + builder.withUsername("admin"); + builder.withPassword("admin"); + return builder.build().createClient(); + } + + private void sendHttpRequestToSource(final int port, final HttpData httpData) { + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority(String.format("127.0.0.1:%d", port)) + .method(HttpMethod.POST) + .path("/log/ingest") + .contentType(MediaType.JSON_UTF_8) + .build(), + httpData) + .aggregate() + .whenComplete((i, ex) -> assertThat(i.status(), is(HttpStatus.OK))).join(); + } + + private List> getSourcesFromSearchHits(final SearchHits searchHits) { + 
final List> sources = new ArrayList<>(); + searchHits.forEach(hit -> { + Map source = hit.getSourceAsMap(); + sources.add(source); + }); + return sources; + } + + private void refreshIndices(final RestHighLevelClient restHighLevelClient) throws IOException { + final RefreshRequest requestAll = new RefreshRequest(); + restHighLevelClient.indices().refresh(requestAll, RequestOptions.DEFAULT); + } + + private HttpData generateRandomApacheLogHttpData(final int numLogs) throws JsonProcessingException { + final List> jsonArray = new ArrayList<>(); + for (int i = 0; i < numLogs; i++) { + final Map logObj = new HashMap() {{ + put("date", System.currentTimeMillis()); + put("log", apacheLogFaker.generateRandomApacheLog()); + }}; + jsonArray.add(logObj); + } + final String jsonData = objectMapper.writeValueAsString(jsonArray); + return HttpData.ofUtf8(jsonData); + } +} diff --git a/e2e-test/log/src/integrationTest/resources/basic-grok-e2e-pipeline.yml b/e2e-test/log/src/integrationTest/resources/basic-grok-e2e-pipeline.yml new file mode 100644 index 0000000000..02b4100588 --- /dev/null +++ b/e2e-test/log/src/integrationTest/resources/basic-grok-e2e-pipeline.yml @@ -0,0 +1,13 @@ +grok-pipeline: + source: + http: + prepper: + - grok: + match: + log: [ "%{COMMONAPACHELOG}" ] + sink: + - opensearch: + hosts: [ "https://node-0.example.com:9200" ] + username: "admin" + password: "admin" + index: "test-grok-index" \ No newline at end of file diff --git a/e2e-test/log/src/integrationTest/resources/data_prepper.yml b/e2e-test/log/src/integrationTest/resources/data_prepper.yml new file mode 100644 index 0000000000..7462c0a70e --- /dev/null +++ b/e2e-test/log/src/integrationTest/resources/data_prepper.yml @@ -0,0 +1 @@ +ssl: false diff --git a/e2e-test/log/src/main/java/com/amazon/dataprepper/integration/log/ApacheLogFaker.java b/e2e-test/log/src/main/java/com/amazon/dataprepper/integration/log/ApacheLogFaker.java new file mode 100644 index 0000000000..f88dcd96cc --- /dev/null +++ 
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */

package com.amazon.dataprepper.integration.log;

import com.github.javafaker.Faker;
import com.github.javafaker.Internet;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * This class provides the API to generate fake Apache (combined-format) access
 * log lines for the log-ingestion end-to-end tests. The output is intended to
 * match the grok {@code %{COMMONAPACHELOG}} pattern used by the e2e pipeline.
 */
public class ApacheLogFaker {
    private static final String[] HTTP_METHODS = {"GET", "POST", "DELETE", "PUT"};
    private static final String[] FAKE_URIS = {"/list", "/explore", "/search/tag/list", "/apps/cart.jsp?appID="};
    private static final String[] FAKE_STATUS = {"200", "404", "500", "301"};
    private static final String APACHE_LOG_FORMAT = "%s %s %s [%s] \"%s %s HTTP/1.0\" %s %s \"http://%s\" \"%s\"";

    private final Faker faker = new Faker();
    private final Random random = new Random();
    // Locale.ENGLISH is required: the grok COMMONAPACHELOG pattern (and the regex
    // in ApacheLogFakerTest) expects English month abbreviations ("Jan", "Feb", ...),
    // while the no-arg SimpleDateFormat constructor uses the JVM default locale and
    // would break the pipeline match on non-English hosts.
    // NOTE: SimpleDateFormat is not thread-safe; this instance is only used from the
    // single test thread.
    private final DateFormat dateFormat = new SimpleDateFormat("dd/MMM/y:HH:mm:ss Z", Locale.ENGLISH);

    /**
     * Generates one fake Apache combined-format access log line with random
     * client IP, identity/user fields, timestamp (within the past 100 days),
     * method, URI, status, byte count, referrer URL, and user agent.
     *
     * @return a single fake Apache access log line
     */
    public String generateRandomApacheLog() {
        final Internet internet = faker.internet();
        final String username = faker.name().username();
        final String password = internet.password();
        // Index by array length rather than a hard-coded 4 so the candidate
        // arrays can be extended without silently skewing or overflowing.
        final String httpMethod = HTTP_METHODS[random.nextInt(HTTP_METHODS.length)];
        final String uri = FAKE_URIS[random.nextInt(FAKE_URIS.length)];
        final String status = FAKE_STATUS[random.nextInt(FAKE_STATUS.length)];
        // Response size in the range [4000, 5000].
        final int bytes = random.nextInt(1001) + 4000;
        return String.format(
                APACHE_LOG_FORMAT,
                internet.ipV4Address(),
                username,
                password,
                dateFormat.format(faker.date().past(100, TimeUnit.DAYS)),
                httpMethod,
                uri,
                status,
                bytes,
                internet.url(),
                internet.userAgentAny()
        );
    }
}
diff --git a/e2e-test/log/src/test/java/com/amazon/dataprepper/integration/log/ApacheLogFakerTest.java b/e2e-test/log/src/test/java/com/amazon/dataprepper/integration/log/ApacheLogFakerTest.java new file mode 100644 index 0000000000..54b46a7c36 --- /dev/null +++ b/e2e-test/log/src/test/java/com/amazon/dataprepper/integration/log/ApacheLogFakerTest.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package com.amazon.dataprepper.integration.log; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class ApacheLogFakerTest { + public final String APACHE_LOG_REGEX = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+-]\\d{4})\\] \"(.+?)\" " + + "(\\d{3}) (\\d+) \"([^\"]+)\" \"(.+?)\""; + + @Test + public void testRandomLogPattern() { + // Given + ApacheLogFaker objectUnderTest = new ApacheLogFaker(); + + // When/Then + Assertions.assertTrue(objectUnderTest.generateRandomApacheLog().matches(APACHE_LOG_REGEX)); + } +} \ No newline at end of file diff --git a/e2e-test/trace/build.gradle b/e2e-test/trace/build.gradle index eb7631c654..25f19f2fec 100644 --- a/e2e-test/trace/build.gradle +++ b/e2e-test/trace/build.gradle @@ -2,9 +2,15 @@ apply plugin: DockerRemoteApiPlugin import com.bmuschko.gradle.docker.DockerRemoteApiPlugin -import com.bmuschko.gradle.docker.tasks.container.* -import com.bmuschko.gradle.docker.tasks.image.* -import com.bmuschko.gradle.docker.tasks.network.* +import com.bmuschko.gradle.docker.tasks.container.DockerCreateContainer +import com.bmuschko.gradle.docker.tasks.container.DockerStartContainer +import com.bmuschko.gradle.docker.tasks.container.DockerStopContainer +import com.bmuschko.gradle.docker.tasks.container.DockerRemoveContainer +import 
com.bmuschko.gradle.docker.tasks.image.DockerBuildImage +import com.bmuschko.gradle.docker.tasks.image.DockerPullImage +import com.bmuschko.gradle.docker.tasks.image.Dockerfile +import com.bmuschko.gradle.docker.tasks.network.DockerCreateNetwork +import com.bmuschko.gradle.docker.tasks.network.DockerRemoveNetwork /** * End-to-end test docker network diff --git a/settings.gradle b/settings.gradle index 9aa3c2b7a2..d52b1ab611 100644 --- a/settings.gradle +++ b/settings.gradle @@ -40,4 +40,5 @@ include 'data-prepper-plugins:grok-prepper' include 'data-prepper-logstash-configuration' include 'e2e-test' include 'e2e-test:trace' +include 'e2e-test:log'