Skip to content

Commit

Permalink
pw_transfer: Version 2 completion handshake in C++
Browse files Browse the repository at this point in the history
In legacy transfer, completions were one-sided. As such, there was no
guarantee that the peer of a transfer recognized that it was finished.
If the completion chunk was lost, the peer would continue trying to send
regular chunks until it timed out, while the terminator could already
have cleaned up all of its resources and been reused in a new context.
This has led to several bugs in projects using pw_transfer.

This change updates the completion protocol in transfer v2 to require
that completion chunks are acknowledged. After one side of the transfer
terminates with a completion chunk, it waits for the peer to acknowledge
the completion. The completion chunk is re-sent in response to non-ack
chunks or following a timeout.

Change-Id: Ic5924caab4aad4c2ddfc5a897e521937ea553d45
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/102600
Commit-Queue: Alexei Frolov <[email protected]>
Reviewed-by: Wyatt Hepler <[email protected]>
  • Loading branch information
frolv authored and CQ Bot Account committed Aug 15, 2022
1 parent 1493ec5 commit 6433797
Show file tree
Hide file tree
Showing 9 changed files with 552 additions and 111 deletions.
2 changes: 1 addition & 1 deletion pw_transfer/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pw_source_set("core") {
public_configs = [ ":public_include_path" ]
public_deps = [
":config",
":proto.pwpb",
"$dir_pw_chrono:system_clock",
"$dir_pw_preprocessor",
"$dir_pw_rpc:client",
Expand All @@ -94,7 +95,6 @@ pw_source_set("core") {
dir_pw_stream,
]
deps = [
":proto.pwpb",
dir_pw_log,
dir_pw_protobuf,
dir_pw_varint,
Expand Down
1 change: 0 additions & 1 deletion pw_transfer/chunk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "pw_protobuf/decoder.h"
#include "pw_protobuf/serialized_size.h"
#include "pw_status/try.h"
#include "pw_transfer/transfer.pwpb.h"

namespace pw::transfer::internal {

Expand Down
240 changes: 237 additions & 3 deletions pw_transfer/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1559,6 +1559,10 @@ TEST_F(WriteTransfer, Timeout_NonSeekableReaderEndsTransfer) {
EXPECT_EQ(c3.status().value(), Status::DeadlineExceeded());

EXPECT_EQ(transfer_status, Status::DeadlineExceeded());

// Ensure we don't leave a dangling reference to transfer_status.
client_.CancelTransfer(14);
transfer_thread_.WaitUntilEventIsProcessed();
}

TEST_F(WriteTransfer, ManualCancel) {
Expand Down Expand Up @@ -1671,6 +1675,10 @@ TEST_F(ReadTransfer, Version2_SingleChunk) {
EXPECT_EQ(transfer_status, OkStatus());
EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
0);

context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
.set_session_id(29)));
}

TEST_F(ReadTransfer, Version2_ServerRunsLegacy) {
Expand Down Expand Up @@ -1813,6 +1821,10 @@ TEST_F(ReadTransfer, Version2_TimeoutDuringHandshake) {
EXPECT_EQ(transfer_status, OkStatus());
EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
0);

context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
.set_session_id(31)));
}

TEST_F(ReadTransfer, Version2_TimeoutAfterHandshake) {
Expand Down Expand Up @@ -1902,6 +1914,10 @@ TEST_F(ReadTransfer, Version2_TimeoutAfterHandshake) {
EXPECT_EQ(transfer_status, OkStatus());
EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
0);

context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
.set_session_id(33)));
}

TEST_F(ReadTransfer, Version2_ServerErrorDuringHandshake) {
Expand Down Expand Up @@ -1944,6 +1960,203 @@ TEST_F(ReadTransfer, Version2_ServerErrorDuringHandshake) {
EXPECT_EQ(transfer_status, Status::Unauthenticated());
}

TEST_F(ReadTransfer, Version2_TimeoutWaitingForCompletionAckRetries) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();

ASSERT_EQ(OkStatus(),
client_.Read(
3,
writer,
[&transfer_status](Status status) { transfer_status = status; },
cfg::kDefaultChunkTimeout,
ProtocolVersion::kVersionTwo));

transfer_thread_.WaitUntilEventIsProcessed();

// Initial chunk of the transfer is sent. This chunk should contain all the
// fields from both legacy and version 2 protocols for backwards
// compatibility.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Read>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());

Chunk chunk = DecodeChunk(payloads[0]);
EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
EXPECT_EQ(chunk.session_id(), 3u);
EXPECT_EQ(chunk.resource_id(), 3u);
EXPECT_EQ(chunk.offset(), 0u);
EXPECT_EQ(chunk.window_end_offset(), 64u);
EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);

// The server responds with a START_ACK, continuing the version 2 handshake
// and assigning a session_id to the transfer.
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
.set_session_id(29)
.set_resource_id(3)));
transfer_thread_.WaitUntilEventIsProcessed();

ASSERT_EQ(payloads.size(), 2u);

// Client should accept the session_id with a START_ACK_CONFIRMATION,
// additionally containing the initial parameters for the read transfer.
chunk = DecodeChunk(payloads.back());
EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
EXPECT_EQ(chunk.session_id(), 29u);
EXPECT_FALSE(chunk.resource_id().has_value());
EXPECT_EQ(chunk.offset(), 0u);
EXPECT_EQ(chunk.window_end_offset(), 64u);
EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);

// Send all the transfer data. Client should accept it and complete the
// transfer.
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
.set_session_id(29)
.set_offset(0)
.set_payload(kData32)
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();

ASSERT_EQ(payloads.size(), 3u);

chunk = DecodeChunk(payloads.back());
EXPECT_EQ(chunk.session_id(), 29u);
EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), OkStatus());

