From 751e1d535cc2fea91a66d9883862569f1d12e583 Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Sat, 24 Jun 2023 18:50:31 -0500 Subject: [PATCH] Export stream as method, attach tag insertion callback to streams. --- src/core/outputs/hls_output.ml | 110 +++++++++++++++++------------- tests/streams/hls_custom_tags.liq | 15 +--- 2 files changed, 65 insertions(+), 60 deletions(-) diff --git a/src/core/outputs/hls_output.ml b/src/core/outputs/hls_output.ml index ddf6d7c851..f37d5f1c1b 100644 --- a/src/core/outputs/hls_output.ml +++ b/src/core/outputs/hls_output.ml @@ -515,6 +515,7 @@ class hls_output p = val mutable segments = List.map (fun { name } -> (name, ref [])) streams val mutable streams = streams + method streams = streams val mutable current_position = (0, 0) val mutable state : hls_state = `Idle @@ -623,31 +624,6 @@ class hls_output p = (s.encoder.hls.insert_id3 ~frame_position ~sample_position m))); if discontinuous then s.discontinuity_count <- s.discontinuity_count + 1 - method insert_tag ~pos ~stream tag = - let tag = String.trim tag in - let streams = - match - ( stream, - Option.join - (Option.map - (fun stream -> - List.find_opt (fun { name } -> name = stream) streams) - stream) ) - with - | None, _ -> streams - | Some _, Some s -> [s] - | Some stream, None -> - Runtime_error.raise ~pos - ~message: - (Printf.sprintf "Cannot find stream with name %s" stream) - "not_found" - in - List.iter - (fun s -> - Atomic.set s.pending_extra_tags - (tag :: Atomic.get s.pending_extra_tags)) - streams - method private cleanup_streams = List.iter (fun (_, s) -> List.iter (fun s -> self#unlink s.filename) !s) @@ -951,35 +927,75 @@ class hls_output p = List.iter (fun s -> s.metadata <- `Todo m) streams end +let stream_t kind = + Lang.record_t + [ + ("name", Lang.string_t); + ("encoder", Lang.format_t kind); + ( "video_size", + Lang.nullable_t + (Lang.record_t [("width", Lang.int_t); ("height", Lang.int_t)]) ); + ("bandwidth", Lang.int_t); + ("codecs", Lang.string_t); + ("extname", Lang.string_t); + ("id3_enabled", Lang.bool_t); + ("replay_id3", Lang.bool_t); + ("extra_tags", Lang.list_t Lang.string_t); + ("discontinuity_count", Lang.int_t); + ("insert_tag", Lang.fun_t [(false, "", Lang.string_t)] Lang.unit_t); + ] + +let value_of_stream + { + name; + format; + video_size; + bandwidth; + codecs; + extname; + id3_enabled; + replay_id3; + stream_extra_tags; + discontinuity_count; + pending_extra_tags; + } = + Lang.record + [ + ("name", Lang.string name); + ("encoder", Lang_encoder.L.format format); + ( "video_size", + match Lazy.force video_size with + | None -> Lang.null + | Some (w, h) -> + Lang.record [("width", Lang.int w); ("height", Lang.int h)] ); + ("bandwidth", Lang.int (Lazy.force bandwidth)); + ("codecs", Lang.string (Lazy.force codecs)); + ("extname", Lang.string extname); + ("id3_enabled", Lang.bool id3_enabled); + ("replay_id3", Lang.bool replay_id3); + ("extra_tags", Lang.list (List.map Lang.string stream_extra_tags)); + ("discontinuity_count", Lang.int discontinuity_count); + ( "insert_tag", + Lang.val_fun + [("", "", None)] + (fun p -> + let tag = String.trim (Lang.to_string (List.assoc "" p)) in + Atomic.set pending_extra_tags (tag :: Atomic.get pending_extra_tags); + Lang.unit) ); + ] + let _ = let return_t = Lang.univ_t () in Lang.add_operator ~base:Pipe_output.output_file "hls" (hls_proto return_t) ~return_t ~category:`Output ~meth: ([ - ( "insert_tag", - ( [], - Lang.fun_t - [ - (true, "stream", Lang.nullable_t Lang.string_t); - (false, "", Lang.string_t); - ] - Lang.unit_t ), - "Insert an arbitrary tags into the media streams playlist. If \ - `\"stream\"` is unset, tag is inserted in _all_ media stream \ - playlists. Otherwise, it is inserted in the stream named with its \ - value.", + ( "streams", + ([], Lang.fun_t [] (Lang.list_t (stream_t return_t))), + "Output streams", fun s -> - Lang.val_fun - [("stream", "stream", Some Lang.null); ("", "", None)] - (fun p -> - let pos = Lang.pos p in - let stream = - Lang.to_valued_option Lang.to_string (List.assoc "stream" p) - in - let tag = Lang.to_string (List.assoc "" p) in - s#insert_tag ~pos ~stream tag; - Lang.unit) ); + Lang.val_fun [] (fun _ -> + Lang.list (List.map value_of_stream s#streams)) ); ] @ Start_stop.meth ()) ~descr: diff --git a/tests/streams/hls_custom_tags.liq b/tests/streams/hls_custom_tags.liq index 11bd29d905..0742ccfb23 100644 --- a/tests/streams/hls_custom_tags.liq +++ b/tests/streams/hls_custom_tags.liq @@ -8,7 +8,6 @@ on_shutdown({file.rmdir(tmp_dir)}) main_tags = ref(false) aac_flags = ref(false) mp4_flags = ref(false) -global_insert_flags = ref(0) aac_insert_flags = ref(false) def on_file_change(~state, p) = @@ -29,10 +28,6 @@ def on_file_change(~state, p) = if string.contains(substring="X-CUSTOM-AAC-INSERT", contents) then aac_insert_flags := true end - - if string.contains(substring="X-CUSTOM-GLOBAL-INSERT", contents) then - global_insert_flags := global_insert_flags() + 1 - end end if fname == "mp4.m3u8" then @@ -41,16 +36,11 @@ def on_file_change(~state, p) = if string.contains(substring="X-CUSTOM", contents) then mp4_flags := true end - - if string.contains(substring="X-CUSTOM-GLOBAL-INSERT", contents) then - global_insert_flags := global_insert_flags() + 1 - end end end if main_tags() and aac_flags() and - mp4_flags() and global_insert_flags() == 2 and - aac_insert_flags() then + mp4_flags() and aac_insert_flags() then test.pass() end end @@ -67,8 +57,7 @@ o = output.file.hls( s ) -thread.run(delay=2., {o.insert_tag(stream="aac", "X-CUSTOM-AAC-INSERT")}) -thread.run(delay=3., {o.insert_tag("X-CUSTOM-GLOBAL-INSERT")}) +thread.run(delay=2., {list.hd(o.streams()).insert_tag("X-CUSTOM-AAC-INSERT")}) clock.assign_new(sync="none",[s])