Skip to content

Commit

Permalink
feat(migration): improve progress as migration is running (#5550)
Browse files Browse the repository at this point in the history
  • Loading branch information
bjoerge committed Jan 30, 2024
1 parent 2579159 commit 67aa5a9
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 38 deletions.
16 changes: 14 additions & 2 deletions dev/test-studio/migrations/rename-location-to-address/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
import {defineMigration} from '@sanity/migrate'
import {at, patch, set} from '@sanity/migrate/mutations'
import {at, patch, set, unset} from '@sanity/migrate/mutations'

const addresses = [
{city: 'Oslo', country: 'Norway'},
{city: 'Stockholm', country: 'Sweden'},
{city: 'Copenhagen', country: 'Denmark'},
{city: 'Helsinki', country: 'Finland'},
{city: 'Reykjavik', country: 'Iceland'},
{city: 'Torshavn', country: 'Faroe Islands'},
]

export default defineMigration({
name: 'Rename Location to Address',
documentType: 'author',
migrate: {
document(doc) {
return patch(doc._id, [at('address', set('Somehwere'))])
return patch(doc._id, [
at('address', set(addresses[Math.floor(Math.random() * addresses.length)])),
at('location', unset()),
])
},
},
})
8 changes: 3 additions & 5 deletions packages/@sanity/migrate/src/destinations/commitMutations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@ import {parseJSON} from '../it-utils/json'
import {decodeText} from '../it-utils/decodeText'
import {concatStr} from '../it-utils/concatStr'
import {lastValueFrom} from '../it-utils/lastValueFrom'
import {mapAsync} from '../it-utils/mapAsync'

export async function commitMutations(
fetchOptions: AsyncIterableIterator<FetchOptions>,
options: {concurrency: number},
) {
// todo: convert to top level import when we can
const {pMapIterable} = await import('p-map')

return pMapIterable(
return mapAsync(
fetchOptions,
async (opts): Promise<MultipleMutationResult> =>
lastValueFrom(parseJSON(concatStr(decodeText(await fetchAsyncIterator(opts))))),
{concurrency: options.concurrency},
options.concurrency,
)
}
12 changes: 12 additions & 0 deletions packages/@sanity/migrate/src/it-utils/mapAsync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
export async function mapAsync<T, U>(
it: AsyncIterableIterator<T>,
project: (value: T) => Promise<U>,
concurrency: number,
): Promise<AsyncIterable<U>> {
// todo: convert to top level import when we can
const {pMapIterable} = await import('p-map')

return pMapIterable(it, (v) => project(v), {
concurrency: concurrency,
})
}
6 changes: 6 additions & 0 deletions packages/@sanity/migrate/src/it-utils/tap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export async function* tap<T>(it: AsyncIterableIterator<T>, interceptor: (value: T) => void) {
for await (const chunk of it) {
interceptor(chunk)
yield chunk
}
}
87 changes: 66 additions & 21 deletions packages/@sanity/migrate/src/runner/run.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,40 @@
import {SanityDocument} from '@sanity/types'
import {MultipleMutationResult, Mutation as SanityMutation} from '@sanity/client'
import {Mutation} from '@bjoerge/mutiny'
import arrify from 'arrify'
import {APIConfig, Migration} from '../types'
import {ndjson} from '../it-utils/ndjson'
import {fromExportEndpoint, safeJsonParser} from '../sources/fromExportEndpoint'
import {endpoints} from '../fetch-utils/endpoints'
import {toFetchOptions} from '../fetch-utils/sanityRequestOptions'
import {commitMutations} from '../destinations/commitMutations'
import {collectMigrationMutations} from './collectMigrationMutations'
import {batchMutations} from './utils/batchMutations'
import {tap} from '../it-utils/tap'
import {mapAsync} from '../it-utils/mapAsync'
import {lastValueFrom} from '../it-utils/lastValueFrom'
import {decodeText, parseJSON} from '../it-utils'
import {concatStr} from '../it-utils/concatStr'
import {fetchAsyncIterator, FetchOptions} from '../fetch-utils/fetchStream'
import {toSanityMutations} from './utils/toSanityMutations'
import {
DEFAULT_MUTATION_CONCURRENCY,
MAX_MUTATION_CONCURRENCY,
MUTATION_ENDPOINT_MAX_BODY_SIZE,
} from './constants'
import {toSanityMutations} from './utils/toSanityMutations'
import {batchMutations} from './utils/batchMutations'
import {collectMigrationMutations} from './collectMigrationMutations'

type MigrationProgress = {
documents: number
mutations: number
pending: number
queuedBatches: number
currentMutations: Mutation[]
completedTransactions: MultipleMutationResult[]
done?: boolean
}
interface MigrationRunnerOptions {
api: APIConfig
concurrency?: number
onProgress?: (event: MigrationProgress) => void
}

export async function* toFetchOptionsIterable(
Expand All @@ -35,34 +52,62 @@ export async function* toFetchOptionsIterable(
})
}
}
export async function* run(config: MigrationRunnerOptions, migration: Migration) {
const mutations = collectMigrationMutations(
migration,
ndjson<SanityDocument>(await fromExportEndpoint(config.api), {
parse: safeJsonParser,
}),
export async function run(config: MigrationRunnerOptions, migration: Migration) {
const stats: MigrationProgress = {
documents: 0,
mutations: 0,
pending: 0,
queuedBatches: 0,
completedTransactions: [],
currentMutations: [],
}
const documents = tap(
ndjson<SanityDocument>(await fromExportEndpoint(config.api), {parse: safeJsonParser}),
() => {
config.onProgress?.({...stats, documents: ++stats.documents})
},
)

const mutations = tap(collectMigrationMutations(migration, documents), (muts) => {
stats.currentMutations = arrify(muts)
config.onProgress?.({
...stats,
mutations: ++stats.mutations,
})
})

const concurrency = config?.concurrency ?? DEFAULT_MUTATION_CONCURRENCY

if (concurrency > MAX_MUTATION_CONCURRENCY) {
throw new Error(`Concurrency exceeds maximum allowed value (${MAX_MUTATION_CONCURRENCY})`)
}

const batches = batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE)
const batches = tap(
batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE),
() => {
config.onProgress?.({...stats, queuedBatches: ++stats.queuedBatches})
},
)

const commits = await commitMutations(toFetchOptionsIterable(config.api, batches), {concurrency})
const submit = async (opts: FetchOptions): Promise<MultipleMutationResult> =>
lastValueFrom(parseJSON(concatStr(decodeText(await fetchAsyncIterator(opts)))))

const commits = await mapAsync(
toFetchOptionsIterable(config.api, batches),
(opts) => {
config.onProgress?.({...stats, pending: ++stats.pending})
return submit(opts)
},
concurrency,
)
for await (const result of commits) {
yield formatMutationResponse(result)
stats.completedTransactions.push(result)
config.onProgress?.({
...stats,
})
}
}

