Skip to content

Commit

Permalink
Test for log correlation from HEC exporter as well (#1424)
Browse files Browse the repository at this point in the history
* Check correlation from HEC exporter too

* Some comment formatting.

* Some cleanup.

* PR fixes
  • Loading branch information
agoallikmaa authored Sep 20, 2023
1 parent fdc6fd0 commit 0da3584
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 1 deletion.
1 change: 1 addition & 0 deletions smoke-tests/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies {
testImplementation("ch.qos.logback:logback-classic:1.4.11")
testImplementation("com.github.docker-java:docker-java-core")
testImplementation("com.github.docker-java:docker-java-transport-httpclient5")
testImplementation("org.mock-server:mockserver-client-java:5.15.0")
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Splunk Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.splunk.opentelemetry;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.function.Predicate;

class HecTelemetryInspector {
private static final String EVENT_KEY = "event";
private static final String FIELDS_KEY = "fields";

static Predicate<JsonNode> hasEventName(String eventName) {
return it -> {
JsonNode node = it.findValue(EVENT_KEY);
return node != null && node.isTextual() && eventName.equals(node.textValue());
};
}

static Predicate<JsonNode> hasTextFieldValue(String fieldName, String fieldValue) {
return it -> {
JsonNode node = it.findPath(FIELDS_KEY).findPath(fieldName);
return !node.isMissingNode() && node.isTextual() && fieldValue.equals(node.textValue());
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright Splunk Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.splunk.opentelemetry;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.mockserver.client.MockServerClient;
import org.mockserver.model.ClearType;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;

class HecTelemetryRetriever {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final JsonFactory JSON_FACTORY = new JsonFactory(OBJECT_MAPPER);
private static final String HEC_PATH = "/services/collector/event";

private final MockServerClient client;

HecTelemetryRetriever(int backendPort) {
this.client = new MockServerClient("localhost", backendPort);
}

void initializeEndpoints() {
client
.when(HttpRequest.request(HEC_PATH))
.respond(HttpResponse.response("").withStatusCode(200));
}

void clearTelemetry() {
client.clear(HttpRequest.request(), ClearType.LOG);
}

List<JsonNode> waitForEntries() throws IOException, InterruptedException {
int previousSize = 0;
long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30);
HttpRequest[] requests = new HttpRequest[0];

while (System.currentTimeMillis() < deadline) {
requests = client.retrieveRecordedRequests(HttpRequest.request(HEC_PATH));

if (requests.length > 0 && requests.length == previousSize) {
break;
}
previousSize = requests.length;
System.out.printf("Current HEC entry count %d%n", previousSize);
TimeUnit.MILLISECONDS.sleep(500);
}

return extractJsonEntries(requests);
}

private List<JsonNode> extractJsonEntries(HttpRequest[] requests) throws IOException {
List<JsonNode> entryNodes = new ArrayList<>();

for (HttpRequest request : requests) {
// HEC format just concatenates multiple JSON bodies after one another without using a JSON
// array, that's why we use readValuesAs. Also use getBodyAsRawBytes instead of
// getBodyAsString - MockServerClient parses the content and reformats if content type is
// JSON, keeping only the JSON body.
try (JsonParser parser = JSON_FACTORY.createParser(request.getBodyAsRawBytes())) {
Iterator<JsonNode> entries = parser.readValuesAs(JsonNode.class);

while (entries.hasNext()) {
entryNodes.add(entries.next());
}
}
}

return entryNodes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.splunk.opentelemetry;

import static com.splunk.opentelemetry.HecTelemetryInspector.hasEventName;
import static com.splunk.opentelemetry.HecTelemetryInspector.hasTextFieldValue;
import static com.splunk.opentelemetry.LogsInspector.hasSpanId;
import static com.splunk.opentelemetry.LogsInspector.hasStringBody;
import static com.splunk.opentelemetry.LogsInspector.hasTraceId;
Expand Down Expand Up @@ -73,6 +75,14 @@ void springBootSmokeTestOnJDK(int jdk) throws IOException, InterruptedException
.anyMatch(
hasTraceId(traceId).and(hasSpanId(spanId)).and(hasStringBody("HTTP request received")));

if (isHecEnabled()) {
assertThat(waitForHecEntries())
.anyMatch(
hasEventName("HTTP request received")
.and(hasTextFieldValue("span_id", spanId))
.and(hasTextFieldValue("trace_id", traceId)));
}

// cleanup
stopTarget();
}
Expand Down
25 changes: 25 additions & 0 deletions smoke-tests/src/test/java/com/splunk/opentelemetry/SmokeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static org.junit.jupiter.api.Assumptions.assumeTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.splunk.opentelemetry.helper.LinuxTestContainerManager;
import com.splunk.opentelemetry.helper.ResourceMapping;
import com.splunk.opentelemetry.helper.TargetContainerBuilder;
Expand Down Expand Up @@ -48,6 +49,7 @@ public abstract class SmokeTest {
System.getProperty("io.opentelemetry.smoketest.agent.shadowJar.path");

private TelemetryRetriever telemetryRetriever;
private HecTelemetryRetriever hecTelemetryRetriever;

/** Subclasses can override this method to pass jvm arguments in another environment variable */
protected String getJvmArgsEnvVarName() {
Expand Down Expand Up @@ -130,11 +132,22 @@ protected TargetWaitStrategy getWaitStrategy() {
@BeforeEach
void setUpTelemetryRetriever() {
telemetryRetriever = new TelemetryRetriever(client, containerManager.getBackendMappedPort());

int hecBackendPort = containerManager.getHecBackendMappedPort();
if (hecBackendPort != 0) {
hecTelemetryRetriever = new HecTelemetryRetriever(hecBackendPort);
hecTelemetryRetriever.initializeEndpoints();
}
}

@AfterEach
void clearTelemetry() throws IOException {
telemetryRetriever.clearTelemetry();

if (hecTelemetryRetriever != null) {
hecTelemetryRetriever.clearTelemetry();
hecTelemetryRetriever = null;
}
}

void stopTarget() {
Expand All @@ -153,6 +166,18 @@ protected LogsInspector waitForLogs() throws IOException, InterruptedException {
return telemetryRetriever.waitForLogs();
}

protected List<JsonNode> waitForHecEntries() throws IOException, InterruptedException {
if (hecTelemetryRetriever == null) {
return Collections.emptyList();
}

return hecTelemetryRetriever.waitForEntries();
}

protected boolean isHecEnabled() {
return hecTelemetryRetriever != null;
}

protected String getOtelInstrumentationVersion() throws IOException {
try (JarFile agentJar = new JarFile(agentPath)) {
// any instrumentation properties file will do, they all should contain the same version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

public abstract class AbstractTestContainerManager implements TestContainerManager {
protected static final int BACKEND_PORT = 8080;
protected static final int HEC_BACKEND_PORT = 1080;
protected static final int COLLECTOR_PORT = 4317;

protected static final String BACKEND_ALIAS = "backend";
protected static final String HEC_BACKEND_ALIAS = "hec-backend";
protected static final String COLLECTOR_ALIAS = "collector";
protected static final String COLLECTOR_CONFIG_RESOURCE = "/otel.yaml";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class LinuxTestContainerManager extends AbstractTestContainerManager {

private Network network;
private GenericContainer<?> backend;
private GenericContainer<?> hecBackend;
private GenericContainer<?> collector;
private GenericContainer<?> target;

Expand All @@ -52,6 +53,15 @@ public void startEnvironment() {
.withLogConsumer(new Slf4jLogConsumer(logger));
backend.start();

hecBackend =
new GenericContainer<>(DockerImageName.parse("mockserver/mockserver:5.15.0"))
.withExposedPorts(HEC_BACKEND_PORT)
.waitingFor(Wait.forLogMessage(".*started on port.*", 1))
.withNetwork(network)
.withNetworkAliases(HEC_BACKEND_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(logger));
hecBackend.start();

collector =
new GenericContainer<>(DockerImageName.parse("otel/opentelemetry-collector-contrib:latest"))
.dependsOn(backend)
Expand All @@ -70,6 +80,10 @@ public void stopEnvironment() {
backend.stop();
backend = null;
}
if (hecBackend != null) {
hecBackend.stop();
hecBackend = null;
}
if (collector != null) {
collector.stop();
collector = null;
Expand Down Expand Up @@ -118,6 +132,11 @@ public int getBackendMappedPort() {
return backend.getMappedPort(BACKEND_PORT);
}

@Override
public int getHecBackendMappedPort() {
return hecBackend.getMappedPort(HEC_BACKEND_PORT);
}

@Override
public int getTargetMappedPort(int originalPort) {
return target.getMappedPort(originalPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public interface TestContainerManager {

int getBackendMappedPort();

int getHecBackendMappedPort();

int getTargetMappedPort(int originalPort);

void startTarget(TargetContainerBuilder builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ public int getBackendMappedPort() {
return extractMappedPort(backend, BACKEND_PORT);
}

@Override
public int getHecBackendMappedPort() {
return 0;
}

@Override
public int getTargetMappedPort(int originalPort) {
return extractMappedPort(target, originalPort);
Expand Down
10 changes: 9 additions & 1 deletion smoke-tests/src/test/resources/otel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ exporters:
endpoint: backend:8080
tls:
insecure: true
splunk_hec:
token: "00000000-0000-0000-0000-000000000000"
endpoint: http://hec-backend:1080/services/collector/event
sourcetype: "test"
max_connections: 20
timeout: 10s
tls:
insecure_skip_verify: true

service:
pipelines:
Expand All @@ -41,6 +49,6 @@ service:
logs:
receivers: [ otlp ]
processors: [ batch ]
exporters: [ logging/logging_info, otlp ]
exporters: [ logging/logging_info, otlp, splunk_hec ]

extensions: [ health_check, pprof, zpages ]

0 comments on commit 0da3584

Please sign in to comment.