Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message handler #8331

Draft
wants to merge 48 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
4889f26
BatchMessageHandler
atshaw43 Apr 20, 2023
0131ff2
Fixing parent bug
atshaw43 Apr 21, 2023
5934f86
Fixing readme
atshaw43 Apr 21, 2023
0ce11c5
Applying spot check
atshaw43 Apr 21, 2023
2f912cf
Addressing style checks and addign test for invalid upstream span con…
atshaw43 Apr 21, 2023
9bb39f4
Adding OTEL format
atshaw43 Apr 21, 2023
7aca0d4
Addressing PR
atshaw43 Apr 24, 2023
d6ec4d0
Applying spot check
atshaw43 Apr 24, 2023
1741b7b
Fixing style check
atshaw43 Apr 24, 2023
a34eba7
Adding clarification
atshaw43 Apr 24, 2023
b23a221
Refactorting some logic
atshaw43 Apr 25, 2023
1e2402e
Fixing stylecheck
atshaw43 Apr 25, 2023
88f0484
Addressing PR
atshaw43 May 31, 2023
d5af1fd
Running spotless
atshaw43 May 31, 2023
0e08dad
Fixing readme
atshaw43 May 31, 2023
072f189
Adding exception test
atshaw43 May 31, 2023
f036493
Running spotless
atshaw43 Jun 1, 2023
6874ab7
Update instrumentation/message-handler/library/src/main/java/io/opent…
atshaw43 Jun 1, 2023
9bc1864
Merging
atshaw43 Jun 1, 2023
af3ce0f
Running spotlessapply
atshaw43 Jun 1, 2023
3b625c4
Changing to AWS SDK SQS message object
atshaw43 Jun 2, 2023
f9433b1
Fixing annotations file
atshaw43 Jun 2, 2023
149d9b6
Fixing annotations
atshaw43 Jun 2, 2023
27eefc3
Merge branch 'open-telemetry:main' into MessageHandler
atshaw43 Jun 5, 2023
1b8273e
Adding customer gitter for message atribute values
atshaw43 Jun 5, 2023
9a46997
Applying spotless
atshaw43 Jun 5, 2023
1b14ab3
Addresing PR
atshaw43 Jun 7, 2023
40c7a67
Applying spotless
atshaw43 Jun 7, 2023
6aef8cb
Fixing test runner
atshaw43 Jun 7, 2023
dfceb72
Refactoring test code
atshaw43 Jun 7, 2023
a591789
Refactoring to use Messaging Extractors
atshaw43 Jun 9, 2023
371d793
Refactoring moduel heirarchy
atshaw43 Jun 13, 2023
5a51314
Refactoring modules
atshaw43 Jun 13, 2023
46feb15
Remove Warning
atshaw43 Jun 13, 2023
8505b8e
Fixing spot check
atshaw43 Jun 13, 2023
e81aa56
Refactor to receiver
atshaw43 Jun 15, 2023
24af0c9
Cleanup
atshaw43 Jun 15, 2023
80eacf1
Changing to process messages only
atshaw43 Jul 21, 2023
19c419b
Fixing typo
atshaw43 Jul 21, 2023
c0958a8
Adding more information regarding what console will look like
atshaw43 Jul 21, 2023
5f41613
Adding support for SQSEvent.SQSMessage
atshaw43 Jul 25, 2023
b810b67
Fixing AWS SDK
atshaw43 Aug 14, 2023
1e0104e
Splitting out lambda
atshaw43 Aug 16, 2023
ed4d6d2
Fixing tests
atshaw43 Aug 22, 2023
018cd53
Polishing
atshaw43 Aug 28, 2023
bbbc0aa
Addressing PR
atshaw43 Aug 28, 2023
c14d4c1
Fixing Read Me
atshaw43 Aug 28, 2023
c6a0fd8
Adding back removed import
atshaw43 Aug 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions instrumentation/message-handler/library/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Message Handler

This package contains handlers to instrument message system spans for a batch of messages. It is not designed for a for-loop case.

The handler will create a single new messaging span, add span links to it, and set the messaging attributes behind the scene. These values are based off of the messages passed in.

The handler provides constructors to change the messaging operation and span name of the newly created messaging span.

## Available Handlers
- `SQSBatchMessageHandlerTest` - processes messages from Amazon's SQS.

## Using SQSBatchMessageHandlerTest

1. Retrieve a collection of messages to process.
2. Create a SQSBatchMessageHandler and provide the business logic on what to do with the messages.
3. Call the handleMessages function and pass in your messages.
4. It will call the doHandleMessages function you provided wrapped in the messaging span.

```java
Collection<SQSEvent.SQSMessage> sqsMessages;

SqsBatchMessageHandler messageHandler = new SqsBatchMessageHandler(GlobalOpenTelemetry.get()) {
@Override
protected void doHandleMessages(Collection<SQSEvent.SQSMessage> messages) {
// Do my business logic
}
};

messageHandler.handleMessages(sqsMessages);
```
14 changes: 14 additions & 0 deletions instrumentation/message-handler/library/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
plugins {
id("otel.library-instrumentation")
}

