Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MAINT: add OTelTraceGroupProcessor from trace ingestion migration branch #1224

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions data-prepper-plugins/otel-trace-group-processor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# OTel Trace Group Processor

This is a processor that fills in the missing trace group related fields in the collection of [Span](../../data-prepper-api/src/main/java/com/amazon/dataprepper/model/trace/Span.java) records output by [otel-trace-raw-processor](../otel-trace-raw-processor).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name here also should use underscores.

It finds the missing trace group info for a spanId by looking up the relevant fields in its root span stored in OpenSearch or Amazon OpenSearch Service backend that the local data-prepper host ingest into.

## Usages

### OpenSearch

```
pipeline:
...
processor:
- otel-trace-group:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be using underscores, right?

hosts: ["https://localhost:9200"]
cert: path/to/cert
username: YOUR_USERNAME_HERE
password: YOUR_PASSWORD_HERE
```

See [opensearch_security.md](../opensearch/opensearch_security.md) for detailed explanation.

### Amazon OpenSearch Service

```
pipeline:
...
processor:
- otel-trace-group-processor:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be using underscores, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, there is no _processor suffix, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the catch.

hosts: ["https://your-amazon-opensearch-service-endpoint"]
aws_sigv4: true
cert: path/to/cert
insecure: false
```

See [security.md](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/opensearch/security.md) for detailed explanation.

## Configuration

- `hosts`: A list of IP addresses of OpenSearch nodes.

- `cert`(optional): CA certificate that is pem encoded. Accepts both .pem or .crt. This enables the client to trust the CA that has signed the certificate that OpenSearch is using.
Default is null.

- `aws_sigv4`: A boolean flag to sign the HTTP request with AWS credentials. Only applies to Amazon OpenSearch Service. See [security](security.md) for details. Default to `false`.

- `aws_region`: A String represents the region of Amazon OpenSearch Service domain, e.g. us-west-2. Only applies to Amazon OpenSearch Service. Defaults to `us-east-1`.

- `insecure`: A boolean flag to turn off SSL certificate verification. If set to true, CA certificate verification will be turned off and insecure HTTP requests will be sent. Default to `false`.

