Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

live audio mixer #30

Merged
merged 12 commits into from
May 18, 2023
Merged
7 changes: 3 additions & 4 deletions lib/membrane_audio_mixer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ defmodule Membrane.AudioMixer do
when sample_format in [:s8, :s16le, :s16be, :s24le, :s24be, :s32le, :s32be],
Membrane.RemoteStream
),
#
options: [
offset: [
spec: Time.non_neg_t(),
Expand Down Expand Up @@ -294,9 +293,9 @@ defmodule Membrane.AudioMixer do
@impl true
def handle_stream_format(
_pad,
%Membrane.RemoteStream{} = _input_stream_format,
%Membrane.RemoteStream{} = _input_stream,
_context,
%{input_stream_format: nil} = _state
DominikWolek marked this conversation as resolved.
Show resolved Hide resolved
%{stream_format: nil} = _state
) do
raise """
You need to specify `stream_format` in options if `Membrane.RemoteStream` will be received on the `:input` pad
Expand All @@ -309,7 +308,7 @@ defmodule Membrane.AudioMixer do
end

@impl true
def handle_stream_format(_pad, %Membrane.RemoteStream{} = _input_stream_format, _context, state) do
def handle_stream_format(_pad, %Membrane.RemoteStream{} = _input_stream, _context, state) do
{[], state}
end

Expand Down
239 changes: 239 additions & 0 deletions lib/membrane_live_audio_mixer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
defmodule Membrane.LiveAudioMixer do
@moduledoc """
This element performs audio mixing for live streams.

The audio format is received through stream_format from input pads. The stream_format received
first is forwarded to the output, and every stream_format received afterwards has to be
identical to it.

Input pads can have offset - it tells how much timestamps differ from mixer time.

Mixer mixes only raw audio (PCM), so some parser may be needed to precede it in pipeline.
"""

use Membrane.Filter
use Bunch

require Membrane.Logger

alias Membrane.AudioMixer.{Adder, ClipPreventingAdder, NativeAdder}
alias Membrane.Buffer
alias Membrane.LiveAudioMixer.LiveQueue
alias Membrane.RawAudio
alias Membrane.Time

@interval Membrane.Time.milliseconds(20)

def_options prevent_clipping: [
spec: boolean(),
description: """
Defines how the mixer should act in the case when an overflow happens.
- If true, the wave will be scaled down, so a peak will become the maximal
value of the sample in the format. See `Membrane.AudioMixer.ClipPreventingAdder`.
- If false, overflow will be clipped to the maximal value of the sample in
the format. See `Membrane.AudioMixer.Adder`.
""",
Comment on lines +30 to +35
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do we mean by an overflow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This option is taken from the regular audio mixer and the description is about how particular mixers work.

default: true
],
native_mixer: [
spec: boolean(),
description: """
The value determines if mixer should use NIFs for mixing audio. Only
clip preventing version of native mixer is available.
See `Membrane.AudioMixer.NativeAdder`.
""",
default: false
],
latency: [
spec: non_neg_integer(),
description: """
The value determines after what time the clock will start the interval that mixes audio in real time.
Latency is crucial to the quality of the output audio: the smaller the value, the more packets will be lost.
But the bigger the value, the greater the latency of the stream.
""",
default: Membrane.Time.milliseconds(200),
inspector: &Time.inspect/1
]

# The single, always-available output pad. It works in auto demand mode and
# declares `RawAudio` as its accepted format.
def_output_pad :output,
demand_mode: :auto,
availability: :always,
accepted_format: RawAudio

# Input pads are created on request and work in auto demand mode. Only
# `RawAudio` with one of the listed sample formats is accepted.
#
# Each pad takes an `offset` option (default 0); `handle_start_of_stream/3`
# reads it and passes it to `LiveQueue.add_queue/3`.
def_input_pad :input,
demand_mode: :auto,
availability: :on_request,
accepted_format:
any_of(
%RawAudio{sample_format: sample_format}
when sample_format in [:s8, :s16le, :s16be, :s24le, :s24be, :s32le, :s32be]
),
options: [
offset: [
spec: Time.non_neg_t(),
default: 0,
description: "Offset of the input audio at the pad."
]
]

# Validates the element options and builds the initial state.
#
# The state is the options struct turned into a plain map, extended with the
# runtime fields `:mixer_state`, `:live_queue`, `:stream_format` (all `nil`
# until the first stream format arrives) and `:end_of_stream?` (`false`).
#
# Raises when the native mixer is requested without clipping prevention.
@impl true
def handle_init(_ctx, options) do
  native_without_clip_prevention? = options.native_mixer && !options.prevent_clipping

  if native_without_clip_prevention? do
    raise("Invalid element options, for native mixer only clipping preventing one is available")
  end

  runtime_fields = %{
    mixer_state: nil,
    live_queue: nil,
    stream_format: nil,
    end_of_stream?: false
  }

  initial_state =
    options
    |> Map.from_struct()
    |> Map.merge(runtime_fields)

  {[], initial_state}
end

# First stream format seen by the element: remember it, initialise the mixer
# and the live queue for it, and forward it on the output pad.
@impl true
def handle_stream_format(_pad, stream_format, _context, %{stream_format: nil} = state) do
  state = %{state | stream_format: stream_format}

  state = %{
    state
    | mixer_state: initialize_mixer_state(stream_format, state),
      live_queue: LiveQueue.init(stream_format)
  }

  {[stream_format: {:output, stream_format}], state}
end

# A stream format identical to the stored one requires no action.
@impl true
def handle_stream_format(_pad, stream_format, _context, %{stream_format: stream_format} = state) do
  {[], state}
end

# Any other stream format differs from the stored one and is rejected.
@impl true
def handle_stream_format(pad, stream_format, _context, state) do
  raise "received invalid stream_format on pad #{inspect(pad)}, expected: #{inspect(state.stream_format)}, got: #{inspect(stream_format)}"
end

# Registers a queue for the input pad whose stream has just started and, when
# it is the only started input pad, starts the `:initiator` timer.
#
# The pad's `offset` option is passed on to `LiveQueue.add_queue/3`. The
# `:initiator` timer is started with `state.latency`; its tick is handled by
# `handle_tick(:initiator, ...)`.
#
# Returns `{actions, state}` where `state` carries the updated live queue.
@impl true
def handle_start_of_stream(
      Pad.ref(:input, pad_id) = pad,
      context,
      %{live_queue: live_queue} = state
    ) do
  offset = context.pads[pad].options.offset
  new_live_queue = LiveQueue.add_queue(live_queue, pad_id, offset)

  # `Enum.count/2` counts the matching pads in one pass, without building the
  # intermediate list that `Enum.filter/2 |> Enum.count/1` would.
  started_input_pads_number =
    Enum.count(context.pads, fn {_ref, pad_data} ->
      pad_data.direction == :input and pad_data.start_of_stream?
    end)

  actions =
    if started_input_pads_number == 1,
      do: [start_timer: {:initiator, state.latency}],
      else: []

  {actions, %{state | live_queue: new_live_queue}}
end

# Stores the incoming buffer in the queue of the pad it arrived on. Nothing is
# emitted from this callback.
@impl true
def handle_process(Pad.ref(:input, pad_id), buffer, _context, %{live_queue: queue} = state) do
  state = %{state | live_queue: LiveQueue.add_buffer(queue, pad_id, buffer)}
  {[], state}
end

# Called when an input pad reaches end of stream. The pad's queue is removed
# from the live queue, and then one of three things happens:
#   * some non-output pad has not ended yet - nothing is emitted,
#   * every input has ended and all queues are empty - the mixer is flushed,
#     the flushed payload is sent, and `end_of_stream` and `stop_timer` follow,
#   * every input has ended but audio is still queued - only `end_of_stream?`
#     is set, and `handle_tick/3` finishes the stream later.
# NOTE(review): `stop_timer: :timer` presumably requires that the `:initiator`
# tick has already started `:timer` - confirm what happens when every input
# ends before that tick.
@impl true
def handle_end_of_stream(Pad.ref(:input, pad_id), context, %{live_queue: live_queue} = state) do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about a case where someone adds a new pad when we already got eos on all input pads we had so far? Maybe we should only send EOS when an element is terminating?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good question but I don't think we should produce audio when we don't have any input pad unless we are explicitly told to

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should consider a case where some room in Jellyfish is empty for a while, after there were some people

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of hls_endpoint, we have silence_generator added as an input pad.
We can also make it stop the clock and remove live_queue instead of sending eof. In case of a new input pad, we would init live_queue and start the clock.
Also, we should take into consideration how the compositor handles those cases because those elements very often work together.
(compositor also sends eof after it gets eof on all inputs)

new_live_queue = LiveQueue.remove_queue(live_queue, pad_id)

{actions, end_of_stream?, state} =
cond do
not all_streams_ended?(context) ->
{[], false, state}

LiveQueue.all_queues_empty?(new_live_queue) ->
{payload, state} = flush_mixer(state)

{[
buffer: {:output, %Buffer{payload: payload}},
end_of_stream: :output,
stop_timer: :timer
], true, state}

# All streams ended but queues are not empty.
# Set `end_of_stream?` to true.
# Handle tick will send `end_of_stream` when all queues will be empty.
true ->
mickel8 marked this conversation as resolved.
Show resolved Hide resolved
{[], true, state}
end

{actions, %{state | live_queue: new_live_queue, end_of_stream?: end_of_stream?}}
end

# Tick of the `:initiator` timer: stops that timer and starts the `:timer`
# timer with `@interval`.
@impl true
def handle_tick(:initiator, _context, state) do
  actions = [
    stop_timer: :initiator,
    start_timer: {:timer, @interval}
  ]

  {actions, state}
end

# Tick of the `:timer` timer: mixes `@interval` worth of queued audio and sends
# the result on `:output`. Once `end_of_stream?` is set and every queue is
# empty, the mixer is also flushed, the flushed payload is appended to this
# tick's payload, and `end_of_stream` plus `stop_timer` follow the buffer.
def handle_tick(:timer, _context, %{end_of_stream?: end_of_stream?} = state) do
{payload, state} = mix(@interval, state)

{actions, payload, state} =
if end_of_stream? and LiveQueue.all_queues_empty?(state.live_queue) do
mickel8 marked this conversation as resolved.
Show resolved Hide resolved
{flushed_payload, state} = flush_mixer(state)
{[end_of_stream: :output, stop_timer: :timer], payload <> flushed_payload, state}
else
{[], payload, state}
end

# The buffer action comes first, so the audio precedes `end_of_stream`.
{[buffer: {:output, %Buffer{payload: payload}}] ++ actions, state}
end

# Without a stream format there is nothing to initialise.
defp initialize_mixer_state(nil, _state), do: nil

# Picks the adder module from the element options and initialises it for the
# given stream format: `Adder` when clipping prevention is off, otherwise
# `NativeAdder` or `ClipPreventingAdder` depending on `native_mixer`.
defp initialize_mixer_state(stream_format, state) do
  adder =
    cond do
      !state.prevent_clipping -> Adder
      state.native_mixer -> NativeAdder
      true -> ClipPreventingAdder
    end

  adder.init(stream_format)
end

# Takes `duration` worth of audio from the live queue, mixes the payloads and
# returns `{mixed_payload, state}` with the updated queue stored in the state.
defp mix(duration, %{live_queue: live_queue} = state) do
  {audio_chunks, updated_queue} = LiveQueue.get_audio(live_queue, duration)

  {mixed_payload, state} =
    audio_chunks
    |> Enum.map(fn {_audio_id, chunk} -> chunk end)
    |> mix_payloads(state)

  {mixed_payload, %{state | live_queue: updated_queue}}
end

# Returns `true` when every pad other than `:output` has `end_of_stream?` set.
defp all_streams_ended?(%{pads: pads}) do
  Enum.all?(pads, fn
    {:output, _info} -> true
    {_pad, %{end_of_stream?: ended?}} -> ended?
  end)
end

# Mixes the payloads with the adder module that owns the `mixer_state` struct
# and stores the adder state it returns.
defp mix_payloads(payloads, %{mixer_state: %adder{} = adder_state} = state) do
  {mixed, adder_state} = adder.mix(payloads, adder_state)
  {mixed, %{state | mixer_state: adder_state}}
end

# Flushes the adder module that owns the `mixer_state` struct and stores the
# adder state it returns; yields `{flushed_payload, state}`.
defp flush_mixer(%{mixer_state: %adder{} = adder_state} = state) do
  {flushed, adder_state} = adder.flush(adder_state)
  {flushed, %{state | mixer_state: adder_state}}
end
end
Loading