Skip to content

Commit

Permalink
botocore: Add support for SNS publish and publish_batch (#1409)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariojonke authored Nov 24, 2022
1 parent 8dbd142 commit 2179fb9
Show file tree
Hide file tree
Showing 6 changed files with 462 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1350](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1350))
- `opentelemetry-instrumentation-starlette` Add support for regular expression matching and sanitization of HTTP headers.
([#1404](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1404))
- `opentelemetry-instrumentation-botocore` Add support for SNS `publish` and `publish_batch`.
([#1409](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1409))
- Strip leading comments from SQL queries when generating the span name.
([#1434](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1434))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def loader():
_KNOWN_EXTENSIONS = {
"dynamodb": _lazy_load(".dynamodb", "_DynamoDbExtension"),
"lambda": _lazy_load(".lmbd", "_LambdaExtension"),
"sns": _lazy_load(".sns", "_SnsExtension"),
"sqs": _lazy_load(".sqs", "_SqsExtension"),
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Any, MutableMapping

from opentelemetry.propagate import get_global_textmap, inject
from opentelemetry.propagators.textmap import CarrierT, Setter

_logger = logging.getLogger(__name__)

_MAX_MESSAGE_ATTRIBUTES = 10


class MessageAttributesSetter(Setter[CarrierT]):
def set(self, carrier: CarrierT, key: str, value: str):
carrier[key] = {
"DataType": "String",
"StringValue": value,
}


message_attributes_setter = MessageAttributesSetter()


def inject_propagation_context(
carrier: MutableMapping[str, Any]
) -> MutableMapping[str, Any]:
if carrier is None:
carrier = {}

fields = get_global_textmap().fields
if len(carrier.keys()) + len(fields) <= _MAX_MESSAGE_ATTRIBUTES:
inject(carrier, setter=message_attributes_setter)
else:
_logger.warning(
"botocore instrumentation: cannot set context propagation on "
"SQS/SNS message due to maximum amount of MessageAttributes"
)

return carrier
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import inspect
from typing import Any, Dict, MutableMapping, Optional, Tuple

from opentelemetry.instrumentation.botocore.extensions._messaging import (
inject_propagation_context,
)
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
_AwsSdkCallContext,
_AwsSdkExtension,
)
from opentelemetry.semconv.trace import (
MessagingDestinationKindValues,
SpanAttributes,
)
from opentelemetry.trace import SpanKind
from opentelemetry.trace.span import Span

################################################################################
# SNS operations
################################################################################


class _SnsOperation(abc.ABC):
@classmethod
@abc.abstractmethod
def operation_name(cls) -> str:
pass

@classmethod
def span_kind(cls) -> SpanKind:
return SpanKind.CLIENT

@classmethod
def extract_attributes(
cls, call_context: _AwsSdkCallContext, attributes: _AttributeMapT
):
pass

@classmethod
def before_service_call(cls, call_context: _AwsSdkCallContext, span: Span):
pass


class _OpPublish(_SnsOperation):
_arn_arg_names = ("TopicArn", "TargetArn")
_phone_arg_name = "PhoneNumber"

@classmethod
def operation_name(cls) -> str:
return "Publish"

@classmethod
def span_kind(cls) -> SpanKind:
return SpanKind.PRODUCER

@classmethod
def extract_attributes(
cls, call_context: _AwsSdkCallContext, attributes: _AttributeMapT
):
destination_name, is_phone_number = cls._extract_destination_name(
call_context
)
attributes[
SpanAttributes.MESSAGING_DESTINATION_KIND
] = MessagingDestinationKindValues.TOPIC.value
attributes[SpanAttributes.MESSAGING_DESTINATION] = destination_name

call_context.span_name = (
f"{'phone_number' if is_phone_number else destination_name} send"
)

@classmethod
def _extract_destination_name(
cls, call_context: _AwsSdkCallContext
) -> Tuple[str, bool]:
arn = cls._extract_input_arn(call_context)
if arn:
return arn.rsplit(":", 1)[-1], False

if cls._phone_arg_name:
phone_number = call_context.params.get(cls._phone_arg_name)
if phone_number:
return phone_number, True

return "unknown", False

@classmethod
def _extract_input_arn(
cls, call_context: _AwsSdkCallContext
) -> Optional[str]:
for input_arn in cls._arn_arg_names:
arn = call_context.params.get(input_arn)
if arn:
return arn
return None

@classmethod
def before_service_call(cls, call_context: _AwsSdkCallContext, span: Span):
cls._inject_span_into_entry(call_context.params)

@classmethod
def _inject_span_into_entry(cls, entry: MutableMapping[str, Any]):
entry["MessageAttributes"] = inject_propagation_context(
entry.get("MessageAttributes")
)


class _OpPublishBatch(_OpPublish):
_arn_arg_names = ("TopicArn",)
_phone_arg_name = None

@classmethod
def operation_name(cls) -> str:
return "PublishBatch"

@classmethod
def before_service_call(cls, call_context: _AwsSdkCallContext, span: Span):
for entry in call_context.params.get("PublishBatchRequestEntries", ()):
cls._inject_span_into_entry(entry)


################################################################################
# SNS extension
################################################################################

_OPERATION_MAPPING = {
op.operation_name(): op
for op in globals().values()
if inspect.isclass(op)
and issubclass(op, _SnsOperation)
and not inspect.isabstract(op)
} # type: Dict[str, _SnsOperation]


class _SnsExtension(_AwsSdkExtension):
def __init__(self, call_context: _AwsSdkCallContext):
super().__init__(call_context)
self._op = _OPERATION_MAPPING.get(call_context.operation)
if self._op:
call_context.span_kind = self._op.span_kind()

def extract_attributes(self, attributes: _AttributeMapT):
attributes[SpanAttributes.MESSAGING_SYSTEM] = "aws.sns"

if self._op:
self._op.extract_attributes(self._call_context, attributes)

def before_service_call(self, span: Span):
if self._op:
self._op.before_service_call(self._call_context, span)
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from opentelemetry.instrumentation.botocore.extensions._messaging import (
inject_propagation_context,
message_attributes_setter,
)
from opentelemetry.test.test_base import TestBase


class TestMessageAttributes(TestBase):
def test_message_attributes_setter(self):
carrier = {}

message_attributes_setter.set(carrier, "key", "value")
self.assertEqual(
{"key": {"DataType": "String", "StringValue": "value"}}, carrier
)

def test_inject_propagation_context(self):
carrier = {
"key1": {"DataType": "String", "StringValue": "value1"},
"key2": {"DataType": "String", "StringValue": "value2"},
}

tracer = self.tracer_provider.get_tracer("test-tracer")
with tracer.start_as_current_span("span"):
inject_propagation_context(carrier)

self.assertGreater(len(carrier), 2)

def test_inject_propagation_context_too_many_attributes(self):
carrier = {
f"key{idx}": {"DataType": "String", "StringValue": f"value{idx}"}
for idx in range(10)
}
tracer = self.tracer_provider.get_tracer("test-tracer")
with tracer.start_as_current_span("span"):
inject_propagation_context(carrier)

self.assertEqual(10, len(carrier))
Loading

0 comments on commit 2179fb9

Please sign in to comment.