Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ms 274 live queueing video frames #60

Merged
merged 11 commits into from
Nov 21, 2022
Merged
85 changes: 71 additions & 14 deletions lib/membrane/compositor_element.ex
Original file line number Diff line number Diff line change
@@ -1,17 +1,34 @@
defmodule Membrane.VideoCompositor.CompositorElement do
@moduledoc """
The element responsible for placing the first received frame
above the other and sending forward buffer with
merged frame binary in the payload.
The element responsible for composing frames.

It is capable of operating in one of two modes:

* offline compositing:
The compositor will wait for all videos to have a recent enough frame available and then perform the compositing.

* live compositing:
In this mode, if the compositor will start a timer ticking every spf (seconds per frame). The timer is reset every time a frame is produced.
If the compositor doesn't have all frames ready by the time the timer ticks, it will produce a frame anyway, using old frames as fallback in cases when a current frame is not available.
If the frames arrive later, they will be dropped.
jerzywilczek marked this conversation as resolved.
Show resolved Hide resolved

"""

use Membrane.Filter
alias Membrane.Buffer
alias Membrane.RawVideo
alias Membrane.VideoCompositor.Wgpu

def_options caps: [
type: RawVideo,
spec: RawVideo.t(),
description: "Struct with video width, height, framerate and pixel format."
],
live: [
jerzywilczek marked this conversation as resolved.
Show resolved Hide resolved
spec: boolean(),
description: """
Set the compositor to live mode.
""",
default: false
]

