TST: basic grok e2e test #536
Changes from 14 commits
e3505fb
f55d2f3
3cefc1c
1df2c70
5d1d67d
27d059a
57f77ca
86575df
146a766
cec485a
b1974d8
c498078
41580a2
bfc186d
ed51057
559c032
9a23eb4
ba90c7c
@@ -0,0 +1,30 @@
# This workflow will build a Java project with Gradle
# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-gradle

name: Data Prepper Log Analytics Basic Grok End-to-end test with Gradle

on:
  push:
    branches: [ main ]
  pull_request:
  workflow_dispatch:

jobs:
  build:
    strategy:
      matrix:
        java: [14]

    runs-on: ubuntu-latest

    steps:
      - name: Set up JDK ${{ matrix.java }}
        uses: actions/setup-java@v1
        with:
          java-version: ${{ matrix.java }}
      - name: Checkout Data-Prepper
        uses: actions/checkout@v2
      - name: Grant execute permission for gradlew
        run: chmod +x gradlew
      - name: Run basic grok end-to-end tests with Gradle
        run: ./gradlew :e2e-test:log:basicGrokEndToEndTest
@@ -0,0 +1,14 @@
# Log Data Ingestion End-to-end Tests

This module includes e2e tests for log data ingestion supported by data-prepper.

## Basic Grok Ingestion Pipeline End-to-end test

Run from the current directory:
```
./gradlew :basicGrokEndToEndTest
```
or from the project root directory:
```
./gradlew :e2e-test:log:basicGrokEndToEndTest
```
@@ -0,0 +1,151 @@
apply plugin: DockerRemoteApiPlugin


import com.bmuschko.gradle.docker.DockerRemoteApiPlugin
import com.bmuschko.gradle.docker.tasks.container.*
import com.bmuschko.gradle.docker.tasks.image.*
import com.bmuschko.gradle.docker.tasks.network.*

/**
 * End-to-end test docker network
 */
task createDataPrepperNetwork(type: DockerCreateNetwork) {
    networkName = "data_prepper_network"
}

task removeDataPrepperNetwork(type: DockerRemoveNetwork) {
    dependsOn createDataPrepperNetwork
    networkId = createDataPrepperNetwork.getNetworkId()
}

def BASIC_GROK_PIPELINE_YAML = "basic-grok-e2e-pipeline.yml"

/**
 * DataPrepper Docker tasks
 */
task createDataPrepperDockerFile(type: Dockerfile) {
    dependsOn copyDataPrepperJar
    destFile = project.file('build/docker/Dockerfile')
    from("adoptopenjdk/openjdk14:jre-14.0.1_7-alpine")
    workingDir("/app")
    copyFile("${dataPrepperJarFilepath}", "/app/data-prepper.jar")
    copyFile("src/integrationTest/resources/${BASIC_GROK_PIPELINE_YAML}", "/app/${BASIC_GROK_PIPELINE_YAML}")
    copyFile("src/integrationTest/resources/data_prepper.yml", "/app/data_prepper.yml")
    defaultCommand("java", "-jar", "data-prepper.jar", "/app/${BASIC_GROK_PIPELINE_YAML}", "/app/data_prepper.yml")
}

task buildDataPrepperDockerImage(type: DockerBuildImage) {
    dependsOn createDataPrepperDockerFile
    inputDir = file(".")
    dockerFile = file("build/docker/Dockerfile")
    images.add("e2e-test-log-pipeline-image")
}

def createDataPrepperDockerContainer(final String taskBaseName, final String dataPrepperName, final int sourcePort,
                                     final int serverPort, final String pipelineConfigYAML) {
    return tasks.create("create${taskBaseName}", DockerCreateContainer) {
        dependsOn buildDataPrepperDockerImage
        dependsOn createDataPrepperNetwork
        containerName = dataPrepperName
        exposePorts("tcp", [2021, 4900])
        hostConfig.portBindings = [String.format('%d:2021', sourcePort), String.format('%d:4900', serverPort)]
        hostConfig.network = createDataPrepperNetwork.getNetworkName()
        cmd = ["java", "-jar", "data-prepper.jar", pipelineConfigYAML, "/app/data_prepper.yml"]
        targetImageId buildDataPrepperDockerImage.getImageId()
    }
}

def startDataPrepperDockerContainer(final DockerCreateContainer createDataPrepperDockerContainerTask) {
    return tasks.create("start${createDataPrepperDockerContainerTask.getName()}", DockerStartContainer) {
        dependsOn createDataPrepperDockerContainerTask
        targetContainerId createDataPrepperDockerContainerTask.getContainerId()
    }
}

def stopDataPrepperDockerContainer(final DockerStartContainer startDataPrepperDockerContainerTask) {
    return tasks.create("stop${startDataPrepperDockerContainerTask.getName()}", DockerStopContainer) {
        targetContainerId startDataPrepperDockerContainerTask.getContainerId()
    }
}

def removeDataPrepperDockerContainer(final DockerStopContainer stopDataPrepperDockerContainerTask) {
    return tasks.create("remove${stopDataPrepperDockerContainerTask.getName()}", DockerRemoveContainer) {
        targetContainerId stopDataPrepperDockerContainerTask.getContainerId()
    }
}

/**
 * OpenSearch Docker tasks
 */
task pullOpenSearchDockerImage(type: DockerPullImage) {
    image = "opensearchproject/opensearch:${versionMap.opensearchVersion}"
}

task createOpenSearchDockerContainer(type: DockerCreateContainer) {

Review comment: I am just curious: is it possible to set up the end-to-end tests by creating a docker-compose.yaml? It seems like it would be a lot cleaner to initialize the tests that way rather than having to write all this code to set up the containers. Maybe there are some downsides to doing this that I'm unaware of?

Reply: Thanks for calling this out. There is definitely an option: https://bmuschko.com/blog/gradle-docker-compose/. From that post it looks like all we need to do is hardcode a docker-compose.yml, which might greatly simplify the Gradle setup as you mentioned. But I think it is worth a bit more exploration to tell whether it fits well into both the trace-analytics and log-analytics e2e tests. I will open an issue for that and reserve it for a future enhancement PR on our infra.

Reply: Great! I do think running the end-to-end tests through docker-compose would be nice down the line. It would definitely be the easiest way for external contributors to make their own end-to-end tests.

    dependsOn createDataPrepperNetwork
    dependsOn pullOpenSearchDockerImage
    targetImageId pullOpenSearchDockerImage.image
    containerName = "node-0.example.com"
    hostConfig.portBindings = ['9200:9200', '9600:9600']
    hostConfig.autoRemove = true
    hostConfig.network = createDataPrepperNetwork.getNetworkName()
    envVars = ['discovery.type':'single-node']
}

task startOpenSearchDockerContainer(type: DockerStartContainer) {
    dependsOn createOpenSearchDockerContainer
    targetContainerId createOpenSearchDockerContainer.getContainerId()

    doLast {
        sleep(90*1000)
    }
}

task stopOpenSearchDockerContainer(type: DockerStopContainer) {
    targetContainerId createOpenSearchDockerContainer.getContainerId()

    doLast {
        sleep(5*1000)
    }
}

/**
 * End to end test. Spins up OpenSearch and DataPrepper docker containers, then runs the integ test.
 * Stops the docker containers when finished.
 */
task basicGrokEndToEndTest(type: Test) {

Review comment (truncated): Rather than …

Review comment (truncated): Also, it tests more than just …

    dependsOn build
    dependsOn startOpenSearchDockerContainer
    def createDataPrepperTask = createDataPrepperDockerContainer(
            "basicGrokDataPrepper", "dataprepper", 2021, 4900, "/app/${BASIC_GROK_PIPELINE_YAML}")
    def startDataPrepperTask = startDataPrepperDockerContainer(createDataPrepperTask as DockerCreateContainer)
    dependsOn startDataPrepperTask
    startDataPrepperTask.mustRunAfter 'startOpenSearchDockerContainer'
    // wait for data-preppers to be ready
    doFirst {
        sleep(10*1000)
    }

    description = 'Runs the basic grok end-to-end test.'
    group = 'verification'
    testClassesDirs = sourceSets.integrationTest.output.classesDirs
    classpath = sourceSets.integrationTest.runtimeClasspath

    filter {
        includeTestsMatching "com.amazon.dataprepper.integration.log.EndToEndBasicGrokTest.testPipelineEndToEnd*"
    }

    finalizedBy stopOpenSearchDockerContainer
    def stopDataPrepperTask = stopDataPrepperDockerContainer(startDataPrepperTask as DockerStartContainer)
    finalizedBy stopDataPrepperTask
    finalizedBy removeDataPrepperDockerContainer(stopDataPrepperTask as DockerStopContainer)
    finalizedBy removeDataPrepperNetwork
}

dependencies {
    integrationTestCompile project(':data-prepper-plugins:opensearch')
    integrationTestImplementation "com.github.javafaker:javafaker:1.0.2"
    integrationTestImplementation "com.linecorp.armeria:armeria:1.0.0"
    integrationTestImplementation "org.awaitility:awaitility:4.0.3"
    integrationTestImplementation "org.opensearch.client:opensearch-rest-high-level-client:${versionMap.opensearchVersion}"
}
@@ -0,0 +1,56 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */

package com.amazon.dataprepper.integration.log;

import com.github.javafaker.Faker;
import com.github.javafaker.Internet;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * This class provides the API to generate fake Apache logs.
 */
public class ApacheLogFaker {

Review comment: I like that you made your own ApacheLogFaker. It might be nice to have a single basic unit test for this log creation, since changes here can cause the end-to-end test to fail. A failing unit test has the potential to save people time trying to debug a failed end-to-end test.

    private final String[] HTTP_METHODS = new String[] {"GET", "POST", "DELETE", "PUT"};
    private final String[] FAKE_URIS = new String[] {"/list", "/explore", "/search/tag/list", "/apps/cart.jsp?appID="};
    private final String[] FAKE_STATUS = new String[] {"200", "404", "500", "301"};
    private final String APACHE_LOG_FORMAT = "%s %s %s [%s] \"%s %s HTTP/1.0\" %s %s \"http://%s\" \"%s\"";
    private final Faker faker = new Faker();
    private final Random random = new Random();
    private final DateFormat dateFormat = new SimpleDateFormat("dd/MMM/y:HH:mm:ss Z");

    public String generateRandomApacheLog() {
        final Internet internet = faker.internet();
        final String username = faker.name().username();
        final String password = internet.password();
        final String httpMethod = HTTP_METHODS[random.nextInt(4)];
        final String uri = FAKE_URIS[random.nextInt(4)];
        final String status = FAKE_STATUS[random.nextInt(4)];
        final int bytes = random.nextInt(1001) + 4000;
        return String.format(
                APACHE_LOG_FORMAT,
                internet.ipV4Address(),
                username,
                password,
                dateFormat.format(faker.date().past(100, TimeUnit.DAYS)),
                httpMethod,
                uri,
                status,
                bytes,
                internet.url(),
                internet.userAgentAny()

Review comment: This doesn't need to be changed, but I thought I would call it out. Technically the url and userAgent added here aren't being used, since we are matching against %{COMMONAPACHELOG}, which doesn't have these. You could match them with %{COMBINEDAPACHELOG}, but this is unnecessary.

        );
    }
}
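
Following up on the review suggestion above about a basic unit test for the log creation, a minimal sketch of such a test might look like the following. It is not part of this PR: the test class name and the regular expression approximating the combined-style line built by generateRandomApacheLog are assumptions for illustration. It uses JUnit 4 and Hamcrest, which the end-to-end test in this PR already uses.

```
package com.amazon.dataprepper.integration.log;

import org.junit.Test;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

public class ApacheLogFakerTest {
    // Assumed rough approximation of the line produced by generateRandomApacheLog:
    // client IP, username, password, [timestamp], "METHOD uri HTTP/1.0", status, bytes,
    // "http://referrer", "user agent".
    private static final String APACHE_LOG_REGEX =
            "^\\S+ \\S+ \\S+ \\[[^\\]]+\\] \"(GET|POST|DELETE|PUT) \\S+ HTTP/1\\.0\" " +
                    "(200|404|500|301) \\d+ \"http://[^\"]+\" \"[^\"]+\"$";

    private final ApacheLogFaker apacheLogFaker = new ApacheLogFaker();

    @Test
    public void testGeneratedLogMatchesApacheLogFormat() {
        final String fakeLog = apacheLogFaker.generateRandomApacheLog();
        assertThat(fakeLog.matches(APACHE_LOG_REGEX), is(true));
    }
}
```

A change to the faker's format string would then fail this test directly instead of surfacing only as a failed end-to-end run.
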
@@ -0,0 +1,139 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */

package com.amazon.dataprepper.integration.log;

import com.amazon.dataprepper.plugins.sink.opensearch.ConnectionConfiguration;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.SessionProtocol;
import org.junit.Assert;
import org.junit.Test;
import org.opensearch.action.admin.indices.refresh.RefreshRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

public class EndToEndBasicGrokTest {
    private static final int HTTP_SOURCE_PORT = 2021;
    private static final String TEST_INDEX_NAME = "test-grok-index";

    private final ApacheLogFaker apacheLogFaker = new ApacheLogFaker();
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Test
    public void testPipelineEndToEnd() throws JsonProcessingException {
        // Send data to http source
        sendHttpRequestToSource(HTTP_SOURCE_PORT, generateRandomApacheLogHttpData(2));
        sendHttpRequestToSource(HTTP_SOURCE_PORT, generateRandomApacheLogHttpData(3));

        // Verify data in OpenSearch backend
        final RestHighLevelClient restHighLevelClient = prepareOpenSearchRestHighLevelClient();
        final List<Map<String, Object>> retrievedDocs = new ArrayList<>();
        // Wait for data to flow through pipeline and be indexed by OpenSearch
        await().atMost(10, TimeUnit.SECONDS).untilAsserted(
                () -> {
                    refreshIndices(restHighLevelClient);
                    final SearchRequest searchRequest = new SearchRequest(TEST_INDEX_NAME);
                    searchRequest.source(
                            SearchSourceBuilder.searchSource().size(100)
                    );
                    final SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
                    final List<Map<String, Object>> foundSources = getSourcesFromSearchHits(searchResponse.getHits());
                    Assert.assertEquals(5, foundSources.size());
                    retrievedDocs.addAll(foundSources);
                }
        );
        // Verify original and grokked keys from retrieved docs
        final List<String> expectedDocKeys = Arrays.asList(
                "date", "log", "clientip", "ident", "auth", "timestamp", "verb", "request", "httpversion", "response", "bytes");
        retrievedDocs.forEach(expectedDoc -> {
            for (String key: expectedDocKeys) {
                Assert.assertNotNull(expectedDoc.get(key));

Review comment: Is there a way to check for the values themselves instead of just checking for non-null? Since ordering in sending and retrieving isn't guaranteed, I can see this being slightly annoying.

Review comment (truncated): I'd recommend … Using …

Reply (truncated): @dlvenable @graytaylor0 Thanks for the comments. I think …

            }
        });
    }
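
Following up on the review thread above about asserting on the grokked values rather than only non-null keys, here is one possible sketch. It is illustrative only and not part of this PR: it assumes the test records each faked log line it sends in a hypothetical sentLogs list (filled in generateRandomApacheLogHttpData), compares against the indexed documents as unordered sets since send/retrieve ordering is not guaranteed, and would additionally need java.util.Set and java.util.HashSet imports.

```
    // Sketch only: a helper that could replace the non-null checks above.
    private void assertRetrievedDocsMatchSentLogs(final List<String> sentLogs,
                                                  final List<Map<String, Object>> retrievedDocs) {
        // Every original line should come back unchanged in the "log" field, ignoring order.
        final Set<String> indexedLogs = new HashSet<>();
        for (final Map<String, Object> doc : retrievedDocs) {
            indexedLogs.add((String) doc.get("log"));
        }
        Assert.assertEquals(new HashSet<>(sentLogs), indexedLogs);

        // Spot-check a few grokked fields against the original line they were extracted from.
        for (final Map<String, Object> doc : retrievedDocs) {
            final String log = (String) doc.get("log");
            Assert.assertTrue(log.startsWith(String.valueOf(doc.get("clientip"))));
            Assert.assertTrue(log.contains(String.format("\"%s %s ", doc.get("verb"), doc.get("request"))));
            Assert.assertTrue(log.contains(String.format(" %s %s ", doc.get("response"), doc.get("bytes"))));
        }
    }
```

The exact fields worth asserting on would depend on the grok pattern configured in basic-grok-e2e-pipeline.yml.
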
    private RestHighLevelClient prepareOpenSearchRestHighLevelClient() {
        final ConnectionConfiguration.Builder builder = new ConnectionConfiguration.Builder(
                Collections.singletonList("https://127.0.0.1:9200"));
        builder.withUsername("admin");
        builder.withPassword("admin");
        return builder.build().createClient();
    }

    private void sendHttpRequestToSource(final int port, final HttpData httpData) {
        WebClient.of().execute(RequestHeaders.builder()
                        .scheme(SessionProtocol.HTTP)
                        .authority(String.format("127.0.0.1:%d", port))
                        .method(HttpMethod.POST)
                        .path("/log/ingest")
                        .contentType(MediaType.JSON_UTF_8)
                        .build(),
                httpData)
                .aggregate()
                .whenComplete((i, ex) -> assertThat(i.status(), is(HttpStatus.OK))).join();
    }

    private List<Map<String, Object>> getSourcesFromSearchHits(final SearchHits searchHits) {
        final List<Map<String, Object>> sources = new ArrayList<>();
        searchHits.forEach(hit -> {
            Map<String, Object> source = hit.getSourceAsMap();
            sources.add(source);
        });
        return sources;
    }

    private void refreshIndices(final RestHighLevelClient restHighLevelClient) throws IOException {
        final RefreshRequest requestAll = new RefreshRequest();
        restHighLevelClient.indices().refresh(requestAll, RequestOptions.DEFAULT);
    }

    private HttpData generateRandomApacheLogHttpData(final int numLogs) throws JsonProcessingException {
        final List<Map<String, Object>> jsonArray = new ArrayList<>();
        for (int i = 0; i < numLogs; i++) {
            final Map<String, Object> logObj = new HashMap<String, Object>() {{
                put("date", System.currentTimeMillis());
                put("log", apacheLogFaker.generateRandomApacheLog());
            }};
            jsonArray.add(logObj);
        }
        final String jsonData = objectMapper.writeValueAsString(jsonArray);
        return HttpData.ofUtf8(jsonData);
    }

    public static void main(String[] args) throws JsonProcessingException {

Review comment: I'm not sure I understand why we need this.

Reply: Ah, that is my mistake. Great catch! Will remove that.

        EndToEndBasicGrokTest endToEndBasicGrokTest = new EndToEndBasicGrokTest();
        System.out.println(endToEndBasicGrokTest.generateRandomApacheLogHttpData(10).toStringUtf8());
    }
}

Review comment: Can we get rid of these wildcard imports?