Merge pull request #370 from hms-dbmi-cellenics/add-sce-upload
Add sce upload
alexvpickering authored Sep 19, 2024
2 parents d6f3ec8 + 62a4082 commit 0ea1260
Showing 17 changed files with 639 additions and 393 deletions.
4 changes: 2 additions & 2 deletions local-runner/cf-local-container-launcher.yaml
@@ -61,9 +61,9 @@ Resources:
ZipFile: |
import subprocess
# Removes all containers that already exist and contain either 'qc' or 'gem2s' or 'seurat' in their name.
# Removes all containers that already exist and contain either 'qc' or 'gem2s' or 'obj2s' in their name.
def handler(event, context):
proc = subprocess.run("docker kill $(docker ps -f name='qc|gem2s|seurat' --format '{{.Names}}') || true", shell=True, check=True)
proc = subprocess.run("docker kill $(docker ps -f name='qc|gem2s|obj2s' --format '{{.Names}}') || true", shell=True, check=True)
return proc.returncode
Runtime: "python3.8"
2 changes: 1 addition & 1 deletion local-runner/src/app.js
@@ -16,7 +16,7 @@ AWS.config.update({
s3ForcePathStyle: true,
});

const validPipelineTypes = ['qc', 'gem2s', 'subset', 'seurat', 'copy'];
const validPipelineTypes = ['qc', 'gem2s', 'subset', 'obj2s', 'copy'];
const isPipelineContainer = (name) => validPipelineTypes.some((keyword) => name.includes(keyword));

const setVarsInTemplate = (template) => {
3 changes: 2 additions & 1 deletion pipeline-runner/NAMESPACE
@@ -47,8 +47,8 @@ export(integrate_scdata)
export(learn_from_sketches)
export(list_exclude_genes)
export(load_cellsets)
export(load_obj2s_file)
export(load_parent_experiment_data)
export(load_seurat)
export(load_user_files)
export(make_annot_with_ids)
export(make_cl_metadata_cellclass)
@@ -58,6 +58,7 @@ export(merge_scdata_list)
export(normalize_annotation_types)
export(parse_cellsets)
export(prepare_experiment)
export(prepare_scdata_list_for_seurat_integration)
export(prepare_sct_integration)
export(read_10x_annotations)
export(read_10x_h5_file)
4 changes: 2 additions & 2 deletions pipeline-runner/R/handle_data.R
@@ -276,8 +276,8 @@ send_pipeline_fail_update <- function(pipeline_config, input, error_message) {
}


} else if (process_name %in% c("gem2s", "seurat", "subset", "copy")) {
string_value <- ifelse(process_name == "seurat", "SeuratResponse", "GEM2SResponse")
} else if (process_name %in% c("gem2s", "obj2s", "subset", "copy")) {
string_value <- ifelse(process_name == "obj2s", "OBJ2SResponse", "GEM2SResponse")

# TODO - REMOVE THE DUPLICATE EXPERIMENT ID FROM INPUT RESPONSE
response <- list(
10 changes: 5 additions & 5 deletions pipeline-runner/R/init-functions.R
@@ -285,7 +285,7 @@ call_subset <- function(task_name, input, pipeline_config) {
return(message_id)
}

call_seurat <- function(task_name, input, pipeline_config) {
call_obj2s <- function(task_name, input, pipeline_config) {

experiment_id <- input$experimentId

@@ -296,12 +296,12 @@ call_seurat <- function(task_name, input, pipeline_config) {
}

check_input(input)
tasks <- lapply(SEURAT_TASK_LIST, get)
tasks <- lapply(OBJ2S_TASK_LIST, get)

c(data, task_out) %<-% run_pipeline_step(prev_out, input, pipeline_config, tasks, task_name)
assign("prev_out", task_out, pos = ".GlobalEnv")

message_id <- send_pipeline_update_to_api(pipeline_config, experiment_id, task_name, data, input, 'SeuratResponse')
message_id <- send_pipeline_update_to_api(pipeline_config, experiment_id, task_name, data, input, 'OBJ2SResponse')
return(message_id)
}

@@ -376,7 +376,7 @@ call_qc <- function(task_name, input, pipeline_config) {
config$aws_config <- pipeline_config$aws_config
config$metadata_s3_path <- input$metadataS3Path
config$cl_metadata_bucket <- pipeline_config$cl_metadata_bucket

# For configure embedding
config$clustering_should_run <- input$clusteringShouldRun

@@ -532,7 +532,7 @@ start_heartbeat <- function(task_token, aws_config) {
handlers <- c(
qc = call_qc,
gem2s = call_gem2s,
seurat = call_seurat,
obj2s = call_obj2s,
subset = call_subset,
copy = call_copy
)
@@ -1,7 +1,7 @@
#' Loads a Seurat object provided by the user
#' Loads a Seurat/SingleCellExperiment/AnnData object provided by the user
#'
#' This functions is a thin wrapper around `reconstruct_seurat`, which is used to
#' safely read, check, and reconstruct the Seurat object for downstream use.
#' This function is a thin wrapper around `reconstruct_*` functions, which are used to
#' safely read, check, and reconstruct a Seurat object for downstream use.
#'
#'
#' @param input The input params received from the api.
@@ -11,13 +11,21 @@
#' Seurat object is stored
#'
#' @export
load_seurat <- function(input, pipeline_config, prev_out, input_dir = "/input") {
load_obj2s_file <- function(input, pipeline_config, prev_out, input_dir = "/input") {

config <- prev_out$config
dataset_dir <- config$samples[1]
obj2s_type <- config$input$type

dataset_fpath <- file.path(input_dir, dataset_dir, 'r.rds')

scdata <- reconstruct_seurat(dataset_fpath)
reconstruct_fun <- switch(
obj2s_type,
'seurat_object' = reconstruct_seurat,
'sce_object' = reconstruct_sce
)

scdata <- reconstruct_fun(dataset_fpath)

prev_out$scdata <- scdata

@@ -28,6 +36,127 @@ load_seurat <- function(input, pipeline_config, prev_out, input_dir = "/input")
return(res)
}
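
For context, a rough sketch of how this loader might be exercised locally is shown below; it is an illustration, not code from this PR. The `samples` and `input$type` fields mirror what the function body reads, while the sample name, experiment id, and empty `pipeline_config` are hypothetical stand-ins, and the structure of the returned `res` is not visible in this hunk.

```r
# Illustrative only: calling load_obj2s_file() against a local
# /input/<sample>/r.rds layout. Concrete values are hypothetical.
prev_out <- list(
  config = list(
    samples = c("sample_1"),            # dataset_dir <- config$samples[1]
    input = list(type = "sce_object")   # or "seurat_object"
  )
)
input <- list(experimentId = "my-experiment")  # hypothetical; not used by the shown body
pipeline_config <- list()                      # hypothetical; not used by the shown body

res <- load_obj2s_file(input, pipeline_config, prev_out, input_dir = "/input")
```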

reconstruct_sce <- function(dataset_fpath) {
# read it
tryCatch({
user_scdata <- readRDS(dataset_fpath)
stopifnot(methods::is(user_scdata, 'SingleCellExperiment'))
},
error = function(e) {
message(e$message)
stop(errors$ERROR_OBJ2S_RDS, call. = FALSE)
})

# get counts
tryCatch({
counts <- SingleCellExperiment::counts(user_scdata)
test_user_sparse_mat(counts)
rns <- row.names(counts)
check_type_is_safe(rns)

},
error = function(e) {
message(e$message)
stop(errors$ERROR_OBJ2S_COUNTS, call. = FALSE)
})

# get meta data
tryCatch({
metadata <- user_scdata@colData
metadata <- as.data.frame(metadata)
test_user_df(metadata)
},
error = function(e) {
message(e$message)
stop(errors$ERROR_OBJ2S_METADATA, call. = FALSE)
})

# construct Seurat object from SingleCellExperiment items
scdata <- SeuratObject::CreateSeuratObject(
counts,
meta.data = metadata,
)

# add logcounts
tryCatch({

if ('logcounts' %in% SummarizedExperiment::assayNames(user_scdata)) {
logcounts <- SingleCellExperiment::logcounts(user_scdata)
test_user_sparse_mat(logcounts)
} else {
logcounts <- Seurat::NormalizeData(counts)
}

scdata[['RNA']]$data <- logcounts
},
error = function(e) {
message(e$message)
stop(errors$ERROR_OBJ2S_LOGCOUNTS, call. = FALSE)
})

scdata <- add_obj2s_dispersions(scdata)

# add gene annotations
gene_annotations <- data.frame(
input = rns,
name = rns,
original_name = rns,
row.names = rns
)
scdata@misc$gene_annotations <- gene_annotations

red_names <- tolower(SingleCellExperiment::reducedDimNames(user_scdata))

# add dimensionality reduction
tryCatch({
red_match <- grep("umap|tsne", red_names)

stopifnot(length(red_match) > 0)

# use last reduction as default (most recent call)
red.idx <- tail(red_match, 1)
red_name <- ifelse(grepl('umap', red_names[red.idx]), 'umap', 'tsne')

red <- SingleCellExperiment::reducedDims(user_scdata)[[red.idx]]
class(red) <- 'matrix'
test_user_df(red)

red <- Seurat::CreateDimReducObject(
embeddings = red,
assay = 'RNA',
key = paste0(red_name, '_'))

scdata@reductions[[red_name]] <- red
},
error = function(e) {
message(e$message)
stop(errors$ERROR_OBJ2S_REDUCTION, call. = FALSE)
})

# add pca dimensionality reduction (need for trajectory analysis)
tryCatch({
pca.idx <- which(red_names == 'pca')
stopifnot(length(pca.idx) > 0)

pca <- SingleCellExperiment::reducedDims(user_scdata)[[pca.idx]]
class(pca) <- 'matrix'
test_user_df(pca)

pca <- SeuratObject::CreateDimReducObject(
embeddings = pca,
assay = 'RNA',
key = 'pca_'
)
scdata@reductions[['pca']] <- pca
},
error = function(e) {
message(e$message)
stop(errors$ERROR_OBJ2S_REDUCTION, call. = FALSE)
})

return(scdata)
}
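
To make the expectations of `reconstruct_sce` concrete, here is a minimal sketch (an illustration, not code from this PR) of a SingleCellExperiment that satisfies the checks above: sparse named counts, optional `logcounts`, per-cell metadata in `colData`, and `PCA` plus `UMAP` (or tSNE) entries in `reducedDims`. All names and values are made up.

```r
library(Matrix)
library(SingleCellExperiment)

set.seed(1)
# sparse, non-negative counts with gene and cell names
counts <- rsparsematrix(200, 50, density = 0.1)
counts@x <- abs(round(counts@x * 10)) + 1
dimnames(counts) <- list(paste0("gene", 1:200), paste0("cell", 1:50))

sce <- SingleCellExperiment(
  assays = list(counts = counts, logcounts = log1p(counts)),
  colData = DataFrame(
    cell_type = sample(c("T cell", "B cell"), 50, replace = TRUE),
    seurat_clusters = sample(0:2, 50, replace = TRUE),
    row.names = colnames(counts)
  )
)

# PCA and UMAP embeddings, with cell names as rownames
reducedDims(sce) <- list(
  PCA  = matrix(rnorm(50 * 10), ncol = 10, dimnames = list(colnames(counts), NULL)),
  UMAP = matrix(rnorm(50 * 2),  ncol = 2,  dimnames = list(colnames(counts), NULL))
)

# saveRDS(sce, "/input/sample_1/r.rds")  # where the loader would pick it up
```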

#' Reconstructs the Seurat object provided by the user
#'
#' The slots needed from the Seurat object are extracted and then their in-built types
@@ -48,7 +177,7 @@ reconstruct_seurat <- function(dataset_fpath) {
},
error = function(e) {
message(e$message)
stop(errors$ERROR_SEURAT_RDS, call. = FALSE)
stop(errors$ERROR_OBJ2S_RDS, call. = FALSE)
})

# get counts
@@ -68,7 +197,7 @@
},
error = function(e) {
message(e$message)
stop(errors$ERROR_SEURAT_COUNTS, call. = FALSE)
stop(errors$ERROR_OBJ2S_COUNTS, call. = FALSE)
})

# get meta data
@@ -79,7 +208,7 @@
},
error = function(e) {
message(e$message)
stop(errors$ERROR_SEURAT_METADATA, call. = FALSE)
stop(errors$ERROR_OBJ2S_METADATA, call. = FALSE)
})

# reconstruct Seurat object
@@ -107,25 +236,12 @@ reconstruct_seurat <- function(dataset_fpath) {
},
error = function(e) {
message(e$message)
stop(errors$ERROR_SEURAT_LOGCOUNTS, call. = FALSE)
stop(errors$ERROR_OBJ2S_LOGCOUNTS, call. = FALSE)
})


# add dispersions (need for gene list)
tryCatch({
dispersions <- Seurat::FindVariableFeatures(logcounts)
test_user_df(dispersions)
colnames(dispersions) <- gsub('^vst[.]', '', colnames(dispersions))

# keep columns that use: same as `HVFInfo(scdata)`
dispersions <- dispersions[, c('mean', 'variance', 'variance.standardized')]
dispersions$SYMBOL <- dispersions$ENSEMBL <- row.names(dispersions)
scdata@misc$gene_dispersion <- dispersions
},
error = function(e) {
message(e$message)
stop(errors$ERROR_SEURAT_HVFINFO, call. = FALSE)
})
scdata <- add_obj2s_dispersions(scdata)

# add gene annotations
gene_annotations <- data.frame(
@@ -138,8 +254,6 @@ reconstruct_seurat <- function(dataset_fpath) {


# add default dimensionality reduction
# TODO: consider storing resolution, clustering metric, and distance metric
# for trajectory analysis
tryCatch({
names(user_scdata@reductions) <- tolower(names(user_scdata@reductions))
red_name <- SeuratObject::DefaultDimReduc(user_scdata)
@@ -168,7 +282,7 @@ reconstruct_seurat <- function(dataset_fpath) {
},
error = function(e) {
message(e$message)
stop(errors$ERROR_SEURAT_REDUCTION, call. = FALSE)
stop(errors$ERROR_OBJ2S_REDUCTION, call. = FALSE)
})

# add pca dimensionality reduction (need for trajectory analysis)
@@ -183,12 +297,37 @@ reconstruct_seurat <- function(dataset_fpath) {
},
error = function(e) {
message(e$message)
stop(errors$ERROR_SEURAT_REDUCTION, call. = FALSE)
stop(errors$ERROR_OBJ2S_REDUCTION, call. = FALSE)
})

return(scdata)
}
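
For comparison, the pre-existing Seurat path handled by `reconstruct_seurat` expects an object along these lines. The sketch below is again an illustration under assumptions, not code from this PR: it builds a toy object with standard Seurat calls so that clusters, PCA, and a default UMAP reduction are all present.

```r
library(Seurat)

set.seed(1)
counts <- Matrix::rsparsematrix(200, 60, density = 0.2)
counts@x <- abs(round(counts@x * 10)) + 1
dimnames(counts) <- list(paste0("gene", 1:200), paste0("cell", 1:60))

srt <- CreateSeuratObject(counts)
srt <- NormalizeData(srt)
srt <- FindVariableFeatures(srt)
srt <- ScaleData(srt)
srt <- RunPCA(srt, npcs = 10)
srt <- FindNeighbors(srt, dims = 1:10)
srt <- FindClusters(srt)
srt <- RunUMAP(srt, dims = 1:10)

# saveRDS(srt, "/input/sample_1/r.rds")  # config$input$type would be "seurat_object"
```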


add_obj2s_dispersions <- function(scdata) {

# add dispersions (need for gene list)
tryCatch({
logcounts <- scdata[['RNA']]$data

dispersions <- Seurat::FindVariableFeatures(logcounts)
test_user_df(dispersions)
colnames(dispersions) <- gsub('^vst[.]', '', colnames(dispersions))

# keep the same columns that `HVFInfo(scdata)` returns
dispersions <- dispersions[, c('mean', 'variance', 'variance.standardized')]
dispersions$SYMBOL <- dispersions$ENSEMBL <- row.names(dispersions)
scdata@misc$gene_dispersion <- dispersions
},
error = function(e) {
message(e$message)
stop(errors$ERROR_OBJ2S_HVFINFO, call. = FALSE)
})

return(scdata)
}


test_user_df <- function(df) {
for (col in colnames(df)) {
check_type_is_safe(df[, col])
@@ -1,4 +1,4 @@
upload_seurat_to_aws <- function(input, pipeline_config, prev_out) {
upload_obj2s_to_aws <- function(input, pipeline_config, prev_out) {
message("Uploading to AWS ...")

experiment_id <- input$experimentId
@@ -18,7 +18,7 @@ upload_seurat_to_aws <- function(input, pipeline_config, prev_out) {
# detect cluster metadata
cluster_columns <- find_cluster_columns(scdata)
if (!length(cluster_columns))
stop(errors$ERROR_SEURAT_CLUSTERS, call. = FALSE)
stop(errors$ERROR_OBJ2S_CLUSTERS, call. = FALSE)

# add clusters
cluster_sets <- list()
Binary file modified pipeline-runner/R/sysdata.rda