Skip to content

Commit

Permalink
Export stream as method, attach tag insertion callback to streams.
Browse files Browse the repository at this point in the history
  • Loading branch information
toots committed Jun 25, 2023
1 parent 5e03340 commit 751e1d5
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 60 deletions.
110 changes: 63 additions & 47 deletions src/core/outputs/hls_output.ml
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ class hls_output p =
val mutable segments = List.map (fun { name } -> (name, ref [])) streams

val mutable streams = streams
method streams = streams
val mutable current_position = (0, 0)
val mutable state : hls_state = `Idle

Expand Down Expand Up @@ -623,31 +624,6 @@ class hls_output p =
(s.encoder.hls.insert_id3 ~frame_position ~sample_position m)));
if discontinuous then s.discontinuity_count <- s.discontinuity_count + 1

method insert_tag ~pos ~stream tag =
let tag = String.trim tag in
let streams =
match
( stream,
Option.join
(Option.map
(fun stream ->
List.find_opt (fun { name } -> name = stream) streams)
stream) )
with
| None, _ -> streams
| Some _, Some s -> [s]
| Some stream, None ->
Runtime_error.raise ~pos
~message:
(Printf.sprintf "Cannot find stream with name %s" stream)
"not_found"
in
List.iter
(fun s ->
Atomic.set s.pending_extra_tags
(tag :: Atomic.get s.pending_extra_tags))
streams

method private cleanup_streams =
List.iter
(fun (_, s) -> List.iter (fun s -> self#unlink s.filename) !s)
Expand Down Expand Up @@ -951,35 +927,75 @@ class hls_output p =
List.iter (fun s -> s.metadata <- `Todo m) streams
end

let stream_t kind =
Lang.record_t
[
("name", Lang.string_t);
("encoder", Lang.format_t kind);
( "video_size",
Lang.nullable_t
(Lang.record_t [("width", Lang.int_t); ("height", Lang.int_t)]) );
("bandwidth", Lang.int_t);
("codecs", Lang.string_t);
("extname", Lang.string_t);
("id3_enabled", Lang.bool_t);
("replay_id3", Lang.bool_t);
("extra_tags", Lang.list_t Lang.string_t);
("discontinuity_count", Lang.int_t);
("insert_tag", Lang.fun_t [(false, "", Lang.string_t)] Lang.unit_t);
]

let value_of_stream
{
name;
format;
video_size;
bandwidth;
codecs;
extname;
id3_enabled;
replay_id3;
stream_extra_tags;
discontinuity_count;
pending_extra_tags;
} =
Lang.record
[
("name", Lang.string name);
("encoder", Lang_encoder.L.format format);
( "video_size",
match Lazy.force video_size with
| None -> Lang.null
| Some (w, h) ->
Lang.record [("width", Lang.int w); ("height", Lang.int h)] );
("bandwidth", Lang.int (Lazy.force bandwidth));
("codecs", Lang.string (Lazy.force codecs));
("extname", Lang.string extname);
("id3_enabled", Lang.bool id3_enabled);
("replay_id3", Lang.bool replay_id3);
("extra_tags", Lang.list (List.map Lang.string stream_extra_tags));
("discontinuity_count", Lang.int discontinuity_count);
( "insert_tag",
Lang.val_fun
[("", "", None)]
(fun p ->
let tag = String.trim (Lang.to_string (List.assoc "" p)) in
Atomic.set pending_extra_tags (tag :: Atomic.get pending_extra_tags);
Lang.unit) );
]

let _ =
let return_t = Lang.univ_t () in
Lang.add_operator ~base:Pipe_output.output_file "hls" (hls_proto return_t)
~return_t ~category:`Output
~meth:
([
( "insert_tag",
( [],
Lang.fun_t
[
(true, "stream", Lang.nullable_t Lang.string_t);
(false, "", Lang.string_t);
]
Lang.unit_t ),
"Insert an arbitrary tags into the media streams playlist. If \
`\"stream\"` is unset, tag is inserted in _all_ media stream \
playlists. Otherwise, it is inserted in the stream named with its \
value.",
( "streams",
([], Lang.fun_t [] (Lang.list_t (stream_t return_t))),
"Output streams",
fun s ->
Lang.val_fun
[("stream", "stream", Some Lang.null); ("", "", None)]
(fun p ->
let pos = Lang.pos p in
let stream =
Lang.to_valued_option Lang.to_string (List.assoc "stream" p)
in
let tag = Lang.to_string (List.assoc "" p) in
s#insert_tag ~pos ~stream tag;
Lang.unit) );
Lang.val_fun [] (fun _ ->
Lang.list (List.map value_of_stream s#streams)) );
]
@ Start_stop.meth ())
~descr:
Expand Down
15 changes: 2 additions & 13 deletions tests/streams/hls_custom_tags.liq
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ on_shutdown({file.rmdir(tmp_dir)})
main_tags = ref(false)
aac_flags = ref(false)
mp4_flags = ref(false)
global_insert_flags = ref(0)
aac_insert_flags = ref(false)

def on_file_change(~state, p) =
Expand All @@ -29,10 +28,6 @@ def on_file_change(~state, p) =
if string.contains(substring="X-CUSTOM-AAC-INSERT", contents) then
aac_insert_flags := true
end

if string.contains(substring="X-CUSTOM-GLOBAL-INSERT", contents) then
global_insert_flags := global_insert_flags() + 1
end
end

if fname == "mp4.m3u8" then
Expand All @@ -41,16 +36,11 @@ def on_file_change(~state, p) =
if string.contains(substring="X-CUSTOM", contents) then
mp4_flags := true
end

if string.contains(substring="X-CUSTOM-GLOBAL-INSERT", contents) then
global_insert_flags := global_insert_flags() + 1
end
end
end

if main_tags() and aac_flags() and
mp4_flags() and global_insert_flags() == 2 and
aac_insert_flags() then
mp4_flags() and aac_insert_flags() then
test.pass()
end
end
Expand All @@ -67,8 +57,7 @@ o = output.file.hls(
s
)

thread.run(delay=2., {o.insert_tag(stream="aac", "X-CUSTOM-AAC-INSERT")})
thread.run(delay=3., {o.insert_tag("X-CUSTOM-GLOBAL-INSERT")})
thread.run(delay=2., {list.hd(o.streams()).insert_tag("X-CUSTOM-AAC-INSERT")})


clock.assign_new(sync="none",[s])

0 comments on commit 751e1d5

Please sign in to comment.