Skip to content

Commit

Permalink
botocore: Make common span attributes compliant with semconv in spec (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mariojonke authored Oct 6, 2021
1 parent 3b5071b commit 1960371
Show file tree
Hide file tree
Showing 5 changed files with 377 additions and 504 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.5.0-0.24b0...HEAD)
- `opentelemetry-sdk-extension-aws` Release AWS Python SDK Extension as 1.0.0
([#667](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/667))

### Added
- `opentelemetry-instrumentation-elasticsearch` Added `response_hook` and `request_hook` callbacks
Expand All @@ -22,6 +20,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#706](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/706))

### Changed
- `opentelemetry-instrumentation-botocore` Make common span attributes compliant with semantic conventions
([#674](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/674))
- `opentelemetry-sdk-extension-aws` Release AWS Python SDK Extension as 1.0.0
([#667](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/667))
- `opentelemetry-instrumentation-botocore` Unpatch botocore Endpoint.prepare_request on uninstrument
([#664](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/664))
- `opentelemetry-instrumentation-botocore` Fix span injection for lambda invoke
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,17 @@ def response_hook(span, service_name, operation_name, result):

import json
import logging
from typing import Collection
from typing import Any, Collection, Dict, Optional, Tuple

from botocore.client import BaseClient
from botocore.endpoint import Endpoint
from botocore.exceptions import ClientError
from wrapt import wrap_function_wrapper

from opentelemetry import context as context_api
from opentelemetry.instrumentation.botocore.extensions.types import (
_AwsSdkCallContext,
)
from opentelemetry.instrumentation.botocore.package import _instruments
from opentelemetry.instrumentation.botocore.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
Expand All @@ -97,7 +100,8 @@ def response_hook(span, service_name, operation_name, result):
)
from opentelemetry.propagate import inject
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind, get_tracer
from opentelemetry.trace import get_tracer
from opentelemetry.trace.span import Span

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -157,12 +161,12 @@ def _uninstrument(self, **kwargs):
unwrap(Endpoint, "prepare_request")

@staticmethod
def _is_lambda_invoke(service_name, operation_name, api_params):
def _is_lambda_invoke(call_context: _AwsSdkCallContext):
return (
service_name == "lambda"
and operation_name == "Invoke"
and isinstance(api_params, dict)
and "Payload" in api_params
call_context.service == "lambda"
and call_context.operation == "Invoke"
and isinstance(call_context.params, dict)
and "Payload" in call_context.params
)

@staticmethod
Expand All @@ -182,97 +186,126 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
return original_func(*args, **kwargs)

# pylint: disable=protected-access
service_name = instance._service_model.service_name
operation_name, api_params = args
call_context = _determine_call_context(instance, args)
if call_context is None:
return original_func(*args, **kwargs)

error = None
result = None
attributes = {
SpanAttributes.RPC_SYSTEM: "aws-api",
SpanAttributes.RPC_SERVICE: call_context.service_id,
SpanAttributes.RPC_METHOD: call_context.operation,
# TODO: update when semantic conventions exist
"aws.region": call_context.region,
}

with self._tracer.start_as_current_span(
f"{service_name}", kind=SpanKind.CLIENT,
call_context.span_name,
kind=call_context.span_kind,
attributes=attributes,
) as span:
# inject trace context into payload headers for lambda Invoke
if BotocoreInstrumentor._is_lambda_invoke(
service_name, operation_name, api_params
):
BotocoreInstrumentor._patch_lambda_invoke(api_params)
if BotocoreInstrumentor._is_lambda_invoke(call_context):
BotocoreInstrumentor._patch_lambda_invoke(call_context.params)

self._set_api_call_attributes(
span, instance, service_name, operation_name, api_params
)
_set_api_call_attributes(span, call_context)
self._call_request_hook(span, call_context)

token = context_api.attach(
context_api.set_value(_SUPPRESS_HTTP_INSTRUMENTATION_KEY, True)
)

if callable(self.request_hook):
self.request_hook(
span, service_name, operation_name, api_params
)

result = None
try:
result = original_func(*args, **kwargs)
except ClientError as ex:
error = ex
except ClientError as error:
result = getattr(error, "response", None)
_apply_response_attributes(span, result)
raise
else:
_apply_response_attributes(span, result)
finally:
context_api.detach(token)

if error:
result = error.response
self._call_response_hook(span, call_context, result)

if callable(self.response_hook):
self.response_hook(span, service_name, operation_name, result)
return result

self._set_api_call_result_attributes(span, result)
def _call_request_hook(self, span: Span, call_context: _AwsSdkCallContext):
if not callable(self.request_hook):
return
self.request_hook(
span,
call_context.service,
call_context.operation,
call_context.params,
)

if error:
raise error
def _call_response_hook(
self, span: Span, call_context: _AwsSdkCallContext, result
):
if not callable(self.response_hook):
return
self.response_hook(
span, call_context.service, call_context.operation, result
)

return result

@staticmethod
def _set_api_call_attributes(
span, instance, service_name, operation_name, api_params
):
if span.is_recording():
span.set_attribute("aws.operation", operation_name)
span.set_attribute("aws.region", instance.meta.region_name)
span.set_attribute("aws.service", service_name)
if "QueueUrl" in api_params:
span.set_attribute("aws.queue_url", api_params["QueueUrl"])
if "TableName" in api_params:
span.set_attribute("aws.table_name", api_params["TableName"])
def _set_api_call_attributes(span, call_context: _AwsSdkCallContext):
if not span.is_recording():
return

@staticmethod
def _set_api_call_result_attributes(span, result):
if span.is_recording():
if "ResponseMetadata" in result:
metadata = result["ResponseMetadata"]
req_id = None
if "RequestId" in metadata:
req_id = metadata["RequestId"]
elif "HTTPHeaders" in metadata:
headers = metadata["HTTPHeaders"]
if "x-amzn-RequestId" in headers:
req_id = headers["x-amzn-RequestId"]
elif "x-amz-request-id" in headers:
req_id = headers["x-amz-request-id"]
elif "x-amz-id-2" in headers:
req_id = headers["x-amz-id-2"]

if req_id:
span.set_attribute(
"aws.request_id", req_id,
)

if "RetryAttempts" in metadata:
span.set_attribute(
"retry_attempts", metadata["RetryAttempts"],
)

if "HTTPStatusCode" in metadata:
span.set_attribute(
SpanAttributes.HTTP_STATUS_CODE,
metadata["HTTPStatusCode"],
)
if "QueueUrl" in call_context.params:
span.set_attribute("aws.queue_url", call_context.params["QueueUrl"])
if "TableName" in call_context.params:
span.set_attribute("aws.table_name", call_context.params["TableName"])


def _apply_response_attributes(span: Span, result):
if result is None or not span.is_recording():
return

metadata = result.get("ResponseMetadata")
if metadata is None:
return

request_id = metadata.get("RequestId")
if request_id is None:
headers = metadata.get("HTTPHeaders")
if headers is not None:
request_id = (
headers.get("x-amzn-RequestId")
or headers.get("x-amz-request-id")
or headers.get("x-amz-id-2")
)
if request_id:
# TODO: update when semantic conventions exist
span.set_attribute("aws.request_id", request_id)

retry_attempts = metadata.get("RetryAttempts")
if retry_attempts is not None:
# TODO: update when semantic conventinos exists
span.set_attribute("retry_attempts", retry_attempts)

status_code = metadata.get("HTTPStatusCode")
if status_code is not None:
span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, status_code)


def _determine_call_context(
client: BaseClient, args: Tuple[str, Dict[str, Any]]
) -> Optional[_AwsSdkCallContext]:
try:
call_context = _AwsSdkCallContext(client, args)

logger.debug(
"AWS SDK invocation: %s %s",
call_context.service,
call_context.operation,
)

return call_context
except Exception as ex: # pylint:disable=broad-except
# this shouldn't happen actually unless internals of botocore changed and
# extracting essential attributes ('service' and 'operation') failed.
logger.error("Error when initializing call context", exc_info=ex)
return None
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import logging
from typing import Any, Dict, Optional, Tuple

from opentelemetry.trace import SpanKind

_logger = logging.getLogger(__name__)

_BotoClientT = "botocore.client.BaseClient"

_OperationParamsT = Dict[str, Any]


class _AwsSdkCallContext:
"""An context object providing information about the invoked AWS service
call.
Args:
service: the AWS service (e.g. s3, lambda, ...) which is called
service_id: the name of the service in propper casing
operation: the called operation (e.g. ListBuckets, Invoke, ...) of the
AWS service.
params: a dict of input parameters passed to the service operation.
region: the AWS region in which the service call is made
endpoint_url: the endpoint which the service operation is calling
api_version: the API version of the called AWS service.
span_name: the name used to create the span.
span_kind: the kind used to create the span.
"""

def __init__(self, client: _BotoClientT, args: Tuple[str, Dict[str, Any]]):
operation = args[0]
try:
params = args[1]
except (IndexError, TypeError):
_logger.warning("Could not get request params.")
params = {}

boto_meta = client.meta
service_model = boto_meta.service_model

self.service = service_model.service_name.lower() # type: str
self.operation = operation # type: str
self.params = params # type: Dict[str, Any]

# 'operation' and 'service' are essential for instrumentation.
# for all other attributes we extract them defensively. All of them should
# usually exist unless some future botocore version moved things.
self.region = self._get_attr(
boto_meta, "region_name"
) # type: Optional[str]
self.endpoint_url = self._get_attr(
boto_meta, "endpoint_url"
) # type: Optional[str]

self.api_version = self._get_attr(
service_model, "api_version"
) # type: Optional[str]
# name of the service in proper casing
self.service_id = str(
self._get_attr(service_model, "service_id", self.service)
)

self.span_name = f"{self.service_id}.{self.operation}"
self.span_kind = SpanKind.CLIENT

@staticmethod
def _get_attr(obj, name: str, default=None):
try:
return getattr(obj, name)
except AttributeError:
_logger.warning("Could not get attribute '%s'", name)
return default
Loading

0 comments on commit 1960371

Please sign in to comment.