Skip to content

Commit

Permalink
Try downloading shards first (#22)
Browse files Browse the repository at this point in the history
* Try downloading shards first

* Refactor
  • Loading branch information
int128 authored Oct 9, 2024
1 parent cc66afc commit ff7280f
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 33 deletions.
32 changes: 20 additions & 12 deletions src/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import * as path from 'path'
import { getOctokit } from './github'
import { downloadLastTestReports } from './artifact'
import { findTestCasesFromTestReportFiles, groupTestCasesByTestFile } from './junitxml'
import { generateShards, writeShardsWithLeaderElection } from './shard'
import { tryDownloadShardsIfAlreadyExists, generateShards, writeShardsWithLock } from './shard'
import { writeSummary } from './summary'

type Inputs = {
Expand All @@ -33,6 +33,13 @@ export const run = async (inputs: Inputs): Promise<Outputs> => {

const octokit = getOctokit(inputs.token)
const tempDirectory = await fs.mkdtemp(`${process.env.RUNNER_TEMP || os.tmpdir()}/parallel-test-action-`)
const shardsDirectory = path.join(tempDirectory, 'shards')

// Since multiple jobs run in parallel, another job may have already uploaded the shards.
if (await tryDownloadShardsIfAlreadyExists(shardsDirectory, inputs.shardsArtifactName)) {
await showListofShardFiles(shardsDirectory)
return { shardsDirectory }
}

const testReportDirectory = path.join(tempDirectory, 'test-reports')
const testReportSet = await downloadLastTestReports(octokit, {
Expand All @@ -51,23 +58,24 @@ export const run = async (inputs: Inputs): Promise<Outputs> => {
const shardSet = generateShards(workingTestFilenames, testFiles, inputs.shardCount)
core.info(`Generated ${shardSet.shards.length} shards`)

const shardsDirectory = path.join(tempDirectory, 'shards')
const leaderElection = await writeShardsWithLeaderElection(
shardSet.shards,
shardsDirectory,
inputs.shardsArtifactName,
)
if (leaderElection.leader) {
const shardsLock = await writeShardsWithLock(shardSet.shards, shardsDirectory, inputs.shardsArtifactName)
if (shardsLock.currentJobAcquiredLock) {
writeSummary(shardSet, testReportSet)
}

core.info(`Available ${leaderElection.shardFilenames.length} shard files:`)
for (const shardFilename of leaderElection.shardFilenames) {
core.info(`- ${shardFilename}`)
}
await showListofShardFiles(shardsDirectory)
return { shardsDirectory }
}

const showListofShardFiles = async (shardsDirectory: string) => {
const globber = await glob.create(path.join(shardsDirectory, '*'))
const files = await globber.glob()
core.info(`Available ${files.length} shard files:`)
for (const f of files) {
core.info(`- ${f}`)
}
}

const globRelative = async (pattern: string) => {
const globber = await glob.create(pattern)
const files = await globber.glob()
Expand Down
51 changes: 30 additions & 21 deletions src/shard.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import * as core from '@actions/core'
import * as fs from 'fs/promises'
import * as glob from '@actions/glob'
import * as path from 'path'
import { ArtifactNotFoundError, DefaultArtifactClient } from '@actions/artifact'

Expand Down Expand Up @@ -93,46 +92,56 @@ const averageOf = (a: number[]) => {

const sortByTime = <E extends { totalTime: number }>(shards: E[]) => shards.sort((a, b) => a.totalTime - b.totalTime)

type LeaderElection = {
shardFilenames: string[]
leader: boolean
export const tryDownloadShardsIfAlreadyExists = async (shardsDirectory: string, shardsArtifactName: string) => {
const artifactClient = new DefaultArtifactClient()
let existingArtifact
try {
existingArtifact = await artifactClient.getArtifact(shardsArtifactName)
} catch (e) {
if (e instanceof ArtifactNotFoundError) {
return false
}
throw e
}
core.info(`Another job has already uploaded the shards`)
await core.group(`Downloading the artifact: ${shardsArtifactName}`, () =>
artifactClient.downloadArtifact(existingArtifact.artifact.id, { path: shardsDirectory }),
)
return true
}

type Lock = {
currentJobAcquiredLock: boolean
}

export const writeShardsWithLeaderElection = async (
export const writeShardsWithLock = async (
shards: Shard[],
shardsDirectory: string,
shardsArtifactName: string,
): Promise<LeaderElection> => {
): Promise<Lock> => {
const artifactClient = new DefaultArtifactClient()

core.info(`Acquiring the leadership of shards`)
core.info(`Acquiring a lock of shards artifact`)
const shardFilenames = await writeShards(shards, shardsDirectory)
const uploadArtifactError = await core.group(`Uploading the artifact: ${shardsArtifactName}`, () =>
const conflictError = await core.group(`Uploading the artifact: ${shardsArtifactName}`, () =>
catchHttp409ConflictError(async () => {
await artifactClient.uploadArtifact(shardsArtifactName, shardFilenames, shardsDirectory)
}),
)
if (!uploadArtifactError) {
core.info(`This job becomes the leader`)
return {
shardFilenames,
leader: true,
}
if (!conflictError) {
core.info(`This job successfully uploaded the shards. Others will download the shards.`)
return { currentJobAcquiredLock: true }
}

core.info(`Another job has the leadership: ${uploadArtifactError}`)
core.info(`Finding the shards of the leader`)
core.info(`Another job already uploaded the shards: ${conflictError}`)
core.info(`This job downloads the existing shards`)
// For eventual consistency, GetArtifact may return ArtifactNotFoundError just after UploadArtifact.
const existingArtifact = await retryArtifactNotFoundError(() => artifactClient.getArtifact(shardsArtifactName))
await fs.rm(shardsDirectory, { recursive: true })
await core.group(`Downloading the artifact: ${shardsArtifactName}`, () =>
artifactClient.downloadArtifact(existingArtifact.artifact.id, { path: shardsDirectory }),
)
const shardGlobber = await glob.create(path.join(shardsDirectory, '*'))
return {
shardFilenames: await shardGlobber.glob(),
leader: false,
}
return { currentJobAcquiredLock: false }
}

const catchHttp409ConflictError = async (f: () => Promise<void>): Promise<undefined | Error> => {
Expand Down

0 comments on commit ff7280f

Please sign in to comment.