Skip to content

Commit

Permalink
Seekable source (#40)
Browse files Browse the repository at this point in the history
* Add SeekSourceEvent. Add handling of SeekSourceEvent in File.Source. Rename SeekEvent into SeekSinkEvent.
* Add NewSeekEvent - event send before the buffers from a new seek
  • Loading branch information
varsill authored Aug 1, 2023
1 parent 7319d70 commit 732915d
Show file tree
Hide file tree
Showing 12 changed files with 287 additions and 26 deletions.
6 changes: 3 additions & 3 deletions lib/membrane_file/common_file_behaviour.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Membrane.File.CommonFileBehaviour do
@moduledoc false

alias Membrane.Buffer
alias Membrane.File.SeekEvent
alias Membrane.File.SeekSinkEvent

@common_file_impl Application.compile_env(
:membrane_file_plugin,
Expand All @@ -23,9 +23,9 @@ defmodule Membrane.File.CommonFileBehaviour do
@callback write(File.io_device(), Buffer.t()) :: :ok | posix_error_t()
@callback write!(File.io_device(), Buffer.t()) :: :ok

@callback seek(File.io_device(), SeekEvent.position_t()) ::
@callback seek(File.io_device(), SeekSinkEvent.position_t()) ::
{:ok, integer()} | generic_error_t()
@callback seek!(File.io_device(), SeekEvent.position_t()) :: integer()
@callback seek!(File.io_device(), SeekSinkEvent.position_t()) :: integer()

@callback copy(File.io_device(), File.io_device()) ::
{:ok, non_neg_integer()} | generic_error_t()
Expand Down
12 changes: 12 additions & 0 deletions lib/membrane_file/new_seek_event.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule Membrane.File.NewSeekEvent do
@moduledoc """
An event sent by the `Membrane.File.Source` with `seekable?: true` option,
right after receiving `Membrane.File.SeekSourceEvent`.
An element that steers the seekable file source with `Membrane.File.SeekSourceEvent`
can assume that all the buffers received after receiving that event are
the buffers ordered by that `Membrane.File.SeekSourceEvent`.
"""
@derive Membrane.EventProtocol

defstruct []
end
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Membrane.File.SeekEvent do
defmodule Membrane.File.SeekSinkEvent do
@moduledoc """
Event that triggers seeking or insertion to a file in `Membrane.File.Sink`.
Expand All @@ -15,6 +15,12 @@ defmodule Membrane.File.SeekEvent do
@derive Membrane.EventProtocol

@type offset_t :: integer()

@typedoc """
Specifies the position to which the seek is performed.
The meaning is the same as for the `Location` argument in https://www.erlang.org/doc/man/file.html#position-2.
"""
@type position_t :: offset_t() | {:bof | :cur | :eof, offset_t()} | :bof | :cur | :eof

@type t :: %__MODULE__{
Expand Down
31 changes: 31 additions & 0 deletions lib/membrane_file/seek_source_event.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule Membrane.File.SeekSourceEvent do
@moduledoc """
Event that triggers seeking and reading in `Membrane.File.Source` working in
`seekable?: true` mode.
When `inspect(__MODULE__)` is received by the source, the source starts reading
data from the given position in file, specified by the `:start` field of the event's
struct. The source reads up to `size_to_read` bytes of the data from file (it can
read less if the file ends).
If the event is set with `last?: true`, once `size_to_read` bytes are read or the
file ends, the source will return `end_of_stream` action on the `:output` pad.
"""
@derive Membrane.EventProtocol

@type offset_t :: integer()

@typedoc """
Specifies the position to which the seek is performed.
The meaning is the same as for the `Location` argument in https://www.erlang.org/doc/man/file.html#position-2.
"""
@type position_t :: offset_t() | {:bof | :cur | :eof, offset_t()} | :bof | :cur | :eof

@type t :: %__MODULE__{
start: position_t(),
size_to_read: non_neg_integer() | :infinity,
last?: boolean()
}
@enforce_keys [:start, :size_to_read]
defstruct @enforce_keys ++ [last?: false]
end
8 changes: 4 additions & 4 deletions lib/membrane_file/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ defmodule Membrane.File.Sink do
@moduledoc """
Element that creates a file and stores incoming buffers there (in binary format).
When `Membrane.File.SeekEvent` is received, the element starts writing buffers starting
When `Membrane.File.SeekSinkEvent` is received, the element starts writing buffers starting
from `position`. By default, it overwrites previously stored bytes. You can set `insert?`
field of the event to `true` to start inserting new buffers without overwriting previous ones.
Please note, that inserting requires rewriting the file, what negatively impacts performance.
For more information refer to `Membrane.File.SeekEvent` moduledoc.
For more information refer to `Membrane.File.SeekSinkEvent` moduledoc.
"""
use Membrane.Sink

alias Membrane.File.SeekEvent
alias Membrane.File.SeekSinkEvent

@common_file Membrane.File.CommonFileBehaviour.get_impl()

Expand Down Expand Up @@ -52,7 +52,7 @@ defmodule Membrane.File.Sink do
end

@impl true
def handle_event(:input, %SeekEvent{insert?: insert?, position: position}, _ctx, state) do
def handle_event(:input, %SeekSinkEvent{insert?: insert?, position: position}, _ctx, state) do
state =
if insert?,
do: split_file(state, position),
Expand Down
91 changes: 81 additions & 10 deletions lib/membrane_file/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ defmodule Membrane.File.Source do
@moduledoc """
Element that reads chunks of data from given file and sends them as buffers
through the output pad.
Can work in two modes, determined by the `seekable?` option.
"""
use Membrane.Source

alias Membrane.{Buffer, RemoteStream}
alias Membrane.File.NewSeekEvent
alias Membrane.File.SeekSourceEvent

@common_file Membrane.File.CommonFileBehaviour.get_impl()

Expand All @@ -17,17 +21,34 @@ defmodule Membrane.File.Source do
spec: pos_integer(),
default: 2048,
description: "Size of chunks being read"
],
seekable?: [
spec: boolean(),
default: false,
description: """
With `seekable?: false`, the source will start reading data from the file exactly the moment it starts
playing and will read it till the end of file, setting the `end_of_stream` action on the `:output` pad
when the reading is done.
With `seekable?: true`, the process of reading is driven by receiving `Membrane.File.SeekSourceEvent` events.
The source working in `seekable?: true` mode won't send any data before that event is received.
For more information about how to steer reading in `seekable?: true` mode, see: `Membrane.File.SeekSourceEvent`.
"""
]

def_output_pad :output, accepted_format: %RemoteStream{type: :bytestream}

@impl true
def handle_init(_ctx, %__MODULE__{location: location, chunk_size: size}) do
def handle_init(_ctx, %__MODULE__{location: location, chunk_size: size, seekable?: seekable?}) do
size_to_read = if seekable?, do: 0, else: :infinity

{[],
%{
location: Path.expand(location),
chunk_size: size,
fd: nil
fd: nil,
should_send_eos?: not seekable?,
size_to_read: size_to_read,
seekable?: seekable?
}}
end

Expand All @@ -43,6 +64,29 @@ defmodule Membrane.File.Source do
{[stream_format: {:output, %RemoteStream{type: :bytestream}}], state}
end

@impl true
def handle_event(
:output,
%SeekSourceEvent{start: seek_start, size_to_read: size_to_read, last?: last?},
_ctx,
%{seekable?: true} = state
) do
@common_file.seek!(state.fd, seek_start)

{[event: {:output, %NewSeekEvent{}}, redemand: :output],
%{state | should_send_eos?: last?, size_to_read: size_to_read}}
end

@impl true
def handle_event(
:output,
%SeekSourceEvent{},
_ctx,
%{seekable?: false}
) do
raise "Cannot handle `Membrane.File.SeekSourceEvent` in a `#{__MODULE__}` with `seekable?: false` option."
end

@impl true
def handle_demand(:output, _size, :buffers, _ctx, %{chunk_size: chunk_size} = state),
do: supply_demand(chunk_size, [redemand: :output], state)
Expand All @@ -57,19 +101,46 @@ defmodule Membrane.File.Source do
{[terminate: :normal], %{state | fd: nil}}
end

defp supply_demand(size, redemand, %{fd: fd} = state) do
actions =
case @common_file.binread!(fd, size) do
<<payload::binary>> when byte_size(payload) == size ->
[buffer: {:output, %Buffer{payload: payload}}] ++ redemand
defp supply_demand(demand_size, redemand, %{size_to_read: :infinity} = state) do
do_supply_demand(demand_size, redemand, state)
end

<<payload::binary>> when byte_size(payload) < size ->
[buffer: {:output, %Buffer{payload: payload}}, end_of_stream: :output]
defp supply_demand(_demand_size, _redemand, %{size_to_read: 0} = state) do
{[], state}
end

defp supply_demand(demand_size, redemand, %{size_to_read: size_to_read} = state) do
do_supply_demand(min(demand_size, size_to_read), redemand, state)
end

defp do_supply_demand(to_supply_size, redemand, state) do
{buffer_actions, supplied_size} =
case @common_file.binread!(state.fd, to_supply_size) do
<<payload::binary>> ->
{[buffer: {:output, %Buffer{payload: payload}}], byte_size(payload)}

:eof ->
[end_of_stream: :output]
{[], 0}
end

new_size_to_read =
if state.size_to_read == :infinity, do: :infinity, else: state.size_to_read - supplied_size

state = %{state | size_to_read: new_size_to_read}

actions =
buffer_actions ++
cond do
state.should_send_eos? and (state.size_to_read == 0 or supplied_size < to_supply_size) ->
[end_of_stream: :output]

to_supply_size == supplied_size ->
redemand

true ->
[]
end

{actions, state}
end
end
1 change: 1 addition & 0 deletions test/fixtures/input.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0123456789
98 changes: 98 additions & 0 deletions test/integration/source_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
defmodule Membrane.File.Integration.SourceTest do
use Membrane.File.IntegrationTestCaseTemplate

import Membrane.Testing.Assertions
import Membrane.ChildrenSpec

alias Membrane.Testing.Pipeline
alias Membrane.Testing.Sink
alias Membrane.File.Source
alias Membrane.File.SeekSourceEvent
alias Membrane.Buffer

defmodule Filter do
use Membrane.Filter

def_input_pad :input,
accepted_format: _,
mode: :pull,
demand_mode: :auto,
demand_unit: :bytes

def_output_pad :output, accepted_format: _, mode: :pull, demand_mode: :auto

@impl true
def handle_parent_notification(event, _context, state) do
{[event: {:input, event}], state}
end

@impl true
def handle_process(:input, buffer, _context, state) do
{[buffer: {:output, buffer}], state}
end
end

@input_text_file "test/fixtures/input.txt"

test "if seekable Source sents only the buffers requested" do
spec = [
child(:source, %Source{
location: @input_text_file,
chunk_size: 2,
seekable?: true
})
|> child(:filter, Filter)
|> child(:sink, Sink)
]

{:ok, _supervisor_pid, pipeline_pid} = Pipeline.start(structure: spec)
refute_sink_buffer(pipeline_pid, :sink, _)

Pipeline.execute_actions(pipeline_pid,
notify_child: {:filter, %SeekSourceEvent{start: 2, size_to_read: 5}}
)

assert_sink_buffer(pipeline_pid, :sink, %Buffer{payload: "23456"})

Pipeline.execute_actions(pipeline_pid,
notify_child: {:filter, %SeekSourceEvent{start: 0, size_to_read: 3}}
)

Pipeline.execute_actions(pipeline_pid,
notify_child: {:filter, %SeekSourceEvent{start: 7, size_to_read: 10}}
)

assert_sink_buffer(pipeline_pid, :sink, %Buffer{payload: "789"})

Pipeline.execute_actions(pipeline_pid,
notify_child: {:filter, %SeekSourceEvent{start: 7, size_to_read: 10, last?: true}}
)

assert_sink_buffer(pipeline_pid, :sink, %Buffer{payload: "789"})
assert_end_of_stream(pipeline_pid, :sink)
end

test "if seekable Source sents :end_of_stream for seek event with `last?: true` when all the bytes are supplied" do
Membrane.File.CommonMock.open!("test/fixtures/input.txt", [])

spec = [
child(:source, %Source{
location: @input_text_file,
chunk_size: 2,
seekable?: true
})
|> child(:filter, Filter)
|> child(:sink, Sink)
]

{:ok, _supervisor_pid, pipeline_pid} = Pipeline.start(structure: spec)
refute_sink_buffer(pipeline_pid, :sink, _)

Pipeline.execute_actions(pipeline_pid,
notify_child: {:filter, %SeekSourceEvent{start: 0, size_to_read: 5, last?: true}}
)

assert_sink_buffer(pipeline_pid, :sink, %Buffer{payload: "01234"})
assert_end_of_stream(pipeline_pid, :sink)
end
end
2 changes: 1 addition & 1 deletion test/membrane_file/sink_source_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ defmodule Membrane.File.SinkSourceIntegrationTest do

actions = [
{:buffer, {:output, %Buffer{payload: second_part}}},
{:event, {:output, %MbrFile.SeekEvent{position: :bof, insert?: true}}},
{:event, {:output, %MbrFile.SeekSinkEvent{position: :bof, insert?: true}}},
{:buffer, {:output, %Buffer{payload: first_part}}},
{:end_of_stream, :output}
]
Expand Down
Loading

0 comments on commit 732915d

Please sign in to comment.