create samples/files
Signed-off-by: Alex Pickering <[email protected]>
alexvpickering committed Nov 6, 2024
1 parent 3cceb29 commit 2cb976c
Showing 3 changed files with 139 additions and 38 deletions.
pipeline-runner/R/handle_data.R (143 changes: 113 additions & 30 deletions)
@@ -48,7 +48,7 @@ get_nnzero <- function (x) {
 }
 
 order_by_size <- function(scdata_list) {
-return(scdata_list[order(sapply(scdata_list, get_nnzero))])
+  return(scdata_list[order(sapply(scdata_list, get_nnzero))])
 }
 
 load_source_scdata_list <- function (s3, pipeline_config, experiment_id) {
@@ -102,9 +102,9 @@ reload_data_from_s3 <- function(pipeline_config, experiment_id, task_name, tasks
 load_cells_id_from_s3 <- function(pipeline_config, experiment_id, task_name, tasks, samples) {
   s3 <- paws::s3(config = pipeline_config$aws_config)
   object_list <- s3$list_objects(
-  Bucket = pipeline_config$cells_id_bucket,
-  Prefix = paste0(experiment_id, "/", task_name, "/")
-)
+    Bucket = pipeline_config$cells_id_bucket,
+    Prefix = paste0(experiment_id, "/", task_name, "/")
+  )
   message(pipeline_config$cells_id_bucket)
   message(paste(experiment_id, "r.rds", sep = "/"))
   cells_id <- list()
@@ -146,15 +146,15 @@ load_cells_id_from_s3 <- function(pipeline_config, experiment_id, task_name, tas
 
 build_qc_response <- function(id, input, error, pipeline_config) {
   msg <- list(
-  experimentId = input$experimentId,
-  taskName = input$taskName,
-  input = input,
-  response = list(
-    error = error
-  ),
-  pipelineVersion = pipeline_version,
-  apiUrl = pipeline_config$api_url
-)
+    experimentId = input$experimentId,
+    taskName = input$taskName,
+    input = input,
+    response = list(
+      error = error
+    ),
+    pipelineVersion = pipeline_version,
+    apiUrl = pipeline_config$api_url
+  )
 
   if (!is.null(id)) {
     msg$output <- list(
@@ -375,7 +375,7 @@ rgb_img_to_ome_zarr <- function(img_arr, output_path, img_name, chunks = as.inte
     end = 255
   )
 
-  z_root <- zarr$open_group(output_path, mode="w", )
+  z_root <- zarr$open_group(output_path, mode="w")
 
   ome_zarr$writer$write_image(
     image=img_arr,
@@ -410,43 +410,126 @@ rgb_img_to_ome_zarr <- function(img_arr, output_path, img_name, chunks = as.inte
   invisible()
 }
 
-upload_image_to_s3 <- function(pipeline_config, experiment_id, img_arr, img_name) {
+upload_image_to_s3 <- function(pipeline_config, input, experiment_id, img_arr, img_name, img_id) {
+  # things for api requests
+  api_url <- pipeline_config$api_url
+  authJWT <- input$authJWT
+
   # where to save zarr folder locally
   zarr_name <- paste0(img_name, '.ome.zarr')
   output_path <- file.path(tempdir(), zarr_name)
 
   message("Saving image data to: ", output_path, '...')
 
 
   # save as ome zarr folder
   rgb_img_to_ome_zarr(img_arr, output_path, img_name)
 
-  # upload all files in zarr folder
-  zarr_key <- file.path(experiment_id, zarr_name)
-  zarr_files <- list.files(output_path, recursive = TRUE, include.dirs = FALSE)
-
-  message("Uploading image data to : ",
-          file.path(pipeline_config$spatial_image_bucket, zarr_key), '...')
-
-  for (zarr_file in zarr_files) {
-
-    put_object_in_s3(
-      pipeline_config,
-      pipeline_config$spatial_image_bucket,
-      object = file.path(output_path, zarr_file),
-      key = file.path(zarr_key, zarr_file))
-  }
+  # zip all files in zarr folder
+  zip_name <- paste0(zarr_name, '.zip')
+  zip_path <- file.path(tempdir(), zip_name)
+
+  workdir <- getwd()
+  setwd(output_path)
+  utils::zip(zip_path, files = '.', flags = '-r0')
+  setwd(workdir)
+
+  # upload ome.zarr.zip to s3
+  # use unique id for the file that is distinct from the sample id
+  sample_file_id <- ids::uuid()
+  message(
+    "Uploading image data to bucket: ", pipeline_config$spatial_image_bucket,
+    ' at key: ', sample_file_id, '...')
+
+  put_object_in_s3(
+    pipeline_config,
+    pipeline_config$spatial_image_bucket,
+    object = zip_path,
+    key = sample_file_id
+  )
+
+  # create sql entry in sample_file (also creates entry in sample_to_sample_file_map)
+  create_sample_file(
+    api_url,
+    experiment_id,
+    img_id,
+    'ome_zarr_zip',
+    file.size(zip_path),
+    sample_file_id, # gets used as s3_path by API
+    authJWT
+  )
 
   invisible()
 }
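
For illustration: flags = '-r0' in the utils::zip() call above means recurse into the folder (-r) and store entries without compression (-0); OME-Zarr chunk files are typically already compressed, so compressing the archive again gains little. A minimal standalone sketch of the same round trip, with made-up paths:

    # toy stand-in for a zarr folder; all paths are made up for illustration
    src <- file.path(tempdir(), 'example.ome.zarr')
    dir.create(src, showWarnings = FALSE)
    writeLines('chunk-data', file.path(src, '0'))

    zip_path <- file.path(tempdir(), 'example.ome.zarr.zip')
    workdir <- getwd()
    setwd(src)
    utils::zip(zip_path, files = '.', flags = '-r0')  # -r: recurse, -0: store uncompressed
    setwd(workdir)

    utils::unzip(zip_path, list = TRUE)  # inspect archive entries without extracting
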
+create_sample_file <- function(api_url, experiment_id, sample_id, file_type, file_size, sample_file_id, authJWT) {
+  url <- paste0(api_url, "/v2/experiments/", experiment_id, "/samples/", sample_id, '/sampleFiles/', file_type)
+
+  body <- list(
+    sampleFileId = sample_file_id,
+    size = file_size,
+    uploadStatus = 'uploaded'
+  )
+
+  response <- httr::POST(
+    url,
+    body = body,
+    encode = "json",
+    httr::add_headers("Content-Type" = "application/json",
+                      "Authorization" = authJWT)
+  )
+
+  if (httr::status_code(response) >= 400) {
+    stop("API post to create sample file failed with status code: ", httr::status_code(response))
+  }
+}
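
For orientation, a hypothetical invocation (URL, ids, and token all invented) and the request it would issue:

    # hypothetical usage; every argument here is made up for illustration
    create_sample_file(
      api_url = 'https://api.example.com',
      experiment_id = 'exp-1',
      sample_id = 'sample-1',
      file_type = 'ome_zarr_zip',
      file_size = 123456,
      sample_file_id = ids::uuid(),
      authJWT = 'Bearer <token>'
    )
    # issues: POST https://api.example.com/v2/experiments/exp-1/samples/sample-1/sampleFiles/ome_zarr_zip
    # with JSON body: {"sampleFileId": "<uuid>", "size": 123456, "uploadStatus": "uploaded"}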

+create_sample <- function(api_url, experiment_id, sample_name, sample_technology, auth_JWT) {
+  url <- paste0(api_url, "/v2/experiments/", experiment_id, "/samples")
+
+  body <- list(list(
+    name = sample_name,
+    sampleTechnology = sample_technology,
+    options = c()
+  ))
+
+  response <- httr::POST(
+    url,
+    body = body,
+    encode = "json",
+    httr::add_headers("Content-Type" = "application/json",
+                      "Authorization" = auth_JWT)
+  )
+
+  if (httr::status_code(response) >= 400) {
+    stop("API post to create sample failed with status code: ", httr::status_code(response))
+  }
+  sample_id <- httr::content(response)[[1]]
+  return(sample_id)
+}
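
Similarly hypothetical (all arguments invented): create_sample() returns the id the API assigns to the new sample, read from the first element of the JSON response body:

    # hypothetical usage; URL, ids, and token are made up for illustration
    sample_id <- create_sample(
      api_url = 'https://api.example.com',
      experiment_id = 'exp-1',
      sample_name = 'section_A1',
      sample_technology = 'obj2s_sample',
      auth_JWT = 'Bearer <token>'
    )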

+convert_camel_to_snake <- function(camel_string) {
+  # Use gsub to find uppercase letters and replace them with an underscore followed by the lowercase version
+  snake_string <- gsub("([a-z0-9])([A-Z])", "\\1_\\2", camel_string)
+
+  # Convert the entire string to lowercase
+  snake_string <- tolower(snake_string)
+
+  return(snake_string)
+}
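
A quick illustration of the helper above, with expected output shown as comments:

    convert_camel_to_snake('sampleFileId')
    #> [1] "sample_file_id"
    convert_camel_to_snake('ome2ZarrZip')  # digits also trigger the underscore
    #> [1] "ome2_zarr_zip"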

-upload_images_to_s3 <- function(pipeline_config, experiment_id, scdata) {
+upload_images_to_s3 <- function(pipeline_config, input, experiment_id, scdata) {
+
+  # sample name to id map
+  sample_ids <- input$sampleIds
+  names(sample_ids) <- input$sampleNames
 
   img_names <- Seurat::Images(scdata)
 
   for (img_name in img_names) {
+    img_id <- sample_ids[img_name]
     img_arr <- scdata@images[[img_name]]@image
-    upload_image_to_s3(pipeline_config, experiment_id, img_arr, img_name)
+    upload_image_to_s3(pipeline_config, input, experiment_id, img_arr, img_name, img_id)
   }
 }
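
The sample name-to-id map above is a named character vector, so the lookup by image name is plain named indexing; a tiny sketch with invented values:

    # made-up values for illustration
    sample_ids <- c('id-aaa', 'id-bbb')
    names(sample_ids) <- c('imageA', 'imageB')
    sample_ids['imageB']
    #>   imageB 
    #> "id-bbb" 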

pipeline-runner/R/obj2s-2-load_obj2s_file.R (7 changes: 4 additions & 3 deletions)
@@ -95,12 +95,12 @@ reconstruct_seurat_spatial <- function(dataset_fpath) {
     meta.data = metadata,
   )
 
-  # add image annotation
+  # add image annotation as samples column
   image_names <- Seurat::Images(user_scdata)
-  scdata$image <- NA
+  scdata$samples <- NA
   for (image_name in image_names) {
     image_cells <- Seurat:::CellsByImage(user_scdata, image_name, unlist = TRUE)
-    scdata@meta.data[image_cells, 'image'] <- image_name
+    scdata@meta.data[image_cells, 'samples'] <- image_name
   }
 
   # use library size factors for logcounts
@@ -189,6 +189,7 @@ reconstruct_seurat_spatial <- function(dataset_fpath) {
 
   # TODO: ensure class of image can be handled
   # stopifnot(class(image) %in% c('VisiumV2', 'VisiumV1'))
+
   image <- user_scdata@images[[image_name]]
 
   check_type_is_safe(image)
pipeline-runner/R/obj2s-3-upload_obj2s_to_aws.R (27 changes: 22 additions & 5 deletions)
@@ -9,8 +9,10 @@ upload_obj2s_to_aws <- function(input, pipeline_config, prev_out) {
 
   scdata <- format_obj2s(scdata, experiment_id)
 
+  # add entries to sample table and get returned ids
+  input <- add_samples_to_input(scdata, pipeline_config, experiment_id, input)
+
   # change sample ids/names so that get sample cell sets
-  input <- add_samples_to_input(scdata, input)
   input <- add_metadata_to_input(scdata, input)
   scdata <- change_sample_names_to_ids(scdata, input)
   cell_sets <- get_cell_sets(scdata, input)
@@ -55,7 +57,7 @@ upload_obj2s_to_aws <- function(input, pipeline_config, prev_out) {
   message('Count matrix uploaded to ', pipeline_config$processed_bucket, ' with key ',object_key)
 
   # images for spatial to s3
-  upload_images_to_s3(pipeline_config, experiment_id, scdata)
+  upload_images_to_s3(pipeline_config, input, experiment_id, scdata)
 
   experiment_data <- list(
     apiVersion = "2.0.0-data-ingest-seurat-rds-automated",
@@ -178,10 +180,25 @@ test_groups_equal <- function(vals1, vals2) {
 }
 
 
-add_samples_to_input <- function(scdata, input) {
+add_samples_to_input <- function(scdata, pipeline_config, experiment_id, input) {
   samples <- unique(scdata$samples)
+  sample_ids <- c()
+
+  for (sample_name in samples) {
+    sample_id <- create_sample(
+      pipeline_config$api_url,
+      experiment_id,
+      sample_name,
+      'obj2s_sample',
+      input$authJWT
+    )
+
+    sample_ids <- c(sample_ids, sample_id)
+  }
+
+
   input$sampleNames <- samples
-  input$sampleIds <- ids::uuid(n = length(samples))
+  input$sampleIds <- sample_ids
   return(input)
 }
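
The net effect, sketched with invented values: sample ids are no longer minted locally via ids::uuid(), but are whatever the API returned from create_sample(), one per unique sample name:

    # hypothetical state after the call; the ids are whatever the API issued
    input$sampleNames
    #> [1] "imageA" "imageB"
    input$sampleIds
    #> [1] "3f2a..." "9c81..."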

@@ -276,7 +293,7 @@ format_obj2s <- function(scdata, experiment_id) {
 
 # use 'samples' or 'sample' if present, otherwise assume one sample
 add_samples_col <- function(scdata) {
-  samples_cols <- c('samples', 'sample', 'image')
+  samples_cols <- c('samples', 'sample')
   in.meta <- samples_cols %in% colnames(scdata@meta.data)
 
   if (!any(in.meta)) {
