Merge pull request #77 from biomage-org/2252-gem2s-subset-step
[BIOMAGE-2252] - Add new subset pipeline to subset experiments
gerbeldo authored Dec 26, 2022
2 parents 8551014 + f937381 commit c72c807
Showing 36 changed files with 939 additions and 316 deletions.
12 changes: 12 additions & 0 deletions pipeline-runner/NAMESPACE
@@ -1,10 +1,16 @@
# Generated by roxygen2: do not edit by hand

export(add_metadata)
export(add_new_sample_ids)
export(add_subset_metadata)
export(build_cc_gene_list)
export(build_metadata_cellsets)
export(cbind_cellset_type)
export(compute_sample_edrops)
export(create_sample_id_map)
export(create_scdata)
export(create_seurat)
export(diet_scdata)
export(download_user_files)
export(embed_and_cluster)
export(filter_doublets)
@@ -25,12 +31,15 @@ export(integrate_from_sketch)
export(integrate_scdata)
export(learn_from_sketches)
export(list_exclude_genes)
export(load_cellsets)
export(load_parent_experiment_data)
export(load_user_files)
export(log_normalize)
export(make_annot_with_ids)
export(merge_scdata_list)
export(normalize_annotation_types)
export(normalize_data)
export(parse_cellsets)
export(prepare_experiment)
export(prepare_sct_integration)
export(read_10x_annotations)
@@ -40,9 +49,12 @@ export(runClusters)
export(run_emptydrops)
export(run_geosketch)
export(run_pca)
export(safe_cbind)
export(score_doublets)
export(subset_experiment)
export(subset_ids)
export(subset_safe)
export(subset_seurat)
export(sym_to_ids)
export(upload_to_aws)
import(data.table)
6 changes: 6 additions & 0 deletions pipeline-runner/R/gem2s-3-run_emptydrops.R
@@ -33,7 +33,13 @@ run_emptydrops <- function(input, pipeline_config, prev_out) {
}


#' Calculate empty drops scores for sample
#'
#' @param sample_counts dgCMatrix with counts for one sample.
#'
#' @return data.frame with edrops scores
#' @export
#'
compute_sample_edrops <- function(sample_counts) {
# check if filtered
num_empty_drops <- sum(Matrix::colSums(sample_counts) < gem2s$max.empty.counts)
7 changes: 3 additions & 4 deletions pipeline-runner/R/gem2s-6-prepare_experiment.R
@@ -1,8 +1,7 @@
#' Prepare experiment for upload to AWS
#'
-#' 1) Merges the samples for the current experiment
-#' 2) Adds metadata: cellsId, color_pool, and gene annotation
-#' 3) Preparing QC configuration
+#' 1) Adds metadata: cellsId, color_pool, and gene annotation
+#' 2) Prepares QC configuration
#'
#' @inheritParams download_user_files
#' @param prev_out 'output' slot from call to \code{create_seurat}
@@ -15,7 +14,7 @@
prepare_experiment <- function(input, pipeline_config, prev_out) {
message("Preparing experiment ...")

-  check_names <- c("config", "counts_list", "annot", "doublet_scores", "scdata_list", "disable_qc_filters")
+  check_names <- c("config", "edrops", "annot", "scdata_list", "disable_qc_filters")
check_prev_out(prev_out, check_names)

scdata_list <- prev_out$scdata_list
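
check_prev_out is assumed to stop when prev_out lacks any required slot; a minimal R sketch of that contract (assumed behavior, the function body is not shown in this diff):

prev_out <- list(config = list(), edrops = list(), annot = data.frame(),
                 scdata_list = list(), disable_qc_filters = FALSE)
check_prev_out(prev_out, check_names)     # assumed to pass silently
check_prev_out(prev_out[-2], check_names) # assumed to stop() because "edrops" is missing
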
7 changes: 6 additions & 1 deletion pipeline-runner/R/gem2s-7-upload_to_aws.R
@@ -10,7 +10,7 @@
#'
upload_to_aws <- function(input, pipeline_config, prev_out) {
message("Uploading to AWS ...")
-  check_names <- c("config", "counts_list", "annot", "doublet_scores", "scdata_list", "qc_config", "disable_qc_filters")
+  check_names <- c("config", "scdata_list", "qc_config", "disable_qc_filters")
check_prev_out(prev_out, check_names)

experiment_id <- input$experimentId
@@ -22,6 +22,11 @@ upload_to_aws <- function(input, pipeline_config, prev_out) {
qc_config <- prev_out$qc_config
disable_qc_filters <- prev_out$disable_qc_filters

if("sample_id_map" %in% names(prev_out)) {
input$sampleIds <- names(scdata_list)
input$sampleNames <- names(scdata_list)
}

message("Constructing cell sets ...")
cell_sets <- get_cell_sets(scdata_list, input)

106 changes: 101 additions & 5 deletions pipeline-runner/R/handle_data.R
@@ -25,8 +25,9 @@ upload_cells_id <- function(pipeline_config, object_key, cells_id) {
return(object_key)
}

-reload_scdata_from_s3 <- function (s3, pipeline_config, experiment_id) {
+load_processed_scdata <- function (s3, pipeline_config, experiment_id) {
bucket <- pipeline_config$processed_bucket
message("Loading processed scdata")
message(bucket)
message(paste(experiment_id, "r.rds", sep = "/"))

@@ -47,10 +48,10 @@ get_nnzero <- function (x) {
}

order_by_size <- function(scdata_list) {
-  return(scdata_list <- scdata_list[ order( sapply(scdata_list, get_nnzero)) ])
+  return(scdata_list[order(sapply(scdata_list, get_nnzero))])
}
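
For illustration, the same ordering idiom on plain sparse matrices; get_nnzero (body not shown here) is assumed to count nonzero entries, so Matrix::nnzero stands in:

library(Matrix)
lst <- list(big = rsparsematrix(10, 10, density = 0.5),
            small = rsparsematrix(10, 10, density = 0.05))
lst[order(sapply(lst, Matrix::nnzero))] # smallest first: small, then big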

-reload_scdata_list_from_s3 <- function (s3, pipeline_config, experiment_id) {
+load_source_scdata_list <- function (s3, pipeline_config, experiment_id) {
bucket <- pipeline_config$source_bucket
objects <- s3$list_objects(
Bucket = bucket,
@@ -86,13 +87,15 @@ reload_data_from_s3 <- function(pipeline_config, experiment_id, task_name, tasks
integration_index <- match("dataIntegration", task_names)
s3 <- paws::s3(config = pipeline_config$aws_config)

# TODO: remove this if block
# it never runs, because embed_and_cluster runs in the worker when modified.
# If the task is after data integration, we need to get scdata from processed_matrix
if (match(task_name, task_names) > integration_index) {
-    return(reload_scdata_from_s3(s3, pipeline_config, experiment_id))
+    return(load_processed_scdata(s3, pipeline_config, experiment_id))
}

# Otherwise, return scdata_list
-  return(reload_scdata_list_from_s3(s3, pipeline_config, experiment_id))
+  return(load_source_scdata_list(s3, pipeline_config, experiment_id))

}
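
For illustration, how the position of task_name relative to dataIntegration picks the object to load (the step names other than dataIntegration are hypothetical):

task_names <- c("classifier", "dataIntegration", "configureEmbedding")
integration_index <- match("dataIntegration", task_names)

match("configureEmbedding", task_names) > integration_index # TRUE  -> processed matrix
match("classifier", task_names) > integration_index         # FALSE -> source scdata_list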

@@ -406,3 +409,96 @@ upload_multipart_parts <- function(s3, bucket, object, key, upload_id) {

return(parts)
}


#' Load cellsets object from s3
#'
#' @param s3 paws::s3 object
#' @param pipeline_config list
#' @param experiment_id character
#'
#' @return cellsets list
#' @export
#'
load_cellsets <- function(s3, pipeline_config, experiment_id) {
message("loading cellsets file")

bucket <- pipeline_config$cell_sets_bucket

c(body, ...rest) %<-% s3$get_object(
Bucket = bucket,
Key = experiment_id
)

obj <- jsonlite::fromJSON(rawConnection(body), flatten = T)
return(obj)

}
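
A hypothetical usage sketch, assuming valid AWS credentials and a cellsets object already stored under the experiment id:

s3 <- paws::s3(config = pipeline_config$aws_config)
cellsets <- load_cellsets(s3, pipeline_config, "some-experiment-id") # made-up id
str(cellsets$cellSets, max.level = 1)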


#' Bind columns without creating rows when there's an empty data.table
#'
#' `cbind` on a `data.table` adds a row when binding an empty data.table to a
#' non-empty one. We do not want that behavior when parsing cellsets, because
#' it implies the "creation" of a cell that does not exist (e.g. when binding
#' the scratchpad cellset slots of an experiment without custom cellsets).
#'
#' @param dt data.table
#' @param ... columns to add
#'
#' @return data.table with new columns
#' @export
#'
safe_cbind <- function(dt, ...) {
if (nrow(dt) > 0) {
dt <- cbind(dt, ...)
}
return(dt)
}
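
A minimal sketch of the behavior described above (assumes the data.table package is attached):

library(data.table)

empty <- data.table(key = character(), name = character())

nrow(cbind(empty, cellset_type = "scratchpad"))      # 1: cbind creates a phantom row
nrow(safe_cbind(empty, cellset_type = "scratchpad")) # 0: empty table returned unchanged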


#' Add cellset type column to cellsets data.table
#'
#' Helper to correctly name the cellset type column. Some cellsets already
#' contain a "type" slot, which complicates matters, so we chose `cellset_type`.
#'
#' @param dt data.table
#' @param col string of corresponding cellset type
#'
#' @return data.table with cellset_type column
#' @export
#'
cbind_cellset_type <- function(dt, col) {
dt <- safe_cbind(dt, cellset_type = col)
}


#' Parse cellsets object to data.table
#'
#' Gets the cellsets list and converts it to a tidy data.table
#'
#' @param cellsets list
#'
#' @return data.table of cellset keys, names and corresponding cell_ids
#' @export
#'
parse_cellsets <- function(cellsets) {

dt_list <- cellsets$cellSets$children

lapply(dt_list, data.table::setDT)
dt_list <- purrr::map2(dt_list, cellsets$cellSets$key, cbind_cellset_type)

# fill columns in case there are empty cellset classes
dt <- data.table::rbindlist(dt_list, fill = TRUE)

# change cellset type to more generic names
dt[cellset_type %in% c("louvain", "leiden"), cellset_type := "cluster"]
dt[!cellset_type %in% c("cluster", "scratchpad", "sample"), cellset_type := "metadata"]

# unnest, and change column name
dt <- dt[, setNames(.(unlist(cellIds)), "cell_id"), by = .(key, name, cellset_type)]
data.table::setnames(dt, "cellset_type", "type")
return(dt)
}
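
A worked sketch on a hypothetical cellsets list mirroring the structure jsonlite::fromJSON produces (assumes data.table and purrr are installed); the commented output is what the function is expected to return:

cellsets <- list(cellSets = list(
  key = c("louvain", "sample"),
  children = list(
    data.frame(key = c("louvain-0", "louvain-1"),
               name = c("Cluster 0", "Cluster 1"),
               cellIds = I(list(c(0L, 1L), 2L))),
    data.frame(key = "sample-a", name = "WT", cellIds = I(list(0:2)))
  )
))

parse_cellsets(cellsets)
#          key      name    type cell_id
# 1: louvain-0 Cluster 0 cluster       0
# 2: louvain-0 Cluster 0 cluster       1
# 3: louvain-1 Cluster 1 cluster       2
# 4:  sample-a        WT  sample       0
# 5:  sample-a        WT  sample       1
# 6:  sample-a        WT  sample       2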

57 changes: 46 additions & 11 deletions pipeline-runner/R/init-functions.R
@@ -94,7 +94,7 @@ load_config <- function(development_aws_server) {
}

# batch does not have access to the internal EKS cluster api URL, use the public one
-  if(running_in_batch == "true" && domain_name != "") {
+  if (running_in_batch == "true" && domain_name != "") {
config$api_url <- config$public_api_url
}

@@ -187,12 +187,12 @@ run_qc_step <- function(scdata, config, tasks, task_name, cells_id, sample_id, d
}


-#' Run GEM2S step
+#' Run pipeline step
#'
-#' Calls the corresponding task_name GEM2S step function.
+#' Calls the corresponding `task_name` pipeline step function.
#'
#' The input list only contains experiment level parameters, such as project ID,
-#' and sample names. It's only used for downloading user files.
+#' and sample names and it's only used for downloading user files.
#'
#' @param task_name character
#' @param input list
@@ -204,8 +204,7 @@
#'
#' @return list of task results
#'
-run_gem2s_step <- function(prev_out, input, pipeline_config, tasks, task_name) {
+run_pipeline_step <- function(prev_out, input, pipeline_config, tasks, task_name) {
if (!task_name %in% names(tasks)) {
stop("Invalid task name given: ", task_name)
}
@@ -243,7 +242,40 @@ call_gem2s <- function(task_name, input, pipeline_config) {
check_input(input)
tasks <- lapply(GEM2S_TASK_LIST, get)

-  c(data, task_out) %<-% run_gem2s_step(prev_out, input, pipeline_config, tasks, task_name)
+  c(data, task_out) %<-% run_pipeline_step(prev_out, input, pipeline_config, tasks, task_name)
assign("prev_out", task_out, pos = ".GlobalEnv")

message_id <- send_gem2s_update_to_api(pipeline_config, experiment_id, task_name, data, input)

return(message_id)
}
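
Both call_gem2s and call_subset share the same dispatch mechanics: a task list maps step names to function names, lapply(..., get) resolves them, and run_pipeline_step invokes the one matching task_name. A toy R sketch of the pattern (names and return shape are illustrative, not the real task list):

# toy step obeying the (input, pipeline_config, prev_out) signature used above
step_one <- function(input, pipeline_config, prev_out) {
  list(data = list(step = "one"), output = list(result = 1))
}

# map step names to function names, then resolve them with get()
TOY_TASK_LIST <- list(stepOne = "step_one")
tasks <- lapply(TOY_TASK_LIST, get)

# destructure the step result, as the callers above do (zeallot's %<-%)
c(data, task_out) %<-% tasks[["stepOne"]](NULL, NULL, NULL)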


#' Call subset seurat
#'
#' Runs step `task_name` of the subset seurat pipeline and sends the output message to the API
#'
#' @param task_name character name of the step
#' @param input list containing
#' - parentExperimentId
#' - childExperimentId
#' - sample IDs and names
#' @param pipeline_config list as defined by load_config
#'
#' @return character message id
#'
call_subset <- function(task_name, input, pipeline_config) {
experiment_id <- input$experimentId

if (!exists("prev_out")) {
remove_cell_ids(pipeline_config, experiment_id)
assign("prev_out", NULL, pos = ".GlobalEnv")
}

check_input(input)
tasks <- lapply(SUBSET_SEURAT_TASK_LIST, get)

c(data, task_out) %<-% run_pipeline_step(prev_out, input, pipeline_config, tasks, task_name)
assign("prev_out", task_out, pos = ".GlobalEnv")

message_id <- send_gem2s_update_to_api(pipeline_config, experiment_id, task_name, data, input)
@@ -261,7 +293,7 @@ call_gem2s <- function(task_name, input, pipeline_config) {
#' @param input list containing:
#' - step parameters for all samples
#' - current sample UUID
-#' - uploadCountMatrix (wether or not to upload matrix after step)
+#' - uploadCountMatrix (whether or not to upload matrix after step)
#' @param pipeline_config list as defined by load_config
#'
#' @return character message id
@@ -403,6 +435,7 @@ pipeline_heartbeat <- function(task_token, aws_config) {
}
}


#' Start heartbeat as a background process
#'
#' messages inside the background process will ONLY be printed into
@@ -415,14 +448,14 @@
start_heartbeat <- function(task_token, aws_config) {
message("Starting heartbeat")

heartbeat_proc <- callr::r_bg(
func = pipeline_heartbeat, args = list(
task_token, aws_config
),
stdout = "/tmp/out",
stderr = "/tmp/err"
)
return(heartbeat_proc)
}
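
A hypothetical usage sketch; callr::r_bg returns a processx handle, so the caller can kill the heartbeat once the task finishes:

proc <- start_heartbeat(task_token, pipeline_config$aws_config)
tryCatch(
  wrapper(input, pipeline_config),
  finally = proc$kill() # stop sending heartbeats when the task ends
)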


@@ -441,7 +474,6 @@ wrapper <- function(input, pipeline_config) {
message("\n------\nStarting task: ", task_name, "\n")
message("Input:")
str(input)
message("")

# common to gem2s and data processing
server <- input$server
@@ -453,13 +485,16 @@
message_id <- call_qc(task_name, input, pipeline_config)
} else if (process_name == "gem2s") {
message_id <- call_gem2s(task_name, input, pipeline_config)
} else if (process_name == "subset") {
message_id <- call_subset(task_name, input, pipeline_config)
} else {
stop("Process name not recognized.")
}

return(message_id)
}


#' Pipeline error handler
#'
#' Pretty prints errors, sends report to the API, and uploads debug output to
5 changes: 4 additions & 1 deletion pipeline-runner/R/qc-1-filter_emptydrops.R
@@ -1,6 +1,9 @@
# STEP 1. Classifier filter

-#' @description Filters seurat object based on mitochondrialContent
+#' Filter empty droplets
+#'
+#' filters seurat objects based on edrops scores.
#'
#' @param config list containing the following information
#' - enable: true/false. Refers to whether or not to apply the filter.
#' - auto: true/false. 'True' indicates that the filter settings need to be changed depending on some sensible value (it requires