Skip to content

Commit

Permalink
Merge pull request #107 from harlanc/add_new_http_apis
Browse files Browse the repository at this point in the history
Add new http apis
  • Loading branch information
harlanc authored Mar 15, 2024
2 parents 010e0d3 + 425a36d commit 7645471
Show file tree
Hide file tree
Showing 74 changed files with 1,426 additions and 712 deletions.
35 changes: 19 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions application/xiu/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased] - ReleaseDate

## [0.12.5]
- Support querying more detailed statistic data by adding two new HTTP APIs.
- Fix publishing RTSP stream error caused by network problem. by @bailb
- Fix the bug that stopping the playback of RTSP stream leads to push(publish) failure.
- Upgrade failure library.

## [0.12.4]
- Fix the failure in generating Docker images.

Expand Down
2 changes: 1 addition & 1 deletion application/xiu/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ serde_derive = "1.0"
serde = { version = "1.0.101", optional = true, features = ["derive"] }
anyhow = "^1.0"
log = "0.4.0"
failure = "0.1.1"
failure = "0.1.8"
clap = "4.1.4"
libc = "0.2.139"
serde_json = { version = "1", default-features = false, features = [
Expand Down
149 changes: 107 additions & 42 deletions application/xiu/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,45 @@
use {
anyhow::Result,
axum::{
extract::Query,
routing::{get, post},
Json, Router,
},
serde::Deserialize,
serde_json::Value,
std::sync::Arc,
streamhub::{define, define::StreamHubEventSender, utils::Uuid},
{
tokio,
tokio::sync::{mpsc, oneshot},
streamhub::{
define::{self, StreamHubEventSender},
stream::StreamIdentifier,
utils::Uuid,
},
tokio::{self, sync::oneshot},
};

#[derive(serde::Serialize)]
struct ApiResponse<T> {
error_code: i32,
desp: String,
data: T,
}

// the input to our `KickOffClient` handler
#[derive(Deserialize)]
struct KickOffClient {
id: String,
uuid: String,
}

#[derive(Deserialize, Debug)]
struct QueryWholeStreamsParams {
// query top N by subscriber's count.
top: Option<usize>,
}

#[derive(Deserialize)]
struct QueryStream {
identifier: StreamIdentifier,
// if specify uuid, then query the stream by uuid and filter no used data.
uuid: Option<String>,
}

#[derive(Clone)]
Expand All @@ -28,50 +51,89 @@ impl ApiService {
async fn root(&self) -> String {
String::from(
"Usage of xiu http api:
./get_stream_status(get) get audio and video stream statistic information.
./kick_off_client(post) kick off client by publish/subscribe id.\n",
./api/query_whole_streams(get) query whole streams' information or top streams' information.
./api/query_stream(post) query stream information by identifier and uuid.
./api/kick_off_client(post) kick off client by publish/subscribe id.\n",
)
}

async fn get_stream_status(&self) -> Result<String> {
let (data_sender, mut data_receiver) = mpsc::unbounded_channel();
let (size_sender, size_receiver) = oneshot::channel();
async fn query_whole_streams(
&self,
params: QueryWholeStreamsParams,
) -> Json<ApiResponse<Value>> {
log::info!("query_whole_streams: {:?}", params);
let (result_sender, result_receiver) = oneshot::channel();
let hub_event = define::StreamHubEvent::ApiStatistic {
data_sender,
size_sender,
top_n: params.top,
identifier: None,
uuid: None,
result_sender,
};
if let Err(err) = self.channel_event_producer.send(hub_event) {
log::error!("send api event error: {}", err);
}
let mut data = Vec::new();
match size_receiver.await {
Ok(size) => {
if size == 0 {
return Ok(String::from("no stream data"));
}
loop {
if let Some(stream_statistics) = data_receiver.recv().await {
data.push(stream_statistics);
}
if data.len() == size {
break;
}
}

match result_receiver.await {
Ok(dat_val) => {
let api_response = ApiResponse {
error_code: 0,
desp: String::from("succ"),
data: dat_val,
};
Json(api_response)
}
Err(err) => {
log::error!("start_api_service recv size error: {}", err);
let api_response = ApiResponse {
error_code: -1,
desp: String::from("failed"),
data: serde_json::json!(err.to_string()),
};
Json(api_response)
}
}
}

async fn query_stream(&self, stream: QueryStream) -> Json<ApiResponse<Value>> {
let uuid = if let Some(uid) = stream.uuid {
Uuid::from_str2(&uid)
} else {
None
};

if let Ok(data) = serde_json::to_string(&data) {
return Ok(data);
let (result_sender, result_receiver) = oneshot::channel();
let hub_event = define::StreamHubEvent::ApiStatistic {
top_n: None,
identifier: Some(stream.identifier),
uuid,
result_sender,
};

if let Err(err) = self.channel_event_producer.send(hub_event) {
log::error!("send api event error: {}", err);
}

Ok(String::from(""))
match result_receiver.await {
Ok(dat_val) => {
let api_response = ApiResponse {
error_code: 0,
desp: String::from("succ"),
data: dat_val,
};
Json(api_response)
}
Err(err) => {
let api_response = ApiResponse {
error_code: -1,
desp: String::from("failed"),
data: serde_json::json!(err.to_string()),
};
Json(api_response)
}
}
}

async fn kick_off_client(&self, id: KickOffClient) -> Result<String> {
let id_result = Uuid::from_str2(&id.id);
let id_result = Uuid::from_str2(&id.uuid);

if let Some(id) = id_result {
let hub_event = define::StreamHubEvent::ApiKickClient { id };
Expand All @@ -93,26 +155,29 @@ pub async fn run(producer: StreamHubEventSender, port: usize) {
let api_root = api.clone();
let root = move || async move { api_root.root().await };

let get_status = api.clone();
let status = move || async move {
match get_status.get_stream_status().await {
Ok(response) => response,
Err(_) => "error".to_owned(),
}
let api_query_streams = api.clone();
let query_streams = move |Query(params): Query<QueryWholeStreamsParams>| async move {
api_query_streams.query_whole_streams(params).await
};

let api_query_stream = api.clone();
let query_stream = move |Json(stream): Json<QueryStream>| async move {
api_query_stream.query_stream(stream).await
};

let kick_off = api.clone();
let kick = move |Json(id): Json<KickOffClient>| async move {
match kick_off.kick_off_client(id).await {
let api_kick_off = api.clone();
let kick_off = move |Json(id): Json<KickOffClient>| async move {
match api_kick_off.kick_off_client(id).await {
Ok(response) => response,
Err(_) => "error".to_owned(),
}
};

let app = Router::new()
.route("/", get(root))
.route("/get_stream_status", get(status))
.route("/kick_off_client", post(kick));
.route("/api/query_whole_streams", get(query_streams))
.route("/api/query_stream", post(query_stream))
.route("/api/kick_off_client", post(kick_off));

log::info!("Http api server listening on http://0.0.0.0:{}", port);
axum::Server::bind(&([0, 0, 0, 0], port as u16).into())
Expand Down
2 changes: 1 addition & 1 deletion confs/local/flv.Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ edition = "2018"
[dependencies]
byteorder = "1.4.2"
bytes = "1.0.0"
failure = "0.1.1"
failure = "0.1.8"
serde = { version = "1.0", features = ["derive", "rc"] }
log = "0.4"

Expand Down
Loading

0 comments on commit 7645471

Please sign in to comment.