Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

live audio mixer #30

Merged
merged 12 commits into from
May 18, 2023
92 changes: 92 additions & 0 deletions lib/membrane_audio_mixer/live_queue.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
defmodule Membrane.AudioMixer.LiveQueue do
DominikWolek marked this conversation as resolved.
Show resolved Hide resolved
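@moduledoc """
Per-input queues of raw audio used by the live mixer.

Every queue is a binary of pending audio kept under an audio id, together with an offset
for that id. `current_time` is the timestamp up to which audio has already been handed
out by `get_audio/2`; incoming buffers are aligned against it using their pts shifted by
the offset of their queue.
"""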
alias Membrane.RawAudio

@doc """
Builds an empty queue state for the given `stream_format`.

The state starts with no queues, no offsets and `current_time` set to `0`.
"""
def init(stream_format) do
  initial_state = %{
    stream_format: stream_format,
    current_time: 0,
    offsets: %{},
    queues: %{}
  }

  {:ok, initial_state}
end
Karolk99 marked this conversation as resolved.
Show resolved Hide resolved

@doc """
Registers an empty queue under `audio_id` together with its `offset` (defaults to `0`).

A queue or offset already stored under the same id is overwritten.
"""
def add_queue(audio_id, offset \\ 0, %{queues: queues, offsets: offsets} = state) do
  updated_state = %{
    state
    | queues: Map.put(queues, audio_id, <<>>),
      offsets: Map.put(offsets, audio_id, offset)
  }

  {:ok, updated_state}
end

@doc """
Drops the queue registered under `audio_id` together with its offset.

Any audio still pending in the queue is discarded. Removing an unknown id leaves the
state unchanged. The offset is removed as well, so that ids of finished inputs do not
pile up in `offsets` for the whole lifetime of the mixer.
"""
def remove_queue(audio_id, %{queues: queues, offsets: offsets} = state) do
  {:ok, %{state | queues: Map.delete(queues, audio_id), offsets: Map.delete(offsets, audio_id)}}
end

@doc """
Appends the payload of a buffer to the queue registered under `audio_id`.

The pts of the buffer is first shifted by the offset of the queue and then compared with
the end of the audio that is already queued (`current_time` plus the duration of the
queue):

  * a buffer ending at or before that point is dropped,
  * a buffer straddling that point contributes only the part lying after it,
  * a buffer starting after that point is appended, preceded by silence filling the gap.

Returns `{:ok, state}` in all of the cases above. `{:error, state}` is kept for a buffer
that would start after the end of the queue and yet finish before it.

Raises `KeyError` when no queue is registered under `audio_id`.
"""
def add_buffer(
      audio_id,
      %{pts: pts, payload: payload},
      %{
        stream_format: stream_format,
        current_time: current_time,
        offsets: offsets,
        queues: queues
      } = state
    ) do
  pts = pts + Map.fetch!(offsets, audio_id)

  # Timestamp of the end of the audio that is already queued for this input.
  queue_end =
    current_time +
      RawAudio.bytes_to_time(byte_size(Map.fetch!(queues, audio_id)), stream_format)

  end_pts = pts + RawAudio.bytes_to_time(byte_size(payload), stream_format)

  case {pts > queue_end, end_pts > queue_end} do
    {false, false} ->
      {:ok, state}

    {false, true} ->
      duration = end_pts - queue_end
      tail_size = min(RawAudio.time_to_bytes(duration, stream_format), byte_size(payload))

      # The head of the payload overlaps audio that is already queued, so the bytes to
      # keep are the LAST `tail_size` ones rather than the first.
      tail = binary_part(payload, byte_size(payload) - tail_size, tail_size)
      {:ok, update_in(state, [:queues, audio_id], fn queue -> queue <> tail end)}

    {true, true} ->
      silence = RawAudio.silence(stream_format, pts - queue_end)

      {:ok,
       update_in(state, [:queues, audio_id], fn queue -> queue <> silence <> payload end)}

    _else ->
      {:error, state}
  end
end

@doc """
Takes `duration` worth of audio from every queue and advances `current_time` by `duration`.

Returns `{audios, state}` where `audios` is a list of `{audio_id, audio}` tuples, one per
registered queue.
"""
def get_audio(duration, %{current_time: current_time} = state) do
  {audios, drained_state} =
    state.queues
    |> Enum.map_reduce(state, fn {audio_id, _pending}, acc ->
      {audio, acc} = get_audio(audio_id, duration, acc)
      {{audio_id, audio}, acc}
    end)

  {audios, %{drained_state | current_time: current_time + duration}}
end

# Takes `duration` worth of audio from the single queue stored under `audio_id` and
# returns it together with the state holding the remainder of that queue.
defp get_audio(audio_id, duration, state) do
  # `get_duration/3` already returns `{taken_audio, remaining_queue}`, which is exactly
  # the `{value_to_return, value_to_store}` shape expected here.
  get_and_update_in(state, [:queues, audio_id], fn queue ->
    get_duration(queue, duration, state)
  end)
end
Karolk99 marked this conversation as resolved.
Show resolved Hide resolved

# Splits `duration` worth of audio off the front of `queue`.
# Returns `{audio, remaining_queue}`. When the queue holds less than `duration`, the whole
# queue is returned with silence appended for the missing time and the remainder is empty.
defp get_duration(queue, duration, %{stream_format: stream_format}) do
  available = RawAudio.bytes_to_time(byte_size(queue), stream_format)
  missing = duration - available

  if missing > 0 do
    {queue <> RawAudio.silence(stream_format, missing), <<>>}
  else
    size = RawAudio.time_to_bytes(duration, stream_format)
    <<audio::binary-size(size), rest::binary>> = queue
    {audio, rest}
  end
end
end
217 changes: 217 additions & 0 deletions lib/membrane_live_audio_mixer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
defmodule Membrane.LiveAudioMixer do
@moduledoc """
This element performs audio mixing.

Audio format can be set as an element option or received through stream_format from input pads. All
received stream formats have to be identical and match the one given in the element option (if that
option is different from `nil`).

Input pads can have an offset - it tells how much silence should be added before the first sample
from that pad. The offset has to be non-negative.

Mixer mixes only raw audio (PCM), so some parser may be needed to precede it in pipeline.
"""

use Membrane.Filter
use Bunch

require Membrane.Logger

alias Membrane.AudioMixer.{Adder, ClipPreventingAdder, LiveQueue, NativeAdder}
alias Membrane.Buffer
alias Membrane.RawAudio
alias Membrane.Time

# Element options.
# - `stream_format` may be left as `nil`; the format is then taken from the first stream
#   format received on an input pad (see `handle_stream_format/4`).
# - `native_mixer: true` combined with `prevent_clipping: false` is rejected in
#   `handle_init/2`.
# NOTE(review): `frames_per_buffer` and `synchronize_buffers?` are not read anywhere in the
# visible part of this module - confirm whether they are wired up elsewhere.
def_options stream_format: [
type: :struct,
Karolk99 marked this conversation as resolved.
Show resolved Hide resolved
spec: RawAudio.t(),
description: """
The value defines a raw audio format of pads connected to the
element. It should be the same for all the pads.
""",
default: nil
],
frames_per_buffer: [
type: :integer,
spec: pos_integer(),
description: """
Assumed number of raw audio frames in each buffer.
Used when converting demand from buffers into bytes.
""",
default: 2048
],
prevent_clipping: [
type: :boolean,
spec: boolean(),
description: """
Defines how the mixer should act in the case when an overflow happens.
- If true, the wave will be scaled down, so a peak will become the maximal
value of the sample in the format. See `Membrane.AudioMixer.ClipPreventingAdder`.
- If false, overflow will be clipped to the maximal value of the sample in
the format. See `Membrane.AudioMixer.Adder`.
""",
Comment on lines +30 to +35
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do we mean by an overflow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This option is taken from the regular audio mixer and the description is about how particular mixers work.

default: true
],
native_mixer: [
type: :boolean,
spec: boolean(),
description: """
The value determines if mixer should use NIFs for mixing audio. Only
clip preventing version of native mixer is available.
See `Membrane.AudioMixer.NativeAdder`.
""",
default: false
],
synchronize_buffers?: [
type: :boolean,
spec: boolean(),
description: """
The value determines if mixer should synchronize buffers based on pts values.
- If true, mixer will synchronize buffers based on its pts values. If buffer pts value is lower then the current
mixing time (last_ts_sent) it will be dropped.
- If false, mixer will take all incoming buffers no matter what pts they have and put it in the queue.
""",
default: false
]

# The single, always-available output pad; it works in pull mode and carries `RawAudio`.
def_output_pad :output,
mode: :pull,
availability: :always,
accepted_format: RawAudio

# Input pads are created on request, work in pull mode and express demand in bytes.
# They accept `RawAudio` in one of the listed signed-integer sample formats, or
# `Membrane.RemoteStream`. Each pad takes an `offset` option (default `0`), which is
# passed to `LiveQueue.add_queue/3` in `handle_start_of_stream/3`.
def_input_pad :input,
mode: :pull,
availability: :on_request,
demand_unit: :bytes,
accepted_format:
any_of(
%RawAudio{sample_format: sample_format}
when sample_format in [:s8, :s16le, :s16be, :s24le, :s24be, :s32le, :s32be],
Membrane.RemoteStream
),
#
options: [
offset: [
spec: Time.non_neg_t(),
default: 0,
description: "Offset of the input audio at the pad."
]
]

# Builds the element state out of the options: the option fields themselves plus the
# adder state, `last_ts_sent` (starting at 0) and an empty live queue.
# Raises when the native mixer is requested without clipping prevention.
@impl true
def handle_init(_ctx, %__MODULE__{stream_format: stream_format} = options) do
  if options.native_mixer && !options.prevent_clipping do
    raise("Invalid element options, for native mixer only clipping preventing one is available")
  end

  {:ok, live_queue_state} = LiveQueue.init(stream_format)

  extra_fields = %{
    mixer_state: initialize_mixer_state(stream_format, options),
    last_ts_sent: 0,
    live_queue: live_queue_state
  }

  {[], options |> Map.from_struct() |> Map.merge(extra_fields)}
end

# Starts the 100 ms mixing timer. When the stream format is already known from the
# options, it is also announced on the output pad before the timer is started.
@impl true
def handle_playing(_context, %{stream_format: %RawAudio{} = stream_format} = state) do
  actions = [
    stream_format: {:output, stream_format},
    start_timer: {:timer, Membrane.Time.milliseconds(100)}
  ]

  {actions, state}
end

def handle_playing(_context, %{stream_format: nil} = state) do
  actions = [start_timer: {:timer, Membrane.Time.milliseconds(100)}]
  {actions, state}
end

# Registers a queue for the pad whose stream has just started, using the `offset`
# option that the pad was linked with.
@impl true
def handle_start_of_stream(
      Pad.ref(:input, pad_id) = pad,
      context,
      %{live_queue: live_queue} = state
    ) do
  {:ok, live_queue_with_pad} =
    LiveQueue.add_queue(pad_id, context.pads[pad].options.offset, live_queue)

  {[], Map.replace!(state, :live_queue, live_queue_with_pad)}
end

# Removes the queue of the pad whose stream has ended. Once every input stream has ended,
# end of stream is sent on the output and the mixing timer is stopped.
@impl true
def handle_end_of_stream(Pad.ref(:input, pad_id), context, %{live_queue: live_queue} = state) do
  {:ok, new_live_queue} = LiveQueue.remove_queue(pad_id, live_queue)

  actions =
    if all_streams_ended?(context) do
      # The timer has to be stopped together with ending the output stream: otherwise the
      # next tick would try to send a buffer on a pad that has already seen end of stream.
      [end_of_stream: :output, stop_timer: :timer]
    else
      []
    end

  {actions, %{state | live_queue: new_live_queue}}
end

# Stores the incoming buffer in the live queue of the pad it arrived on.
# Nothing is sent from here; output is produced by `handle_tick/3`.
@impl true
def handle_process(
      Pad.ref(:input, pad_id),
      buffer,
      _ctx,
      %{live_queue: live_queue} = state
    ) do
  {:ok, updated_queue} = LiveQueue.add_buffer(pad_id, buffer, live_queue)
  {[], Map.replace!(state, :live_queue, updated_queue)}
end

# Called on every tick of the mixing timer: mixes the next 100 ms of audio from all
# queues and sends it on the output pad.
@impl true
def handle_tick(_timer_id, _context, %{mixer_state: nil} = state) do
  # The timer is started even when the stream format is not known yet. Until a format
  # arrives there is no adder to mix with, so the tick is skipped instead of crashing.
  {[], state}
end

def handle_tick(_timer_id, _context, state) do
  {payload, state} = mix(Membrane.Time.milliseconds(100), state)
  {[buffer: {:output, %Buffer{payload: payload}}], state}
end

# Handles a stream format arriving on an input pad.
#
# - No format known yet: the format is adopted, the adder is initialised for it, the
#   live queue is told about it and the format is forwarded on the output pad.
# - Format equal to the one in use: nothing to do.
# - Any other format: raises, since all inputs have to share one format.
@impl true
def handle_stream_format(_pad, stream_format, _context, %{stream_format: nil} = state) do
  state = %{state | stream_format: stream_format}
  mixer_state = initialize_mixer_state(stream_format, state)

  # The live queue was initialised with the `nil` format as well; without this update it
  # would keep converting between bytes and time using `nil`.
  live_queue = %{state.live_queue | stream_format: stream_format}

  {[stream_format: {:output, stream_format}, redemand: :output],
   %{state | mixer_state: mixer_state, live_queue: live_queue}}
end

def handle_stream_format(_pad, stream_format, _context, %{stream_format: stream_format} = state) do
  {[], state}
end

def handle_stream_format(pad, stream_format, _context, state) do
  raise """
  Received invalid stream_format on pad #{inspect(pad)}, \
  expected: #{inspect(state.stream_format)}, got: #{inspect(stream_format)}
  """
end

# Picks the adder module according to the `prevent_clipping` / `native_mixer` options and
# returns its initial state for `stream_format`. Without a format there is no adder yet,
# so `nil` is returned.
defp initialize_mixer_state(nil, _state), do: nil

defp initialize_mixer_state(stream_format, state) do
  adder =
    cond do
      !state.prevent_clipping -> Adder
      state.native_mixer -> NativeAdder
      true -> ClipPreventingAdder
    end

  adder.init(stream_format)
end

# Pulls `duration` worth of audio from every queue, mixes it with the configured adder
# and returns `{mixed_payload, state}` with both the adder and the queue state updated.
defp mix(duration, %{live_queue: live_queue} = state) do
  {audios, drained_queue} = LiveQueue.get_audio(duration, live_queue)

  {mixed, state} =
    audios
    |> Enum.map(fn {_audio_id, payload} -> payload end)
    |> mix_payloads(state)

  {mixed, %{state | live_queue: drained_queue}}
end

# Tells whether every pad other than `:output` has its `end_of_stream?` flag set.
defp all_streams_ended?(%{pads: pads}) do
  Enum.all?(pads, fn {pad_name, pad_info} ->
    pad_name == :output or pad_info.end_of_stream?
  end)
end

# Mixes `payloads` with the adder whose module is the struct module of `mixer_state`.
# Returns `{mixed_payload, state}` with the adder state replaced by the one it returned.
defp mix_payloads(payloads, %{mixer_state: %module{} = mixer_state} = state) do
  {mixed, updated_mixer_state} = module.mix(payloads, mixer_state)
  {mixed, %{state | mixer_state: updated_mixer_state}}
end
end