Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(migration): improve progress as migration is running #5550

Merged
merged 1 commit into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions dev/test-studio/migrations/rename-location-to-address/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
import {defineMigration} from '@sanity/migrate'
import {at, patch, set} from '@sanity/migrate/mutations'
import {at, patch, set, unset} from '@sanity/migrate/mutations'

const addresses = [
{city: 'Oslo', country: 'Norway'},
{city: 'Stockholm', country: 'Sweden'},
{city: 'Copenhagen', country: 'Denmark'},
{city: 'Helsinki', country: 'Finland'},
{city: 'Reykjavik', country: 'Iceland'},
{city: 'Torshavn', country: 'Faroe Islands'},
]

export default defineMigration({
name: 'Rename Location to Address',
documentType: 'author',
migrate: {
document(doc) {
return patch(doc._id, [at('address', set('Somehwere'))])
return patch(doc._id, [
at('address', set(addresses[Math.floor(Math.random() * addresses.length)])),
at('location', unset()),
])
},
},
})
8 changes: 3 additions & 5 deletions packages/@sanity/migrate/src/destinations/commitMutations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@ import {parseJSON} from '../it-utils/json'
import {decodeText} from '../it-utils/decodeText'
import {concatStr} from '../it-utils/concatStr'
import {lastValueFrom} from '../it-utils/lastValueFrom'
import {mapAsync} from '../it-utils/mapAsync'

export async function commitMutations(
fetchOptions: AsyncIterableIterator<FetchOptions>,
options: {concurrency: number},
) {
// todo: convert to top level import when we can
const {pMapIterable} = await import('p-map')

return pMapIterable(
return mapAsync(
fetchOptions,
async (opts): Promise<MultipleMutationResult> =>
lastValueFrom(parseJSON(concatStr(decodeText(await fetchAsyncIterator(opts))))),
{concurrency: options.concurrency},
options.concurrency,
)
}
12 changes: 12 additions & 0 deletions packages/@sanity/migrate/src/it-utils/mapAsync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
export async function mapAsync<T, U>(
it: AsyncIterableIterator<T>,
project: (value: T) => Promise<U>,
concurrency: number,
): Promise<AsyncIterable<U>> {
// todo: convert to top level import when we can
const {pMapIterable} = await import('p-map')

return pMapIterable(it, (v) => project(v), {
concurrency: concurrency,
})
}
6 changes: 6 additions & 0 deletions packages/@sanity/migrate/src/it-utils/tap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export async function* tap<T>(it: AsyncIterableIterator<T>, interceptor: (value: T) => void) {
for await (const chunk of it) {
interceptor(chunk)
yield chunk
}
}
87 changes: 66 additions & 21 deletions packages/@sanity/migrate/src/runner/run.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,40 @@
import {SanityDocument} from '@sanity/types'
import {MultipleMutationResult, Mutation as SanityMutation} from '@sanity/client'
import {Mutation} from '@bjoerge/mutiny'
import arrify from 'arrify'
import {APIConfig, Migration} from '../types'
import {ndjson} from '../it-utils/ndjson'
import {fromExportEndpoint, safeJsonParser} from '../sources/fromExportEndpoint'
import {endpoints} from '../fetch-utils/endpoints'
import {toFetchOptions} from '../fetch-utils/sanityRequestOptions'
import {commitMutations} from '../destinations/commitMutations'
import {collectMigrationMutations} from './collectMigrationMutations'
import {batchMutations} from './utils/batchMutations'
import {tap} from '../it-utils/tap'
import {mapAsync} from '../it-utils/mapAsync'
import {lastValueFrom} from '../it-utils/lastValueFrom'
import {decodeText, parseJSON} from '../it-utils'
import {concatStr} from '../it-utils/concatStr'
import {fetchAsyncIterator, FetchOptions} from '../fetch-utils/fetchStream'
import {toSanityMutations} from './utils/toSanityMutations'
import {
DEFAULT_MUTATION_CONCURRENCY,
MAX_MUTATION_CONCURRENCY,
MUTATION_ENDPOINT_MAX_BODY_SIZE,
} from './constants'
import {toSanityMutations} from './utils/toSanityMutations'
import {batchMutations} from './utils/batchMutations'
import {collectMigrationMutations} from './collectMigrationMutations'

type MigrationProgress = {
documents: number
mutations: number
pending: number
queuedBatches: number
currentMutations: Mutation[]
completedTransactions: MultipleMutationResult[]
done?: boolean
}
interface MigrationRunnerOptions {
api: APIConfig
concurrency?: number
onProgress?: (event: MigrationProgress) => void
}

export async function* toFetchOptionsIterable(
Expand All @@ -35,34 +52,62 @@ export async function* toFetchOptionsIterable(
})
}
}
export async function* run(config: MigrationRunnerOptions, migration: Migration) {
const mutations = collectMigrationMutations(
migration,
ndjson<SanityDocument>(await fromExportEndpoint(config.api), {
parse: safeJsonParser,
}),
export async function run(config: MigrationRunnerOptions, migration: Migration) {
const stats: MigrationProgress = {
documents: 0,
mutations: 0,
pending: 0,
queuedBatches: 0,
completedTransactions: [],
currentMutations: [],
}
const documents = tap(
ndjson<SanityDocument>(await fromExportEndpoint(config.api), {parse: safeJsonParser}),
() => {
config.onProgress?.({...stats, documents: ++stats.documents})
},
)

const mutations = tap(collectMigrationMutations(migration, documents), (muts) => {
stats.currentMutations = arrify(muts)
config.onProgress?.({
...stats,
mutations: ++stats.mutations,
})
})

const concurrency = config?.concurrency ?? DEFAULT_MUTATION_CONCURRENCY

if (concurrency > MAX_MUTATION_CONCURRENCY) {
throw new Error(`Concurrency exceeds maximum allowed value (${MAX_MUTATION_CONCURRENCY})`)
}

const batches = batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE)
const batches = tap(
batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE),
() => {
config.onProgress?.({...stats, queuedBatches: ++stats.queuedBatches})
},
)

const commits = await commitMutations(toFetchOptionsIterable(config.api, batches), {concurrency})
const submit = async (opts: FetchOptions): Promise<MultipleMutationResult> =>
lastValueFrom(parseJSON(concatStr(decodeText(await fetchAsyncIterator(opts)))))

const commits = await mapAsync(
toFetchOptionsIterable(config.api, batches),
(opts) => {
config.onProgress?.({...stats, pending: ++stats.pending})
return submit(opts)
},
concurrency,
)
for await (const result of commits) {
yield formatMutationResponse(result)
stats.completedTransactions.push(result)
config.onProgress?.({
...stats,
})
}
}

