Maintenance: add OTelTraceGroupProcessor from trace ingestion migration branch (#1224)

* ADD: otel-trace-group-processor

Signed-off-by: Qi Chen <[email protected]>

* MAINT: update header

Signed-off-by: Qi Chen <[email protected]>

* MAINT: migrate to processor interface

Signed-off-by: Qi Chen <[email protected]>

* MAINT: README and renaming plugin

Signed-off-by: Qi Chen <[email protected]>

* DOC: fix plugin names in README

Signed-off-by: Qi Chen <[email protected]>

* REF: normalizeDateTime

Signed-off-by: Qi Chen <[email protected]>
chenqi0805 authored Mar 30, 2022
1 parent 12be9b6 commit 46b205a
Showing 13 changed files with 931 additions and 0 deletions.
67 changes: 67 additions & 0 deletions data-prepper-plugins/otel-trace-group-processor/README.md
@@ -0,0 +1,67 @@
# OTel Trace Group Processor

This processor fills in missing trace group fields in the collection of [Span](../../data-prepper-api/src/main/java/com/amazon/dataprepper/model/trace/Span.java) records output by the [otel_trace_raw](../otel-trace-raw-processor) processor.
For each span that lacks trace group information, it looks up the relevant fields from the root span of the same trace, which is stored in the OpenSearch or Amazon OpenSearch Service backend that the local Data Prepper host ingests into.
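
The lookup-and-fill idea described above can be sketched in plain Java. This is a simplified illustration, not the processor's implementation: `TraceGroupFillSketch`, `SpanSketch`, and `fillMissing` are hypothetical names, and an in-memory map stands in for the root-span data that the real processor reads from the backend.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration; none of these types are Data Prepper classes.
final class TraceGroupFillSketch {

    // Hypothetical stand-in for a span record.
    static final class SpanSketch {
        final String traceId;
        final String spanId;
        String traceGroup; // null or empty means the trace group is missing

        SpanSketch(final String traceId, final String spanId, final String traceGroup) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.traceGroup = traceGroup;
        }
    }

    // Fills in traceGroup for spans that lack it and returns how many were fixed.
    // The map stands in for root-span data keyed by traceId.
    static int fillMissing(final List<SpanSketch> spans, final Map<String, String> traceGroupByTraceId) {
        int fixed = 0;
        for (final SpanSketch span : spans) {
            final boolean missing = span.traceGroup == null || span.traceGroup.isEmpty();
            if (!missing) {
                continue; // already has a trace group; pass through unchanged
            }
            final String found = traceGroupByTraceId.get(span.traceId);
            if (found != null) {
                span.traceGroup = found;
                fixed++;
            }
            // Otherwise the span is emitted as-is, still without a trace group.
        }
        return fixed;
    }

    public static void main(final String[] args) {
        final List<SpanSketch> spans = new ArrayList<>();
        spans.add(new SpanSketch("trace-1", "span-a", null));
        spans.add(new SpanSketch("trace-2", "span-b", ""));
        final Map<String, String> lookup = new HashMap<>();
        lookup.put("trace-1", "GET /orders");
        final int fixed = fillMissing(spans, lookup);
        System.out.println("fixed=" + fixed + ", first=" + spans.get(0).traceGroup);
    }
}
```

Spans whose trace has no root span in the lookup are still passed downstream; they simply keep their empty trace group.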

## Usage

### OpenSearch

```
pipeline:
  ...
  processor:
    - otel_trace_group:
        hosts: ["https://localhost:9200"]
        cert: path/to/cert
        username: YOUR_USERNAME_HERE
        password: YOUR_PASSWORD_HERE
```

See [opensearch_security.md](../opensearch/opensearch_security.md) for a detailed explanation.

### Amazon OpenSearch Service

```
pipeline:
  ...
  processor:
    - otel_trace_group:
        hosts: ["https://your-amazon-opensearch-service-endpoint"]
        aws_sigv4: true
        cert: path/to/cert
        insecure: false
```

See [security.md](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/opensearch/security.md) for a detailed explanation.

## Configuration

- `hosts`: A list of OpenSearch node endpoints, for example `https://localhost:9200`.

- `cert` (optional): Path to a PEM-encoded CA certificate. Accepts both `.pem` and `.crt` files. This lets the client trust the CA that signed the certificate OpenSearch is using.
Defaults to null.

- `aws_sigv4`: A boolean flag to sign the HTTP request with AWS credentials. Only applies to Amazon OpenSearch Service. See [security](security.md) for details. Defaults to `false`.

- `aws_region`: A string giving the region of the Amazon OpenSearch Service domain, e.g. us-west-2. Only applies to Amazon OpenSearch Service. Defaults to `us-east-1`.

- `insecure`: A boolean flag to turn off SSL certificate verification. If set to true, CA certificate verification is turned off and insecure HTTP requests are sent. Defaults to `false`.

- `username` (optional): The username of an [internal user](https://opensearch.org/docs/latest/security-plugin/access-control/users-roles) of the OpenSearch cluster. Defaults to null.

- `password` (optional): The password of an [internal user](https://opensearch.org/docs/latest/security-plugin/access-control/users-roles) of the OpenSearch cluster. Defaults to null.
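
As a rough illustration of how the documented defaults fit together, the following sketch reads the options from a plain map. `TraceGroupSettingsSketch` and its helpers are hypothetical and are not the plugin's configuration classes. Treating `hosts` as required is an assumption, made only because it is the one option listed without a default.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical settings holder that applies the defaults documented above.
final class TraceGroupSettingsSketch {
    final List<String> hosts;
    final String cert;      // null when not configured
    final boolean awsSigv4; // defaults to false
    final String awsRegion; // defaults to us-east-1
    final boolean insecure; // defaults to false
    final String username;  // null when not configured
    final String password;  // null when not configured

    private TraceGroupSettingsSketch(final List<String> hosts, final String cert, final boolean awsSigv4,
                                     final String awsRegion, final boolean insecure,
                                     final String username, final String password) {
        this.hosts = hosts;
        this.cert = cert;
        this.awsSigv4 = awsSigv4;
        this.awsRegion = awsRegion;
        this.insecure = insecure;
        this.username = username;
        this.password = password;
    }

    static TraceGroupSettingsSketch fromMap(final Map<String, Object> raw) {
        final Object hostsValue = raw.get("hosts");
        // Assumption: hosts is required because no default is documented for it.
        if (!(hostsValue instanceof List) || ((List<?>) hostsValue).isEmpty()) {
            throw new IllegalArgumentException("hosts must be a non-empty list");
        }
        final List<String> hosts = new ArrayList<>();
        for (final Object host : (List<?>) hostsValue) {
            hosts.add(String.valueOf(host));
        }
        return new TraceGroupSettingsSketch(
                Collections.unmodifiableList(hosts),
                stringOrDefault(raw, "cert", null),
                booleanOrFalse(raw, "aws_sigv4"),
                stringOrDefault(raw, "aws_region", "us-east-1"),
                booleanOrFalse(raw, "insecure"),
                stringOrDefault(raw, "username", null),
                stringOrDefault(raw, "password", null));
    }

    private static String stringOrDefault(final Map<String, Object> raw, final String key, final String fallback) {
        final Object value = raw.get(key);
        return value == null ? fallback : value.toString();
    }

    private static boolean booleanOrFalse(final Map<String, Object> raw, final String key) {
        return Boolean.TRUE.equals(raw.get(key));
    }

    public static void main(final String[] args) {
        final Map<String, Object> raw = new HashMap<>();
        raw.put("hosts", Arrays.asList("https://localhost:9200"));
        final TraceGroupSettingsSketch settings = fromMap(raw);
        System.out.println("region=" + settings.awsRegion + ", insecure=" + settings.insecure);
    }
}
```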

## Metrics

### Counter
- `recordsInMissingTraceGroup`: number of ingress records missing trace group fields.
- `recordsOutFixedTraceGroup`: number of egress records with trace group fields filled successfully.
- `recordsOutMissingTraceGroup`: number of egress records missing trace group fields.
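
Going by the processor code in this commit, every record counted in `recordsInMissingTraceGroup` ends up in exactly one of the two egress counters. The sketch below shows that relationship with plain fields instead of Micrometer counters. `TraceGroupCountersSketch` and its methods are hypothetical names used only for illustration.

```java
// Hypothetical tally that mirrors how the three counters relate to each other.
final class TraceGroupCountersSketch {
    long recordsInMissingTraceGroup;
    long recordsOutFixedTraceGroup;
    long recordsOutMissingTraceGroup;

    // Call once per ingress record that lacks trace group fields.
    // lookupSucceeded says whether the trace group could be filled in.
    void onRecordMissingTraceGroup(final boolean lookupSucceeded) {
        recordsInMissingTraceGroup++;
        if (lookupSucceeded) {
            recordsOutFixedTraceGroup++;
        } else {
            recordsOutMissingTraceGroup++;
        }
    }

    // True when every ingress record is accounted for on egress.
    boolean isBalanced() {
        return recordsInMissingTraceGroup == recordsOutFixedTraceGroup + recordsOutMissingTraceGroup;
    }

    public static void main(final String[] args) {
        final TraceGroupCountersSketch counters = new TraceGroupCountersSketch();
        counters.onRecordMissingTraceGroup(true);
        counters.onRecordMissingTraceGroup(false);
        System.out.println("balanced=" + counters.isBalanced());
    }
}
```

A persistently high `recordsOutMissingTraceGroup` relative to `recordsInMissingTraceGroup` would suggest that root spans are not reaching the backend before their child spans are processed.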

## Developer Guide

This plugin is compatible with Java 8. See

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
22 changes: 22 additions & 0 deletions data-prepper-plugins/otel-trace-group-processor/build.gradle
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

ext {
opensearch_version = System.getProperty("opensearch.version", "${versionMap.opensearchVersion}")
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:opensearch')
implementation "org.opensearch.client:opensearch-rest-high-level-client:${opensearch_version}"
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'io.micrometer:micrometer-core'
testImplementation project(':data-prepper-api').sourceSets.test.output
}
@@ -0,0 +1,210 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.processor.oteltracegroup;

import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.processor.AbstractProcessor;
import com.amazon.dataprepper.model.processor.Processor;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.trace.DefaultTraceGroupFields;
import com.amazon.dataprepper.model.trace.Span;
import com.amazon.dataprepper.model.trace.TraceGroupFields;
import com.amazon.dataprepper.plugins.processor.oteltracegroup.model.TraceGroup;
import com.google.common.base.Strings;
import io.micrometer.core.instrument.Counter;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.document.DocumentField;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

@DataPrepperPlugin(name = "otel_trace_group", pluginType = Processor.class)
public class OTelTraceGroupProcessor extends AbstractProcessor<Record<Span>, Record<Span>> {

public static final String RECORDS_IN_MISSING_TRACE_GROUP = "recordsInMissingTraceGroup";
public static final String RECORDS_OUT_FIXED_TRACE_GROUP = "recordsOutFixedTraceGroup";
public static final String RECORDS_OUT_MISSING_TRACE_GROUP = "recordsOutMissingTraceGroup";

private static final Logger LOG = LoggerFactory.getLogger(OTelTraceGroupProcessor.class);

private final OTelTraceGroupProcessorConfig otelTraceGroupProcessorConfig;
private final RestHighLevelClient restHighLevelClient;

private final Counter recordsInMissingTraceGroupCounter;
private final Counter recordsOutFixedTraceGroupCounter;
private final Counter recordsOutMissingTraceGroupCounter;

public OTelTraceGroupProcessor(final PluginSetting pluginSetting) {
super(pluginSetting);
otelTraceGroupProcessorConfig = OTelTraceGroupProcessorConfig.buildConfig(pluginSetting);
restHighLevelClient = otelTraceGroupProcessorConfig.getEsConnectionConfig().createClient();

recordsInMissingTraceGroupCounter = pluginMetrics.counter(RECORDS_IN_MISSING_TRACE_GROUP);
recordsOutFixedTraceGroupCounter = pluginMetrics.counter(RECORDS_OUT_FIXED_TRACE_GROUP);
recordsOutMissingTraceGroupCounter = pluginMetrics.counter(RECORDS_OUT_MISSING_TRACE_GROUP);
}

@Override
public Collection<Record<Span>> doExecute(final Collection<Record<Span>> rawSpanRecords) {
final List<Record<Span>> recordsOut = new LinkedList<>();
final Set<Record<Span>> recordsMissingTraceGroupInfo = new HashSet<>();
final Set<String> traceIdsToLookUp = new HashSet<>();
for (Record<Span> record: rawSpanRecords) {
final Span span = record.getData();
final String traceGroup = span.getTraceGroup();
final String traceId = span.getTraceId();
if (Strings.isNullOrEmpty(traceGroup)) {
traceIdsToLookUp.add(traceId);
recordsMissingTraceGroupInfo.add(record);
recordsInMissingTraceGroupCounter.increment();
} else {
recordsOut.add(record);
}
}

final Map<String, TraceGroup> traceIdToTraceGroup = searchTraceGroupByTraceIds(traceIdsToLookUp);
for (final Record<Span> record: recordsMissingTraceGroupInfo) {
final Span span = record.getData();
final String traceId = span.getTraceId();
final TraceGroup traceGroup = traceIdToTraceGroup.get(traceId);
if (traceGroup != null) {
try {
fillInTraceGroupInfo(span, traceGroup);
recordsOut.add(record);
recordsOutFixedTraceGroupCounter.increment();
} catch (Exception e) {
recordsOut.add(record);
recordsOutMissingTraceGroupCounter.increment();
LOG.error("Failed to process the span: [{}]", record.getData(), e);
}
} else {
recordsOut.add(record);
recordsOutMissingTraceGroupCounter.increment();
final String spanId = span.getSpanId();
LOG.warn("Failed to find traceGroup for spanId: {} due to traceGroup missing for traceId: {}", spanId, traceId);
}
}

return recordsOut;
}

private void fillInTraceGroupInfo(final Span span, final TraceGroup traceGroup) {
span.setTraceGroup(traceGroup.getTraceGroup());
span.setTraceGroupFields(traceGroup.getTraceGroupFields());
}

private Map<String, TraceGroup> searchTraceGroupByTraceIds(final Collection<String> traceIds) {
final Map<String, TraceGroup> traceIdToTraceGroup = new HashMap<>();
final SearchRequest searchRequest = createSearchRequest(traceIds);

try {
final SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
final SearchHit[] searchHits = searchResponse.getHits().getHits();
Arrays.asList(searchHits).forEach(searchHit -> {
final Optional<Map.Entry<String, TraceGroup>> optionalStringTraceGroupEntry = fromSearchHitToMapEntry(searchHit);
optionalStringTraceGroupEntry.ifPresent(entry -> traceIdToTraceGroup.put(entry.getKey(), entry.getValue()));
});
} catch (Exception e) {
// TODO: retry for status code 429 of OpenSearchException?
LOG.error("Search request for traceGroup failed for traceIds: {} due to {}", traceIds, e.getMessage());
}

return traceIdToTraceGroup;
}

private SearchRequest createSearchRequest(final Collection<String> traceIds) {
final SearchRequest searchRequest = new SearchRequest(OTelTraceGroupProcessorConfig.RAW_INDEX_ALIAS);
final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(
QueryBuilders.boolQuery()
.must(QueryBuilders.termsQuery(OTelTraceGroupProcessorConfig.TRACE_ID_FIELD, traceIds))
.must(QueryBuilders.termQuery(OTelTraceGroupProcessorConfig.PARENT_SPAN_ID_FIELD, ""))
);
searchSourceBuilder.docValueField(OTelTraceGroupProcessorConfig.TRACE_ID_FIELD);
searchSourceBuilder.docValueField(TraceGroup.TRACE_GROUP_NAME_FIELD);
searchSourceBuilder.docValueField(TraceGroup.TRACE_GROUP_END_TIME_FIELD, OTelTraceGroupProcessorConfig.STRICT_DATE_TIME);
searchSourceBuilder.docValueField(TraceGroup.TRACE_GROUP_DURATION_IN_NANOS_FIELD);
searchSourceBuilder.docValueField(TraceGroup.TRACE_GROUP_STATUS_CODE_FIELD);
searchSourceBuilder.fetchSource(false);
searchRequest.source(searchSourceBuilder);

return searchRequest;
}

private Optional<Map.Entry<String, TraceGroup>> fromSearchHitToMapEntry(final SearchHit searchHit) {
final DocumentField traceIdDocField = searchHit.field(OTelTraceGroupProcessorConfig.TRACE_ID_FIELD);
final DocumentField traceGroupNameDocField = searchHit.field(TraceGroup.TRACE_GROUP_NAME_FIELD);
final DocumentField traceGroupEndTimeDocField = searchHit.field(TraceGroup.TRACE_GROUP_END_TIME_FIELD);
final DocumentField traceGroupDurationInNanosDocField = searchHit.field(TraceGroup.TRACE_GROUP_DURATION_IN_NANOS_FIELD);
final DocumentField traceGroupStatusCodeDocField = searchHit.field(TraceGroup.TRACE_GROUP_STATUS_CODE_FIELD);
if (Stream.of(traceIdDocField, traceGroupNameDocField, traceGroupEndTimeDocField, traceGroupDurationInNanosDocField,
traceGroupStatusCodeDocField).allMatch(Objects::nonNull)) {
final String traceId = traceIdDocField.getValue();
final String traceGroupName = traceGroupNameDocField.getValue();
final String traceGroupEndTime = normalizeDateTime(traceGroupEndTimeDocField.getValue());
final Number traceGroupDurationInNanos = traceGroupDurationInNanosDocField.getValue();
final Number traceGroupStatusCode = traceGroupStatusCodeDocField.getValue();
final TraceGroupFields traceGroupFields = DefaultTraceGroupFields.builder()
.withEndTime(traceGroupEndTime)
.withDurationInNanos(traceGroupDurationInNanos.longValue())
.withStatusCode(traceGroupStatusCode.intValue())
.build();
final TraceGroup traceGroup = new TraceGroup.TraceGroupBuilder()
.setTraceGroup(traceGroupName)
.setTraceGroupFields(traceGroupFields)
.build();
return Optional.of(new AbstractMap.SimpleEntry<>(traceId, traceGroup));
}
return Optional.empty();
}

/**
* Pads the fractional seconds with trailing zeros to a multiple of three digits,
* e.g. 2020-08-20T05:40:46.0895568Z -> 2020-08-20T05:40:46.089556800Z
*/
private String normalizeDateTime(String dateTimeString) {
return Instant.parse(dateTimeString).toString();
}

@Override
public void prepareForShutdown() {

}

@Override
public boolean isReadyForShutdown() {
return true;
}

@Override
public void shutdown() {
try {
restHighLevelClient.close();
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.processor.oteltracegroup;

import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.plugins.sink.opensearch.ConnectionConfiguration;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexConstants;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexType;

public class OTelTraceGroupProcessorConfig {
protected static final String TRACE_ID_FIELD = "traceId";
protected static final String SPAN_ID_FIELD = "spanId";
protected static final String PARENT_SPAN_ID_FIELD = "parentSpanId";
protected static final String RAW_INDEX_ALIAS = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW);
protected static final String STRICT_DATE_TIME = "strict_date_time";

private final ConnectionConfiguration esConnectionConfig;

public ConnectionConfiguration getEsConnectionConfig() {
return esConnectionConfig;
}

private OTelTraceGroupProcessorConfig(final ConnectionConfiguration esConnectionConfig) {
this.esConnectionConfig = esConnectionConfig;
}

public static OTelTraceGroupProcessorConfig buildConfig(final PluginSetting pluginSetting) {
final ConnectionConfiguration esConnectionConfig = ConnectionConfiguration.readConnectionConfiguration(pluginSetting);
return new OTelTraceGroupProcessorConfig(esConnectionConfig);
}
}