diff --git a/.github/workflows/test-deploy.yml b/.github/workflows/test-deploy.yml
index 97e68680d..bbe2817f1 100644
--- a/.github/workflows/test-deploy.yml
+++ b/.github/workflows/test-deploy.yml
@@ -1,7 +1,7 @@
 name: Test & Deploy
 on:
   schedule:
-    - cron: 0 1 * * 6 # Sat 01:00
+    - cron: 0 1 * * 6 # Sat 01:00
   release:
     types: [published]
   pull_request_target:
diff --git a/README.md b/README.md
index 88b452917..21dae0277 100644
--- a/README.md
+++ b/README.md
@@ -475,54 +475,55 @@ For example, `docker://dvcorg/cml:0-dvc2-base1-gpu`, or
 The `cml-runner` function accepts the following arguments:

 ```
-Usage: cml-runner.js
+Usage: cml-runner

 Options:
-  --version                    Show version number                   [boolean]
-  --labels                     One or more user-defined labels for this runner
-                               (delimited with commas)        [default: "cml"]
-  --idle-timeout               Time in seconds for the runner to be waiting for
-                               jobs before shutting down. Setting it to 0
-                               disables automatic shutdown      [default: 300]
-  --name                       Name displayed in the repository once registered
-                                                  [default: "cml-4wdd123kha"]
-  --single                     Exit after running a single job
+  --version                    Show version number                   [boolean]
+  --labels                     One or more user-defined labels for this runner
+                               (delimited with commas)        [default: "cml"]
+  --idle-timeout               Time in seconds for the runner to be waiting for
+                               jobs before shutting down. Setting it to 0
+                               disables automatic shutdown      [default: 300]
+  --name                       Name displayed in the repository once registered
+                               cml-{ID}
+  --no-retry                   Do not restart workflow terminated due to instance
+                               disposal or GitHub Actions timeout
                                                     [boolean] [default: false]
-  --reuse                      Don't launch a new runner if an existing one has
-                               the same name or overlapping labels
+  --single                     Exit after running a single job
                                                     [boolean] [default: false]
-  --driver                     Platform where the repository is hosted. If not
-                               specified, it will be inferred from the
-                               environment       [choices: "github", "gitlab"]
-  --repo                       Repository to be used for registering the runner.
-                               If not specified, it will be inferred from the
-                               environment
-  --token                      Personal access token to register a self-hosted
-                               runner on the repository. If not specified, it
-                               will be inferred from the environment
-  --cloud                      Cloud to deploy the runner
-                                                     [choices: "aws", "azure"]
-  --cloud-region               Region where the instance is deployed. Choices:
-                               [us-east, us-west, eu-west, eu-north]. Also
-                               accepts native cloud regions [default: "us-west"]
-  --cloud-type                 Instance type. Choices: [m, l, xl]. Also supports
-                               native types like i.e. t2.micro
-  --cloud-gpu                  GPU type.    [choices: "nogpu", "k80", "tesla"]
-  --cloud-hdd-size             HDD size in GB.
-  --cloud-ssh-private          Custom private RSA SSH key. If not provided an
-                               automatically generated throwaway key will be
-                               used                              [default: ""]
-  --cloud-ssh-private-visible  Show the private SSH key in the output with the
-                               rest of the instance properties (not recommended)
-                                                                      [boolean]
-  --cloud-spot                 Request a spot instance               [boolean]
-  --cloud-spot-price           Maximum spot instance bidding price in USD.
-                               Defaults to the current spot bidding price
-                                                               [default: "-1"]
-  --cloud-startup-script       Run the provided Base64-encoded Linux shell
-                               script during the instance initialization
+  --reuse                      Don't launch a new runner if an existing one has
+                               the same name or overlapping labels
+                                                    [boolean] [default: false]
+  --driver                     Platform where the repository is hosted. If not
+                               specified, it will be inferred from the
+                               environment       [choices: "github", "gitlab"]
+  --repo                       Repository to be used for registering the runner.
+                               If not specified, it will be inferred from the
+                               environment
+  --token                      Personal access token to register a self-hosted
+                               runner on the repository. If not specified, it
+                               will be inferred from the environment
+  --cloud                      Cloud to deploy the runner
+                                       [choices: "aws", "azure", "kubernetes"]
+  --cloud-region               Region where the instance is deployed. Choices:
+                               [us-east, us-west, eu-west, eu-north]. Also
+                               accepts native cloud regions [default: "us-west"]
+  --cloud-type                 Instance type. Choices: [m, l, xl]. Also supports
+                               native types like i.e. t2.micro
+  --cloud-gpu                  GPU type.
+                               [choices: "nogpu", "k80", "v100", "tesla"]
+  --cloud-hdd-size             HDD size in GB
+  --cloud-ssh-private          Custom private RSA SSH key. If not provided an
+                               automatically generated throwaway key will be used [default: ""]
-  -h                           Show help                             [boolean]
+  --cloud-spot                 Request a spot instance               [boolean]
+  --cloud-spot-price           Maximum spot instance bidding price in USD.
+                               Defaults to the current spot bidding price
+                                                               [default: "-1"]
+  --cloud-startup-script       Run the provided Base64-encoded Linux shell script
+                               during the instance initialization [default: ""]
+  --cloud-aws-security-group   Specifies the security group in AWS [default: ""]
+  -h                           Show help                             [boolean]
 ```

 #### Environment Variables
