Skip to content

Commit

Permalink
enhanced error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
brzep committed Nov 7, 2024
1 parent 1fd3d93 commit e286d6d
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 96 deletions.
11 changes: 10 additions & 1 deletion compositor_pipeline/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use compositor_render::{
InputId, OutputId,
};

use crate::pipeline::{decoder::AacDecoderError, VideoCodec};
use crate::pipeline::{decoder::AacDecoderError, output::whip, VideoCodec};
use fdk_aac_sys as fdk;

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -93,6 +93,15 @@ pub enum OutputInitError {

#[error("Failed to register output. FFmpeg error: {0}.")]
FfmpegMp4Error(ffmpeg_next::Error),

#[error("Unkown Whip output error, channel unexpectedly closed")]
UnknownWhipError,

#[error("Whip init timeout exceeded")]
WhipInitTimeout,

#[error("Failed to init whip output: {0}")]
WhipInitError(Box<whip::WhipError>),
}

#[derive(Debug, thiserror::Error)]
Expand Down
16 changes: 10 additions & 6 deletions compositor_pipeline/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use input::RawDataInputOptions;
use output::EncodedDataOutputOptions;
use output::OutputOptions;
use output::RawDataOutputOptions;
use pipeline_output::register_pipeline_output;
use tokio::runtime::Runtime;
use tracing::{error, info, trace, warn};
use types::RawDataSender;
Expand Down Expand Up @@ -254,11 +255,12 @@ impl Pipeline {
}

