Lwt.pick doesn't cancel #166

Closed
dsheets opened this issue Jun 30, 2015 · 5 comments

@dsheets
Contributor

dsheets commented Jun 30, 2015

open Lwt
open Lwt_unix

let sockaddr addr port =
  ADDR_INET (Unix.inet_addr_of_string addr, port)

let outfd addr port =
  let fd = socket PF_INET SOCK_DGRAM 17 in
  bind fd (sockaddr addr port);
  fd

let apnea () =
  print_endline "apnea";
  let t, w = Lwt.wait () in
  let ofd = outfd "0.0.0.0" 0 in  
  let () = async (fun () -> pick [
    (sleep 0.1 >|= fun () -> wakeup w "wakeup: timeout");
    (catch (fun () ->
       let buf = Lwt_bytes.create 64 in
       Lwt_bytes.recvfrom ofd buf 0 64 []
       >|= fun (len,_) -> wakeup w ("wakeup: active "^string_of_int len)
     ) (fun exn ->
      let exn_s = Printexc.to_string exn in
      Lwt_io.printl ("caught error: "^exn_s)
      >|= fun () -> wakeup w ("wakeup: error "^exn_s)
    ));
  ]) in
  Lwt.nchoose [t]
  >>= fun s -> Lwt_list.iter_s Lwt_io.printl s
  >>= fun () -> Lwt_unix.close ofd

let inception () =
  print_endline "inception";
  let t, w = Lwt.wait () in
  let ofd = outfd "0.0.0.0" 0 in  
  let receive = catch (fun () ->
    let buf = Lwt_bytes.create 64 in
    Lwt_bytes.recvfrom ofd buf 0 64 []
    >|= fun (len,_) -> wakeup w ("wakeup: active "^string_of_int len)
  ) (function
    | Canceled -> return ()
    | exn ->
      let exn_s = Printexc.to_string exn in
      Lwt_io.printl ("caught error: "^exn_s)
      >|= fun () -> wakeup w ("wakeup: error "^exn_s)
  ) in
  let () = async (fun () -> pick [
    (sleep 0.1
     >|= fun () -> wakeup w "wakeup: timeout");
    receive
  ]) in
  Lwt.nchoose [t]
  >>= fun s -> Lwt_list.iter_s Lwt_io.printl s
  >>= fun () -> (cancel receive; Lwt_unix.close ofd)

let () = match if Array.length Sys.argv > 1 then Sys.argv.(1) else "apnea" with
  | "cancel" -> Lwt_main.run (inception ())
  | _ -> Lwt_main.run (apnea ())

Build with: ocamlfind ocamlopt -package lwt.unix -linkpkg -g lwt_pick_bug.ml -o lwt_pick_bug

Run with: ./lwt_pick_bug and ./lwt_pick_bug cancel

I expected the threads not selected by pick to be canceled. I observed that they were not canceled. apnea is the reproduction case and inception is the fixed case. I'm not terribly happy with | Canceled -> return () but I can live with it if necessary. Is there a better way to time out syscalls? Have I made some mistake in my application of Lwt calls? I found this pick behavior very surprising and contrary to the documentation.

@talex5
Contributor

talex5 commented Jun 30, 2015

Note that the pick would have cancelled the other thread eventually, but the success case ran first, which closed the FD, which triggered the other thread to run first. Here's the first case with a sleep added before the close:

[Image: pick]

The sleep finishing wakes a map, which notifies the choose thread first (which now sleeps for a bit) and then the pick, which cancels the other thread. Without the sleep, the choose thread runs first, generating the error before the cancel happens.

@talex5
Contributor

talex5 commented Jun 30, 2015

Suggested solution: return a Timeout / Ok value from the pick threads instead of using a task and async.
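
A minimal sketch of this suggestion, assuming the lwt.unix package and a UDP socket like the one in the report. The helper names `with_timeout` and `recv_or_timeout` are hypothetical and not part of Lwt. Each branch of `pick` returns a tagged value, and the caller acts only after `pick` itself has resolved:

```ocaml
open Lwt.Infix

(* Hypothetical helper: race [work ()] against a timer and report which won. *)
let with_timeout seconds work =
  Lwt.pick [
    (Lwt_unix.sleep seconds >|= fun () -> `Timeout);
    (work () >|= fun v -> `Ok v);
  ]

(* Hypothetical wrapper around the receive call from the report. *)
let recv_or_timeout fd =
  let buf = Lwt_bytes.create 64 in
  with_timeout 0.1 (fun () -> Lwt_bytes.recvfrom fd buf 0 64 [])

let () =
  (* Unbound UDP socket (an assumption for brevity): no datagram is
     expected to arrive, so the timer should normally win. *)
  let fd = Lwt_unix.socket Unix.PF_INET Unix.SOCK_DGRAM 0 in
  Lwt_main.run begin
    recv_or_timeout fd >>= fun result ->
    (* This callback depends on the pick, so it runs only once the pick
       has resolved and dealt with the losing branch. *)
    Lwt_unix.close fd >>= fun () ->
    match result with
    | `Timeout -> Lwt_io.printl "timeout"
    | `Ok (len, _) -> Lwt_io.printl ("active " ^ string_of_int len)
  end
```

Because the close is chained on the result of `pick` rather than on a separate waiter, there is no second promise that can race with the cancellation.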

@dsheets
Contributor Author

dsheets commented Jun 30, 2015

The success case ran and then after exiting the pick the FD was closed. The test case comes from a larger program which launches multiple async threads and then calls nchoose on the waiters. I don't see how the (potentially infinitely blocking) recvfrom can be correctly terminated by simply returning Timeout/Ok -- something has to cancel it. The FD must be cleaned up eventually lest we get EINVAL in Lwt's select loop around 1020 iterations.

Given this kind of scenario, the current semantics of "maybe your other threads will eventually be canceled by pick" seems like a recipe for writing races. Perhaps I should put the FD cleanup code in a termination hook on the recvfrom thread? At the least, the exact semantics of pick need to be explained clearly in the documentation.
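
One way to sketch the termination-hook idea is `Lwt.finalize`, which runs a cleanup function whether the wrapped promise resolves, fails, or is canceled. The function name `recv_then_close` is hypothetical, the socket is left unbound for brevity, and this only illustrates the hook. It is not something proposed elsewhere in this thread:

```ocaml
(* Hypothetical helper: receive one datagram, then close [fd] no matter
   how the receive ends (value, exception, or cancellation). *)
let recv_then_close fd =
  let buf = Lwt_bytes.create 64 in
  Lwt.finalize
    (fun () -> Lwt_bytes.recvfrom fd buf 0 64 [])
    (fun () -> Lwt_unix.close fd)

let () =
  let fd = Lwt_unix.socket Unix.PF_INET Unix.SOCK_DGRAM 0 in
  let result =
    Lwt_main.run
      (Lwt.pick [
         Lwt.map (fun () -> None) (Lwt_unix.sleep 0.1);
         Lwt.map (fun (len, _) -> Some len) (recv_then_close fd);
       ])
  in
  (* If the timer wins, pick cancels the receive and the hook closes [fd]. *)
  match result with
  | None -> print_endline "timeout"
  | Some len -> print_endline ("received " ^ string_of_int len)
```

With this shape the descriptor's lifetime is tied to the receive itself, so no other code path needs to remember to close it.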

@talex5
Contributor

talex5 commented Jun 30, 2015

Your success case didn't run after the pick. It depends only on the choose, which depends on t, which does not depend on the outcome of the pick. If you wait for the result of the pick, there should be no problem.

@dsheets
Contributor Author

dsheets commented Jun 30, 2015

Ah, I understand what you mean now. wakeup schedules the waiter to awake which then races with pick's reaping of siblings inside the event loop.

If I simply factor wakeup w outside the pick, it behaves as expected.
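
For reference, a minimal sketch of what factoring `wakeup w` outside the `pick` might look like, adapted from `apnea` above. The name `apnea_fixed` is hypothetical, the socket is left unbound for brevity, and the `catch` error handling from the original is omitted:

```ocaml
open Lwt.Infix

let apnea_fixed () =
  let t, w = Lwt.wait () in
  let fd = Lwt_unix.socket Unix.PF_INET Unix.SOCK_DGRAM 0 in
  let buf = Lwt_bytes.create 64 in
  Lwt.async (fun () ->
    Lwt.pick [
      (Lwt_unix.sleep 0.1 >|= fun () -> "wakeup: timeout");
      (Lwt_bytes.recvfrom fd buf 0 64 []
       >|= fun (len, _) -> "wakeup: active " ^ string_of_int len);
    ]
    (* wakeup is now a callback on the pick itself, so it is expected to
       run only after pick has canceled the losing branch. *)
    >|= fun msg -> Lwt.wakeup w msg);
  t >>= fun msg ->
  (* The receive is no longer pending here, so closing is safe. *)
  Lwt_unix.close fd >|= fun () -> msg

let () = print_endline (Lwt_main.run (apnea_fixed ()))
```

The branches of `pick` now only compute a message. The single `wakeup` happens downstream of the `pick`, which removes the race between the waiter and the cancellation of the sibling.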

@dsheets dsheets closed this as completed Jun 30, 2015