Skip to content
This repository has been archived by the owner on Sep 19, 2024. It is now read-only.

Revert "[RTC-116] Update to core v0.11 (#211)" #243

Merged
merged 1 commit into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ jobs:
- run: mix format --check-formatted
- run: mix compile --force --warnings-as-errors
- run: mix credo
- run: mix dialyzer
- run: mix docs && mix docs 2>&1 | (! grep -q "warning:")

integration_test:
Expand Down
3 changes: 0 additions & 3 deletions config/config.exs

This file was deleted.

3 changes: 0 additions & 3 deletions config/dev.exs

This file was deleted.

1 change: 0 additions & 1 deletion config/prod.exs

This file was deleted.

1 change: 0 additions & 1 deletion config/test.exs

This file was deleted.

94 changes: 50 additions & 44 deletions guides/custom_endpoints.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ Pads should be in the following form
```elixir
def_input_pad :input,
demand_unit: :buffers,
stream_format: Membrane.RTP,
caps: Membrane.RTP,
availability: :on_request

def_output_pad :output,
demand_unit: :buffers,
stream_format: Membrane.RTP,
caps: Membrane.RTP,
availability: :on_request
```

Expand Down Expand Up @@ -73,17 +73,17 @@ defmodule RecordingEndpoint do

def_input_pad :input,
demand_unit: :buffers,
accepted_format: _any,
caps: :any,
availability: :on_request

@impl true
def handle_init(_ctx, opts) do
def handle_init(opts) do
state = %{
output_dir_path: opts.output_dir_path,
rtc_engine: opts.rtc_engine,
directory_path: opts.directory_path,
rtc_engine: opts.rtc_engine
}

{[], state}
{:ok, state}
end
end
```
Expand All @@ -103,40 +103,46 @@ used for serializing incoming Membrane stream

For each incoming track we will create separate serializer and sink.

## Subscribing to a track
## Subscribing for a track

Whenever some endpoint publishes its tracks, all other endpoints receive a message
in form of `{:new_tracks, tracks}` where `tracks` is a list of `t:Membrane.RTC.Engine.Track.t/0`.

To subscribe to any of the published tracks, an endpoint has to call `Membrane.RTC.Engine.subscribe/4`.

After subscribing to a track, the endpoint will be notified about its readiness in
After subscribing for a track the endpoint will be notified about its readiness in
`c:Membrane.Bin.handle_pad_added/3` callback. An example implementation of `handle_pad_added`
callback can look like this

```elixir
@impl true
def handle_pad_added(Pad.ref(:input, track_id) = pad, _ctx, state) do
structure = [
bin_input(pad)
|> child({:track_receiver, track_id}, %TrackReceiver{
children = %{
{:track_receiver, track_id} => %TrackReceiver{
track: Map.fetch!(state.tracks, track_id),
initial_target_variant: :high
})
|> child({:serializer, track_id}, Membrane.Stream.Serializer)
|> child({:sink, track_id}, %Membrane.File.Sink{
},
{:serializer, track_id} => Membrane.Stream.Serializer,
{:sink, track_id} => %Membrane.File.Sink{
location: Path.join(state.output_dir_path, track_id)
})
}
}

links = [
link_bin_input(pad)
|> to({:track_receiver, track_id})
|> to({:serializer, track_id})
|> to({:sink, track_id})
]

{[spec: structure], state}
{{:ok, spec: %ParentSpec{children: children, links: links}}, state}
end
```

We simply link our input pad on which we will receive media data to the track receiver,
serializer and then to the sink.

The endpoint will be also notified when some tracks it subscribed to are removed with
The endpoint will be also notified when some tracks it subscribed for are removed with
`{:removed_tracks, tracks}` message.

> #### Deeper dive into TrackReciver {: .tip}
Expand Down Expand Up @@ -195,64 +201,70 @@ defmodule RecordingEndpoint do

def_input_pad :input,
demand_unit: :buffers,
accepted_format: _any,
caps: :any,
availability: :on_request

@impl true
def handle_init(_ctx, opts) do
def handle_init(opts) do
state = %{
output_dir_path: opts.output_dir_path,
rtc_engine: opts.rtc_engine,
tracks: %{}
}

{[], state}
{:ok, state}
end

@impl true
def handle_pad_added(Pad.ref(:input, track_id) = pad, _ctx, state) do
structure = [
bin_input(pad)
|> child({:track_receiver, track_id}, %TrackReceiver{
children = %{
{:track_receiver, track_id} => %TrackReceiver{
track: Map.fetch!(state.tracks, track_id),
initial_target_variant: :high
})
|> child({:serializer, track_id}, Membrane.Stream.Serializer)
|> child({:sink, track_id}, %Membrane.File.Sink{
},
{:serializer, track_id} => Membrane.Stream.Serializer,
{:sink, track_id} => %Membrane.File.Sink{
location: Path.join(state.output_dir_path, track_id)
})
}
}

links = [
link_bin_input(pad)
|> to({:track_receiver, track_id})
|> to({:serializer, track_id})
|> to({:sink, track_id})
]

{[spec: structure], state}
{{:ok, spec: %ParentSpec{children: children, links: links}}, state}
end

@impl true
def handle_pad_removed(Pad.ref(:input, track_id), _ctx, state) do
def handle_pad_removed(Pad.ref(:output, track_id), _ctx, state) do
children = [
{:track_receiver, track_id},
{:serializer, track_id},
{:serialized, track_id},
{:sink, track_id}
]

{[remove_child: children], state}
{{:ok, remove_child: children}, state}
end

@impl true
def handle_parent_notification({:new_tracks, tracks}, ctx, state) do
def handle_other({:new_tracks, tracks}, ctx, state) do
{:endpoint, endpoint_id} = ctx.name

Enum.reduce_while(tracks, {[], state}, fn track, {[], state} ->
Enum.reduce_while(tracks, {:ok, state}, fn track, {:ok, state} ->
case Engine.subscribe(state.rtc_engine, endpoint_id, track.id) do
:ok ->
{:cont, {[], update_in(state, [:tracks], &Map.put(&1, track.id, track))}}
{:cont, {:ok, update_in(state, [:tracks], &Map.put(&1, track.id, track))}}

{:error, :invalid_track_id} ->
Membrane.Logger.warn("""
Couldn't subscribe to the track: #{inspect(track.id)}. No such track.
It had to be removed just after publishing it. Ignoring.
""")

{:cont, {[], state}}
{:cont, {:ok, state}}

{:error, reason} ->
raise "Couldn't subscribe to the track: #{inspect(track.id)}. Reason: #{inspect(reason)}"
Expand All @@ -261,14 +273,8 @@ defmodule RecordingEndpoint do
end

@impl true
def handle_parent_notification({:remove_tracks, _tracks}, _ctx, state) do
{[], state}
end

@impl true
def handle_parent_notification(_msg, _ctx, state) do
# ignore all other notifications
{[], state}
def handle_other({:remove_tracks, []}, _ctx, state) do
{:ok, state}
end
end
```
Loading