Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] remove network trace params from extra in pyamqp logging #34365

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/eventhub/azure-eventhub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

### Other Changes

- Updated network trace logging to include AMQP connection/session/link names for advanced usage.

## 5.11.6 (2024-02-12)

This version and all future versions will require Python 3.8+. Python 3.7 is no longer supported.
Expand Down
112 changes: 81 additions & 31 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ def _set_state(self, new_state: ConnectionState) -> None:
previous_state = self.state
self.state = new_state
_LOGGER.info(
"Connection state changed: %r -> %r",
"[Connection:%s] Connection state changed: %r -> %r",
self._network_trace_params["amqpConnection"],
previous_state,
new_state,
extra=self._network_trace_params
new_state
)
for session in self._outgoing_endpoints.values():
session._on_connection_state_change() # pylint:disable=protected-access
Expand Down Expand Up @@ -321,7 +321,11 @@ def _send_frame(self, channel: int, frame: NamedTuple, **kwargs: Any) -> None:
except Exception: # pylint:disable=try-except-raise
raise
else:
_LOGGER.info("Cannot write frame in current state: %r", self.state, extra=self._network_trace_params)
_LOGGER.info(
"[Connection:%s] Cannot write frame in current state: %r",
self._network_trace_params["amqpConnection"],
self.state
)

def _get_next_outgoing_channel(self) -> int:
"""Get the next available outgoing channel number within the max channel limit.
Expand All @@ -338,7 +342,10 @@ def _get_next_outgoing_channel(self) -> int:
def _outgoing_empty(self) -> None:
"""Send an empty frame to prevent the connection from reaching an idle timeout."""
if self._network_trace:
_LOGGER.debug("-> EmptyFrame()", extra=self._network_trace_params)
_LOGGER.debug(
"[Connection:%s] -> EmptyFrame()",
self._network_trace_params["amqpConnection"]
)
if self._error:
raise self._error

Expand All @@ -359,7 +366,11 @@ def _outgoing_header(self)-> None:
"""Send the AMQP protocol header to initiate the connection."""
self._last_frame_sent_time = time.time()
if self._network_trace:
_LOGGER.debug("-> Header(%r)", HEADER_FRAME, extra=self._network_trace_params)
_LOGGER.debug(
"[Connection:%s] -> Header(%r)",
self._network_trace_params["amqpConnection"],
HEADER_FRAME
)
self._transport.write(HEADER_FRAME)

def _incoming_header(self, _, frame: bytes) -> None:
Expand All @@ -369,7 +380,11 @@ def _incoming_header(self, _, frame: bytes) -> None:
:param bytes frame: The incoming frame.
"""
if self._network_trace:
_LOGGER.debug("<- Header(%r)", frame, extra=self._network_trace_params)
_LOGGER.debug(
"[Connection:%s] <- Header(%r)",
self._network_trace_params["amqpConnection"],
frame,
)
if self.state == ConnectionState.START:
self._set_state(ConnectionState.HDR_RCVD)
elif self.state == ConnectionState.HDR_SENT:
Expand All @@ -392,7 +407,11 @@ def _outgoing_open(self) -> None:
properties=self._properties,
)
if self._network_trace:
_LOGGER.debug("-> %r", open_frame, extra=self._network_trace_params)
_LOGGER.debug(
"[Connection:%s] -> %r",
self._network_trace_params["amqpConnection"],
open_frame
)
self._send_frame(0, open_frame)

