Skip to content
This repository has been archived by the owner on Sep 19, 2024. It is now read-only.

[RTC-258][RTC-259] Fix overflow in Tee tests. Fix RC in VAD Tee test. #272

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions test/membrane_rtc_engine/tee_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule Membrane.RTC.Engine.TeeTest do

alias Membrane.RTC.Engine.Exception.VoiceActivityError

alias Membrane.RTC.Engine.Support.{TestSink, TestSource, Utils}
alias Membrane.RTC.Engine.Support.{TestSink, TestSource}
alias Membrane.RTC.Engine.Track
alias Membrane.Testing.Pipeline

Expand Down Expand Up @@ -184,6 +184,15 @@ defmodule Membrane.RTC.Engine.TeeTest do
# * when variant is marked as inactive and active again,
# Tee doesn't send any variant until it's re-requested

send_buffers = fn pipeline, variant ->
[
%Buffer{payload: <<1, 2, 3, 4, 5>>, metadata: %{is_keyframe: false}},
%Buffer{payload: <<>>, metadata: %{is_keyframe: true}},
%Buffer{payload: <<6, 7, 8, 9, 10>>, metadata: %{is_keyframe: false}}
]
|> Enum.each(&send_buffer(pipeline, variant, &1))
end

request_and_check_high = fn pipeline ->
request_track_variant(pipeline, :high)
assert_sink_event(pipeline, {:source, :high}, %Membrane.KeyframeRequestEvent{})
Expand All @@ -192,12 +201,7 @@ defmodule Membrane.RTC.Engine.TeeTest do
refute_sink_buffer(pipeline, :sink, _buffer)

# send three buffers, only the last two should be received in the sink
[
%Buffer{payload: <<1, 2, 3, 4, 5>>, metadata: %{is_keyframe: false}},
%Buffer{payload: <<>>, metadata: %{is_keyframe: true}},
%Buffer{payload: <<6, 7, 8, 9, 10>>, metadata: %{is_keyframe: false}}
]
|> Enum.each(&send_buffer(pipeline, :high, &1))
send_buffers.(pipeline, :high)

# TODO assert we receive TrackVariantSwitched before any buffer

Expand All @@ -211,21 +215,22 @@ defmodule Membrane.RTC.Engine.TeeTest do
end

track = build_h264_track()
pipeline = build_pipeline(track, {nil, &Utils.generator/2}, 3, false)
pipeline = build_pipeline(track, [], 3, false)

Enum.each(track.variants, &mark_variant_as_resumed(pipeline, &1))

Enum.each(track.variants, fn variant ->
assert_sink_event(pipeline, :sink, %TrackVariantResumed{variant: ^variant})
end)

# we don't want to use generator for `:high`
# to have full control over the number of buffers
# we don't want to use generator for all variants to avoid toilet overflow
# and to have full control over the number of buffers
# being sent to the sink;
# this breaks the concept of demands but it's only
# for testing purposes
mickel8 marked this conversation as resolved.
Show resolved Hide resolved
activate_source(pipeline, :low)
activate_source(pipeline, :medium)

send_buffers.(pipeline, :low)
send_buffers.(pipeline, :medium)

# tee shouldn't send us any packets
# until we request specific track variant
Expand All @@ -239,7 +244,7 @@ defmodule Membrane.RTC.Engine.TeeTest do
mark_variant_as_resumed(pipeline, :high)
assert_sink_event(pipeline, :sink, %TrackVariantResumed{variant: :high})

activate_source(pipeline, :high)
mickel8 marked this conversation as resolved.
Show resolved Hide resolved
send_buffers.(pipeline, :high)

refute_sink_buffer(pipeline, :sink, _buffer)

Expand Down Expand Up @@ -311,9 +316,9 @@ defmodule Membrane.RTC.Engine.TeeTest do
end)

request_track_variant(pipeline, :high)
assert_sink_event(pipeline, {:source, :high}, %Membrane.KeyframeRequestEvent{})

buffer = %Buffer{payload: <<>>, metadata: %{is_keyframe: true}}
send_buffer(pipeline, :high, buffer)
send_buffer(pipeline, :high, %Buffer{payload: <<>>, metadata: %{is_keyframe: true}})

assert_sink_event(pipeline, :sink, %TrackVariantSwitched{new_variant: :high})

