Skip to content

Commit

Permalink
feat(migration): improve progress as migration is running
Browse files Browse the repository at this point in the history
  • Loading branch information
bjoerge committed Jan 23, 2024
1 parent b0db6d4 commit e459a28
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 37 deletions.
16 changes: 14 additions & 2 deletions dev/test-studio/migrations/rename-location-to-address/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
import {defineMigration} from '@sanity/migrate'
import {at, patch, set} from '@sanity/migrate/mutations'
import {at, patch, set, unset} from '@sanity/migrate/mutations'

const addresses = [
{city: 'Oslo', country: 'Norway'},
{city: 'Stockholm', country: 'Sweden'},
{city: 'Copenhagen', country: 'Denmark'},
{city: 'Helsinki', country: 'Finland'},
{city: 'Reykjavik', country: 'Iceland'},
{city: 'Torshavn', country: 'Faroe Islands'},
]

export default defineMigration({
name: 'Rename Location to Address',
documentType: 'author',
migrate: {
document(doc) {
return patch(doc._id, [at('address', set('Somehwere'))])
return patch(doc._id, [
at('address', set(addresses[Math.floor(Math.random() * addresses.length)])),
at('location', unset()),
])
},
},
})
8 changes: 3 additions & 5 deletions packages/@sanity/migrate/src/destinations/commitMutations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@ import {parseJSON} from '../it-utils/json'
import {decodeText} from '../it-utils/decodeText'
import {concatStr} from '../it-utils/concatStr'
import {lastValueFrom} from '../it-utils/lastValueFrom'
import {mapAsync} from '../it-utils/mapAsync'

export async function commitMutations(
fetchOptions: AsyncIterableIterator<FetchOptions>,
options: {concurrency: number},
) {
// todo: convert to top level import when we can
const {pMapIterable} = await import('p-map')

return pMapIterable(
return mapAsync(
fetchOptions,
async (opts): Promise<MultipleMutationResult> =>
lastValueFrom(parseJSON(concatStr(decodeText(await fetchAsyncIterator(opts))))),
{concurrency: options.concurrency},
options.concurrency,
)
}
12 changes: 12 additions & 0 deletions packages/@sanity/migrate/src/it-utils/mapAsync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
export async function mapAsync<T, U>(
it: AsyncIterableIterator<T>,
project: (value: T) => Promise<U>,
concurrency: number,
): Promise<AsyncIterable<U>> {
// todo: convert to top level import when we can
const {pMapIterable} = await import('p-map')

return pMapIterable(it, (v) => project(v), {
concurrency: concurrency,
})
}
6 changes: 6 additions & 0 deletions packages/@sanity/migrate/src/it-utils/tap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export async function* tap<T>(it: AsyncIterableIterator<T>, interceptor: (value: T) => void) {
for await (const chunk of it) {
interceptor(chunk)
yield chunk
}
}
83 changes: 65 additions & 18 deletions packages/@sanity/migrate/src/runner/run.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,40 @@
import {SanityDocument} from '@sanity/types'
import {MultipleMutationResult, Mutation as SanityMutation} from '@sanity/client'
import {Mutation} from '@bjoerge/mutiny'
import arrify from 'arrify'
import {APIConfig, Migration} from '../types'
import {ndjson} from '../it-utils/ndjson'
import {fromExportEndpoint} from '../sources/fromExportEndpoint'
import {endpoints} from '../fetch-utils/endpoints'
import {toFetchOptions} from '../fetch-utils/sanityRequestOptions'
import {commitMutations} from '../destinations/commitMutations'
import {collectMigrationMutations} from './collectMigrationMutations'
import {batchMutations} from './utils/batchMutations'
import {tap} from '../it-utils/tap'
import {mapAsync} from '../it-utils/mapAsync'
import {lastValueFrom} from '../it-utils/lastValueFrom'
import {decodeText, parseJSON} from '../it-utils'
import {concatStr} from '../it-utils/concatStr'
import {fetchAsyncIterator, FetchOptions} from '../fetch-utils/fetchStream'
import {toSanityMutations} from './utils/toSanityMutations'
import {
DEFAULT_MUTATION_CONCURRENCY,
MAX_MUTATION_CONCURRENCY,
MUTATION_ENDPOINT_MAX_BODY_SIZE,
} from './constants'
import {toSanityMutations} from './utils/toSanityMutations'
import {batchMutations} from './utils/batchMutations'
import {collectMigrationMutations} from './collectMigrationMutations'

type MigrationProgress = {
documents: number
mutations: number
pending: number
queuedBatches: number
currentMutations: Mutation[]
completedTransactions: MultipleMutationResult[]
done?: boolean
}
interface MigrationRunnerOptions {
api: APIConfig
concurrency?: number
onProgress?: (event: MigrationProgress) => void
}

export async function* toFetchOptionsIterable(
Expand All @@ -35,32 +52,62 @@ export async function* toFetchOptionsIterable(
})
}
}
export async function* run(config: MigrationRunnerOptions, migration: Migration) {
const mutations = collectMigrationMutations(
migration,
export async function run(config: MigrationRunnerOptions, migration: Migration) {
const stats: MigrationProgress = {
documents: 0,
mutations: 0,
pending: 0,
queuedBatches: 0,
completedTransactions: [],
currentMutations: [],
}
const documents = tap(
ndjson(await fromExportEndpoint(config.api)) as AsyncIterableIterator<SanityDocument>,
() => {
config.onProgress?.({...stats, documents: ++stats.documents})
},
)

const mutations = tap(collectMigrationMutations(migration, documents), (muts) => {
stats.currentMutations = arrify(muts)
config.onProgress?.({
...stats,
mutations: ++stats.mutations,
})
})

const concurrency = config?.concurrency ?? DEFAULT_MUTATION_CONCURRENCY

if (concurrency > MAX_MUTATION_CONCURRENCY) {
throw new Error(`Concurrency exceeds maximum allowed value (${MAX_MUTATION_CONCURRENCY})`)
}

const batches = batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE)
const batches = tap(
batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE),
() => {
config.onProgress?.({...stats, queuedBatches: ++stats.queuedBatches})
},
)

const commits = await commitMutations(toFetchOptionsIterable(config.api, batches), {concurrency})
const submit = async (opts: FetchOptions): Promise<MultipleMutationResult> =>
lastValueFrom(parseJSON(concatStr(decodeText(await fetchAsyncIterator(opts)))))

const commits = await mapAsync(
toFetchOptionsIterable(config.api, batches),
(opts) => {
config.onProgress?.({...stats, pending: ++stats.pending})
return submit(opts)
},
concurrency,
)
for await (const result of commits) {
yield formatMutationResponse(result)
stats.completedTransactions.push(result)
config.onProgress?.({
...stats,
})
}
}

