Skip to content

Commit

Permalink
Make sure all encoded data is flushed in pipe-based outputs.
Browse files Browse the repository at this point in the history
  • Loading branch information
toots committed Aug 17, 2023
1 parent 2d169cd commit 66aa92c
Show file tree
Hide file tree
Showing 23 changed files with 410 additions and 385 deletions.
137 changes: 70 additions & 67 deletions src/core/outputs/pipe_output.ml
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ class virtual base ~source ~name p =

val mutable encoder = None
val mutable current_metadata = None
method virtual start : unit
method virtual stop : unit

method virtual private encoder_factory
: string -> Frame.Metadata.Export.t -> Encoder.encoder

method start =
method create_encoder =
let enc = self#encoder_factory self#id in
let meta =
match current_metadata with
Expand All @@ -68,24 +70,28 @@ class virtual base ~source ~name p =

(* Make sure to call stop on the encoder to close any open
connection. *)
method stop =
method close_encoder =
match encoder with
| None -> ()
| None -> Strings.empty
| Some enc ->
(try ignore (enc.Encoder.stop ()) with _ -> ());
encoder <- None
let flushed = try enc.Encoder.stop () with _ -> Strings.empty in
encoder <- None;
flushed

method! reset = ()

method encode frame ofs len =
let enc = Option.get encoder in
enc.Encoder.encode frame ofs len
match encoder with
| None -> Strings.empty
| Some encoder -> encoder.Encoder.encode frame ofs len

method virtual write_pipe : string -> int -> int -> unit
method send b = Strings.iter self#write_pipe b

method insert_metadata m =
ignore (Option.map (fun e -> e.Encoder.insert_metadata m) encoder)
match encoder with
| None -> ()
| Some encoder -> encoder.Encoder.insert_metadata m
end

