diff --git a/net/dcsctp/socket/stream_reset_handler.cc b/net/dcsctp/socket/stream_reset_handler.cc index c81b34b626..f9201eadf0 100644 --- a/net/dcsctp/socket/stream_reset_handler.cc +++ b/net/dcsctp/socket/stream_reset_handler.cc @@ -278,8 +278,8 @@ absl::optional StreamResetHandler::MakeStreamResetRequest() { return absl::nullopt; } - current_request_.emplace(TSN(*retransmission_queue_->next_tsn() - 1), - retransmission_queue_->GetStreamsReadyToBeReset()); + current_request_.emplace(retransmission_queue_->last_assigned_tsn(), + retransmission_queue_->BeginResetStreams()); reconfig_timer_->set_duration(ctx_->current_rto()); reconfig_timer_->Start(); return MakeReconfigChunk(); diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn index 43fd41639e..5219317d97 100644 --- a/net/dcsctp/tx/BUILD.gn +++ b/net/dcsctp/tx/BUILD.gn @@ -104,6 +104,7 @@ rtc_library("outstanding_data") { "../../../api:array_view", "../../../rtc_base:checks", "../../../rtc_base:logging", + "../../../rtc_base/containers:flat_set", "../common:math", "../common:sequence_numbers", "../common:str_join", diff --git a/net/dcsctp/tx/outstanding_data.cc b/net/dcsctp/tx/outstanding_data.cc index 4f1e863056..c65c195432 100644 --- a/net/dcsctp/tx/outstanding_data.cc +++ b/net/dcsctp/tx/outstanding_data.cc @@ -159,6 +159,9 @@ void OutstandingData::RemoveAcked(UnwrappedTSN cumulative_tsn_ack, outstanding_data_.erase(outstanding_data_.begin(), first_unacked); last_cumulative_tsn_ack_ = cumulative_tsn_ack; + stream_reset_breakpoint_tsns_.erase(stream_reset_breakpoint_tsns_.begin(), + stream_reset_breakpoint_tsns_.upper_bound( + cumulative_tsn_ack.next_value())); } void OutstandingData::AckGapBlocks( @@ -487,7 +490,8 @@ ForwardTsnChunk OutstandingData::CreateForwardTsn() const { UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; for (const auto& [tsn, item] : outstanding_data_) { - if ((tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { + if (stream_reset_breakpoint_tsns_.contains(tsn) || + (tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { break; } new_cumulative_ack = tsn; @@ -510,7 +514,8 @@ IForwardTsnChunk OutstandingData::CreateIForwardTsn() const { UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; for (const auto& [tsn, item] : outstanding_data_) { - if ((tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { + if (stream_reset_breakpoint_tsns_.contains(tsn) || + (tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { break; } new_cumulative_ack = tsn; @@ -540,4 +545,8 @@ void OutstandingData::ResetSequenceNumbers(UnwrappedTSN next_tsn, next_tsn_ = next_tsn; last_cumulative_tsn_ack_ = last_cumulative_tsn; } + +void OutstandingData::BeginResetStreams() { + stream_reset_breakpoint_tsns_.insert(next_tsn_); +} } // namespace dcsctp diff --git a/net/dcsctp/tx/outstanding_data.h b/net/dcsctp/tx/outstanding_data.h index 6b4b7121fb..0bca1c6fe4 100644 --- a/net/dcsctp/tx/outstanding_data.h +++ b/net/dcsctp/tx/outstanding_data.h @@ -22,6 +22,7 @@ #include "net/dcsctp/packet/chunk/sack_chunk.h" #include "net/dcsctp/packet/data.h" #include "net/dcsctp/public/types.h" +#include "rtc_base/containers/flat_set.h" namespace dcsctp { @@ -159,6 +160,10 @@ class OutstandingData { void ResetSequenceNumbers(UnwrappedTSN next_tsn, UnwrappedTSN last_cumulative_tsn); + // Called when an outgoing stream reset is sent, marking the last assigned TSN + // as a breakpoint that a FORWARD-TSN shouldn't cross. + void BeginResetStreams(); + private: // A fragmented message's DATA chunk while in the retransmission queue, and // its associated metadata. @@ -345,6 +350,10 @@ class OutstandingData { std::set to_be_fast_retransmitted_; // Data chunks that are to be retransmitted. std::set to_be_retransmitted_; + // Wben a stream reset has begun, the "next TSN to assign" is added to this + // set, and removed when the cum-ack TSN reaches it. This is used to limit a + // FORWARD-TSN to reset streams past a "stream reset last assigned TSN". + webrtc::flat_set stream_reset_breakpoint_tsns_; }; } // namespace dcsctp #endif // NET_DCSCTP_TX_OUTSTANDING_DATA_H_ diff --git a/net/dcsctp/tx/outstanding_data_test.cc b/net/dcsctp/tx/outstanding_data_test.cc index cdca40cfef..3bb82fd3e6 100644 --- a/net/dcsctp/tx/outstanding_data_test.cc +++ b/net/dcsctp/tx/outstanding_data_test.cc @@ -27,11 +27,14 @@ namespace { using ::testing::MockFunction; using State = ::dcsctp::OutstandingData::State; using ::testing::_; +using ::testing::AllOf; using ::testing::ElementsAre; using ::testing::IsEmpty; using ::testing::Pair; +using ::testing::Property; using ::testing::Return; using ::testing::StrictMock; +using ::testing::UnorderedElementsAre; constexpr TimeMs kNow(42); @@ -587,5 +590,97 @@ TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedAfterT3rtxExpired) { EXPECT_FALSE(ack2.has_packet_loss); EXPECT_THAT(ack2.abandoned_lifecycle_ids, ElementsAre(LifecycleId(42))); } + +TEST_F(OutstandingDataTest, GeneratesForwardTsnUntilNextStreamResetTsn) { + // This test generates: + // * Stream 1: TSN 10, 11, 12 + // * Stream 2: TSN 13, 14 + // * Stream 3: TSN 15, 16 + // + // Then it expires chunk 12-15, and ensures that the generated FORWARD-TSN + // only includes up till TSN 12 until the cum ack TSN has reached 12, and then + // 13 and 14 are included, and then after the cum ack TSN has reached 14, then + // 15 is included. + // + // What it shouldn't do, is to generate a FORWARD-TSN directly at the start + // with new TSN=15, and setting [(sid=1, ssn=44), (sid=2, ssn=46), + // (sid=3, ssn=47)], because that will confuse the receiver at TSN=17, + // receiving SID=1, SSN=0 (it's reset!), expecting SSN to be 45. + constexpr DataGeneratorOptions kStream1 = {.stream_id = StreamID(1)}; + constexpr DataGeneratorOptions kStream2 = {.stream_id = StreamID(2)}; + constexpr DataGeneratorOptions kStream3 = {.stream_id = StreamID(3)}; + EXPECT_CALL(on_discard_, Call).WillRepeatedly(Return(false)); + + // TSN 10-12 + buf_.Insert(gen_.Ordered({1}, "BE", kStream1), kNow); + buf_.Insert(gen_.Ordered({1}, "BE", kStream1), kNow); + buf_.Insert(gen_.Ordered({1}, "BE", kStream1), kNow, MaxRetransmits(0)); + + buf_.BeginResetStreams(); + + // TSN 13, 14 + buf_.Insert(gen_.Ordered({1}, "BE", kStream2), kNow, MaxRetransmits(0)); + buf_.Insert(gen_.Ordered({1}, "BE", kStream2), kNow, MaxRetransmits(0)); + + buf_.BeginResetStreams(); + + // TSN 15, 16 + buf_.Insert(gen_.Ordered({1}, "BE", kStream3), kNow, MaxRetransmits(0)); + buf_.Insert(gen_.Ordered({1}, "BE", kStream3), kNow); + + EXPECT_FALSE(buf_.ShouldSendForwardTsn()); + + buf_.HandleSack(unwrapper_.Unwrap(TSN(11)), {}, false); + buf_.NackAll(); + EXPECT_THAT(buf_.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(11), State::kAcked), // + Pair(TSN(12), State::kAbandoned), // + Pair(TSN(13), State::kAbandoned), // + Pair(TSN(14), State::kAbandoned), // + Pair(TSN(15), State::kAbandoned), // + Pair(TSN(16), State::kToBeRetransmitted))); + + EXPECT_TRUE(buf_.ShouldSendForwardTsn()); + EXPECT_THAT( + buf_.CreateForwardTsn(), + AllOf(Property(&ForwardTsnChunk::new_cumulative_tsn, TSN(12)), + Property(&ForwardTsnChunk::skipped_streams, + UnorderedElementsAre(ForwardTsnChunk::SkippedStream( + StreamID(1), SSN(44)))))); + + // Ack 12, allowing a FORWARD-TSN that spans to TSN=14 to be created. + buf_.HandleSack(unwrapper_.Unwrap(TSN(12)), {}, false); + EXPECT_TRUE(buf_.ShouldSendForwardTsn()); + EXPECT_THAT( + buf_.CreateForwardTsn(), + AllOf(Property(&ForwardTsnChunk::new_cumulative_tsn, TSN(14)), + Property(&ForwardTsnChunk::skipped_streams, + UnorderedElementsAre(ForwardTsnChunk::SkippedStream( + StreamID(2), SSN(46)))))); + + // Ack 13, allowing a FORWARD-TSN that spans to TSN=14 to be created. + buf_.HandleSack(unwrapper_.Unwrap(TSN(13)), {}, false); + EXPECT_TRUE(buf_.ShouldSendForwardTsn()); + EXPECT_THAT( + buf_.CreateForwardTsn(), + AllOf(Property(&ForwardTsnChunk::new_cumulative_tsn, TSN(14)), + Property(&ForwardTsnChunk::skipped_streams, + UnorderedElementsAre(ForwardTsnChunk::SkippedStream( + StreamID(2), SSN(46)))))); + + // Ack 14, allowing a FORWARD-TSN that spans to TSN=15 to be created. + buf_.HandleSack(unwrapper_.Unwrap(TSN(14)), {}, false); + EXPECT_TRUE(buf_.ShouldSendForwardTsn()); + EXPECT_THAT( + buf_.CreateForwardTsn(), + AllOf(Property(&ForwardTsnChunk::new_cumulative_tsn, TSN(15)), + Property(&ForwardTsnChunk::skipped_streams, + UnorderedElementsAre(ForwardTsnChunk::SkippedStream( + StreamID(3), SSN(47)))))); + + buf_.HandleSack(unwrapper_.Unwrap(TSN(15)), {}, false); + EXPECT_FALSE(buf_.ShouldSendForwardTsn()); +} + } // namespace } // namespace dcsctp diff --git a/net/dcsctp/tx/retransmission_queue.cc b/net/dcsctp/tx/retransmission_queue.cc index 93084cc27b..4cc0b64074 100644 --- a/net/dcsctp/tx/retransmission_queue.cc +++ b/net/dcsctp/tx/retransmission_queue.cc @@ -573,6 +573,10 @@ void RetransmissionQueue::PrepareResetStream(StreamID stream_id) { bool RetransmissionQueue::HasStreamsReadyToBeReset() const { return send_queue_.HasStreamsReadyToBeReset(); } +std::vector RetransmissionQueue::BeginResetStreams() { + outstanding_data_.BeginResetStreams(); + return send_queue_.GetStreamsReadyToBeReset(); +} void RetransmissionQueue::CommitResetStreams() { send_queue_.CommitResetStreams(); } diff --git a/net/dcsctp/tx/retransmission_queue.h b/net/dcsctp/tx/retransmission_queue.h index 51e9c5b318..b44db2a9a0 100644 --- a/net/dcsctp/tx/retransmission_queue.h +++ b/net/dcsctp/tx/retransmission_queue.h @@ -103,6 +103,10 @@ class RetransmissionQueue { // Returns the next TSN that will be allocated for sent DATA chunks. TSN next_tsn() const { return outstanding_data_.next_tsn().Wrap(); } + TSN last_assigned_tsn() const { + return UnwrappedTSN::AddTo(outstanding_data_.next_tsn(), -1).Wrap(); + } + // Returns the size of the congestion window, in bytes. This is the number of // bytes that may be in-flight. size_t cwnd() const { return cwnd_; } @@ -148,9 +152,7 @@ class RetransmissionQueue { // to stream resetting. void PrepareResetStream(StreamID stream_id); bool HasStreamsReadyToBeReset() const; - std::vector GetStreamsReadyToBeReset() const { - return send_queue_.GetStreamsReadyToBeReset(); - } + std::vector BeginResetStreams(); void CommitResetStreams(); void RollbackResetStreams();