Skip to content

Commit

Permalink
pw_transfer: Integration test for lifetime retries
Browse files Browse the repository at this point in the history
This adds an integration test which triggers a transfer's lifetime retry
limit, ensuring that it does not attempt indefinitely.

To support this, some proxy changes were required:

- Correct the assumption that the data passed into filters is a transfer
  chunk. It is actually an HDLC frame containing an RPC packet with the
  transfer chunk as its payload.
- Update the window dropper filter to only count data chunks as part of
  a window.
- Extend integration clients to support configuration of the max
  lifetime retry count.

Bug: 261634006
Change-Id: I30cae8d18fc70db0acb6e9956fccb095c9795a3d
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/123231
Reviewed-by: Armando Montanez <[email protected]>
Commit-Queue: Alexei Frolov <[email protected]>
  • Loading branch information
frolv authored and CQ Bot Account committed Dec 9, 2022
1 parent 33d0313 commit 1213a5e
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 18 deletions.
1 change: 1 addition & 0 deletions pw_transfer/integration_test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ py_test(
main = "proxy_test.py",
deps = [
":config_pb2",
"//pw_rpc:internal_packet_proto_pb2",
"//pw_transfer:transfer_proto_pb2",
"//pw_transfer/py:pw_transfer",
],
Expand Down
14 changes: 10 additions & 4 deletions pw_transfer/integration_test/JavaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import dev.pigweed.pw_transfer.TransferClient;
import dev.pigweed.pw_transfer.TransferError;
import dev.pigweed.pw_transfer.TransferService;
import dev.pigweed.pw_transfer.TransferTimeoutSettings;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -155,6 +156,9 @@ public static ConfigProtos.ClientConfig ParseConfigFrom(InputStream reader) thro
if (config_builder.getMaxRetries() == 0) {
throw new AssertionError("max_retries may not be 0");
}
if (config_builder.getMaxLifetimeRetries() == 0) {
throw new AssertionError("max_lifetime_retries may not be 0");
}
return config_builder.build();
}

Expand Down Expand Up @@ -253,10 +257,12 @@ public static void main(String[] args) {
TransferClient client = new TransferClient(
hdlc_rpc_client.getRpcClient().method(CHANNEL_ID, TransferService.get().name() + "/Read"),
hdlc_rpc_client.getRpcClient().method(CHANNEL_ID, TransferService.get().name() + "/Write"),
config.getChunkTimeoutMs(),
config.getInitialChunkTimeoutMs(),
config.getMaxRetries(),
() -> false);
TransferTimeoutSettings.builder()
.setTimeoutMillis(config.getChunkTimeoutMs())
.setInitialTimeoutMillis(config.getInitialChunkTimeoutMs())
.setMaxRetries(config.getMaxRetries())
.setMaxLifetimeRetries(config.getMaxLifetimeRetries())
.build());

