Skip to content

Commit

Permalink
feat: layer extensions
Browse files Browse the repository at this point in the history
  • Loading branch information
hongcha98 committed Dec 18, 2023
1 parent aa5c4f7 commit b3b5344
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 8 deletions.
1 change: 1 addition & 0 deletions src/dto/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod req;
7 changes: 7 additions & 0 deletions src/dto/req.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use serde::Deserialize;

#[derive(Deserialize)]
pub struct SelectLayer {
#[serde(rename = "encodingId")]
pub encoding_id: Option<String>,
}
61 changes: 60 additions & 1 deletion src/forward/forward_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
use webrtc::track::track_local::{TrackLocal, TrackLocalWriter};
use webrtc::track::track_remote::TrackRemote;

use crate::forward::info::Layer;
use crate::media;
use crate::AppError;

Expand Down Expand Up @@ -219,7 +220,65 @@ impl PeerForwardInternal {
}
}
}
return Err(anyhow::anyhow!("anchor is none"));
Err(anyhow::anyhow!("anchor svc rids error"))
}

pub async fn select_layer(&self, key: String, layer: Option<Layer>) -> Result<()> {
let rid = if let Some(layer) = layer {
layer.encoding_id
} else {
self.publish_svc_rids().await?[0].clone()
};
let peer: Option<PeerWrap> = self
.subscribe_group
.read()
.await
.iter()
.filter(|p| p.get_key() == key)
.map(|p| p.clone())
.next();
if let Some(peer) = peer {
let anchor_track_forward_map = self.anchor_track_forward_map.write().await;
for (track_remote, track_forward) in anchor_track_forward_map.iter() {
if track_remote.0.rid() == rid && track_remote.0.kind() == RTPCodecType::Video {
for (track_remote_original, track_forward_original) in
anchor_track_forward_map.iter()
{
if track_remote_original.0.kind() != RTPCodecType::Video {
continue;
}
let mut subscription_group =
track_forward_original.subscription_group.write().await;
if subscription_group.contains_key(&peer) {
if track_remote_original.0.rid() == rid {
return Ok(());
}
let sender = subscription_group.remove(&peer).unwrap();
drop(subscription_group);
track_forward
.subscription_group
.write()
.await
.insert(peer.clone(), sender);
let _ = track_forward
.rtcp_send
.try_send(RtcpMessage::PictureLossIndication);
info!(
"[{}] [subscribe] [{}] select layer {} to {} ",
self.id,
peer.get_key(),
track_remote_original.0.rid(),
rid
);
return Ok(());
}
}
}
}
Err(anyhow::anyhow!("not found layer"))
} else {
Err(anyhow::anyhow!("not found key"))
}
}

pub async fn remove_subscribe(&self, peer: Arc<RTCPeerConnection>) -> Result<()> {
Expand Down
1 change: 0 additions & 1 deletion src/layer.rs → src/forward/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@ use serde::{Deserialize, Serialize};
pub struct Layer {
#[serde(rename = "encodingId")]
pub encoding_id: String,
// TODO Other fields
}
11 changes: 10 additions & 1 deletion src/forward/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ use webrtc::rtp_transceiver::rtp_codec::RTPCodecType;
use webrtc::sdp::{MediaDescription, SessionDescription};

use crate::forward::forward_internal::{get_peer_key, PeerForwardInternal};
use crate::layer::Layer;
use crate::forward::info::Layer;
use crate::AppError;
use crate::{media, metrics};

mod forward_internal;
pub mod info;
mod rtcp;
mod track_match;

Expand Down Expand Up @@ -170,7 +171,15 @@ impl PeerForward {
Err(anyhow::anyhow!("not layers"))
}
}

pub async fn select_layer(&self, key: String, layer: Option<Layer>) -> Result<()> {
if !self.internal.publish_is_svc().await {
return Err(anyhow::anyhow!("anchor svc is not enabled"));
}
self.internal.select_layer(key, layer).await
}
}

