diff --git a/pw_transfer/integration_test/BUILD.bazel b/pw_transfer/integration_test/BUILD.bazel index 59bcdad984..5cb92ee7a2 100644 --- a/pw_transfer/integration_test/BUILD.bazel +++ b/pw_transfer/integration_test/BUILD.bazel @@ -55,6 +55,7 @@ py_test( main = "proxy_test.py", deps = [ ":config_pb2", + "//pw_rpc:internal_packet_proto_pb2", "//pw_transfer:transfer_proto_pb2", "//pw_transfer/py:pw_transfer", ], diff --git a/pw_transfer/integration_test/JavaClient.java b/pw_transfer/integration_test/JavaClient.java index 88058655c6..834faadcf5 100644 --- a/pw_transfer/integration_test/JavaClient.java +++ b/pw_transfer/integration_test/JavaClient.java @@ -26,6 +26,7 @@ import dev.pigweed.pw_transfer.TransferClient; import dev.pigweed.pw_transfer.TransferError; import dev.pigweed.pw_transfer.TransferService; +import dev.pigweed.pw_transfer.TransferTimeoutSettings; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -155,6 +156,9 @@ public static ConfigProtos.ClientConfig ParseConfigFrom(InputStream reader) thro if (config_builder.getMaxRetries() == 0) { throw new AssertionError("max_retries may not be 0"); } + if (config_builder.getMaxLifetimeRetries() == 0) { + throw new AssertionError("max_lifetime_retries may not be 0"); + } return config_builder.build(); } @@ -253,10 +257,12 @@ public static void main(String[] args) { TransferClient client = new TransferClient( hdlc_rpc_client.getRpcClient().method(CHANNEL_ID, TransferService.get().name() + "/Read"), hdlc_rpc_client.getRpcClient().method(CHANNEL_ID, TransferService.get().name() + "/Write"), - config.getChunkTimeoutMs(), - config.getInitialChunkTimeoutMs(), - config.getMaxRetries(), - () -> false); + TransferTimeoutSettings.builder() + .setTimeoutMillis(config.getChunkTimeoutMs()) + .setInitialTimeoutMillis(config.getInitialChunkTimeoutMs()) + .setMaxRetries(config.getMaxRetries()) + .setMaxLifetimeRetries(config.getMaxLifetimeRetries()) + .build()); for (ConfigProtos.TransferAction action : config.getTransferActionsList()) { int resourceId = action.getResourceId(); diff --git a/pw_transfer/integration_test/client.cc b/pw_transfer/integration_test/client.cc index 3ffcc5522e..49bb3f2671 100644 --- a/pw_transfer/integration_test/client.cc +++ b/pw_transfer/integration_test/client.cc @@ -88,6 +88,9 @@ pw::Status PerformTransferActions(const pw::transfer::ClientConfig& config) { rpc::integration_test::kChannelId, transfer_thread); + client.set_max_retries(config.max_retries()); + client.set_max_lifetime_retries(config.max_lifetime_retries()); + Status status = pw::OkStatus(); for (const pw::transfer::TransferAction& action : config.transfer_actions()) { TransferResult result; diff --git a/pw_transfer/integration_test/config.proto b/pw_transfer/integration_test/config.proto index 7ad4bfa090..d120fc5762 100644 --- a/pw_transfer/integration_test/config.proto +++ b/pw_transfer/integration_test/config.proto @@ -81,6 +81,10 @@ message ClientConfig { // // TODO(tpudlik): google.protobuf.Duration? uint32 chunk_timeout_ms = 4; + + // Cumulative maximum number of times to retry over the course of the transfer + // before giving up. + uint32 max_lifetime_retries = 5; } // Stacks of paths to use when doing transfers. Each new initiated transfer diff --git a/pw_transfer/integration_test/expected_errors_test.py b/pw_transfer/integration_test/expected_errors_test.py index 5cbd303056..64d90a3904 100644 --- a/pw_transfer/integration_test/expected_errors_test.py +++ b/pw_transfer/integration_test/expected_errors_test.py @@ -282,6 +282,55 @@ def test_server_read_timeout(self, client_type): expected_status=status_pb2.StatusCode.DEADLINE_EXCEEDED, ) + @parameterized.expand( + [ + ("cpp"), + ("java"), + ("python"), + ] + ) + def test_data_drop_client_lifetime_timeout(self, client_type): + """Drops the first data chunk of a transfer but allows the rest.""" + payload = random.Random(67336391945).randbytes(1234) + + # This test is expected to hit the lifetime retry count, so make it + # reasonable. + client_config = self.default_client_config() + client_config.max_lifetime_retries = 20 + client_config.chunk_timeout_ms = 1000 + + config = TransferConfig( + self.default_server_config(), + client_config, + text_format.Parse( + """ + client_filter_stack: [ + { hdlc_packetizer: {} }, + { window_packet_dropper: { window_packet_to_drop: 0 } } + ] + + server_filter_stack: [ + { hdlc_packetizer: {} }, + { window_packet_dropper: { window_packet_to_drop: 0 } } + ]""", + config_pb2.ProxyConfig(), + ), + ) + # Resource ID is arbitrary, but deliberately set to be >1 byte. + resource_id = 7332 + + # This test deliberately tries to time out the transfer, so because of + # the retry process the resource ID might be re-initialized multiple + # times. + self.do_single_read( + client_type, + config, + resource_id, + payload, + permanent_resource_id=True, + expected_status=status_pb2.StatusCode.DEADLINE_EXCEEDED, + ) + if __name__ == '__main__': test_fixture.run_tests_for(ErrorTransferIntegrationTest) diff --git a/pw_transfer/integration_test/legacy_binaries_test.py b/pw_transfer/integration_test/legacy_binaries_test.py index 7187788469..7c6b8aa997 100644 --- a/pw_transfer/integration_test/legacy_binaries_test.py +++ b/pw_transfer/integration_test/legacy_binaries_test.py @@ -61,6 +61,14 @@ class LegacyTransferIntegrationTest(test_fixture.TransferIntegrationTest): LEGACY_SERVER = False LEGACY_CLIENT = False + def default_config(self) -> test_fixture.TransferConfig: + # The legacy binaries aren't aware of the max_lifetime_retries field, + # which was added more recently. Clear it so it isn't encoded into the + # serialized message. + config = super().default_config() + config.client.max_lifetime_retries = 0 + return config + @parameterized.expand( [ ("cpp"), diff --git a/pw_transfer/integration_test/proxy.py b/pw_transfer/integration_test/proxy.py index 6c74491b04..d9df6555c6 100644 --- a/pw_transfer/integration_test/proxy.py +++ b/pw_transfer/integration_test/proxy.py @@ -32,6 +32,7 @@ from google.protobuf import text_format +from pigweed.pw_rpc.internal import packet_pb2 from pigweed.pw_transfer import transfer_pb2 from pigweed.pw_transfer.integration_test import config_pb2 from pw_hdlc import decode @@ -58,6 +59,7 @@ class Event(Enum): TRANSFER_START = 1 PARAMETERS_RETRANSMIT = 2 PARAMETERS_CONTINUE = 3 + START_ACK_CONFIRMATION = 4 class Filter(abc.ABC): @@ -345,15 +347,28 @@ def __init__( self._window_packet = 0 async def process(self, data: bytes) -> None: - if self._window_packet != self._window_packet_to_drop: - await self.send_data(data) + try: + is_data_chunk = ( + _extract_transfer_chunk(data).type is Chunk.Type.DATA + ) + except Exception: + # Invalid / non-chunk data (e.g. text logs); ignore. + is_data_chunk = False - self._window_packet += 1 + # Only count transfer data chunks as part of a window. + if is_data_chunk: + if self._window_packet != self._window_packet_to_drop: + await self.send_data(data) + + self._window_packet += 1 + else: + await self.send_data(data) def handle_event(self, event: Event) -> None: - if ( - event is Event.PARAMETERS_RETRANSMIT - or event is Event.PARAMETERS_CONTINUE + if event in ( + Event.PARAMETERS_RETRANSMIT, + Event.PARAMETERS_CONTINUE, + Event.START_ACK_CONFIRMATION, ): self._window_packet = 0 @@ -376,11 +391,11 @@ def __init__( async def process(self, data: bytes) -> None: try: - raw_chunk = transfer_pb2.Chunk() - raw_chunk.ParseFromString(data) - chunk = Chunk.from_message(raw_chunk) + chunk = _extract_transfer_chunk(data) if chunk.type is Chunk.Type.START: await self._queue.put(Event.TRANSFER_START) + if chunk.type is Chunk.Type.START_ACK_CONFIRMATION: + await self._queue.put(Event.START_ACK_CONFIRMATION) elif chunk.type is Chunk.Type.PARAMETERS_RETRANSMIT: await self._queue.put(Event.PARAMETERS_RETRANSMIT) elif chunk.type is Chunk.Type.PARAMETERS_CONTINUE: @@ -392,6 +407,23 @@ async def process(self, data: bytes) -> None: await self.send_data(data) +def _extract_transfer_chunk(data: bytes) -> Chunk: + """Gets a transfer Chunk from an HDLC frame containing an RPC packet. + + Raises an exception if a valid chunk does not exist. + """ + + decoder = decode.FrameDecoder() + for frame in decoder.process(data): + packet = packet_pb2.RpcPacket() + packet.ParseFromString(frame.data) + raw_chunk = transfer_pb2.Chunk() + raw_chunk.ParseFromString(packet.payload) + return Chunk.from_message(raw_chunk) + + raise ValueError("Invalid transfer frame") + + async def _handle_simplex_events( event_queue: asyncio.Queue, handlers: List[Callable[[Event], None]] ): diff --git a/pw_transfer/integration_test/proxy_test.py b/pw_transfer/integration_test/proxy_test.py index ec4d18c05f..5a0599c96c 100644 --- a/pw_transfer/integration_test/proxy_test.py +++ b/pw_transfer/integration_test/proxy_test.py @@ -21,6 +21,12 @@ from typing import List import unittest +from pigweed.pw_rpc.internal import packet_pb2 +from pigweed.pw_transfer import transfer_pb2 +from pw_hdlc import encode +from pw_transfer import ProtocolVersion +from pw_transfer.chunk import Chunk + import proxy @@ -204,11 +210,21 @@ async def append(list: List[bytes], data: bytes): ) packets = [ - b'1', - b'2', - b'3', - b'4', - b'5', + _encode_rpc_frame( + Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'1') + ), + _encode_rpc_frame( + Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'2') + ), + _encode_rpc_frame( + Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'3') + ), + _encode_rpc_frame( + Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'4') + ), + _encode_rpc_frame( + Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'5') + ), ] expected_packets = packets[1:] @@ -230,5 +246,16 @@ async def append(list: List[bytes], data: bytes): window_packet_dropper.handle_event(event) +def _encode_rpc_frame(chunk: Chunk) -> bytes: + packet = packet_pb2.RpcPacket( + type=packet_pb2.PacketType.SERVER_STREAM, + channel_id=101, + service_id=1001, + method_id=100001, + payload=chunk.to_message().SerializeToString(), + ).SerializeToString() + return encode.ui_frame(73, packet) + + if __name__ == '__main__': unittest.main() diff --git a/pw_transfer/integration_test/python_client.py b/pw_transfer/integration_test/python_client.py index e618433d31..4c27189b0b 100644 --- a/pw_transfer/integration_test/python_client.py +++ b/pw_transfer/integration_test/python_client.py @@ -73,6 +73,7 @@ def _main() -> int: default_response_timeout_s=config.chunk_timeout_ms / 1000, initial_response_timeout_s=config.initial_chunk_timeout_ms / 1000, max_retries=config.max_retries, + max_lifetime_retries=config.max_lifetime_retries, default_protocol_version=pw_transfer.ProtocolVersion.LATEST, ) diff --git a/pw_transfer/integration_test/test_fixture.py b/pw_transfer/integration_test/test_fixture.py index 04a2d04e9b..a7b297c770 100644 --- a/pw_transfer/integration_test/test_fixture.py +++ b/pw_transfer/integration_test/test_fixture.py @@ -376,6 +376,7 @@ def default_server_config() -> config_pb2.ServerConfig: def default_client_config() -> config_pb2.ClientConfig: return config_pb2.ClientConfig( max_retries=5, + max_lifetime_retries=1500, initial_chunk_timeout_ms=4000, chunk_timeout_ms=4000, )