Skip to content

Commit

Permalink
feat: adhoc: Implement opentelemetry for thrift client and server
Browse files Browse the repository at this point in the history
  • Loading branch information
Xaelias committed Dec 22, 2023
1 parent 53c3130 commit c7fbb13
Show file tree
Hide file tree
Showing 5 changed files with 1,346 additions and 200 deletions.
288 changes: 178 additions & 110 deletions baseplate/clients/thrift.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
import contextlib
import inspect
import logging
import socket
import sys
import time

from collections import OrderedDict
from math import ceil
from typing import Any
from typing import Callable
from typing import Iterator
from typing import Optional

from opentelemetry import propagate
from opentelemetry import trace
from opentelemetry.semconv.trace import MessageTypeValues
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import status
from prometheus_client import Counter
from prometheus_client import Gauge
from prometheus_client import Histogram
Expand All @@ -28,6 +36,8 @@
from baseplate.thrift.ttypes import Error
from baseplate.thrift.ttypes import ErrorCode

logger = logging.getLogger(__name__)

PROM_NAMESPACE = "thrift_client"

PROM_COMMON_LABELS = [
Expand Down Expand Up @@ -191,6 +201,15 @@ def __init__(
self.server_span = server_span
self.namespace = namespace
self.retry_policy = retry_policy or RetryPolicy.new(attempts=1)
self.tracer = trace.get_tracer(__name__)

self.otel_peer_name = None
self.otel_peer_ip = None
try:
self.otel_peer_name = socket.getfqdn()
self.otel_peer_ip = socket.gethostbyname(self.otel_peer_name)
except Exception:
logger.exception("Failed to retrieve local fqdn/pod name/pod IP for otel traces.")

@contextlib.contextmanager
def retrying(self, **policy: Any) -> Iterator["_PooledClientProxy"]:
Expand All @@ -205,9 +224,27 @@ def retrying(self, **policy: Any) -> Iterator["_PooledClientProxy"]:

def _build_thrift_proxy_method(name: str) -> Callable[..., Any]:
def _call_thrift_method(self: Any, *args: Any, **kwargs: Any) -> Any:
trace_name = f"{self.namespace}.{name}"
last_error = None

# this is technically incorrect, but we don't currently have a reliable way
# of getting the name of the service being called, so relying on the name of
# the client is the best we can do
rpc_service = self.namespace
rpc_method = name

# RPC-specific span attributes
# 1.20 doc https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/trace/semantic_conventions/rpc.md
otel_attributes = {
SpanAttributes.RPC_SYSTEM: "thrift",
SpanAttributes.RPC_SERVICE: rpc_service,
SpanAttributes.RPC_METHOD: rpc_method,
SpanAttributes.NET_HOST_NAME: self.otel_peer_name,
SpanAttributes.NET_HOST_IP: self.otel_peer_ip,
}

otelspan_name = f"{rpc_service}/{rpc_method}"
trace_name = f"{self.namespace}.{name}" # old bp.py span name

for time_remaining in self.retry_policy:
try:
with self.pool.connection() as prot, ACTIVE_REQUESTS.labels(
Expand All @@ -223,119 +260,150 @@ def _call_thrift_method(self: Any, *args: Any, **kwargs: Any) -> Any:
span.set_tag("method", method.__name__)
span.start()

try:
baseplate = span.baseplate
if baseplate:
service_name = baseplate.service_name
if service_name:
prot.trans.set_header(b"User-Agent", service_name.encode())

prot.trans.set_header(b"Trace", str(span.trace_id).encode())
prot.trans.set_header(b"Parent", str(span.parent_id).encode())
prot.trans.set_header(b"Span", str(span.id).encode())
if span.sampled is not None:
sampled = "1" if span.sampled else "0"
prot.trans.set_header(b"Sampled", sampled.encode())
if span.flags:
prot.trans.set_header(b"Flags", str(span.flags).encode())

min_timeout = time_remaining
if self.pool.timeout:
if not min_timeout or self.pool.timeout < min_timeout:
min_timeout = self.pool.timeout
if min_timeout and min_timeout > 0:
# min_timeout is in float seconds, we are converting to int milliseconds
# rounding up here.
prot.trans.set_header(
b"Deadline-Budget", str(int(ceil(min_timeout * 1000))).encode()
)

mutable_metadata: OrderedDict = OrderedDict()

pool_addr = self.pool.endpoint.address
if isinstance(pool_addr, str):
otel_attributes[SpanAttributes.NET_PEER_IP] = pool_addr
elif pool_addr is not None:
otel_attributes[SpanAttributes.NET_PEER_IP] = pool_addr.host
otel_attributes[SpanAttributes.NET_PEER_PORT] = pool_addr.port
if otel_attributes.get(SpanAttributes.NET_PEER_IP) in ["127.0.0.1", "::1"]:
otel_attributes[SpanAttributes.NET_PEER_NAME] = "localhost"
logger.debug(
"Will use the following otel span attributes. [span=%s, otel_attributes=%s]",
span,
otel_attributes,
)

with self.tracer.start_as_current_span(
otelspan_name,
kind=trace.SpanKind.CLIENT,
attributes=otel_attributes,
) as otelspan:
try:
edge_context = span.context.raw_edge_context
except AttributeError:
edge_context = None

if edge_context:
prot.trans.set_header(b"Edge-Request", edge_context)

result = method(*args, **kwargs)
except TTransportException as exc:
# the connection failed for some reason, retry if able
span.finish(exc_info=sys.exc_info())
last_error = str(exc)
if exc.inner is not None:
last_error += f" ({exc.inner})"
raise # we need to raise all exceptions so that self.pool.connect() self-heals
except (TApplicationException, TProtocolException):
# these are subclasses of TException but aren't ones that
# should be expected in the protocol. this is an error!
span.finish(exc_info=sys.exc_info())
raise
except Error as exc:
# a 5xx error is an unexpected exception, but non-5xx errors
# are not.
if 500 <= exc.code < 600:
baseplate = span.baseplate
if baseplate:
service_name = baseplate.service_name
if service_name:
prot.trans.set_header(b"User-Agent", service_name.encode())

# Inject all tracing headers into mutable_metadata and add as headers
propagate.inject(mutable_metadata)
for k, v in mutable_metadata.items():
prot.set_header(k.encode(), v.encode())

min_timeout = time_remaining
if self.pool.timeout:
if not min_timeout or self.pool.timeout < min_timeout:
min_timeout = self.pool.timeout
if min_timeout and min_timeout > 0:
# min_timeout is in float seconds, we are converting to int milliseconds
# rounding up here.
prot.trans.set_header(
b"Deadline-Budget", str(int(ceil(min_timeout * 1000))).encode()
)

try:
edge_context = span.context.raw_edge_context
except AttributeError:
edge_context = None

if edge_context:
prot.trans.set_header(b"Edge-Request", edge_context)

result = method(*args, **kwargs)
except TTransportException as exc:
# the connection failed for some reason, retry if able
span.finish(exc_info=sys.exc_info())
otelspan.set_status(status.Status(status.StatusCode.ERROR))
last_error = str(exc)
if exc.inner is not None:
last_error += f" ({exc.inner})"
raise # we need to raise all exceptions so that self.pool.connect() self-heals
except (TApplicationException, TProtocolException):
# these are subclasses of TException but aren't ones that
# should be expected in the protocol. this is an error!
span.finish(exc_info=sys.exc_info())
otelspan.set_status(status.Status(status.StatusCode.ERROR))
raise
except Error as exc:
# a 5xx error is an unexpected exception, but non-5xx errors
# are not.
if 500 <= exc.code < 600:
span.finish(exc_info=sys.exc_info())
otelspan.set_status(status.Status(status.StatusCode.ERROR))
else:
span.finish()
raise
except TException:
# this is an expected exception, as defined in the IDL
otelspan.set_status(status.Status(status.StatusCode.OK))
span.finish()
raise
except: # noqa: E722
# something unexpected happened
span.finish(exc_info=sys.exc_info())
otelspan.set_status(status.Status(status.StatusCode.ERROR))
raise
else:
# a normal result
span.finish()
raise
except TException:
# this is an expected exception, as defined in the IDL
span.finish()
raise
except: # noqa: E722
# something unexpected happened
span.finish(exc_info=sys.exc_info())
raise
else:
# a normal result
span.finish()
return result
finally:
thrift_success = "true"
exception_type = ""
baseplate_status = ""
baseplate_status_code = ""
exc_info = sys.exc_info()
if exc_info[0] is not None:
thrift_success = "false"
exception_type = exc_info[0].__name__
current_exc = exc_info[1]
try:
# We want the following code to execute whenever the
# service raises an instance of Baseplate's `Error` class.
# Unfortunately, we cannot just rely on `isinstance` to do
# what we want here because some services compile
# Baseplate's thrift file on their own and import `Error`
# from that. When this is done, `isinstance` will always
# return `False` since it's technically a different class.
# To fix this, we optimistically try to access `code` on
# `current_exc` and just catch the `AttributeError` if the
# `code` attribute is not present.
# Note: if the error code was not originally defined in baseplate, or the
# name associated with the error was overridden, this lookup cannot reflect
# that. We will emit the status code in both cases, but the status will be
# blank in the first case and the baseplate name in the second.
baseplate_status_code = current_exc.code # type: ignore
baseplate_status = ErrorCode()._VALUES_TO_NAMES.get(current_exc.code, "") # type: ignore
except AttributeError:
pass

REQUEST_LATENCY.labels(
thrift_method=name,
thrift_client_name=self.namespace,
thrift_success=thrift_success,
).observe(time.perf_counter() - start_time)

REQUESTS_TOTAL.labels(
thrift_method=name,
thrift_client_name=self.namespace,
thrift_success=thrift_success,
thrift_exception_type=exception_type,
thrift_baseplate_status_code=baseplate_status_code,
thrift_baseplate_status=baseplate_status,
).inc()
otelspan.set_status(status.Status(status.StatusCode.OK))
return result
finally:
event_attributes = {
SpanAttributes.MESSAGE_TYPE: MessageTypeValues.SENT.value,
# SpanAttributes.MESSAGE_ID: _, # TODO if we want to
# SpanAttributes.MESSAGE_COMPRESSED_SIZE: _, # TODO if we want to
# SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: _, # TODO if we want to
}
otelspan.add_event(name="message", attributes=event_attributes)

thrift_success = "true"
exception_type = ""
baseplate_status = ""
baseplate_status_code = ""
exc_info = sys.exc_info()
if exc_info[0] is not None:
thrift_success = "false"
exception_type = exc_info[0].__name__
current_exc = exc_info[1]
try:
# We want the following code to execute whenever the
# service raises an instance of Baseplate's `Error` class.
# Unfortunately, we cannot just rely on `isinstance` to do
# what we want here because some services compile
# Baseplate's thrift file on their own and import `Error`
# from that. When this is done, `isinstance` will always
# return `False` since it's technically a different class.
# To fix this, we optimistically try to access `code` on
# `current_exc` and just catch the `AttributeError` if the
# `code` attribute is not present.
# Note: if the error code was not originally defined in baseplate, or the
# name associated with the error was overridden, this lookup cannot reflect
# that. We will emit the status code in both cases, but the status will be
# blank in the first case and the baseplate name in the second.
baseplate_status_code = current_exc.code # type: ignore
baseplate_status = ErrorCode()._VALUES_TO_NAMES.get(current_exc.code, "") # type: ignore
except AttributeError:
pass

REQUEST_LATENCY.labels(
thrift_method=name,
thrift_client_name=self.namespace,
thrift_success=thrift_success,
).observe(time.perf_counter() - start_time)

REQUESTS_TOTAL.labels(
thrift_method=name,
thrift_client_name=self.namespace,
thrift_success=thrift_success,
thrift_exception_type=exception_type,
thrift_baseplate_status_code=baseplate_status_code,
thrift_baseplate_status=baseplate_status,
).inc()

except TTransportException:
# swallow exception so we can retry on TTransportException (relies on the for loop)
Expand Down
Loading

0 comments on commit c7fbb13

Please sign in to comment.