Skip to content

Commit

Permalink
Prevent multiple call to filename function in output.file (#2922)
Browse files Browse the repository at this point in the history
* Use `Atomic` for values potentially acessed concurrently.
* Make filename computation idempotent
* Prevent filename function from being called to set the source's name

Fix: #2842
  • Loading branch information
toots authored Feb 24, 2023
1 parent 6557ff5 commit caef89d
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 31 deletions.
71 changes: 40 additions & 31 deletions src/core/outputs/pipe_output.ml
Original file line number Diff line number Diff line change
Expand Up @@ -234,20 +234,18 @@ let pipe_proto frame_t arg_doc =
("", Lang.source_t frame_t, None, None);
]

class virtual piped_output p =
class virtual piped_output ~name p =
let reload_predicate = List.assoc "reopen_when" p in
let reload_delay = Lang.to_float (List.assoc "reopen_delay" p) in
let reload_on_error = Lang.to_bool (List.assoc "reopen_on_error" p) in
let reload_on_metadata = Lang.to_bool (List.assoc "reopen_on_metadata" p) in
let name = Lang.to_string_getter (Lang.assoc "" 2 p) in
let name = name () in
let source = Lang.assoc "" 3 p in
object (self)
inherit base ~source ~name p as super
method reopen_cmd = self#reopen
val mutable open_date = 0.
val mutable need_reset = false
val mutable reopening = false
val need_reset = Atomic.make false
val reopening = Atomic.make false
method virtual open_pipe : unit
method virtual close_pipe : unit
method virtual is_open : bool
Expand Down Expand Up @@ -280,30 +278,30 @@ class virtual piped_output p =
self#log#important "Re-opening output pipe.";
(* #stop can trigger #send, the [reopening] flag avoids loops *)
reopening <- true;
Atomic.set reopening true;
self#stop;
self#start;
reopening <- false;
need_reset <- false)
Atomic.set reopening false;
Atomic.set need_reset false)
()
method! send b =
if not self#is_open then self#prepare_pipe;
(try super#send b
with e when reload_on_error ->
self#log#important "Reopening on error: %s." (Printexc.to_string e);
need_reset <- true);
if not reopening then
Atomic.set need_reset true);
if not (Atomic.get reopening) then
if
need_reset
Atomic.get need_reset
|| Unix.gettimeofday () > reload_delay +. open_date
&& Lang.to_bool (Lang.apply reload_predicate [])
then self#reopen
method! insert_metadata m =
if reload_on_metadata then (
current_metadata <- Some m;
need_reset <- true)
Atomic.set need_reset true)
else super#insert_metadata m
end
Expand Down Expand Up @@ -354,7 +352,7 @@ class virtual ['a] file_output_base p =
let dir_perm = Lang.to_int (List.assoc "dir_perm" p) in
let append = Lang.to_bool (List.assoc "append" p) in
object (self)
val mutable current_filename = None
val current_filename = Atomic.make None
method virtual interpolate : ?subst:(string -> string) -> string -> string
method private filename =
Expand All @@ -366,17 +364,27 @@ class virtual ['a] file_output_base p =
method virtual open_out_gen : open_flag list -> int -> string -> 'a
method open_chan =
method private prepare_filename =
let mode =
Open_wronly :: Open_creat
:: (if append then [Open_append] else [Open_trunc])
in
let filename = self#filename in
match Atomic.get current_filename with
| Some filename -> (filename, mode, perm)
| None -> (
let filename = self#filename in
try
Utils.mkdir ~perm:dir_perm (Filename.dirname filename);
Atomic.set current_filename (Some filename);
(filename, mode, perm)
with Sys_error _ as exn ->
let bt = Printexc.get_raw_backtrace () in
Lang.raise_as_runtime ~bt ~kind:"system" exn)
method open_chan =
try
Utils.mkdir ~perm:dir_perm (Filename.dirname filename);
let fd = self#open_out_gen mode perm filename in
current_filename <- Some filename;
fd
let filename, mode, perm = self#prepare_filename in
self#open_out_gen mode perm filename
with Sys_error _ as exn ->
let bt = Printexc.get_raw_backtrace () in
Lang.raise_as_runtime ~bt ~kind:"system" exn
Expand All @@ -386,8 +394,8 @@ class virtual ['a] file_output_base p =
method close_chan fd =
try
self#close_out fd;
self#on_close (Option.get current_filename);
current_filename <- None
self#on_close (Option.get (Atomic.get current_filename));
Atomic.set current_filename None
with Sys_error _ as exn ->
let bt = Printexc.get_raw_backtrace () in
Lang.raise_as_runtime ~bt ~kind:"system" exn
Expand All @@ -397,7 +405,7 @@ class virtual ['a] file_output_base p =
class file_output ~format_val p =
object
inherit piped_output p
inherit piped_output ~name:"output.file" p
inherit [out_channel] chan_output p
inherit [out_channel] file_output_base p
method encoder_factory = encoder_factory format_val
Expand All @@ -417,21 +425,22 @@ class file_output_using_encoder ~format_val p =
let append = Lang.to_bool (List.assoc "append" p) in
let p = ("append", Lang.bool true) :: List.remove_assoc "append" p in
object (self)
inherit piped_output p
inherit piped_output ~name:"output.file" p
inherit [unit] chan_output p
inherit [unit] file_output_base p
method open_out_gen mode perm filename =
let fd = open_out_gen mode perm filename in
close_out fd;
()
method encoder_factory name meta =
(* Make sure the file is created with the right perms. *)
ignore self#open_chan;
let format = Encoder.with_file_output ~append format self#filename in
let filename, mode, perm = self#prepare_filename in
self#open_out_gen mode perm filename;
let format = Encoder.with_file_output ~append format filename in
encoder_factory ~format format_val name meta
method open_out_gen mode perms filename =
let fd = open_out_gen mode perms filename in
close_out fd;
()
method output_substring () _ _ _ = ()
method flush () = ()
method close_out () = ()
Expand Down Expand Up @@ -489,7 +498,7 @@ class external_output p =
let process = Lang.to_string_getter (Lang.assoc "" 2 p) in
let self_sync = Lang.to_bool (List.assoc "self_sync" p) in
object (self)
inherit piped_output p
inherit piped_output ~name:"output.external" p
inherit [out_channel] chan_output p
method encoder_factory = encoder_factory format_val
method! self_sync = (`Static, self_sync)
Expand Down
27 changes: 27 additions & 0 deletions tests/regression/GH2842.liq
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
log.level := 5

def f() =
was_called = ref(false)
filename = file.temp("bla", "blo")
on_shutdown({file.remove(filename)})

def filename()
if was_called() then
test.fail()
end
was_called := true
"bla"
end

output.file(
fallible=true,
on_stop=test.pass,
%ffmpeg(format="mp3",
%audio(codec="libmp3lame", samplerate=48000, b="320k")
),
filename,
once(sine(duration=1.,480.))
)
end

test.check(f)
13 changes: 13 additions & 0 deletions tests/regression/dune.inc
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,19 @@
(:run_test ../run_test.exe))
(action (run %{run_test} playlist-id.liq liquidsoap %{test_liq} playlist-id.liq)))

(rule
(alias citest)
(package liquidsoap)
(deps
GH2842.liq
../media/all_media_files
../../src/bin/liquidsoap.exe
(source_tree ../../src/libs)
(:stdlib ../../src/libs/stdlib.liq)
(:test_liq ../test.liq)
(:run_test ../run_test.exe))
(action (run %{run_test} GH2842.liq liquidsoap %{test_liq} GH2842.liq)))

(rule
(alias citest)
(package liquidsoap)
Expand Down

0 comments on commit caef89d

Please sign in to comment.