From cb498269ec57147162eaa1fbc489d976d1c35d03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B8rge=20N=C3=A6ss?= Date: Wed, 24 Jan 2024 19:09:27 +0100 Subject: [PATCH] feat(migration): improve progress as migration is running (#5550) --- .../rename-location-to-address/index.ts | 16 +++- .../src/destinations/commitMutations.ts | 8 +- .../@sanity/migrate/src/it-utils/mapAsync.ts | 12 +++ packages/@sanity/migrate/src/it-utils/tap.ts | 6 ++ packages/@sanity/migrate/src/runner/run.ts | 87 ++++++++++++++----- .../commands/migration/runMigrationCommand.ts | 48 ++++++++-- .../{ => utils}/mutationFormatter.ts | 7 +- 7 files changed, 146 insertions(+), 38 deletions(-) create mode 100644 packages/@sanity/migrate/src/it-utils/mapAsync.ts create mode 100644 packages/@sanity/migrate/src/it-utils/tap.ts rename packages/sanity/src/_internal/cli/commands/migration/{ => utils}/mutationFormatter.ts (90%) diff --git a/dev/test-studio/migrations/rename-location-to-address/index.ts b/dev/test-studio/migrations/rename-location-to-address/index.ts index cea0b9637495..89270cda2c91 100644 --- a/dev/test-studio/migrations/rename-location-to-address/index.ts +++ b/dev/test-studio/migrations/rename-location-to-address/index.ts @@ -1,12 +1,24 @@ import {defineMigration} from '@sanity/migrate' -import {at, patch, set} from '@sanity/migrate/mutations' +import {at, patch, set, unset} from '@sanity/migrate/mutations' + +const addresses = [ + {city: 'Oslo', country: 'Norway'}, + {city: 'Stockholm', country: 'Sweden'}, + {city: 'Copenhagen', country: 'Denmark'}, + {city: 'Helsinki', country: 'Finland'}, + {city: 'Reykjavik', country: 'Iceland'}, + {city: 'Torshavn', country: 'Faroe Islands'}, +] export default defineMigration({ name: 'Rename Location to Address', documentType: 'author', migrate: { document(doc) { - return patch(doc._id, [at('address', set('Somehwere'))]) + return patch(doc._id, [ + at('address', set(addresses[Math.floor(Math.random() * addresses.length)])), + at('location', unset()), + ]) }, }, }) diff --git a/packages/@sanity/migrate/src/destinations/commitMutations.ts b/packages/@sanity/migrate/src/destinations/commitMutations.ts index 24f3966deccf..01e6c1fcb97f 100644 --- a/packages/@sanity/migrate/src/destinations/commitMutations.ts +++ b/packages/@sanity/migrate/src/destinations/commitMutations.ts @@ -4,18 +4,16 @@ import {parseJSON} from '../it-utils/json' import {decodeText} from '../it-utils/decodeText' import {concatStr} from '../it-utils/concatStr' import {lastValueFrom} from '../it-utils/lastValueFrom' +import {mapAsync} from '../it-utils/mapAsync' export async function commitMutations( fetchOptions: AsyncIterableIterator, options: {concurrency: number}, ) { - // todo: convert to top level import when we can - const {pMapIterable} = await import('p-map') - - return pMapIterable( + return mapAsync( fetchOptions, async (opts): Promise => lastValueFrom(parseJSON(concatStr(decodeText(await fetchAsyncIterator(opts))))), - {concurrency: options.concurrency}, + options.concurrency, ) } diff --git a/packages/@sanity/migrate/src/it-utils/mapAsync.ts b/packages/@sanity/migrate/src/it-utils/mapAsync.ts new file mode 100644 index 000000000000..1cb1e3deef2f --- /dev/null +++ b/packages/@sanity/migrate/src/it-utils/mapAsync.ts @@ -0,0 +1,12 @@ +export async function mapAsync( + it: AsyncIterableIterator, + project: (value: T) => Promise, + concurrency: number, +): Promise> { + // todo: convert to top level import when we can + const {pMapIterable} = await import('p-map') + + return pMapIterable(it, (v) => project(v), { + concurrency: concurrency, + }) +} diff --git a/packages/@sanity/migrate/src/it-utils/tap.ts b/packages/@sanity/migrate/src/it-utils/tap.ts new file mode 100644 index 000000000000..5d682e307d4a --- /dev/null +++ b/packages/@sanity/migrate/src/it-utils/tap.ts @@ -0,0 +1,6 @@ +export async function* tap(it: AsyncIterableIterator, interceptor: (value: T) => void) { + for await (const chunk of it) { + interceptor(chunk) + yield chunk + } +} diff --git a/packages/@sanity/migrate/src/runner/run.ts b/packages/@sanity/migrate/src/runner/run.ts index 1ea743131b38..55623b7b79f7 100644 --- a/packages/@sanity/migrate/src/runner/run.ts +++ b/packages/@sanity/migrate/src/runner/run.ts @@ -1,23 +1,40 @@ import {SanityDocument} from '@sanity/types' import {MultipleMutationResult, Mutation as SanityMutation} from '@sanity/client' +import {Mutation} from '@bjoerge/mutiny' +import arrify from 'arrify' import {APIConfig, Migration} from '../types' import {ndjson} from '../it-utils/ndjson' import {fromExportEndpoint, safeJsonParser} from '../sources/fromExportEndpoint' import {endpoints} from '../fetch-utils/endpoints' import {toFetchOptions} from '../fetch-utils/sanityRequestOptions' -import {commitMutations} from '../destinations/commitMutations' -import {collectMigrationMutations} from './collectMigrationMutations' -import {batchMutations} from './utils/batchMutations' +import {tap} from '../it-utils/tap' +import {mapAsync} from '../it-utils/mapAsync' +import {lastValueFrom} from '../it-utils/lastValueFrom' +import {decodeText, parseJSON} from '../it-utils' +import {concatStr} from '../it-utils/concatStr' +import {fetchAsyncIterator, FetchOptions} from '../fetch-utils/fetchStream' +import {toSanityMutations} from './utils/toSanityMutations' import { DEFAULT_MUTATION_CONCURRENCY, MAX_MUTATION_CONCURRENCY, MUTATION_ENDPOINT_MAX_BODY_SIZE, } from './constants' -import {toSanityMutations} from './utils/toSanityMutations' +import {batchMutations} from './utils/batchMutations' +import {collectMigrationMutations} from './collectMigrationMutations' +type MigrationProgress = { + documents: number + mutations: number + pending: number + queuedBatches: number + currentMutations: Mutation[] + completedTransactions: MultipleMutationResult[] + done?: boolean +} interface MigrationRunnerOptions { api: APIConfig concurrency?: number + onProgress?: (event: MigrationProgress) => void } export async function* toFetchOptionsIterable( @@ -35,34 +52,62 @@ export async function* toFetchOptionsIterable( }) } } -export async function* run(config: MigrationRunnerOptions, migration: Migration) { - const mutations = collectMigrationMutations( - migration, - ndjson(await fromExportEndpoint(config.api), { - parse: safeJsonParser, - }), +export async function run(config: MigrationRunnerOptions, migration: Migration) { + const stats: MigrationProgress = { + documents: 0, + mutations: 0, + pending: 0, + queuedBatches: 0, + completedTransactions: [], + currentMutations: [], + } + const documents = tap( + ndjson(await fromExportEndpoint(config.api), {parse: safeJsonParser}), + () => { + config.onProgress?.({...stats, documents: ++stats.documents}) + }, ) + const mutations = tap(collectMigrationMutations(migration, documents), (muts) => { + stats.currentMutations = arrify(muts) + config.onProgress?.({ + ...stats, + mutations: ++stats.mutations, + }) + }) + const concurrency = config?.concurrency ?? DEFAULT_MUTATION_CONCURRENCY if (concurrency > MAX_MUTATION_CONCURRENCY) { throw new Error(`Concurrency exceeds maximum allowed value (${MAX_MUTATION_CONCURRENCY})`) } - const batches = batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE) + const batches = tap( + batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE), + () => { + config.onProgress?.({...stats, queuedBatches: ++stats.queuedBatches}) + }, + ) - const commits = await commitMutations(toFetchOptionsIterable(config.api, batches), {concurrency}) + const submit = async (opts: FetchOptions): Promise => + lastValueFrom(parseJSON(concatStr(decodeText(await fetchAsyncIterator(opts))))) + const commits = await mapAsync( + toFetchOptionsIterable(config.api, batches), + (opts) => { + config.onProgress?.({...stats, pending: ++stats.pending}) + return submit(opts) + }, + concurrency, + ) for await (const result of commits) { - yield formatMutationResponse(result) + stats.completedTransactions.push(result) + config.onProgress?.({ + ...stats, + }) } -} - -function formatMutationResponse(mutationResponse: MultipleMutationResult) { - return `OK (transactionId = ${mutationResponse.transactionId}) -${mutationResponse.results - .map((result) => { - return ` - ${result.operation}: ${result.id}` + config.onProgress?.({ + ...stats, + done: true, }) - .join('\n')}` } diff --git a/packages/sanity/src/_internal/cli/commands/migration/runMigrationCommand.ts b/packages/sanity/src/_internal/cli/commands/migration/runMigrationCommand.ts index 7a8987187d7c..f63297026399 100644 --- a/packages/sanity/src/_internal/cli/commands/migration/runMigrationCommand.ts +++ b/packages/sanity/src/_internal/cli/commands/migration/runMigrationCommand.ts @@ -14,7 +14,8 @@ import { run, } from '@sanity/migrate' import {SanityDocument} from '@sanity/types' -import {format} from './mutationFormatter' +import {Mutation} from '@bjoerge/mutiny' +import {format, formatMutation} from './utils/mutationFormatter' const helpText = ` Options @@ -144,13 +145,46 @@ const createMigrationCommand: CliCommandDefinition = { apiVersion: 'v2024-01-09', } as const - const progress = dry + const spinner = output.spinner('Running migration…').start() + await (dry ? dryRun({api: apiConfig}, migration, context) - : run({api: apiConfig, concurrency}, migration) - - for await (const result of progress) { - output.print(result) - } + : run( + { + api: apiConfig, + concurrency, + onProgress(progress) { + if (progress.done) { + spinner.text = `Migration "${migrationName}" completed. + +Project id: ${chalk.bold(projectId)} +Dataset: ${chalk.bold(dataset)} + +${progress.documents} documents processed. +${progress.mutations} mutations generated. +${chalk.green(progress.completedTransactions.length)} transactions committed.` + spinner.stopAndPersist({symbol: chalk.green('✔')}) + return + } + + ;['', ...progress.currentMutations].forEach((mutation) => { + spinner.text = `Running migration "${migrationName}"… + +Project id: ${chalk.bold(projectId)} +Dataset: ${chalk.bold(dataset)} +Document type: ${chalk.bold(migration.documentType)} + +${progress.documents} documents processed… +${progress.mutations} mutations generated… +${chalk.blue(progress.pending)} requests pending… +${chalk.green(progress.completedTransactions.length)} transactions committed. + +${mutation && !progress.done ? `» ${chalk.grey(formatMutation(chalk, mutation as Mutation))}` : ''}` + }) + }, + }, + migration, + )) + spinner.stop() }, } diff --git a/packages/sanity/src/_internal/cli/commands/migration/mutationFormatter.ts b/packages/sanity/src/_internal/cli/commands/migration/utils/mutationFormatter.ts similarity index 90% rename from packages/sanity/src/_internal/cli/commands/migration/mutationFormatter.ts rename to packages/sanity/src/_internal/cli/commands/migration/utils/mutationFormatter.ts index 600c71fffc37..99808383febb 100644 --- a/packages/sanity/src/_internal/cli/commands/migration/mutationFormatter.ts +++ b/packages/sanity/src/_internal/cli/commands/migration/utils/mutationFormatter.ts @@ -4,19 +4,20 @@ import {Mutation, Index, KeyedPathElement, NodePatch} from '@bjoerge/mutiny' import {stringify} from '@bjoerge/mutiny/path' import {SanityDocument} from '@sanity/types' + import {Chalk} from 'chalk' export type ItemRef = string | number -export function format(chalk: Chalk, mutations: Mutation[]): string { - return mutations.flatMap((m) => encodeMutation(chalk, m)).join('\n') +export function format(chalk: Chalk, mutations: Mutation[]): string { + return mutations.flatMap((m) => formatMutation(chalk, m)).join('\n') } function encodeItemRef(ref: Index | KeyedPathElement): ItemRef { return typeof ref === 'number' ? ref : ref._key } -function encodeMutation(chalk: Chalk, mutation: Mutation): string { +export function formatMutation(chalk: Chalk, mutation: Mutation): string { if ( mutation.type === 'create' || mutation.type === 'createIfNotExists' ||