Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Use progressr with future framework for progress updates #703

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ LazyData: true
Imports:
future,
future.apply,
pbapply,
progressr,
irlba,
NMF (>= 0.23.0),
ggalluvial,
Expand Down
51 changes: 28 additions & 23 deletions R/analysis.R
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,9 @@ netAnalysis_contribution <- function(object, signaling, signaling.name = NULL, s
#' @param net compute the centrality measures on a specific signaling network given by a 2 or 3 dimemsional array net
#' @param net.name a character vector giving the name of signaling networks
#' @param thresh threshold of the p-value for determining significant interaction
#' @importFrom future nbrOfWorkers
#' @importFrom methods slot
#' @importFrom future.apply future_sapply
#' @importFrom pbapply pbsapply
#' @importFrom progressr progressor
#'
#' @return
#' @export
Expand All @@ -211,17 +210,16 @@ netAnalysis_computeCentrality <- function(object = NULL, slot.name = "netP", net
}
if (length(dim(net)) == 3) {
nrun <- dim(net)[3]
my.sapply <- ifelse(
test = future::nbrOfWorkers() == 1,
yes = pbapply::pbsapply,
no = future.apply::future_sapply
)
centr.all = my.sapply(
p <- progressr::progressor(nrun)
centr.all <- future.apply::future_sapply(
X = 1:nrun,
FUN = function(x) {
Sys.sleep(1/nrun)
p(sprintf("%g of %g", x, nrun)) # Use with_progress() to see progress bar in client-side
net0 <- net[ , , x]
return(computeCentralityLocal(net0))
computeCentralityLocal(net0)
},
future.seed = TRUE,
simplify = FALSE
)
} else {
Expand Down Expand Up @@ -705,9 +703,9 @@ netEmbedding <- function(object, slot.name = "netP", type = c("functional","stru
#' @param nCores number of workers when doing parallel
#' @param k.eigen the number of eigenvalues used when doing spectral clustering
#' @importFrom methods slot
#' @importFrom future nbrOfWorkers plan
#' @importFrom future plan
#' @importFrom future.apply future_sapply
#' @importFrom pbapply pbsapply
#' @importFrom progressr progressor
#' @return
#' @export
#'
Expand All @@ -733,28 +731,35 @@ netClustering <- function(object, slot.name = "netP", type = c("functional","str
} else {
N <- nrow(data.use)
kRange <- seq(2,min(N-1, 10),by = 1)
kN <- length(kRange)
nCores <- as.integer(nCores)
if (do.parallel) {
future::plan("multiprocess", workers = nCores)
if (.Platform$OS.type == "windows") {
future::plan("multisession", workers = nCores)
} else {
future::plan("multicore", workers = nCores)
}
options(future.globals.maxSize = 1000 * 1024^2)
} else {
future::plan("sequential")
}
my.sapply <- ifelse(
test = future::nbrOfWorkers() == 1,
yes = pbapply::pbsapply,
no = future.apply::future_sapply
)
results = my.sapply(
X = 1:length(kRange),
message(sprintf("future plan is '%s'", as.character(attr(future::plan(), "call"))[2]))
p <- progressr::progressor(kN)
results <- future.apply::future_sapply(
X = 1:kN,
FUN = function(x) {
idents <- kmeans(data.use,kRange[x],nstart=10)$cluster
Sys.sleep(1/kN)
p(sprintf("%g of %g", x, kN)) # Use with_progress() to see progress bar in client-side
idents <- kmeans(data.use, kRange[x], nstart = 10)$cluster
clusIndex <- idents
#adjMat0 <- as.numeric(outer(clusIndex, clusIndex, FUN = "==")) - outer(1:N, 1:N, "==")
adjMat0 <- Matrix::Matrix(as.numeric(outer(clusIndex, clusIndex, FUN = "==")), nrow = N, ncol = N)
return(list(adjMat = adjMat0, ncluster = length(unique(idents))))
list(adjMat = adjMat0, ncluster = length(unique(idents)))
},
future.seed = TRUE,
simplify = FALSE
)
adjMat <- lapply(results, "[[", 1)
CM <- Reduce('+', adjMat)/length(kRange)
CM <- Reduce('+', adjMat)/kN
res <- computeEigengap(as.matrix(CM))
numCluster <- res$upper_bound
clusters = kmeans(data.use,numCluster,nstart=10)$cluster
Expand Down
99 changes: 42 additions & 57 deletions R/modeling.R
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@
#' @param n parameter in Hill function
#'
#'
#' @importFrom future nbrOfWorkers
#' @importFrom future.apply future_sapply
#' @importFrom pbapply pbsapply
#' @importFrom progressr progressor
#' @importFrom stats aggregate
#' @importFrom Matrix crossprod
#' @importFrom utils txtProgressBar setTxtProgressBar
Expand Down Expand Up @@ -69,11 +68,6 @@ computeCommunProb <- function(object, type = c("triMean", "truncatedMean","thres
}
complex_input <- object@DB$complex
cofactor_input <- object@DB$cofactor
my.sapply <- ifelse(
test = future::nbrOfWorkers() == 1,
yes = sapply,
no = future.apply::future_sapply
)

ptm = Sys.time()

Expand Down Expand Up @@ -154,14 +148,17 @@ computeCommunProb <- function(object, type = c("triMean", "truncatedMean","thres

set.seed(seed.use)
permutation <- replicate(nboot, sample.int(nC, size = nC))
data.use.avg.boot <- my.sapply(
p <- progressr::progressor(nboot)
data.use.avg.boot <- future.apply::future_sapply(
X = 1:nboot,
FUN = function(nE) {
p()
groupboot <- group[permutation[, nE]]
data.use.avgB <- aggregate(t(data.use), list(groupboot), FUN = FunMean)
data.use.avgB <- t(data.use.avgB[,-1])
return(data.use.avgB)
data.use.avgB
},
future.seed = TRUE,
simplify = FALSE
)
pb <- txtProgressBar(min = 0, max = nLR, style = 3, file = stderr())
Expand Down Expand Up @@ -203,10 +200,11 @@ computeCommunProb <- function(object, type = c("triMean", "truncatedMean","thres

Pnull <- as.vector(Pnull)

#Pboot <- foreach(nE = 1:nboot) %dopar% {
Pboot <- sapply(
p <- progressr::progressor(nboot)
Pboot <- future.apply::future_sapply(
X = 1:nboot,
FUN = function(nE) {
p()
data.use.avgB <- data.use.avg.boot[[nE]]
dataLavgB <- computeExpr_LR(geneL[i], data.use.avgB, complex_input)
dataRavgB <- computeExpr_LR(geneR[i], data.use.avgB, complex_input)
Expand Down Expand Up @@ -458,23 +456,20 @@ computeAveExpr <- function(object, features = NULL, group.by = NULL, type = c("t
#' @param complex the names of complex
#' @return
#' @importFrom dplyr select starts_with
#' @importFrom future nbrOfWorkers
#' @importFrom future.apply future_sapply
#' @importFrom pbapply pbsapply
#' @importFrom progressr progressor
#' @export
computeExpr_complex <- function(complex_input, data.use, complex) {
Rsubunits <- complex_input[complex,] %>% dplyr::select(starts_with("subunit"))
my.sapply <- ifelse(
test = future::nbrOfWorkers() == 1,
yes = sapply,
no = future.apply::future_sapply
)
data.complex = my.sapply(
X = 1:nrow(Rsubunits),
nrun <- nrow(Rsubunits)
p <- progressr::progressor(nrun)
data.complex <- future.apply::future_sapply(
X = 1:nrun,
FUN = function(x) {
p()
RsubunitsV <- unlist(Rsubunits[x,], use.names = F)
RsubunitsV <- RsubunitsV[RsubunitsV != ""]
return(geometricMean(data.use[RsubunitsV,]))
geometricMean(data.use[RsubunitsV,])
}
)
data.complex <- t(data.complex)
Expand All @@ -489,20 +484,17 @@ computeExpr_complex <- function(complex_input, data.use, complex) {
# @param FunMean the function for computing mean expression per group
# @return
# @importFrom dplyr select starts_with
# @importFrom future nbrOfWorkers
# @importFrom future.apply future_sapply
# @importFrom pbapply pbsapply
# #' @export
# @importFrom progressr progressor
# @export
.computeExprGroup_complex <- function(complex_input, data.use, complex, group, FunMean) {
Rsubunits <- complex_input[complex,] %>% dplyr::select(starts_with("subunit"))
my.sapply <- ifelse(
test = future::nbrOfWorkers() == 1,
yes = pbapply::pbsapply,
no = future.apply::future_sapply
)
data.complex = my.sapply(
X = 1:nrow(Rsubunits),
nrun <- nrow(Rsubunits)
p <- progressr::progressor(nrun)
data.complex <- future.apply::future_sapply(
X = 1:nrun,
FUN = function(x) {
p()
RsubunitsV <- unlist(Rsubunits[x,], use.names = F)
RsubunitsV <- RsubunitsV[RsubunitsV != ""]
RsubunitsV <- intersect(RsubunitsV, rownames(data.use))
Expand All @@ -515,7 +507,7 @@ computeExpr_complex <- function(complex_input, data.use, complex) {
} else {
data.avg = matrix(0, nrow = 1, ncol = length(unique(group)))
}
return(geometricMean(data.avg))
geometricMean(data.avg)
}
)
data.complex <- t(data.complex)
Expand Down Expand Up @@ -554,9 +546,8 @@ computeExpr_LR <- function(geneLR, data.use, complex_input){
#' @param pairLRsig a data frame giving ligand-receptor interactions
#' @param type when type == "A", computing expression of co-activation receptor; when type == "I", computing expression of co-inhibition receptor.
#' @return
#' @importFrom future nbrOfWorkers
#' @importFrom future.apply future_sapply
#' @importFrom pbapply pbsapply
#' @importFrom progressr progressor
#' @export
computeExpr_coreceptor <- function(cofactor_input, data.use, pairLRsig, type = c("A", "I")) {
type <- match.arg(type)
Expand All @@ -567,25 +558,23 @@ computeExpr_coreceptor <- function(cofactor_input, data.use, pairLRsig, type = c
}
index.coreceptor <- which(!is.na(coreceptor.all) & coreceptor.all != "")
if (length(index.coreceptor) > 0) {
my.sapply <- ifelse(
test = future::nbrOfWorkers() == 1,
yes = sapply,
no = future.apply::future_sapply
)
coreceptor <- coreceptor.all[index.coreceptor]
coreceptor.ind <- cofactor_input[coreceptor, grepl("cofactor" , colnames(cofactor_input) )]
data.coreceptor.ind = my.sapply(
X = 1:nrow(coreceptor.ind),
nrun <- nrow(coreceptor.ind)
p <- progressr::progressor(nrun)
data.coreceptor.ind <- future.apply::future_sapply(
X = 1:nrun,
FUN = function(x) {
p()
coreceptor.indV <- unlist(coreceptor.ind[x,], use.names = F)
coreceptor.indV <- coreceptor.indV[coreceptor.indV != ""]
coreceptor.indV <- intersect(coreceptor.indV, rownames(data.use))
if (length(coreceptor.indV) == 1) {
return(1 + data.use[coreceptor.indV, ])
1 + data.use[coreceptor.indV, ]
} else if (length(coreceptor.indV) > 1) {
return(apply(1 + data.use[coreceptor.indV, ], 2, prod))
apply(1 + data.use[coreceptor.indV, ], 2, prod)
} else {
return(matrix(1, nrow = 1, ncol = ncol(data.use)))
matrix(1, nrow = 1, ncol = ncol(data.use))
}
}
)
Expand All @@ -607,9 +596,8 @@ computeExpr_coreceptor <- function(cofactor_input, data.use, pairLRsig, type = c
# @param group a factor defining the cell groups
# @param FunMean the function for computing mean expression per group
# @return
# @importFrom future nbrOfWorkers
# @importFrom future.apply future_sapply
# @importFrom pbapply pbsapply
#' @importFrom progressr progressor
# #' @export
.computeExprGroup_coreceptor <- function(cofactor_input, data.use, pairLRsig, type = c("A", "I"), group, FunMean) {
type <- match.arg(type)
Expand All @@ -620,30 +608,27 @@ computeExpr_coreceptor <- function(cofactor_input, data.use, pairLRsig, type = c
}
index.coreceptor <- which(!is.na(coreceptor.all) & coreceptor.all != "")
if (length(index.coreceptor) > 0) {
my.sapply <- ifelse(
test = future::nbrOfWorkers() == 1,
yes = pbapply::pbsapply,
no = future.apply::future_sapply
)
coreceptor <- coreceptor.all[index.coreceptor]
coreceptor.ind <- cofactor_input[coreceptor, grepl("cofactor" , colnames(cofactor_input) )]
data.coreceptor.ind = my.sapply(
X = 1:nrow(coreceptor.ind),
nrun <- nrow(coreceptor.ind)
p <- progressr::progressor(nrun)
data.coreceptor.ind <- future.apply::future_sapply(
X = 1:nrun,
FUN = function(x) {
p()
coreceptor.indV <- unlist(coreceptor.ind[x,], use.names = F)
coreceptor.indV <- coreceptor.indV[coreceptor.indV != ""]
coreceptor.indV <- intersect(coreceptor.indV, rownames(data.use))
if (length(coreceptor.indV) > 1) {
data.avg <- aggregate(t(data.use[coreceptor.indV,]), list(group), FUN = FunMean)
data.avg <- t(data.avg[,-1])
return(apply(1 + data.avg, 2, prod))
# return(1 + apply(data.avg, 2, mean))
apply(1 + data.avg, 2, prod)
} else if (length(coreceptor.indV) == 1) {
data.avg <- aggregate(matrix(data.use[coreceptor.indV,], ncol = 1), list(group), FUN = FunMean)
data.avg <- t(data.avg[,-1])
return(1 + data.avg)
1 + data.avg
} else {
return(matrix(1, nrow = 1, ncol = length(unique(group))))
matrix(1, nrow = 1, ncol = length(unique(group)))
}
}
)
Expand Down
Loading