Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement live queueing strategy and callbacks system #111

Merged
merged 93 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from 88 commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
33f9123
Add offline queue element. Change `new-compositor` scene to old API. …
WojciechBarczynski May 10, 2023
763040e
Add updating stream format, scene and sending buffer test. Fix queue …
WojciechBarczynski May 10, 2023
b54e619
Fix recursive frames poping
WojciechBarczynski May 12, 2023
5d5e2fd
Fix droping pads
WojciechBarczynski May 12, 2023
382dfe2
Add queue tests, fix and refactor events poping, actions, pads removi…
WojciechBarczynski May 13, 2023
4a151d0
Add update scene events handling
WojciechBarczynski May 13, 2023
8cc121d
Add update scene events handling test
WojciechBarczynski May 13, 2023
9d3bc95
Add docs
WojciechBarczynski May 13, 2023
d941890
Add `CompositorCoreFormat` docs
WojciechBarczynski May 13, 2023
4ad282c
Renamed queue output from `:compositor_core` to `:output`
WojciechBarczynski May 15, 2023
b9e0c52
Fix EOS action
WojciechBarczynski May 15, 2023
f6793d1
Fix readability
WojciechBarczynski May 15, 2023
e73a134
Add module for queue contracts
WojciechBarczynski May 15, 2023
85e2b33
Fix `calculate_next_buffer_pts` readability
WojciechBarczynski May 15, 2023
9682a7f
Fix EOS handling add EOS test
WojciechBarczynski May 16, 2023
f1e665d
Fix lint
WojciechBarczynski May 16, 2023
8b6660b
Adjust core VC to the new input format from the queue.
WojciechBarczynski May 18, 2023
1fea81b
Change setting video. Update `VC Core`, `VC bin`, add `Offline Queue …
WojciechBarczynski May 19, 2023
45a323a
Naming fixes
WojciechBarczynski May 19, 2023
10f4316
Merge branch master into adjust_vc_core_to_new_queue
WojciechBarczynski May 19, 2023
9fd396a
Fix scene notification event and rust decoding
WojciechBarczynski May 22, 2023
2eb1c59
Fix tests
WojciechBarczynski May 22, 2023
7569b0d
Fix dirty scheduling
WojciechBarczynski May 22, 2023
bbf1707
Add docs
WojciechBarczynski May 23, 2023
0fe5e0a
Use `Map.new` instead of Map -> Enum -> Map conversions
WojciechBarczynski May 23, 2023
5a5edfb
Remove `wgpu_test` - covered in E2E tests
WojciechBarczynski May 23, 2023
68f7d0a
Fix `pad_ref` offline queue pad refs handling
WojciechBarczynski May 23, 2023
dd99e91
Fix `handle_pad_removed`
WojciechBarczynski May 24, 2023
f0ae43b
Fix plural variables naming
WojciechBarczynski May 24, 2023
5f68419
Move queue spawing logic to seperate module
WojciechBarczynski May 24, 2023
085aaaa
Fix queue bin spawning
WojciechBarczynski May 24, 2023
5425b36
Fix custom queue strategies states handling
WojciechBarczynski May 24, 2023
12d2bc7
Moved putting events to state
WojciechBarczynski May 24, 2023
5c11aae
Remove unused alias
WojciechBarczynski May 24, 2023
7f754c6
Wrap scene update in `SceneChangeEvent`
WojciechBarczynski May 25, 2023
ce66eea
Remove EOS handling from Rust
WojciechBarczynski May 30, 2023
ce68ae4
Add core `handle_process` scene presence check
WojciechBarczynski May 30, 2023
391c3ac
Remove `current` prefixes from `state` fields
WojciechBarczynski Jun 3, 2023
02467c5
Remove `different_video_indexes` check. Scene doesn't have to contain…
WojciechBarczynski Jun 5, 2023
7aed909
Add scene validation
WojciechBarczynski Jun 5, 2023
23f1cdb
Fix scene validation. The scene doesn't have to specify all input pads
WojciechBarczynski Jun 5, 2023
36dd5f4
Add scene docs
WojciechBarczynski Jun 5, 2023
5f0181e
Fix scene validation
WojciechBarczynski Jun 5, 2023
f845f25
Fix child removing
WojciechBarczynski Jun 5, 2023
ed26aa6
Change VC modules structure
WojciechBarczynski Jun 5, 2023
1151617
Add empty video_configs check
WojciechBarczynski Jun 5, 2023
78fc701
Change scene validation in `handle_process` to more descriptive
WojciechBarczynski Jun 6, 2023
5473020
Fix setting videos in VC core
WojciechBarczynski Jun 6, 2023
4910e20
Fix scene validation
WojciechBarczynski Jun 6, 2023
d7c5080
Implement live queueing strategy
WojciechBarczynski Jun 6, 2023
aa5762e
Merge branch 'master' into MS_444_implement_live_queueing_strategy
WojciechBarczynski Jun 6, 2023
76232f9
Fix lint
WojciechBarczynski Jun 6, 2023
da4e01b
Move creating actions to State module
WojciechBarczynski Jun 6, 2023
ab6d929
Design VC callbacks
WojciechBarczynski Jun 7, 2023
91c56e2
Fix `handle_init` ctx typespec
WojciechBarczynski Jun 7, 2023
d04e440
Add init metadata. Refine docs
WojciechBarczynski Jun 7, 2023
e9e3fac
Fix lint
WojciechBarczynski Jun 7, 2023
d282be5
Add matadata to VC `init_options`
WojciechBarczynski Jun 7, 2023
098afd9
Move updating next buffer pts into state. Add live queue :start_timer…
WojciechBarczynski Jun 7, 2023
22f726e
Add start timer check
WojciechBarczynski Jun 7, 2023
496649c
Fix naming ambiguity. Add `handle_info` callback
WojciechBarczynski Jun 9, 2023
f53fabd
Merge branch 'design_callback_system' into MS_444_implement_live_queu…
WojciechBarczynski Jun 10, 2023
58a1a48
Implement callbacks system. Moved pipeline to test. TODO: refactor qu…
WojciechBarczynski Jun 11, 2023
8d6305c
Move queues common logic into state
WojciechBarczynski Jun 12, 2023
b749c7d
Moved `transformations test` handler into a separate module
WojciechBarczynski Jun 12, 2023
bcc1e44
Rewrite pipeline integration test
WojciechBarczynski Jun 12, 2023
fe85aff
Create common test handler
WojciechBarczynski Jun 14, 2023
9d6a615
Remove `pad_added` event. Fix eos pads dropping
WojciechBarczynski Jun 14, 2023
2345115
Add custom messages handling to offline queue
WojciechBarczynski Jun 14, 2023
2f4eb24
Add custom msg handling to live queue
WojciechBarczynski Jun 14, 2023
c1859e1
Merge branch 'master' into MS_444_implement_live_queueing_strategy
WojciechBarczynski Jun 14, 2023
e45b311
Separate handler state
WojciechBarczynski Jun 14, 2023
784477f
Refactor init options passing
WojciechBarczynski Jun 14, 2023
b951dbf
Fix user messages handling
WojciechBarczynski Jun 14, 2023
423a0ac
Change demand mode of live queue to auto
WojciechBarczynski Jun 14, 2023
028b7e5
Fix live queue output flow control
WojciechBarczynski Jun 15, 2023
25c05b4
Fix handle_tick nearest index calculation
WojciechBarczynski Jun 15, 2023
76bbc0a
Fix `:no_frame` filtering in live queue
WojciechBarczynski Jun 16, 2023
6ae1c01
Fix timer_started check in live queue
WojciechBarczynski Jun 16, 2023
195f4f1
Fix timer_started? check
WojciechBarczynski Jun 16, 2023
921e4c4
Add eos handling to live queue
WojciechBarczynski Jun 16, 2023
5614c95
Fixed eos pads dropping
WojciechBarczynski Jun 22, 2023
e0e9fc4
Fix live queue eos
WojciechBarczynski Jun 23, 2023
8b22cd4
Fix pads_states dropping, add sending black frame on empty live queue
WojciechBarczynski Jun 23, 2023
aa9127b
Refine docs
WojciechBarczynski Jun 23, 2023
767c4ed
Refine queueing strategies docs
WojciechBarczynski Jun 26, 2023
3f098b8
Add live queue tests
WojciechBarczynski Jun 26, 2023
b93e58a
Add `sends stream format and scene only once` test
WojciechBarczynski Jun 26, 2023
ec5faf7
Add `vc_init_options` description
WojciechBarczynski Jun 26, 2023
123655c
Fix test and typos
WojciechBarczynski Jun 26, 2023
5a3e0ec
Restructure modules
WojciechBarczynski Jun 27, 2023
e5022d1
Fix structure. Fix tests. Fix typos
WojciechBarczynski Jun 28, 2023
79176d7
Merge branch 'master' into MS_444_implement_live_queueing_strategy
WojciechBarczynski Jun 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions lib/membrane/video_compositor/compositor_core_format.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
defmodule Membrane.VideoCompositor.CompositorCoreFormat do
@moduledoc """
Describes CoreVC input format.
"""
@moduledoc false
# Describes CoreVC input format.

alias Membrane.{Pad, RawVideo}

Expand Down
32 changes: 20 additions & 12 deletions lib/membrane/video_compositor/core.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
defmodule Membrane.VideoCompositor.Core do
@moduledoc """
The element responsible for composing frames.
"""
@moduledoc false
# The element responsible for composing frames.

use Membrane.Filter

Expand All @@ -10,9 +9,8 @@ defmodule Membrane.VideoCompositor.Core do
alias Membrane.VideoCompositor.{CompositorCoreFormat, Scene, SceneChangeEvent, WgpuAdapter}

defmodule State do
@moduledoc """
The internal state of the compositor
"""
@moduledoc false
# The internal state of the compositor

@enforce_keys [:wgpu_state, :output_stream_format]
defstruct @enforce_keys ++
Expand Down Expand Up @@ -97,7 +95,8 @@ defmodule Membrane.VideoCompositor.Core do
wgpu_state: wgpu_state,
scene: scene,
input_stream_format: stream_format,
update_videos?: update_videos?
update_videos?: update_videos?,
output_stream_format: output_stream_format
}
) do
if update_videos? do
Expand All @@ -110,11 +109,14 @@ defmodule Membrane.VideoCompositor.Core do
end

{:ok, rendered_frame} =
payload
|> Map.to_list()
|> Enum.filter(fn {pad, _frame} -> Map.has_key?(scene.video_configs, pad) end)
|> Enum.map(fn {pad, frame} -> {Map.get(pads_to_ids, pad), frame, pts} end)
|> then(fn pads_frames -> send_pads_frames(wgpu_state, pads_frames) end)
if payload == %{} do
{:ok, get_blank_frame(output_stream_format)}
else
payload
|> Map.to_list()
|> Enum.map(fn {pad, frame} -> {Map.fetch!(pads_to_ids, pad), frame, pts} end)
|> then(fn pads_frames -> send_pads_frames(wgpu_state, pads_frames) end)
end

output_buffer = %Buffer{pts: pts, dts: pts, payload: rendered_frame}

Expand Down Expand Up @@ -154,4 +156,10 @@ defmodule Membrane.VideoCompositor.Core do
raise "Core should render frame only on last buffer"
end
end

@spec get_blank_frame(RawVideo.t()) :: binary()
defp get_blank_frame(%RawVideo{width: width, height: height}) do
pixels = div(width * height * 3, 2)
WojciechBarczynski marked this conversation as resolved.
Show resolved Hide resolved
<<0::size(pixels)>>
end
end
5 changes: 5 additions & 0 deletions lib/membrane/video_compositor/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ defmodule Membrane.VideoCompositor.Handler do
alias Membrane.VideoCompositor.Handler.InputProperties
alias Membrane.VideoCompositor.Scene

@typedoc """
Module implementing `#{inspect(__MODULE__)}` behaviour.
"""
@type t :: module()

@typedoc """
Type of user-managed inner state of the handler.
"""
Expand Down
26 changes: 14 additions & 12 deletions lib/membrane/video_compositor/queue.ex
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
defmodule Membrane.VideoCompositor.Queue do
@moduledoc """
Defines input pads and compositor core contracts, that each
implementation of a queue should meet.
"""
@moduledoc false

# Defines input pads and compositor core contracts, that each
# implementation of a queue should meet.

alias Membrane.{Buffer, Pad, RawVideo}
alias Membrane.{Buffer, Pad}
alias Membrane.VideoCompositor
alias Membrane.VideoCompositor.{CompositorCoreFormat, SceneChangeEvent}
alias Membrane.VideoCompositor.{CompositorCoreFormat, QueueingStrategy, SceneChangeEvent}
alias Membrane.VideoCompositor.Queue.Live, as: LiveQueue
alias Membrane.VideoCompositor.Queue.Offline, as: OfflineQueue

@typedoc """
Expand Down Expand Up @@ -42,12 +43,13 @@ defmodule Membrane.VideoCompositor.Queue do
@type queue_bin :: OfflineQueue.t()

