Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

live audio mixer #30

Merged
merged 12 commits into from
May 18, 2023
Merged
2 changes: 1 addition & 1 deletion lib/membrane_audio_mixer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ defmodule Membrane.AudioMixer do
_pad,
%Membrane.RemoteStream{} = _input_stream_format,
_context,
%{input_stream_format: nil} = _state
DominikWolek marked this conversation as resolved.
Show resolved Hide resolved
%{stream_format: nil} = _state
) do
raise """
You need to specify `stream_format` in options if `Membrane.RemoteStream` will be received on the `:input` pad
Expand Down
165 changes: 84 additions & 81 deletions lib/membrane_audio_mixer/live_queue.ex
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
defmodule Membrane.AudioMixer.LiveQueue do
DominikWolek marked this conversation as resolved.
Show resolved Hide resolved
@moduledoc """
This module provides a library for audio mixers that work with live streams.
The LiveQueue stores live audio streams so users don't have to worry about lost or late audio packets.

The LiveQueue has a global time (`current_time`) which represents the beginning of all queues. When a buffer is added to a queue, based on `current_time` and the queue's size LiveQueue adds a certain part of the buffer, there are three options:
* buffer is to old - in this case whole buffer is dropped and queue is the same as before adding
* buffer is partly to old - in this case the part of the buffer that is to old is dropped and the rest is added to the queue.
* buffer is "fresh" - in this case `LiveQueue` checks if there is an "empty space" between beginning of the buffer and the end of the queue, if there is `LiveQueue` will fill it with silence and than will add the buffer.

Removing queue is simple, if queue is empty it will be removed right away, otherwise it will be marked as finished and will be removed when it gets empty.
There are a lot of problems that a mixer can encounter while processing live audio streams.
* packets can be lost which will result in the lack of continuity of the stream.
* sometimes there is a problem with communication and there won’t be any packets from some streamer for a few seconds.
* In the live stream, we want to be able to force some maximum latency and that will result in some packets that will be too old and has to be dropped.
The LiveQueue is a tool for storing audio streams that handle all the problems mentioned above.
The LIveQueue has an independent queue for each stream, all the “holes” caused by late or dropped packets are filled with silence.
If there is a need for more audio than there is in a queue, the missing part will be replaced by silence.
mickel8 marked this conversation as resolved.
Show resolved Hide resolved
"""
alias Membrane.AudioMixer.LiveQueue.Membrane.AudioMixer.LiveQueue.Queue
alias Membrane.RawAudio

defmodule Queue do
Expand All @@ -21,141 +20,145 @@ defmodule Membrane.AudioMixer.LiveQueue do
buffer: binary(),
buffer_duration: non_neg_integer(),
offset: non_neg_integer(),
finished?: boolean()
draining?: boolean()
}

defstruct buffer: <<>>, buffer_duration: 0, offset: 0, finished?: false
defstruct buffer: <<>>, buffer_duration: 0, offset: 0, draining?: false
end

@opaque state_t() :: %{
@opaque t() :: %{
queues: %{any() => Queue.t()},
current_time: non_neg_integer(),
stream_format: RawAudio.t()
}

@spec init(RawAudio.t()) :: state_t()
@spec init(RawAudio.t()) :: t()
def init(stream_format),
do: %{queues: %{}, current_time: 0, stream_format: stream_format}

@spec add_queue(state_t(), any(), non_neg_integer()) :: {:ok, state_t()} | {:error, String.t()}
def add_queue(state, id, offset \\ 0)
@spec add_queue(t(), any(), non_neg_integer()) :: t()
def add_queue(lq, id, offset \\ 0)

def add_queue(_state, _id, offset) when offset < 0,
do: {:error, "Offset has to be a `non_neg_integer`"}
def add_queue(lq, id, offset) when offset >= 0 do
if get_in(lq, [:queues, id]) != nil, do: raise("Queue with id: '#{id}' already exists.")

def add_queue(state, id, offset) do
if get_in(state, [:queues, id]) == nil do
queue = %Queue{offset: offset}
state = put_in(state, [:queues, id], queue)
{:ok, state}
else
{:error, "Queue with id: '#{id}' already exists."}
end
queue = %Queue{offset: offset}
put_in(lq, [:queues, id], queue)
end

@spec remove_queue(state_t(), any()) :: {:ok, state_t()}
def remove_queue(state, id) do
if get_in(state, [:queues, id]) != nil do
queue = state.queues[id]
@doc """
If the queue is empty, it will be removed right away.
Otherwise, it will be marked as `draining` and will be removed when it will get empty.
"""
mickel8 marked this conversation as resolved.
Show resolved Hide resolved
@spec remove_queue(t(), any()) :: t()
def remove_queue(lq, id) do
if get_in(lq, [:queues, id]) != nil do
mickel8 marked this conversation as resolved.
Show resolved Hide resolved
queue = lq.queues[id]

cond do
queue.finished? ->
{:error, "Queue with id: '#{id}' is already marked as finished"}
queue.draining? ->
raise "Queue with id: '#{id}' is already marked as draining"

queue.buffer_duration == 0 ->
{_queue, state} = pop_in(state, [:queues, id])
{:ok, state}
{_queue, lq} = pop_in(lq, [:queues, id])
lq

true ->
state = update_in(state, [:queues, id], &Map.put(&1, :finished?, true))
{:ok, state}
lq = update_in(lq, [:queues, id], &Map.put(&1, :draining?, true))
lq
mickel8 marked this conversation as resolved.
Show resolved Hide resolved
end
else
{:error, "Queue with id: '#{id}' doesn't exists"}
raise "Queue with id: '#{id}' doesn't exists"
end
end

@spec add_buffer(state_t(), any(), Membrane.Buffer.t()) ::
{:ok, state_t()} | {:error, state_t()}
def add_buffer(state, id, buffer) do
if get_in(state, [:queues, id]) == nil,
do: {:error, "Queue with id: #{id} doesn't exist."},
else: do_add_buffer(state, id, buffer)
@spec all_queues_empty?(t()) :: boolean
def all_queues_empty?(%{queues: queues}),
do:
Enum.all?(queues, fn
{_key, %{buffer_duration: 0}} -> true
{_key, _queue} -> false
end)

@spec get_audio(t(), pos_integer()) :: {[{any(), binary()}], t()}
def get_audio(%{current_time: current_time} = lq, duration) do
{audios, new_lq} =
Enum.map_reduce(lq.queues, lq, fn {id, queue}, acc_lq ->
{audio, new_queue} = get_duration(lq, queue, duration)
new_lq = put_in(acc_lq, [:queues, id], new_queue)
{{id, audio}, new_lq}
end)

new_queues =
new_lq.queues
|> Enum.filter(fn
{_key, %{draining?: true, buffer_duration: 0}} -> false
_queue -> true
end)
|> Map.new()

{audios, %{new_lq | queues: new_queues, current_time: current_time + duration}}
end

defp do_add_buffer(
%{
stream_format: stream_format,
current_time: current_time,
queues: queues
} = state,
id,
%{pts: pts, payload: payload}
) do
queue = queues[id]
@doc """
When a buffer is too old it will be dropped
When part of a buffer is too old, the part of the buffer that is too old will be dropped and the rest will be added to the back of the queue.
When a buffer is "fresh", the whole buffer will be added.
In the case of a “whole” between the end of the queue and the beginning of the buffer, the difference will be filled with silence.