function formatMutationResponse(mutationResponse: MultipleMutationResult) {
return `OK (transactionId = ${mutationResponse.transactionId})
${mutationResponse.results
.map((result) => {
return ` - ${result.operation}: ${result.id}`
config.onProgress?.({
...stats,
done: true,
})
.join('\n')}`
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import {
run,
} from '@sanity/migrate'
import {SanityDocument} from '@sanity/types'
import {format} from './mutationFormatter'
import {Mutation} from '@bjoerge/mutiny'
import {format, formatMutation} from './utils/mutationFormatter'

const helpText = `
Options
Expand Down Expand Up @@ -144,13 +145,46 @@ const createMigrationCommand: CliCommandDefinition<CreateFlags> = {
apiVersion: 'v2024-01-09',
} as const

const progress = dry
const spinner = output.spinner('Running migration…').start()
await (dry
? dryRun({api: apiConfig}, migration, context)
: run({api: apiConfig, concurrency}, migration)

for await (const result of progress) {
output.print(result)
}
: run(
{
api: apiConfig,
concurrency,
onProgress(progress) {
if (progress.done) {
spinner.text = `Migration "${migrationName}" completed.

Project id: ${chalk.bold(projectId)}
Dataset: ${chalk.bold(dataset)}

${progress.documents} documents processed.
${progress.mutations} mutations generated.
${chalk.green(progress.completedTransactions.length)} transactions committed.`
spinner.stopAndPersist({symbol: chalk.green('✔')})
return
}

;['', ...progress.currentMutations].forEach((mutation) => {
spinner.text = `Running migration "${migrationName}"…

Project id: ${chalk.bold(projectId)}
Dataset: ${chalk.bold(dataset)}
Document type: ${chalk.bold(migration.documentType)}

${progress.documents} documents processed…
${progress.mutations} mutations generated…
${chalk.blue(progress.pending)} requests pending…
${chalk.green(progress.completedTransactions.length)} transactions committed.

${mutation && !progress.done ? `» ${chalk.grey(formatMutation(chalk, mutation as Mutation))}` : ''}`
})
},
},
migration,
))
spinner.stop()
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ import {Mutation, Index, KeyedPathElement, NodePatch} from '@bjoerge/mutiny'
import {stringify} from '@bjoerge/mutiny/path'

import {SanityDocument} from '@sanity/types'

import {Chalk} from 'chalk'

export type ItemRef = string | number

export function format<Doc extends SanityDocument>(chalk: Chalk, mutations: Mutation[]): string {
return mutations.flatMap((m) => encodeMutation<Doc>(chalk, m)).join('\n')
export function format(chalk: Chalk, mutations: Mutation[]): string {
return mutations.flatMap((m) => formatMutation(chalk, m)).join('\n')
}

function encodeItemRef(ref: Index | KeyedPathElement): ItemRef {
return typeof ref === 'number' ? ref : ref._key
}

function encodeMutation<Doc extends SanityDocument>(chalk: Chalk, mutation: Mutation): string {
export function formatMutation(chalk: Chalk, mutation: Mutation): string {
if (
mutation.type === 'create' ||
mutation.type === 'createIfNotExists' ||
Expand Down
Loading