Skip to content

Commit

Permalink
Merge pull request #346 from biomage-org/fix-norm-matrix-bug
Browse files Browse the repository at this point in the history
Fix normalized expression matrix for big datasets
  • Loading branch information
saracastel authored Oct 25, 2023
2 parents f07d5a1 + 962416d commit fa391bb
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 31 deletions.
3 changes: 2 additions & 1 deletion python/src/worker/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ def REDIS_CLIENT(self):
# whereas in a container, it is mounted to `/data`. Either way, this ensures
# that the appropriate path is selected, as both are two directories up
LOCAL_DIR=os.path.join(os.pardir, os.pardir, "data"),
RDS_PATH = "/data/processed.rds"
RDS_PATH = "/data/processed.rds",
TMP_RESULTS_PATH_GZ = "/data/rResult.gz"
)

config.API_URL = (
Expand Down
12 changes: 9 additions & 3 deletions python/src/worker/response.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import gzip
import io
import ujson
from logging import info
import base64
import sys
import os

import aws_xray_sdk as xray
import boto3
Expand Down Expand Up @@ -157,11 +157,17 @@ def publish(self):

if not self.error and self.cacheable:
info("Uploading response to S3")
if self.result.data == config.RDS_PATH:
if self.result.data == config.RDS_PATH or self.result.data == config.TMP_RESULTS_PATH_GZ:
self._upload(self.result.data, "path")
else:
s3_data, socket_data = self._construct_data_for_upload()
self._upload(s3_data, "obj")

info("Sending socket.io message to clients subscribed to work response")
return self._send_notification(socket_data)
self._send_notification(socket_data)

# Remove the temporary file used to transfer data between R and Python
# to free up memory
if self.result.data == config.TMP_RESULTS_PATH_GZ:
info("Cleaning up temp files generated by work result")
os.remove(config.TMP_RESULTS_PATH_GZ)
7 changes: 5 additions & 2 deletions r/R/normalized_matrix.R
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,8 @@ GetNormalizedExpression <- function(req, data) {

matrix <- tibble::rownames_to_column(matrix, var = " ")

return(vroom::vroom_format(matrix, delim=","))
}
# fwrite compresses the file based on filename extension
data.table::fwrite(matrix, TMP_RESULTS_PATH_GZ, quote = F, row.names = T)

return(TMP_RESULTS_PATH_GZ)
}
Binary file modified r/R/sysdata.rda
Binary file not shown.
3 changes: 3 additions & 0 deletions r/data-raw/sysdata.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ IDS_IDS <- "ids_ids"

RDS_PATH <- "/data/processed.rds"

TMP_RESULTS_PATH_GZ <- "/data/rResult.gz"

usethis::use_data(error_codes,
ULTIMATE_SEED,
QUANTILE_THRESHOLD,
Expand All @@ -28,5 +30,6 @@ usethis::use_data(error_codes,
IDS_SYM,
IDS_IDS,
RDS_PATH,
TMP_RESULTS_PATH_GZ,
internal = TRUE,
overwrite = TRUE)
80 changes: 55 additions & 25 deletions r/tests/testthat/test-normalized_matrix.R
Original file line number Diff line number Diff line change
Expand Up @@ -11,64 +11,94 @@ mock_req <- function(apply_subset = TRUE) {
body = list(
subsetBy = subset_by,
applySubset = apply_subset
)
)
)
)
}


#' Test stub for `data.table::fwrite` that writes beneath the working directory
#'
#' Prefixes the requested path with "." so that an absolute path is redirected
#' to a location relative to the current working directory, creates the parent
#' directory if it does not exist yet, and then delegates to
#' `data.table::fwrite`.
#'
#' @param matrix Object to write; passed unchanged to `data.table::fwrite`.
#' @param fpath Destination path requested by the code under test.
#' @param quote Forwarded to the `quote` argument of `data.table::fwrite`.
#' @param row.names Forwarded to the `row.names` argument of
#'   `data.table::fwrite`.
#' @return The redirected path (`fpath` prefixed with ".").
stub_fwrite <- function(matrix, fpath, quote = FALSE, row.names = TRUE) {
  # add '.' to path to point it to cwd instead of system root
  local_path <- paste0(".", fpath)
  out_dir <- dirname(local_path)
  if (!dir.exists(out_dir)) {
    dir.create(out_dir, recursive = TRUE)
  }
  # Forward the caller's options rather than silently overriding them, so the
  # stub writes the file the same way the real call would have.
  data.table::fwrite(matrix, local_path, quote = quote, row.names = row.names)
  local_path
}

