diff --git a/CHANGELOG.md b/CHANGELOG.md index cc71d38d2c..6c5b2d1a24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1424](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1424)) - `opentelemetry-instrumentation-fastapi` Add support for regular expression matching and sanitization of HTTP headers. ([#1403](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1403)) +- `opentelemetry-instrumentation-botocore` add support for `messaging.*` in the sqs extension. + ([#1350](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1350)) ### Fixed diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/sqs.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/sqs.py index 83d8e0af33..777108cbb5 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/sqs.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/sqs.py @@ -11,11 +11,19 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging from opentelemetry.instrumentation.botocore.extensions.types import ( _AttributeMapT, _AwsSdkExtension, + _BotoResultT, ) +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace.span import Span + +_SUPPORTED_OPERATIONS = ["SendMessage", "SendMessageBatch", "ReceiveMessage"] + +_logger = logging.getLogger(__name__) class _SqsExtension(_AwsSdkExtension): @@ -24,3 +32,38 @@ def extract_attributes(self, attributes: _AttributeMapT): if queue_url: # TODO: update when semantic conventions exist attributes["aws.queue_url"] = queue_url + attributes[SpanAttributes.MESSAGING_SYSTEM] = "aws.sqs" + attributes[SpanAttributes.MESSAGING_URL] = queue_url + try: + attributes[ + SpanAttributes.MESSAGING_DESTINATION + ] = queue_url.split("/")[-1] + except IndexError: + _logger.error( + "Could not extract messaging destination from '%s'", + queue_url, + ) + + def on_success(self, span: Span, result: _BotoResultT): + operation = self._call_context.operation + if operation in _SUPPORTED_OPERATIONS: + try: + if operation == "SendMessage": + span.set_attribute( + SpanAttributes.MESSAGING_MESSAGE_ID, + result.get("MessageId"), + ) + elif operation == "SendMessageBatch" and result.get( + "Successful" + ): + span.set_attribute( + SpanAttributes.MESSAGING_MESSAGE_ID, + result["Successful"][0]["MessageId"], + ) + elif operation == "ReceiveMessage" and result.get("Messages"): + span.set_attribute( + SpanAttributes.MESSAGING_MESSAGE_ID, + result["Messages"][0]["MessageId"], + ) + except (IndexError, KeyError): + _logger.error("Could not extract the messaging message ID") diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_sqs.py b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_sqs.py new file mode 100644 index 0000000000..6bcffd9274 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_sqs.py @@ -0,0 +1,136 @@ +import botocore.session +from moto import mock_sqs + +from opentelemetry.instrumentation.botocore import BotocoreInstrumentor +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.test.test_base import TestBase + + +class TestSqsExtension(TestBase): + def setUp(self): + super().setUp() + BotocoreInstrumentor().instrument() + + session = botocore.session.get_session() + session.set_credentials( + access_key="access-key", secret_key="secret-key" + ) + self.region = "us-west-2" + self.client = session.create_client("sqs", region_name=self.region) + + def tearDown(self): + super().tearDown() + BotocoreInstrumentor().uninstrument() + + @mock_sqs + def test_sqs_messaging_send_message(self): + create_queue_result = self.client.create_queue( + QueueName="test_queue_name" + ) + queue_url = create_queue_result["QueueUrl"] + response = self.client.send_message( + QueueUrl=queue_url, MessageBody="content" + ) + + spans = self.memory_exporter.get_finished_spans() + assert spans + self.assertEqual(len(spans), 2) + span = spans[1] + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs" + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_URL], queue_url + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_DESTINATION], + "test_queue_name", + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_MESSAGE_ID], + response["MessageId"], + ) + + @mock_sqs + def test_sqs_messaging_send_message_batch(self): + create_queue_result = self.client.create_queue( + QueueName="test_queue_name" + ) + queue_url = create_queue_result["QueueUrl"] + response = self.client.send_message_batch( + QueueUrl=queue_url, + Entries=[ + {"Id": "1", "MessageBody": "content"}, + {"Id": "2", "MessageBody": "content2"}, + ], + ) + + spans = self.memory_exporter.get_finished_spans() + assert spans + self.assertEqual(len(spans), 2) + span = spans[1] + self.assertEqual(span.attributes["rpc.method"], "SendMessageBatch") + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs" + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_URL], queue_url + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_DESTINATION], + "test_queue_name", + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_MESSAGE_ID], + response["Successful"][0]["MessageId"], + ) + + @mock_sqs + def test_sqs_messaging_receive_message(self): + create_queue_result = self.client.create_queue( + QueueName="test_queue_name" + ) + queue_url = create_queue_result["QueueUrl"] + self.client.send_message(QueueUrl=queue_url, MessageBody="content") + message_result = self.client.receive_message( + QueueUrl=create_queue_result["QueueUrl"] + ) + + spans = self.memory_exporter.get_finished_spans() + assert spans + self.assertEqual(len(spans), 3) + span = spans[-1] + self.assertEqual(span.attributes["rpc.method"], "ReceiveMessage") + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs" + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_URL], queue_url + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_DESTINATION], + "test_queue_name", + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_MESSAGE_ID], + message_result["Messages"][0]["MessageId"], + ) + + @mock_sqs + def test_sqs_messaging_failed_operation(self): + with self.assertRaises(Exception): + self.client.send_message( + QueueUrl="non-existing", MessageBody="content" + ) + + spans = self.memory_exporter.get_finished_spans() + assert spans + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual(span.attributes["rpc.method"], "SendMessage") + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs" + ) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_URL], "non-existing" + )