Skip to content

Commit

Permalink
liveman: use nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
hongcha98 committed Dec 8, 2024
1 parent b4bd8bc commit cd13afe
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 9 deletions.
32 changes: 24 additions & 8 deletions liveman/src/route/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use axum::{
routing::{delete, get, post},
Router,
};
use axum_extra::extract::Query;
use http::{header, HeaderValue, Uri};
use serde::{Deserialize, Serialize};
use tracing::{debug, error, warn, Span};

use api::response::Stream;
Expand All @@ -15,6 +17,12 @@ use crate::route::stream;
use crate::store::Server;
use crate::{error::AppError, result::Result, AppState};

#[derive(Serialize, Deserialize, Clone)]
pub struct QueryExtract {
#[serde(default)]
pub nodes: Vec<String>,
}

pub fn route() -> Router<AppState> {
Router::new()
.route(&api::path::whip(":stream"), post(whip))
Expand Down Expand Up @@ -73,23 +81,27 @@ async fn api_whep(
async fn whip(
State(mut state): State<AppState>,
Path(stream): Path<String>,
Query(query_extract): Query<QueryExtract>,
req: Request,
) -> Result<Response> {
let stream_nodes = state.storage.stream_get(stream.clone()).await?;
debug!("{:?}", stream_nodes);
let target = match stream_nodes.is_empty() {
true => {
let nodes = state.storage.nodes().await;
let mut nodes = state.storage.nodes().await;
warn!("{:?}", nodes);
if !query_extract.nodes.is_empty() {
nodes.retain(|x| query_extract.nodes.contains(&x.alias));
}
maximum_idle_node(state.clone(), nodes, stream.clone()).await
}
false => match stream_nodes.first() {
Some(node) => Some(node.clone()),
None => {
error!("WHIP Error: No available node");
None
false => {
let mut nodes = stream_nodes.clone();
if !query_extract.nodes.is_empty() {
nodes.retain(|x| query_extract.nodes.contains(&x.alias));
}
},
nodes.first().cloned()
}
};

match target {
Expand Down Expand Up @@ -133,9 +145,13 @@ async fn whip(
async fn whep(
State(mut state): State<AppState>,
Path(stream): Path<String>,
Query(query_extract): Query<QueryExtract>,
req: Request,
) -> Result<Response> {
let servers = state.storage.stream_get(stream.clone()).await.unwrap();
let mut servers = state.storage.stream_get(stream.clone()).await.unwrap();
if !query_extract.nodes.is_empty() {
servers.retain(|x| query_extract.nodes.contains(&x.alias));
}
if servers.is_empty() {
debug!("whep servers is empty");
return Err(AppError::ResourceNotFound);
Expand Down
11 changes: 10 additions & 1 deletion liveman/src/route/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ use axum::{
response::Response,
Json,
};
use axum_extra::extract::Query;
use http::{header, StatusCode};
use tracing::warn;

use api::response::Stream;

use crate::{error::AppError, result::Result, AppState};

use super::proxy::QueryExtract;

fn get_map_server_stream(map_info: HashMap<String, Vec<Stream>>) -> HashMap<String, Stream> {
let mut map_server_stream = HashMap::new();
for (alias, streams) in map_info.iter() {
Expand All @@ -22,13 +25,19 @@ fn get_map_server_stream(map_info: HashMap<String, Vec<Stream>>) -> HashMap<Stri
map_server_stream
}

pub async fn index(State(mut state): State<AppState>) -> Result<Json<Vec<api::response::Stream>>> {
pub async fn index(
State(mut state): State<AppState>,
Query(query_extract): Query<QueryExtract>,
) -> Result<Json<Vec<api::response::Stream>>> {
let map_server_stream = get_map_server_stream(state.storage.info_raw_all().await.unwrap());

let streams = state.storage.stream_all().await;
let mut result_streams: HashMap<String, Stream> = HashMap::new();
for (stream_id, servers) in streams.into_iter() {
for server_alias in servers.iter() {
if !query_extract.nodes.is_empty() && !query_extract.nodes.contains(server_alias) {
continue;
}
let alias = format!("{}:{}", server_alias, stream_id);
match map_server_stream.get(&alias) {
Some(s) => {
Expand Down

0 comments on commit cd13afe

Please sign in to comment.