Skip to content

Commit

Permalink
remove subscribe track auto flush
Browse files Browse the repository at this point in the history
  • Loading branch information
hongcha98 committed Dec 16, 2023
1 parent 98c1fd6 commit 0a75c7e
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 141 deletions.
5 changes: 1 addition & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ pub struct Account {
}

fn default_listen() -> String {
format!(
"[::]:{}",
env::var("PORT").unwrap_or(String::from("7777"))
)
format!("[::]:{}", env::var("PORT").unwrap_or(String::from("7777")))
}

fn default_ice_servers() -> Vec<IceServer> {
Expand Down
134 changes: 6 additions & 128 deletions src/forward/forward_internal.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Weak};
use std::time::Duration;

use crate::media;
use crate::AppError;
use anyhow::Result;
use log::{debug, info};
use tokio::sync::mpsc::{channel, unbounded_channel, Receiver, Sender, UnboundedSender};
Expand All @@ -27,15 +28,10 @@ use webrtc::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirecti
use webrtc::rtp_transceiver::RTCRtpTransceiverInit;
use webrtc::sdp::extmap::{SDES_MID_URI, SDES_RTP_STREAM_ID_URI};
use webrtc::sdp::MediaDescription;
use webrtc::stats::{RemoteInboundRTPStats, StatsReportType};
use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
use webrtc::track::track_local::{TrackLocal, TrackLocalWriter};
use webrtc::track::track_remote::TrackRemote;

use crate::forward::track_match::{track_match_codec, track_sort};
use crate::media;
use crate::AppError;

use super::rtcp::RtcpMessage;
use super::track_match;

