Skip to content

Commit

Permalink
Lwt_io.with_connection: safer implicit close
Browse files Browse the repository at this point in the history
  • Loading branch information
aantron committed Jun 25, 2016
1 parent c346d81 commit b1afe45
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 21 deletions.
8 changes: 7 additions & 1 deletion src/unix/lwt_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1387,9 +1387,15 @@ let open_connection ?fd ?in_buffer ?out_buffer sockaddr =

let with_connection ?fd ?in_buffer ?out_buffer sockaddr f =
open_connection ?fd ?in_buffer ?out_buffer sockaddr >>= fun (ic, oc) ->

(* If the user already tried to close the socket and got an exception, we
don't want to raise that exception again during implicit close. *)
let close_if_not_closed channel =
if is_closed channel then Lwt.return_unit else close channel in

Lwt.finalize
(fun () -> f (ic, oc))
(fun () -> close ic <&> close oc)
(fun () -> close_if_not_closed ic <&> close_if_not_closed oc)

type server = {
shutdown : unit Lazy.t;
Expand Down
74 changes: 54 additions & 20 deletions tests/unix/test_lwt_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ let with_async_exception_hook hook f =

let local = Unix.ADDR_INET (Unix.inet_addr_loopback, 4321)

(* Small delay to help ensure [close] system calls are issued on listening
(* Add small delay to help ensure [close] system calls are issued on listening
sockets as a result of calling [Lwt_io.shutdown_server], before proceeding.
In the future, it would be better if [Lwt_io.shutdown_server] produced a
thread that could be waited on. *)
let shutdown_pause = 0.05
let shutdown_server_and_wait server =
Lwt_io.shutdown_server server;
Lwt_unix.sleep 0.05

(* Helpers for [establish_server_safe] tests. *)
module Establish_server =
Expand Down Expand Up @@ -65,18 +67,15 @@ struct
in

client_finished >>= fun () ->
Lwt_io.shutdown_server server;
Lwt_unix.sleep shutdown_pause
shutdown_server_and_wait server

(* Dirty hack for forcing [Lwt_io.close] to fail, to test response to [close]
exceptions. Impolitely closes the last file descriptor allocated by the
system, without going through [Lwt_io].
exceptions. Impolitely closes the [n]th last file descriptor allocated by
the system, without going through [Lwt_io].
This assumes that the system allocates contiguously-increasing file
descriptors whenever possible. In the tests that use this function, the
last file descriptor allocated, at the time this is called, corresponds to
the channels passed to the handler of [establish_server_safe]. The next
file descriptor number is not occupied, and all lower numbers are occupied.
descriptors whenever possible, and can only be "correctly" done due to
exact control of file descriptors during testing.
The reasons for writing this are as follows:
- [EBADF] is the only error we can reliably produce on [close].
Expand All @@ -92,8 +91,8 @@ struct
This may not work on some systems, so the corresponding tests will have to
be disabled. *)
let close_last_fd () =
let guess_last_fd () =
let close_last_fd n =
let guess_fd () =
(* Using a pipe because it is easy and has no file system consequences. *)
let fd1, fd2 = Unix.pipe () in
Unix.close fd1;
Expand All @@ -111,11 +110,11 @@ struct

let fd1, fd2 = Pierce_abstraction.(pierce_fd fd1, pierce_fd fd2) in
let lowest = min fd1 fd2 in
let fd = lowest - 1 in
let fd = lowest - n in
Pierce_abstraction.hide_fd fd
in

Unix.close (guess_last_fd ())
Unix.close (guess_fd ())

(* Hacky is_closed functions that attempt to read from/write to the channels
to see if they are closed. *)
Expand Down Expand Up @@ -193,8 +192,7 @@ let suite = suite "lwt_io" [

with_connection local (fun _ -> Lwt.return_unit) >>= fun () ->
Lwt.wakeup client_finished ();
Lwt_io.shutdown_server server;
Lwt_unix.sleep shutdown_pause >>= fun () ->
shutdown_server_and_wait server >>= fun () ->
handler);

(* Counterpart to establish_server: shutdown test. Confirms that shutdown is
Expand All @@ -217,8 +215,7 @@ let suite = suite "lwt_io" [

>>= fun result ->

Lwt_io.shutdown_server server;
Lwt_unix.sleep shutdown_pause >|= fun () ->
shutdown_server_and_wait server >|= fun () ->
result);

test "establish_server_safe: implicit close"
Expand Down Expand Up @@ -325,7 +322,7 @@ let suite = suite "lwt_io" [
let run =
Establish_server.with_client
(fun (in_channel, out_channel) ->
close_last_fd ();
close_last_fd 1;
expecting_ebadf (fun () -> Lwt_io.close in_channel) >>= fun () ->
expecting_ebadf (fun () -> Lwt_io.close out_channel))
in
Expand Down Expand Up @@ -353,10 +350,47 @@ let suite = suite "lwt_io" [
let run () =
Establish_server.with_client
(fun (in_channel, out_channel) ->
close_last_fd ();
close_last_fd 1;
raise Exit)
in

with_async_exception_hook see_exception run >|= fun () ->
!exceptions_observed = 3 && !correct_exceptions);

(* Makes the socket fail with EBADF on close. Tries to close the socket
manually, and handles the exception. When with_connection tries to close
the socket again implicitly, that should not raise the exception again. *)
test "with_connection: no duplicate exceptions"
(fun () ->
let open Establish_server in

let exceptions_observed = ref 0 in
let expecting_ebadf f =
Lwt.catch f (function
| Unix.Unix_error (Unix.EBADF, _, _) ->
exceptions_observed := !exceptions_observed + 1;
Lwt.return_unit
| exn -> Lwt.fail exn)
in

let handler_started, notify_handler_started = Lwt.wait () in
let finish_server, resume_server = Lwt.wait () in
let server =
Lwt_io.establish_server_safe local
(fun _ ->
Lwt.wakeup notify_handler_started ();
finish_server)
in

expecting_ebadf (fun () ->
Lwt_io.with_connection local (fun (in_channel, out_channel) ->
handler_started >>= fun () ->
close_last_fd 2;
expecting_ebadf (fun () -> Lwt_io.close in_channel) >>= fun () ->
expecting_ebadf (fun () -> Lwt_io.close out_channel)))

>>= fun () ->
Lwt.wakeup resume_server ();
shutdown_server_and_wait server >|= fun () ->
!exceptions_observed = 2);
]

0 comments on commit b1afe45

Please sign in to comment.