function formatMutationResponse(mutationResponse: MultipleMutationResult) {
return `OK (transactionId = ${mutationResponse.transactionId})
${mutationResponse.results
.map((result) => {
return ` - ${result.operation}: ${result.id}`
config.onProgress?.({
...stats,
done: true,
})
.join('\n')}`
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import {
run,
} from '@sanity/migrate'
import {SanityDocument} from '@sanity/types'
import {format} from './mutationFormatter'
import {Mutation} from '@bjoerge/mutiny'
import {format, formatMutation} from './utils/mutationFormatter'

const helpText = `
Options
Expand Down Expand Up @@ -144,13 +145,46 @@ const createMigrationCommand: CliCommandDefinition<CreateFlags> = {
apiVersion: 'v2024-01-09',
} as const

const progress = dry
const spinner = output.spinner('Running migration…').start()
await (dry
? dryRun({api: apiConfig}, migration, context)
: run({api: apiConfig, concurrency}, migration)

for await (const result of progress) {
output.print(result)
}
: run(
{
api: apiConfig,
concurrency,
onProgress(progress) {
if (progress.done) {
spinner.text = `Migration "${migrationName}" completed.
Project id: ${chalk.bold(projectId)}
Dataset: ${chalk.bold(dataset)}
${progress.documents} documents processed.
${progress.mutations} mutations generated.
${chalk.green(progress.completedTransactions.length)} transactions committed.`
spinner.stopAndPersist({symbol: chalk.green('✔')})
return
}

;['', ...progress.currentMutations].forEach((mutation) => {
spinner.text = `Running migration "${migrationName}"…
Project id: ${chalk.bold(projectId)}
Dataset: ${chalk.bold(dataset)}
Document type: ${chalk.bold(migration.documentType)}
${progress.documents} documents processed…
${progress.mutations} mutations generated…
${chalk.blue(progress.pending)} requests pending…
${chalk.green(progress.completedTransactions.length)} transactions committed.
${mutation && !progress.done ? ${chalk.grey(formatMutation(chalk, mutation as Mutation))}` : ''}`
})
},
},
migration,
))
spinner.stop()
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ import {Mutation, Index, KeyedPathElement, NodePatch} from '@bjoerge/mutiny'
import {stringify} from '@bjoerge/mutiny/path'

import {SanityDocument} from '@sanity/types'

import {Chalk} from 'chalk'

export type ItemRef = string | number

export function format<Doc extends SanityDocument>(chalk: Chalk, mutations: Mutation[]): string {
return mutations.flatMap((m) => encodeMutation<Doc>(chalk, m)).join('\n')
export function format(chalk: Chalk, mutations: Mutation[]): string {
return mutations.flatMap((m) => formatMutation(chalk, m)).join('\n')
}

function encodeItemRef(ref: Index | KeyedPathElement): ItemRef {
return typeof ref === 'number' ? ref : ref._key
}

function encodeMutation<Doc extends SanityDocument>(chalk: Chalk, mutation: Mutation): string {
export function formatMutation(chalk: Chalk, mutation: Mutation): string {
if (
mutation.type === 'create' ||
mutation.type === 'createIfNotExists' ||
Expand Down

0 comments on commit 67aa5a9

Please sign in to comment.