Expand Down Expand Up @@ -157,7 +153,10 @@ impl PeerForwardInternal {
pub(crate) async fn set_anchor(&self, peer: Arc<RTCPeerConnection>) -> Result<()> {
let mut anchor = self.anchor.write().await;
if anchor.is_some() {
return Err(AppError::ResourceAlreadyExists("A connection has already been established".to_string()).into());
return Err(AppError::ResourceAlreadyExists(
"A connection has already been established".to_string(),
)
.into());
}
info!("[{}] [anchor] set {}", self.id, peer.get_stats_id());
*anchor = Some(peer);
Expand Down Expand Up @@ -188,20 +187,10 @@ impl PeerForwardInternal {
let mut subscribe_peers = self.subscribe_group.write().await;
subscribe_peers.push(PeerWrap(peer.clone()));
drop(subscribe_peers);
if self.publish_is_svc().await {
tokio::spawn(Self::subscribe_track_flush(
Arc::downgrade(&peer),
self.anchor_track_forward_map.clone(),
));
}
info!("[{}] [subscribe] [{}] up", self.id, peer.get_stats_id());
Ok(())
}

pub async fn publish_is_svc(&self) -> bool {
self.publish_track_remotes(RTPCodecType::Video).await.len() > 1
}

async fn publish_track_remotes(&self, code_type: RTPCodecType) -> Vec<Arc<TrackRemote>> {
let anchor_track_forward_map = self.anchor_track_forward_map.read().await;
let mut video_track_remotes = vec![];
Expand All @@ -213,117 +202,6 @@ impl PeerForwardInternal {
video_track_remotes
}

async fn subscribe_track_flush(
peer: Weak<RTCPeerConnection>,
anchor_track_forward_map: Arc<RwLock<HashMap<TrackRemoteWrap, TrackForward>>>,
) {
let mut pre_report: Option<RemoteInboundRTPStats> = None;
loop {
let timeout = tokio::time::sleep(Duration::from_secs(20));
tokio::pin!(timeout);
let _ = timeout.as_mut().await;
if let Some(pc) = peer.upgrade() {
for (_, report) in pc.get_stats().await.reports {
if let StatsReportType::RemoteInboundRTP(remote_inbound) = report {
if RTPCodecType::from(remote_inbound.kind) != RTPCodecType::Video {
continue;
}
let mut packets_received = remote_inbound.packets_received;
let mut packets_lost = remote_inbound.packets_lost;
if let Some(pre_report) = &pre_report {
packets_received -= pre_report.packets_received;
packets_lost -= pre_report.packets_lost;
}
if packets_received == 0 {
continue;
}
let packet_loss_rate = packets_lost as f64 / packets_received as f64;
if (0.05..=0.2).contains(&packet_loss_rate) {
continue;
}
Self::subscribe_track_reallocate(
pc.clone(),
anchor_track_forward_map.clone(),
packet_loss_rate < 0.05,
)
.await;
pre_report = Some(remote_inbound);
}
}
} else {
break;
}
}
}

async fn subscribe_track_reallocate(
pc: Arc<RTCPeerConnection>,
anchor_track_forward_map: Arc<RwLock<HashMap<TrackRemoteWrap, TrackForward>>>,
upgrade: bool,
) {
let peer_wrap = PeerWrap(pc);
let anchor_track_forward_map = anchor_track_forward_map.read().await;
let tracks: Vec<TrackRemoteWrap> = anchor_track_forward_map
.keys()
.cloned()
.filter(|t| t.0.kind() == RTPCodecType::Video)
.collect();
let mut original_track = None;
for track in tracks.iter() {
if let Some(track_forward) = anchor_track_forward_map.get(track) {
let subscribes = track_forward.subscription_group.read().await;
if subscribes.contains_key(&peer_wrap) {
original_track = Some(track.clone());
break;
}
}
}
if let Some(original_track) = original_track {
let tracks: Vec<Arc<TrackRemote>> = tracks.into_iter().map(|t| t.0).collect();
let mut tracks = track_match_codec(&[original_track.0.codec().capability], &tracks);
track_sort(&mut tracks);
let tracks: Vec<TrackRemoteWrap> = tracks.into_iter().map(TrackRemoteWrap).collect();
let mut original_index = None;
for (index, item) in tracks.iter().enumerate() {
if item == &original_track {
original_index = Some(index);
break;
}
}
if original_index.is_none() {
return;
}
let original_index = original_index.unwrap();
let target_index = if upgrade {
original_index + 1
} else {
original_index - 1
};
if !(0..tracks.len()).contains(&target_index) {
return;
}
let target_track = tracks.get(target_index).unwrap();
if let Some(sender) = anchor_track_forward_map
.get(&original_track)
.unwrap()
.subscription_group
.write()
.await
.remove(&peer_wrap)
{
let target_track_forward = anchor_track_forward_map.get(target_track).unwrap();
target_track_forward
.subscription_group
.write()
.await
.insert(peer_wrap, sender);
let _ = target_track_forward
.rtcp_send
.try_send(RtcpMessage::PictureLossIndication);
}
}
}

pub async fn remove_subscribe(&self, peer: Arc<RTCPeerConnection>) -> Result<()> {
let peer_wrap = PeerWrap(peer.clone());
for (_, track_forward) in self.anchor_track_forward_map.write().await.iter() {
Expand Down
12 changes: 9 additions & 3 deletions src/forward/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use webrtc::rtp_transceiver::rtp_codec::RTPCodecType;
use webrtc::sdp::{MediaDescription, SessionDescription};

use crate::forward::forward_internal::{get_peer_key, PeerForwardInternal};
use crate::{media, metrics};
use crate::AppError;
use crate::{media, metrics};

mod forward_internal;
mod rtcp;
Expand All @@ -39,11 +39,17 @@ impl PeerForward {
offer: RTCSessionDescription,
) -> Result<(RTCSessionDescription, String)> {
if self.internal.anchor_is_some().await {
return Err(AppError::ResourceAlreadyExists("A connection has already been established".to_string()).into());
return Err(AppError::ResourceAlreadyExists(
"A connection has already been established".to_string(),
)
.into());
}
let _ = self.anchor_lock.lock().await;
if self.internal.anchor_is_some().await {
return Err(AppError::ResourceAlreadyExists("A connection has already been established".to_string()).into());
return Err(AppError::ResourceAlreadyExists(
"A connection has already been established".to_string(),
)
.into());
}
let peer = self
.internal
Expand Down
16 changes: 11 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ use axum::{
routing::post,
Router,
};
use http::header::ToStrError;
use log::info;
use thiserror::Error;
#[cfg(debug_assertions)]
use tower_http::services::{ServeDir, ServeFile};
use tower_http::validate_request::ValidateRequestHeaderLayer;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use thiserror::Error;
use http::header::ToStrError;

use config::IceServer;
use path::manager::Manager;
Expand Down Expand Up @@ -292,9 +292,15 @@ pub enum AppError {
impl IntoResponse for AppError {
fn into_response(self) -> Response {
match self {
AppError::ResourceNotFound(err) => (StatusCode::NOT_FOUND, err.to_string()).into_response(),
AppError::InternalServerError(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response(),
AppError::ResourceAlreadyExists(err) => (StatusCode::CONFLICT, err.to_string()).into_response(),
AppError::ResourceNotFound(err) => {
(StatusCode::NOT_FOUND, err.to_string()).into_response()
}
AppError::InternalServerError(err) => {
(StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response()
}
AppError::ResourceAlreadyExists(err) => {
(StatusCode::CONFLICT, err.to_string()).into_response()
}
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion src/path/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ impl Manager {
if let Some(forward) = forward {
forward.add_subscribe(offer).await
} else {
Err(AppError::ResourceNotFound(("The requested resource not exist,please check the path and try again.").to_string()).into())
Err(AppError::ResourceNotFound(
("The requested resource not exist,please check the path and try again.")
.to_string(),
)
.into())
}
}

Expand Down

0 comments on commit 0a75c7e

Please sign in to comment.