From 35c0f029b35ab92e13d79bbca87e2f26acc2c4ed Mon Sep 17 00:00:00 2001 From: Wyatt Hepler Date: Thu, 19 Jan 2023 19:50:31 +0000 Subject: [PATCH] pw_rpc: Clean up comments; call MarkClosed() in MoveFrom() - Have Call::MoveFrom() call MarkClosed() to reduce duplication. - Set the call ID to 0 when a call is closed. - Make the public set_on_next() function protected. - Expand or update a few comments. Change-Id: I0bd01fa802661b7532fe797d805e5cdf24e7f829 Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/126750 Reviewed-by: Alexei Frolov Pigweed-Auto-Submit: Wyatt Hepler Commit-Queue: Auto-Submit --- pw_rpc/call.cc | 3 +- pw_rpc/call_test.cc | 26 +++++++++++++ pw_rpc/public/pw_rpc/internal/call.h | 37 +++++++++++-------- pw_rpc/public/pw_rpc/internal/client_call.h | 3 +- .../fake_server_reader_writer.h | 3 ++ 5 files changed, 52 insertions(+), 20 deletions(-) diff --git a/pw_rpc/call.cc b/pw_rpc/call.cc index 1a3f160e03..ed16c1f4c0 100644 --- a/pw_rpc/call.cc +++ b/pw_rpc/call.cc @@ -102,8 +102,7 @@ void Call::MoveFrom(Call& other) { on_next_ = std::move(other.on_next_); // Mark the other call inactive, unregister it, and register this one. - other.rpc_state_ = kInactive; - other.client_stream_state_ = kClientStreamInactive; + other.MarkClosed(); endpoint().UnregisterCall(other); endpoint().RegisterUniqueCall(*this); diff --git a/pw_rpc/call_test.cc b/pw_rpc/call_test.cc index 49101e8efa..8971658537 100644 --- a/pw_rpc/call_test.cc +++ b/pw_rpc/call_test.cc @@ -293,6 +293,32 @@ TEST_F(ServerReaderWriterTest, Move_MovesCallbacks) { EXPECT_EQ(calls, 2 + PW_RPC_CLIENT_STREAM_END_CALLBACK); } +TEST_F(ServerReaderWriterTest, Move_ClearsCallAndChannelId) { + rpc_lock().lock(); + reader_writer_.set_id(999); + EXPECT_NE(reader_writer_.channel_id_locked(), 0u); + rpc_lock().unlock(); + + FakeServerReaderWriter destination(std::move(reader_writer_)); + + LockGuard lock(rpc_lock()); + EXPECT_EQ(reader_writer_.id(), 0u); + EXPECT_EQ(reader_writer_.channel_id_locked(), 0u); +} + +TEST_F(ServerReaderWriterTest, Close_ClearsCallAndChannelId) { + rpc_lock().lock(); + reader_writer_.set_id(999); + EXPECT_NE(reader_writer_.channel_id_locked(), 0u); + rpc_lock().unlock(); + + EXPECT_EQ(OkStatus(), reader_writer_.Finish()); + + LockGuard lock(rpc_lock()); + EXPECT_EQ(reader_writer_.id(), 0u); + EXPECT_EQ(reader_writer_.channel_id_locked(), 0u); +} + } // namespace } // namespace internal } // namespace pw::rpc diff --git a/pw_rpc/public/pw_rpc/internal/call.h b/pw_rpc/public/pw_rpc/internal/call.h index 7e6a677eb6..2a3b79aae1 100644 --- a/pw_rpc/public/pw_rpc/internal/call.h +++ b/pw_rpc/public/pw_rpc/internal/call.h @@ -129,16 +129,21 @@ class Call : public IntrusiveList::Item { void set_id(uint32_t id) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { id_ = id; } + // Public function for accessing the channel ID of this call. Set to 0 when + // the call is closed. uint32_t channel_id() const PW_LOCKS_EXCLUDED(rpc_lock()) { LockGuard lock(rpc_lock()); return channel_id_locked(); } + uint32_t channel_id_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { return channel_id_; } + uint32_t service_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { return service_id_; } + uint32_t method_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { return method_id_; } @@ -173,13 +178,13 @@ class Call : public IntrusiveList::Item { pwpb::PacketType::SERVER_ERROR, {}, error); } - // Public call that ends the client stream for a client call. + // Public function that ends the client stream for a client call. Status CloseClientStream() PW_LOCKS_EXCLUDED(rpc_lock()) { LockGuard lock(rpc_lock()); return CloseClientStreamLocked(); } - // Internal call that closes the client stream. + // Internal function that closes the client stream. Status CloseClientStreamLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { client_stream_state_ = kClientStreamInactive; return SendPacket(pwpb::PacketType::CLIENT_STREAM_END, {}, {}); @@ -198,8 +203,6 @@ class Call : public IntrusiveList::Item { // is closed. void SendInitialClientRequest(ConstByteSpan payload) PW_UNLOCK_FUNCTION(rpc_lock()) { - // TODO(b/234876851): Ensure the call object is locked before releasing the - // RPC mutex. if (const Status status = SendPacket(pwpb::PacketType::REQUEST, payload); !status.ok()) { HandleError(status); @@ -233,9 +236,8 @@ class Call : public IntrusiveList::Item { // service unregistered). Does NOT unregister the call! The calls must be // removed when iterating over the list in the endpoint. void Abort() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { - // Locking here is problematic because CallOnError releases rpc_lock(). - // - // b/234876851 must be addressed before the locking here can be cleaned up. + // TODO(b/260922913): Locking here is problematic because CallOnError + // releases rpc_lock(). MarkClosed(); CallOnError(Status::Aborted()); @@ -256,14 +258,6 @@ class Call : public IntrusiveList::Item { return client_stream_state_ == kClientStreamActive; } - // Keep this public so the Nanopb implementation can set it from a helper - // function. - void set_on_next(Function&& on_next) - PW_LOCKS_EXCLUDED(rpc_lock()) { - LockGuard lock(rpc_lock()); - set_on_next_locked(std::move(on_next)); - } - protected: // Creates an inactive Call. constexpr Call() @@ -294,17 +288,27 @@ class Call : public IntrusiveList::Item { return *endpoint_; } + // Public function that sets the on_next function in the raw API. + void set_on_next(Function&& on_next) + PW_LOCKS_EXCLUDED(rpc_lock()) { + LockGuard lock(rpc_lock()); + set_on_next_locked(std::move(on_next)); + } + + // Internal function that sets on_next. void set_on_next_locked(Function&& on_next) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { on_next_ = std::move(on_next); } + // Public function that sets the on_error callback. void set_on_error(Function&& on_error) PW_LOCKS_EXCLUDED(rpc_lock()) { LockGuard lock(rpc_lock()); set_on_error_locked(std::move(on_error)); } + // Internal function that sets on_error. void set_on_error_locked(Function&& on_error) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { on_error_ = std::move(on_error); @@ -320,7 +324,7 @@ class Call : public IntrusiveList::Item { pwpb::PacketType::RESPONSE, {}, status); } - // Cancels an RPC. For client calls only. + // Cancels an RPC. Public function for client calls only. Status Cancel() PW_LOCKS_EXCLUDED(rpc_lock()) { LockGuard lock(rpc_lock()); return CloseAndSendFinalPacketLocked( @@ -360,6 +364,7 @@ class Call : public IntrusiveList::Item { void MarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { channel_id_ = Channel::kUnassignedChannelId; + id_ = 0; rpc_state_ = kInactive; client_stream_state_ = kClientStreamInactive; } diff --git a/pw_rpc/public/pw_rpc/internal/client_call.h b/pw_rpc/public/pw_rpc/internal/client_call.h index db1cd76441..48af2d20d2 100644 --- a/pw_rpc/public/pw_rpc/internal/client_call.h +++ b/pw_rpc/public/pw_rpc/internal/client_call.h @@ -57,8 +57,7 @@ class ClientCall : public Call { CallProperties properties) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) : Call(client, channel_id, service_id, method_id, properties) {} - // Sends CLIENT_STREAM_END if applicable, releases any held payload buffer, - // and marks the call as closed. + // Sends CLIENT_STREAM_END if applicable and marks the call as closed. void CloseClientCall() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); void MoveClientCallFrom(ClientCall& other) diff --git a/pw_rpc/pw_rpc_private/fake_server_reader_writer.h b/pw_rpc/pw_rpc_private/fake_server_reader_writer.h index b5222d452e..89c5d91c38 100644 --- a/pw_rpc/pw_rpc_private/fake_server_reader_writer.h +++ b/pw_rpc/pw_rpc_private/fake_server_reader_writer.h @@ -67,6 +67,9 @@ class FakeServerReaderWriter : private ServerCall { // Expose a few additional methods for test use. ServerCall& as_server_call() { return *this; } + using Call::channel_id_locked; + using Call::id; + using Call::set_id; }; class FakeServerWriter : private FakeServerReaderWriter {