Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ms 274 live queueing video frames #60

Merged
merged 11 commits into from
Nov 21, 2022
Merged
84 changes: 70 additions & 14 deletions lib/membrane/compositor_element.ex
Original file line number Diff line number Diff line change
@@ -1,17 +1,34 @@
defmodule Membrane.VideoCompositor.CompositorElement do
@moduledoc """
The element responsible for placing the first received frame
above the other and sending forward buffer with
merged frame binary in the payload.
The element responsible for composing frames.

It is capable of operating in one of two modes:

* offline compositing:
The compositor will wait for all videos to have a recent enough frame available and then perform the compositing.

* real-time compositing:
In this mode, if the compositor will start a timer ticking every spf (seconds per frame). The timer is reset every time a frame is produced.
If the compositor doesn't have all frames ready by the time the timer ticks, it will produce a frame anyway, using old frames as fallback in cases when a current frame is not available.
If the frames arrive later, they will be dropped. The newest dropped frame will become the new fallback frame.

"""

use Membrane.Filter
alias Membrane.Buffer
alias Membrane.RawVideo
alias Membrane.VideoCompositor.Wgpu

def_options caps: [
type: RawVideo,
spec: RawVideo.t(),
description: "Struct with video width, height, framerate and pixel format."
],
real_time: [
spec: boolean(),
description: """
Set the compositor to real-time mode.
""",
default: false
]