@spec get_queue(VideoCompositor.init_options()) :: OfflineQueue.t()
def get_queue(%VideoCompositor{
queuing_strategy: queuing_strategy,
output_stream_format: %RawVideo{framerate: framerate}
}) do
case queuing_strategy do
:offline -> %OfflineQueue{output_framerate: framerate}
def get_queue(options = %VideoCompositor{}) do
varsill marked this conversation as resolved.
Show resolved Hide resolved
case options.queuing_strategy do
QueueingStrategy.Offline ->
%OfflineQueue{vc_init_options: options}

%QueueingStrategy.Live{latency: latency} ->
%LiveQueue{vc_init_options: options, latency: latency}
end
end
end
231 changes: 231 additions & 0 deletions lib/membrane/video_compositor/queue/live.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
defmodule Membrane.VideoCompositor.Queue.Live do
@moduledoc false
# Module responsible for frames / events enqueueing accordingly to live composing strategy

use Membrane.Filter

alias Membrane.RawVideo
alias Membrane.VideoCompositor
alias Membrane.VideoCompositor.CompositorCoreFormat
alias Membrane.VideoCompositor.Queue.Live.State, as: LiveState
alias Membrane.VideoCompositor.Queue.State
alias Membrane.VideoCompositor.Queue.State.{HandlerState, PadState}
alias Membrane.VideoCompositor.QueueingStrategy.Live

