Skip to content

Commit

Permalink
Implement pre-forking on Unix
Browse files Browse the repository at this point in the history
  • Loading branch information
tmattio committed Dec 4, 2020
1 parent 6b61ddf commit ac43ca7
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 52 deletions.
9 changes: 0 additions & 9 deletions example/exit_hook/README.md

This file was deleted.

3 changes: 0 additions & 3 deletions example/exit_hook/dune

This file was deleted.

16 changes: 0 additions & 16 deletions example/exit_hook/main.ml

This file was deleted.

101 changes: 83 additions & 18 deletions opium/src/app.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ let err_invalid_host host =
Lwt.fail_invalid_arg ("Could not get host info for `" ^ host ^ "`")
;;

let run_unix ?backlog ?middlewares ~host ~port handler =
let make_connection_handler ~host ~port ?middlewares handler =
let* host_entry =
Lwt.catch
(fun () -> Lwt_unix.gethostbyname host)
Expand All @@ -28,12 +28,50 @@ let run_unix ?backlog ?middlewares ~host ~port handler =
let app = Rock.App.create ?middlewares ~handler () in
Rock.Server_connection.run f app
in
Lwt.return (listen_address, connection_handler)
;;

let run_unix ?backlog ?middlewares ~host ~port handler =
let* listen_address, connection_handler =
make_connection_handler ?middlewares ~host ~port handler
in
Lwt_io.establish_server_with_client_socket ?backlog listen_address connection_handler
;;

let run_unix_multicore ?middlewares ~host ~port ~jobs handler =
let listen_address, connection_handler =
Lwt_main.run @@ make_connection_handler ?middlewares ~host ~port handler
in
let socket =
Lwt_unix.socket (Unix.domain_of_sockaddr listen_address) Unix.SOCK_STREAM 0
in
Lwt_unix.setsockopt socket Unix.SO_REUSEADDR true;
Lwt_main.run
(let+ () = Lwt_unix.bind socket listen_address in
Lwt_unix.listen socket (Lwt_unix.somaxconn () [@ocaml.warning "-3"]));
let rec accept_loop socket instance =
let* socket', sockaddr' = Lwt_unix.accept socket in
Lwt.async (fun () -> connection_handler sockaddr' socket');
accept_loop socket instance
in
for i = 1 to jobs do
flush_all ();
if Lwt_unix.fork () = 0
then (
Lwt.async (fun () -> accept_loop socket i);
let forever, _ = Lwt.wait () in
Lwt_main.run forever;
exit 0)
done;
while true do
Unix.pause ()
done
;;

type t =
{ host : string
; port : int
; jobs : int
; backlog : int option
; debug : bool
; verbose : bool
Expand All @@ -58,10 +96,23 @@ let default_not_found _ =
())
;;

let system_cores =
match Sys.unix with
| false ->
(* TODO: detect number of cores on Windows *)
1
| true ->
let ic = Unix.open_process_in "getconf _NPROCESSORS_ONLN" in
let cores = int_of_string (input_line ic) in
ignore (Unix.close_process_in ic);
cores
;;

let empty =
{ name = "Opium Default Name"
; host = "0.0.0.0"
; port = 3000
; jobs = system_cores
; backlog = None
; debug = false
; verbose = false
Expand Down Expand Up @@ -96,6 +147,7 @@ let to_handler app =
;;

let port port t = { t with port }
let jobs jobs t = { t with jobs }
let backlog backlog t = { t with backlog = Some backlog }
let host host t = { t with host }
let cmd_name name t = { t with name }
Expand Down Expand Up @@ -148,11 +200,28 @@ let start app =
if app.debug then Logs.set_level (Some Logs.Debug);
Logs.info (fun f ->
f
"Starting Opium on %s:%d%s..."
"Starting Opium on %s:%d with %d cores%s"
app.host
app.port
(if app.debug then " (debug)" else ""));
run_unix ?backlog:app.backlog ~middlewares ~host:app.host ~port:app.port app.not_found
app.jobs
(if app.debug then " (debug mode)" else ""));
match app.jobs with
| 1 ->
Lwt.async (fun () ->
let* _server =
run_unix
?backlog:app.backlog
~middlewares
~host:app.host
~port:app.port
app.not_found
in
Lwt.return_unit);
let forever, _ = Lwt.wait () in
Lwt_main.run forever
| i when i > 1 ->
run_unix_multicore ~middlewares ~host:app.host ~port:app.port ~jobs:i app.not_found
| _ -> failwith "The number of jobs must be superior or equal to 1"
;;

let hashtbl_add_multi tbl x y =
Expand Down Expand Up @@ -185,8 +254,8 @@ let print_middleware_f middlewares =
|> List.iter ~f:(Printf.printf "> %s \n")
;;

let cmd_run app port host print_routes print_middleware debug verbose _errors =
let app = { app with debug; verbose; host; port } in
let cmd_run app port jobs host print_routes print_middleware debug verbose _errors =
let app = { app with debug; verbose; host; port; jobs } in
if print_routes
then (
let routes = app.routes in
Expand Down Expand Up @@ -218,6 +287,11 @@ module Cmds = struct
Arg.(value & opt int default & info [ "p"; "port" ] ~doc)
;;

let jobs default =
let doc = "jobs" in
Arg.(value & opt int default & info [ "j"; "jobs" ] ~doc)
;;

let host default =
let doc = "host" in
Arg.(value & opt string default & info [ "h"; "host" ] ~doc)
Expand All @@ -244,6 +318,7 @@ module Cmds = struct
pure cmd_run
$ pure app
$ port app.port
$ jobs app.jobs
$ host app.host
$ routes
$ middleware
Expand All @@ -263,24 +338,14 @@ let run_command' app =
let open Cmdliner in
let cmd = Cmds.term app in
match Term.eval (cmd, Cmds.info app.name) with
| `Ok a ->
Lwt.async (fun () ->
let* _server = a in
Lwt.return_unit);
let forever, _ = Lwt.wait () in
`Ok forever
| `Ok a -> `Ok a
| `Error _ -> `Error
| _ -> `Not_running
;;

let run_command app =
match app |> run_command' with
| `Ok a ->
Lwt.async (fun () ->
let* _server = a in
Lwt.return_unit);
let forever, _ = Lwt.wait () in
Lwt_main.run forever
| `Ok a -> a
| `Error -> exit 1
| `Not_running -> exit 0
;;
8 changes: 2 additions & 6 deletions opium/src/app.mli
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ val host : string -> builder
val backlog : int -> builder

val port : int -> builder
val jobs : int -> builder
val cmd_name : string -> builder

(** [not_found] accepts a regular Opium handler that will be used instead of the default
Expand Down Expand Up @@ -55,12 +56,7 @@ val action : Method.t -> route
val middleware : Rock.Middleware.t -> builder

(** Start an opium server. The thread returned can be cancelled to shutdown the server *)
val start : t -> Lwt_io.server Lwt.t
val start : t -> unit

(** Create a cmdliner command from an app and run lwt's event loop *)
val run_command : t -> unit

(* Run a cmdliner command from an app. Does not launch Lwt's event loop. `Error is
returned if the command line arguments are incorrect. `Not_running is returned if the
command was completed without the server being launched *)
val run_command' : t -> [> `Ok of unit Lwt.t | `Error | `Not_running ]

0 comments on commit ac43ca7

Please sign in to comment.