Skip to content

Commit

Permalink
pw_bluetooth_proxy: L2capCoc segmentation
Browse files Browse the repository at this point in the history
Bug: 360932103, 379337272
Change-Id: Ie799a49e3f3ddf4a5e817d9d4a1ac623901a4533
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/254674
Docs-Not-Needed: Ali Saeed <[email protected]>
Lint: Lint 🤖 <[email protected]>
Commit-Queue: Ali Saeed <[email protected]>
Reviewed-by: Ben Lawson <[email protected]>
Reviewed-by: David Rees <[email protected]>
  • Loading branch information
acsaeed authored and CQ Bot Account committed Dec 23, 2024
1 parent 409b1f4 commit c0162a3
Show file tree
Hide file tree
Showing 15 changed files with 526 additions and 264 deletions.
55 changes: 49 additions & 6 deletions pw_bluetooth_proxy/acl_data_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,36 @@

namespace pw::bluetooth::proxy {

AclDataChannel::SendCredit::SendCredit(SendCredit&& other) {
*this = std::move(other);
}

AclDataChannel::SendCredit& AclDataChannel::SendCredit::operator=(
SendCredit&& other) {
if (this != &other) {
transport_ = other.transport_;
relinquish_fn_ = std::move(other.relinquish_fn_);
other.relinquish_fn_ = nullptr;
}
return *this;
}

AclDataChannel::SendCredit::~SendCredit() {
if (relinquish_fn_) {
relinquish_fn_(transport_);
}
}

AclDataChannel::SendCredit::SendCredit(
AclTransportType transport,
Function<void(AclTransportType transport)>&& relinquish_fn)
: transport_(transport), relinquish_fn_(std::move(relinquish_fn)) {}

void AclDataChannel::SendCredit::MarkUsed() {
PW_CHECK(relinquish_fn_);
relinquish_fn_ = nullptr;
}

void AclDataChannel::Reset() {
std::lock_guard lock(mutex_);
le_credits_.Reset();
Expand Down Expand Up @@ -376,7 +406,21 @@ uint16_t AclDataChannel::GetNumFreeAclPackets(
return LookupCredits(transport).Remaining();
}

pw::Status AclDataChannel::SendAcl(H4PacketWithH4&& h4_packet) {
std::optional<AclDataChannel::SendCredit> AclDataChannel::ReserveSendCredit(
AclTransportType transport) {
std::lock_guard lock(mutex_);
if (const auto status = LookupCredits(transport).MarkPending(1);
!status.ok()) {
return std::nullopt;
}
return SendCredit(transport, [this](AclTransportType t) {
std::lock_guard fn_lock(mutex_);
LookupCredits(t).MarkCompleted(1);
});
}

pw::Status AclDataChannel::SendAcl(H4PacketWithH4&& h4_packet,
SendCredit&& credit) {
std::lock_guard lock(mutex_);
Result<emboss::AclDataFrameHeaderView> acl_view =
MakeEmbossView<emboss::AclDataFrameHeaderView>(h4_packet.GetHciSpan());
Expand All @@ -392,12 +436,11 @@ pw::Status AclDataChannel::SendAcl(H4PacketWithH4&& h4_packet) {
return pw::Status::NotFound();
}

if (const auto status =
LookupCredits(connection_ptr->transport()).MarkPending(1);
!status.ok()) {
PW_LOG_WARN("No ACL send credits available. So will not send.");
return pw::Status::Unavailable();
if (connection_ptr->transport() != credit.transport_) {
PW_LOG_WARN("Provided credit for wrong transport. So will not send.");
return pw::Status::InvalidArgument();
}
credit.MarkUsed();

connection_ptr->set_num_pending_packets(
connection_ptr->num_pending_packets() + 1);
Expand Down
3 changes: 2 additions & 1 deletion pw_bluetooth_proxy/basic_l2cap_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ pw::Status BasicL2capChannel::Write(pw::span<const uint8_t> payload) {

// TODO: https://pwbug.dev/360929142 - Reject payloads exceeding MTU.

pw::Result<H4PacketWithH4> h4_result = PopulateTxL2capPacket(payload.size());
pw::Result<H4PacketWithH4> h4_result =
PopulateTxL2capPacketDuringWrite(payload.size());
if (!h4_result.ok()) {
// This can fail as a result of the L2CAP PDU not fitting in an H4 buffer
// or if all buffers are occupied.
Expand Down
3 changes: 2 additions & 1 deletion pw_bluetooth_proxy/gatt_notify_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ pw::Status GattNotifyChannel::Write(pw::span<const uint8_t> attribute_value) {

size_t att_size =
emboss::AttHandleValueNtf::MinSizeInBytes() + attribute_value.size();
pw::Result<H4PacketWithH4> h4_result = PopulateTxL2capPacket(att_size);
pw::Result<H4PacketWithH4> h4_result =
PopulateTxL2capPacketDuringWrite(att_size);
if (!h4_result.ok()) {
// This can fail as a result of the L2CAP PDU not fitting in an H4 buffer
// or if all buffers are occupied.
Expand Down
66 changes: 56 additions & 10 deletions pw_bluetooth_proxy/l2cap_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,8 @@ std::optional<H4PacketWithH4> L2capChannel::DequeuePacket() {
bool should_notify = false;
{
std::lock_guard lock(send_queue_mutex_);
if (!send_queue_.empty()) {
packet.emplace(std::move(send_queue_.front()));
send_queue_.pop();
packet = GenerateNextTxPacket();
if (packet) {
should_notify = notify_on_dequeue_;
notify_on_dequeue_ = false;
}
Expand All @@ -130,6 +129,38 @@ std::optional<H4PacketWithH4> L2capChannel::DequeuePacket() {
return packet;
}

StatusWithMultiBuf L2capChannel::QueuePayload(multibuf::MultiBuf&& buf) {
PW_CHECK(state() == State::kRunning);
PW_CHECK(buf.IsContiguous());

{
std::lock_guard lock(send_queue_mutex_);
if (payload_queue_.full()) {
notify_on_dequeue_ = true;
return {Status::Unavailable(), std::move(buf)};
}
payload_queue_.push(std::move(buf));
}

ReportPacketsMayBeReadyToSend();
return {OkStatus(), std::nullopt};
}

void L2capChannel::PopFrontPayload() {
PW_CHECK(!payload_queue_.empty());
payload_queue_.pop();
}

ConstByteSpan L2capChannel::GetFrontPayloadSpan() const {
PW_CHECK(!payload_queue_.empty());
const multibuf::MultiBuf& buf = payload_queue_.front();
std::optional<ConstByteSpan> span = buf.ContiguousSpan();
PW_CHECK(span);
return *span;
}

bool L2capChannel::PayloadQueueEmpty() const { return payload_queue_.empty(); }

bool L2capChannel::OnPduReceivedFromController(pw::span<uint8_t> l2cap_pdu) {
if (state() != State::kRunning) {
SendEvent(L2capChannelEvent::kRxWhileStopped);
Expand Down Expand Up @@ -186,11 +217,33 @@ bool L2capChannel::AreValidParameters(uint16_t connection_handle,
return true;
}

std::optional<H4PacketWithH4> L2capChannel::GenerateNextTxPacket() {
if (send_queue_.empty()) {
return std::nullopt;
}
H4PacketWithH4 packet = std::move(send_queue_.front());
send_queue_.pop();
return packet;
}

pw::Result<H4PacketWithH4> L2capChannel::PopulateTxL2capPacket(
uint16_t data_length) {
return PopulateL2capPacket(data_length);
}

pw::Result<H4PacketWithH4> L2capChannel::PopulateTxL2capPacketDuringWrite(
uint16_t data_length) {
pw::Result<H4PacketWithH4> packet_result = PopulateL2capPacket(data_length);
if (packet_result.status().IsUnavailable()) {
std::lock_guard lock(send_queue_mutex_);
// If there were no buffers, they are all in the queue currently. This can
// happen if queue size == buffer count. Mark that a writer is getting an
// Unavailable status, and should be notified when queue space opens up.
notify_on_dequeue_ = true;
}
return packet_result;
}

pw::Result<H4PacketWithH4> L2capChannel::PopulateL2capPacket(
uint16_t data_length) {
const size_t l2cap_packet_size =
Expand All @@ -202,13 +255,6 @@ pw::Result<H4PacketWithH4> L2capChannel::PopulateL2capPacket(
pw::Result<H4PacketWithH4> h4_packet_res =
l2cap_channel_manager_.GetAclH4Packet(h4_packet_size);
if (!h4_packet_res.ok()) {
// If there were no buffers, they are all in the queue currently. This can
// happen if queue size == buffer count. Mark that a writer is getting an
// Unavailable status, and should be notified when queue space opens up.
if (h4_packet_res.status().IsUnavailable()) {
std::lock_guard lock(send_queue_mutex_);
notify_on_dequeue_ = true;
}
return h4_packet_res.status();
}
H4PacketWithH4 h4_packet = std::move(h4_packet_res.value());
Expand Down
84 changes: 53 additions & 31 deletions pw_bluetooth_proxy/l2cap_channel_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ void L2capChannelManager::Reset() { h4_storage_.Reset(); }

void L2capChannelManager::RegisterChannel(L2capChannel& channel) {
std::lock_guard lock(channels_mutex_);
channels_.push_front(channel);
// Insert new channels before `lrd_channel_`.
IntrusiveForwardList<L2capChannel>::iterator before_it =
channels_.before_begin();
for (auto it = channels_.begin(); it != lrd_channel_; ++it) {
++before_it;
}
channels_.insert_after(before_it, channel);
if (lrd_channel_ == channels_.end()) {
lrd_channel_ = channels_.begin();
}
Expand Down Expand Up @@ -65,11 +71,11 @@ pw::Result<H4PacketWithH4> L2capChannelManager::GetAclH4Packet(uint16_t size) {
return pw::Status::Unavailable();
}

H4PacketWithH4 h4_packet(
span(h4_buff->data(), size),
/*release_fn=*/[h4_storage = &h4_storage_](const uint8_t* buffer) {
h4_storage->ReleaseH4Buff(buffer);
});
H4PacketWithH4 h4_packet(span(h4_buff->data(), size),
/*release_fn=*/[this](const uint8_t* buffer) {
this->h4_storage_.ReleaseH4Buff(buffer);
DrainChannelQueues();
});
h4_packet.SetH4Type(emboss::H4PacketType::ACL_DATA);

return h4_packet;
Expand All @@ -80,42 +86,58 @@ uint16_t L2capChannelManager::GetH4BuffSize() const {
}

void L2capChannelManager::DrainChannelQueues() {
std::lock_guard lock(channels_mutex_);

if (channels_.empty()) {
return;
}

DrainChannelQueues(AclTransportType::kBrEdr);
DrainChannelQueues(AclTransportType::kLe);
}

void L2capChannelManager::DrainChannelQueues(AclTransportType transport) {
IntrusiveForwardList<L2capChannel>::iterator round_robin_start = lrd_channel_;
// Iterate around `channels_` in round robin fashion. For each channel, send
// as many queued packets as are available. Proceed until we run out of ACL
// send credits or finish visiting every channel.
// TODO: https://pwbug.dev/379337260 - Only drain one L2CAP PDU per channel
// before moving on. (This may require sending multiple ACL fragments.)
while (acl_data_channel_.GetNumFreeAclPackets(transport) > 0) {
if (lrd_channel_->transport() != transport) {
Advance(lrd_channel_);
if (lrd_channel_ == round_robin_start) {
return;
}
continue;
// Establish an upper bound on the number of channels to visit on this round
// robin. We cannot simply store a pointer to the starting channel, because
// any channel can be released while unlocked to send a packet.
size_t visits_remaining = 0;
{
std::lock_guard lock(channels_mutex_);
containers::ForEach(
channels_, [&visits_remaining](L2capChannel&) { ++visits_remaining; });
if (visits_remaining == 0) {
return;
}
}

std::optional<H4PacketWithH4> packet = lrd_channel_->DequeuePacket();
if (!packet) {
Advance(lrd_channel_);
if (lrd_channel_ == round_robin_start) {
for (std::optional<AclDataChannel::SendCredit> credit =
acl_data_channel_.ReserveSendCredit(transport);
credit;
credit = acl_data_channel_.ReserveSendCredit(transport)) {
// In each iteration of this loop we have reserved one send credit.

std::optional<H4PacketWithH4> packet;
{
std::lock_guard lock(channels_mutex_);
// It is possible for channels to have been released before current lock
// was acquired.
if (lrd_channel_ == channels_.end()) {
return;
}
continue;
do {
if (lrd_channel_->transport() == transport) {
packet = lrd_channel_->DequeuePacket();
}
Advance(lrd_channel_);
--visits_remaining;
} while (!packet && visits_remaining > 0);
}

if (packet) {
// Send while unlocked. This can trigger a recursive round robin once
// `packet` is released, but this is fine because `lrd_channel_` has
// been adjusted so the recursive call will start where this one left off.
PW_CHECK_OK(
acl_data_channel_.SendAcl(std::move(*packet), std::move(*credit)));
}

PW_CHECK_OK(acl_data_channel_.SendAcl(std::move(*packet)));
if (visits_remaining == 0) {
break;
}
}
}

Expand Down
Loading

0 comments on commit c0162a3

Please sign in to comment.