function formatMutationResponse(mutationResponse: MultipleMutationResult) {
return `OK (transactionId = ${mutationResponse.transactionId})
${mutationResponse.results
.map((result) => {
return ` - ${result.operation}: ${result.id}`
config.onProgress?.({
...stats,
done: true,
})
.join('\n')}`
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import {
run,
} from '@sanity/migrate'
import {SanityDocument} from '@sanity/types'
import {format} from './mutationFormatter'
import {Mutation} from '@bjoerge/mutiny'
import {format, formatMutation} from './utils/mutationFormatter'

const helpText = `
Options
Expand Down Expand Up @@ -125,7 +126,6 @@ const createMigrationCommand: CliCommandDefinition<CreateFlags> = {
MAX_MUTATION_CONCURRENCY,
)
}
concurrency = MAX_MUTATION_CONCURRENCY
}

const {dataset, projectId, apiHost, token} = apiClient({
Expand All @@ -141,13 +141,46 @@ const createMigrationCommand: CliCommandDefinition<CreateFlags> = {
apiVersion: 'v2024-01-09',
} as const

const progress = dry
const spinner = output.spinner('Running migration…').start()
await (dry
? dryRun({api: apiConfig}, migration, context)
: run({api: apiConfig, concurrency}, migration)

for await (const result of progress) {
output.print(result)
}
: run(
{
api: apiConfig,
concurrency,
onProgress(progress) {
if (progress.done) {
spinner.text = `Migration "${migrationName}" completed.
Project id: ${chalk.bold(projectId)}
Dataset: ${chalk.bold(dataset)}
${progress.documents} documents processed.
${progress.mutations} mutations generated.
${chalk.green(progress.completedTransactions.length)} transactions committed.`
spinner.stopAndPersist({symbol: chalk.green('✔')})
return
}

;['', ...progress.currentMutations].forEach((mutation) => {
spinner.text = `Running migration "${migrationName}"…
Project id: ${chalk.bold(projectId)}
Dataset: ${chalk.bold(dataset)}
Document type: ${chalk.bold(migration.documentType)}
${progress.documents} documents processed…
${progress.mutations} mutations generated…
${chalk.blue(progress.pending)} requests pending…
${chalk.green(progress.completedTransactions.length)} transactions committed.
${mutation && !progress.done ? ${chalk.grey(formatMutation(chalk, mutation as Mutation))}` : ''}`
})
},
},
migration,
))
spinner.stop()
},
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
// An example of a compact formatter

import {Mutation, Index, KeyedPathElement, NodePatch, stringify} from '@bjoerge/mutiny'

Check failure on line 3 in packages/sanity/src/_internal/cli/commands/migration/utils/mutationFormatter.ts

View workflow job for this annotation

GitHub Actions / typeCheck

Module '"@bjoerge/mutiny"' has no exported member 'stringify'.

Check failure on line 3 in packages/sanity/src/_internal/cli/commands/migration/utils/mutationFormatter.ts

View workflow job for this annotation

GitHub Actions / typeCheck

Module '"@bjoerge/mutiny"' has no exported member 'stringify'.
import {SanityDocument} from '@sanity/types'
import {Chalk} from 'chalk'

export type ItemRef = string | number

export function format<Doc extends SanityDocument>(chalk: Chalk, mutations: Mutation[]): string {
return mutations.flatMap((m) => encodeMutation<Doc>(chalk, m)).join('\n')
export function format(chalk: Chalk, mutations: Mutation[]): string {
return mutations.flatMap((m) => formatMutation(chalk, m)).join('\n')
}

function encodeItemRef(ref: Index | KeyedPathElement): ItemRef {
return typeof ref === 'number' ? ref : ref._key
}

function encodeMutation<Doc extends SanityDocument>(chalk: Chalk, mutation: Mutation): string {
export function formatMutation(chalk: Chalk, mutation: Mutation): string {
if (
mutation.type === 'create' ||
mutation.type === 'createIfNotExists' ||
Expand Down

0 comments on commit e459a28

Please sign in to comment.