Merge pull request #298 from hms-dbmi-cellenics/biomage-changes-2
Biomage changes 2
alexvpickering authored Feb 27, 2023
2 parents 00d6430 + 07d840e commit 6c1a2ca
Showing 58 changed files with 295,537 additions and 308,213 deletions.
2 changes: 1 addition & 1 deletion pipeline-runner/DESCRIPTION
@@ -30,4 +30,4 @@ Config/testthat/edition: 3
Encoding: UTF-8
LazyData: true
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.2.2
RoxygenNote: 7.2.3
18 changes: 9 additions & 9 deletions pipeline-runner/Dockerfile
@@ -1,6 +1,6 @@
# Create builder step
# pull official base image and use it as builder
FROM rocker/r-ver:4.2.0 AS builder
FROM rocker/r-ver:4.2.2 AS builder
WORKDIR /src/pipeline-runner

# install required debian packages to install R packages
@@ -21,7 +21,7 @@ RUN echo ".libPaths(c('$RENV_LIB', .libPaths()))" >> $(R RHOME)/etc/Rprofile.sit

# install renv to install required R packages
RUN R -q -e "install.packages('remotes', repos = c(CRAN = 'https://cloud.r-project.org'))" && \
R -q -e "remotes::install_github('rstudio/renv@0.15.5')" && \
R -q -e "remotes::install_github('rstudio/renv@0.16.0')" && \
R -q -e "renv::init(bare = TRUE, settings = list(use.cache = FALSE))"

# an initial lockfile is used to avoid frequent re-installs
@@ -36,12 +36,6 @@ RUN R -q -e "renv::restore(lockfile='renv.lock.init', library = '$RENV_LIB')" &&

RUN R -q -e "renv::deactivate()"

# use renv::snapshot() while R dependency updates are quick to build
COPY ./renv.lock .
RUN R -q -e "renv::restore(lockfile='renv.lock', library = '$RENV_LIB', clean = TRUE)" && \
R -q -e 'root <- renv::paths$root(); unlink(root, recursive = TRUE)' && \
strip --strip-debug $RENV_LIB/*/libs/*.so

# install miniconda and geosketch
# clean conda
ENV RETICULATE_MINICONDA_PATH=/src/r-miniconda
@@ -50,6 +44,12 @@ RUN R -q -e "reticulate::install_miniconda()" && \
CONDA_PATH=$(R -q -s -e "cat(reticulate::conda_binary())") && \
$CONDA_PATH clean --force-pkgs-dirs -y

# use renv::snapshot() while R dependency updates are quick to build
COPY ./renv.lock .
RUN R -q -e "renv::restore(lockfile='renv.lock', library = '$RENV_LIB', clean = TRUE)" && \
R -q -e 'root <- renv::paths$root(); unlink(root, recursive = TRUE)' && \
strip --strip-debug $RENV_LIB/*/libs/*.so

# determine system run-time deps
COPY setup/get_sysdeps_run.R .
RUN Rscript get_sysdeps_run.R
@@ -65,7 +65,7 @@ RUN Rscript check_package_licenses.R
# ---------------------------------------------------
# COMMON MINIMAL BUILD
# ---------------------------------------------------
FROM rocker/r-ver:4.2.0 AS common
FROM rocker/r-ver:4.2.2 AS common
WORKDIR /src/pipeline-runner
ENV RETICULATE_MINICONDA_PATH=/src/r-miniconda

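The reordering above moves the frequently-edited renv.lock restore below the miniconda layer, so routine R dependency updates no longer invalidate the slow conda install in Docker's layer cache. For orientation, a minimal R sketch of the two-lockfile workflow the Dockerfile implies (an assumption from the comments above: renv.lock.init pins the rarely-changing base packages, renv.lock the fast-moving rest):

# sketch of the two-stage renv restore used by the Dockerfile (assumed split:
# renv.lock.init = stable base set, renv.lock = frequently updated packages)
install.packages("remotes", repos = c(CRAN = "https://cloud.r-project.org"))
remotes::install_github("rstudio/renv@0.16.0")

renv::restore(lockfile = "renv.lock.init")           # slow, rarely invalidated layer
renv::restore(lockfile = "renv.lock", clean = TRUE)  # cheap layer, rebuilt often
renv::snapshot()                                     # refresh renv.lock after adding packages
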
9 changes: 6 additions & 3 deletions pipeline-runner/NAMESPACE
@@ -16,6 +16,7 @@ export(diet_scdata)
export(download_s3_files)
export(download_user_files)
export(embed_and_cluster)
export(ensure_is_list_in_json)
export(extract_subset_user_metadata)
export(filter_doublets)
export(filter_emptydrops)
@@ -33,8 +34,8 @@ export(get_cell_sets)
export(get_feature_types)
export(get_subset_cell_sets)
export(ids_to_sym)
export(integrate_from_sketch)
export(integrate_scdata)
export(integrate_using_geosketch)
export(learn_from_sketches)
export(list_exclude_genes)
export(load_cellsets)
@@ -45,7 +46,6 @@ export(log_normalize)
export(make_annot_with_ids)
export(merge_scdata_list)
export(normalize_annotation_types)
export(normalize_data)
export(parse_cellsets)
export(prepare_experiment)
export(prepare_sct_integration)
@@ -57,14 +57,17 @@ export(remove_used_colors)
export(runClusters)
export(run_emptydrops)
export(run_geosketch)
export(run_pca)
export(run_seuratv4)
export(safe_cbind)
export(score_doublets)
export(seuratv4_find_and_integrate_anchors)
export(subset_experiment)
export(subset_ids)
export(subset_safe)
export(subset_seurat)
export(sym_to_ids)
export(temp_integrate_scdata)
export(upload_to_aws)
import(data.table)
importFrom(magrittr,"%>%")
importFrom(uuid,UUIDgenerate)
3 changes: 2 additions & 1 deletion pipeline-runner/R/gem2s-6-construct_qc_config.R
@@ -159,7 +159,8 @@ get_sample_mitochondrial_config <- function(scdata_list.sample, config) {

# threshold for doublet score is the max score given to a singlet (above score => doublets)
get_dblscore_config <- function(scdata_list, config) {
probabilityThreshold <- max(scdata_list$doublet_scores[scdata_list$doublet_class == "singlet"], na.rm = TRUE)
probabilityThreshold <- generate_default_values_doubletScores(scdata_list)

config$filterSettings$probabilityThreshold <- probabilityThreshold

return(config)
4 changes: 2 additions & 2 deletions pipeline-runner/R/gem2s-7-upload_to_aws.R
@@ -152,7 +152,7 @@ build_sample_cellsets <- function(input, scdata, color_pool) {
key = sample_id,
name = sample_name,
color = color_pool[i],
cellIds = unname(cell_ids)
cellIds = ensure_is_list_in_json(unname(cell_ids))
)
}

@@ -394,7 +394,7 @@ build_scratchpad_cellsets <- function(color_pool, subset_cellsets) {
key = scratchpad_ids[i],
name = scratchpad_names[i],
color = color_pool[i],
cellIds = subset_cellsets[key == scratchpad_ids[i], cell_id]
cellIds = ensure_is_list_in_json(subset_cellsets[key == scratchpad_ids[i], cell_id])
)
}

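The ensure_is_list_in_json() helper itself is not part of this diff, but the call sites shown here wrap cellIds with it before JSON serialization, which suggests it protects length-one vectors from being auto-unboxed into JSON scalars instead of arrays. A plausible sketch, assuming that intent (the real implementation may differ):

library(jsonlite)

# hypothetical implementation: coerce length <= 1 vectors to lists so that
# auto-unboxing serializers still emit a JSON array
ensure_is_list_in_json <- function(x) {
  if (length(x) <= 1) as.list(x) else x
}

toJSON(list(cellIds = 42L), auto_unbox = TRUE)
# {"cellIds":42}    <- scalar; breaks consumers that expect an array
toJSON(list(cellIds = ensure_is_list_in_json(42L)), auto_unbox = TRUE)
# {"cellIds":[42]}
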
6 changes: 3 additions & 3 deletions pipeline-runner/R/handle_data.R
@@ -183,7 +183,7 @@ send_output_to_api <- function(pipeline_config, input, plot_data_keys, output) {
error = FALSE
),
pipelineVersion = pipeline_version,
apiUrl = pipeline_config$public_api_url
apiUrl = pipeline_config$api_url
)

message("Publishing the message")
@@ -216,7 +216,7 @@ send_pipeline_update_to_api <- function(pipeline_config, experiment_id, task_nam
jobId = list(job_id),
authJWT = list(input$auth_JWT),
input = list(input),
apiUrl = pipeline_config$public_api_url
apiUrl = pipeline_config$api_url
)

result <- sns$publish(
@@ -245,7 +245,7 @@ send_pipeline_fail_update <- function(pipeline_config, input, error_message) {
error_msg$taskName <- input$taskName
error_msg$response$error <- process_name
error_msg$input <- input
error_msg$apiUrl <- pipeline_config$public_api_url
error_msg$apiUrl <- pipeline_config$api_url
sns <- paws::sns(config = pipeline_config$aws_config)

string_value <- ""
38 changes: 18 additions & 20 deletions pipeline-runner/R/init-functions.R
@@ -27,6 +27,15 @@ build_activity_arn <- function(aws_region, aws_account_id, activity_id) {
#'
#' @return list with config parameters
load_config <- function(development_aws_server) {

  # running on Linux needs the host's IP to work. If it is set as an
  # environment variable (by the makefile), honor it instead of the
  # provided parameter
overriden_server <- Sys.getenv("HOST_IP", "")
if (overriden_server != "") {
development_aws_server <- overriden_server
}

label_path <- "/etc/podinfo/labels"
aws_account_id <- Sys.getenv("AWS_ACCOUNT_ID", unset = "242905224710")
aws_region <- Sys.getenv("AWS_DEFAULT_REGION", unset = "eu-west-1")
@@ -77,7 +86,7 @@ load_config <- function(development_aws_server) {
aws_config = list(region = aws_region),
pod_name = Sys.getenv("K8S_POD_NAME", "local"),
activity_arn = activity_arn,
api_url = paste0("http://api-", sandbox, ".api-", sandbox, ".svc.cluster.local:3000"),
api_url = sprintf("http://%s:3000", development_aws_server),
api_version = "v2",
debug_config = list(
step = Sys.getenv("DEBUG_STEP", ""),
@@ -87,27 +96,13 @@


if (config$cluster_env == "staging") {
config$public_api_url <- paste0("https://api-", sandbox, ".", domain_name)
config$api_url <- paste0("https://api-", sandbox, ".", domain_name)
}
if (config$cluster_env == "production") {
config$public_api_url <- paste0("https://api.", domain_name)
}

# batch does not have access to the internal EKS cluster api URL, use the public one
if (running_in_batch == "true" && domain_name != "") {
config$api_url <- config$public_api_url
}

# running in linux needs the IP of the host to work. If it is set as an
# environment variable (by makefile) honor it instead of the provided
# parameter
overriden_server <- Sys.getenv("HOST_IP", "")
if (overriden_server != "") {
development_aws_server <- overriden_server
config$api_url <- paste0("https://api.", domain_name)
}

if (config$cluster_env == "development") {
config$api_url <- sprintf("http://%s:3000", development_aws_server)
# DOCKER_GATEWAY_HOST
config$aws_config[["endpoint"]] <- sprintf(
"http://%s:4566",
@@ -278,7 +273,7 @@ call_subset <- function(task_name, input, pipeline_config) {
c(data, task_out) %<-% run_pipeline_step(prev_out, input, pipeline_config, tasks, task_name)
assign("prev_out", task_out, pos = ".GlobalEnv")

message_id <- send_gem2s_update_to_api(pipeline_config, experiment_id, task_name, data, input)
message_id <- send_pipeline_update_to_api(pipeline_config, experiment_id, task_name, data, input, 'GEM2SResponse')

return(message_id)
}
@@ -353,8 +348,11 @@ call_qc <- function(task_name, input, pipeline_config) {
samples <- names(scdata)
assign("cells_id",
load_cells_id_from_s3(pipeline_config, experiment_id, task_name, tasks, samples),
pos = ".GlobalEnv"
)
pos = ".GlobalEnv")

    # there won't be a cells_id in S3 for an uploaded Seurat object that is being subsetted
if (!length(cells_id)) cells_id <- generate_first_step_ids(scdata)

} else {
stop("Invalid task name given: ", task_name)
}
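
Taken together, the load_config() changes collapse the old api_url/public_api_url pair into a single api_url, resolve the HOST_IP override up front, and drop the batch special case. A condensed sketch of the resulting resolution order (not the verbatim function):

# condensed sketch of the new single-URL resolution in load_config()
resolve_api_url <- function(cluster_env, sandbox, domain_name, development_aws_server) {
  # a makefile-provided HOST_IP overrides the caller-supplied dev server
  overriden_server <- Sys.getenv("HOST_IP", "")
  if (overriden_server != "") development_aws_server <- overriden_server

  # development default, then per-environment overrides
  api_url <- sprintf("http://%s:3000", development_aws_server)
  if (cluster_env == "staging") api_url <- paste0("https://api-", sandbox, ".", domain_name)
  if (cluster_env == "production") api_url <- paste0("https://api.", domain_name)

  api_url
}
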
8 changes: 5 additions & 3 deletions pipeline-runner/R/qc-5-filter_doublets.R
@@ -12,7 +12,7 @@
#' to call generate_default_values_doubletScores)
#' - filterSettings: slot with thresholds
#' - probabilityThreshold: Float. cut-off for the maximum probability score a cell can have. The doublet scores have been computed
#' through "scrublet" [1]
#' through scDblFinder
#' - binStep: Float. Bin size for the histogram
#' @export
#' @return a list with the filtered seurat object by doublet score, the config and the plot values
@@ -83,10 +83,12 @@ filter_doublets <- function(scdata_list, config, sample_id, cells_id, task_name
return(result)
}

# default doublet score based on scDblFinder classification
generate_default_values_doubletScores <- function(scdata) {
  # default doublet score based on scDblFinder classification
is_singlet <- scdata$doublet_class == "singlet"
threshold <- max(scdata$doublet_scores[is_singlet], na.rm = TRUE)

  # if there are no singlets, set the threshold to 0, preventing some experiments from failing
threshold <- max(scdata$doublet_scores[is_singlet], 0.0, na.rm = TRUE)

return(threshold)
}
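
The extra 0.0 argument is what makes the fallback work, and this is the same helper that get_dblscore_config() now delegates to in gem2s-6-construct_qc_config.R: in R, max() over an empty vector returns -Inf (with a warning), which would previously have been stored as the probability threshold whenever a sample contained no singlets.

# why the added 0.0 matters when every cell is classified as a doublet:
max(numeric(0), na.rm = TRUE)       # -Inf, plus a warning
max(numeric(0), 0.0, na.rm = TRUE)  # 0, a safe default threshold
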
115 changes: 115 additions & 0 deletions pipeline-runner/R/qc-6-integrate_scdata-fastmnn.R
@@ -0,0 +1,115 @@
run_fastmnn <- function(scdata_list, config, cells_id) {
settings <- config$dataIntegration$methodSettings[["fastmnn"]]
nfeatures <- settings$numGenes
normalization <- settings$normalisation
if (grepl("lognorm", normalization, ignore.case = TRUE)) {
normalization <- "LogNormalize"
}

  # calculate as many PCs for the PCA as possible, ideally 50, unless there are too few cells
npcs_for_pca <- min(vapply(scdata_list, ncol, integer(1)) - 1, 50)
npcs <- config$dimensionalityReduction$numPCs

# use the min of what the user wants and what can be calculated
if (!is.null(npcs)) {
npcs <- min(config$dimensionalityReduction$numPCs, npcs_for_pca)
}

use_geosketch <- "downsampling" %in% names(config) && config$downsampling$method == "geosketch"

scdata <- prepare_scdata_for_fastmnn(scdata_list, config, cells_id)

# we need RNA assay to compute the integrated matrix
Seurat::DefaultAssay(scdata) <- "RNA"

scdata <- scdata |>
Seurat::NormalizeData(normalization.method = normalization, verbose = FALSE) |>
Seurat::FindVariableFeatures(nfeatures = nfeatures, verbose = FALSE) |>
Seurat::ScaleData(verbose = FALSE) |>
Seurat::RunPCA(npcs = npcs_for_pca, verbose = FALSE)

# estimate number of PCs to be used downstream, for integration and clustering
if (is.null(npcs)) {
scdata@misc[["active.reduction"]] <- "pca"
npcs <- get_npcs(scdata)
message("Estimated number of PCs: ", npcs)
}

scdata <- add_dispersions(scdata, normalization)
misc <- scdata@misc
# remove scale.data slot to avoid errors
# https://github.com/satijalab/seurat-wrappers/issues/126
scdata@assays$RNA@scale.data <- as.matrix(0)

if (use_geosketch) {
scdata <-
RunGeosketchFastMNN(
scdata,
split.by = "samples",
features = nfeatures,
dims = npcs,
config = config
)
misc[["geosketch"]] <- TRUE
} else {
scdata_split <- Seurat::SplitObject(scdata, split.by = "samples")
scdata <-
SeuratWrappers::RunFastMNN(
scdata_split,
features = nfeatures,
d = npcs,
get.variance = TRUE
)
}

scdata@misc <- misc
scdata@misc[["numPCs"]] <- npcs
scdata@misc[["active.reduction"]] <- "mnn"

return(scdata)
}


prepare_scdata_for_fastmnn <- function(scdata_list, config, cells_id) {
# pre-process
scdata_list <- order_by_size(scdata_list)
scdata <- create_scdata(scdata_list, cells_id)

exclude_groups <- config$dimensionalityReduction$excludeGeneCategories
# remove genes groups if required
if (length(exclude_groups) > 0) {
scdata <- remove_genes(scdata, exclude_groups)
}

return(scdata)
}


RunGeosketchFastMNN <- function(scdata, split.by, features, dims, config) {
set.seed(RANDOM_SEED)
perc_num_cells <- config$downsampling$methodSettings$geosketch$percentageToKeep
geosketch_list <- run_geosketch(
scdata,
dims = dims,
perc_num_cells = perc_num_cells
)

scdata_split <- Seurat::SplitObject(geosketch_list$sketch, split.by = split.by)
scdata_sketch_integrated <-
SeuratWrappers::RunFastMNN(
scdata_split,
      features = features,
d = dims,
get.variance = TRUE
)

scdata <- learn_from_sketches(
geosketch_list$scdata,
geosketch_list$sketch,
scdata_sketch_integrated,
dims = dims
)

  return(scdata)
}
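
For orientation, a hypothetical config covering every field the new run_fastmnn() reads; the real payload comes from upstream pipeline messages, so the values below are illustrative only:

# illustrative config (assumed values); scdata_list is a named list of
# per-sample Seurat objects and cells_id the cell ids surviving QC
config <- list(
  dataIntegration = list(
    methodSettings = list(
      fastmnn = list(numGenes = 2000, normalisation = "logNormalize")
    )
  ),
  dimensionalityReduction = list(numPCs = 30, excludeGeneCategories = list()),
  downsampling = list(
    method = "geosketch",
    methodSettings = list(geosketch = list(percentageToKeep = 5))
  )
)
# integrated <- run_fastmnn(scdata_list, config, cells_id)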