Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Extra Lookup in PeerConnection::forwardMedia() #668

Merged
merged 4 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions include/rtc/description.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,9 @@ class RTC_CPP_EXPORT Description {
void removeSSRC(uint32_t ssrc);
void replaceSSRC(uint32_t old, uint32_t ssrc, optional<string> name,
optional<string> msid = nullopt, optional<string> trackID = nullopt);
bool hasSSRC(uint32_t ssrc);
std::vector<uint32_t> getSSRCs();
std::optional<std::string> getCNameForSsrc(uint32_t ssrc);
bool hasSSRC(uint32_t ssrc) const;
std::vector<uint32_t> getSSRCs() const;
std::optional<std::string> getCNameForSsrc(uint32_t ssrc) const;

int bitrate() const;
void setBitrate(int bitrate);
Expand Down
6 changes: 3 additions & 3 deletions src/description.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ void Description::Media::replaceSSRC(uint32_t old, uint32_t ssrc, optional<strin
addSSRC(ssrc, std::move(name), std::move(msid), std::move(trackID));
}

bool Description::Media::hasSSRC(uint32_t ssrc) {
bool Description::Media::hasSSRC(uint32_t ssrc) const {
return std::find(mSsrcs.begin(), mSsrcs.end(), ssrc) != mSsrcs.end();
}

Expand Down Expand Up @@ -905,9 +905,9 @@ Description::Media Description::Media::reciprocate() const {
return reciprocated;
}

std::vector<uint32_t> Description::Media::getSSRCs() { return mSsrcs; }
std::vector<uint32_t> Description::Media::getSSRCs() const { return mSsrcs; }

optional<string> Description::Media::getCNameForSsrc(uint32_t ssrc) {
optional<string> Description::Media::getCNameForSsrc(uint32_t ssrc) const {
auto it = mCNameMap.find(ssrc);
if (it != mCNameMap.end()) {
return it->second;
Expand Down
109 changes: 48 additions & 61 deletions src/impl/peerconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,24 +517,23 @@ void PeerConnection::forwardMedia(message_ptr message) {
}

if (!ssrcs.empty()) {
std::shared_lock lock(mTracksMutex); // read-only
for (uint32_t ssrc : ssrcs) {
if (auto mid = getMidFromSsrc(ssrc)) {
std::shared_lock lock(mTracksMutex); // read-only
if (auto it = mTracks.find(*mid); it != mTracks.end())
if (auto track = it->second.lock())
track->incoming(message);
if (auto it = mTracksBySsrc.find(ssrc); it != mTracksBySsrc.end()) {
if (auto track = it->second.lock())
track->incoming(message);
}
}
return;
}
}

uint32_t ssrc = uint32_t(message->stream);
if (auto mid = getMidFromSsrc(ssrc)) {
std::shared_lock lock(mTracksMutex); // read-only
if (auto it = mTracks.find(*mid); it != mTracks.end())
if (auto track = it->second.lock())
track->incoming(message);

std::shared_lock lock(mTracksMutex); // read-only
if (auto it = mTracksBySsrc.find(ssrc); it != mTracksBySsrc.end()) {
if (auto track = it->second.lock())
track->incoming(message);
} else {
/*
* TODO: So the problem is that when stop sending streams, we stop getting report blocks for
Expand All @@ -547,57 +546,6 @@ void PeerConnection::forwardMedia(message_ptr message) {
}
}

optional<std::string> PeerConnection::getMidFromSsrc(uint32_t ssrc) {
if (auto it = mMidFromSsrc.find(ssrc); it != mMidFromSsrc.end())
return it->second;

{
std::lock_guard lock(mRemoteDescriptionMutex);
if (!mRemoteDescription)
return nullopt;

for (unsigned int i = 0; i < mRemoteDescription->mediaCount(); ++i) {
if (auto found =
std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
return std::nullopt;
},
[&](Description::Media *media) -> optional<string> {
return media->hasSSRC(ssrc)
? std::make_optional(media->mid())
: nullopt;
}},
mRemoteDescription->media(i))) {

mMidFromSsrc.emplace(ssrc, *found);
return *found;
}
}
}
{
std::lock_guard lock(mLocalDescriptionMutex);
if (!mLocalDescription)
return nullopt;
for (unsigned int i = 0; i < mLocalDescription->mediaCount(); ++i) {
if (auto found =
std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
return std::nullopt;
},
[&](Description::Media *media) -> optional<string> {
return media->hasSSRC(ssrc)
? std::make_optional(media->mid())
: nullopt;
}},
mLocalDescription->media(i))) {

mMidFromSsrc.emplace(ssrc, *found);
return *found;
}
}
}

return nullopt;
}

void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
if (auto channel = findDataChannel(stream))
channel->triggerBufferedAmount(amount);
Expand Down Expand Up @@ -993,6 +941,8 @@ void PeerConnection::processLocalDescription(Description description) {
if (description.mediaCount() == 0)
throw std::logic_error("Local description has no media line");

updateTrackSsrcCache(description);

{
// Set as local description
std::lock_guard lock(mLocalDescriptionMutex);
Expand Down Expand Up @@ -1037,6 +987,8 @@ void PeerConnection::processLocalCandidate(Candidate candidate) {
}

void PeerConnection::processRemoteDescription(Description description) {
updateTrackSsrcCache(description);

{
// Set as remote description
std::lock_guard lock(mRemoteDescriptionMutex);
Expand Down Expand Up @@ -1218,4 +1170,39 @@ void PeerConnection::resetCallbacks() {
gatheringStateChangeCallback = nullptr;
}

void PeerConnection::updateTrackSsrcCache(const Description &description) {
std::unique_lock lock(mTracksMutex); // for safely writing to mTracksBySsrc

// Setup SSRC -> Track mapping
for (unsigned int i = 0; i < description.mediaCount(); ++i)
std::visit( // ssrc -> track mapping
rtc::overloaded{
[&](Description::Application const *) { return; },
[&](Description::Media const *media) {
const auto ssrcs = media->getSSRCs();

// Note: We don't want to lock (or do any other lookups), if we
// already know there's no SSRCs to loop over.
if (ssrcs.size() <= 0) {
return;
}

std::shared_ptr<Track> track{nullptr};
if (auto it = mTracks.find(media->mid()); it != mTracks.end())
if (auto track_for_mid = it->second.lock())
track = track_for_mid;

if (!track) {
// Unable to find track for MID
return;
}

for (auto ssrc : ssrcs) {
mTracksBySsrc.emplace(ssrc, track);
}
},
},
description.media(i));
}

} // namespace rtc::impl
10 changes: 5 additions & 5 deletions src/impl/peerconnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ struct PeerConnection : std::enable_shared_from_this<PeerConnection> {
void forwardMessage(message_ptr message);
void forwardMedia(message_ptr message);
void forwardBufferedAmount(uint16_t stream, size_t amount);
optional<string> getMidFromSsrc(uint32_t ssrc);

shared_ptr<DataChannel> emplaceDataChannel(string label, DataChannelInit init);
shared_ptr<DataChannel> findDataChannel(uint16_t stream);
Expand Down Expand Up @@ -126,6 +125,8 @@ struct PeerConnection : std::enable_shared_from_this<PeerConnection> {
synchronized_callback<shared_ptr<rtc::Track>> trackCallback;

private:
void updateTrackSsrcCache(const Description &description);

const init_token mInitToken = Init::Instance().token();
const future_certificate_ptr mCertificate;

Expand All @@ -142,14 +143,13 @@ struct PeerConnection : std::enable_shared_from_this<PeerConnection> {
std::vector<weak_ptr<DataChannel>> mUnassignedDataChannels;
std::shared_mutex mDataChannelsMutex;

std::unordered_map<string, weak_ptr<Track>> mTracks; // by mid
std::vector<weak_ptr<Track>> mTrackLines; // by SDP order
std::unordered_map<string, weak_ptr<Track>> mTracks; // by mid
std::unordered_map<uint32_t, weak_ptr<Track>> mTracksBySsrc; // by SSRC
std::vector<weak_ptr<Track>> mTrackLines; // by SDP order
std::shared_mutex mTracksMutex;

Queue<shared_ptr<DataChannel>> mPendingDataChannels;
Queue<shared_ptr<Track>> mPendingTracks;

std::unordered_map<uint32_t, string> mMidFromSsrc; // cache
};

} // namespace rtc::impl
Expand Down
19 changes: 17 additions & 2 deletions test/track.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ void test_track() {

shared_ptr<Track> t2;
string newTrackMid;
Description::Video media;
pc2.onTrack([&t2, &newTrackMid](shared_ptr<Track> t) {
string mid = t->mid();
cout << "Track 2: Received track with mid \"" << mid << "\"" << endl;
Expand All @@ -99,7 +100,13 @@ void test_track() {

// Test opening a track
newTrackMid = "test";
auto t1 = pc1.addTrack(Description::Video(newTrackMid));

media = Description::Video(newTrackMid, Description::Direction::SendOnly);
media.addH264Codec(96);
media.setBitrate(3000);
media.addSSRC(1234, "video-send");

auto t1 = pc1.addTrack(media);

pc1.setLocalDescription();

Expand All @@ -117,7 +124,15 @@ void test_track() {

// Test renegotiation
newTrackMid = "added";
t1 = pc1.addTrack(Description::Video(newTrackMid));

media = Description::Video(newTrackMid, Description::Direction::SendOnly);
media.addH264Codec(96);
media.setBitrate(3000);
media.addSSRC(2468, "video-send");

// NOTE: Overwriting the old shared_ptr for t1 will cause it's respective
// track to be dropped (so it's SSRCs won't be on the description next time)
t1 = pc1.addTrack(media);

pc1.setLocalDescription();

Expand Down