From 112a0817b17aae03f666d55e449cf06b163c2c78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Thu, 25 May 2023 12:45:19 +0200 Subject: [PATCH 1/5] Add RTPRealtimer --- test/membrane_rtc_engine/tee_test.exs | 2 + test/support/rtp_realtimer.ex | 74 +++++++++++++++++++++++++++ 2 files changed, 76 insertions(+) create mode 100644 test/support/rtp_realtimer.ex diff --git a/test/membrane_rtc_engine/tee_test.exs b/test/membrane_rtc_engine/tee_test.exs index a7169ec82..0f467a3c0 100644 --- a/test/membrane_rtc_engine/tee_test.exs +++ b/test/membrane_rtc_engine/tee_test.exs @@ -7,6 +7,7 @@ defmodule Membrane.RTC.Engine.TeeTest do require Membrane.Pad alias Membrane.{Buffer, Pad} + alias Membrane.RTC.Engine.RTPRealtimer alias Membrane.RTC.Engine.Tee alias Membrane.RTC.Engine.Event.{ @@ -407,6 +408,7 @@ defmodule Membrane.RTC.Engine.TeeTest do } child({:source, variant}, source) + |> child({:realtimer, track.id, variant}, RTPRealtimer) |> via_in(Pad.ref(:input, {track.id, variant})) |> get_child(:tee) end diff --git a/test/support/rtp_realtimer.ex b/test/support/rtp_realtimer.ex new file mode 100644 index 000000000..a75dfafd2 --- /dev/null +++ b/test/support/rtp_realtimer.ex @@ -0,0 +1,74 @@ +defmodule Membrane.RTC.Engine.RTPRealtimer do + @moduledoc false + use Membrane.Filter + + def_options interval: [ + spec: integer(), + default: 10, + description: "Interval between following messages" + ] + + def_input_pad :input, accepted_format: _any, demand_unit: :buffers + def_output_pad :output, accepted_format: _any, mode: :push + + @impl true + def handle_init(_ctx, opts) do + {[], %{tick_actions: [], interval: opts.interval}} + end + + @impl true + def handle_playing(_ctx, state) do + {[start_timer: {:timer, :no_interval}, demand: {:input, 1}], state} + end + + @dialyzer {:no_behaviours, {:handle_process, 4}} + @impl true + def handle_process(:input, buffer, _ctx, state) do + use Ratio + interval = state.interval + + state = %{state | tick_actions: [buffer: {:output, buffer}] ++ state.tick_actions} + + {[timer_interval: {:timer, interval}], state} + end + + @impl true + def handle_event(pad, event, _ctx, %{tick_actions: tick_actions} = state) + when pad == :output or tick_actions == [] do + {[forward: event], state} + end + + @impl true + def handle_event(:input, event, _ctx, state) do + {[], %{state | tick_actions: [event: {:output, event}] ++ state.tick_actions}} + end + + @impl true + def handle_stream_format(:input, stream_format, _ctx, %{tick_actions: []} = state) do + {[forward: stream_format], state} + end + + @impl true + def handle_stream_format(:input, stream_format, _ctx, state) do + {[], %{state | tick_actions: [stream_format: {:output, stream_format}] ++ state.tick_actions}} + end + + @impl true + def handle_end_of_stream(:input, _ctx, %{tick_actions: []} = state) do + {[end_of_stream: :output], state} + end + + @impl true + def handle_end_of_stream(:input, _ctx, state) do + {[], %{state | tick_actions: [end_of_stream: :output] ++ state.tick_actions}} + end + + @impl true + def handle_tick(:timer, _ctx, state) do + actions = + [timer_interval: {:timer, :no_interval}] ++ + Enum.reverse(state.tick_actions) ++ [demand: {:input, 1}] + + {actions, %{state | tick_actions: []}} + end +end From b77935630f5ff59f882be1858e4acb5a28a104f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Thu, 25 May 2023 17:25:01 +0200 Subject: [PATCH 2/5] Use realtimer from fake_rtsp_server --- test/membrane_rtc_engine/tee_test.exs | 7 ++- test/support/fake_rtsp_server.ex | 53 ++----------------- test/support/precise_realtimer.ex | 63 +++++++++++++++++++++++ test/support/rtp_realtimer.ex | 74 --------------------------- 4 files changed, 72 insertions(+), 125 deletions(-) create mode 100644 test/support/precise_realtimer.ex delete mode 100644 test/support/rtp_realtimer.ex diff --git a/test/membrane_rtc_engine/tee_test.exs b/test/membrane_rtc_engine/tee_test.exs index 0f467a3c0..e20112d9b 100644 --- a/test/membrane_rtc_engine/tee_test.exs +++ b/test/membrane_rtc_engine/tee_test.exs @@ -7,7 +7,7 @@ defmodule Membrane.RTC.Engine.TeeTest do require Membrane.Pad alias Membrane.{Buffer, Pad} - alias Membrane.RTC.Engine.RTPRealtimer + alias Membrane.RTC.Engine.PreciseRealtimer alias Membrane.RTC.Engine.Tee alias Membrane.RTC.Engine.Event.{ @@ -408,7 +408,10 @@ defmodule Membrane.RTC.Engine.TeeTest do } child({:source, variant}, source) - |> child({:realtimer, track.id, variant}, RTPRealtimer) + |> child({:realtimer, track.id, variant}, %PreciseRealtimer{ + timer_resolution: Membrane.Time.microseconds(1), + drop_late_packets?: false + }) |> via_in(Pad.ref(:input, {track.id, variant})) |> get_child(:tee) end diff --git a/test/support/fake_rtsp_server.ex b/test/support/fake_rtsp_server.ex index cbb01b59b..6586c28bb 100644 --- a/test/support/fake_rtsp_server.ex +++ b/test/support/fake_rtsp_server.ex @@ -25,58 +25,11 @@ defmodule FakeRTSPserver do end end - defmodule PreciseRealtimer do - @moduledoc false - - use Membrane.Filter - - def_input_pad :input, demand_mode: :auto, accepted_format: _any - def_output_pad :output, demand_mode: :auto, accepted_format: _any - def_options timer_resolution: [spec: Membrane.Time.t()] - - @impl true - def handle_init(_ctx, opts) do - {[], %{tick_idx: 0, resolution: opts.timer_resolution, queue: Qex.new()}} - end - - @impl true - def handle_playing(_ctx, state) do - {[start_timer: {:timer, state.resolution}], state} - end - - @impl true - def handle_process(:input, buffer, _ctx, state) do - {[], %{state | queue: Qex.push(state.queue, {:buffer, {:output, buffer}})}} - end - - @impl true - def handle_end_of_stream(:input, _ctx, state) do - {[], %{state | queue: Qex.push(state.queue, {:end_of_stream, :output})}} - end - - @impl true - def handle_tick(:timer, _ctx, %{tick_idx: tick_idx} = state) do - current_ts = tick_idx * state.resolution - state = %{state | tick_idx: tick_idx + 1} - - if Enum.empty?(state.queue) do - {[], state} - else - {actions, buffered} = - Enum.split_while(state.queue, fn - {:buffer, {:output, buffer}} -> Membrane.Buffer.get_dts_or_pts(buffer) <= current_ts - _other -> true - end) - - {actions, %{state | queue: Qex.new(buffered)}} - end - end - end - defmodule Pipeline do @moduledoc false use Membrane.Pipeline + alias Membrane.RTC.Engine.PreciseRealtimer @impl true def handle_init(_ctx, opts) do @@ -100,7 +53,9 @@ defmodule FakeRTSPserver do clock_rate: opts.clock_rate ] ) - |> child(:realtimer, %PreciseRealtimer{timer_resolution: Membrane.Time.milliseconds(10)}) + |> child(:realtimer, %PreciseRealtimer{ + timer_resolution: Membrane.Time.milliseconds(10) + }) |> child(:udp_sink, %Membrane.UDP.Sink{ destination_address: address, destination_port_no: opts.client_port, diff --git a/test/support/precise_realtimer.ex b/test/support/precise_realtimer.ex new file mode 100644 index 000000000..ac2c7037a --- /dev/null +++ b/test/support/precise_realtimer.ex @@ -0,0 +1,63 @@ +defmodule Membrane.RTC.Engine.PreciseRealtimer do + @moduledoc false + + use Membrane.Filter + + def_input_pad :input, demand_mode: :auto, accepted_format: _any + def_output_pad :output, demand_mode: :auto, accepted_format: _any + + def_options timer_resolution: [ + spec: Membrane.Time.t() + ], + drop_late_packets?: [ + spec: boolean(), + default: true + ] + + @impl true + def handle_init(_ctx, opts) do + {[], + %{ + tick_idx: 0, + resolution: opts.timer_resolution, + queue: Qex.new(), + drop_late_packets?: opts.drop_late_packets? + }} + end + + @impl true + def handle_playing(_ctx, state) do + {[start_timer: {:timer, state.resolution}], state} + end + + @impl true + def handle_process(:input, buffer, _ctx, state) do + {[], %{state | queue: Qex.push(state.queue, {:buffer, {:output, buffer}})}} + end + + @impl true + def handle_end_of_stream(:input, _ctx, state) do + {[], %{state | queue: Qex.push(state.queue, {:end_of_stream, :output})}} + end + + @impl true + def handle_tick(:timer, _ctx, %{tick_idx: tick_idx} = state) do + current_ts = tick_idx * state.resolution + state = %{state | tick_idx: tick_idx + 1} + + if Enum.empty?(state.queue) do + {[], state} + else + {actions, buffered} = + Enum.split_while(state.queue, fn + {:buffer, {:output, buffer}} when state.drop_late_packets? -> + Membrane.Buffer.get_dts_or_pts(buffer) <= current_ts + + _other -> + true + end) + + {actions, %{state | queue: Qex.new(buffered)}} + end + end +end diff --git a/test/support/rtp_realtimer.ex b/test/support/rtp_realtimer.ex deleted file mode 100644 index a75dfafd2..000000000 --- a/test/support/rtp_realtimer.ex +++ /dev/null @@ -1,74 +0,0 @@ -defmodule Membrane.RTC.Engine.RTPRealtimer do - @moduledoc false - use Membrane.Filter - - def_options interval: [ - spec: integer(), - default: 10, - description: "Interval between following messages" - ] - - def_input_pad :input, accepted_format: _any, demand_unit: :buffers - def_output_pad :output, accepted_format: _any, mode: :push - - @impl true - def handle_init(_ctx, opts) do - {[], %{tick_actions: [], interval: opts.interval}} - end - - @impl true - def handle_playing(_ctx, state) do - {[start_timer: {:timer, :no_interval}, demand: {:input, 1}], state} - end - - @dialyzer {:no_behaviours, {:handle_process, 4}} - @impl true - def handle_process(:input, buffer, _ctx, state) do - use Ratio - interval = state.interval - - state = %{state | tick_actions: [buffer: {:output, buffer}] ++ state.tick_actions} - - {[timer_interval: {:timer, interval}], state} - end - - @impl true - def handle_event(pad, event, _ctx, %{tick_actions: tick_actions} = state) - when pad == :output or tick_actions == [] do - {[forward: event], state} - end - - @impl true - def handle_event(:input, event, _ctx, state) do - {[], %{state | tick_actions: [event: {:output, event}] ++ state.tick_actions}} - end - - @impl true - def handle_stream_format(:input, stream_format, _ctx, %{tick_actions: []} = state) do - {[forward: stream_format], state} - end - - @impl true - def handle_stream_format(:input, stream_format, _ctx, state) do - {[], %{state | tick_actions: [stream_format: {:output, stream_format}] ++ state.tick_actions}} - end - - @impl true - def handle_end_of_stream(:input, _ctx, %{tick_actions: []} = state) do - {[end_of_stream: :output], state} - end - - @impl true - def handle_end_of_stream(:input, _ctx, state) do - {[], %{state | tick_actions: [end_of_stream: :output] ++ state.tick_actions}} - end - - @impl true - def handle_tick(:timer, _ctx, state) do - actions = - [timer_interval: {:timer, :no_interval}] ++ - Enum.reverse(state.tick_actions) ++ [demand: {:input, 1}] - - {actions, %{state | tick_actions: []}} - end -end From 3c50e760cb2a7e8df650a73eaba40818a3d6bd86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Fri, 26 May 2023 09:57:32 +0200 Subject: [PATCH 3/5] Changes after discussion --- test/membrane_rtc_engine/tee_test.exs | 45 +++++++++++++-------------- test/support/precise_realtimer.ex | 9 ++---- 2 files changed, 23 insertions(+), 31 deletions(-) diff --git a/test/membrane_rtc_engine/tee_test.exs b/test/membrane_rtc_engine/tee_test.exs index e20112d9b..239d60ee3 100644 --- a/test/membrane_rtc_engine/tee_test.exs +++ b/test/membrane_rtc_engine/tee_test.exs @@ -7,7 +7,6 @@ defmodule Membrane.RTC.Engine.TeeTest do require Membrane.Pad alias Membrane.{Buffer, Pad} - alias Membrane.RTC.Engine.PreciseRealtimer alias Membrane.RTC.Engine.Tee alias Membrane.RTC.Engine.Event.{ @@ -21,7 +20,7 @@ defmodule Membrane.RTC.Engine.TeeTest do alias Membrane.RTC.Engine.Exception.VoiceActivityError - alias Membrane.RTC.Engine.Support.{TestSink, TestSource, Utils} + alias Membrane.RTC.Engine.Support.{TestSink, TestSource} alias Membrane.RTC.Engine.Track alias Membrane.Testing.Pipeline @@ -185,6 +184,15 @@ defmodule Membrane.RTC.Engine.TeeTest do # * when variant is marked as inactive and active again, # Tee doesn't send any variant until it's re-requested + send_buffers = fn pipeline, variant -> + [ + %Buffer{payload: <<1, 2, 3, 4, 5>>, metadata: %{is_keyframe: false}}, + %Buffer{payload: <<>>, metadata: %{is_keyframe: true}}, + %Buffer{payload: <<6, 7, 8, 9, 10>>, metadata: %{is_keyframe: false}} + ] + |> Enum.each(&send_buffer(pipeline, variant, &1)) + end + request_and_check_high = fn pipeline -> request_track_variant(pipeline, :high) assert_sink_event(pipeline, {:source, :high}, %Membrane.KeyframeRequestEvent{}) @@ -193,12 +201,7 @@ defmodule Membrane.RTC.Engine.TeeTest do refute_sink_buffer(pipeline, :sink, _buffer) # send three buffers, only the last two should be received in the sink - [ - %Buffer{payload: <<1, 2, 3, 4, 5>>, metadata: %{is_keyframe: false}}, - %Buffer{payload: <<>>, metadata: %{is_keyframe: true}}, - %Buffer{payload: <<6, 7, 8, 9, 10>>, metadata: %{is_keyframe: false}} - ] - |> Enum.each(&send_buffer(pipeline, :high, &1)) + send_buffers.(pipeline, :high) # TODO assert we receive TrackVariantSwitched before any buffer @@ -212,7 +215,7 @@ defmodule Membrane.RTC.Engine.TeeTest do end track = build_h264_track() - pipeline = build_pipeline(track, {nil, &Utils.generator/2}, 3, false) + pipeline = build_pipeline(track, [], 3, false) Enum.each(track.variants, &mark_variant_as_resumed(pipeline, &1)) @@ -225,8 +228,9 @@ defmodule Membrane.RTC.Engine.TeeTest do # being sent to the sink; # this breaks the concept of demands but it's only # for testing purposes - activate_source(pipeline, :low) - activate_source(pipeline, :medium) + + send_buffers.(pipeline, :low) + send_buffers.(pipeline, :medium) # tee shouldn't send us any packets # until we request specific track variant @@ -240,8 +244,6 @@ defmodule Membrane.RTC.Engine.TeeTest do mark_variant_as_resumed(pipeline, :high) assert_sink_event(pipeline, :sink, %TrackVariantResumed{variant: :high}) - activate_source(pipeline, :high) - refute_sink_buffer(pipeline, :sink, _buffer) request_and_check_high.(pipeline) @@ -313,10 +315,13 @@ defmodule Membrane.RTC.Engine.TeeTest do request_track_variant(pipeline, :high) - buffer = %Buffer{payload: <<>>, metadata: %{is_keyframe: true}} - send_buffer(pipeline, :high, buffer) + [ + %Buffer{payload: <<1, 2, 3, 4, 5>>, metadata: %{is_keyframe: false}}, + %Buffer{payload: <<>>, metadata: %{is_keyframe: true}} + ] + |> Enum.each(&send_buffer(pipeline, :high, &1)) - assert_sink_event(pipeline, :sink, %TrackVariantSwitched{new_variant: :high}) + assert_sink_event(pipeline, :sink, %TrackVariantSwitched{new_variant: :high}, 5_000) event = %VoiceActivityChanged{voice_activity: :speech} @@ -408,10 +413,6 @@ defmodule Membrane.RTC.Engine.TeeTest do } child({:source, variant}, source) - |> child({:realtimer, track.id, variant}, %PreciseRealtimer{ - timer_resolution: Membrane.Time.microseconds(1), - drop_late_packets?: false - }) |> via_in(Pad.ref(:input, {track.id, variant})) |> get_child(:tee) end @@ -454,10 +455,6 @@ defmodule Membrane.RTC.Engine.TeeTest do ) end - defp activate_source(pipeline, variant) do - Pipeline.execute_actions(pipeline, notify_child: {{:source, variant}, {:set_active, true}}) - end - defp request_keyframe(pipeline) do actions = [event: {:input, %Membrane.KeyframeRequestEvent{}}] Pipeline.execute_actions(pipeline, notify_child: {:sink, {:execute_actions, actions}}) diff --git a/test/support/precise_realtimer.ex b/test/support/precise_realtimer.ex index ac2c7037a..b39a0e640 100644 --- a/test/support/precise_realtimer.ex +++ b/test/support/precise_realtimer.ex @@ -8,10 +8,6 @@ defmodule Membrane.RTC.Engine.PreciseRealtimer do def_options timer_resolution: [ spec: Membrane.Time.t() - ], - drop_late_packets?: [ - spec: boolean(), - default: true ] @impl true @@ -20,8 +16,7 @@ defmodule Membrane.RTC.Engine.PreciseRealtimer do %{ tick_idx: 0, resolution: opts.timer_resolution, - queue: Qex.new(), - drop_late_packets?: opts.drop_late_packets? + queue: Qex.new() }} end @@ -50,7 +45,7 @@ defmodule Membrane.RTC.Engine.PreciseRealtimer do else {actions, buffered} = Enum.split_while(state.queue, fn - {:buffer, {:output, buffer}} when state.drop_late_packets? -> + {:buffer, {:output, buffer}} -> Membrane.Buffer.get_dts_or_pts(buffer) <= current_ts _other -> From 825cb29fde157c1d9c650f70b0732a3d85a7064a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Fri, 26 May 2023 10:39:25 +0200 Subject: [PATCH 4/5] Fix race condition --- test/membrane_rtc_engine/tee_test.exs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/test/membrane_rtc_engine/tee_test.exs b/test/membrane_rtc_engine/tee_test.exs index 239d60ee3..8a973b476 100644 --- a/test/membrane_rtc_engine/tee_test.exs +++ b/test/membrane_rtc_engine/tee_test.exs @@ -314,12 +314,9 @@ defmodule Membrane.RTC.Engine.TeeTest do end) request_track_variant(pipeline, :high) + assert_sink_event(pipeline, {:source, :high}, %Membrane.KeyframeRequestEvent{}) - [ - %Buffer{payload: <<1, 2, 3, 4, 5>>, metadata: %{is_keyframe: false}}, - %Buffer{payload: <<>>, metadata: %{is_keyframe: true}} - ] - |> Enum.each(&send_buffer(pipeline, :high, &1)) + send_buffer(pipeline, :high, %Buffer{payload: <<>>, metadata: %{is_keyframe: true}}) assert_sink_event(pipeline, :sink, %TrackVariantSwitched{new_variant: :high}, 5_000) From f2bafd3ba101da51443ce6c2d5ce2ab9292c499d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Tue, 30 May 2023 07:50:47 +0200 Subject: [PATCH 5/5] Review fixes --- test/membrane_rtc_engine/tee_test.exs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/test/membrane_rtc_engine/tee_test.exs b/test/membrane_rtc_engine/tee_test.exs index 8a973b476..70eab272a 100644 --- a/test/membrane_rtc_engine/tee_test.exs +++ b/test/membrane_rtc_engine/tee_test.exs @@ -223,8 +223,8 @@ defmodule Membrane.RTC.Engine.TeeTest do assert_sink_event(pipeline, :sink, %TrackVariantResumed{variant: ^variant}) end) - # we don't want to use generator for `:high` - # to have full control over the number of buffers + # we don't want to use generator for all variants to avoid toilet overflow + # and to have full control over the number of buffers # being sent to the sink; # this breaks the concept of demands but it's only # for testing purposes @@ -244,6 +244,8 @@ defmodule Membrane.RTC.Engine.TeeTest do mark_variant_as_resumed(pipeline, :high) assert_sink_event(pipeline, :sink, %TrackVariantResumed{variant: :high}) + send_buffers.(pipeline, :high) + refute_sink_buffer(pipeline, :sink, _buffer) request_and_check_high.(pipeline) @@ -318,7 +320,7 @@ defmodule Membrane.RTC.Engine.TeeTest do send_buffer(pipeline, :high, %Buffer{payload: <<>>, metadata: %{is_keyframe: true}}) - assert_sink_event(pipeline, :sink, %TrackVariantSwitched{new_variant: :high}, 5_000) + assert_sink_event(pipeline, :sink, %TrackVariantSwitched{new_variant: :high}) event = %VoiceActivityChanged{voice_activity: :speech}