def _incoming_open(self, channel: int, frame) -> None:
Expand All @@ -418,17 +437,27 @@ def _incoming_open(self, channel: int, frame) -> None:
"""
# TODO: Add type hints for full frame tuple contents.
if self._network_trace:
_LOGGER.debug("<- %r", OpenFrame(*frame), extra=self._network_trace_params)
_LOGGER.debug(
"[Connection:%s] <- %r",
self._network_trace_params["amqpConnection"],
OpenFrame(*frame)
)
if channel != 0:
_LOGGER.error("OPEN frame received on a channel that is not 0.", extra=self._network_trace_params)
_LOGGER.error(
"[Connection:%s] OPEN frame received on a channel that is not 0.",
self._network_trace_params["amqpConnection"]
)
self.close(
error=AMQPError(
condition=ErrorCondition.NotAllowed, description="OPEN frame received on a channel that is not 0."
)
)
self._set_state(ConnectionState.END)
if self.state == ConnectionState.OPENED:
_LOGGER.error("OPEN frame received in the OPENED state.", extra=self._network_trace_params)
_LOGGER.error(
"[Connection:%s] OPEN frame received in the OPENED state.",
self._network_trace_params["amqpConnection"]
)
self.close()
if frame[4]:
self._remote_idle_timeout = cast(float, frame[4] / 1000) # Convert to seconds
Expand All @@ -447,8 +476,8 @@ def _incoming_open(self, channel: int, frame) -> None:
)
)
_LOGGER.error(
"Failed parsing OPEN frame: Max frame size is less than supported minimum.",
extra=self._network_trace_params
"[Connection:%s] Failed parsing OPEN frame: Max frame size is less than supported minimum.",
self._network_trace_params["amqpConnection"]
)
return
self._remote_max_frame_size = frame[2]
Expand All @@ -466,15 +495,23 @@ def _incoming_open(self, channel: int, frame) -> None:
description=f"connection is an illegal state: {self.state}",
)
)
_LOGGER.error("Connection is an illegal state: %r", self.state, extra=self._network_trace_params)
_LOGGER.error(
"[Connection:%s] Connection is an illegal state: %r",
self._network_trace_params["amqpConnection"],
self.state
)

def _outgoing_close(self, error: Optional[AMQPError] = None) -> None:
"""Send a Close frame to shutdown connection with optional error information.
:param ~pyamqp.error.AMQPError or None error: Optional error information.
"""
close_frame = CloseFrame(error=error)
if self._network_trace:
_LOGGER.debug("-> %r", close_frame, extra=self._network_trace_params)
_LOGGER.debug(
"[Connection:%s] -> %r",
self._network_trace_params["amqpConnection"],
close_frame
)
self._send_frame(0, close_frame)

def _incoming_close(self, channel: int, frame: Tuple[Any, ...]) -> None:
Expand All @@ -488,7 +525,11 @@ def _incoming_close(self, channel: int, frame: Tuple[Any, ...]) -> None:
:param tuple frame: The incoming Close frame.
"""
if self._network_trace:
_LOGGER.debug("<- %r", CloseFrame(*frame), extra=self._network_trace_params)
_LOGGER.debug(
"[Connection:%s] <- %r",
self._network_trace_params["amqpConnection"],
CloseFrame(*frame),
)
disconnect_states = [
ConnectionState.HDR_RCVD,
ConnectionState.HDR_EXCH,
Expand All @@ -503,8 +544,8 @@ def _incoming_close(self, channel: int, frame: Tuple[Any, ...]) -> None:
close_error = None
if channel > self._channel_max:
_LOGGER.error(
"CLOSE frame received on a channel greated than support max.",
extra=self._network_trace_params
"[Connection:%s] CLOSE frame received on a channel greated than support max.",
self._network_trace_params["amqpConnection"]
)
close_error = AMQPError(condition=ErrorCondition.InvalidField, description="Invalid channel", info=None)

Expand All @@ -517,8 +558,9 @@ def _incoming_close(self, channel: int, frame: Tuple[Any, ...]) -> None:
condition=frame[0][0], description=frame[0][1], info=frame[0][2]
)
_LOGGER.error(
"Connection closed with error: %r", frame[0],
extra=self._network_trace_params
"[Connection:%s] Connection closed with error: %r",
self._network_trace_params["amqpConnection"],
frame[0],
)


Expand Down Expand Up @@ -577,8 +619,8 @@ def _incoming_end(self, channel: int, frame: Tuple[Any, ...]) -> None:
description="Invalid channel number received"
))
_LOGGER.error(
"END frame received on invalid channel. Closing connection.",
extra=self._network_trace_params
"[Connection:%s] END frame received on invalid channel. Closing connection.",
self._network_trace_params["amqpConnection"]
)
return