def_input_pad :input,
Expand Down Expand Up @@ -41,6 +58,7 @@ defmodule Membrane.VideoCompositor.CompositorElement do
state = %{
video_positions_waiting_for_caps: %{},
caps: options.caps,
live: options.live,
internal_state: internal_state,
pads_to_ids: %{},
new_pad_id: 0
Expand All @@ -51,7 +69,26 @@ defmodule Membrane.VideoCompositor.CompositorElement do

@impl true
def handle_prepared_to_playing(_ctx, state) do
{{:ok, caps: {:output, state.caps}}, state}
spf = spf_from_framerate(state.caps.framerate)

actions =
if state.live do
[start_timer: {:render_frame, spf}, caps: {:output, state.caps}]
else
[caps: {:output, state.caps}]
end

{{:ok, actions}, state}
end

@impl true
def handle_tick(:render_frame, _ctx, state) do
{{:ok, {frame, pts}}, internal_state} = Wgpu.force_render(state.internal_state)

state = %{state | internal_state: internal_state}
actions = [buffer: {:output, %Buffer{payload: frame, pts: pts}}]

{{:ok, actions}, state}
end

@impl true
Expand Down Expand Up @@ -117,10 +154,12 @@ defmodule Membrane.VideoCompositor.CompositorElement do
{
{
:ok,
buffer: {
:output,
%Membrane.Buffer{payload: frame, pts: pts}
}
[
buffer: {
:output,
%Membrane.Buffer{payload: frame, pts: pts}
}
] ++ restart_timer_action_if_necessary(state)
},
%{state | internal_state: internal_state}
}
Expand All @@ -130,6 +169,20 @@ defmodule Membrane.VideoCompositor.CompositorElement do
end
end

defp spf_from_framerate({frames, seconds}) do
Ratio.new(frames, seconds)
end

defp restart_timer_action_if_necessary(state) do
spf = spf_from_framerate(state.caps.framerate)

if state.live do
[stop_timer: :render_frame, start_timer: {:render_frame, spf}]
else
[]
end
end

@impl true
def handle_end_of_stream(
pad,
Expand All @@ -142,11 +195,15 @@ defmodule Membrane.VideoCompositor.CompositorElement do
{:ok, internal_state} = Wgpu.send_end_of_stream(internal_state, id)
state = %{state | internal_state: internal_state}

if all_input_pads_received_end_of_stream?(context.pads) do
{{:ok, end_of_stream: :output}, state}
else
{:ok, state}
end
actions =
if all_input_pads_received_end_of_stream?(context.pads) do
stop = if state.live, do: [stop_timer: :render_frame], else: []
[end_of_stream: :output] ++ stop
else
[]
end

{{:ok, actions}, state}
end

defp all_input_pads_received_end_of_stream?(pads) do
Expand Down
7 changes: 6 additions & 1 deletion lib/membrane/video_compositor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ defmodule Membrane.VideoCompositor do
def_options caps: [
spec: RawVideo.t(),
description: "Caps for the output video of the compositor"
],
live: [
spec: boolean(),
description: "Set compositor into live mode",
default: false
]

def_input_pad :input,
Expand All @@ -34,7 +39,7 @@ defmodule Membrane.VideoCompositor do
@impl true
def handle_init(options) do
children = %{
compositor: %CompositorElement{caps: options.caps}
compositor: %CompositorElement{caps: options.caps, live: options.live}
}

links = [
Expand Down
5 changes: 5 additions & 0 deletions native/membrane_videocompositor/src/compositor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ impl State {

/// This returns the pts of the new frame
pub async fn draw_into(&mut self, output_buffer: &mut [u8]) -> u64 {
let interval = self.frame_interval();
self.input_videos
.values_mut()
.for_each(|v| v.remove_stale_frames(interval));

let mut encoder = self
.device
.create_command_encoder(&wgpu::CommandEncoderDescriptor {
Expand Down
18 changes: 18 additions & 0 deletions native/membrane_videocompositor/src/compositor/videos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ impl InputVideo {
DrawResult::Rendered(pts)
}

pub fn remove_stale_frames(&mut self, interval: Option<(u64, u64)>) {
while self.is_front_frame_too_old(interval) {
self.pop_frame();
}
}

pub fn pop_frame(&mut self) {
if let Some(Message::Frame { pts, frame }) = self.frames.pop_front() {
self.previous_frame = Some(Message::Frame { pts, frame });
Expand All @@ -230,6 +236,18 @@ impl InputVideo {
self.frames.push_back(Message::EndOfStream);
}

pub fn is_front_frame_too_old(&self, interval: Option<(u64, u64)>) -> bool {
if let Some(Message::EndOfStream) = self.frames.front() {
return false;
}

if interval.is_none() || self.front_pts().is_none() {
return false;
}

self.front_pts().is_some() && interval.unwrap().0 > self.front_pts().unwrap()
}

pub fn is_frame_ready(&self, interval: Option<(u64, u64)>) -> bool {
if let Some(Message::EndOfStream) = self.frames.front() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ defmodule Membrane.VideoCompositor.Pipeline.ComposeMultipleInputs do
decoder = options.decoder
decoder_name = String.to_atom("decoder_#{i}")

input_filter = options.input_filter
input_filter_name = String.to_atom("input_filter_#{i}")

link(source_name, source)
|> then(if not is_nil(decoder), do: &to(&1, decoder_name, decoder), else: & &1)
|> then(if not is_nil(input_filter), do: &to(&1, input_filter_name, input_filter), else: & &1)
|> via_in(:input, options: [position: position])
|> to(:compositor)
end)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ defmodule Membrane.VideoCompositor.Pipeline.Utility.Options do
"""
@type encoder_t :: Membrane.Filter.t() | nil

@typedoc """
An additional plugin that sits between the decoder (or source if there is no decoder) and the compositor.
"""
@type input_filter_t :: Membrane.Filter.t() | nil

@typedoc """
Atom describing FrameComposer implementation
"""
Expand All @@ -45,8 +50,9 @@ defmodule Membrane.VideoCompositor.Pipeline.Utility.Options do
compositor: compositor_t(),
implementation: implementation_t(),
decoder: decoder_t(),
encoder: encoder_t()
encoder: encoder_t(),
input_filter: input_filter_t()
}
@enforce_keys [:inputs, :output, :caps]
defstruct [:inputs, :output, :caps, :compositor, :implementation, :decoder, :encoder]
defstruct [:inputs, :output, :caps, :compositor, :implementation, :decoder, :encoder, :input_filter]
end
75 changes: 75 additions & 0 deletions test/pipeline_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Membrane.VideoCompositor.Test.Pipeline do
alias Membrane.Testing.Pipeline, as: TestingPipeline
alias Membrane.VideoCompositor.Implementations
alias Membrane.VideoCompositor.Test.Support.Pipeline.H264, as: PipelineH264
alias Membrane.VideoCompositor.Test.Support.Pipeline.PacketLoss, as: PipelinePacketLoss
alias Membrane.VideoCompositor.Test.Support.Utility, as: TestingUtility

@implementation Implementations.get_all_implementations()
Expand Down Expand Up @@ -92,4 +93,78 @@ defmodule Membrane.VideoCompositor.Test.Pipeline do
assert_end_of_stream(pid, :sink, :input, 1_000_000)
TestingPipeline.terminate(pid, blocking?: true)
end

Enum.map(@implementation, fn implementation ->
describe "Checks packet loss #{implementation} pipeline on" do
@describetag :tmp_dir

@tag implementation
test "2s 720p 30fps video", %{tmp_dir: tmp_dir} do
test_h264_pipeline_with_packet_loss(
@hd_video,
2,
unquote(implementation),
"short_videos",
tmp_dir
)
end

@tag implementation
test "1s 1080p 30fps video", %{tmp_dir: tmp_dir} do
test_h264_pipeline_with_packet_loss(
@full_hd_video,
1,
unquote(implementation),
"short_videos",
tmp_dir
)
end
end
end)

defp test_h264_pipeline_with_packet_loss(
video_caps,
duration,
implementation,
sub_dir_name,
tmp_dir
) do
alias Membrane.VideoCompositor.Pipeline.Utility.InputStream
alias Membrane.VideoCompositor.Pipeline.Utility.Options

{input_path, output_path, _ref_file_name} =
TestingUtility.prepare_testing_video(video_caps, duration, "h264", tmp_dir, sub_dir_name)

positions = [
{0, 0},
{div(video_caps.width, 2), 0},
{0, div(video_caps.height, 2)},
{div(video_caps.width, 2), div(video_caps.height, 2)}
]

inputs =
for pos <- positions,
do: %InputStream{
position: pos,
caps: video_caps,
input: input_path
}

out_caps = %RawVideo{video_caps | width: video_caps.width * 2, height: video_caps.height * 2}

options = %Options{
inputs: inputs,
output: output_path,
caps: out_caps,
implementation: implementation
}

assert {:ok, pid} =
TestingPipeline.start_link(module: PipelinePacketLoss, custom_args: options)

assert_pipeline_playback_changed(pid, _, :playing)

assert_end_of_stream(pid, :sink, :input, 1_000_000)
TestingPipeline.terminate(pid, blocking?: true)
end
end
Loading