- `username`(optional): A String of username used in the [internal users](https://opensearch.org/docs/latest/security-plugin/access-control/users-roles) of OpenSearch cluster. Default is null.

- `password`(optional): A String of password used in the [internal users](https://opensearch.org/docs/latest/security-plugin/access-control/users-roles) of OpenSearch cluster. Default is null.

## Metrics

### Counter
- `recordsInMissingTraceGroup`: number of ingress records missing trace group fields.
- `recordsOutFixedTraceGroup`: number of egress records with trace group fields filled successfully.
- `recordsOutMissingTraceGroup`: number of egress records missing trace group fields.

## Developer Guide

This plugin is compatible with Java 8. See

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
22 changes: 22 additions & 0 deletions data-prepper-plugins/otel-trace-group-processor/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

ext {
opensearch_version = System.getProperty("opensearch.version", "${versionMap.opensearchVersion}")
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:opensearch')
implementation "org.opensearch.client:opensearch-rest-high-level-client:${opensearch_version}"
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'io.micrometer:micrometer-core'
testImplementation project(':data-prepper-api').sourceSets.test.output
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.processor.oteltracegroup;

import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.processor.AbstractProcessor;
import com.amazon.dataprepper.model.processor.Processor;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.trace.DefaultTraceGroupFields;
import com.amazon.dataprepper.model.trace.Span;
import com.amazon.dataprepper.model.trace.TraceGroupFields;
import com.amazon.dataprepper.plugins.processor.oteltracegroup.model.TraceGroup;
import com.google.common.base.Strings;
import io.micrometer.core.instrument.Counter;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.document.DocumentField;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

@DataPrepperPlugin(name = "otel_trace_group", pluginType = Processor.class)
public class OTelTraceGroupProcessor extends AbstractProcessor<Record<Span>, Record<Span>> {

public static final String RECORDS_IN_MISSING_TRACE_GROUP = "recordsInMissingTraceGroup";
public static final String RECORDS_OUT_FIXED_TRACE_GROUP = "recordsOutFixedTraceGroup";
public static final String RECORDS_OUT_MISSING_TRACE_GROUP = "recordsOutMissingTraceGroup";

private static final Logger LOG = LoggerFactory.getLogger(OTelTraceGroupProcessor.class);

private final OTelTraceGroupProcessorConfig otelTraceGroupProcessorConfig;
private final RestHighLevelClient restHighLevelClient;

private final Counter recordsInMissingTraceGroupCounter;
private final Counter recordsOutFixedTraceGroupCounter;
private final Counter recordsOutMissingTraceGroupCounter;

public OTelTraceGroupProcessor(final PluginSetting pluginSetting) {
super(pluginSetting);
otelTraceGroupProcessorConfig = OTelTraceGroupProcessorConfig.buildConfig(pluginSetting);
restHighLevelClient = otelTraceGroupProcessorConfig.getEsConnectionConfig().createClient();

recordsInMissingTraceGroupCounter = pluginMetrics.counter(RECORDS_IN_MISSING_TRACE_GROUP);
recordsOutFixedTraceGroupCounter = pluginMetrics.counter(RECORDS_OUT_FIXED_TRACE_GROUP);
recordsOutMissingTraceGroupCounter = pluginMetrics.counter(RECORDS_OUT_MISSING_TRACE_GROUP);
}

@Override
public Collection<Record<Span>> doExecute(final Collection<Record<Span>> rawSpanRecords) {
final List<Record<Span>> recordsOut = new LinkedList<>();
final Set<Record<Span>> recordsMissingTraceGroupInfo = new HashSet<>();
final Set<String> traceIdsToLookUp = new HashSet<>();
for (Record<Span> record: rawSpanRecords) {
final Span span = record.getData();
final String traceGroup = span.getTraceGroup();
final String traceId = span.getTraceId();
if (Strings.isNullOrEmpty(traceGroup)) {
traceIdsToLookUp.add(traceId);
recordsMissingTraceGroupInfo.add(record);
recordsInMissingTraceGroupCounter.increment();
} else {
recordsOut.add(record);
}
}

final Map<String, TraceGroup> traceIdToTraceGroup = searchTraceGroupByTraceIds(traceIdsToLookUp);
for (final Record<Span> record: recordsMissingTraceGroupInfo) {
final Span span = record.getData();
final String traceId = span.getTraceId();
final TraceGroup traceGroup = traceIdToTraceGroup.get(traceId);
if (traceGroup != null) {
try {
fillInTraceGroupInfo(span, traceGroup);
recordsOut.add(record);
recordsOutFixedTraceGroupCounter.increment();
} catch (Exception e) {
recordsOut.add(record);
recordsOutMissingTraceGroupCounter.increment();
LOG.error("Failed to process the span: [{}]", record.getData(), e);
}
} else {
recordsOut.add(record);
recordsOutMissingTraceGroupCounter.increment();
final String spanId = span.getSpanId();
LOG.warn("Failed to find traceGroup for spanId: {} due to traceGroup missing for traceId: {}", spanId, traceId);
}
}

return recordsOut;
}

private void fillInTraceGroupInfo(final Span span, final TraceGroup traceGroup) {
span.setTraceGroup(traceGroup.getTraceGroup());
span.setTraceGroupFields(traceGroup.getTraceGroupFields());
}

private Map<String, TraceGroup> searchTraceGroupByTraceIds(final Collection<String> traceIds) {
final Map<String, TraceGroup> traceIdToTraceGroup = new HashMap<>();
final SearchRequest searchRequest = createSearchRequest(traceIds);

try {
final SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
final SearchHit[] searchHits = searchResponse.getHits().getHits();
Arrays.asList(searchHits).forEach(searchHit -> {
final Optional<Map.Entry<String, TraceGroup>> optionalStringTraceGroupEntry = fromSearchHitToMapEntry(searchHit);
optionalStringTraceGroupEntry.ifPresent(entry -> traceIdToTraceGroup.put(entry.getKey(), entry.getValue()));
});
} catch (Exception e) {
// TODO: retry for status code 429 of OpenSearchException?
LOG.error("Search request for traceGroup failed for traceIds: {} due to {}", traceIds, e.getMessage());
}

return traceIdToTraceGroup;
}

private SearchRequest createSearchRequest(final Collection<String> traceIds) {
final SearchRequest searchRequest = new SearchRequest(OTelTraceGroupProcessorConfig.RAW_INDEX_ALIAS);
final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(
QueryBuilders.boolQuery()
.must(QueryBuilders.termsQuery(OTelTraceGroupProcessorConfig.TRACE_ID_FIELD, traceIds))
.must(QueryBuilders.termQuery(OTelTraceGroupProcessorConfig.PARENT_SPAN_ID_FIELD, ""))
);
searchSourceBuilder.docValueField(OTelTraceGroupProcessorConfig.TRACE_ID_FIELD);
searchSourceBuilder.docValueField(TraceGroup.TRACE_GROUP_NAME_FIELD);
searchSourceBuilder.docValueField(TraceGroup.TRACE_GROUP_END_TIME_FIELD, OTelTraceGroupProcessorConfig.STRICT_DATE_TIME);
searchSourceBuilder.docValueField(TraceGroup.TRACE_GROUP_DURATION_IN_NANOS_FIELD);
searchSourceBuilder.docValueField(TraceGroup.TRACE_GROUP_STATUS_CODE_FIELD);
searchSourceBuilder.fetchSource(false);
searchRequest.source(searchSourceBuilder);

return searchRequest;
}

private Optional<Map.Entry<String, TraceGroup>> fromSearchHitToMapEntry(final SearchHit searchHit) {
final DocumentField traceIdDocField = searchHit.field(OTelTraceGroupProcessorConfig.TRACE_ID_FIELD);
final DocumentField traceGroupNameDocField = searchHit.field(TraceGroup.TRACE_GROUP_NAME_FIELD);
final DocumentField traceGroupEndTimeDocField = searchHit.field(TraceGroup.TRACE_GROUP_END_TIME_FIELD);
final DocumentField traceGroupDurationInNanosDocField = searchHit.field(TraceGroup.TRACE_GROUP_DURATION_IN_NANOS_FIELD);
final DocumentField traceGroupStatusCodeDocField = searchHit.field(TraceGroup.TRACE_GROUP_STATUS_CODE_FIELD);
if (Stream.of(traceIdDocField, traceGroupNameDocField, traceGroupEndTimeDocField, traceGroupDurationInNanosDocField,
traceGroupStatusCodeDocField).allMatch(Objects::nonNull)) {
final String traceId = traceIdDocField.getValue();
final String traceGroupName = traceGroupNameDocField.getValue();
// Restore trailing zeros for thousand, e.g. 2020-08-20T05:40:46.0895568Z -> 2020-08-20T05:40:46.089556800Z
final String traceGroupEndTime = Instant.parse(traceGroupEndTimeDocField.getValue()).toString();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than add an inline comment (which is prone to being lost over time), I suggest you make this a function.

Perhaps:

/**
 * Restores trailing zeros for thousand, e.g. 2020-08-20T05:40:46.0895568Z -> 2020-08-20T05:40:46.089556800Z
 */ 
private String normalizeDateTime(String dateTimeString) {
  return Instant.parse(dateTimeString).toString();
}

final Number traceGroupDurationInNanos = traceGroupDurationInNanosDocField.getValue();
final Number traceGroupStatusCode = traceGroupStatusCodeDocField.getValue();
final TraceGroupFields traceGroupFields = DefaultTraceGroupFields.builder()
.withEndTime(traceGroupEndTime)
.withDurationInNanos(traceGroupDurationInNanos.longValue())
.withStatusCode(traceGroupStatusCode.intValue())
.build();
final TraceGroup traceGroup = new TraceGroup.TraceGroupBuilder()
.setTraceGroup(traceGroupName)
.setTraceGroupFields(traceGroupFields)
.build();
return Optional.of(new AbstractMap.SimpleEntry<>(traceId, traceGroup));
}
return Optional.empty();
}

@Override
public void prepareForShutdown() {

}

@Override
public boolean isReadyForShutdown() {
return true;
}

@Override
public void shutdown() {
try {
restHighLevelClient.close();
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.processor.oteltracegroup;

import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.plugins.sink.opensearch.ConnectionConfiguration;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexConstants;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexType;

public class OTelTraceGroupProcessorConfig {
protected static final String TRACE_ID_FIELD = "traceId";
protected static final String SPAN_ID_FIELD = "spanId";
protected static final String PARENT_SPAN_ID_FIELD = "parentSpanId";
protected static final String RAW_INDEX_ALIAS = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW);
protected static final String STRICT_DATE_TIME = "strict_date_time";

private final ConnectionConfiguration esConnectionConfig;

public ConnectionConfiguration getEsConnectionConfig() {
return esConnectionConfig;
}

private OTelTraceGroupProcessorConfig(final ConnectionConfiguration esConnectionConfig) {
this.esConnectionConfig = esConnectionConfig;
}

public static OTelTraceGroupProcessorConfig buildConfig(final PluginSetting pluginSetting) {
final ConnectionConfiguration esConnectionConfig = ConnectionConfiguration.readConnectionConfiguration(pluginSetting);
return new OTelTraceGroupProcessorConfig(esConnectionConfig);
}
}
Loading