Skip to content

Commit

Permalink
feat(migrate): limit request concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
bjoerge committed Jan 30, 2024
1 parent 1822dc2 commit a9bfef9
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 47 deletions.
3 changes: 2 additions & 1 deletion packages/@sanity/migrate/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@
"@sanity/types": "^3.23.4",
"@sanity/util": "3.26.1",
"arrify": "^2.0.1",
"fast-fifo": "^1.3.2"
"fast-fifo": "^1.3.2",
"p-map": "^7.0.1"
},
"devDependencies": {
"@types/arrify": "^2.0.1",
Expand Down
19 changes: 19 additions & 0 deletions packages/@sanity/migrate/src/destinations/commitMutations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import {pMapIterable} from 'p-map'
import {MultipleMutationResult} from '@sanity/client'
import {fetchAsyncIterator, FetchOptions} from '../fetch-utils/fetchStream'
import {parseJSON} from '../it-utils/json'
import {decodeText} from '../it-utils/decodeText'
import {concatStr} from '../it-utils/concatStr'
import {lastValueFrom} from '../it-utils/lastValueFrom'

export function commitMutations(
fetchOptions: AsyncIterableIterator<FetchOptions>,
options: {concurrency: number},
) {
return pMapIterable(
fetchOptions,
async (opts): Promise<MultipleMutationResult> =>
lastValueFrom(parseJSON(concatStr(decodeText(await fetchAsyncIterator(opts))))),
{concurrency: options.concurrency},
)
}
38 changes: 0 additions & 38 deletions packages/@sanity/migrate/src/destinations/toMutationEndpoint.ts

This file was deleted.

25 changes: 25 additions & 0 deletions packages/@sanity/migrate/src/it-utils/lastValueFrom.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
interface Options<T> {
defaultValue?: T
}
export async function lastValueFrom<T>(
it: AsyncIterableIterator<T>,
options?: Options<T>,
): Promise<T> {
const defaultGiven = 'defaultValue' in (options ?? {})
let latestValue: T | undefined
let didYield = false

for await (const value of it) {
didYield = true
latestValue = value
}
if (!didYield) {
if (defaultGiven) {
return options!.defaultValue!
}
throw new Error(
'No value yielded from async iterable. If this iterable is empty, provide a default value.',
)
}
return latestValue!
}
1 change: 1 addition & 0 deletions packages/@sanity/migrate/src/runner/constants.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export const MUTATION_ENDPOINT_MAX_BODY_SIZE = 1024 * 256 // 256KB
export const DEFAULT_MUTATION_CONCURRENCY = 6
38 changes: 31 additions & 7 deletions packages/@sanity/migrate/src/runner/run.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@
import {SanityDocument} from '@sanity/types'
import {MultipleMutationResult} from '@sanity/client'
import {MultipleMutationResult, Mutation as SanityMutation} from '@sanity/client'
import {APIConfig, Migration} from '../types'
import {ndjson} from '../it-utils/ndjson'
import {fromExportEndpoint, safeJsonParser} from '../sources/fromExportEndpoint'
import {toMutationEndpoint} from '../destinations/toMutationEndpoint'
import {endpoints} from '../fetch-utils/endpoints'
import {toFetchOptions} from '../fetch-utils/sanityRequestOptions'
import {commitMutations} from '../destinations/commitMutations'
import {collectMigrationMutations} from './collectMigrationMutations'
import {batchMutations} from './utils/batchMutations'
import {MUTATION_ENDPOINT_MAX_BODY_SIZE} from './constants'
import {DEFAULT_MUTATION_CONCURRENCY, MUTATION_ENDPOINT_MAX_BODY_SIZE} from './constants'
import {toSanityMutations} from './utils/toSanityMutations'

interface MigrationRunnerOptions {
api: APIConfig
concurrency?: number
}

export async function* toFetchOptionsIterable(
apiConfig: APIConfig,
mutations: AsyncIterableIterator<SanityMutation[]>,
) {
for await (const mut of mutations) {
yield toFetchOptions({
projectId: apiConfig.projectId,
apiVersion: apiConfig.apiVersion,
token: apiConfig.token,
apiHost: apiConfig.apiHost ?? 'api.sanity.io',
endpoint: endpoints.data.mutate(apiConfig.dataset, {returnIds: true}),
body: JSON.stringify({mutations: mut}),
})
}
}
export async function* run(config: MigrationRunnerOptions, migration: Migration) {
const mutations = collectMigrationMutations(
migration,
Expand All @@ -21,10 +39,16 @@ export async function* run(config: MigrationRunnerOptions, migration: Migration)
}),
)

for await (const result of toMutationEndpoint(
config.api,
batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE),
)) {
const concurrency = Math.min(
DEFAULT_MUTATION_CONCURRENCY,
config?.concurrency ?? DEFAULT_MUTATION_CONCURRENCY,
)

const batches = batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE)

const commits = commitMutations(toFetchOptionsIterable(config.api, batches), {concurrency})

for await (const result of commits) {
yield formatMutationResponse(result)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export async function* batchMutations(
mutations: AsyncIterableIterator<SanityMutation | SanityMutation[]>,
maxBatchSize: number,
options?: {preserveTransactions: boolean},
): AsyncIterableIterator<SanityMutation | SanityMutation[]> {
): AsyncIterableIterator<SanityMutation[]> {
let currentBatch: SanityMutation[] = []
let currentBatchSize = 0

Expand Down
5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12613,6 +12613,11 @@ p-map@^7.0.0:
resolved "https://registry.yarnpkg.com/p-map/-/p-map-7.0.0.tgz#757a189703986134d5d34ef7c16cf2f824d19ebe"
integrity sha512-EZl03dLKv3RypkrjlevZoNwQMSy4bAblWcR18zhonktnN4fUs3asFQKSe0awn982omGxamvbejqQKQYDJYHCEg==

p-map@^7.0.1:
version "7.0.1"
resolved "https://registry.yarnpkg.com/p-map/-/p-map-7.0.1.tgz#1faf994e597160f7851882926bfccabc1d226f80"
integrity sha512-2wnaR0XL/FDOj+TgpDuRb2KTjLnu3Fma6b1ZUwGY7LcqenMcvP/YFpjpbPKY6WVGsbuJZRuoUz8iPrt8ORnAFw==

p-pipe@^3.1.0:
version "3.1.0"
resolved "https://registry.yarnpkg.com/p-pipe/-/p-pipe-3.1.0.tgz#48b57c922aa2e1af6a6404cb7c6bf0eb9cc8e60e"
Expand Down

0 comments on commit a9bfef9

Please sign in to comment.