def_input_pad :input,
Expand Down Expand Up @@ -41,6 +58,7 @@ defmodule Membrane.VideoCompositor.CompositorElement do
state = %{
video_positions_waiting_for_caps: %{},
caps: options.caps,
real_time: options.real_time,
wgpu_state: wgpu_state,
pads_to_ids: %{},
new_pad_id: 0
Expand All @@ -51,7 +69,25 @@ defmodule Membrane.VideoCompositor.CompositorElement do

@impl true
def handle_prepared_to_playing(_ctx, state) do
{{:ok, caps: {:output, state.caps}}, state}
spf = spf_from_framerate(state.caps.framerate)

actions =
if state.real_time do
[start_timer: {:render_frame, spf}, caps: {:output, state.caps}]
else
[caps: {:output, state.caps}]
end

{{:ok, actions}, state}
end

@impl true
def handle_tick(:render_frame, _ctx, state) do
{:ok, {frame, pts}} = Wgpu.force_render(state.wgpu_state)

actions = [buffer: {:output, %Buffer{payload: frame, pts: pts}}]

{{:ok, actions}, state}
end

@impl true
Expand Down Expand Up @@ -116,10 +152,12 @@ defmodule Membrane.VideoCompositor.CompositorElement do
{
{
:ok,
buffer: {
:output,
%Membrane.Buffer{payload: frame, pts: pts}
}
[
buffer: {
:output,
%Membrane.Buffer{payload: frame, pts: pts}
}
] ++ restart_timer_action_if_necessary(state)
},
state
}
Expand All @@ -129,6 +167,20 @@ defmodule Membrane.VideoCompositor.CompositorElement do
end
end

defp spf_from_framerate({frames, seconds}) do
Ratio.new(frames, seconds)
end

defp restart_timer_action_if_necessary(state) do
spf = spf_from_framerate(state.caps.framerate)

if state.real_time do
[stop_timer: :render_frame, start_timer: {:render_frame, spf}]
else
[]
end
end

@impl true
def handle_end_of_stream(
pad,
Expand All @@ -140,11 +192,15 @@ defmodule Membrane.VideoCompositor.CompositorElement do

:ok = Wgpu.send_end_of_stream(wgpu_state, id)

if all_input_pads_received_end_of_stream?(context.pads) do
{{:ok, end_of_stream: :output}, state}
else
{:ok, state}
end
actions =
if all_input_pads_received_end_of_stream?(context.pads) do
stop = if state.real_time, do: [stop_timer: :render_frame], else: []
[end_of_stream: :output] ++ stop
else
[]
end

{{:ok, actions}, state}
end

defp all_input_pads_received_end_of_stream?(pads) do
Expand Down
7 changes: 6 additions & 1 deletion lib/membrane/video_compositor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ defmodule Membrane.VideoCompositor do
def_options caps: [
spec: RawVideo.t(),
description: "Caps for the output video of the compositor"
],
real_time: [
spec: boolean(),
description: "Set compositor into real_time mode",
default: false
]

def_input_pad :input,
Expand All @@ -34,7 +39,7 @@ defmodule Membrane.VideoCompositor do
@impl true
def handle_init(options) do
children = %{
compositor: %CompositorElement{caps: options.caps}
compositor: %CompositorElement{caps: options.caps, real_time: options.real_time}
}

links = [
Expand Down
5 changes: 5 additions & 0 deletions native/membrane_videocompositor/src/compositor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ impl State {

/// This returns the pts of the new frame
pub async fn draw_into(&mut self, output_buffer: &mut [u8]) -> u64 {
let interval = self.frame_interval();
self.input_videos
.values_mut()
.for_each(|v| v.remove_stale_frames(interval));

let mut encoder = self
.device
.create_command_encoder(&wgpu::CommandEncoderDescriptor {
Expand Down
18 changes: 18 additions & 0 deletions native/membrane_videocompositor/src/compositor/videos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ impl InputVideo {
DrawResult::Rendered(pts)
}

pub fn remove_stale_frames(&mut self, interval: Option<(u64, u64)>) {
while self.is_front_frame_too_old(interval) {
self.pop_frame();
}
}

pub fn pop_frame(&mut self) {
if let Some(Message::Frame { pts, frame }) = self.frames.pop_front() {
self.previous_frame = Some(Message::Frame { pts, frame });
Expand All @@ -230,6 +236,18 @@ impl InputVideo {
self.frames.push_back(Message::EndOfStream);
}

pub fn is_front_frame_too_old(&self, interval: Option<(u64, u64)>) -> bool {
if let Some(Message::EndOfStream) = self.frames.front() {
return false;
}

if interval.is_none() || self.front_pts().is_none() {
return false;
}

self.front_pts().is_some() && interval.unwrap().0 > self.front_pts().unwrap()
}

pub fn is_frame_ready(&self, interval: Option<(u64, u64)>) -> bool {
if let Some(Message::EndOfStream) = self.frames.front() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ defmodule Membrane.VideoCompositor.Pipeline.ComposeMultipleInputs do
decoder = options.decoder
decoder_name = String.to_atom("decoder_#{i}")

input_filter = options.input_filter
input_filter_name = String.to_atom("input_filter_#{i}")

link(source_name, source)
|> then(if not is_nil(decoder), do: &to(&1, decoder_name, decoder), else: & &1)
|> then(if not is_nil(input_filter), do: &to(&1, input_filter_name, input_filter), else: & &1)
|> via_in(:input, options: [position: position])
|> to(:compositor)
end)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,20 @@ defmodule Membrane.VideoCompositor.Pipeline.Utils.Options do
"""
@type encoder_t :: Membrane.Filter.t() | nil

@typedoc """
An additional plugin that sits between the decoder (or source if there is no decoder) and the compositor.
"""
@type input_filter_t :: Membrane.Filter.t() | nil

@type t() :: %__MODULE__{
inputs: inputs_t(),
output: output_t(),
caps: caps_t(),
compositor: compositor_t(),
decoder: decoder_t(),
encoder: encoder_t()
encoder: encoder_t(),
input_filter: input_filter_t()
}
@enforce_keys [:inputs, :output, :caps]
defstruct [:inputs, :output, :caps, :compositor, :decoder, :encoder]
defstruct [:inputs, :output, :caps, :compositor, :decoder, :encoder, :input_filter]
end
68 changes: 68 additions & 0 deletions test/pipeline_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Membrane.VideoCompositor.Test.Pipeline do
alias Membrane.RawVideo
alias Membrane.Testing.Pipeline, as: TestingPipeline
alias Membrane.VideoCompositor.Test.Support.Pipeline.H264, as: PipelineH264
alias Membrane.VideoCompositor.Test.Support.Pipeline.PacketLoss, as: PipelinePacketLoss
alias Membrane.VideoCompositor.Test.Support.Utils

@hd_video %RawVideo{
Expand Down Expand Up @@ -91,4 +92,71 @@ defmodule Membrane.VideoCompositor.Test.Pipeline do
assert_end_of_stream(pid, :sink, :input, 1_000_000)
TestingPipeline.terminate(pid, blocking?: true)
end

describe "Checks packet loss pipeline on" do
@describetag :tmp_dir

@tag wgpu: true
test "2s 720p 30fps video", %{tmp_dir: tmp_dir} do
test_h264_pipeline_with_packet_loss(
@hd_video,
2,
"short_videos",
tmp_dir
)
end

@tag wgpu: true
test "1s 1080p 30fps video", %{tmp_dir: tmp_dir} do
test_h264_pipeline_with_packet_loss(
@full_hd_video,
1,
"short_videos",
tmp_dir
)
end
end

defp test_h264_pipeline_with_packet_loss(
video_caps,
duration,
sub_dir_name,
tmp_dir
) do
alias Membrane.VideoCompositor.Pipeline.Utils.{InputStream, Options}

{input_path, output_path, _ref_file_name} =
Utils.prepare_testing_video(video_caps, duration, "h264", tmp_dir, sub_dir_name)

positions = [
{0, 0},
{div(video_caps.width, 2), 0},
{0, div(video_caps.height, 2)},
{div(video_caps.width, 2), div(video_caps.height, 2)}
]

inputs =
for pos <- positions,
do: %InputStream{
position: pos,
caps: video_caps,
input: input_path
}

out_caps = %RawVideo{video_caps | width: video_caps.width * 2, height: video_caps.height * 2}

options = %Options{
inputs: inputs,
output: output_path,
caps: out_caps
}

assert {:ok, pid} =
TestingPipeline.start_link(module: PipelinePacketLoss, custom_args: options)

assert_pipeline_playback_changed(pid, _, :playing)

assert_end_of_stream(pid, :sink, :input, 1_000_000)
TestingPipeline.terminate(pid, blocking?: true)
end
end
Loading