@type latency :: Membrane.Time.non_neg_t() | :wait_for_start_event

@type start_timer_message :: :start_timer | {:start_timer, delay :: Membrane.Time.non_neg_t()}

def_options vc_init_options: [
spec: VideoCompositor.init_options()
],
latency: [
spec: latency()
]

def_input_pad :input,
accepted_format: %RawVideo{pixel_format: :I420},
availability: :on_request,
demand_mode: :auto,
options: [
timestamp_offset: [
spec: Membrane.Time.non_neg_t(),
description: "Input stream PTS offset in nanoseconds. Must be non-negative.",
default: 0
],
metadata: [
spec: VideoCompositor.init_metadata()
]
]

def_output_pad :output,
accepted_format: %CompositorCoreFormat{},
availability: :always,
demand_mode: :auto

@impl true
def handle_init(_ctx, %{
vc_init_options:
vc_init_options = %VideoCompositor{
output_stream_format: %RawVideo{framerate: framerate},
queuing_strategy: %Live{latency: latency}
}
}) do
{[],
%State{
output_framerate: framerate,
custom_strategy_state: %LiveState{
latency: latency
},
handler: HandlerState.new(vc_init_options)
}}
end

@impl true
def handle_pad_added(pad, context, state) do
state = Bunch.Struct.put_in(state, [:pads_states, pad], PadState.new(context.options))