Expand Down Expand Up @@ -643,7 +685,11 @@ def _process_incoming_frame(self, channel: int, frame: Optional[Union[bytes, Tup
return True
if performative == 1:
return False
_LOGGER.error("Unrecognized incoming frame: %r", frame, extra=self._network_trace_params)
_LOGGER.error(
"[Connection:%s] Unrecognized incoming frame: %r",
self._network_trace_params["amqpConnection"],
frame
)
return True
except KeyError:
return True # TODO: channel error
Expand Down Expand Up @@ -675,8 +721,8 @@ def _process_outgoing_frame(self, channel: int, frame) -> None:
cast(float, self._last_frame_received_time),
) or self._get_remote_timeout(now):
_LOGGER.info(
"No frame received for the idle timeout. Closing connection.",
extra=self._network_trace_params
"[Connection:%s] No frame received for the idle timeout. Closing connection.",
self._network_trace_params["amqpConnection"]
)
self.close(
error=AMQPError(
Expand Down Expand Up @@ -755,8 +801,8 @@ def listen(self, wait: Union[float, bool] = False, batch: int = 1, **kwargs: Any
now
):
_LOGGER.info(
"No frame received for the idle timeout. Closing connection.",
extra=self._network_trace_params
"[Connection:%s] No frame received for the idle timeout. Closing connection.",
self._network_trace_params["amqpConnection"]
)
self.close(
error=AMQPError(
Expand All @@ -777,9 +823,9 @@ def listen(self, wait: Union[float, bool] = False, batch: int = 1, **kwargs: Any
break
else:
_LOGGER.info(
"Connection cannot read frames in this state: %r",
self.state,
extra=self._network_trace_params
"[Connection:%s] Connection cannot read frames in this state: %r",
self._network_trace_params["amqpConnection"],
self.state
)
break
except (OSError, IOError, SSLError, socket.error) as exc:
Expand Down Expand Up @@ -906,7 +952,11 @@ def close(self, error: Optional[AMQPError] = None, wait: bool = False) -> None:
self._wait_for_response(wait, ConnectionState.END)
except Exception as exc: # pylint:disable=broad-except
# If error happened during closing, ignore the error and set state to END
_LOGGER.info("An error occurred when closing the connection: %r", exc, extra=self._network_trace_params)
_LOGGER.info(
"[Connection:%s] An error occurred when closing the connection: %r",
self._network_trace_params["amqpConnection"],
exc
)
self._set_state(ConnectionState.END)
finally:
self._disconnect()
48 changes: 38 additions & 10 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,13 @@ def connect(self):
# has _not_ been sent
self.connected = True
except (OSError, IOError, SSLError) as e:
_LOGGER.info("Transport connection failed: %r", e, extra=self.network_trace_params)
_LOGGER.info(
"[Connection:%s, Session:%s, Link:%s] Transport connection failed: %r",
self.network_trace_params["amqpConnection"],
self.network_trace_params["amqpSession"],
self.network_trace_params["amqpLink"],
e
)
# if not fully connected, close socket, and reraise error
if self.sock and not self.connected:
self.sock.close()
Expand Down Expand Up @@ -396,9 +402,11 @@ def close(self):
# TODO: shutdown could raise OSError, Transport endpoint is not connected if the endpoint is already
# disconnected. can we safely ignore the errors since the close operation is initiated by us.
_LOGGER.debug(
"Transport endpoint is already disconnected: %r",
exc,
extra=self.network_trace_params
"[Connection:%s, Session:%s, Link:%s] Transport endpoint is already disconnected: %r",
self.network_trace_params["amqpConnection"],
self.network_trace_params["amqpSession"],
self.network_trace_params["amqpLink"],
exc
)
self.sock.close()
self.sock = None
Expand All @@ -421,10 +429,12 @@ def read(self, verify_frame_type=0):
frame_type = frame_header[5]
if verify_frame_type is not None and frame_type != verify_frame_type:
_LOGGER.debug(
"Received invalid frame type: %r, expected: %r",
"[Connection:%s, Session:%s, Link:%s] Received invalid frame type: %r, expected: %r",
self.network_trace_params["amqpConnection"],
self.network_trace_params["amqpSession"],
self.network_trace_params["amqpLink"],
frame_type,
verify_frame_type,
extra=self.network_trace_params
verify_frame_type
)
raise ValueError(
f"Received invalid frame type: {frame_type}, expected: {verify_frame_type}"
Expand Down Expand Up @@ -453,7 +463,13 @@ def read(self, verify_frame_type=0):
raise socket.timeout()
if get_errno(exc) not in _UNAVAIL:
self.connected = False
_LOGGER.debug("Transport read failed: %r", exc, extra=self.network_trace_params)
_LOGGER.debug(
"[Connection:%s, Session:%s, Link:%s] Transport read failed: %r",
self.network_trace_params["amqpConnection"],
self.network_trace_params["amqpSession"],
self.network_trace_params["amqpLink"],
exc
)
raise
offset -= 2
return frame_header, channel, payload[offset:]
Expand All @@ -465,7 +481,13 @@ def write(self, s):
except socket.timeout:
raise
except (OSError, IOError, socket.error) as exc:
_LOGGER.debug("Transport write failed: %r", exc, extra=self.network_trace_params)
_LOGGER.debug(
"[Connection:%s, Session:%s, Link:%s] Transport write failed: %r",
self.network_trace_params["amqpConnection"],
self.network_trace_params["amqpSession"],
self.network_trace_params["amqpLink"],
exc
)
if get_errno(exc) not in _UNAVAIL:
self.connected = False
raise
Expand Down Expand Up @@ -753,7 +775,13 @@ def connect(self):
self.close()
raise ConnectionError("Websocket failed to establish connection: %r" % exc) from exc
except (OSError, IOError, SSLError) as e:
_LOGGER.info("Websocket connection failed: %r", e, extra=self.network_trace_params)
_LOGGER.info(
"[Connection:%s, Session:%s, Link:%s] Websocket connection failed: %r",
self.network_trace_params["amqpConnection"],
self.network_trace_params["amqpSession"],
self.network_trace_params["amqpLink"],
e
)
self.close()
raise

Expand Down
Loading
Loading