Skip to content

Commit

Permalink
Allow insertion of arbitrary segment in output.hls
Browse files Browse the repository at this point in the history
  • Loading branch information
toots committed Jun 24, 2023
1 parent f20d0a8 commit 5e03340
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 8 deletions.
142 changes: 134 additions & 8 deletions src/core/outputs/hls_output.ml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ let hls_proto frame_t =
([], Lang.bool_t),
"Replay ID3 data on each segment to make sure new listeners \
always start with fresh value. Enabled by default." );
("extra_tags", ([], Lang.list_t Lang.string_t), "Extra tags");
( "video_size",
([], Lang.product_t Lang.int_t Lang.int_t),
"Video size" );
Expand All @@ -80,6 +81,10 @@ let hls_proto frame_t =
Lang.string_t,
Some (Lang.string "stream.m3u8"),
Some "Playlist name (m3u8 extension is recommended)." );
( "extra_tags",
Lang.list_t Lang.string_t,
Some (Lang.list []),
Some "Extra tags to insert into the main playlist." );
( "prefix",
Lang.string_t,
Some (Lang.string ""),
Expand Down Expand Up @@ -153,13 +158,22 @@ type segment = {
discontinuous : bool;
current_discontinuity : int;
filename : string;
segment_extra_tags : string list;
mutable init_filename : string option;
mutable out_channel : out_channel option;
mutable len : int;
}

let json_of_segment
{ id; discontinuous; current_discontinuity; filename; init_filename; len } =
{
id;
discontinuous;
current_discontinuity;
filename;
init_filename;
segment_extra_tags;
len;
} =
`Assoc
[
("id", `Int id);
Expand All @@ -168,6 +182,7 @@ let json_of_segment
("filename", `String filename);
( "init_filename",
match init_filename with Some f -> `String f | None -> `Null );
("extra_tags", `Tuple (List.map (fun s -> `String s) segment_extra_tags));
("len", `Int len);
]

Expand All @@ -179,8 +194,14 @@ let segment_of_json = function
("current_discontinuity", `Int current_discontinuity);
("filename", `String filename);
("init_filename", init_filename);
("extra_tags", `Tuple segment_extra_tags);
("len", `Int len);
] ->
let segment_extra_tags =
List.map
(function `String t -> t | _ -> raise Invalid_state)
segment_extra_tags
in
let init_filename =
match init_filename with
| `String f -> Some f
Expand All @@ -194,6 +215,7 @@ let segment_of_json = function
filename;
init_filename;
len;
segment_extra_tags;
out_channel = None;
}
| _ -> raise Invalid_state
Expand Down Expand Up @@ -229,6 +251,8 @@ type stream = {
extname : string;
id3_enabled : bool;
replay_id3 : bool;
stream_extra_tags : string list;
mutable pending_extra_tags : string list Atomic.t;
mutable metadata : metadata;
mutable init_state : init_state;
mutable init_position : int;
Expand Down Expand Up @@ -424,6 +448,15 @@ class hls_output p =
| b -> b
| exception Not_found -> true
in
let stream_extra_tags =
match
List.map
(fun s -> String.trim (Lang.to_string s))
(Lang.to_list (List.assoc "extra_tags" stream_info))
with
| l -> l
| exception Not_found -> []
in
{
name;
format;
Expand All @@ -434,6 +467,8 @@ class hls_output p =
extname;
id3_enabled;
replay_id3;
stream_extra_tags;
pending_extra_tags = Atomic.make [];
metadata = `None;
init_state = `Todo;
init_position = 0;
Expand Down Expand Up @@ -461,6 +496,11 @@ class hls_output p =
let source = Lang.assoc "" 3 p in
let main_playlist_filename = Lang.to_string (List.assoc "playlist" p) in
let main_playlist_filename = directory ^^ main_playlist_filename in
let main_playlist_extra_tags =
List.map
(fun s -> String.trim (Lang.to_string s))
(Lang.to_list (List.assoc "extra_tags" p))
in
let segments_per_playlist = Lang.to_int (List.assoc "segments" p) in
let max_segments =
segments_per_playlist + Lang.to_int (List.assoc "segments_overhead" p)
Expand Down Expand Up @@ -557,13 +597,15 @@ class hls_output p =
current_discontinuity = s.discontinuity_count;
len = 0;
filename;
segment_extra_tags = Atomic.get s.pending_extra_tags;
init_filename =
(match s.init_state with `Has_init f -> Some f | _ -> None);
out_channel = Some out_channel;
}
in
s.current_segment <- Some segment;
s.position <- s.position + 1;
Atomic.set s.pending_extra_tags [];
if s.id3_enabled then (
let m =
match s.metadata with
Expand All @@ -581,6 +623,31 @@ class hls_output p =
(s.encoder.hls.insert_id3 ~frame_position ~sample_position m)));
if discontinuous then s.discontinuity_count <- s.discontinuity_count + 1

method insert_tag ~pos ~stream tag =
let tag = String.trim tag in
let streams =
match
( stream,
Option.join
(Option.map
(fun stream ->
List.find_opt (fun { name } -> name = stream) streams)
stream) )
with
| None, _ -> streams
| Some _, Some s -> [s]
| Some stream, None ->
Runtime_error.raise ~pos
~message:
(Printf.sprintf "Cannot find stream with name %s" stream)
"not_found"
in
List.iter
(fun s ->
Atomic.set s.pending_extra_tags
(tag :: Atomic.get s.pending_extra_tags))
streams

method private cleanup_streams =
List.iter
(fun (_, s) -> List.iter (fun s -> self#unlink s.filename) !s)
Expand Down Expand Up @@ -631,6 +698,11 @@ class hls_output p =
output_string oc
(Printf.sprintf "#EXT-X-DISCONTINUITY-SEQUENCE:%d\r\n"
discontinuity_sequence);
List.iter
(fun tag ->
output_string oc tag;
output_string oc "\r\n")
s.stream_extra_tags;
List.iteri
(fun pos segment ->
if segment.discontinuous then
Expand All @@ -648,6 +720,11 @@ class hls_output p =
output_string oc
(Printf.sprintf "#EXTINF:%.03f,\r\n"
(Frame.seconds_of_main segment.len));
List.iter
(fun tag ->
output_string oc tag;
output_string oc "\r\n")
segment.segment_extra_tags;
output_string oc
(Printf.sprintf "%s%s\r\n" prefix
(Filename.basename segment.filename)))
Expand All @@ -664,6 +741,11 @@ class hls_output p =
output_string oc "#EXTM3U\r\n";
output_string oc
(Printf.sprintf "#EXT-X-VERSION:%d\r\n" (Lazy.force x_version));
List.iter
(fun tag ->
output_string oc tag;
output_string oc "\r\n")
main_playlist_extra_tags;
List.iter
(fun s ->
let line =
Expand Down Expand Up @@ -728,11 +810,16 @@ class hls_output p =
let streams =
`Tuple
(List.map
(fun { name; position; discontinuity_count } ->
(fun { name; position; discontinuity_count; pending_extra_tags } ->
`Assoc
[
("name", `String name);
("position", `Int position);
( "pending_extra_tags",
`Tuple
(List.map
(fun s -> `String s)
(Atomic.get pending_extra_tags)) );
("discontinuity_count", `Int discontinuity_count);
])
streams)
Expand Down Expand Up @@ -762,9 +849,15 @@ class hls_output p =
[
("name", `String name);
("position", `Int position);
("pending_extra_tags", `Tuple pending_extra_tags);
("discontinuity_count", `Int discontinuity_count);
] ->
(name, position, discontinuity_count)
let pending_extra_tags =
List.map
(function `String s -> s | _ -> raise Invalid_state)
pending_extra_tags
in
(name, position, pending_extra_tags, discontinuity_count)
| _ -> raise Invalid_state)
(match saved_streams with `Tuple l -> l | _ -> raise Invalid_state)
in
Expand All @@ -780,8 +873,9 @@ class hls_output p =
| _ -> raise Invalid_state
in
List.iter2
(fun stream (name, pos, discontinuity_count) ->
(fun stream (name, pos, pending_extra_tags, discontinuity_count) ->
assert (name = stream.name);
Atomic.set stream.pending_extra_tags pending_extra_tags;
stream.discontinuity_count <- discontinuity_count;
stream.init_position <- pos;
stream.position <- pos + 1)
Expand Down Expand Up @@ -823,6 +917,7 @@ class hls_output p =
with Encoder.Not_enough_data -> (None, Strings.empty))
else if
(s.id3_enabled && pending_metadata s.metadata)
|| Atomic.get s.pending_extra_tags <> []
|| segment.len + len > segment_main_duration
then (
match Encoder.(s.encoder.hls.split_encode frame ofs len) with
Expand All @@ -841,8 +936,11 @@ class hls_output p =
(Option.map
(fun b ->
Strings.iter (output_substring (Option.get out_channel)) b;
self#close_segment s;
self#open_segment s)
self#mutexify
(fun () ->
self#close_segment s;
self#open_segment s)
())
flushed);
let { out_channel } = Option.get s.current_segment in
Strings.iter (output_substring (Option.get out_channel)) data
Expand All @@ -856,7 +954,35 @@ class hls_output p =
let _ =
let return_t = Lang.univ_t () in
Lang.add_operator ~base:Pipe_output.output_file "hls" (hls_proto return_t)
~return_t ~category:`Output ~meth:Output.meth
~return_t ~category:`Output
~meth:
([
( "insert_tag",
( [],
Lang.fun_t
[
(true, "stream", Lang.nullable_t Lang.string_t);
(false, "", Lang.string_t);
]
Lang.unit_t ),
"Insert an arbitrary tags into the media streams playlist. If \
`\"stream\"` is unset, tag is inserted in _all_ media stream \
playlists. Otherwise, it is inserted in the stream named with its \
value.",
fun s ->
Lang.val_fun
[("stream", "stream", Some Lang.null); ("", "", None)]
(fun p ->
let pos = Lang.pos p in
let stream =
Lang.to_valued_option Lang.to_string (List.assoc "stream" p)
in
let tag = Lang.to_string (List.assoc "" p) in
s#insert_tag ~pos ~stream tag;
Lang.unit) );
]
@ Start_stop.meth ())
~descr:
"Output the source stream to an HTTP live stream served from a local \
directory." (fun p -> (new hls_output p :> Output.output))
directory."
(fun p -> new hls_output p)
Loading

0 comments on commit 5e03340

Please sign in to comment.