string_to_df <- function(text_df) {
df <- read.table(text = text_df, sep =",", header=TRUE)

# This part to rollback: rownames_to_column
rownames(df) <- df[,1]
df[,1] <- NULL

return(df)
stubbed_GetNormalizedExpression <- function(req, data) {
mockery::stub(
GetNormalizedExpression,
"data.table::fwrite",
stub_fwrite
)

res <- GetNormalizedExpression(req, data)
return(paste0(".", res))
}

test_that("GetNormalizedExpression generates the expected string format", {

test_that("GetNormalizedExpression saves the normalized matrix using the correct path", {
data <- mock_scdata()
req <- mock_req()

res <- GetNormalizedExpression(req, data)
res <- stubbed_GetNormalizedExpression(req, data)
withr::defer(unlink(dirname(res), recursive = TRUE))

expect_type(res, "character")
expect_equal(gsub("^\\.", "", res), TMP_RESULTS_PATH_GZ)
})

test_that("subsetting is applied and changes GetNormalizedExpression output", {

test_that("GetNormalizedExpression correctly subsets the data", {
data <- mock_scdata()
req <- mock_req()

res_filt <- GetNormalizedExpression(req, data)
original_ncol <- ncol(data)

req <- mock_req(apply_subset = FALSE)
res_unfilt <- GetNormalizedExpression(req, data)
res <- stubbed_GetNormalizedExpression(req, data)
withr::defer(unlink(dirname(res), recursive = TRUE))

matrix <- data.table::fread(res, sep=",", quote = F, header = TRUE)

expect_false(identical(res_unfilt, res_filt))
expect_equal(ncol(matrix) - 2, length(req$body$subsetBy)) # -2 because of the rownames and index columns
expect_false(ncol(data) == ncol(matrix))
})

test_that("GetNormalizedExpression correctly subsets the data", {
test_that("subsetting is applied and changes GetNormalizedExpression output", {
data <- mock_scdata()
req <- mock_req()

subset_ids <- req$body$subsetBy
res_filt <- stubbed_GetNormalizedExpression(req, data)
withr::defer(unlink(dirname(res_filt), recursive = TRUE))
matrix_filt <- data.table::fread(res_filt, sep=",", quote = F, header = TRUE)

res <- GetNormalizedExpression(req, data)

df <- string_to_df(res)
req <- mock_req(apply_subset = FALSE)
res_unfilt <- stubbed_GetNormalizedExpression(req, data)
withr::defer(unlink(dirname(res_unfilt), recursive = TRUE))
matrix_unfilt <- data.table::fread(res_unfilt, sep=",", quote = F, header = TRUE)

expect_false(ncol(data) == ncol(df))
expect_equal(ncol(df), length(subset_ids))
expect_false(identical(matrix_unfilt, matrix_filt))
})


test_that("GetNormalizedExpression doesn't subset the data when applySubset is FALSE", {
data <- mock_scdata()
req <- mock_req(apply_subset = FALSE)

res <- GetNormalizedExpression(req, data)
expect_message(
{
res <- stubbed_GetNormalizedExpression(req, data)
withr::defer(unlink(dirname(res), recursive = TRUE))
},
"No subsetting specified, sending the whole matrix"
)

df <- string_to_df(res)
matrix <-data.table::fread(res, sep=",", quote = F, header = TRUE)

expect_true(ncol(data) == ncol(df))
expect_true(ncol(data) == ncol(matrix) - 2)
})


Expand Down

0 comments on commit fa391bb

Please sign in to comment.