Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] update amqpLink/Session/Connection network trace to comply with otel spec #34237

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions sdk/servicebus/azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,21 @@ logger = logging.getLogger('azure.servicebus')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

# Advanced usage: Log AMQP connection/session/link names
class CustomFormatter(logging.Formatter):

def format(self, record: logging.LogRecord) -> str:
if not "amqpConnection" in record.__dict__:
record.__dict__["amqpConnection"] = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the values are not supposed to be None, why are they being set so here?

Copy link
Member Author

@swathipil swathipil Feb 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they aren't supposed to be None for Otel, but if we're just printing them out/logging to other places, this should be fine. However, you're right in that I'm not sure how exactly this will affect otel, since the same extra params won't be present everywhere.

if not "amqpSession" in record.__dict__:
record.__dict__["amqpSession"] = None
if not "amqpLink" in record.__dict__:
record.__dict__["amqpLink"] = None

return super().format(record)

formatter = CustomFormatter('amqpConnection: %(amqpConnection)s, amqpSession: %(amqpSession)s, amqpLink: %(amqpLink)s - %(message)s')
handler.setFormatter(formatter)
...

from azure.servicebus import ServiceBusClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def __init__( # pylint:disable=too-many-locals,too-many-statements
custom_endpoint = f"{custom_parsed_url.hostname}:{custom_port}{custom_parsed_url.path}"
self._container_id = container_id or str(uuid.uuid4())
self._network_trace = network_trace
self._network_trace_params = {"amqpConnection": self._container_id, "amqpSession": None, "amqpLink": None}
self._network_trace_params = {"amqpConnection": self._container_id}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

didn't we add this to fix another bug in our set up ? Would this cause it to resurface those errors?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so?


transport = kwargs.get("transport")
self._transport_type = transport_type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ def __init__(
self._network_trace_params = {
"amqpConnection": self._session._connection._container_id,
"amqpSession": self._session.name,
"amqpLink": None
}

self._token_status_code: Optional[int] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,14 @@ async def close_async(self):
if self._keep_alive_thread:
await self._keep_alive_thread
self._keep_alive_thread = None
self._network_trace_params["amqpConnection"] = None
self._network_trace_params["amqpSession"] = None
try:
del self._network_trace_params["amqpConnection"]
except KeyError:
pass
try:
del self._network_trace_params["amqpSession"]
except KeyError:
pass

async def auth_complete_async(self):
"""Whether the authentication handshake is complete during
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def __init__(# pylint:disable=too-many-locals,too-many-statements
custom_endpoint = f"{custom_parsed_url.hostname}:{custom_port}{custom_parsed_url.path}"
self._container_id: str = container_id or str(uuid.uuid4())
self._network_trace = network_trace
self._network_trace_params = {"amqpConnection": self._container_id, "amqpSession": None, "amqpLink": None}
self._network_trace_params = {"amqpConnection": self._container_id}

transport = kwargs.get("transport")
self._transport_type = transport_type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def __init__(self, session, endpoint='$management', **kwargs):
self._network_trace_params = {
"amqpConnection": self._session._connection._container_id,
"amqpSession": self._session.name,
"amqpLink": None
}
self._mgmt_link: ManagementLink = self._session.create_request_response_link_pair(
endpoint=endpoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ def __init__(
self._network_trace_params = {
"amqpConnection": self._session._connection._container_id,
"amqpSession": self._session.name,
"amqpLink": None
}

self._token_status_code: Optional[int] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def __init__(self, hostname, **kwargs):
"remote_idle_timeout_empty_frame_send_ratio", None
)
self._network_trace = kwargs.pop("network_trace", False)
self._network_trace_params = {"amqpConnection": None, "amqpSession": None, "amqpLink": None}
self._network_trace_params = {}

# Session settings
self._outgoing_window = kwargs.pop("outgoing_window", OUTGOING_WINDOW)
Expand Down Expand Up @@ -377,8 +377,15 @@ def close(self):
except RuntimeError: # Probably thread failed to start in .open()
logging.debug("Keep alive thread failed to join.", exc_info=True)
self._keep_alive_thread = None
self._network_trace_params["amqpConnection"] = None
self._network_trace_params["amqpSession"] = None
try:
del self._network_trace_params["amqpConnection"]
except KeyError:
pass
try:
del self._network_trace_params["amqpSession"]
except KeyError:
pass


def auth_complete(self):
"""Whether the authentication handshake is complete during
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def __init__(self, session, endpoint='$management', **kwargs):
self._network_trace_params = {
"amqpConnection": self._session._connection._container_id,
"amqpSession": self._session.name,
"amqpLink": None
}
self._mgmt_link: ManagementLink = self._session.create_request_response_link_pair(
endpoint=endpoint,
Expand Down
Loading