Skip to content

Commit

Permalink
JOURNAL: Add support for summary(js, by = "future")
Browse files Browse the repository at this point in the history
  • Loading branch information
HenrikBengtsson committed Jan 30, 2024
1 parent 477828f commit e91afac
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 6 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Package: future
Version: 1.33.1-9005
Version: 1.33.1-9006
Title: Unified Parallel and Distributed Processing in R for Everyone
Imports:
digest,
Expand Down
98 changes: 93 additions & 5 deletions R/journal.R
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
#' @noRd
journal <- function(x, ...) {
  ## S3 generic: dispatch on the class of 'x'
  UseMethod("journal")
}


#' @export
journal.Future <- function(x, ...) {
data <- x$.journal
Expand Down Expand Up @@ -184,23 +185,35 @@ print.FutureJournal <- function(x, digits.secs = 3L, ...) {
NextMethod("print")
}


#' Summarize Timing and Memory Profiling
#'
#' _WARNING: This function is under development. It can change at any time.
#' For now, please, do not depend on this function in a published R package._
#'
#' @param by If `"evaluation"` (default), summarize across all evaluations.
#' Each evaluation is resolved by one future.
#' If `"process"`, summarize journal statistics per process,
#' which includes the main R session, and all workers (if any).
#' If `"future"`, summarize journal statistics per future.
#'
#' @return
#' A data.frame of class `FutureJournalSummary`.
#'
#' @export
summary.FutureJournal <- function(object, by = c("future", "process"), ...) {
summary.FutureJournal <- function(object, by = c("evaluation", "process", "future"), ...) {
## To please 'R CMD check'
event <- future_uuid <- median <- parent <- category <- NULL

by <- match.arg(by)

if (by == "future") {
if (by == "evaluation") {
dt_top <- subset(object, is.na(parent))

uuids <- unique(dt_top$future_uuid)
nbr_of_futures <- length(uuids)

## Calculate 'stop' times
dt_top$stop <- dt_top$start + dt_top$duration

## -------------------------------------------------------
## 1. Calculate the total walltime
## -------------------------------------------------------
Expand Down Expand Up @@ -319,9 +332,84 @@ summary.FutureJournal <- function(object, by = c("future", "process"), ...) {

attr(stats, "nbr_of_futures") <- length(uuids)
class(stats) <- c("FutureJournalSummary", class(stats))




} else if (by == "process") {
stop(FutureError(sprintf("summary(..., by = \"process\") for %s is not implemented", class(object)[1])))
dt_top <- object

uuids <- unique(dt_top$session_uuid)

## Calculate 'stop' times
dt_top$stop <- dt_top$start + dt_top$duration
dt_top$at <- dt_top$start - min(dt_top$start, na.rm = TRUE)

## (a) Groups by sessions
groups <- by(dt_top, dt_top$session_uuid, FUN = identity)

## (b) Stats by group
stats <- lapply(groups, FUN = function(group) {
stats <- data.frame(
session_uuid = group$session_uuid[1],
start = min(group$start, na.rm = TRUE),
at = min(group$at, na.rm = TRUE),
duration = diff(range(group$at, group$at + group$duration, na.rm = TRUE)),
memory_rss = diff(range(c(group$memory_start_rss, group$memory_stop_rss), na.rm = TRUE)),
memory_vms = diff(range(c(group$memory_start_vms, group$memory_stop_vms), na.rm = TRUE)),
nbr_of_futures = length(unique(subset(group, event == "evaluate")$future_uuid))

)
stats
})

## (c) Combine
stats <- do.call(rbind, stats)

stats <- stats[order(stats$session_uuid != session_uuid(), stats$start), ]

rownames(stats) <- NULL

class(stats) <- c("FutureJournalSummary", class(stats))


} else if (by == "future") {
dt_top <- object

uuids <- unique(dt_top$future_uuid)

## Calculate 'stop' times
dt_top$stop <- dt_top$start + dt_top$duration
dt_top$at <- dt_top$start - min(dt_top$start, na.rm = TRUE)

## (a) Groups by futures
groups <- by(dt_top, dt_top$future_uuid, FUN = identity)

## (b) Stats by group
stats <- lapply(groups, FUN = function(group) {
stats <- data.frame(
future_uuid = group$future_uuid[1],
start = min(group$start, na.rm = TRUE),
at = min(group$at, na.rm = TRUE),
duration = diff(range(group$at, group$at + group$duration, na.rm = TRUE)),
memory_rss = diff(range(c(group$memory_start_rss, group$memory_stop_rss), na.rm = TRUE)),
memory_vms = diff(range(c(group$memory_start_vms, group$memory_stop_vms), na.rm = TRUE)),
nbr_of_sessions = length(unique(subset(group, event == "evaluate")$session_uuid))

)
stats
})

## (c) Combine
stats <- do.call(rbind, stats)

stats <- stats[order(stats$start), ]

rownames(stats) <- NULL

class(stats) <- c("FutureJournalSummary", class(stats))
}


stats
}
Expand Down
21 changes: 21 additions & 0 deletions man/summary.FutureJournal.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions tests/capture_journals.R
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ print(js)
stats <- summary(js)
print(stats)

## Exercise every supported 'by' mode explicitly, once each
## ("evaluation" is also the default used by summary(js) above)
stats <- summary(js, by = "evaluation")
print(stats)

stats <- summary(js, by = "process")
print(stats)

stats <- summary(js, by = "future")
print(stats)

message("*** summary() of FutureJournal ... done")

#source("incl/end.R")

0 comments on commit e91afac

Please sign in to comment.