dependencies {
api(project(":instrumentation:aws-lambda:aws-lambda-core-1.0:library"))

compileOnly("io.opentelemetry:opentelemetry-sdk")

library("com.amazonaws:aws-lambda-java-core:1.0.0")
library("com.amazonaws:aws-lambda-java-events:2.2.1")
atshaw43 marked this conversation as resolved.
Show resolved Hide resolved

implementation("io.opentelemetry.contrib:opentelemetry-aws-xray-propagator")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.messagehandler;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.TracerBuilder;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public abstract class BatchMessageHandler<T> {
atshaw43 marked this conversation as resolved.
Show resolved Hide resolved
private static final Logger logger = Logger.getLogger(BatchMessageHandler.class.getName());
private static final String AWS_TRACE_HEADER_PROPAGATOR_KEY = "x-amzn-trace-id";

protected String messagingOperation;
protected OpenTelemetry openTelemetry;
protected String spanName;

public BatchMessageHandler(OpenTelemetry openTelemetry) {
this(openTelemetry, MessageOperation.RECEIVE.name());
}

public BatchMessageHandler(OpenTelemetry openTelemetry, String messageOperation) {
this(openTelemetry, messageOperation, "Batch Message");
}

public BatchMessageHandler(
OpenTelemetry openTelemetry, String messageOperation, String spanName) {
this.openTelemetry = openTelemetry;
this.spanName = spanName;
this.messagingOperation = messageOperation;
}

public abstract String getParentHeaderFromMessage(T t);

protected abstract void doHandleMessages(Collection<T> messages);

protected void addMessagingAttributes(SpanBuilder spanBuilder) {
spanBuilder.setAttribute(SemanticAttributes.MESSAGING_OPERATION, messagingOperation);
}

public void handleMessages(Collection<T> messages) {
TracerBuilder tracerBuilder =
openTelemetry
.tracerBuilder("io.opentelemetry.message.handler")
.setInstrumentationVersion("1.0");

Span parentSpan = Span.current();

SpanBuilder spanBuilder =
tracerBuilder
.build()
.spanBuilder(spanName)
.setParent(Context.current().with(parentSpan))
atshaw43 marked this conversation as resolved.
Show resolved Hide resolved
.setSpanKind(SpanKind.INTERNAL);

addMessagingAttributes(spanBuilder);

for (T t : messages) {
SpanContext spanContext = getParentSpanContextFromHeader(getParentHeaderFromMessage(t));
atshaw43 marked this conversation as resolved.
Show resolved Hide resolved

if (spanContext != null) {
spanBuilder.addLink(spanContext);
}
}

Span span = spanBuilder.startSpan();

try (Scope scope = span.makeCurrent()) {
doHandleMessages(messages);
} finally {
span.end();
}
}

public SpanContext getParentSpanContextFromHeader(String parentHeader) {
if (parentHeader == null) {
return null;
}

// We do not know if the upstream is W3C or X-Ray format.
// We will first try to decode it as a X-Ray trace context.
// Then we will try to decode it as a W3C trace context.
SpanContext spanContext = getParentSpanContextXray(parentHeader);

if (spanContext != null) {
return spanContext;
}
atshaw43 marked this conversation as resolved.
Show resolved Hide resolved

spanContext = getParentSpanContextW3C(parentHeader);

if (spanContext != null) {
return spanContext;
}

logger.log(Level.WARNING, "Invalid upstream span context: {0}", parentHeader);
return null;
}

private static SpanContext getParentSpanContextW3C(String parentHeader) {
try {
Context w3cContext =
W3CTraceContextPropagator.getInstance()
.extract(
Context.root(),
Collections.singletonMap("traceparent", parentHeader),
MapGetter.INSTANCE);

SpanContext messageSpanCtx = Span.fromContext(w3cContext).getSpanContext();

if (messageSpanCtx.isValid()) {
return messageSpanCtx;
} else {
return null;
}
} catch (RuntimeException e) {
return null;
}
}

private static SpanContext getParentSpanContextXray(String parentHeader) {
try {
Context xrayContext =
AwsXrayPropagator.getInstance()
.extract(
Context.root(),
Collections.singletonMap(AWS_TRACE_HEADER_PROPAGATOR_KEY, parentHeader),
MapGetter.INSTANCE);

SpanContext messageSpanCtx = Span.fromContext(xrayContext).getSpanContext();

if (messageSpanCtx.isValid()) {
return messageSpanCtx;
} else {
return null;
}
} catch (RuntimeException e) {
return null;
}
}

private enum MapGetter implements TextMapGetter<Map<String, String>> {
INSTANCE;

@Override
public Iterable<String> keys(Map<String, String> map) {
return map.keySet();
}

@Override
public String get(Map<String, String> map, String s) {
return map.get(s.toLowerCase(Locale.ROOT));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.messagehandler;

import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;

public abstract class SqsBatchMessageHandler extends BatchMessageHandler<SQSEvent.SQSMessage> {
private static final String AWS_TRACE_HEADER_SQS_ATTRIBUTE_KEY = "AWSTraceHeader";

public SqsBatchMessageHandler(OpenTelemetry openTelemetry) {
super(openTelemetry);
}

public SqsBatchMessageHandler(OpenTelemetry openTelemetry, String messageOperation) {
super(openTelemetry, messageOperation);
}

public SqsBatchMessageHandler(
OpenTelemetry openTelemetry, String messageOperation, String spanName) {
super(openTelemetry, messageOperation, spanName);
}

@Override
protected void addMessagingAttributes(SpanBuilder spanBuilder) {
super.addMessagingAttributes(spanBuilder);
spanBuilder.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS");
}

@Override
public String getParentHeaderFromMessage(SQSEvent.SQSMessage message) {
return message.getAttributes().get(AWS_TRACE_HEADER_SQS_ATTRIBUTE_KEY);
}
}
Loading