Merge pull request #285 from hms-dbmi-cellenics/seurat-pipeline
Seurat pipeline
alexvpickering authored Dec 5, 2022
2 parents 6cf44ac + e0e3341 commit 349be4f
Showing 26 changed files with 936 additions and 83 deletions.
8 changes: 4 additions & 4 deletions local-runner/cf-local-container-launcher.yaml
@@ -30,7 +30,7 @@ Resources:
docker_command = ' '.join([
"docker run --rm -t",
f"--name {event['name']}-{random_string(10)}",
f"--name {event['name']}-{random_string(10)}",
f"{'-d' if event['detached'] else ''}",
f"--env ACTIVITY_ARN={event.get('activityArn', '')}",
f"--env HOST_IP=__HOST_IP__",
@@ -47,7 +47,7 @@ Resources:
Timeout: 25
QualityControlActivity:
Type: AWS::StepFunctions::Activity
Properties:
Properties:
Name: biomage-qc-activity-development
RemovePreviousPipelineContainers:
Type: "AWS::Lambda::Function"
@@ -59,9 +59,9 @@
ZipFile: |
import subprocess
# Removes all containers that already exist and contain either 'qc' or 'gem2s' in their name.
# Removes all containers that already exist and contain either 'qc' or 'gem2s' or 'seurat' in their name.
def handler(event, context):
proc = subprocess.run("docker kill $(docker ps -f name='qc|gem2s' --format '{{.Names}}') || true", shell=True, check=True)
proc = subprocess.run("docker kill $(docker ps -f name='qc|gem2s|seurat' --format '{{.Names}}') || true", shell=True, check=True)
return proc.returncode
Runtime: "python3.8"
2 changes: 1 addition & 1 deletion local-runner/src/app.js
@@ -12,7 +12,7 @@ AWS.config.update({
s3ForcePathStyle: true,
});

const isPipelineContainer = (name) => name.includes('qc') || name.includes('gem2s');
const isPipelineContainer = (name) => name.includes('qc') || name.includes('gem2s') || name.includes('seurat');

const setVarsInTemplate = (template) => {
const varNames = ['DEBUG_STEP', 'DEBUG_PATH', 'HOST_IP'];
2 changes: 1 addition & 1 deletion pipeline-runner/DESCRIPTION
@@ -27,4 +27,4 @@ Config/testthat/edition: 3
Encoding: UTF-8
LazyData: true
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.2.0
RoxygenNote: 7.2.2
10 changes: 9 additions & 1 deletion pipeline-runner/NAMESPACE
@@ -2,27 +2,35 @@

export(build_cc_gene_list)
export(create_seurat)
export(download_s3_files)
export(download_user_files)
export(filter_doublets)
export(filter_emptydrops)
export(filter_gene_umi_outlier)
export(filter_high_mito)
export(filter_low_cellsize)
export(filter_unnamed_features)
export(generate_default_values_cellSizeDistribution)
export(generate_default_values_classifier)
export(generate_first_step_ids)
export(getClusters)
export(get_feature_types)
export(get_gem2s_file)
export(ids_to_sym)
export(list_exclude_genes)
export(load_seurat)
export(load_user_files)
export(make_annot_with_ids)
export(merge_scdatas)
export(normalize_annotation_types)
export(prepare_experiment)
export(read_10x_annotations)
export(reconstruct_seurat)
export(remove_genes)
export(runClusters)
export(run_emptydrops)
export(score_doublets)
export(subset_ids)
export(subset_safe)
export(sym_to_ids)
import(data.table)
importFrom(magrittr,"%>%")
7 changes: 4 additions & 3 deletions pipeline-runner/R/gem2s-1-download_user_files.R
@@ -22,15 +22,15 @@ download_and_store <- function(bucket, key, file_path, s3) {
#'
#' @export
#'
get_gem2s_file <- function(input, originals_bucket, input_dir, s3) {
download_s3_files <- function(input, originals_bucket, input_dir, s3) {
project_id <- input$projectId
sample_ids <- input$sampleIds
sample_s3_paths <- input$sampleS3Paths
technology <- input$input$type

unlink(input_dir, recursive = TRUE)

for (sample_id in sample_ids) {

for (file_type in file_types_by_technology[[technology]]) {
s3_path <- sample_s3_paths[[sample_id]][[file_type]]

@@ -40,6 +40,7 @@ get_gem2s_file <- function(input, originals_bucket, input_dir, s3) {
}
}


#' Download user files from S3
#'
#' @param input The input object from the request
@@ -53,7 +54,7 @@ download_user_files <- function(input, pipeline_config, prev_out = list(), input_dir= "/input") {
download_user_files <- function(input, pipeline_config, prev_out = list(), input_dir= "/input") {
s3 <- paws::s3(config = pipeline_config$aws_config)

get_gem2s_file(input, pipeline_config$originals_bucket, input_dir, s3)
download_s3_files(input, pipeline_config$originals_bucket, input_dir, s3)

config <- list(
name = input$experimentName,
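For context on download_s3_files() above (not part of this diff): it walks a nested list of S3 keys, one per sample and file type. A hypothetical input shape, with made-up sample id, file-type key, and S3 key:

# Hypothetical input; "sample-a", "matrix10x" and the S3 key are invented.
input <- list(
  sampleIds = c("sample-a"),
  sampleS3Paths = list(
    "sample-a" = list(matrix10x = "project-1/sample-a/matrix.mtx.gz")
  ),
  input = list(type = "10x")
)
# Each file is looked up as sample_s3_paths[[sample_id]][[file_type]]:
input$sampleS3Paths[["sample-a"]][["matrix10x"]]
#> [1] "project-1/sample-a/matrix.mtx.gz"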
2 changes: 1 addition & 1 deletion pipeline-runner/R/gem2s-6-construct_qc_config.R
@@ -1,5 +1,5 @@
# constructs default QC configuration for merged SeuratObject
construct_qc_config <- function(scdata, any_filtered) {
construct_qc_config <- function(scdata, any_filtered = FALSE) {
samples <- scdata$samples

# classifier
6 changes: 4 additions & 2 deletions pipeline-runner/R/handle_data.R
@@ -146,7 +146,7 @@ send_output_to_api <- function(pipeline_config, input, plot_data_keys, output) {
return(result$MessageId)
}

send_gem2s_update_to_api <- function(pipeline_config, experiment_id, task_name, data, input) {
send_pipeline_update_to_api <- function(pipeline_config, experiment_id, task_name, data, input, string_value) {
message("Sending to SNS topic ", pipeline_config$sns_topic)
sns <- paws::sns(config = pipeline_config$aws_config)
# TODO -REMOVE DUPLICATE AUTHJWT IN RESPONSE
@@ -158,7 +158,7 @@ send_gem2s_update_to_api <- function(pipeline_config, experiment_id, task_name,
MessageAttributes = list(
type = list(
DataType = "String",
StringValue = "GEM2SResponse",
StringValue = string_value,
BinaryValue = NULL
)
)
@@ -185,6 +185,8 @@ send_pipeline_fail_update <- function(pipeline_config, input, error_message) {
string_value <- "PipelineResponse"
} else if (process_name == "gem2s") {
string_value <- "GEM2SResponse"
} else if (process_name == "seurat") {
string_value <- "SeuratResponse"
} else {
message(paste("Invalid process_name given: ", process_name))
return()
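As a rough sketch of the message typing used above (not part of this diff), the branches can be read as a lookup from process name to SNS message type. The first branch's condition is cut off in the hunk shown here, so its mapping of qc to "PipelineResponse" is assumed.

# Assumed mapping; the first branch's condition is not visible in this hunk.
response_type_for <- function(process_name) {
  switch(process_name,
    qc = "PipelineResponse",
    gem2s = "GEM2SResponse",
    seurat = "SeuratResponse",
    stop("Invalid process_name given: ", process_name)
  )
}
response_type_for("seurat")
#> [1] "SeuratResponse"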
11 changes: 10 additions & 1 deletion pipeline-runner/R/qc-6-integrate_scdata.R
@@ -266,7 +266,7 @@ run_unisample <- function(scdata, config) {
}

add_dispersions <- function(scdata) {
vars <- Seurat::HVFInfo(object = scdata, assay = "RNA", selection.method = "vst")

vars <- tryCatch({
vars <- Seurat::HVFInfo(object = scdata, assay = 'RNA', selection.method = 'vst')
}, error = function(e) {
vars <- Seurat::HVFInfo(scdata, assay = 'SCT', selection.method = 'sct')
# rename for worker (in case sct)
colnames(vars) <- c('mean', 'variance', 'variance.standardized')
return(vars)
})

annotations <- scdata@misc[["gene_annotations"]]
vars$SYMBOL <- annotations$name[match(rownames(vars), annotations$input)]
vars$ENSEMBL <- rownames(vars)
11 changes: 7 additions & 4 deletions pipeline-runner/R/qc-7-embed_and_cluster.R
@@ -55,11 +55,14 @@ format_cell_sets_object
type = "cellSets",
children = list()
)
for (i in sort(unique(cell_sets$cluster))) {
cells <- cell_sets[cell_sets$cluster == i, "cell_ids"]
for (cluster in sort(unique(cell_sets$cluster))) {
cells <- cell_sets[cell_sets$cluster == cluster, "cell_ids"]
is.num <- !is.na(as.numeric(cluster))
set_name <- ifelse(is.num, paste("Cluster", cluster), cluster)

new_set <- list(
key = paste0(clustering_method, "-", i),
name = paste0("Cluster ", i),
key = paste0(clustering_method, "-", cluster),
name = set_name,
rootNode = FALSE,
type = "cellSets",
color = color_pool[1],
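A small aside (not part of this diff): the naming rule above turns numeric cluster ids into "Cluster N" while keeping character labels carried over from a user-supplied object as-is. The labels below are invented.

# Illustration of the naming rule; the cluster labels are invented.
clusters <- c("0", "1", "T cells")
is.num <- !is.na(suppressWarnings(as.numeric(clusters)))
ifelse(is.num, paste("Cluster", clusters), clusters)
#> [1] "Cluster 0" "Cluster 1" "T cells"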
179 changes: 179 additions & 0 deletions pipeline-runner/R/seurat-2-load_seurat.R
@@ -0,0 +1,179 @@
#' Loads a Seurat object provided by the user
#'
#' This function is a thin wrapper around `reconstruct_seurat`, which is used to
#' safely read, check, and reconstruct the Seurat object for downstream use.
#'
#'
#' @param input The input params received from the api.
#' @param pipeline_config List with S3 bucket name and SNS topics.
#' @param prev_out 'output' slot from call to \code{download_user_files}
#' @param input_dir A string, where the folder containing the downloaded
#' Seurat object is stored
#'
#' @export
load_seurat <- function(input, pipeline_config, prev_out, input_dir = "/input") {

config <- prev_out$config
dataset_dir <- config$samples[1]
dataset_fpath <- file.path(input_dir, dataset_dir, 'r.rds')

scdata <- reconstruct_seurat(dataset_fpath)

prev_out$scdata <- scdata

res <- list(
data = list(),
output = prev_out
)
return(res)
}
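
# A hedged sketch of calling load_seurat() locally (not part of this file):
# only prev_out and input_dir are read by the function body, so the other
# arguments can be empty placeholders. "sample-a" and the directory are invented.
#
#   prev_out <- list(config = list(samples = c("sample-a")))
#   res <- load_seurat(
#     input = list(),
#     pipeline_config = list(),
#     prev_out = prev_out,
#     input_dir = "~/input"  # expects ~/input/sample-a/r.rds
#   )
#   str(res$output$scdata)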

#' Reconstructs the Seurat object provided by the user
#'
#' The slots needed from the Seurat object are extracted and then their in-built types
#' are strictly checked before creating a new Seurat object. This reconstruction and type
#' checking prevents potentially malicious executable code (e.g. type `closure` or `environment`)
#' from being stored in the final Seurat object.
#'
#' @param dataset_fpath The path to the r.rds file containing the Seurat object.
#'
#' @return A reconstructed, type-checked `Seurat` object.
#' @export
reconstruct_seurat <- function(dataset_fpath) {

# read it
tryCatch({
user_scdata <- readRDS(dataset_fpath)
stopifnot(methods::is(user_scdata, 'Seurat'))
},
error = function(e) {
message(e$message)
stop(errors$ERROR_SEURAT_RDS, call. = FALSE)
})

# get counts
tryCatch({
counts <- user_scdata[['RNA']]@counts
test_user_sparse_mat(counts)
rns <- row.names(counts)
check_type_is_safe(rns)
},
error = function(e) {
message(e$message)
stop(errors$ERROR_SEURAT_COUNTS, call. = FALSE)
})

gene_annotations <- data.frame(input = rns, name = rns, original_name = rns, row.names = rns)
user_scdata@misc$gene_annotations <- gene_annotations

# add dispersions
tryCatch({
user_scdata <- add_dispersions(user_scdata)
dispersions <- user_scdata@misc$gene_dispersion
test_user_df(dispersions)
},
error = function(e) {
message(e$message)
stop(errors$ERROR_SEURAT_HVFINFO, call. = FALSE)
})

# get meta data
tryCatch({
metadata <- user_scdata@meta.data
test_user_df(metadata)
},
error = function(e) {
message(e$message)
stop(errors$ERROR_SEURAT_METADATA, call. = FALSE)
})

# check for clusters
if (!'seurat_clusters' %in% colnames(metadata))
stop(errors$ERROR_SEURAT_CLUSTERS, call. = FALSE)


# reconstruct seurat object
scdata <- SeuratObject::CreateSeuratObject(
counts,
meta.data = metadata,
)

# add dispersions and gene annotations to new scdata
scdata@misc$gene_dispersion <- dispersions
scdata@misc$gene_annotations <- gene_annotations


# add default dimensionality reduction
tryCatch({
red_name <- SeuratObject::DefaultDimReduc(user_scdata)
check_type_is_safe(red_name)
embedding <- user_scdata@reductions[[red_name]]@cell.embeddings
test_user_df(embedding)
red <- SeuratObject::CreateDimReducObject(
embeddings = embedding,
assay = 'RNA'
)
scdata@reductions[[red_name]] <- red
},
error = function(e) {
message(e$message)
stop(errors$ERROR_SEURAT_REDUCTION, call. = FALSE)
})

# add pca dimensionality reduction
tryCatch({
pca <- user_scdata@reductions[['pca']]@cell.embeddings
test_user_df(pca)
red <- SeuratObject::CreateDimReducObject(
embeddings = pca,
assay = 'RNA'
)
scdata@reductions[['pca']] <- red
},
error = function(e) {
message(e$message)
stop(errors$ERROR_SEURAT_REDUCTION, call. = FALSE)
})

# add logcounts
tryCatch({
data <- user_scdata[['RNA']]@data
test_user_sparse_mat(data)
scdata[['RNA']]@data <- data
},
error = function(e) {
message(e$message)
stop(errors$ERROR_SEURAT_LOGCOUNTS, call. = FALSE)
})

return(scdata)
}


test_user_df <- function(df) {
for (col in colnames(df)) {
check_type_is_safe(df[, col])
}
}

test_user_sparse_mat <- function(counts) {
stopifnot(isS4(counts))

for (slot in methods::slotNames(counts)) {
check_type_is_safe(methods::slot(counts, slot))
}
}

check_type_is_safe <- function(x) {

# typeof determines the R internal type or storage mode of any object
# whereas methods::is can be tricked by setting class inappropriately (e.g. disguising a function as a numeric)
safe.types <- c('character', 'double', 'integer', 'logical', 'NULL')

# recurse into lists until reach node
if (typeof(x) == 'list') {
lapply(x, check_type_is_safe)
} else if (!typeof(x) %in% safe.types) {
stop('Unexpected data type in uploaded .rds file.')
}
}
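
To illustrate why check_type_is_safe() inspects typeof() rather than the class (a minimal sketch, not part of this file): a closure can masquerade as a numeric via its class attribute, but its internal storage mode still gives it away.

# Minimal sketch: class() can be spoofed, typeof() cannot.
disguised <- function() system("echo compromised")
class(disguised) <- "numeric"

methods::is(disguised, "numeric")  # TRUE - fooled by the fake class
typeof(disguised)                  # "closure" - the real storage mode

check_type_is_safe(c(1, 2, 3))                   # passes silently
try(check_type_is_safe(disguised))               # stops: unexpected data type
try(check_type_is_safe(list(meta = disguised)))  # recursion catches nested values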