{[], state}
end

@impl true
def handle_start_of_stream(_pad, _ctx, state = %State{}) do
if state.custom_strategy_state.timer_started? do
{[], state}
else
state = Bunch.Struct.put_in(state, [:custom_strategy_state, :timer_started?], true)

case state.custom_strategy_state.latency do
:wait_for_start_event ->
{[], state}

latency_time ->
{[start_timer: {:initializer, latency_time}], state}
end
end
end

@impl true
def handle_stream_format(pad, stream_format, _ctx, state) do
state = State.put_event(state, {{:stream_format, stream_format}, pad})
{[], state}
end

@impl true
def handle_end_of_stream(pad, _ctx, state) do
state = State.put_event(state, {:end_of_stream, pad})
{[], state}
end

@impl true
def handle_process(pad, buffer, _ctx, state) do
state = State.put_event(state, {{:frame, buffer.pts, buffer.payload}, pad})
{[], state}
end

@impl true
def handle_tick(:initializer, _ctx, state) do
{[stop_timer: :initializer, start_timer: {:buffer_scheduler, get_tick_ratio(state)}], state}
end

@impl true
def handle_tick(
:buffer_scheduler,
_ctx,
initial_state = %State{next_buffer_pts: buffer_pts}
) do
state = drop_eos_pads(initial_state)