EXPECT_EQ(transfer_status, OkStatus());
EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
0);

// Time out instead of sending a completion ACK. THe transfer should resend
// its completion chunk.
transfer_thread_.SimulateClientTimeout(29);
ASSERT_EQ(payloads.size(), 4u);

// Reset transfer_status to check whether the handler is called again.
transfer_status = Status::Unknown();

chunk = DecodeChunk(payloads.back());
EXPECT_EQ(chunk.session_id(), 29u);
EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), OkStatus());

// Transfer handler should not be called a second time in response to the
// re-sent completion chunk.
EXPECT_EQ(transfer_status, Status::Unknown());

// Send a completion ACK to end the transfer.
context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
.set_session_id(29)));
transfer_thread_.WaitUntilEventIsProcessed();

// No further chunks should be sent following the ACK.
transfer_thread_.SimulateClientTimeout(29);
ASSERT_EQ(payloads.size(), 4u);
}

TEST_F(ReadTransfer,
Version2_TimeoutWaitingForCompletionAckEndsTransferAfterRetries) {
stream::MemoryWriterBuffer<64> writer;
Status transfer_status = Status::Unknown();

ASSERT_EQ(OkStatus(),
client_.Read(
3,
writer,
[&transfer_status](Status status) { transfer_status = status; },
cfg::kDefaultChunkTimeout,
ProtocolVersion::kVersionTwo));

transfer_thread_.WaitUntilEventIsProcessed();

// Initial chunk of the transfer is sent. This chunk should contain all the
// fields from both legacy and version 2 protocols for backwards
// compatibility.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Read>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());

Chunk chunk = DecodeChunk(payloads[0]);
EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
EXPECT_EQ(chunk.session_id(), 3u);
EXPECT_EQ(chunk.resource_id(), 3u);
EXPECT_EQ(chunk.offset(), 0u);
EXPECT_EQ(chunk.window_end_offset(), 64u);
EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);

// The server responds with a START_ACK, continuing the version 2 handshake
// and assigning a session_id to the transfer.
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
.set_session_id(29)
.set_resource_id(3)));
transfer_thread_.WaitUntilEventIsProcessed();

ASSERT_EQ(payloads.size(), 2u);

// Client should accept the session_id with a START_ACK_CONFIRMATION,
// additionally containing the initial parameters for the read transfer.
chunk = DecodeChunk(payloads.back());
EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
EXPECT_EQ(chunk.session_id(), 29u);
EXPECT_FALSE(chunk.resource_id().has_value());
EXPECT_EQ(chunk.offset(), 0u);
EXPECT_EQ(chunk.window_end_offset(), 64u);
EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);

// Send all the transfer data. Client should accept it and complete the
// transfer.
context_.server().SendServerStream<Transfer::Read>(
EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
.set_session_id(29)
.set_offset(0)
.set_payload(kData32)
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();

ASSERT_EQ(payloads.size(), 3u);

chunk = DecodeChunk(payloads.back());
EXPECT_EQ(chunk.session_id(), 29u);
EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), OkStatus());

EXPECT_EQ(transfer_status, OkStatus());
EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
0);

// Time out instead of sending a completion ACK. THe transfer should resend
// its completion chunk at first, then terminate after the maximum number of
// retries.
transfer_thread_.SimulateClientTimeout(29);
ASSERT_EQ(payloads.size(), 4u); // Retry 1.

transfer_thread_.SimulateClientTimeout(29);
ASSERT_EQ(payloads.size(), 5u); // Retry 2.

transfer_thread_.SimulateClientTimeout(29);
ASSERT_EQ(payloads.size(), 6u); // Retry 3.

transfer_thread_.SimulateClientTimeout(29);
ASSERT_EQ(payloads.size(), 6u); // No more retries; transfer has ended.
}

TEST_F(WriteTransfer, Version2_SingleChunk) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
Expand Down Expand Up @@ -2023,7 +2236,14 @@ TEST_F(WriteTransfer, Version2_SingleChunk) {
EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 29, OkStatus())));
transfer_thread_.WaitUntilEventIsProcessed();

EXPECT_EQ(payloads.size(), 4u);
// Client should acknowledge the completion of the transfer.
EXPECT_EQ(payloads.size(), 5u);

chunk = DecodeChunk(payloads[4]);
EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
EXPECT_EQ(chunk.session_id(), 29u);

EXPECT_EQ(transfer_status, OkStatus());
}

Expand Down Expand Up @@ -2180,7 +2400,14 @@ TEST_F(WriteTransfer, Version2_RetryDuringHandshake) {
EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 31, OkStatus())));
transfer_thread_.WaitUntilEventIsProcessed();

EXPECT_EQ(payloads.size(), 5u);
// Client should acknowledge the completion of the transfer.
EXPECT_EQ(payloads.size(), 6u);

chunk = DecodeChunk(payloads[5]);
EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
EXPECT_EQ(chunk.session_id(), 31u);

EXPECT_EQ(transfer_status, OkStatus());
}

Expand Down Expand Up @@ -2273,7 +2500,14 @@ TEST_F(WriteTransfer, Version2_RetryAfterHandshake) {
EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 33, OkStatus())));
transfer_thread_.WaitUntilEventIsProcessed();

EXPECT_EQ(payloads.size(), 5u);
// Client should acknowledge the completion of the transfer.
EXPECT_EQ(payloads.size(), 6u);

chunk = DecodeChunk(payloads[5]);
EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
EXPECT_EQ(chunk.session_id(), 33u);

EXPECT_EQ(transfer_status, OkStatus());
}

Expand Down
Loading

0 comments on commit 6433797

Please sign in to comment.