From 0a75c7ec3d1058b59218ae04eef63db63d3ccb03 Mon Sep 17 00:00:00 2001 From: hongcha Date: Sat, 16 Dec 2023 16:48:46 +0800 Subject: [PATCH] remove subscribe track auto flush --- src/config.rs | 5 +- src/forward/forward_internal.rs | 134 ++------------------------------ src/forward/mod.rs | 12 ++- src/main.rs | 16 ++-- src/path/manager.rs | 6 +- 5 files changed, 32 insertions(+), 141 deletions(-) diff --git a/src/config.rs b/src/config.rs index 32a66dc8..6e4a95d6 100644 --- a/src/config.rs +++ b/src/config.rs @@ -28,10 +28,7 @@ pub struct Account { } fn default_listen() -> String { - format!( - "[::]:{}", - env::var("PORT").unwrap_or(String::from("7777")) - ) + format!("[::]:{}", env::var("PORT").unwrap_or(String::from("7777"))) } fn default_ice_servers() -> Vec { diff --git a/src/forward/forward_internal.rs b/src/forward/forward_internal.rs index 7a118d6a..b4d6a793 100644 --- a/src/forward/forward_internal.rs +++ b/src/forward/forward_internal.rs @@ -1,8 +1,9 @@ use std::collections::HashMap; use std::hash::{Hash, Hasher}; use std::sync::{Arc, Weak}; -use std::time::Duration; +use crate::media; +use crate::AppError; use anyhow::Result; use log::{debug, info}; use tokio::sync::mpsc::{channel, unbounded_channel, Receiver, Sender, UnboundedSender}; @@ -27,15 +28,10 @@ use webrtc::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirecti use webrtc::rtp_transceiver::RTCRtpTransceiverInit; use webrtc::sdp::extmap::{SDES_MID_URI, SDES_RTP_STREAM_ID_URI}; use webrtc::sdp::MediaDescription; -use webrtc::stats::{RemoteInboundRTPStats, StatsReportType}; use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP; use webrtc::track::track_local::{TrackLocal, TrackLocalWriter}; use webrtc::track::track_remote::TrackRemote; -use crate::forward::track_match::{track_match_codec, track_sort}; -use crate::media; -use crate::AppError; - use super::rtcp::RtcpMessage; use super::track_match; @@ -157,7 +153,10 @@ impl PeerForwardInternal { pub(crate) async fn set_anchor(&self, peer: Arc) -> Result<()> { let mut anchor = self.anchor.write().await; if anchor.is_some() { - return Err(AppError::ResourceAlreadyExists("A connection has already been established".to_string()).into()); + return Err(AppError::ResourceAlreadyExists( + "A connection has already been established".to_string(), + ) + .into()); } info!("[{}] [anchor] set {}", self.id, peer.get_stats_id()); *anchor = Some(peer); @@ -188,20 +187,10 @@ impl PeerForwardInternal { let mut subscribe_peers = self.subscribe_group.write().await; subscribe_peers.push(PeerWrap(peer.clone())); drop(subscribe_peers); - if self.publish_is_svc().await { - tokio::spawn(Self::subscribe_track_flush( - Arc::downgrade(&peer), - self.anchor_track_forward_map.clone(), - )); - } info!("[{}] [subscribe] [{}] up", self.id, peer.get_stats_id()); Ok(()) } - pub async fn publish_is_svc(&self) -> bool { - self.publish_track_remotes(RTPCodecType::Video).await.len() > 1 - } - async fn publish_track_remotes(&self, code_type: RTPCodecType) -> Vec> { let anchor_track_forward_map = self.anchor_track_forward_map.read().await; let mut video_track_remotes = vec![]; @@ -213,117 +202,6 @@ impl PeerForwardInternal { video_track_remotes } - async fn subscribe_track_flush( - peer: Weak, - anchor_track_forward_map: Arc>>, - ) { - let mut pre_report: Option = None; - loop { - let timeout = tokio::time::sleep(Duration::from_secs(20)); - tokio::pin!(timeout); - let _ = timeout.as_mut().await; - if let Some(pc) = peer.upgrade() { - for (_, report) in pc.get_stats().await.reports { - if let StatsReportType::RemoteInboundRTP(remote_inbound) = report { - if RTPCodecType::from(remote_inbound.kind) != RTPCodecType::Video { - continue; - } - let mut packets_received = remote_inbound.packets_received; - let mut packets_lost = remote_inbound.packets_lost; - if let Some(pre_report) = &pre_report { - packets_received -= pre_report.packets_received; - packets_lost -= pre_report.packets_lost; - } - if packets_received == 0 { - continue; - } - let packet_loss_rate = packets_lost as f64 / packets_received as f64; - if (0.05..=0.2).contains(&packet_loss_rate) { - continue; - } - Self::subscribe_track_reallocate( - pc.clone(), - anchor_track_forward_map.clone(), - packet_loss_rate < 0.05, - ) - .await; - pre_report = Some(remote_inbound); - } - } - } else { - break; - } - } - } - - async fn subscribe_track_reallocate( - pc: Arc, - anchor_track_forward_map: Arc>>, - upgrade: bool, - ) { - let peer_wrap = PeerWrap(pc); - let anchor_track_forward_map = anchor_track_forward_map.read().await; - let tracks: Vec = anchor_track_forward_map - .keys() - .cloned() - .filter(|t| t.0.kind() == RTPCodecType::Video) - .collect(); - let mut original_track = None; - for track in tracks.iter() { - if let Some(track_forward) = anchor_track_forward_map.get(track) { - let subscribes = track_forward.subscription_group.read().await; - if subscribes.contains_key(&peer_wrap) { - original_track = Some(track.clone()); - break; - } - } - } - if let Some(original_track) = original_track { - let tracks: Vec> = tracks.into_iter().map(|t| t.0).collect(); - let mut tracks = track_match_codec(&[original_track.0.codec().capability], &tracks); - track_sort(&mut tracks); - let tracks: Vec = tracks.into_iter().map(TrackRemoteWrap).collect(); - let mut original_index = None; - for (index, item) in tracks.iter().enumerate() { - if item == &original_track { - original_index = Some(index); - break; - } - } - if original_index.is_none() { - return; - } - let original_index = original_index.unwrap(); - let target_index = if upgrade { - original_index + 1 - } else { - original_index - 1 - }; - if !(0..tracks.len()).contains(&target_index) { - return; - } - let target_track = tracks.get(target_index).unwrap(); - if let Some(sender) = anchor_track_forward_map - .get(&original_track) - .unwrap() - .subscription_group - .write() - .await - .remove(&peer_wrap) - { - let target_track_forward = anchor_track_forward_map.get(target_track).unwrap(); - target_track_forward - .subscription_group - .write() - .await - .insert(peer_wrap, sender); - let _ = target_track_forward - .rtcp_send - .try_send(RtcpMessage::PictureLossIndication); - } - } - } - pub async fn remove_subscribe(&self, peer: Arc) -> Result<()> { let peer_wrap = PeerWrap(peer.clone()); for (_, track_forward) in self.anchor_track_forward_map.write().await.iter() { diff --git a/src/forward/mod.rs b/src/forward/mod.rs index 84ab1b78..fefd3332 100644 --- a/src/forward/mod.rs +++ b/src/forward/mod.rs @@ -13,8 +13,8 @@ use webrtc::rtp_transceiver::rtp_codec::RTPCodecType; use webrtc::sdp::{MediaDescription, SessionDescription}; use crate::forward::forward_internal::{get_peer_key, PeerForwardInternal}; -use crate::{media, metrics}; use crate::AppError; +use crate::{media, metrics}; mod forward_internal; mod rtcp; @@ -39,11 +39,17 @@ impl PeerForward { offer: RTCSessionDescription, ) -> Result<(RTCSessionDescription, String)> { if self.internal.anchor_is_some().await { - return Err(AppError::ResourceAlreadyExists("A connection has already been established".to_string()).into()); + return Err(AppError::ResourceAlreadyExists( + "A connection has already been established".to_string(), + ) + .into()); } let _ = self.anchor_lock.lock().await; if self.internal.anchor_is_some().await { - return Err(AppError::ResourceAlreadyExists("A connection has already been established".to_string()).into()); + return Err(AppError::ResourceAlreadyExists( + "A connection has already been established".to_string(), + ) + .into()); } let peer = self .internal diff --git a/src/main.rs b/src/main.rs index 6415fda4..9e14d1c6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,13 +12,13 @@ use axum::{ routing::post, Router, }; +use http::header::ToStrError; use log::info; +use thiserror::Error; #[cfg(debug_assertions)] use tower_http::services::{ServeDir, ServeFile}; use tower_http::validate_request::ValidateRequestHeaderLayer; use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; -use thiserror::Error; -use http::header::ToStrError; use config::IceServer; use path::manager::Manager; @@ -292,9 +292,15 @@ pub enum AppError { impl IntoResponse for AppError { fn into_response(self) -> Response { match self { - AppError::ResourceNotFound(err) => (StatusCode::NOT_FOUND, err.to_string()).into_response(), - AppError::InternalServerError(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response(), - AppError::ResourceAlreadyExists(err) => (StatusCode::CONFLICT, err.to_string()).into_response(), + AppError::ResourceNotFound(err) => { + (StatusCode::NOT_FOUND, err.to_string()).into_response() + } + AppError::InternalServerError(err) => { + (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response() + } + AppError::ResourceAlreadyExists(err) => { + (StatusCode::CONFLICT, err.to_string()).into_response() + } } } } diff --git a/src/path/manager.rs b/src/path/manager.rs index 1512cd53..ce5db736 100644 --- a/src/path/manager.rs +++ b/src/path/manager.rs @@ -53,7 +53,11 @@ impl Manager { if let Some(forward) = forward { forward.add_subscribe(offer).await } else { - Err(AppError::ResourceNotFound(("The requested resource not exist,please check the path and try again.").to_string()).into()) + Err(AppError::ResourceNotFound( + ("The requested resource not exist,please check the path and try again.") + .to_string(), + ) + .into()) } }