Skip to content

Commit

Permalink
use new Monitor class
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Nov 29, 2024
1 parent 145d7ac commit 7ba6471
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 30 deletions.
4 changes: 2 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 1.3.1.9004
Version: 1.3.1.9005
Description: Designed for simplicity, a 'mirai' evaluates an R expression
asynchronously in a parallel process, locally or distributed over the
network, with the result automatically available upon completion. Modern
Expand Down Expand Up @@ -33,7 +33,7 @@ Encoding: UTF-8
Depends:
R (>= 3.6)
Imports:
nanonext (>= 1.3.2.9006)
nanonext (>= 1.3.2.9007)
Enhances:
parallel,
promises
Expand Down
2 changes: 2 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,14 @@ importFrom(nanonext,is_error_value)
importFrom(nanonext,listen)
importFrom(nanonext,lock)
importFrom(nanonext,mclock)
importFrom(nanonext,monitor)
importFrom(nanonext,msleep)
importFrom(nanonext,nng_error)
importFrom(nanonext,opt)
importFrom(nanonext,parse_url)
importFrom(nanonext,pipe_notify)
importFrom(nanonext,random)
importFrom(nanonext,read_monitor)
importFrom(nanonext,reap)
importFrom(nanonext,recv)
importFrom(nanonext,recv_aio)
Expand Down
4 changes: 2 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# mirai 1.3.1.9004 (development)
# mirai 1.3.1.9005 (development)

#### New Features

* Introduces `daemons(dispatcher = "next")`, a newer and more efficient dispatcher. Supports mirai cancellation.

#### Updates

* Requires `nanonext` >= [1.3.2.9006].
* Requires `nanonext` >= [1.3.2.9007].

# mirai 1.3.1

Expand Down
2 changes: 0 additions & 2 deletions R/daemon.R
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,7 @@ daemon2 <- function(url, asyncdial = FALSE, autoexit = TRUE, cleanup = TRUE,
}
snapshot()

send(sock, 0L, mode = 2L, block = TRUE)
aio <- recv_aio(sock, mode = 1L, cv = cv)

repeat {
aio2 <- recv_aio(sock, mode = 1L, cv = cv)
wait(cv) || break
Expand Down
41 changes: 24 additions & 17 deletions R/dispatcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,14 @@ dispatcher2 <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL

auto <- is.null(url)
if (auto) url <- local_url()
ncv <- cv()
psock <- socket(protocol = "poly")
on.exit(reap(psock), add = TRUE, after = TRUE)
pipe_notify(psock, cv = ncv, cv2 = cv, add = TRUE, remove = TRUE)
m <- monitor(psock, cv)
listen(psock, url = url, tls = tls, error = TRUE)

msgid <- 100L
inq <- outq <- list()

if (auto) {

envir <- new.env(hash = FALSE)
Expand All @@ -297,6 +299,12 @@ dispatcher2 <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL
for (i in seq_len(n))
until(cv, .limit_long) || stop(._[["sync_daemons"]])

changes <- read_monitor(m)
for (item in changes) {
outq[[as.character(item)]] <- if (item > 0) list(pipe = item, msgid = 0L, ctx = NULL)
print(item)
}

} else {

url <- check_url(psock)
Expand All @@ -316,8 +324,6 @@ dispatcher2 <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL
cmessage <- recv_aio(sockc, mode = 5L, cv = cv)
}

msgid <- 100L
inq <- outq <- list()
ctx <- .context(sock)
req <- recv_aio(ctx, mode = 8L, cv = cv)
res <- recv_aio(psock, mode = 8L, cv = cv)
Expand All @@ -327,18 +333,23 @@ dispatcher2 <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL

wait(cv) || break

if (cv_value(ncv)) {
wait(ncv)
# add connection / disconnection logic
changes <- read_monitor(m)
if (length(changes)) {
for (item in changes) {
outq[[as.character(item)]] <- if (item > 0) list(pipe = item, msgid = 0L, ctx = NULL)
print(item)
}
next
}

ctrchannel && !.unresolved(cmessage) && {
msgid <- collect_aio(cmessage)
if (msgid) {
found <- FALSE
for (item in outq)
if (msgid == item[["msgid"]]) {
call_aio(send_aio(psock, .miraiInterrupt, mode = 1L, pipe = item[["pipe"]]))
for (i in seq_along(outq))
if (outq[[i]][["msgid"]] == msgid) {
call_aio(send_aio(psock, .miraiInterrupt, mode = 1L, pipe = outq[[i]][["pipe"]]))
outq[[i]][["msgid"]] <- 0L
found <- TRUE
break
}
Expand All @@ -365,14 +376,10 @@ dispatcher2 <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL

} else if (!.unresolved(res)) {
value <- collect_aio(res)
id <- .subset2(res, "aio")
id <- as.character(.subset2(res, "aio"))
res <- recv_aio(psock, mode = 8L, cv = cv)
if (value[1L] == 0L) {
outq[[as.character(id)]] <- list(pipe = id, msgid = 0L, ctx = NULL)
} else {
send(outq[[as.character(id)]][["ctx"]], value, mode = 2L, block = TRUE)
outq[[as.character(id)]][["msgid"]] <- 0L
}
send(outq[[id]][["ctx"]], value, mode = 2L, block = TRUE)
outq[[id]][["msgid"]] <- 0L
}

if (length(inq))
Expand Down
8 changes: 4 additions & 4 deletions R/mirai-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@
#'
#' @importFrom nanonext .advance call_aio call_aio_ collect_aio collect_aio_
#' .context cv cv_value dial .dispatcher .interrupt is_error_value .keep
#' listen lock .mark mclock msleep nng_error .online opt opt<- parse_url
#' pipe_notify random reap recv recv_aio request send send_aio serial_config
#' socket stat stop_aio tls_config unresolved .unresolved until wait
#' write_cert
#' listen lock .mark mclock monitor msleep nng_error .online opt opt<-
#' parse_url pipe_notify random read_monitor reap recv recv_aio request send
#' send_aio serial_config socket stat stop_aio tls_config unresolved
#' .unresolved until wait write_cert
#'
"_PACKAGE"

Expand Down
3 changes: 1 addition & 2 deletions R/mirai.R
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,8 @@ collect_mirai <- collect_aio
stop_mirai <- function(x) {
.compute <- attr(x, "profile")
envir <- if (!is.null(.compute)) ..[[.compute]]
res <- length(envir[["msgid"]]) && query_dispatcher(envir[["sockc"]], command = attr(x, "msgid"), mode = 6L)
stop_aio(x)
invisible(res)
invisible(length(envir[["msgid"]]) && query_dispatcher(envir[["sockc"]], command = attr(x, "msgid"), mode = 6L))
}

#' Query if a mirai is Unresolved
Expand Down
2 changes: 1 addition & 1 deletion tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ connection && Sys.getenv("NOT_CRAN") == "true" && {
test_true(stop_mirai(m2))
test_equal(m2$data, 20L)
test_true(stop_mirai(m1))
test_class("miraiError", mirai(res)[])
test_class("miraiInterrupt", mirai(res)[])
test_equal(status()$connections, 1L)
test_equal(length(nextget("urls")), 1L)
test_class("miraiLaunchCmd", launch_remote(1))
Expand Down

0 comments on commit 7ba6471

Please sign in to comment.