pub fn register_output(
&mut self,
pipeline: &Arc<Mutex<Self>>,
output_id: OutputId,
register_options: RegisterOutputOptions<OutputOptions>,
) -> Result<Option<Port>, RegisterOutputError> {
self.register_pipeline_output(
register_pipeline_output(
pipeline,
output_id,
&register_options.output_options,
register_options.video,
Expand All @@ -267,11 +269,12 @@ impl Pipeline {
}

pub fn register_encoded_data_output(
&mut self,
pipeline: &Arc<Mutex<Self>>,
output_id: OutputId,
register_options: RegisterOutputOptions<EncodedDataOutputOptions>,
) -> Result<Receiver<EncoderOutputEvent>, RegisterOutputError> {
self.register_pipeline_output(
register_pipeline_output(
pipeline,
output_id,
&register_options.output_options,
register_options.video,
Expand All @@ -280,11 +283,12 @@ impl Pipeline {
}

pub fn register_raw_data_output(
&mut self,
pipeline: &Arc<Mutex<Self>>,
output_id: OutputId,
register_options: RegisterOutputOptions<RawDataOutputOptions>,
) -> Result<RawDataReceiver, RegisterOutputError> {
self.register_pipeline_output(
register_pipeline_output(
pipeline,
output_id,
&register_options.output_options,
register_options.video,
Expand Down
41 changes: 37 additions & 4 deletions compositor_pipeline/src/pipeline/output/whip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ use init_peer_connection::init_pc;
use packet_stream::PacketStream;
use payloader::{Payload, Payloader, PayloadingError};
use reqwest::{Method, StatusCode, Url};
use std::sync::{atomic::AtomicBool, Arc};
use std::{
sync::{atomic::AtomicBool, Arc},
thread,
time::{Duration, SystemTime},
};
use tokio::sync::oneshot;
use tracing::{debug, error, span, Instrument, Level};
use url::ParseError;
use webrtc::{
Expand Down Expand Up @@ -88,6 +93,8 @@ pub enum WhipError {
PayloadingError(#[from] PayloadingError),
}

const WHIP_INIT_TIMEOUT: Duration = Duration::from_secs(60);

impl WhipSender {
pub fn new(
output_id: &OutputId,
Expand All @@ -111,6 +118,9 @@ impl WhipSender {
let tokio_rt_clone = tokio_rt.clone();
let output_id_clone = output_id.clone();

let (init_confirmation_sender, mut init_confirmation_receiver) =
oneshot::channel::<Result<(), WhipError>>();

tokio_rt.spawn(
async move {
run_whip_sender_thread(
Expand All @@ -120,6 +130,7 @@ impl WhipSender {
packet_stream,
request_keyframe_sender,
tokio_rt_clone,
init_confirmation_sender,
)
.await;
event_emitter.emit(Event::OutputDone(output_id_clone));
Expand All @@ -132,6 +143,27 @@ impl WhipSender {
)),
);

let start_time = SystemTime::now();
loop {
thread::sleep(Duration::from_secs(5));
let elapsed_time = SystemTime::now().duration_since(start_time).unwrap();
if elapsed_time > WHIP_INIT_TIMEOUT {
return Err(OutputInitError::WhipInitTimeout);
}
match init_confirmation_receiver.try_recv() {
Ok(result) => match result {
Ok(_) => break,
Err(err) => return Err(OutputInitError::WhipInitError(err.into())),
},
Err(err) => match err {
oneshot::error::TryRecvError::Closed => {
return Err(OutputInitError::UnknownWhipError)
}
oneshot::error::TryRecvError::Empty => {}
},
};
}

Ok(Self {
connection_options: options,
should_close,
Expand All @@ -153,6 +185,7 @@ async fn run_whip_sender_thread(
packet_stream: PacketStream,
request_keyframe_sender: Option<Sender<()>>,
tokio_rt: Arc<tokio::runtime::Runtime>,
init_confirmation_sender: oneshot::Sender<Result<(), WhipError>>,
) {
let client = reqwest::Client::new();
let peer_connection: Arc<RTCPeerConnection>;
Expand All @@ -165,7 +198,7 @@ async fn run_whip_sender_thread(
audio_track = audio;
}
Err(err) => {
error!("Error occured while initializing peer connection: {err}");
init_confirmation_sender.send(Err(err)).unwrap();
return;
}
}
Expand All @@ -182,10 +215,11 @@ async fn run_whip_sender_thread(
{
Ok(val) => val,
Err(err) => {
error!("{err}");
init_confirmation_sender.send(Err(err)).unwrap();
return;
}
};
init_confirmation_sender.send(Ok(())).unwrap();

for chunk in packet_stream {
if should_close.load(std::sync::atomic::Ordering::Relaxed) {
Expand Down Expand Up @@ -223,5 +257,4 @@ async fn run_whip_sender_thread(
}
}
let _ = client.delete(whip_session_url).send().await;
// });
}
110 changes: 53 additions & 57 deletions compositor_pipeline/src/pipeline/pipeline_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,73 +34,69 @@ pub(super) enum OutputSender<T> {
FinishedSender,
}

impl Pipeline {
pub(super) fn register_pipeline_output<NewOutputResult>(
&mut self,
output_id: OutputId,
output_options: &dyn OutputOptionsExt<NewOutputResult>,
video: Option<OutputVideoOptions>,
audio: Option<OutputAudioOptions>,
) -> Result<NewOutputResult, RegisterOutputError> {
let (has_video, has_audio) = (video.is_some(), audio.is_some());
if !has_video && !has_audio {
return Err(RegisterOutputError::NoVideoAndAudio(output_id));
}
pub(super) fn register_pipeline_output<NewOutputResult>(
pipeline: &Arc<Mutex<Pipeline>>,
output_id: OutputId,
output_options: &dyn OutputOptionsExt<NewOutputResult>,
video: Option<OutputVideoOptions>,
audio: Option<OutputAudioOptions>,
) -> Result<NewOutputResult, RegisterOutputError> {
let (has_video, has_audio) = (video.is_some(), audio.is_some());
if !has_video && !has_audio {
return Err(RegisterOutputError::NoVideoAndAudio(output_id));
}

if self.outputs.contains_key(&output_id) {
return Err(RegisterOutputError::AlreadyRegistered(output_id));
}
if pipeline.lock().unwrap().outputs.contains_key(&output_id) {
return Err(RegisterOutputError::AlreadyRegistered(output_id));
}

let (output, output_result) = output_options.new_output(&output_id, &self.ctx)?;
let pipeline_ctx = pipeline.lock().unwrap().ctx.clone();

let output = PipelineOutput {
output,
audio_end_condition: audio.as_ref().map(|audio| {
PipelineOutputEndConditionState::new_audio(
audio.end_condition.clone(),
&self.inputs,
)
}),
video_end_condition: video.as_ref().map(|video| {
PipelineOutputEndConditionState::new_video(
video.end_condition.clone(),
&self.inputs,
)
}),
};
let (output, output_result) = output_options.new_output(&output_id, &pipeline_ctx)?;

if let (Some(video_opts), Some(resolution), Some(format)) = (
video.clone(),
output.output.resolution(),
output.output.output_frame_format(),
) {
let result = self.renderer.update_scene(
output_id.clone(),
resolution,
format,
video_opts.initial,
);
let mut guard = pipeline.lock().unwrap();

if let Err(err) = result {
self.renderer.unregister_output(&output_id);
return Err(RegisterOutputError::SceneError(output_id.clone(), err));
}
};
let output = PipelineOutput {
output,
audio_end_condition: audio.as_ref().map(|audio| {
PipelineOutputEndConditionState::new_audio(audio.end_condition.clone(), &guard.inputs)
}),
video_end_condition: video.as_ref().map(|video| {
PipelineOutputEndConditionState::new_video(video.end_condition.clone(), &guard.inputs)
}),
};

if let Some(audio_opts) = audio.clone() {
self.audio_mixer.register_output(
output_id.clone(),
audio_opts.initial,
audio_opts.mixing_strategy,
audio_opts.channels,
);
}
if let (Some(video_opts), Some(resolution), Some(format)) = (
video.clone(),
output.output.resolution(),
output.output.output_frame_format(),
) {
let result =
guard
.renderer
.update_scene(output_id.clone(), resolution, format, video_opts.initial);

self.outputs.insert(output_id.clone(), output);
if let Err(err) = result {
guard.renderer.unregister_output(&output_id);
return Err(RegisterOutputError::SceneError(output_id.clone(), err));
}
};

Ok(output_result)
if let Some(audio_opts) = audio.clone() {
guard.audio_mixer.register_output(
output_id.clone(),
audio_opts.initial,
audio_opts.mixing_strategy,
audio_opts.channels,
);
}

guard.outputs.insert(output_id.clone(), output);

Ok(output_result)
}

impl Pipeline {
pub(super) fn all_output_video_senders_iter(
pipeline: &Arc<Mutex<Pipeline>>,
) -> impl Iterator<Item = (OutputId, OutputSender<Sender<PipelineEvent<Frame>>>)> {
Expand Down
9 changes: 3 additions & 6 deletions integration_tests/examples/encoded_channel_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,9 @@ fn main() {

Pipeline::register_input(&state.pipeline, input_id.clone(), input_options).unwrap();

let output_receiver = state
.pipeline
.lock()
.unwrap()
.register_encoded_data_output(output_id.clone(), output_options)
.unwrap();
let output_receiver =
Pipeline::register_encoded_data_output(&state.pipeline, output_id.clone(), output_options)
.unwrap();

Pipeline::start(&state.pipeline);

Expand Down
6 changes: 1 addition & 5 deletions integration_tests/examples/raw_channel_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,7 @@ fn main() {
)
.unwrap();

pipeline
.lock()
.unwrap()
.register_output(output_id.clone(), output_options)
.unwrap();
Pipeline::register_output(&pipeline, output_id.clone(), output_options).unwrap();

let frames = generate_frames(&wgpu_device, &wgpu_queue);

Expand Down
7 changes: 2 additions & 5 deletions integration_tests/examples/raw_channel_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,8 @@ fn main() {

Pipeline::register_input(&pipeline, input_id.clone(), input_options).unwrap();

let RawDataReceiver { video, audio } = pipeline
.lock()
.unwrap()
.register_raw_data_output(output_id.clone(), output_options)
.unwrap();
let RawDataReceiver { video, audio } =
Pipeline::register_raw_data_output(&pipeline, output_id.clone(), output_options).unwrap();

Pipeline::start(&pipeline);

Expand Down
6 changes: 1 addition & 5 deletions integration_tests/examples/vulkan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,7 @@ fn client_code() -> Result<()> {
audio: None,
};

pipeline
.lock()
.unwrap()
.register_output(OutputId("output_1".into()), output_options)
.unwrap();
Pipeline::register_output(&pipeline, OutputId("output_1".into()), output_options).unwrap();

Pipeline::start(&pipeline);

Expand Down
5 changes: 1 addition & 4 deletions integration_tests/examples/whip_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use compositor_api::types::Resolution;
use serde_json::json;
use std::{thread::sleep, time::Duration};
use std::time::Duration;

use integration_tests::examples::{self, run_example};

Expand Down Expand Up @@ -69,8 +69,5 @@ fn client_code() -> Result<()> {

examples::post("start", &json!({}))?;

sleep(Duration::from_secs(300));
examples::post("output/output_1/unregister", &json!({}))?;

Ok(())
}
Loading

0 comments on commit e286d6d

Please sign in to comment.