Expand Down Expand Up @@ -449,10 +454,6 @@ defmodule Membrane.RTC.Engine.TeeTest do
)
end

defp activate_source(pipeline, variant) do
Pipeline.execute_actions(pipeline, notify_child: {{:source, variant}, {:set_active, true}})
end

defp request_keyframe(pipeline) do
actions = [event: {:input, %Membrane.KeyframeRequestEvent{}}]
Pipeline.execute_actions(pipeline, notify_child: {:sink, {:execute_actions, actions}})
Expand Down
53 changes: 4 additions & 49 deletions test/support/fake_rtsp_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,58 +25,11 @@ defmodule FakeRTSPserver do
end
end

defmodule PreciseRealtimer do
@moduledoc false

use Membrane.Filter

def_input_pad :input, demand_mode: :auto, accepted_format: _any
def_output_pad :output, demand_mode: :auto, accepted_format: _any
def_options timer_resolution: [spec: Membrane.Time.t()]

@impl true
def handle_init(_ctx, opts) do
{[], %{tick_idx: 0, resolution: opts.timer_resolution, queue: Qex.new()}}
end

@impl true
def handle_playing(_ctx, state) do
{[start_timer: {:timer, state.resolution}], state}
end

@impl true
def handle_process(:input, buffer, _ctx, state) do
{[], %{state | queue: Qex.push(state.queue, {:buffer, {:output, buffer}})}}
end

@impl true
def handle_end_of_stream(:input, _ctx, state) do
{[], %{state | queue: Qex.push(state.queue, {:end_of_stream, :output})}}
end

@impl true
def handle_tick(:timer, _ctx, %{tick_idx: tick_idx} = state) do
current_ts = tick_idx * state.resolution
state = %{state | tick_idx: tick_idx + 1}

if Enum.empty?(state.queue) do
{[], state}
else
{actions, buffered} =
Enum.split_while(state.queue, fn
{:buffer, {:output, buffer}} -> Membrane.Buffer.get_dts_or_pts(buffer) <= current_ts
_other -> true
end)

{actions, %{state | queue: Qex.new(buffered)}}
end
end
end

defmodule Pipeline do
@moduledoc false

use Membrane.Pipeline
alias Membrane.RTC.Engine.PreciseRealtimer

@impl true
def handle_init(_ctx, opts) do
Expand All @@ -100,7 +53,9 @@ defmodule FakeRTSPserver do
clock_rate: opts.clock_rate
]
)
|> child(:realtimer, %PreciseRealtimer{timer_resolution: Membrane.Time.milliseconds(10)})
|> child(:realtimer, %PreciseRealtimer{
timer_resolution: Membrane.Time.milliseconds(10)
})
|> child(:udp_sink, %Membrane.UDP.Sink{
destination_address: address,
destination_port_no: opts.client_port,
Expand Down
58 changes: 58 additions & 0 deletions test/support/precise_realtimer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
defmodule Membrane.RTC.Engine.PreciseRealtimer do
@moduledoc false

use Membrane.Filter

def_input_pad :input, demand_mode: :auto, accepted_format: _any
def_output_pad :output, demand_mode: :auto, accepted_format: _any

def_options timer_resolution: [
spec: Membrane.Time.t()
]

@impl true
def handle_init(_ctx, opts) do
{[],
%{
tick_idx: 0,
resolution: opts.timer_resolution,
queue: Qex.new()
}}
end

@impl true
def handle_playing(_ctx, state) do
{[start_timer: {:timer, state.resolution}], state}
end

@impl true
def handle_process(:input, buffer, _ctx, state) do
{[], %{state | queue: Qex.push(state.queue, {:buffer, {:output, buffer}})}}
end

@impl true
def handle_end_of_stream(:input, _ctx, state) do
{[], %{state | queue: Qex.push(state.queue, {:end_of_stream, :output})}}
end

@impl true
def handle_tick(:timer, _ctx, %{tick_idx: tick_idx} = state) do
current_ts = tick_idx * state.resolution
state = %{state | tick_idx: tick_idx + 1}

if Enum.empty?(state.queue) do
{[], state}
else
{actions, buffered} =
Enum.split_while(state.queue, fn
{:buffer, {:output, buffer}} ->
Membrane.Buffer.get_dts_or_pts(buffer) <= current_ts

_other ->
true
end)

{actions, %{state | queue: Qex.new(buffered)}}
end
end
end