From ff7280f23cba0eb8f510ea4a41c41212a8ad2d98 Mon Sep 17 00:00:00 2001 From: Hidetake Iwata Date: Wed, 9 Oct 2024 11:10:15 +0900 Subject: [PATCH] Try downloading shards first (#22) * Try downloading shards first * Refactor --- src/run.ts | 32 ++++++++++++++++++++------------ src/shard.ts | 51 ++++++++++++++++++++++++++++++--------------------- 2 files changed, 50 insertions(+), 33 deletions(-) diff --git a/src/run.ts b/src/run.ts index 2617a21..5419ffe 100644 --- a/src/run.ts +++ b/src/run.ts @@ -6,7 +6,7 @@ import * as path from 'path' import { getOctokit } from './github' import { downloadLastTestReports } from './artifact' import { findTestCasesFromTestReportFiles, groupTestCasesByTestFile } from './junitxml' -import { generateShards, writeShardsWithLeaderElection } from './shard' +import { tryDownloadShardsIfAlreadyExists, generateShards, writeShardsWithLock } from './shard' import { writeSummary } from './summary' type Inputs = { @@ -33,6 +33,13 @@ export const run = async (inputs: Inputs): Promise => { const octokit = getOctokit(inputs.token) const tempDirectory = await fs.mkdtemp(`${process.env.RUNNER_TEMP || os.tmpdir()}/parallel-test-action-`) + const shardsDirectory = path.join(tempDirectory, 'shards') + + // Since multiple jobs run in parallel, another job may have already uploaded the shards. + if (await tryDownloadShardsIfAlreadyExists(shardsDirectory, inputs.shardsArtifactName)) { + await showListofShardFiles(shardsDirectory) + return { shardsDirectory } + } const testReportDirectory = path.join(tempDirectory, 'test-reports') const testReportSet = await downloadLastTestReports(octokit, { @@ -51,23 +58,24 @@ export const run = async (inputs: Inputs): Promise => { const shardSet = generateShards(workingTestFilenames, testFiles, inputs.shardCount) core.info(`Generated ${shardSet.shards.length} shards`) - const shardsDirectory = path.join(tempDirectory, 'shards') - const leaderElection = await writeShardsWithLeaderElection( - shardSet.shards, - shardsDirectory, - inputs.shardsArtifactName, - ) - if (leaderElection.leader) { + const shardsLock = await writeShardsWithLock(shardSet.shards, shardsDirectory, inputs.shardsArtifactName) + if (shardsLock.currentJobAcquiredLock) { writeSummary(shardSet, testReportSet) } - core.info(`Available ${leaderElection.shardFilenames.length} shard files:`) - for (const shardFilename of leaderElection.shardFilenames) { - core.info(`- ${shardFilename}`) - } + await showListofShardFiles(shardsDirectory) return { shardsDirectory } } +const showListofShardFiles = async (shardsDirectory: string) => { + const globber = await glob.create(path.join(shardsDirectory, '*')) + const files = await globber.glob() + core.info(`Available ${files.length} shard files:`) + for (const f of files) { + core.info(`- ${f}`) + } +} + const globRelative = async (pattern: string) => { const globber = await glob.create(pattern) const files = await globber.glob() diff --git a/src/shard.ts b/src/shard.ts index 9536354..1082bdd 100644 --- a/src/shard.ts +++ b/src/shard.ts @@ -1,6 +1,5 @@ import * as core from '@actions/core' import * as fs from 'fs/promises' -import * as glob from '@actions/glob' import * as path from 'path' import { ArtifactNotFoundError, DefaultArtifactClient } from '@actions/artifact' @@ -93,46 +92,56 @@ const averageOf = (a: number[]) => { const sortByTime = (shards: E[]) => shards.sort((a, b) => a.totalTime - b.totalTime) -type LeaderElection = { - shardFilenames: string[] - leader: boolean +export const tryDownloadShardsIfAlreadyExists = async (shardsDirectory: string, shardsArtifactName: string) => { + const artifactClient = new DefaultArtifactClient() + let existingArtifact + try { + existingArtifact = await artifactClient.getArtifact(shardsArtifactName) + } catch (e) { + if (e instanceof ArtifactNotFoundError) { + return false + } + throw e + } + core.info(`Another job has already uploaded the shards`) + await core.group(`Downloading the artifact: ${shardsArtifactName}`, () => + artifactClient.downloadArtifact(existingArtifact.artifact.id, { path: shardsDirectory }), + ) + return true +} + +type Lock = { + currentJobAcquiredLock: boolean } -export const writeShardsWithLeaderElection = async ( +export const writeShardsWithLock = async ( shards: Shard[], shardsDirectory: string, shardsArtifactName: string, -): Promise => { +): Promise => { const artifactClient = new DefaultArtifactClient() - core.info(`Acquiring the leadership of shards`) + core.info(`Acquiring a lock of shards artifact`) const shardFilenames = await writeShards(shards, shardsDirectory) - const uploadArtifactError = await core.group(`Uploading the artifact: ${shardsArtifactName}`, () => + const conflictError = await core.group(`Uploading the artifact: ${shardsArtifactName}`, () => catchHttp409ConflictError(async () => { await artifactClient.uploadArtifact(shardsArtifactName, shardFilenames, shardsDirectory) }), ) - if (!uploadArtifactError) { - core.info(`This job becomes the leader`) - return { - shardFilenames, - leader: true, - } + if (!conflictError) { + core.info(`This job successfully uploaded the shards. Others will download the shards.`) + return { currentJobAcquiredLock: true } } - core.info(`Another job has the leadership: ${uploadArtifactError}`) - core.info(`Finding the shards of the leader`) + core.info(`Another job already uploaded the shards: ${conflictError}`) + core.info(`This job downloads the existing shards`) // For eventual consistency, GetArtifact may return ArtifactNotFoundError just after UploadArtifact. const existingArtifact = await retryArtifactNotFoundError(() => artifactClient.getArtifact(shardsArtifactName)) await fs.rm(shardsDirectory, { recursive: true }) await core.group(`Downloading the artifact: ${shardsArtifactName}`, () => artifactClient.downloadArtifact(existingArtifact.artifact.id, { path: shardsDirectory }), ) - const shardGlobber = await glob.create(path.join(shardsDirectory, '*')) - return { - shardFilenames: await shardGlobber.glob(), - leader: false, - } + return { currentJobAcquiredLock: false } } const catchHttp409ConflictError = async (f: () => Promise): Promise => {