Skip to content

Commit

Permalink
Handle gem2s response refactoring (#68)
Browse files Browse the repository at this point in the history
* Add sns updates when tasks finish

* Handle gem2s response refactoring

* Minor renaming

* Fix

* Fix

Co-authored-by: Martin Fosco <[email protected]>
  • Loading branch information
cosa65 and Martin Fosco authored May 27, 2021
1 parent 7e93f2d commit 117d0b4
Show file tree
Hide file tree
Showing 9 changed files with 9 additions and 19 deletions.
2 changes: 0 additions & 2 deletions pipeline-runner/src/data-ingest/0-download_gem2s.r
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,4 @@ task <- function(input, pipeline_config) {
message(exportJSON)
write(exportJSON, "/input/meta.json")
message('Written config json')

send_update_to_api(pipeline_config, experiment_id = input$experimentId, status_msg = "OK")
}
2 changes: 0 additions & 2 deletions pipeline-runner/src/data-ingest/1_Preproc.r
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,4 @@ task <- function(input, pipeline_config) {

message("Step 1 completed.")
print(list.files(paste("/output",sep = "/"),all.files=TRUE,full.names=TRUE,recursive=TRUE))

send_update_to_api(pipeline_config, experiment_id = input$experimentId, status_msg = "OK")
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,5 @@ task <- function(input,pipeline_config){

message("Step 2-1 completed.")
print(list.files(paste("/output",sep = "/"),all.files=TRUE,full.names=TRUE,recursive=TRUE))

send_update_to_api(pipeline_config, experiment_id = input$experimentId, status_msg = "OK")
}

Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,5 @@ task <- function(input,pipeline_config){

message("Step 2-2 completed.")
print(list.files(paste("/output",sep = "/"),all.files=TRUE,full.names=TRUE,recursive=TRUE))

send_update_to_api(pipeline_config, experiment_id = input$experimentId, status_msg = "OK")
}

2 changes: 0 additions & 2 deletions pipeline-runner/src/data-ingest/3_Seurat.r
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,5 @@ task <- function(input,pipeline_config){

message("Step 3 completed.")
print(list.files(paste("/output",sep = "/"),all.files=TRUE,full.names=TRUE,recursive=TRUE))

send_update_to_api(pipeline_config, experiment_id = input$experimentId, status_msg = "OK")
}

2 changes: 0 additions & 2 deletions pipeline-runner/src/data-ingest/4_Prepare_experiment.r
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,6 @@ task <- function(input,pipeline_config){

message("Step 4 completed.")
print(list.files(paste("/output",sep = "/"),all.files=TRUE,full.names=TRUE,recursive=TRUE))

send_update_to_api(pipeline_config, experiment_id = input$experimentId, status_msg = "OK")
}


6 changes: 4 additions & 2 deletions pipeline-runner/src/data-ingest/5_Upload-to-aws.r
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,15 @@ task <- function(input, pipeline_config) {
send_dynamodb_item_to_api(pipeline_config,
experiment_id = experiment_id,
table = pipeline_config$experiments_table,
item = experiment_data)
item = experiment_data,
task_name = "uploadToAWS")

# samples data to dynamodb
send_dynamodb_item_to_api(pipeline_config,
experiment_id = experiment_id,
table = pipeline_config$samples_table,
item = samples_data)
item = samples_data,
task_name = "uploadToAWS")

if (cluster_env == "production")
print(sprintf("https://scp.biomage.net/experiments/%s/data-exploration", experiment_id))
Expand Down
8 changes: 4 additions & 4 deletions pipeline-runner/src/handle_data.r
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ upload_matrix_to_s3 <- function(pipeline_config, experiment_id, data) {
return(object_key)
}

send_update_to_api <- function(pipeline_config, experiment_id, status_msg) {
send_gem2s_update_to_api <- function(pipeline_config, experiment_id, task_name) {
message("Sending to SNS topic ", pipeline_config$sns_topic)
sns <- paws::sns(config=pipeline_config$aws_config)

msg <- list(
status = status_msg,
taskName = task_name,
experimentId = experiment_id
)

Expand All @@ -139,12 +139,12 @@ send_update_to_api <- function(pipeline_config, experiment_id, status_msg) {
return(result$MessageId)
}

send_dynamodb_item_to_api <- function(pipeline_config, experiment_id, table, item) {
send_dynamodb_item_to_api <- function(pipeline_config, experiment_id, table, item, task_name) {
message("Sending to SNS topic ", pipeline_config$sns_topic)
sns <- paws::sns(config=pipeline_config$aws_config)

msg <- list(
status = "OK",
taskName = task_name,
experimentId = experiment_id,
item = item,
table = table
Expand Down
2 changes: 1 addition & 1 deletion pipeline-runner/src/init.r
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ require("paws")
require("zeallot")
require("ids")


source("handle_data.r")
source("utils.r")

Expand Down Expand Up @@ -135,6 +134,7 @@ run_gem2s_step <- function(task_name, input, pipeline_config){
)
print("Starting task")
task(input, pipeline_config)
send_gem2s_update_to_api(pipeline_config, experiment_id = input$experimentId, task_name = task_name)
}

#
Expand Down

0 comments on commit 117d0b4

Please sign in to comment.