Skip to content
This repository has been archived by the owner on Sep 19, 2024. It is now read-only.

[RTC-116] Move to core 0.11/live dev #244

Merged
merged 29 commits into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
b67ebd5
update to core v0.11
mat-hek Dec 15, 2022
d39fa0b
make tests pass
mat-hek Dec 20, 2022
58d2837
make dialyzer pass
mat-hek Dec 22, 2022
f18f0a8
add dialyzer to CI
mat-hek Dec 22, 2022
4874019
fix deps
mat-hek Dec 22, 2022
69fde9c
temporairly remove HLS from guide
mat-hek Dec 23, 2022
47863b3
fix integration tests
mat-hek Dec 23, 2022
9bbd697
Merge branch 'master' into core-v0.11
mat-hek Jan 12, 2023
686e364
fix integration tests
mat-hek Jan 12, 2023
60aef08
fix leftovers
mat-hek Jan 12, 2023
a0e9de1
Upgrade HLS to core 0.11 (#223)
mickel8 Jan 21, 2023
b8c89b7
Merge remote-tracking branch 'origin/master' into core-v0.11
mickel8 Jan 21, 2023
8baa145
Format config files
mickel8 Jan 21, 2023
e8da32f
Fix dialyzer
mickel8 Jan 21, 2023
cb140b0
Uncomment HLS endpoint in custom_endpoints.md
mickel8 Jan 21, 2023
6fb0140
Update custom_endpoint guide
mickel8 Jan 21, 2023
cb037e6
Fix some cleanups
mickel8 Jan 23, 2023
7313733
Update deps
mickel8 Jan 23, 2023
27078e4
Merge remote-tracking branch 'origin/master' into core-v0.11
mickel8 Feb 7, 2023
74f188b
Fix tests
mickel8 Feb 7, 2023
60d1690
notify -> notify_parent
mickel8 Feb 7, 2023
d3d88da
Fix returned value on pipeline creation
bblaszkow06 Feb 1, 2023
e9c935d
Fix tests
Karolk99 Feb 14, 2023
68a0e8c
Fix integration tests
mickel8 Feb 16, 2023
3a1c3be
Merge remote-tracking branch 'origin/master' into core-v0.11
mickel8 Feb 16, 2023
9590e03
Fix removing endpoint
mickel8 Feb 16, 2023
d55ed83
Fix sending paddings
mickel8 Feb 16, 2023
0512d7c
Membrane Live dev (#188)
Karolk99 Feb 16, 2023
c7ed933
Merge remote-tracking branch 'origin/master' into core-v0.11
mickel8 Feb 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ jobs:
- run: mix format --check-formatted
- run: mix compile --force --warnings-as-errors
- run: mix credo
- run: mix dialyzer
- run: mix docs && mix docs 2>&1 | (! grep -q "warning:")

integration_test:
Expand Down
3 changes: 3 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import Config

import_config "#{config_env()}.exs"
3 changes: 3 additions & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import Config

config :membrane_telemetry_metrics, :enabled, true
1 change: 1 addition & 0 deletions config/prod.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

1 change: 1 addition & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

94 changes: 44 additions & 50 deletions guides/custom_endpoints.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ Pads should be in the following form
```elixir
def_input_pad :input,
demand_unit: :buffers,
caps: Membrane.RTP,
stream_format: Membrane.RTP,
availability: :on_request

def_output_pad :output,
demand_unit: :buffers,
caps: Membrane.RTP,
stream_format: Membrane.RTP,
availability: :on_request
```

Expand Down Expand Up @@ -73,17 +73,17 @@ defmodule RecordingEndpoint do

def_input_pad :input,
demand_unit: :buffers,
caps: :any,
accepted_format: _any,
availability: :on_request

@impl true
def handle_init(opts) do
def handle_init(_ctx, opts) do
state = %{
directory_path: opts.directory_path,
rtc_engine: opts.rtc_engine
output_dir_path: opts.output_dir_path,
rtc_engine: opts.rtc_engine,
}

{:ok, state}
{[], state}
end
end
```
Expand All @@ -103,46 +103,40 @@ used for serializing incoming Membrane stream

For each incoming track we will create separate serializer and sink.

## Subscribing for a track
## Subscribing to a track

Whenever some endpoint publishes its tracks, all other endpoints receive a message
in form of `{:new_tracks, tracks}` where `tracks` is a list of `t:Membrane.RTC.Engine.Track.t/0`.

To subscribe to any of the published tracks, an endpoint has to call `Membrane.RTC.Engine.subscribe/4`.

After subscribing for a track the endpoint will be notified about its readiness in
After subscribing to a track, the endpoint will be notified about its readiness in
`c:Membrane.Bin.handle_pad_added/3` callback. An example implementation of `handle_pad_added`
callback can look like this

```elixir
@impl true
def handle_pad_added(Pad.ref(:input, track_id) = pad, _ctx, state) do
children = %{
{:track_receiver, track_id} => %TrackReceiver{
structure = [
bin_input(pad)
|> child({:track_receiver, track_id}, %TrackReceiver{
track: Map.fetch!(state.tracks, track_id),
initial_target_variant: :high
},
{:serializer, track_id} => Membrane.Stream.Serializer,
{:sink, track_id} => %Membrane.File.Sink{
})
|> child({:serializer, track_id}, Membrane.Stream.Serializer)
|> child({:sink, track_id}, %Membrane.File.Sink{
location: Path.join(state.output_dir_path, track_id)
}
}

links = [
link_bin_input(pad)
|> to({:track_receiver, track_id})
|> to({:serializer, track_id})
|> to({:sink, track_id})
})
]

{{:ok, spec: %ParentSpec{children: children, links: links}}, state}
{[spec: structure], state}
end
```

We simply link our input pad on which we will receive media data to the track receiver,
serializer and then to the sink.

The endpoint will be also notified when some tracks it subscribed for are removed with
The endpoint will be also notified when some tracks it subscribed to are removed with
`{:removed_tracks, tracks}` message.

> #### Deeper dive into TrackReciver {: .tip}
Expand Down Expand Up @@ -201,70 +195,64 @@ defmodule RecordingEndpoint do

def_input_pad :input,
demand_unit: :buffers,
caps: :any,
accepted_format: _any,
availability: :on_request

@impl true
def handle_init(opts) do
def handle_init(_ctx, opts) do
state = %{
output_dir_path: opts.output_dir_path,
rtc_engine: opts.rtc_engine,
tracks: %{}
}

{:ok, state}
{[], state}
end

@impl true
def handle_pad_added(Pad.ref(:input, track_id) = pad, _ctx, state) do
children = %{
{:track_receiver, track_id} => %TrackReceiver{
structure = [
bin_input(pad)
|> child({:track_receiver, track_id}, %TrackReceiver{
track: Map.fetch!(state.tracks, track_id),
initial_target_variant: :high
},
{:serializer, track_id} => Membrane.Stream.Serializer,
{:sink, track_id} => %Membrane.File.Sink{
})
|> child({:serializer, track_id}, Membrane.Stream.Serializer)
|> child({:sink, track_id}, %Membrane.File.Sink{
location: Path.join(state.output_dir_path, track_id)
}
}

links = [
link_bin_input(pad)
|> to({:track_receiver, track_id})
|> to({:serializer, track_id})
|> to({:sink, track_id})
})
]

{{:ok, spec: %ParentSpec{children: children, links: links}}, state}
{[spec: structure], state}
end

@impl true
def handle_pad_removed(Pad.ref(:output, track_id), _ctx, state) do
def handle_pad_removed(Pad.ref(:input, track_id), _ctx, state) do
children = [
{:track_receiver, track_id},
{:serialized, track_id},
{:serializer, track_id},
{:sink, track_id}
]

{{:ok, remove_child: children}, state}
{[remove_child: children], state}
end

@impl true
def handle_other({:new_tracks, tracks}, ctx, state) do
def handle_parent_notification({:new_tracks, tracks}, ctx, state) do
{:endpoint, endpoint_id} = ctx.name

Enum.reduce_while(tracks, {:ok, state}, fn track, {:ok, state} ->
Enum.reduce_while(tracks, {[], state}, fn track, {[], state} ->
case Engine.subscribe(state.rtc_engine, endpoint_id, track.id) do
:ok ->
{:cont, {:ok, update_in(state, [:tracks], &Map.put(&1, track.id, track))}}
{:cont, {[], update_in(state, [:tracks], &Map.put(&1, track.id, track))}}

{:error, :invalid_track_id} ->
Membrane.Logger.warn("""
Couldn't subscribe to the track: #{inspect(track.id)}. No such track.
It had to be removed just after publishing it. Ignoring.
""")

{:cont, {:ok, state}}
{:cont, {[], state}}

{:error, reason} ->
raise "Couldn't subscribe to the track: #{inspect(track.id)}. Reason: #{inspect(reason)}"
Expand All @@ -273,8 +261,14 @@ defmodule RecordingEndpoint do
end

@impl true
def handle_other({:remove_tracks, []}, _ctx, state) do
{:ok, state}
def handle_parent_notification({:remove_tracks, _tracks}, _ctx, state) do
{[], state}
end

@impl true
def handle_parent_notification(_msg, _ctx, state) do
# ignore all other notifications
{[], state}
end
end
```
Loading