Skip to content

Commit

Permalink
Handle crashed mirai workers
Browse files Browse the repository at this point in the history
  • Loading branch information
HenrikBengtsson committed Apr 12, 2024
1 parent 85240fe commit 0d58965
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 10 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Package: future.mirai
Version: 0.1.1-9015
Version: 0.1.1-9016
Depends:
future
Imports:
Expand Down
13 changes: 11 additions & 2 deletions R/MiraiFuture-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ MiraiFuture <- function(expr = NULL,
daemons(n = 0L)
} else if (workers != nworkers) {
daemons(n = 0L) ## reset is required
daemons(n = workers, dispatcher = dispatcher)
daemons(n = workers, dispatcher = dispatcher, resilience = FALSE)
}
} else if (!is.null(workers)) {
stop("Argument 'workers' should be a numeric scalar or NULL: ", mode(workers))
Expand Down Expand Up @@ -167,6 +167,14 @@ result.MiraiFuture <- function(future, ...) {

mirai <- future[["mirai"]]
result <- call_mirai_(mirai)$data

if (inherits(result, "errorValue")) {
label <- future$label
if (is.null(label)) label <- "<none>"
msg <- sprintf("Failed to retrieve results from %s (%s). The mirai framework reports on error value %s", class(future)[1], label, result)
stop(FutureError(msg))
}

future[["result"]] <- result
future[["state"]] <- "finished"

Expand All @@ -182,7 +190,8 @@ mirai_daemons_nworkers <- function() {
if (is.data.frame(workers)) return(nrow(workers))

if (length(workers) != 1L) {
stop(FutureError(sprintf("Length of mirai::status()$daemons is not one: %d", length(workers))))
msg <- sprintf("Length of mirai::status()$daemons is not one: %d", length(workers))
stop(FutureError(msg))
}

if (workers == 0L) return(Inf)
Expand Down
5 changes: 4 additions & 1 deletion R/nbrOfWorkers.R
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ nbrOfWorkers.mirai <- function(evaluator) {
nbrOfFreeWorkers.mirai <- function(evaluator, background = FALSE, ...) {
res <- status()
workers <- res[["daemons"]]
if (!is.numeric(workers)) {
if (is.character(workers)) {
workers <- res[["connections"]]
stopifnot(is.numeric(workers))
} else if (!is.numeric(workers)) {
stop(FutureError(sprintf("Unknown type of mirai::daemons()$daemons: %s", typeof(workers))))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ message("*** mirai_multisession() - terminating workers ...")
plan(mirai_multisession, workers = 2L)

all <- nbrOfWorkers()
message("Number of workers: ", all)
stopifnot(all == 2L)
free <- nbrOfFreeWorkers()
stopifnot(
nbrOfWorkers() == 2L,
nbrOfFreeWorkers() == 2L
)
message("Number of free workers: ", all)
stopifnot(free == 2L)


## Force R worker to quit
f <- future({ tools::pskill(pid = Sys.getpid()) })
Expand All @@ -18,8 +19,8 @@ print(res)
stopifnot(inherits(res, "FutureError"))

stopifnot(
nbrOfWorkers() == all,
nbrOfFreeWorkers() == free
nbrOfWorkers() == all - 1L,
nbrOfFreeWorkers() == free - 1L
)

message("*** mirai_multisession() - terminating workers ... DONE")
Expand Down

0 comments on commit 0d58965

Please sign in to comment.