From 65129c8c28659ed61e5928e53f949f853d5a0a24 Mon Sep 17 00:00:00 2001
From: Niek Palm
Date: Tue, 30 Nov 2021 23:19:35 +0100
Subject: [PATCH] Add retry mechanism for scaling errors

---
 examples/default/main.tf                      | 18 ++++++++-----
 modules/runners/lambdas/runners/src/lambda.ts | 26 ++++++++++++-------
 .../runners/src/scale-runners/runners.ts      | 21 ++++++++-------
 .../runners/src/scale-runners/scale-up.ts     | 13 +++++-----
 modules/runners/scale-up.tf                   |  1 +
 5 files changed, 49 insertions(+), 30 deletions(-)

diff --git a/examples/default/main.tf b/examples/default/main.tf
index c2362c9cee..a85552489c 100644
--- a/examples/default/main.tf
+++ b/examples/default/main.tf
@@ -30,11 +30,11 @@ module "runners" {
     webhook_secret = random_password.random.result
   }

-  webhook_lambda_zip                = "lambdas-download/webhook.zip"
-  runner_binaries_syncer_lambda_zip = "lambdas-download/runner-binaries-syncer.zip"
-  runners_lambda_zip                = "lambdas-download/runners.zip"
-  enable_organization_runners = false
-  runner_extra_labels         = "default,example"
+  # webhook_lambda_zip                = "lambdas-download/webhook.zip"
+  # runner_binaries_syncer_lambda_zip = "lambdas-download/runner-binaries-syncer.zip"
+  # runners_lambda_zip                = "lambdas-download/runners.zip"
+  enable_organization_runners = true
+  runner_extra_labels         = "default,example"

   # enable access to the runners via SSM
   enable_ssm_on_runners = true
@@ -61,8 +61,14 @@ module "runners" {
   instance_types = ["m5.large", "c5.large"]

   # override delay of events in seconds
-  delay_webhook_event = 0
+  delay_webhook_event = 10
+  //job_queue_retention_in_seconds = 600
+  //job_queue_retention_in_seconds = 60
+  runners_maximum_count = 1

   # override scaling down
   scale_down_schedule_expression = "cron(* * * * ? *)"
+
+  enable_ephemeral_runners = true
+  disable_check_wokflow_job_labels = true
 }
*)" + + enable_ephemeral_runners = true + disable_check_wokflow_job_labels = true } diff --git a/modules/runners/lambdas/runners/src/lambda.ts b/modules/runners/lambdas/runners/src/lambda.ts index a784c0d059..cd394f6ac2 100644 --- a/modules/runners/lambdas/runners/src/lambda.ts +++ b/modules/runners/lambdas/runners/src/lambda.ts @@ -2,21 +2,29 @@ import { scaleUp } from './scale-runners/scale-up'; import { scaleDown } from './scale-runners/scale-down'; import { SQSEvent, ScheduledEvent, Context, Callback } from 'aws-lambda'; import { logger } from './scale-runners/logger'; +import ScaleError from './scale-runners/ScaleError'; import 'source-map-support/register'; export async function scaleUpHandler(event: SQSEvent, context: Context, callback: Callback): Promise { logger.setSettings({ requestId: context.awsRequestId }); logger.debug(JSON.stringify(event)); - try { - for (const e of event.Records) { - await scaleUp(e.eventSource, JSON.parse(e.body)); - } - - callback(null); - } catch (e) { - logger.error(e); - callback('Failed handling SQS event'); + // TODO find the a more elegant way :( + if (event.Records.length != 1) { + logger.warn('Event ignored, only on record at the time can be handled, ensure the lambda batch size is set to 1.'); + return new Promise((resolve) => resolve()); } + + return new Promise((resolve, reject) => { + scaleUp(event.Records[0].eventSource, JSON.parse(event.Records[0].body)) + .then(() => resolve()) + .catch((e: Error) => { + if (e instanceof ScaleError) { + reject(e); + } else { + logger.warn('Ignoring error: ', e); + } + }); + }); } export async function scaleDownHandler(event: ScheduledEvent, context: Context, callback: Callback): Promise { diff --git a/modules/runners/lambdas/runners/src/scale-runners/runners.ts b/modules/runners/lambdas/runners/src/scale-runners/runners.ts index 9d152d9007..c1be0adb0d 100644 --- a/modules/runners/lambdas/runners/src/scale-runners/runners.ts +++ b/modules/runners/lambdas/runners/src/scale-runners/runners.ts @@ -85,16 +85,19 @@ export async function createRunner(runnerParameters: RunnerInputParameters, laun .runInstances(getInstanceParams(launchTemplateName, runnerParameters)) .promise(); logger.info('Created instance(s): ', runInstancesResponse.Instances?.map((i) => i.InstanceId).join(',')); + const ssm = new SSM(); - runInstancesResponse.Instances?.forEach(async (i: EC2.Instance) => { - await ssm - .putParameter({ - Name: runnerParameters.environment + '-' + (i.InstanceId as string), - Value: runnerParameters.runnerServiceConfig, - Type: 'SecureString', - }) - .promise(); - }); + if (runInstancesResponse.Instances != undefined) { + for (let i = 0; i < runInstancesResponse.Instances?.length; i++) { + await ssm + .putParameter({ + Name: runnerParameters.environment + '-' + (runInstancesResponse.Instances[i].InstanceId as string), + Value: runnerParameters.runnerServiceConfig, + Type: 'SecureString', + }) + .promise(); + } + } } function getInstanceParams( diff --git a/modules/runners/lambdas/runners/src/scale-runners/scale-up.ts b/modules/runners/lambdas/runners/src/scale-runners/scale-up.ts index 7a71a5d5f8..5eaf9f0de1 100644 --- a/modules/runners/lambdas/runners/src/scale-runners/scale-up.ts +++ b/modules/runners/lambdas/runners/src/scale-runners/scale-up.ts @@ -3,6 +3,7 @@ import { createOctoClient, createGithubAppAuth, createGithubInstallationAuth } f import yn from 'yn'; import { Octokit } from '@octokit/rest'; import { logger as rootLogger } from './logger'; +import ScaleError from './ScaleError'; const logger = 
@@ -57,9 +58,7 @@ export async function scaleUp(eventSource: string, payload: ActionRequestMessage
   const runnerType = enableOrgLevel ? 'Org' : 'Repo';
   const runnerOwner = enableOrgLevel ? payload.repositoryOwner : `${payload.repositoryOwner}/${payload.repositoryName}`;

-  const isQueued = await getJobStatus(githubInstallationClient, payload);
-  // ephemeral runners should be created on every event, will only work with `workflow_job` events.
-  if (ephemeral || isQueued) {
+  if (ephemeral || (await getJobStatus(githubInstallationClient, payload))) {
     const currentRunners = await listEC2Runners({
       environment,
       runnerType,
@@ -67,7 +66,6 @@ export async function scaleUp(eventSource: string, payload: ActionRequestMessage
     });
     logger.info(`${runnerType} ${runnerOwner} has ${currentRunners.length}/${maximumRunners} runners`);

-    // TODO: how to handle the event if the max is reached in case of ephemeral runners
     if (currentRunners.length < maximumRunners) {
       console.info(`Attempting to launch a new runner`);
       // create token
@@ -94,7 +92,10 @@ export async function scaleUp(eventSource: string, payload: ActionRequestMessage
         runnerType,
       });
     } else {
-      logger.info('No runner will be created, maximum number of runners reached.');
+      logger.warn('No runner created: maximum number of runners reached.');
+      if (ephemeral) {
+        throw new ScaleError('No runners created: maximum number of runners reached.');
+      }
     }
   }
 }
@@ -139,6 +140,6 @@ export async function createRunnerLoop(runnerParameters: RunnerInputParameters):
     }
   }
   if (launched == false) {
-    throw Error('All launch templates failed');
+    throw new ScaleError('All launch templates failed');
   }
 }
diff --git a/modules/runners/scale-up.tf b/modules/runners/scale-up.tf
index 77057dd400..e217cf4b74 100644
--- a/modules/runners/scale-up.tf
+++ b/modules/runners/scale-up.tf
@@ -50,6 +50,7 @@ resource "aws_cloudwatch_log_group" "scale_up" {
 resource "aws_lambda_event_source_mapping" "scale_up" {
   event_source_arn = var.sqs_build_queue.arn
   function_name    = aws_lambda_function.scale_up.arn
+  batch_size       = 1
 }

 resource "aws_lambda_permission" "scale_runners_lambda" {
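
The handlers above import ScaleError (from './scale-runners/ScaleError' and './ScaleError'), but that file is not part of this diff. A minimal sketch of such a class, assuming it is nothing more than an Error subclass that tags failures which should make the invocation fail and be retried:

    // ScaleError.ts - sketch only, not included in this patch.
    // Assumption: the class exists solely for the `instanceof ScaleError`
    // check in lambda.ts, which rejects (fails the Lambda) only for errors
    // that are worth retrying, such as the maximum-runners-reached case.
    export default class ScaleError extends Error {
      constructor(message: string) {
        super(message);
        this.name = 'ScaleError';
        // Restore the prototype chain so `instanceof ScaleError` also works
        // when the TypeScript compile target is ES5.
        Object.setPrototypeOf(this, ScaleError.prototype);
      }
    }

With batch_size = 1 on the event source mapping, a failed invocation returns exactly one webhook message to the queue; SQS redelivers it after the visibility timeout until it is processed successfully or, if a redrive policy is configured, moved to the dead-letter queue.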