diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 733dc5b..690638e 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -7,7 +7,11 @@ ## Upgrading - `GrpcStreamBroadcaster` now takes a `AsyncIterable` instead of a `AsyncIterator` as the `stream_method`. This is to match the type of streaming methods generated by `grpc`, so no conversion to an `AsyncIterator` is needed. + +- `GrpcStreamBroadcaster` no longer tries to reconnect when a server closes a connection. This behaviour can be overridden by passing `retry_on_exhausted_stream=True` when constructing `GrpcStreamBroadcaster` instances. + - gRPC URLs don't have a default port anymore, unless a default is set via `ChannelOptions`. If you want to set a default port for URLs, please pass custom `ChannelOptions` as `defaults` to `parse_grpc_uri` or as `channel_defaults` to `BaseApiClient`. + * The `ExponentialBackoff` and `LinearBackoff` classes now require keyword arguments for their constructor. This change was made to make the classes easier to use and to avoid confusion with the order of the arguments. ## New Features diff --git a/src/frequenz/client/base/streaming.py b/src/frequenz/client/base/streaming.py index 99db426..baa58ee 100644 --- a/src/frequenz/client/base/streaming.py +++ b/src/frequenz/client/base/streaming.py @@ -27,12 +27,13 @@ class GrpcStreamBroadcaster(Generic[InputT, OutputT]): """Helper class to handle grpc streaming methods.""" - def __init__( + def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments self, stream_name: str, stream_method: Callable[[], AsyncIterable[InputT]], transform: Callable[[InputT], OutputT], retry_strategy: retry.Strategy | None = None, + retry_on_exhausted_stream: bool = False, ): """Initialize the streaming helper. @@ -43,6 +44,8 @@ def __init__( transform: A function to transform the input type to the output type. retry_strategy: The retry strategy to use, when the connection is lost. Defaults to retries every 3 seconds, with a jitter of 1 second, indefinitely. + retry_on_exhausted_stream: Whether to retry when the stream is exhausted, i.e. + when the server closes the stream. Defaults to False. """ self._stream_name = stream_name self._stream_method = stream_method @@ -50,6 +53,7 @@ def __init__( self._retry_strategy = ( retry.LinearBackoff() if retry_strategy is None else retry_strategy.copy() ) + self._retry_on_exhausted_stream = retry_on_exhausted_stream self._channel: channels.Broadcast[OutputT] = channels.Broadcast( name=f"GrpcStreamBroadcaster-{stream_name}" @@ -67,6 +71,15 @@ def new_receiver(self, maxsize: int = 50) -> channels.Receiver[OutputT]: """ return self._channel.new_receiver(limit=maxsize) + @property + def is_running(self) -> bool: + """Return whether the streaming helper is running. + + Returns: + Whether the streaming helper is running. + """ + return not self._task.done() + async def stop(self) -> None: """Stop the streaming helper.""" if self._task.done(): @@ -91,6 +104,12 @@ async def _run(self) -> None: await sender.send(self._transform(msg)) except grpc.aio.AioRpcError as err: error = err + if error is None and not self._retry_on_exhausted_stream: + _logger.info( + "%s: connection closed, stream exhausted", self._stream_name + ) + await self._channel.close() + break error_str = f"Error: {error}" if error else "Stream exhausted" interval = self._retry_strategy.next_interval() if interval is None: diff --git a/tests/streaming/test_grpc_stream_broadcaster.py b/tests/streaming/test_grpc_stream_broadcaster.py index 7d33466..e214606 100644 --- a/tests/streaming/test_grpc_stream_broadcaster.py +++ b/tests/streaming/test_grpc_stream_broadcaster.py @@ -40,6 +40,7 @@ def no_retry() -> mock.MagicMock: async def ok_helper( no_retry: mock.MagicMock, # pylint: disable=redefined-outer-name receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name + retry_on_exhausted_stream: bool, ) -> AsyncIterator[streaming.GrpcStreamBroadcaster[int, str]]: """Fixture for GrpcStreamBroadcaster.""" @@ -55,6 +56,7 @@ async def asynciter(ready_event: asyncio.Event) -> AsyncIterator[int]: stream_method=lambda: asynciter(receiver_ready_event), transform=_transformer, retry_strategy=no_retry, + retry_on_exhausted_stream=retry_on_exhausted_stream, ) yield helper await helper.stop() @@ -79,7 +81,8 @@ async def __anext__(self) -> int: return self._current -async def test_streaming_success( +@pytest.mark.parametrize("retry_on_exhausted_stream", [True]) +async def test_streaming_success_retry_on_exhausted( ok_helper: streaming.GrpcStreamBroadcaster[ int, str ], # pylint: disable=redefined-outer-name @@ -113,6 +116,43 @@ async def test_streaming_success( ] +@pytest.mark.parametrize("retry_on_exhausted_stream", [False]) +async def test_streaming_success( + ok_helper: streaming.GrpcStreamBroadcaster[ + int, str + ], # pylint: disable=redefined-outer-name + no_retry: mock.MagicMock, # pylint: disable=redefined-outer-name + receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name + caplog: pytest.LogCaptureFixture, +) -> None: + """Test streaming success.""" + caplog.set_level(logging.INFO) + items: list[str] = [] + async with asyncio.timeout(1): + receiver = ok_helper.new_receiver() + receiver_ready_event.set() + async for item in receiver: + items.append(item) + assert ( + no_retry.next_interval.call_count == 0 + ), "next_interval should not be called when streaming is successful" + + assert items == [ + "transformed_0", + "transformed_1", + "transformed_2", + "transformed_3", + "transformed_4", + ] + assert caplog.record_tuples == [ + ( + "frequenz.client.base.streaming", + logging.INFO, + "test_helper: connection closed, stream exhausted", + ) + ] + + class _NamedMagicMock(mock.MagicMock): """Mock with a name."""