diff --git a/bin/cml-runner.js b/bin/cml-runner.js
index cefc2e2eb..2978605c6 100755
--- a/bin/cml-runner.js
+++ b/bin/cml-runner.js
@@ -17,97 +17,97 @@ const {
   RUNNER_PATH = `${WORKDIR_BASE}/${NAME}`,
   RUNNER_IDLE_TIMEOUT = 5 * 60,
-  RUNNER_DESTROY_DELAY = 30,
+  RUNNER_DESTROY_DELAY = 20,
   RUNNER_LABELS = 'cml',
   RUNNER_NAME = NAME,
   RUNNER_SINGLE = false,
   RUNNER_REUSE = false,
+  RUNNER_NO_RETRY = false,
   RUNNER_DRIVER,
   RUNNER_REPO,
   REPO_TOKEN
 } = process.env;

 let cml;
-let RUNNER_LAUNCHED = false;
+let RUNNER;
 let RUNNER_TIMEOUT_TIMER = 0;
 let RUNNER_SHUTTING_DOWN = false;
-const RUNNER_JOBS_RUNNING = [];
+let RUNNER_JOBS_RUNNING = [];
+const GH_5_MIN_TIMEOUT = (72 * 60 - 5) * 60 * 1000;

 const shutdown = async (opts) => {
   if (RUNNER_SHUTTING_DOWN) return;
-
   RUNNER_SHUTTING_DOWN = true;
-  let { error, cloud } = opts;
-  const { name, tfResource, workdir = '' } = opts;
+  const { error, cloud } = opts;
+  const { name, workdir = '', tfResource, noRetry } = opts;
   const tfPath = workdir;

-  console.log(
-    JSON.stringify({ level: error ? 'error' : 'info', status: 'terminated' })
-  );
-  if (error) console.error(error);
-
   const unregisterRunner = async () => {
+    if (!RUNNER) return;
+
     try {
-      console.log('Unregistering runner...');
+      console.log(`Unregistering runner ${name}...`);
+      RUNNER && RUNNER.kill('SIGINT');
       await cml.unregisterRunner({ name });
       console.log('\tSuccess');
     } catch (err) {
-      console.error('\tFailed');
-      error = err;
+      console.error(`\tFailed: ${err.message}`);
     }
   };

-  const shutdownDockerMachine = async () => {
-    console.log('docker-machine destroy...');
-    console.log(
-      'Docker machine is deprecated and this will be removed!! Check how to deploy using our tf provider.'
-    );
+  const retryWorkflows = async () => {
     try {
-      await exec(`echo y | docker-machine rm ${DOCKER_MACHINE}`);
+      if (!noRetry && RUNNER_JOBS_RUNNING.length) {
+        await Promise.all(
+          RUNNER_JOBS_RUNNING.map(
+            async (job) => await cml.pipelineRestart({ jobId: job.id })
+          )
+        );
+      }
     } catch (err) {
-      console.error(`\tFailed shutting down docker machine: ${err.message}`);
-      error = err;
+      console.error(err);
     }
   };

-  const shutdownTf = async () => {
-    const { tfResource } = opts;
-
-    if (!tfResource) {
-      console.log(`\tNo TF resource found`);
-      return;
-    }
+  const destroyDockerMachine = async () => {
+    if (!DOCKER_MACHINE) return;

+    console.log('docker-machine destroy...');
+    console.log(
+      'Docker machine is deprecated and will be removed!! Check how to deploy using our tf provider.'
+    );
     try {
-      await tf.destroy({ dir: tfPath });
+      await exec(`echo y | docker-machine rm ${DOCKER_MACHINE}`);
     } catch (err) {
-      console.error(`\tFailed Terraform destroy: ${err.message}`);
-      error = err;
+      console.error(`\tFailed shutting down docker machine: ${err.message}`);
     }
   };

   const destroyTerraform = async () => {
+    if (!tfResource) return;
+
     try {
       console.log(await tf.destroy({ dir: tfPath }));
     } catch (err) {
       console.error(`\tFailed destroying terraform: ${err.message}`);
-      error = err;
     }
   };

+  console.log(
+    JSON.stringify({ level: error ? 'error' : 'info', status: 'terminated' })
+  );
+  if (error) console.error(error);
+
+  await sleep(RUNNER_DESTROY_DELAY);
+
   if (cloud) {
     await destroyTerraform();
   } else {
-    if (RUNNER_LAUNCHED) await unregisterRunner();
+    await unregisterRunner();
+    await retryWorkflows();

-    console.log(
-      `\tDestroy scheduled: ${RUNNER_DESTROY_DELAY} seconds remaining.`
-    );
-    await sleep(RUNNER_DESTROY_DELAY);
-
-    if (DOCKER_MACHINE) await shutdownDockerMachine();
-    if (tfResource) await shutdownTf();
+    await destroyDockerMachine();
+    await destroyTerraform();
   }

   process.exit(error ? 1 : 0);
@@ -214,7 +214,7 @@ const runCloud = async (opts) => {
 const runLocal = async (opts) => {
   console.log(`Launching ${cml.driver} runner`);
-  const { workdir, name, labels, single, idleTimeout } = opts;
+  const { workdir, name, labels, single, idleTimeout, noRetry } = opts;

   const proc = await cml.startRunner({
     workdir,
@@ -224,17 +224,30 @@ const runLocal = async (opts) => {
     idleTimeout
   });

-  const dataHandler = (data) => {
-    const log = cml.parseRunnerLog({ data });
+  const dataHandler = async (data) => {
+    const log = await cml.parseRunnerLog({ data });
     log && console.log(JSON.stringify(log));

     if (log && log.status === 'job_started') {
-      RUNNER_JOBS_RUNNING.push(1);
+      RUNNER_JOBS_RUNNING.push({ id: log.job, date: log.date });
       RUNNER_TIMEOUT_TIMER = 0;
     } else if (log && log.status === 'job_ended') {
-      RUNNER_JOBS_RUNNING.pop();
+      const { job } = log;
+
+      if (!RUNNER_SHUTTING_DOWN) {
+        const jobs = job
+          ? [job]
+          : (await cml.pipelineJobs({ jobs: RUNNER_JOBS_RUNNING }))
+              .filter((job) => job.status === 'completed')
+              .map((job) => job.id);
+
+        RUNNER_JOBS_RUNNING = RUNNER_JOBS_RUNNING.filter(
+          (job) => !jobs.includes(job.id)
+        );
+      }
     }
   };
+
   proc.stderr.on('data', dataHandler);
   proc.stdout.on('data', dataHandler);
   proc.on('uncaughtException', () => shutdown(opts));
@@ -252,7 +265,19 @@ const runLocal = async (opts) => {
     }, 1000);
   }

-  RUNNER_LAUNCHED = true;
+  if (!noRetry && cml.driver === 'github') {
+    const watcher = setInterval(() => {
+      RUNNER_JOBS_RUNNING.forEach((job) => {
+        if (
+          new Date().getTime() - new Date(job.date).getTime() >
+          GH_5_MIN_TIMEOUT
+        )
+          shutdown(opts) && clearInterval(watcher);
+      });
+    }, 60 * 1000);
+  }
+
+  RUNNER = proc;
 };

 const run = async (opts) => {
@@ -337,9 +362,15 @@ const opts = yargs
     'idle-timeout',
     'Time in seconds for the runner to be waiting for jobs before shutting down. Setting it to 0 disables automatic shutdown'
   )
-  .default('name', RUNNER_NAME)
-  .describe('name', 'Name displayed in the repository once registered')
-
+  .default('name')
+  .describe('name', 'Name displayed in the repository once registered cml-{ID}')
+  .coerce('name', (val) => val || RUNNER_NAME)
+  .boolean('no-retry')
+  .default('no-retry', RUNNER_NO_RETRY)
+  .describe(
+    'no-retry',
+    'Do not restart workflow terminated due to instance disposal or GitHub Actions timeout'
+  )
   .boolean('single')
   .default('single', RUNNER_SINGLE)
   .describe('single', 'Exit after running a single job')
diff --git a/src/cml.js b/src/cml.js
index a1186e4d1..b3ba264d0 100644
--- a/src/cml.js
+++ b/src/cml.js
@@ -147,59 +147,57 @@ class CML {
     return await getDriver(this).runnerToken();
   }

-  parseRunnerLog(opts = {}) {
+  async parseRunnerLog(opts = {}) {
     let { data } = opts;
     if (!data) return;

+    const date = new Date();
+
     try {
       data = data.toString('utf8');

       let log = {
         level: 'info',
-        time: new Date().toISOString(),
+        date: date.toISOString(),
         repo: this.repo
       };

       if (this.driver === GITHUB) {
         if (data.includes('Running job')) {
-          log.job = '';
+          const { id } = await getDriver(this).job({ time: date.getTime() });
+          log.job = id;
           log.status = 'job_started';
-          return log;
         } else if (
           data.includes('Job') &&
           data.includes('completed with result')
         ) {
           log.job = '';
           log.status = 'job_ended';
-          log.success = data.endsWith('Succeeded');
+          log.success = data.includes('Succeeded');
           log.level = log.success ? 'info' : 'error';
-          return log;
         } else if (data.includes('Listening for Jobs')) {
           log.status = 'ready';
-          return log;
         }
+        return log;
       }

       if (this.driver === GITLAB) {
         const { msg, job } = JSON.parse(data);
+        log = { ...log, job };

         if (msg.endsWith('received')) {
-          log = { ...log, job };
           log.status = 'job_started';
-          return log;
         } else if (
           msg.startsWith('Job failed') ||
           msg.startsWith('Job succeeded')
         ) {
-          log = { ...log, job };
           log.status = 'job_ended';
           log.success = !msg.startsWith('Job failed');
           log.level = log.success ? 'info' : 'error';
-          return log;
         } else if (msg.includes('Starting runner for')) {
           log.status = 'ready';
-          return log;
         }
+        return log;
       }
     } catch (err) {
       console.log(`Failed parsing log: ${err.message}`);
@@ -337,6 +335,14 @@ Automated commits for ${this.repo}/commit/${sha} created by CML.
     return renderPr(url);
   }

+  async pipelineRestart(opts) {
+    return await getDriver(this).pipelineRestart(opts);
+  }
+
+  async pipelineJobs(opts) {
+    return await getDriver(this).pipelineJobs(opts);
+  }
+
   logError(e) {
     console.error(e.message);
   }
diff --git a/src/drivers/bitbucket_cloud.js b/src/drivers/bitbucket_cloud.js
index c614f1269..cbb8b139d 100644
--- a/src/drivers/bitbucket_cloud.js
+++ b/src/drivers/bitbucket_cloud.js
@@ -168,6 +168,10 @@ class BitbucketCloud {
     });
   }

+  async pipelineRestart(opts = {}) {
+    throw new Error('BitBucket Cloud does not support workflowRestart!');
+  }
+
   async request(opts = {}) {
     const { token, api } = this;
     const { url, endpoint, method = 'GET', body } = opts;
diff --git a/src/drivers/github.js b/src/drivers/github.js
index c7aae4d7f..b14f4e90a 100644
--- a/src/drivers/github.js
+++ b/src/drivers/github.js
@@ -192,7 +192,7 @@ class Github {
       await fs.unlink(runnerCfg);
     } catch (e) {
       const arch = process.platform === 'darwin' ? 'osx-x64' : 'linux-x64';
-      const ver = '2.274.2';
+      const ver = '2.278.0';
       const destination = resolve(workdir, 'actions-runner.tar.gz');
       const url = `https://github.com/actions/runner/releases/download/v${ver}/actions-runner-${arch}-${ver}.tar.gz`;
       await download({ url, path: destination });
@@ -289,6 +289,112 @@ class Github {
     });
   }

+  async pipelineJobs(opts = {}) {
+    const { jobs: runnerJobs } = opts;
+    const { owner, repo } = ownerRepo({ uri: this.repo });
+    const { actions } = octokit(this.token, this.repo);
+
+    const jobs = await Promise.all(
+      runnerJobs.map(async (job) => {
+        const { data } = await actions.getJobForWorkflowRun({
+          owner,
+          repo,
+          job_id: job.id
+        });
+
+        return data;
+      })
+    );
+
+    return jobs.map((job) => {
+      const { id, started_at: date, run_id: runId, status } = job;
+      return { id, date, runId, status };
+    });
+  }
+
+  async job(opts = {}) {
+    const { time, status = 'queued' } = opts;
+    const { owner, repo } = ownerRepo({ uri: this.repo });
+    const { actions } = octokit(this.token, this.repo);
+
+    const {
+      data: { workflow_runs: workflowRuns }
+    } = await actions.listWorkflowRunsForRepo({
+      owner,
+      repo,
+      status
+    });
+
+    let runJobs = await Promise.all(
+      workflowRuns.map(async (run) => {
+        const {
+          data: { jobs }
+        } = await actions.listJobsForWorkflowRun({
+          owner,
+          repo,
+          run_id: run.id,
+          status
+        });
+
+        return jobs;
+      })
+    );
+
+    runJobs = [].concat.apply([], runJobs).map((job) => {
+      const { id, started_at: date, run_id: runId } = job;
+      return { id, date, runId };
+    });
+
+    const job = runJobs.reduce((prev, curr) => {
+      const diffTime = (job) => Math.abs(new Date(job.date).getTime() - time);
+      return diffTime(curr) < diffTime(prev) ? curr : prev;
+    });
+
+    return job;
+  }
+
+  async pipelineRestart(opts = {}) {
+    const { jobId } = opts;
+    const { owner, repo } = ownerRepo({ uri: this.repo });
+    const { actions } = octokit(this.token, this.repo);
+
+    const {
+      data: { run_id: runId }
+    } = await actions.getJobForWorkflowRun({
+      owner,
+      repo,
+      job_id: jobId
+    });
+
+    try {
+      await actions.cancelWorkflowRun({
+        owner,
+        repo,
+        run_id: runId
+      });
+    } catch (err) {
+      // HANDLES: Cannot cancel a workflow run that is completed.
+    }
+
+    const {
+      data: { status }
+    } = await actions.getWorkflowRun({
+      owner,
+      repo,
+      run_id: runId
+    });
+
+    if (status !== 'queued') {
+      try {
+        await actions.reRunWorkflow({
+          owner,
+          repo,
+          run_id: runId
+        });
+      } catch (err) {}
+    }
+  }
+
   get sha() {
     if (GITHUB_EVENT_NAME === 'pull_request')
       return github.context.payload.pull_request.head.sha;
diff --git a/src/drivers/gitlab.js b/src/drivers/gitlab.js
index fd0f514e2..7e641addd 100644
--- a/src/drivers/gitlab.js
+++ b/src/drivers/gitlab.js
@@ -225,6 +225,30 @@ class Gitlab {
     });
   }

+  async pipelineRestart(opts = {}) {
+    const projectPath = await this.projectPath();
+    const { jobId } = opts;
+
+    const {
+      pipeline: { id }
+    } = await this.request({
+      endpoint: `/projects/${projectPath}/jobs/${jobId}`
+    });
+
+    let status;
+    while (!status || status === 'running')
+      ({ status } = await this.request({
+        endpoint: `/projects/${projectPath}/pipelines/${id}/cancel`,
+        method: 'POST'
+      }));
+
+    while (status !== 'running')
+      ({ status } = await this.request({
+        endpoint: `/projects/${projectPath}/pipelines/${id}/retry`,
+        method: 'POST'
+      }));
+  }
+
   async request(opts = {}) {
     const { token } = this;
     const { endpoint, method = 'GET', body, raw } = opts;