Skip to content
This repository has been archived by the owner on Sep 19, 2024. It is now read-only.

Commit

Permalink
[RTC-258][RTC-259] Fix overflow in Tee tests. Fix RC in VAD Tee test. (
Browse files Browse the repository at this point in the history
  • Loading branch information
Rados13 authored May 31, 2023
1 parent 1b6ede2 commit d1f7a1b
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 68 deletions.
39 changes: 20 additions & 19 deletions test/membrane_rtc_engine/tee_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule Membrane.RTC.Engine.TeeTest do

alias Membrane.RTC.Engine.Exception.VoiceActivityError

alias Membrane.RTC.Engine.Support.{TestSink, TestSource, Utils}
alias Membrane.RTC.Engine.Support.{TestSink, TestSource}
alias Membrane.RTC.Engine.Track
alias Membrane.Testing.Pipeline

Expand Down Expand Up @@ -184,6 +184,15 @@ defmodule Membrane.RTC.Engine.TeeTest do
# * when variant is marked as inactive and active again,
# Tee doesn't send any variant until it's re-requested

send_buffers = fn pipeline, variant ->
[
%Buffer{payload: <<1, 2, 3, 4, 5>>, metadata: %{is_keyframe: false}},
%Buffer{payload: <<>>, metadata: %{is_keyframe: true}},
%Buffer{payload: <<6, 7, 8, 9, 10>>, metadata: %{is_keyframe: false}}
]
|> Enum.each(&send_buffer(pipeline, variant, &1))
end

request_and_check_high = fn pipeline ->
request_track_variant(pipeline, :high)
assert_sink_event(pipeline, {:source, :high}, %Membrane.KeyframeRequestEvent{})
Expand All @@ -192,12 +201,7 @@ defmodule Membrane.RTC.Engine.TeeTest do
refute_sink_buffer(pipeline, :sink, _buffer)

# send three buffers, only the last two should be received in the sink
[
%Buffer{payload: <<1, 2, 3, 4, 5>>, metadata: %{is_keyframe: false}},
%Buffer{payload: <<>>, metadata: %{is_keyframe: true}},
%Buffer{payload: <<6, 7, 8, 9, 10>>, metadata: %{is_keyframe: false}}
]
|> Enum.each(&send_buffer(pipeline, :high, &1))
send_buffers.(pipeline, :high)

# TODO assert we receive TrackVariantSwitched before any buffer

Expand All @@ -211,21 +215,22 @@ defmodule Membrane.RTC.Engine.TeeTest do
end

track = build_h264_track()
pipeline = build_pipeline(track, {nil, &Utils.generator/2}, 3, false)
pipeline = build_pipeline(track, [], 3, false)

Enum.each(track.variants, &mark_variant_as_resumed(pipeline, &1))

Enum.each(track.variants, fn variant ->
assert_sink_event(pipeline, :sink, %TrackVariantResumed{variant: ^variant})
end)

# we don't want to use generator for `:high`
# to have full control over the number of buffers
# we don't want to use generator for all variants to avoid toilet overflow
# and to have full control over the number of buffers
# being sent to the sink;
# this breaks the concept of demands but it's only
# for testing purposes
activate_source(pipeline, :low)
activate_source(pipeline, :medium)

send_buffers.(pipeline, :low)
send_buffers.(pipeline, :medium)

# tee shouldn't send us any packets
# until we request specific track variant
Expand All @@ -239,7 +244,7 @@ defmodule Membrane.RTC.Engine.TeeTest do
mark_variant_as_resumed(pipeline, :high)
assert_sink_event(pipeline, :sink, %TrackVariantResumed{variant: :high})

activate_source(pipeline, :high)
send_buffers.(pipeline, :high)

refute_sink_buffer(pipeline, :sink, _buffer)

Expand Down Expand Up @@ -311,9 +316,9 @@ defmodule Membrane.RTC.Engine.TeeTest do
end)

request_track_variant(pipeline, :high)
assert_sink_event(pipeline, {:source, :high}, %Membrane.KeyframeRequestEvent{})

buffer = %Buffer{payload: <<>>, metadata: %{is_keyframe: true}}
send_buffer(pipeline, :high, buffer)
send_buffer(pipeline, :high, %Buffer{payload: <<>>, metadata: %{is_keyframe: true}})

assert_sink_event(pipeline, :sink, %TrackVariantSwitched{new_variant: :high})

