Skip to content

Commit

Permalink
pw_rpc: Clean up comments; call MarkClosed() in MoveFrom()
Browse files Browse the repository at this point in the history
- Have Call::MoveFrom() call MarkClosed() to reduce duplication.
- Set the call ID to 0 when a call is closed.
- Make the public set_on_next() function protected.
- Expand or update a few comments.

Change-Id: I0bd01fa802661b7532fe797d805e5cdf24e7f829
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/126750
Reviewed-by: Alexei Frolov <[email protected]>
Pigweed-Auto-Submit: Wyatt Hepler <[email protected]>
Commit-Queue: Auto-Submit <[email protected]>
  • Loading branch information
255 authored and CQ Bot Account committed Jan 19, 2023
1 parent de048d8 commit 35c0f02
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 20 deletions.
3 changes: 1 addition & 2 deletions pw_rpc/call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ void Call::MoveFrom(Call& other) {
on_next_ = std::move(other.on_next_);

// Mark the other call inactive, unregister it, and register this one.
other.rpc_state_ = kInactive;
other.client_stream_state_ = kClientStreamInactive;
other.MarkClosed();

endpoint().UnregisterCall(other);
endpoint().RegisterUniqueCall(*this);
Expand Down
26 changes: 26 additions & 0 deletions pw_rpc/call_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,32 @@ TEST_F(ServerReaderWriterTest, Move_MovesCallbacks) {
EXPECT_EQ(calls, 2 + PW_RPC_CLIENT_STREAM_END_CALLBACK);
}

TEST_F(ServerReaderWriterTest, Move_ClearsCallAndChannelId) {
rpc_lock().lock();
reader_writer_.set_id(999);
EXPECT_NE(reader_writer_.channel_id_locked(), 0u);
rpc_lock().unlock();

FakeServerReaderWriter destination(std::move(reader_writer_));

LockGuard lock(rpc_lock());
EXPECT_EQ(reader_writer_.id(), 0u);
EXPECT_EQ(reader_writer_.channel_id_locked(), 0u);
}

TEST_F(ServerReaderWriterTest, Close_ClearsCallAndChannelId) {
rpc_lock().lock();
reader_writer_.set_id(999);
EXPECT_NE(reader_writer_.channel_id_locked(), 0u);
rpc_lock().unlock();

EXPECT_EQ(OkStatus(), reader_writer_.Finish());

LockGuard lock(rpc_lock());
EXPECT_EQ(reader_writer_.id(), 0u);
EXPECT_EQ(reader_writer_.channel_id_locked(), 0u);
}

} // namespace
} // namespace internal
} // namespace pw::rpc
37 changes: 21 additions & 16 deletions pw_rpc/public/pw_rpc/internal/call.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,21 @@ class Call : public IntrusiveList<Call>::Item {

void set_id(uint32_t id) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { id_ = id; }

// Public function for accessing the channel ID of this call. Set to 0 when
// the call is closed.
uint32_t channel_id() const PW_LOCKS_EXCLUDED(rpc_lock()) {
LockGuard lock(rpc_lock());
return channel_id_locked();
}

uint32_t channel_id_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
return channel_id_;
}

uint32_t service_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
return service_id_;
}

uint32_t method_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
return method_id_;
}
Expand Down Expand Up @@ -173,13 +178,13 @@ class Call : public IntrusiveList<Call>::Item {
pwpb::PacketType::SERVER_ERROR, {}, error);
}

// Public call that ends the client stream for a client call.
// Public function that ends the client stream for a client call.
Status CloseClientStream() PW_LOCKS_EXCLUDED(rpc_lock()) {
LockGuard lock(rpc_lock());
return CloseClientStreamLocked();
}

