Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message handler #8331

Draft
wants to merge 48 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
4889f26
BatchMessageHandler
atshaw43 Apr 20, 2023
0131ff2
Fixing parent bug
atshaw43 Apr 21, 2023
5934f86
Fixing readme
atshaw43 Apr 21, 2023
0ce11c5
Applying spot check
atshaw43 Apr 21, 2023
2f912cf
Addressing style checks and addign test for invalid upstream span con…
atshaw43 Apr 21, 2023
9bb39f4
Adding OTEL format
atshaw43 Apr 21, 2023
7aca0d4
Addressing PR
atshaw43 Apr 24, 2023
d6ec4d0
Applying spot check
atshaw43 Apr 24, 2023
1741b7b
Fixing style check
atshaw43 Apr 24, 2023
a34eba7
Adding clarification
atshaw43 Apr 24, 2023
b23a221
Refactorting some logic
atshaw43 Apr 25, 2023
1e2402e
Fixing stylecheck
atshaw43 Apr 25, 2023
88f0484
Addressing PR
atshaw43 May 31, 2023
d5af1fd
Running spotless
atshaw43 May 31, 2023
0e08dad
Fixing readme
atshaw43 May 31, 2023
072f189
Adding exception test
atshaw43 May 31, 2023
f036493
Running spotless
atshaw43 Jun 1, 2023
6874ab7
Update instrumentation/message-handler/library/src/main/java/io/opent…
atshaw43 Jun 1, 2023
9bc1864
Merging
atshaw43 Jun 1, 2023
af3ce0f
Running spotlessapply
atshaw43 Jun 1, 2023
3b625c4
Changing to AWS SDK SQS message object
atshaw43 Jun 2, 2023
f9433b1
Fixing annotations file
atshaw43 Jun 2, 2023
149d9b6
Fixing annotations
atshaw43 Jun 2, 2023
27eefc3
Merge branch 'open-telemetry:main' into MessageHandler
atshaw43 Jun 5, 2023
1b8273e
Adding customer gitter for message atribute values
atshaw43 Jun 5, 2023
9a46997
Applying spotless
atshaw43 Jun 5, 2023
1b14ab3
Addresing PR
atshaw43 Jun 7, 2023
40c7a67
Applying spotless
atshaw43 Jun 7, 2023
6aef8cb
Fixing test runner
atshaw43 Jun 7, 2023
dfceb72
Refactoring test code
atshaw43 Jun 7, 2023
a591789
Refactoring to use Messaging Extractors
atshaw43 Jun 9, 2023
371d793
Refactoring moduel heirarchy
atshaw43 Jun 13, 2023
5a51314
Refactoring modules
atshaw43 Jun 13, 2023
46feb15
Remove Warning
atshaw43 Jun 13, 2023
8505b8e
Fixing spot check
atshaw43 Jun 13, 2023
e81aa56
Refactor to receiver
atshaw43 Jun 15, 2023
24af0c9
Cleanup
atshaw43 Jun 15, 2023
80eacf1
Changing to process messages only
atshaw43 Jul 21, 2023
19c419b
Fixing typo
atshaw43 Jul 21, 2023
c0958a8
Adding more information regarding what console will look like
atshaw43 Jul 21, 2023
5f41613
Adding support for SQSEvent.SQSMessage
atshaw43 Jul 25, 2023
b810b67
Fixing AWS SDK
atshaw43 Aug 14, 2023
1e0104e
Splitting out lambda
atshaw43 Aug 16, 2023
ed4d6d2
Fixing tests
atshaw43 Aug 22, 2023
018cd53
Polishing
atshaw43 Aug 28, 2023
bbbc0aa
Addressing PR
atshaw43 Aug 28, 2023
c14d4c1
Fixing Read Me
atshaw43 Aug 28, 2023
c6a0fd8
Adding back removed import
atshaw43 Aug 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* href="https://github.com/aws/aws-sdk-java-v2/blob/2.2.0/services/sqs/src/main/resources/codegen-resources/service-2.json#L821-L856">Definition
* JSON</a>
*/
final class SqsMessageAccess {
public final class SqsMessageAccess {
atshaw43 marked this conversation as resolved.
Show resolved Hide resolved

@Nullable private static final MethodHandle GET_ATTRIBUTES;
@Nullable private static final MethodHandle GET_MESSAGE_ATTRIBUTES;
Expand Down Expand Up @@ -68,7 +68,7 @@ final class SqsMessageAccess {
}

@SuppressWarnings("unchecked")
static Map<String, String> getAttributes(Object message) {
public static Map<String, String> getAttributes(Object message) {
if (GET_ATTRIBUTES == null) {
return Collections.emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import java.util.Map;
import software.amazon.awssdk.core.SdkPojo;

final class SqsParentContext {
public final class SqsParentContext {

enum StringMapGetter implements TextMapGetter<Map<String, String>> {
INSTANCE;
Expand Down Expand Up @@ -52,13 +52,13 @@ public String get(Map<String, SdkPojo> map, String s) {

static final String AWS_TRACE_SYSTEM_ATTRIBUTE = "AWSTraceHeader";

static Context ofMessageAttributes(
public static Context ofMessageAttributes(
Map<String, SdkPojo> messageAttributes, TextMapPropagator propagator) {
return propagator.extract(
Context.root(), messageAttributes, MessageAttributeValueMapGetter.INSTANCE);
}

static Context ofSystemAttributes(Map<String, String> systemAttributes) {
public static Context ofSystemAttributes(Map<String, String> systemAttributes) {
String traceHeader = systemAttributes.get(AWS_TRACE_SYSTEM_ATTRIBUTE);
return AwsXrayPropagator.getInstance()
.extract(
Expand Down
19 changes: 19 additions & 0 deletions instrumentation/message-handler/aws/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Using SqsMessageHandler

1. Retrieve a collection of messages to process.
2. Create a SqsMessageHandler and provide the business logic on what to do with the messages.
3. Call the handleMessages function and pass in your messages.
4. It will call the doHandleMessages function you provided wrapped in the messaging span.

```java
Collection<SQSEvent.SQSMessage> sqsMessages;

SqsMessageHandler messageHandler = new SqsMessageHandler(opentelemetry) {
@Override
protected void doHandleMessages(Collection<SQSEvent.SQSMessage> messages) {
// Do my business logic
}
};

messageHandler.handleMessages(sqsMessages, "destination.name", MessageOperation.RECEIVE);
```
14 changes: 14 additions & 0 deletions instrumentation/message-handler/aws/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
plugins {
atshaw43 marked this conversation as resolved.
Show resolved Hide resolved
id("otel.library-instrumentation")
}

dependencies {
compileOnly("io.opentelemetry:opentelemetry-sdk")

implementation(project(":instrumentation:message-handler:core"))
implementation(project(":instrumentation:aws-sdk:aws-sdk-2.2:library"))

library("software.amazon.awssdk:sqs:2.2.0")
atshaw43 marked this conversation as resolved.
Show resolved Hide resolved

implementation("io.opentelemetry.contrib:opentelemetry-aws-xray-propagator")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.messagehandler;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.awssdk.v2_2.SqsMessageAccess;
import io.opentelemetry.instrumentation.awssdk.v2_2.SqsParentContext;
import java.util.Collection;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.SdkPojo;
import software.amazon.awssdk.services.sqs.model.Message;

public abstract class SqsMessageHandler extends MessageHandler<Message> {
private static final Logger logger = Logger.getLogger(SqsMessageHandler.class.getName());

private final OpenTelemetry openTelemetry;
private final MessageOperation messageOperation;
private final String destination;

public SqsMessageHandler(
OpenTelemetry openTelemetry, String destination, MessageOperation messageOperation) {
this.openTelemetry = openTelemetry;
this.messageOperation = messageOperation;
this.destination = destination;
}

@Override
protected Instrumenter<Collection<Message>, Void> getMessageInstrumenter() {
return Instrumenter.<Collection<Message>, Void>builder(
openTelemetry, "io.opentelemetry.message-handler", getSpanNameExtractor())
.setInstrumentationVersion("1.0")
.addAttributesExtractor(getAttributesExtractor())
.addSpanLinksExtractor(getSpanLinksExtractor())
.buildInstrumenter(getSpanKindExtractor());
}

protected SpanNameExtractor<Collection<Message>> getSpanNameExtractor() {
return MessagingSpanNameExtractor.create(getMessageingAttributesGetter(), messageOperation);
}

private MessagingAttributesGetter<Collection<Message>, Void> getMessageingAttributesGetter() {
String destination = this.destination;

return new MessagingAttributesGetter<Collection<Message>, Void>() {
@Nullable
@Override
public String getSystem(Collection<Message> messages) {
return "AmazonSQS";
}

@Nullable
@Override
public String getDestinationKind(Collection<Message> messages) {
return null;
}

@Nullable
@Override
public String getDestination(Collection<Message> messages) {
return destination;
}

@Override
public boolean isTemporaryDestination(Collection<Message> messages) {
return false;
}

@Nullable
@Override
public String getConversationId(Collection<Message> messages) {
return null;
}

@Nullable
@Override
public Long getMessagePayloadSize(Collection<Message> messages) {
long total = 0;

for (Message message : messages) {
if (message.body() != null) {
total += message.body().length();
}
}

return total;
}

@Nullable
@Override
public Long getMessagePayloadCompressedSize(Collection<Message> messages) {
return null;
}

@Nullable
@Override
public String getMessageId(Collection<Message> messages, @Nullable Void unused) {
return null;
}
};
}

protected SpanKindExtractor<Collection<Message>> getSpanKindExtractor() {
if (messageOperation == MessageOperation.RECEIVE
|| messageOperation == MessageOperation.PROCESS) {
return SpanKindExtractor.alwaysConsumer();
} else if (messageOperation == MessageOperation.SEND) {
return SpanKindExtractor.alwaysProducer();
} else {
logger.log(
Level.WARNING, "Unknown Messaging Operation {0}", new Object[] {messageOperation.name()});
return SpanKindExtractor.alwaysConsumer();
}
}

protected AttributesExtractor<Collection<Message>, Void> getAttributesExtractor() {
return MessagingAttributesExtractor.create(getMessageingAttributesGetter(), messageOperation);
}

protected SpanLinksExtractor<Collection<Message>> getSpanLinksExtractor() {
TextMapPropagator messagingPropagator = openTelemetry.getPropagators().getTextMapPropagator();

return (spanLinks, parentContext, sqsMessages) -> {
for (Message message : sqsMessages) {
Map<String, SdkPojo> messageAtributes = SqsMessageAccess.getMessageAttributes(message);

Context context =
SqsParentContext.ofMessageAttributes(messageAtributes, messagingPropagator);

if (context == Context.root()) {
context = SqsParentContext.ofSystemAttributes(SqsMessageAccess.getAttributes(message));
}

SpanContext messageSpanCtx = Span.fromContext(context).getSpanContext();

if (messageSpanCtx.isValid()) {
spanLinks.addLink(messageSpanCtx);
}
}
};
}
}
Loading