Expand Down Expand Up @@ -449,10 +454,6 @@ defmodule Membrane.RTC.Engine.TeeTest do
)
end

defp activate_source(pipeline, variant) do
Pipeline.execute_actions(pipeline, notify_child: {{:source, variant}, {:set_active, true}})
end

defp request_keyframe(pipeline) do
actions = [event: {:input, %Membrane.KeyframeRequestEvent{}}]
Pipeline.execute_actions(pipeline, notify_child: {:sink, {:execute_actions, actions}})
Expand Down
53 changes: 4 additions & 49 deletions test/support/fake_rtsp_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,58 +25,11 @@ defmodule FakeRTSPserver do
end
end

defmodule PreciseRealtimer do
@moduledoc false

use Membrane.Filter

def_input_pad :input, demand_mode: :auto, accepted_format: _any
def_output_pad :output, demand_mode: :auto, accepted_format: _any
def_options timer_resolution: [spec: Membrane.Time.t()]

@impl true
def handle_init(_ctx, opts) do
{[], %{tick_idx: 0, resolution: opts.timer_resolution, queue: Qex.new()}}
end

@impl true
def handle_playing(_ctx, state) do
{[start_timer: {:timer, state.resolution}], state}
end

@impl true
def handle_process(:input, buffer, _ctx, state) do
{[], %{state | queue: Qex.push(state.queue, {:buffer, {:output, buffer}})}}
end

@impl true
def handle_end_of_stream(:input, _ctx, state) do
{[], %{state | queue: Qex.push(state.queue, {:end_of_stream, :output})}}
end

@impl true
def handle_tick(:timer, _ctx, %{tick_idx: tick_idx} = state) do
current_ts = tick_idx * state.resolution
state = %{state | tick_idx: tick_idx + 1}

if Enum.empty?(state.queue) do
{[], state}
else
{actions, buffered} =
Enum.split_while(state.queue, fn
{:buffer, {:output, buffer}} -> Membrane.Buffer.get_dts_or_pts(buffer) <= current_ts
_other -> true
end)

{actions, %{state | queue: Qex.new(buffered)}}
end
end
end

defmodule Pipeline do
@moduledoc false

use Membrane.Pipeline
alias Membrane.RTC.Engine.PreciseRealtimer

@impl true
def handle_init(_ctx, opts) do
Expand All @@ -100,7 +53,9 @@ defmodule FakeRTSPserver do
clock_rate: opts.clock_rate
]
)
|> child(:realtimer, %PreciseRealtimer{timer_resolution: Membrane.Time.milliseconds(10)})
|> child(:realtimer, %PreciseRealtimer{
timer_resolution: Membrane.Time.milliseconds(10)
})
|> child(:udp_sink, %Membrane.UDP.Sink{
destination_address: address,
destination_port_no: opts.client_port,
Expand Down
58 changes: 58 additions & 0 deletions test/support/precise_realtimer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
defmodule Membrane.RTC.Engine.PreciseRealtimer do
@moduledoc false

use Membrane.Filter

def_input_pad :input, demand_mode: :auto, accepted_format: _any
def_output_pad :output, demand_mode: :auto, accepted_format: _any

def_options timer_resolution: [
spec: Membrane.Time.t()
]

@impl true
def handle_init(_ctx, opts) do
{[],
%{
tick_idx: 0,
resolution: opts.timer_resolution,
queue: Qex.new()
}}
end

@impl true
def handle_playing(_ctx, state) do
{[start_timer: {:timer, state.resolution}], state}
end

@impl true
def handle_process(:input, buffer, _ctx, state) do
{[], %{state | queue: Qex.push(state.queue, {:buffer, {:output, buffer}})}}
end

@impl true
def handle_end_of_stream(:input, _ctx, state) do
{[], %{state | queue: Qex.push(state.queue, {:end_of_stream, :output})}}
end

@impl true
def handle_tick(:timer, _ctx, %{tick_idx: tick_idx} = state) do
current_ts = tick_idx * state.resolution
state = %{state | tick_idx: tick_idx + 1}

if Enum.empty?(state.queue) do
{[], state}
else
{actions, buffered} =
Enum.split_while(state.queue, fn
{:buffer, {:output, buffer}} ->
Membrane.Buffer.get_dts_or_pts(buffer) <= current_ts

_other ->
true
end)

{actions, %{state | queue: Qex.new(buffered)}}
end
end
end

0 comments on commit d1f7a1b

Please sign in to comment.