(** url output: discard encoded data, try to restart on encoding error (can be
Expand Down Expand Up @@ -147,54 +153,49 @@ class url_output p =
let self_sync = Lang.to_bool (List.assoc "self_sync" p) in
let name = "output.url" in
object (self)
inherit base p ~source ~name as super
inherit base p ~source ~name as base
method private encoder_factory = encoder_factory ~format format_val
val mutable restart_time = 0.

method! start =
method can_connect = restart_time <= Unix.gettimeofday ()

method on_error ~bt exn =
(try ignore self#close_encoder with _ -> ());
Utils.log_exception ~log:self#log
~bt:(Printexc.raw_backtrace_to_string bt)
(Printf.sprintf "Error while connecting: %s" (Printexc.to_string exn));
on_error ~bt exn;
match restart_delay with
| None -> Printexc.raise_with_backtrace exn bt
| Some delay ->
restart_time <- Unix.gettimeofday () +. delay;
self#log#important "Will try again in %.02f seconds." delay

method connect =
match encoder with
| None when restart_time <= Unix.gettimeofday () -> (
| None when self#can_connect -> (
try
super#start;
self#create_encoder;
on_start ()
with exn -> (
with exn ->
let bt = Printexc.get_raw_backtrace () in
Utils.log_exception ~log:self#log
~bt:(Printexc.raw_backtrace_to_string bt)
(Printf.sprintf "Error while connecting: %s"
(Printexc.to_string exn));
on_error ~bt exn;
match restart_delay with
| None -> Printexc.raise_with_backtrace exn bt
| Some delay ->
restart_time <- Unix.gettimeofday () +. delay;
self#log#important "Will try again in %.02f seconds." delay)
)
self#on_error ~bt exn)
| _ -> ()

method start = self#connect
method stop = ignore self#close_encoder

method! encode frame ofs len =
try
match encoder with
| None when restart_time <= Unix.gettimeofday () ->
super#start;
on_start ();
super#encode frame ofs len
| None when self#can_connect ->
self#connect;
base#encode frame ofs len
| None -> Strings.empty
| Some _ -> super#encode frame ofs len
with exn -> (
| Some _ -> base#encode frame ofs len
with exn ->
let bt = Printexc.get_raw_backtrace () in
Utils.log_exception ~log:self#log
~bt:(Printexc.raw_backtrace_to_string bt)
(Printf.sprintf "Error while encoding data: %s"
(Printexc.to_string exn));
on_error ~bt exn;
match restart_delay with
| None -> Printexc.raise_with_backtrace exn bt
| Some delay ->
self#stop;
restart_time <- Unix.gettimeofday () +. delay;
self#log#important "Will try again in %.02f seconds." delay;
Strings.empty)
self#on_error ~bt exn;
Strings.empty

method write_pipe _ _ _ = ()
method! self_sync = (`Static, self_sync)
Expand Down Expand Up @@ -303,7 +304,7 @@ class virtual piped_output ~name p =
let on_reopen = List.assoc "on_reopen" p in
let on_reopen () = ignore (Lang.apply on_reopen []) in
object (self)
inherit base ~source ~name p as super
inherit base ~source ~name p as base
val mutable open_date = 0.
val need_reopen = Atomic.make false
method need_reopen = Atomic.set need_reopen true
Expand All @@ -324,25 +325,20 @@ class virtual piped_output ~name p =
method prepare_pipe =
self#open_pipe;
open_date <- Unix.gettimeofday ();
Atomic.set need_reopen false
Atomic.set need_reopen false;
self#create_encoder
method private close_encoder =
match encoder with
| None -> ()
| Some encoder ->
let flush = encoder.Encoder.stop () in
super#send flush
method! stop =
method cleanup_pipe =
if self#is_open then (
self#close_encoder;
self#close_pipe);
super#stop
base#send self#close_encoder;
self#close_pipe)
method start = self#prepare_pipe
method stop = self#cleanup_pipe
method reopen =
self#log#important "Re-opening output pipe.";
self#close_encoder;
self#close_pipe;
self#cleanup_pipe;
self#prepare_pipe;
on_reopen ()
Expand All @@ -358,13 +354,10 @@ class virtual piped_output ~name p =
(Printexc.to_string exn) reopen_delay
method! output =
try super#output
try base#output
with exn ->
let bt = Printexc.get_raw_backtrace () in
(try
self#close_encoder;
self#close_pipe
with _ -> ());
(try self#cleanup_pipe with _ -> ());
self#reopen_on_error ~bt exn
method! send b =
Expand All @@ -378,12 +371,12 @@ class virtual piped_output ~name p =
Atomic.set need_reopen true;
open_date <- Unix.gettimeofday ()
| _ -> ());
if self#is_open then super#send b
if self#is_open then base#send b
method! insert_metadata metadata =
if reopen_on_metadata (Frame.Metadata.Export.to_metadata metadata) then
self#reopen;
super#insert_metadata metadata
base#insert_metadata metadata
end
(** Out channel virtual class: takes care of current out channel and writing to
Expand Down Expand Up @@ -478,7 +471,8 @@ class virtual ['a] file_output_base p =
method close_chan fd =
try
self#close_out fd;
self#on_close (Option.get (Atomic.get current_filename));
(try self#on_close (Option.get (Atomic.get current_filename))
with _ -> ());
Atomic.set current_filename None
with Sys_error _ as exn ->
let bt = Printexc.get_raw_backtrace () in
Expand Down Expand Up @@ -507,9 +501,11 @@ class file_output ~format_val p =
class file_output_using_encoder ~format_val p =
let format = Lang.to_format format_val in
let append = Lang.to_bool (List.assoc "append" p) in
let on_close = List.assoc "on_close" p in
let on_close s = Lang.to_unit (Lang.apply on_close [("", Lang.string s)]) in
let p = ("append", Lang.bool true) :: List.remove_assoc "append" p in
object (self)
inherit piped_output ~name:"output.file" p
inherit piped_output ~name:"output.file" p as base
inherit [unit] chan_output p
inherit [unit] file_output_base p
Expand All @@ -521,10 +517,17 @@ class file_output_using_encoder ~format_val p =
method encoder_factory name meta =
(* Make sure the file is created with the right perms. *)
let filename, mode, perm = self#prepare_filename in
Atomic.set current_filename (Some filename);
self#open_out_gen mode perm filename;
let format = Encoder.with_file_output ~append format filename in
encoder_factory ~format format_val name meta
method! close_encoder =
let ret = base#close_encoder in
(try on_close (Option.get (Atomic.get current_filename)) with _ -> ());
Atomic.set current_filename None;
ret
method output_substring () _ _ _ = ()
method flush () = ()
method close_out () = ()
Expand Down
2 changes: 1 addition & 1 deletion tests/media/encoder.liq.in
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ s = once(@SOURCE@(duration=0.5))
#clock.assign_new(sync="none",[s])

output.file(
fallible=true,on_stop=test.pass,
fallible=true,on_close=fun (_) -> test.pass(),
@FORMAT@,file,s)
6 changes: 3 additions & 3 deletions tests/media/ffmpeg_add_text.liq
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ s.on_track(on_track)

#clock.assign_new(id='test_clock',sync='none',[s])

def on_done () =
def on_close(filename) =
if !started then
ojson = process.read("ffprobe -v quiet -print_format json -show_streams #{process.quote(out)}")
ojson = process.read("ffprobe -v quiet -print_format json -show_streams #{process.quote(filename)}")

output_format = json.parse(default=[("streams", [[("samplerate", "0")]])], ojson)

Expand Down Expand Up @@ -78,4 +78,4 @@ def on_done () =
end
end

output.file(fallible=true, on_stop=on_done, %ffmpeg(format="mkv",%audio(codec="aac"),%video(codec="libx264")), out, s)
output.file(fallible=true, on_close=on_close, %ffmpeg(format="mkv",%audio(codec="aac"),%video(codec="libx264")), out, s)
10 changes: 5 additions & 5 deletions tests/media/ffmpeg_audio_decoder.liq
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ s = sequence([s, s, once(s)])

#clock.assign_new(sync='none',[s])

def on_done () =
thread.run(delay=0.2, fun () -> begin
j = process.read("ffprobe -v quiet -print_format json -show_streams #{process.quote(out)}")
def on_close(filename) =
if not test.done() then
j = process.read("ffprobe -v quiet -print_format json -show_streams #{process.quote(filename)}")

let json.parse ( parsed: {
streams: [{
Expand All @@ -33,7 +33,7 @@ def on_done () =
else
test.fail()
end
end)
end
end

output.file(fallible=true, on_stop=on_done, %wav(mono), out, s)
output.file(fallible=true, on_close=on_close, %wav(mono), out, s)
54 changes: 28 additions & 26 deletions tests/media/ffmpeg_copy_and_encode_decoder.liq
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,34 @@ s = once(s)

#clock.assign_new(sync='none',[s])

def on_done () =
j = process.read("ffprobe -v quiet -print_format json -show_streams #{process.quote(out)}")

let json.parse ( parsed : {
streams: [{
channel_layout: string?,
sample_rate: string?,
sample_fmt: string?,
codec_name: string?,
pix_fmt: string?
}]
}) = j

video_stream = list.find((fun (stream) -> null.defined(stream.pix_fmt)), parsed.streams)
audio_stream = list.find((fun (stream) -> null.defined(stream.sample_rate)), parsed.streams)

if null.get(video_stream.codec_name) == "h264" and
null.get(video_stream.pix_fmt) == "yuv420p" and
null.get(audio_stream.channel_layout) == "stereo" and
null.get(audio_stream.codec_name) == "aac" and
null.get(audio_stream.sample_fmt) == "fltp" and
null.get(audio_stream.sample_rate) == "44100" then
test.pass()
else
test.fail()
def on_close (filename) =
if not test.done() then
j = process.read("ffprobe -v quiet -print_format json -show_streams #{process.quote(filename)}")

let json.parse ( parsed : {
streams: [{
channel_layout: string?,
sample_rate: string?,
sample_fmt: string?,
codec_name: string?,
pix_fmt: string?
}]
}) = j

video_stream = list.find((fun (stream) -> null.defined(stream.pix_fmt)), parsed.streams)
audio_stream = list.find((fun (stream) -> null.defined(stream.sample_rate)), parsed.streams)

if null.get(video_stream.codec_name) == "h264" and
null.get(video_stream.pix_fmt) == "yuv420p" and
null.get(audio_stream.channel_layout) == "stereo" and
null.get(audio_stream.codec_name) == "aac" and
null.get(audio_stream.sample_fmt) == "fltp" and
null.get(audio_stream.sample_rate) == "44100" then
test.pass()
else
test.fail()
end
end
end

output.file(fallible=true, on_stop=on_done, %ffmpeg(format="mkv",%audio(codec="aac"),%video.copy), out, s)
output.file(fallible=true, on_close=on_close, %ffmpeg(format="mkv",%audio(codec="aac"),%video.copy), out, s)
Loading

0 comments on commit 66aa92c

Please sign in to comment.