The state of the buffer, whether it's too old or not, is based on LiveQueue's `current_time`.
mickel8 marked this conversation as resolved.
Show resolved Hide resolved
"""
@spec add_buffer(t(), any(), Membrane.Buffer.t()) :: t()
def add_buffer(
%{
stream_format: stream_format,
current_time: current_time,
queues: queues
} = lq,
id,
%{pts: pts, payload: payload}
) do
queue = Map.fetch!(queues, id)
pts = pts + queue.offset
payload_duration = RawAudio.bytes_to_time(byte_size(payload), stream_format)
end_pts = pts + payload_duration
queue_ts = current_time + queue.buffer_duration

case {pts > queue_ts, end_pts > queue_ts} do
{false, false} ->
{:ok, state}

{false, true} ->
drop_duration = queue_ts - pts
drop_bytes = RawAudio.time_to_bytes(drop_duration, stream_format)
<<_rest::binary-size(drop_bytes), to_add::binary>> = payload

to_add_duration = payload_duration - drop_duration

new_state =
update_in(state, [:queues, id], fn queue ->
new_lq =
mickel8 marked this conversation as resolved.
Show resolved Hide resolved
update_in(lq, [:queues, id], fn queue ->
queue
|> Map.update!(:buffer, &(&1 <> to_add))
|> Map.update!(:buffer_duration, &(&1 + to_add_duration))
end)

{:ok, new_state}
new_lq

{true, true} ->
silence_duration = pts - queue_ts
silence = RawAudio.silence(stream_format, silence_duration)

new_state =
update_in(state, [:queues, id], fn queue ->
new_lq =
update_in(lq, [:queues, id], fn queue ->
queue
|> Map.update!(:buffer, &(&1 <> silence <> payload))
|> Map.update!(:buffer_duration, &(&1 + silence_duration + payload_duration))
end)

{:ok, new_state}
new_lq

_else ->
{:error, state}
lq
end
end

@spec get_audio(state_t(), pos_integer()) :: {[{any(), binary()}], state_t()}
def get_audio(%{current_time: current_time} = state, duration) do
{audios, new_state} =
Enum.map_reduce(state.queues, state, fn {id, queue}, acc_state ->
{audio, new_queue} = get_duration(state, queue, duration)
new_state = put_in(acc_state, [:queues, id], new_queue)
{{id, audio}, new_state}
end)

new_queues =
new_state.queues
|> Enum.filter(fn
{_key, %{finished?: true, buffer_duration: 0}} -> false
_queue -> true
end)
|> Map.new()

{audios, %{new_state | queues: new_queues, current_time: current_time + duration}}
end

defp get_duration(%{stream_format: stream_format}, queue, duration) do
if queue.buffer_duration < duration do
audio = queue.buffer <> RawAudio.silence(stream_format, duration - queue.buffer_duration)
Expand Down
Loading