indexes =
state.pads_states
|> Enum.map(fn {pad, %PadState{events_queue: events_queue}} ->
{pad, nearest_frame_index(events_queue, buffer_pts)}
end)
|> Enum.reject(fn {_pad, index} -> index == :no_frame end)
|> Enum.into(%{})

{pads_frames, new_state} = State.pop_events(state, indexes, true)

actions = State.get_actions(new_state, initial_state, pads_frames, buffer_pts)

if all_pads_eos?(new_state) do
{actions ++ [stop_timer: :buffer_scheduler, end_of_stream: :output], new_state}
else
{actions, new_state}
end
end

@impl true
def handle_parent_notification(:start_timer, _ctx, state) do
check_timer_started(state)
{[start_timer: {:buffer_scheduler, get_tick_ratio(state)}], state}
end

@impl true
def handle_parent_notification({:start_timer, delay}, _ctx, state) do
check_timer_started(state)
{[start_timer: {:initializer, delay}], state}
end

@impl true
def handle_parent_notification(msg, _ctx, state) do
state = State.put_event(state, {:message, msg})
{[], state}
end

@spec nearest_frame_index([PadState.pad_event()], Membrane.Time.non_neg_t()) ::
non_neg_integer() | :no_frame
defp nearest_frame_index(events_queue, tick_pts) do
events_queue
|> Enum.with_index()
|> Enum.reduce_while(
{:no_frame, :no_frame},
fn {event, index}, {best_diff, best_diff_index} ->
case event do
{:frame, frame_pts, _frame_data}
when best_diff == :no_frame or best_diff > abs(frame_pts - tick_pts) ->
{:cont, {abs(frame_pts - tick_pts), index}}

{:frame, _frame_pts, _frame_data} ->
{:halt, {best_diff, best_diff_index}}

_else ->
{:cont, {best_diff, best_diff_index}}
end
end
)
|> then(fn {_best_diff, best_diff_index} -> best_diff_index end)
end

defp get_tick_ratio(%State{output_framerate: {output_fps_num, output_fps_den}}) do
%Ratio{numerator: output_fps_den * Membrane.Time.second(), denominator: output_fps_num}
end

defp check_timer_started(state) do
if state.custom_strategy_state.timer_started? do
raise "Failed to start timer. Timer already started."
end
end

@spec all_pads_eos?(State.t()) :: boolean()
defp all_pads_eos?(%State{pads_states: pads_states, next_buffer_pts: buffer_pts}) do
pads_states
|> Map.values()
|> Enum.all?(fn %PadState{events_queue: events_queue} ->
eos_before_pts?(events_queue, buffer_pts)
end)
end

@spec drop_eos_pads(State.t()) :: State.t()
defp drop_eos_pads(
state = %State{
pads_states: pads_states,
output_format: %CompositorCoreFormat{pad_formats: pad_formats},
next_buffer_pts: buffer_pts
}
) do
eos_pads =
pads_states
|> Enum.filter(fn {_pad, %PadState{events_queue: events_queue}} ->
eos_before_pts?(events_queue, buffer_pts)
end)
|> Enum.map(fn {pad, _pad_state} -> pad end)

%State{
state
| pads_states: Map.drop(pads_states, eos_pads),
output_format: %CompositorCoreFormat{pad_formats: Map.drop(pad_formats, eos_pads)}
}
end

@spec eos_before_pts?(list(PadState.pad_event()), Membrane.Time.non_neg_t()) :: boolean()
defp eos_before_pts?(events_queue, buffer_pts) do
Enum.reduce_while(events_queue, false, fn event, _eos_before_pts? ->
case event do
{:frame, pts, _data} when pts > buffer_pts -> {:halt, false}
:end_of_stream -> {:halt, true}
_else -> {:cont, false}
end
end)
end
end
13 changes: 13 additions & 0 deletions lib/membrane/video_compositor/queue/live/state.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defmodule Membrane.VideoCompositor.Queue.Live.State do
@moduledoc false

alias Membrane.VideoCompositor.Queue.Live

@enforce_keys [:latency]
defstruct @enforce_keys ++ [timer_started?: false]

@type t :: %__MODULE__{
latency: Live.latency(),
timer_started?: boolean()
}
end
Loading