
Hanging tasks in a {targets} pipeline on an SGE cluster #58

Closed

wlandau opened this issue May 13, 2023 · 18 comments


wlandau commented May 13, 2023

This issue presents the same symptoms as #53, but it emulates a real-life example on a machine with plenty of resources.

I am running crew and crew.cluster in a targets pipeline, using development versions of all the packages.

The pipeline has 2001 lightweight tasks. There are 20 workers which run on a powerful SGE cluster, each requesting at least 2 GB of memory. The login node where the dispatcher and client run has 263 GB of total memory. In the _targets.R file, I launch all 20 workers up front, but they can still scale down and up because I set seconds_idle = 5 (and auto_scale = "demand" by default). The _targets.R file that defines the pipeline is below, and it is representative of the types of simulation studies my team and I do with targets. (https://books.ropensci.org/targets/walkthrough.html explains _targets.R file basics, and https://books.ropensci.org/targets/crew.html explains integration with crew.)

library(targets)
controller <- crew.cluster::crew_controller_sge(
  seconds_launch = 120,
  workers = 20L,
  sge_memory_gigabytes_required = 2L,
  seconds_idle = 5, # aggressive down-scaling: workers exit after 5 idle seconds
  seconds_exit = 5,
  script_lines = paste0("module load R/", getRversion())
)
controller$start()
controller$launch(n = 20L) # launch all 20 workers up front
tar_option_set(controller = controller)
list(
  tar_target(x, seq_len(2000)),
  tar_target(y, tibble::tibble(x = x, a = 1, b = 2, c = "3"), pattern = map(x)) # 2000 dynamic branches
)

When I run the pipeline with targets::tar_make(callr_function = NULL, terminate = FALSE), at first I see tasks resolve quickly at a rate of several per second, but then the pipeline hangs (roughly 25% to 30% of the time). targets continues its event loop, which I know because crew workers continue to relaunch after they idle out, but the tasks themselves do not complete. During the whole process, I see MEM% in htop stay at or below 0.1 for both the client R session and the dispatcher.
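
For reference, here is that invocation written out (both arguments are as described above): callr_function = NULL runs the pipeline in the current R session, and terminate = FALSE keeps the crew controller alive afterwards so it can be inspected after the hang.

targets::tar_make(callr_function = NULL, terminate = FALSE)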

When I examine the controller in the R session that ran tar_make(), here is what I see. Multiple workers have tasks assigned but not complete in the hanging state. All the workers show as offline because I am printing this after they idled out, but at the time the hanging started, 2 of the workers were still running (I saw the jobs on the SGE cluster with qstat).

> controller <- tar_option_get("controller")
> controller$router$poll()
> tibble::tibble(controller$router$daemons)

# A tibble: 20 × 4
   online instance assigned complete
    <int>    <int>    <int>    <int>
 1      0        1        0        0
 2      0        1        0        0
 3      0        1        0        0
 4      0        1        0        0
 5      0        1        0        0
 6      0        1        0        0
 7      0        1        1        0
 8      0        1        0        0
 9      0        1        2        1
10      0        1        1        0
11      0        1        0        0
12      0        1        0        0
13      0        1        0        0
14      0        1        0        0
15      0        1        2        1
16      0        1        0        0
17      0        1        0        0
18      0        1        0        0
19      0        1        2        1
20      0        1        0        0

Many workers launched more than once because the idle timeout was aggressive and the tasks were quick. This in itself is not concerning, but it is part of the picture.

> controller$launcher$workers[, c("socket", "launches")]
# A tibble: 20 × 2
   socket                                                             launches
   <chr>                                                                 <int>
 1 ws://40.1.29.176:37387/6ba3c307a158262a0efe51a75674f6e91996d0bb           5
 2 ws://40.1.29.176:37387/2/5808b07413249336ab45df960bf15263e0347cb1         9
 3 ws://40.1.29.176:37387/3/cd16738d8dc9c4f89855626800d4afecdbeba851         9
 4 ws://40.1.29.176:37387/4/5f7c04c16f7be62ad3f47e8b2366dd2284f46bac         9
 5 ws://40.1.29.176:37387/5/ad8198fc0794d43faf0821ada2093b8d00c85588         9
 6 ws://40.1.29.176:37387/6/7f5f0522c0e753c00351a155b2c88d56fa7710bb         4
 7 ws://40.1.29.176:37387/7/10931a6c1db602731a2de242c4d96bb4a6c19ed8         3
 8 ws://40.1.29.176:37387/8/03870112255ec377a1ea052c63adb20e9b39b364         1
 9 ws://40.1.29.176:37387/9/58e2c309221f0b75455b04a764dae7f534e80f89         1
10 ws://40.1.29.176:37387/10/60c3c3272b102ac80ea6390c2e37fc65bbbe86ff        1
11 ws://40.1.29.176:37387/11/f42555c1c471dea1a884717ac35a9d9beea5ffe2        1
12 ws://40.1.29.176:37387/12/8d7d4f9460f70b9bcb6f6af8d6da1176aacd0f8b        1
13 ws://40.1.29.176:37387/13/340a2ab8590f67b1d5680c80ce0c348dbd2db42b        1
14 ws://40.1.29.176:37387/14/6fe3c24631a20689306cc2f0d1b00f9b264c7525        1
15 ws://40.1.29.176:37387/15/ffc82e8933eecf5e520e2d7c18df14a091c63743        1
16 ws://40.1.29.176:37387/16/e881afe42fe95bde1568cdd0434dae3fd4cb210b        1
17 ws://40.1.29.176:37387/17/2543943f5a4e835e0aa3813474f5283ce2ec3252        1
18 ws://40.1.29.176:37387/18/b6c9284a9d8019329e3a83452f34585452a2fff3        1
19 ws://40.1.29.176:37387/19/c9886a9c227344b9a3db4be3f94b3f6681a7f9b1        1
20 ws://40.1.29.176:37387/20/2dfc44716baf37cd11842b99914bfd2857d5da42        1

And most importantly, there are 5 tasks stuck in an unresolved state.

> length(controller$queue)
[1] 5
> for (i in seq_len(5)) print(controller$queue[[i]]$handle[[1]]$data)
'unresolved' logi NA
'unresolved' logi NA
'unresolved' logi NA
'unresolved' logi NA
'unresolved' logi NA

In my experiments with this and similar pipelines, it seems like I can set auto_scale = "none" and still sometimes produce the hanging (while some workers are still running and should be churning through tasks). But the hanging does seem to stop if seconds_idle is very high, so maybe it has something to do with auto-scaling down. crew assumes workers and tasks can launch independently and asynchronously relative to one another.
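
As a rough sketch of the mitigation that seems to help, here is the same controller with a much less aggressive idle timeout (the value 300 is purely illustrative):

# Same controller as above, but with a long idle timeout so workers rarely
# scale down mid-run. 300 seconds is an illustrative value, not a recommendation.
controller <- crew.cluster::crew_controller_sge(
  seconds_launch = 120,
  workers = 20L,
  sge_memory_gigabytes_required = 2L,
  seconds_idle = 300,
  seconds_exit = 5,
  script_lines = paste0("module load R/", getRversion())
)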

I am sorry to keep bothering you about hanging tasks in my pipelines, and I am sorry I can't seem to reproduce the problem with mirai alone. Hopefully this example provides more clues and more ways we can troubleshoot.


wlandau commented May 13, 2023

Later today, I plan to update mirai and nanonext and run this example again. Just realized an outdated mirai on my end might be the issue.

@shikokuchuo (Owner)

We definitely moved fast and broke things! I'm in the middle of trying to recover some things to a good state that may or may not be related to this. So you might not want to invest too much time yet if you find things still do not work.

@shikokuchuo (Owner)

The latest mirai build (9001) fixes some odd behaviour introduced just prior to the mirai 0.8.4 release. I will need to observe one of our workflows over the next few days to confirm. Please use this rather than the CRAN build if you are testing.

It works with either the CRAN build of nanonext or the newer nanonext 9001 build. The latter actually rolls back 'libnng' to a slightly older version, but it might help your case because it sets a higher default expire thread count (used for timeouts). In my tests I haven't seen a difference, but I've made it available in case it helps in your situation.
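
For reference, one way to install those development builds (assuming they are available from the shikokuchuo r-universe repository; installing from GitHub with remotes works too):

# Install the 9001 development builds of nanonext and mirai mentioned above.
# The r-universe repository is an assumption; remotes::install_github("shikokuchuo/nanonext")
# and remotes::install_github("shikokuchuo/mirai") are an alternative route.
install.packages(
  c("nanonext", "mirai"),
  repos = c("https://shikokuchuo.r-universe.dev", "https://cloud.r-project.org")
)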


wlandau commented May 14, 2023

Thanks for the updates. Unfortunately, this example still hangs with nanonext 0.8.3.9001 and mirai 0.8.7.9001. (In fact, the hanging seems easier to reproduce now.)

Another thing: when I set storage = "worker" in tar_target() so the workers save output to disk, I do not see output files for the tasks that hang, which means those tasks did not complete (and may not have even started).
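
For reference, the relevant target definition with that setting (a sketch showing just the storage argument, not the full pipeline):

# Dynamic-branch target with worker-side storage: each worker writes its own
# result to the data store instead of sending it back to the main process.
tar_target(
  y,
  tibble::tibble(x = x, a = 1, b = 2, c = "3"),
  pattern = map(x),
  storage = "worker"
)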


wlandau commented May 14, 2023

I managed to reproduce the hanging using crew.cluster running targets tasks. This one is simpler because it is an imperative script rather than a declarative Make-like pipeline.

controller <- crew.cluster::crew_controller_sge(
  seconds_launch = 120,
  workers = 20L,
  sge_memory_gigabytes_required = 2L,
  seconds_idle = 5,
  seconds_exit = 5,
  script_lines = paste0("module load R/", getRversion())
)
controller$start()
for (i in seq_len(6000)) {
  print(i)
  controller$push(
    command = targets:::target_run(target, globalenv(), "_targets"),
    data = list(target = targets::tar_target(y, tibble::tibble(x = x, a = 1, b = 2, c = "3")))
  )
  tmp <- controller$pop()
}
while (!controller$empty()) {
  out <- controller$pop()
  if (!is.null(out)) {
    print(out$name)
  }
}
controller$terminate()


wlandau commented May 14, 2023

This version hangs on my local Ubuntu machine which has 4 physical cores and 16 GB RAM:

controller <- crew::crew_controller_local(
  seconds_launch = 120,
  workers = 20L,
  seconds_idle = 5,
  seconds_exit = 5
)
controller$start()
for (i in seq_len(6000)) {
  print(i)
  controller$push(
    command = targets:::target_run(target, globalenv(), "_targets"),
    data = list(target = targets::tar_target(y, tibble::tibble(x = x, a = 1, b = 2, c = "3")))
  )
  tmp <- controller$pop()
}
while (!controller$empty()) {
  out <- controller$pop()
  if (!is.null(out)) {
    print(out$name)
  }
}
controller$terminate()


wlandau commented May 14, 2023

Tasks are still hanging on GitHub Actions, though the number of failures does not seem to have significantly increased: https://github.com/ropensci/targets/actions/runs/4969941610/jobs/8893491899#step:9:209


wlandau commented May 14, 2023

Here is a lower-level script that does the same thing as #58 (comment). It emulates the auto-scaling and task management of crew, but without actually using crew. It unfortunately does not reproduce this elusive issue, but it does demonstrate how crew works behind the scenes, so hopefully it can be useful in future testing.

library(mirai)
library(nanonext)
library(purrr)
daemons(
  n = 20L,
  url = "ws://127.0.0.1:5000",
  dispatcher = TRUE,
  token = TRUE
)
urls <- environment(daemons)$..$default$urls
when_launched <- rep(-Inf, length(urls))
queue <- list()
tasks_to_submit <- seq_len(6000)
while (length(tasks_to_submit) > 0L || length(queue) > 0L) {
  # Submit the next task.
  if (length(tasks_to_submit) > 0L) {
    i <- tasks_to_submit[1L]
    print(paste("submitting task", i))
    queue[[length(queue) + 1L]] <- mirai(
      .expr = targets:::target_run(target, globalenv(), "_targets"),
      .args = list(target = targets::tar_target(y, tibble::tibble(x = x, a = 1, b = 2, c = "3")))
    )
    tasks_to_submit <- tasks_to_submit[-1L]
  }
  # Touch the data of resolved tasks and drop them from the queue
  # so demand for servers can be assessed.
  running <- map_lgl(queue, ~.unresolved(.x))
  walk(queue[!running], ~.x$data)
  queue <- queue[running]
  print(paste(format(Sys.time(), "%H:%M:%OS3"), "tasks remaining:", length(queue)))
  # Emulate auto-scaling: relaunch inactive server instances to meet demand from the queue.
  daemons <- daemons()$daemons
  now <- mclock()
  launching <- (now - when_launched) < 30000
  connected <- as.logical(daemons[, "online"] > 0L)
  discovered <- as.logical(daemons[, "instance"] > 0L)
  is_inactive <- (!connected) & (discovered | (!launching))
  which_relaunch <- head(which(is_inactive), n = length(queue))
  for (server_index in which_relaunch) {
    new_url <- saisei(i = server_index)
    if (!is.null(new_url)) {
      launch_server(url = new_url, maxtasks = 10, idletime = 1000)
      when_launched[server_index] <- now
    }
  }
}
daemons(0L)


wlandau commented May 14, 2023

So unfortunately, it looks like this is another instance that I cannot isolate with just mirai. Very strange.

Given the above experiments, it would seem to be a problem with crew and not mirai. On the other hand, I could have missed something when I wrote the mirai-only test at #58 (comment). Also, crew is just calling daemons(), server(), mirai(), saisei(force = FALSE), and .unresolved(), functions I would assume are safe to call in any order and at any frequency within reason. So at this point I am not sure how else to troubleshoot from my side. If you think of a way I can be useful, please let me know.

If you have time next week and would be willing to try the crew-based test at #58 (comment) using development crew, I think it could shed some light. Sorry about the layer of crew on top. Hopefully instrumentation/logging in mirai will produce findings on your end.

@shikokuchuo (Owner)

I can confirm that the mirai-only reprex runs nicely (and is actually quite cool!).

I've found the bug in your crew-only (local) reprex:

for (i in seq_len(6000)) {
  print(i)
  controller$push(
    command = targets:::target_run(target, globalenv(), "_targets"),
    data = list(target = targets::tar_target(y, tibble::tibble(x = x, a = 1, b = 2, c = "3")))
  )
  tmp <- controller$pop()
}

Just comment out the line: tmp <- controller$pop(). Then everything works.

Sorry, perhaps I should know why that's needed, but I have to confess I don't. Admittedly the reprex might now not be doing exactly what you wanted it to do with that change, but it possibly points to how you manage the event loop in crew. I'm happy to take a look at any areas you're finding especially problematic.

Just taking a step back, the fact that you can produce a working example with just mirai should hopefully mean that the underlying infrastructure is sound. Logically, if that is all crew does then it would be working fine.


wlandau commented May 14, 2023

Just comment out the line: tmp <- controller$pop(). Then everything works.

targets needs the tmp <- pop() so it can collect results and save the metadata of those results even as new tasks are submitted. In a targets pipeline, saving the metadata of upstream tasks allows the downstream tasks that depend on them to begin.
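
In other words, the pattern targets relies on looks roughly like this (a sketch: record_metadata() is a hypothetical stand-in for the metadata bookkeeping targets performs on each popped result, and controller is assumed to be a started crew controller):

record_metadata <- function(result) invisible(result)  # hypothetical stand-in

for (i in seq_len(100)) {
  controller$push(command = Sys.getpid())   # submit the next task
  out <- controller$pop()                   # collect at most one finished task
  if (!is.null(out)) record_metadata(out)   # saved metadata unblocks downstream targets
}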

When I commented out the tmp <- controller$pop() line and tried it on my Ubuntu machine, I got a new error at the out <- controller$pop() line calling daemons():

[1] "ef17d2102a8a294ffe29a8a51897473afa797bca"
[1] "e16a511d26e752fc231c2df898aaee851d5f26f5"
[1] "25512ee0c823d08274706027fca7edd281006366"
[1] "00ae568493e8011b3d0fc6948e94eef67d4445e9"
[1] "3643adae7ee677d71240fdff852fd427d4c79fd8"
[1] "3f4693ac1efd7c0eac9d85a45676943cea29462c"
[1] "607166bfb706d47ca4376a0c614553ff2e551786"
...

Error in `attributes<-`(res, list(dim = c(length(envir[["urls"]]), 4L),  : 
  dims [product 80] do not match the length of object [65]
In addition: Warning messages:
1: In is.character(r) && nzchar(r) :
  'length(x) = 52 > 1' in coercion to 'logical(1)'
2: In envir[["urls"]][i] <- r :
  number of items to replace is not a multiple of replacement length
3: In self$workers$socket[index] <- socket :
  number of items to replace is not a multiple of replacement length
4: In recv(sock, mode = mode, block = timeout) :
  received data could not be converted to integer
> traceback()
10: query_status(envir)
9: mirai::daemons(.compute = self$name)
8: self$router$poll()
7: self$scale()
6: if_any(scale, self$scale(), self$collect())
5: controller$pop() at test2.R#17
4: eval(ei, envir)
3: eval(ei, envir)
2: withVisible(eval(ei, envir))
1: source("test2.R")


wlandau commented May 14, 2023

It's so strange: this is a script that more accurately emulates what crew does, and yet it succeeds:

library(mirai)
library(nanonext)
library(purrr)
daemons(
  n = 20L,
  url = "ws://127.0.0.1:5000",
  dispatcher = TRUE,
  token = TRUE
)
urls <- environment(daemons)$..$default$urls
when_launched <- rep(-Inf, length(urls))
queue <- list()
results <- list()
tasks_to_submit <- seq_len(6000)
while (length(tasks_to_submit) > 0L || length(queue) > 0L) {
  # Submit a task
  if (length(tasks_to_submit) > 0L) {
    i <- tasks_to_submit[1L]
    print(paste("submitting task", i))
    target <- targets::tar_target_raw(
      name = "y",
      command = parse(
        text = sprintf('tibble::tibble(x = %s, a = 1, b = 2, c = "3")', i)
      )[[1]]
    )
    queue[[length(queue) + 1L]] <- mirai(
      .expr = targets:::target_run(target, globalenv(), "_targets"),
      .args = list(target = target)
    )
    tasks_to_submit <- tasks_to_submit[-1L]
  }
  
  # Collect the results of resolved tasks
  # and clear those tasks out of the queue
  # so demand for server can be assessed properly.
  done <- rep(FALSE, length(queue))
  for (index in seq_along(done)) {
    task <- queue[[index]]
    if (!.unresolved(task)) {
      results[[length(results) + 1L]] <- task
      done[index] <- TRUE
    }
  }
  queue[done] <- NULL

  # Scale up servers to meet demand from tasks.
  daemons <- daemons()$daemons
  now <- mclock()
  launching <- (now - when_launched) < 30000
  connected <- as.logical(daemons[, "online"] > 0L)
  discovered <- as.logical(daemons[, "instance"] > 0L)
  is_inactive <- (!connected) & (discovered | (!launching))
  which_relaunch <- head(which(is_inactive), n = length(queue))
  for (server_index in which_relaunch) {
    new_url <- saisei(i = server_index)
    if (!is.null(new_url)) {
      launch_server(url = new_url, idletime = 1000)
      when_launched[server_index] <- now
    }
  }
  
  # Deal with the result of a task.
  if (length(results) > 0L) {
    print(paste("result:", results[[1]]$data$value$object$x))
    results[[1]] <- NULL
  }
}
daemons(0L)


wlandau commented May 14, 2023

So I think it would be more productive at this point if I focus on the crew-only example.

Another difference: in the crew example, workers tend to scale down as I would expect (noticed from watching htop). In the mirai-only example, the workers don't scale down automatically. So I wonder if I need to take a closer look at the arguments I pass to server().
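
For reference, these are the server-side arguments that govern scaling down (a sketch with illustrative values; new_url would come from saisei() as in the scripts above):

# idletime is the server-side analogue of crew's seconds_idle: the instance
# exits after this many milliseconds without receiving a new task.
launch_server(
  url = new_url,
  maxtasks = Inf,   # no per-instance task limit
  idletime = 5000,  # exit after 5000 ms idle
  walltime = Inf    # no wall-clock limit
)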

I might also try a launcher plugin that just calls mirai::launch_server() instead of a processx call. It won't have a terminate_worker() method, but it shouldn't need one here.


shikokuchuo commented May 14, 2023 via email

@shikokuchuo (Owner)

When I commented out the tmp <- controller$pop() line and tried it on my Ubuntu machine, I got a new error at the out <- controller$pop() line calling daemons():

[1] "ef17d2102a8a294ffe29a8a51897473afa797bca"
[1] "e16a511d26e752fc231c2df898aaee851d5f26f5"
[1] "25512ee0c823d08274706027fca7edd281006366"
[1] "00ae568493e8011b3d0fc6948e94eef67d4445e9"
[1] "3643adae7ee677d71240fdff852fd427d4c79fd8"
[1] "3f4693ac1efd7c0eac9d85a45676943cea29462c"
[1] "607166bfb706d47ca4376a0c614553ff2e551786"
...

Error in `attributes<-`(res, list(dim = c(length(envir[["urls"]]), 4L),  : 
  dims [product 80] do not match the length of object [65]
In addition: Warning messages:
1: In is.character(r) && nzchar(r) :
  'length(x) = 52 > 1' in coercion to 'logical(1)'
2: In envir[["urls"]][i] <- r :
  number of items to replace is not a multiple of replacement length
3: In self$workers$socket[index] <- socket :
  number of items to replace is not a multiple of replacement length
4: In recv(sock, mode = mode, block = timeout) :
  received data could not be converted to integer
> traceback()
10: query_status(envir)
9: mirai::daemons(.compute = self$name)
8: self$router$poll()
7: self$scale()
6: if_any(scale, self$scale(), self$collect())
5: controller$pop() at test2.R#17
4: eval(ei, envir)
3: eval(ei, envir)
2: withVisible(eval(ei, envir))
1: source("test2.R")

Re. my above comment, I just tested on the latest crew version 2a1c6ad and it still works for me.

The error message seems to imply that the dispatcher is running an older version of mirai (i.e. it is possibly sending back a serialised R object) while the client process is expecting a single character vector, hence the 'length(x) = 52 > 1' in coercion to 'logical(1)' error. This is very strange: not only can I not reproduce it, I can't really see how it is possible!
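
One quick sanity check along those lines (a sketch; the dispatcher is a separate background process, so this only covers the client session and a worker):

# Versions loaded by the client session.
packageVersion("mirai")
packageVersion("nanonext")

# Version loaded by a worker, via a trivial task pushed through a started crew
# controller; reading out$result assumes the current format of popped tasks.
controller$push(command = as.character(utils::packageVersion("mirai")))
out <- NULL
while (is.null(out)) out <- controller$pop()
out$result[[1L]]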

But just to show that it does indeed work, this is the log from an example run:

> controller$log
# A tibble: 20 × 5
   popped_tasks popped_seconds popped_errors popped_warnings controller                              
          <int>          <dbl>         <int>           <int> <chr>                                   
 1         4018         5.47               0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
 2          735         1.11               0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
 3           84         0.166              0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
 4           92         0.155              0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
 5           55         0.0970             0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
 6           47         0.101              0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
 7           69         0.107              0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
 8           81         0.130              0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
 9           56         0.109              0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
10           44         0.0900             0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
11           99         0.156              0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
12           62         0.119              0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
13           86         0.146              0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
14           69         0.117              0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
15           85         0.125              0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
16           58         0.0980             0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
17           67         0.107              0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
18           61         0.0950             0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
19           58         0.0890             0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e
20           74         0.113              0               0 0685becc44f8e004a6b958916deb9eafe8d14b1e


shikokuchuo commented May 14, 2023

Another difference: in the crew example, workers tend to scale down as I would expect (noticed from watching htop). In the mirai-only example, the workers don't scale down automatically. So I wonder if I need to take a closer look at the arguments I pass to server().

I tried your new example and again it works. When I look at completed tasks, the daemons()$daemons record shows around 5,100 completed for the instances that last until the end, so there is still some limited scaling down happening.
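
For reference, a sketch of where that figure comes from (the complete column of the daemons status matrix):

# Total completed tasks recorded against the instances currently listed;
# tasks completed by instances that were rotated out earlier are not included.
status <- mirai::daemons()$daemons
sum(status[, "complete"])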


wlandau commented May 14, 2023

Well, thanks for trying it out.

The most recent evidence does point to crew, so I will open new issues there. It's really baffling.

I created an even more crew-like test below, which runs fine.

library(mirai)
library(nanonext)
library(purrr)

new_task <- function(i) {
  print(paste("submitting task", i))
  command <- quote(quote(targets:::target_run(target, globalenv(), "_targets")))
  .expr <- rlang::call2(
    .fn = quote(crew::crew_eval),
    command = command,
    data = quote(data),
    globals = quote(globals),
    seed = quote(seed),
    packages = quote(packages),
    library = quote(library)
  )
  target <- targets::tar_target_raw(
    name = paste0("task", i),
    command = quote(tibble::tibble(x = 1, a = 1, b = 2, c = "3"))
  )
  .args <- list(
    data = list(target = target),
    globals = list(),
    seed = 123L,
    packages = character(0L),
    library = NULL
  )
  handle <- mirai(
    .expr = .expr,
    .args = .args,
    .timeout = NULL
  )
  task <- list(
    name = crew:::crew_random_name(),
    command = paste(deparse1(command), collapse = " "),
    handle = list(handle)
  )
}

collect_results <- function(controller) {
  done <- rep(FALSE, length(controller$queue))
  for (index in seq_along(done)) {
    task <- controller$queue[[index]]
    if (!.unresolved(task$handle[[1L]])) {
      controller$results[[length(controller$results) + 1L]] <- task
      done[index] <- TRUE
    }
  }
  controller$queue[done] <- NULL
}

scale_servers <- function(controller) {
  daemons <- daemons()$daemons
  now <- mclock()
  launching <- (now - controller$when_launched) < 30000
  connected <- as.logical(daemons[, "online"] > 0L)
  discovered <- as.logical(daemons[, "instance"] > 0L)
  is_inactive <- (!connected) & (discovered | (!launching))
  which_relaunch <- head(which(is_inactive), n = length(controller$queue))
  for (server_index in which_relaunch) {
    new_url <- saisei(i = server_index)
    if (!is.null(new_url)) {
      run_server(url = new_url)
      controller$when_launched[server_index] <- now
    }
  }
}

run_server <- function(url) {
 # launch_server(url = url, idletime = 1000)
  bin <- ifelse(tolower(Sys.info()[["sysname"]]) == "windows", "R.exe", "R")
  path <- file.path(R.home("bin"), bin)
  call <- sprintf(
    "crew::crew_worker(settings = list(url = \"%s\", maxtasks = Inf, idletime = 1000, walltime = Inf, timerstart = 0L, exitlinger = 1000, cleanup = 1L, asyncdial = FALSE), launcher = \"b\", worker = \"c\", instance = \"d\")",
    url
  )
  processx::process$new(
    command = path,
    args = c("-e", call),
    cleanup = FALSE
  )
}

push_task <- function(i, controller) {
  controller$queue[[length(controller$queue) + 1L]] <- new_task(i)
}

pop_task <- function(controller) {
  print(paste("collected task:", controller$results[[1L]]$handle[[1L]]$data$result[[1L]]$settings$name))
  controller$results <- controller$results[-1L]
}

daemons(
  n = 20L,
  url = "ws://127.0.0.1:0",
  dispatcher = TRUE,
  token = TRUE
)
controller <- new.env(parent = emptyenv())
controller$queue <- list()
controller$results <- list()
controller$when_launched <- rep(-Inf, length(environment(daemons)$..$default$urls))
tasks_submitted <- 0L
n_tasks <- 6000

while (tasks_submitted < n_tasks || length(controller$queue) > 0L || length(controller$results) > 0L) {
  # Submit a task
  if (tasks_submitted < n_tasks) {
    tasks_submitted <- tasks_submitted + 1L
    push_task(tasks_submitted, controller)
    collect_results(controller)
    scale_servers(controller)
  }

  # Collect the result of a task.
  collect_results(controller)
  scale_servers(controller)
  if (length(controller$results) > 0L) {
    pop_task(controller)
  }
}

daemons(0L)


wlandau commented May 14, 2023

Just submitted wlandau/crew#75. Any suggestions on how I might troubleshoot this?
