
[BIOMAGE-1864] added heartbeat to tasks #224

Merged: 10 commits, Apr 21, 2022
4 changes: 2 additions & 2 deletions pipeline-runner/Dockerfile
@@ -48,8 +48,8 @@ FROM builder AS prod
# see https://github.com/paws-r/paws/blob/main/docs/credentials.md for how PAWS
# handles this
#
RUN wget -O jq https://github.com/stedolan/jq/releases/download/jq-1.6/jq-linux64 && chmod +x ./jq && cp jq /usr/bin
RUN pip install awscli
RUN wget -O jq https://github.com/stedolan/jq/releases/download/jq-1.6/jq-linux64 && chmod +x ./jq && cp jq /usr/bin && \
pip install awscli
COPY aws_config /root/.aws/config

# start app
68 changes: 63 additions & 5 deletions pipeline-runner/init.R
@@ -4,6 +4,7 @@ library(zeallot)
library(tryCatchLog)
library(futile.logger)
library(magrittr)
library(callr)

# time stamp used for directory to store log/dump files in event of error
debug_timestamp <- format(Sys.time(), format = "%Y-%m-%d_at_%H-%M-%OS3")
@@ -282,6 +283,45 @@ call_data_processing <- function(task_name, input, pipeline_config) {
return(message_id)
}

#
# start_heartbeat(task_token, aws_config)
# IN task_token, aws_config
#
# Sends a heartbeat to the state machine every 'wait_time' seconds.
# Once the task is completed, the heartbeat call fails with a task timeout,
# the loop exits, and the next task sets up a new heartbeat.
@aerlaut (Contributor) commented on Apr 18, 2022:

The task timeout is 10800 seconds (3 hours). If the task succeeds, does this mean that this bg process will run for 3 hours before failing?

Contributor Author replied:

No:

  1. as soon as the task is completed, the heartbeat fails and the loop stops
  2. when a pipeline finishes, the pod and everything running inside it is killed

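As a minimal sketch of point 1 (assuming working paws credentials and a `task_token` that has already been closed via `send_task_success`): once the token is closed, the heartbeat call is rejected, and catching that error is what ends the background loop.

library(paws)
states <- paws::sfn()
tryCatch({
  states$send_task_heartbeat(taskToken = task_token)
  message("task still open, keep beating")
}, error = function(e) {
  # e.g. TaskTimedOut / TaskDoesNotExist once the token has been closed
  message("heartbeat rejected, stopping: ", e$message)
})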
# This method is invoked with `callr::r_bg`, which creates a new process that does not inherit
# the current workspace or memory, only the provided parameters; that's why we need to
# re-import tryCatchLog and initialize `states` again.
#
start_heartbeat <- function(task_token, aws_config) {
library(tryCatchLog)
message("Starting heartbeat")
states <- paws::sfn(config = aws_config)

keep_running <- TRUE
# amount of time to wait between heartbeats
wait_time <- 30
i <- 0
while (keep_running) {
tryCatchLog({
states$send_task_heartbeat(
taskToken = task_token
)
message("Heartbeat sent: ", i)
},
error = function(e) {
message("Send task heartbeat failed: ", e$message)
message("Stopping heartbeat after ", i + 1, " attempts")
# use <<- so the flag in the enclosing loop is updated, not a handler-local copy
keep_running <<- FALSE
})
i <- i + 1
# sleep until the next heartbeat
Sys.sleep(wait_time)
}
}

#
# Wrapper(input_json)
# IN input_json: json input from message. Input should have:
@@ -296,6 +336,7 @@ wrapper <- function(input) {
str(input)
message("")


# common to gem2s and data processing
server <- input$server
input <- input[names(input) != "server"]
@@ -331,12 +372,12 @@ init <- function() {
message("Waiting for tasks")

repeat {
c(taskToken, input_json) %<-% states$get_activity_task(
c(task_token, input_json) %<-% states$get_activity_task(
activityArn = pipeline_config$activity_arn,
workerName = pipeline_config$pod_name
)

if(is.null(taskToken) || !length(taskToken) || taskToken == "") {
if(is.null(task_token) || !length(task_token) || task_token == "") {
message('No input received during last poll, shutting down...')
quit('no')
}
@@ -349,12 +390,26 @@ init <- function() {
dump_folder <- file.path(DEBUG_PATH, debug_prefix)
flog.appender(appender.tee(file.path(dump_folder, "logs.txt")))

# start heartbeat as a different process in the background
message("Starting heartbeat")
# messages inside r_bg will ONLY be printed to /tmp/out and /tmp/err;
# to see them:
# 1. log into the R container
# 2. cat /tmp/out or tail -f /tmp/out
heartbeat_proc <- r_bg(
  func = start_heartbeat,
  args = list(task_token, pipeline_config$aws_config),
  stdout = "/tmp/out",
  stderr = "/tmp/err"
)

tryCatchLog({


wrapper(input)

message('Send task success\n------\n')
states$send_task_success(
taskToken = taskToken,
taskToken = task_token,
output = "{}"
)
},
@@ -369,13 +424,13 @@

message(error_txt)
states$send_task_failure(
taskToken = taskToken,
taskToken = task_token,
error = "We had an issue while processing your data.",
cause = error_txt
)

send_pipeline_fail_update(pipeline_config, input, error_txt)
message("Sent task failure to state machine task: ", taskToken)
message("Sent task failure to state machine task: ", task_token)

if (pipeline_config$cluster_env != 'development') {
upload_debug_folder_to_s3(debug_prefix, pipeline_config)
@@ -385,6 +440,9 @@
},
write.error.dump.file = TRUE,
write.error.dump.folder = dump_folder)

# kill heartbeat process
heartbeat_proc$kill()
}
}

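For reference, a minimal sketch of the `callr::r_bg` behaviour used above (it assumes the `heartbeat_proc` object and the `/tmp/out` log file configured in init.R): the background process starts with a fresh workspace, and it can be inspected or stopped from the parent session via the returned process object.

library(callr)

x <- 42
p <- r_bg(function() exists("x"))
p$wait()
p$get_result()             # FALSE: the child does not inherit the parent workspace

heartbeat_proc$is_alive()  # TRUE while the heartbeat loop is still running
readLines("/tmp/out")      # messages written by start_heartbeat()
heartbeat_proc$kill()      # what init() does once the task has finished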