async fn peer_complete(
offer: RTCSessionDescription,
peer: Arc<RTCPeerConnection>,
Expand Down
54 changes: 50 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ use std::sync::Arc;

use axum::http::{HeaderMap, Uri};
use axum::routing::get;
use axum::Json;
use axum::{
extract::{Path, State},
http::StatusCode,
response::{IntoResponse, Response},
routing::post,
Router,
};
use forward::info::Layer;
use http::header::ToStrError;
use log::info;
use thiserror::Error;
Expand All @@ -27,11 +29,12 @@ use {http::header, rust_embed::RustEmbed};

use crate::auth::ManyValidate;
use crate::config::Config;
use crate::dto::req::SelectLayer;

mod auth;
mod config;
mod dto;
mod forward;
mod layer;
mod media;
mod metrics;
mod path;
Expand Down Expand Up @@ -92,9 +95,13 @@ async fn main() {
post(whep)
.patch(add_ice_candidate)
.delete(remove_path_key)
.layer(auth_layer)
.layer(auth_layer.clone())
.options(ice_server_config),
)
.route(
"/whep/:id/layer",
get(get_layer).post(select_layer).layer(auth_layer),
)
.route("/metrics", get(metrics))
.with_state(app_state);
app = static_server(app);
Expand Down Expand Up @@ -195,8 +202,13 @@ async fn whep(
.header("E-Tag", key)
.header("Location", uri.to_string());
if state.paths.layers(id).await.is_ok() {
builder = builder.header("Link", format!("<{}/layer>; rel=\"urn:ietf:params:whep:ext:core:layer\"", uri.to_string()))
.header("Link", format!("<{}/sse_info>; rel=\"urn:ietf:params:whep:ext:core:server-sent-events\"; events=\"layers\"", uri.to_string()))
builder = builder.header(
"Link",
format!(
"<{}/layer>; rel=\"urn:ietf:params:whep:ext:core:layer\"",
uri.to_string()
),
)
}
Ok(builder.body(answer.sdp)?)
}
Expand Down Expand Up @@ -254,6 +266,40 @@ async fn ice_server_config(State(state): State<AppState>) -> AppResult<Response<
Ok(builder.body("".to_owned())?)
}

async fn get_layer(
State(state): State<AppState>,
Path(id): Path<String>,
) -> AppResult<Json<Vec<Layer>>> {
let layers = state.paths.layers(id).await?;
Ok(Json(layers))
}

async fn select_layer(
State(state): State<AppState>,
Path(id): Path<String>,
header: HeaderMap,
Json(layer): Json<SelectLayer>,
) -> AppResult<String> {
let key = header
.get("If-Match")
.ok_or(AppError::from(anyhow::anyhow!("If-Match is required")))?
.to_str()?
.to_string();
state
.paths
.select_layer(
id,
key,
if let Some(encoding_id) = layer.encoding_id {
Some(Layer { encoding_id })
} else {
None
},
)
.await?;
Ok("".to_string())
}

fn link_header(ice_servers: Vec<IceServer>) -> Vec<String> {
ice_servers
.into_iter()
Expand Down
18 changes: 17 additions & 1 deletion src/path/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use webrtc::{
peer_connection::sdp::session_description::RTCSessionDescription,
};

use crate::forward::info::Layer;
use crate::forward::PeerForward;
use crate::layer::Layer;
use crate::AppError;

#[derive(Clone)]
Expand Down Expand Up @@ -103,4 +103,20 @@ impl Manager {
Err(anyhow::anyhow!("resource not exists"))
}
}

pub async fn select_layer(
&self,
path: String,
key: String,
layer: Option<Layer>,
) -> Result<()> {
let paths = self.paths.read().await;
let forward = paths.get(&path).cloned();
drop(paths);
if let Some(forward) = forward {
forward.select_layer(key, layer).await
} else {
Err(anyhow::anyhow!("resource not exists"))
}
}
}

0 comments on commit b3b5344

Please sign in to comment.