Skip to content

Commit

Permalink
Merge pull request #1650 from aws/pubsub-fixups
Browse files Browse the repository at this point in the history
Pubsub fixups: 1. Logging instead of exceptions on the pending futures due to disconnect notifications. 2. Typos fixes
  • Loading branch information
ikolomi authored Jun 25, 2024
2 parents 8468f9d + 3e939cf commit f399dad
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 19 deletions.
2 changes: 1 addition & 1 deletion glide-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ async fn push_manager_loop(mut push_rx: mpsc::UnboundedReceiver<PushInfo>, write
let result = push_rx.recv().await;
match result {
None => {
log_trace("push manager loop", "got None as from push manager");
log_error("push manager loop", "got None from push manager");
return;
}
Some(push_msg) => {
Expand Down
2 changes: 1 addition & 1 deletion python/python/glide/async_commands/cluster_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ async def sort_store(

async def publish(self, message: str, channel: str, sharded: bool = False) -> int:
"""
Publish message on pubsub channel.
Publish a message on pubsub channel.
This command aggregates PUBLISH and SPUBLISH commands functionalities.
The mode is selected using the 'sharded' parameter
See https://valkey.io/commands/publish and https://valkey.io/commands/spublish for more details.
Expand Down
5 changes: 3 additions & 2 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5056,7 +5056,8 @@ async def getex(

@dataclass
class PubSubMsg:
"""Describes incoming pubsub message
"""
Describes the incoming pubsub message
Attributes:
message (str): Incoming message.
Expand Down Expand Up @@ -5087,7 +5088,7 @@ async def get_pubsub_message(self) -> PubSubMsg:

def try_get_pubsub_message(self) -> Optional[PubSubMsg]:
"""
Tries to returns the next pubsub message.
Tries to return the next pubsub message.
Throws WrongConfiguration in cases:
1. No pubsub subscriptions are configured for the client
2. Callback is configured with the pubsub subsciptions
Expand Down
2 changes: 1 addition & 1 deletion python/python/glide/async_commands/standalone_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ async def sort_store(

async def publish(self, message: str, channel: str) -> TOK:
"""
Publish message on pubsub channel.
Publish a message on pubsub channel.
See https://valkey.io/commands/publish for more details.
Args:
Expand Down
2 changes: 1 addition & 1 deletion python/python/glide/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ class PubSubSubscriptions:
channels_and_patterns (Dict[ClusterClientConfiguration.PubSubChannelModes, Set[str]]):
Channels and patterns by modes.
callback (Optional[Callable[[CoreCommands.PubSubMsg, Any], None]]):
Optional callback to accept the incomming messages.
Optional callback to accept the incoming messages.
context (Any):
Arbitrary context to pass to the callback.
"""
Expand Down
24 changes: 11 additions & 13 deletions python/python/glide/redis_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ async def get_pubsub_message(self) -> CoreCommands.PubSubMsg:

if not self.config._is_pubsub_configured():
raise WrongConfiguration(
"The operation will never complete since there was no pubsbub subscriptions applied to the client."
"The operation will never complete since there was no pubsub subscriptions applied to the client."
)

if self.config._get_pubsub_callback_and_context()[0] is not None:
Expand Down Expand Up @@ -323,13 +323,11 @@ def _notification_to_pubsub_message_safe(
Dict[str, Any], value_from_pointer(response.resp_pointer)
)
message_kind = push_notification["kind"]
if message_kind == "Disconnect":
# cancel all futures since we dont know how many (if any) messages wont arrive
# TODO: consider cancelling a single future
self._cancel_pubsub_futures_with_exception_safe(
ConnectionError(
"Warning, transport disconnect occured, messages might be lost"
)
if message_kind == "Disconnection":
ClientLogger.log(
LogLevel.WARN,
"disconnect notification",
"Transport disconnected, messages might be lost",
)
elif (
message_kind == "Message"
Expand All @@ -353,11 +351,11 @@ def _notification_to_pubsub_message_safe(
):
pass
else:
err_msg = f"Unsupported push message: '{message_kind}'"
ClientLogger.log(LogLevel.ERROR, "pubsub message", err_msg)
# cancel all futures since its a serious
# TODO: consider cancelling a single future
self._cancel_pubsub_futures_with_exception_safe(ConnectionError(err_msg))
ClientLogger.log(
LogLevel.WARN,
"unknown notification",
f"Unknown notification message: '{message_kind}'",
)

return pubsub_message

Expand Down

0 comments on commit f399dad

Please sign in to comment.