// Internal call that closes the client stream.
// Internal function that closes the client stream.
Status CloseClientStreamLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
client_stream_state_ = kClientStreamInactive;
return SendPacket(pwpb::PacketType::CLIENT_STREAM_END, {}, {});
Expand All @@ -198,8 +203,6 @@ class Call : public IntrusiveList<Call>::Item {
// is closed.
void SendInitialClientRequest(ConstByteSpan payload)
PW_UNLOCK_FUNCTION(rpc_lock()) {
// TODO(b/234876851): Ensure the call object is locked before releasing the
// RPC mutex.
if (const Status status = SendPacket(pwpb::PacketType::REQUEST, payload);
!status.ok()) {
HandleError(status);
Expand Down Expand Up @@ -233,9 +236,8 @@ class Call : public IntrusiveList<Call>::Item {
// service unregistered). Does NOT unregister the call! The calls must be
// removed when iterating over the list in the endpoint.
void Abort() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
// Locking here is problematic because CallOnError releases rpc_lock().
//
// b/234876851 must be addressed before the locking here can be cleaned up.
// TODO(b/260922913): Locking here is problematic because CallOnError
// releases rpc_lock().
MarkClosed();

CallOnError(Status::Aborted());
Expand All @@ -256,14 +258,6 @@ class Call : public IntrusiveList<Call>::Item {
return client_stream_state_ == kClientStreamActive;
}

// Keep this public so the Nanopb implementation can set it from a helper
// function.
void set_on_next(Function<void(ConstByteSpan)>&& on_next)
PW_LOCKS_EXCLUDED(rpc_lock()) {
LockGuard lock(rpc_lock());
set_on_next_locked(std::move(on_next));
}

protected:
// Creates an inactive Call.
constexpr Call()
Expand Down Expand Up @@ -294,17 +288,27 @@ class Call : public IntrusiveList<Call>::Item {
return *endpoint_;
}

// Public function that sets the on_next function in the raw API.
void set_on_next(Function<void(ConstByteSpan)>&& on_next)
PW_LOCKS_EXCLUDED(rpc_lock()) {
LockGuard lock(rpc_lock());
set_on_next_locked(std::move(on_next));
}

// Internal function that sets on_next.
void set_on_next_locked(Function<void(ConstByteSpan)>&& on_next)
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
on_next_ = std::move(on_next);
}

// Public function that sets the on_error callback.
void set_on_error(Function<void(Status)>&& on_error)
PW_LOCKS_EXCLUDED(rpc_lock()) {
LockGuard lock(rpc_lock());
set_on_error_locked(std::move(on_error));
}

// Internal function that sets on_error.
void set_on_error_locked(Function<void(Status)>&& on_error)
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
on_error_ = std::move(on_error);
Expand All @@ -320,7 +324,7 @@ class Call : public IntrusiveList<Call>::Item {
pwpb::PacketType::RESPONSE, {}, status);
}

// Cancels an RPC. For client calls only.
// Cancels an RPC. Public function for client calls only.
Status Cancel() PW_LOCKS_EXCLUDED(rpc_lock()) {
LockGuard lock(rpc_lock());
return CloseAndSendFinalPacketLocked(
Expand Down Expand Up @@ -360,6 +364,7 @@ class Call : public IntrusiveList<Call>::Item {

void MarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
channel_id_ = Channel::kUnassignedChannelId;
id_ = 0;
rpc_state_ = kInactive;
client_stream_state_ = kClientStreamInactive;
}
Expand Down
3 changes: 1 addition & 2 deletions pw_rpc/public/pw_rpc/internal/client_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ class ClientCall : public Call {
CallProperties properties) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock())
: Call(client, channel_id, service_id, method_id, properties) {}

// Sends CLIENT_STREAM_END if applicable, releases any held payload buffer,
// and marks the call as closed.
// Sends CLIENT_STREAM_END if applicable and marks the call as closed.
void CloseClientCall() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());

void MoveClientCallFrom(ClientCall& other)
Expand Down
3 changes: 3 additions & 0 deletions pw_rpc/pw_rpc_private/fake_server_reader_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ class FakeServerReaderWriter : private ServerCall {

// Expose a few additional methods for test use.
ServerCall& as_server_call() { return *this; }
using Call::channel_id_locked;
using Call::id;
using Call::set_id;
};

class FakeServerWriter : private FakeServerReaderWriter {
Expand Down

0 comments on commit 35c0f02

Please sign in to comment.