for (ConfigProtos.TransferAction action : config.getTransferActionsList()) {
int resourceId = action.getResourceId();
Expand Down
3 changes: 3 additions & 0 deletions pw_transfer/integration_test/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ pw::Status PerformTransferActions(const pw::transfer::ClientConfig& config) {
rpc::integration_test::kChannelId,
transfer_thread);

client.set_max_retries(config.max_retries());
client.set_max_lifetime_retries(config.max_lifetime_retries());

Status status = pw::OkStatus();
for (const pw::transfer::TransferAction& action : config.transfer_actions()) {
TransferResult result;
Expand Down
4 changes: 4 additions & 0 deletions pw_transfer/integration_test/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ message ClientConfig {
//
// TODO(tpudlik): google.protobuf.Duration?
uint32 chunk_timeout_ms = 4;

// Cumulative maximum number of times to retry over the course of the transfer
// before giving up.
uint32 max_lifetime_retries = 5;
}

// Stacks of paths to use when doing transfers. Each new initiated transfer
Expand Down
49 changes: 49 additions & 0 deletions pw_transfer/integration_test/expected_errors_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,55 @@ def test_server_read_timeout(self, client_type):
expected_status=status_pb2.StatusCode.DEADLINE_EXCEEDED,
)

@parameterized.expand(
[
("cpp"),
("java"),
("python"),
]
)
def test_data_drop_client_lifetime_timeout(self, client_type):
"""Drops the first data chunk of a transfer but allows the rest."""
payload = random.Random(67336391945).randbytes(1234)

# This test is expected to hit the lifetime retry count, so make it
# reasonable.
client_config = self.default_client_config()
client_config.max_lifetime_retries = 20
client_config.chunk_timeout_ms = 1000

config = TransferConfig(
self.default_server_config(),
client_config,
text_format.Parse(
"""
client_filter_stack: [
{ hdlc_packetizer: {} },
{ window_packet_dropper: { window_packet_to_drop: 0 } }
]
server_filter_stack: [
{ hdlc_packetizer: {} },
{ window_packet_dropper: { window_packet_to_drop: 0 } }
]""",
config_pb2.ProxyConfig(),
),
)
# Resource ID is arbitrary, but deliberately set to be >1 byte.
resource_id = 7332

# This test deliberately tries to time out the transfer, so because of
# the retry process the resource ID might be re-initialized multiple
# times.
self.do_single_read(
client_type,
config,
resource_id,
payload,
permanent_resource_id=True,
expected_status=status_pb2.StatusCode.DEADLINE_EXCEEDED,
)


if __name__ == '__main__':
test_fixture.run_tests_for(ErrorTransferIntegrationTest)
8 changes: 8 additions & 0 deletions pw_transfer/integration_test/legacy_binaries_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ class LegacyTransferIntegrationTest(test_fixture.TransferIntegrationTest):
LEGACY_SERVER = False
LEGACY_CLIENT = False

def default_config(self) -> test_fixture.TransferConfig:
# The legacy binaries aren't aware of the max_lifetime_retries field,
# which was added more recently. Clear it so it isn't encoded into the
# serialized message.
config = super().default_config()
config.client.max_lifetime_retries = 0
return config

@parameterized.expand(
[
("cpp"),
Expand Down
50 changes: 41 additions & 9 deletions pw_transfer/integration_test/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from google.protobuf import text_format

from pigweed.pw_rpc.internal import packet_pb2
from pigweed.pw_transfer import transfer_pb2
from pigweed.pw_transfer.integration_test import config_pb2
from pw_hdlc import decode
Expand All @@ -58,6 +59,7 @@ class Event(Enum):
TRANSFER_START = 1
PARAMETERS_RETRANSMIT = 2
PARAMETERS_CONTINUE = 3
START_ACK_CONFIRMATION = 4


class Filter(abc.ABC):
Expand Down Expand Up @@ -345,15 +347,28 @@ def __init__(
self._window_packet = 0

async def process(self, data: bytes) -> None:
if self._window_packet != self._window_packet_to_drop:
await self.send_data(data)
try:
is_data_chunk = (
_extract_transfer_chunk(data).type is Chunk.Type.DATA
)
except Exception:
# Invalid / non-chunk data (e.g. text logs); ignore.
is_data_chunk = False

self._window_packet += 1
# Only count transfer data chunks as part of a window.
if is_data_chunk:
if self._window_packet != self._window_packet_to_drop:
await self.send_data(data)

self._window_packet += 1
else:
await self.send_data(data)

def handle_event(self, event: Event) -> None:
if (
event is Event.PARAMETERS_RETRANSMIT
or event is Event.PARAMETERS_CONTINUE
if event in (
Event.PARAMETERS_RETRANSMIT,
Event.PARAMETERS_CONTINUE,
Event.START_ACK_CONFIRMATION,
):
self._window_packet = 0

Expand All @@ -376,11 +391,11 @@ def __init__(

async def process(self, data: bytes) -> None:
try:
raw_chunk = transfer_pb2.Chunk()
raw_chunk.ParseFromString(data)
chunk = Chunk.from_message(raw_chunk)
chunk = _extract_transfer_chunk(data)
if chunk.type is Chunk.Type.START:
await self._queue.put(Event.TRANSFER_START)
if chunk.type is Chunk.Type.START_ACK_CONFIRMATION:
await self._queue.put(Event.START_ACK_CONFIRMATION)
elif chunk.type is Chunk.Type.PARAMETERS_RETRANSMIT:
await self._queue.put(Event.PARAMETERS_RETRANSMIT)
elif chunk.type is Chunk.Type.PARAMETERS_CONTINUE:
Expand All @@ -392,6 +407,23 @@ async def process(self, data: bytes) -> None:
await self.send_data(data)


def _extract_transfer_chunk(data: bytes) -> Chunk:
"""Gets a transfer Chunk from an HDLC frame containing an RPC packet.
Raises an exception if a valid chunk does not exist.
"""

decoder = decode.FrameDecoder()
for frame in decoder.process(data):
packet = packet_pb2.RpcPacket()
packet.ParseFromString(frame.data)
raw_chunk = transfer_pb2.Chunk()
raw_chunk.ParseFromString(packet.payload)
return Chunk.from_message(raw_chunk)

raise ValueError("Invalid transfer frame")


async def _handle_simplex_events(
event_queue: asyncio.Queue, handlers: List[Callable[[Event], None]]
):
Expand Down
37 changes: 32 additions & 5 deletions pw_transfer/integration_test/proxy_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@
from typing import List
import unittest

from pigweed.pw_rpc.internal import packet_pb2
from pigweed.pw_transfer import transfer_pb2
from pw_hdlc import encode
from pw_transfer import ProtocolVersion
from pw_transfer.chunk import Chunk

import proxy


Expand Down Expand Up @@ -204,11 +210,21 @@ async def append(list: List[bytes], data: bytes):
)

packets = [
b'1',
b'2',
b'3',
b'4',
b'5',
_encode_rpc_frame(
Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'1')
),
_encode_rpc_frame(
Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'2')
),
_encode_rpc_frame(
Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'3')
),
_encode_rpc_frame(
Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'4')
),
_encode_rpc_frame(
Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.DATA, data=b'5')
),
]

expected_packets = packets[1:]
Expand All @@ -230,5 +246,16 @@ async def append(list: List[bytes], data: bytes):
window_packet_dropper.handle_event(event)


def _encode_rpc_frame(chunk: Chunk) -> bytes:
packet = packet_pb2.RpcPacket(
type=packet_pb2.PacketType.SERVER_STREAM,
channel_id=101,
service_id=1001,
method_id=100001,
payload=chunk.to_message().SerializeToString(),
).SerializeToString()
return encode.ui_frame(73, packet)


if __name__ == '__main__':
unittest.main()
1 change: 1 addition & 0 deletions pw_transfer/integration_test/python_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def _main() -> int:
default_response_timeout_s=config.chunk_timeout_ms / 1000,
initial_response_timeout_s=config.initial_chunk_timeout_ms / 1000,
max_retries=config.max_retries,
max_lifetime_retries=config.max_lifetime_retries,
default_protocol_version=pw_transfer.ProtocolVersion.LATEST,
)

Expand Down
1 change: 1 addition & 0 deletions pw_transfer/integration_test/test_fixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ def default_server_config() -> config_pb2.ServerConfig:
def default_client_config() -> config_pb2.ClientConfig:
return config_pb2.ClientConfig(
max_retries=5,
max_lifetime_retries=1500,
initial_chunk_timeout_ms=4000,
chunk_timeout_ms=4000,
)
Expand Down

0 comments on commit 1213a5e

Please sign in to comment.