From a9bfef9f09cb8e123e9d7b6f0162122cc6283ebd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B8rge=20N=C3=A6ss?= Date: Mon, 22 Jan 2024 16:59:11 +0100 Subject: [PATCH] feat(migrate): limit request concurrency --- packages/@sanity/migrate/package.json | 3 +- .../src/destinations/commitMutations.ts | 19 ++++++++++ .../src/destinations/toMutationEndpoint.ts | 38 ------------------- .../migrate/src/it-utils/lastValueFrom.ts | 25 ++++++++++++ .../@sanity/migrate/src/runner/constants.ts | 1 + packages/@sanity/migrate/src/runner/run.ts | 38 +++++++++++++++---- .../src/runner/utils/batchMutations.ts | 2 +- yarn.lock | 5 +++ 8 files changed, 84 insertions(+), 47 deletions(-) create mode 100644 packages/@sanity/migrate/src/destinations/commitMutations.ts delete mode 100644 packages/@sanity/migrate/src/destinations/toMutationEndpoint.ts create mode 100644 packages/@sanity/migrate/src/it-utils/lastValueFrom.ts diff --git a/packages/@sanity/migrate/package.json b/packages/@sanity/migrate/package.json index 2564d0ac42f..f7ee823b4b2 100644 --- a/packages/@sanity/migrate/package.json +++ b/packages/@sanity/migrate/package.json @@ -79,7 +79,8 @@ "@sanity/types": "^3.23.4", "@sanity/util": "3.26.1", "arrify": "^2.0.1", - "fast-fifo": "^1.3.2" + "fast-fifo": "^1.3.2", + "p-map": "^7.0.1" }, "devDependencies": { "@types/arrify": "^2.0.1", diff --git a/packages/@sanity/migrate/src/destinations/commitMutations.ts b/packages/@sanity/migrate/src/destinations/commitMutations.ts new file mode 100644 index 00000000000..2b3ac95920a --- /dev/null +++ b/packages/@sanity/migrate/src/destinations/commitMutations.ts @@ -0,0 +1,19 @@ +import {pMapIterable} from 'p-map' +import {MultipleMutationResult} from '@sanity/client' +import {fetchAsyncIterator, FetchOptions} from '../fetch-utils/fetchStream' +import {parseJSON} from '../it-utils/json' +import {decodeText} from '../it-utils/decodeText' +import {concatStr} from '../it-utils/concatStr' +import {lastValueFrom} from '../it-utils/lastValueFrom' + +export function commitMutations( + fetchOptions: AsyncIterableIterator, + options: {concurrency: number}, +) { + return pMapIterable( + fetchOptions, + async (opts): Promise => + lastValueFrom(parseJSON(concatStr(decodeText(await fetchAsyncIterator(opts))))), + {concurrency: options.concurrency}, + ) +} diff --git a/packages/@sanity/migrate/src/destinations/toMutationEndpoint.ts b/packages/@sanity/migrate/src/destinations/toMutationEndpoint.ts deleted file mode 100644 index 6c7b68ef63b..00000000000 --- a/packages/@sanity/migrate/src/destinations/toMutationEndpoint.ts +++ /dev/null @@ -1,38 +0,0 @@ -import {MultipleMutationResult, Mutation as SanityMutation} from '@sanity/client' -import {toFetchOptions} from '../fetch-utils/sanityRequestOptions' -import {endpoints} from '../fetch-utils/endpoints' -import {fetchAsyncIterator} from '../fetch-utils/fetchStream' -import {parseJSON} from '../it-utils/json' -import {decodeText} from '../it-utils/decodeText' -import {concatStr} from '../it-utils/concatStr' - -interface APIOptions { - projectId: string - apiVersion: `vX` | `v${number}-${number}-${number}` - token: string - dataset: string - apiHost?: string -} - -export async function* toMutationEndpoint( - options: APIOptions, - mutations: AsyncIterableIterator, -) { - for await (const mut of mutations) { - const fetchOptions = toFetchOptions({ - projectId: options.projectId, - apiVersion: options.apiVersion, - token: options.token, - apiHost: options.apiHost ?? 'api.sanity.io', - endpoint: endpoints.data.mutate(options.dataset, {returnIds: true}), - body: JSON.stringify({mutations: mut}), - }) - - for await (const result of parseJSON( - concatStr(decodeText(await fetchAsyncIterator(fetchOptions))), - )) { - // todo: add return type - yield result as MultipleMutationResult - } - } -} diff --git a/packages/@sanity/migrate/src/it-utils/lastValueFrom.ts b/packages/@sanity/migrate/src/it-utils/lastValueFrom.ts new file mode 100644 index 00000000000..d341b2469d4 --- /dev/null +++ b/packages/@sanity/migrate/src/it-utils/lastValueFrom.ts @@ -0,0 +1,25 @@ +interface Options { + defaultValue?: T +} +export async function lastValueFrom( + it: AsyncIterableIterator, + options?: Options, +): Promise { + const defaultGiven = 'defaultValue' in (options ?? {}) + let latestValue: T | undefined + let didYield = false + + for await (const value of it) { + didYield = true + latestValue = value + } + if (!didYield) { + if (defaultGiven) { + return options!.defaultValue! + } + throw new Error( + 'No value yielded from async iterable. If this iterable is empty, provide a default value.', + ) + } + return latestValue! +} diff --git a/packages/@sanity/migrate/src/runner/constants.ts b/packages/@sanity/migrate/src/runner/constants.ts index 240c394f57f..22eaa57c134 100644 --- a/packages/@sanity/migrate/src/runner/constants.ts +++ b/packages/@sanity/migrate/src/runner/constants.ts @@ -1 +1,2 @@ export const MUTATION_ENDPOINT_MAX_BODY_SIZE = 1024 * 256 // 256KB +export const DEFAULT_MUTATION_CONCURRENCY = 6 diff --git a/packages/@sanity/migrate/src/runner/run.ts b/packages/@sanity/migrate/src/runner/run.ts index 7e0fe649655..bdf7e6cd841 100644 --- a/packages/@sanity/migrate/src/runner/run.ts +++ b/packages/@sanity/migrate/src/runner/run.ts @@ -1,18 +1,36 @@ import {SanityDocument} from '@sanity/types' -import {MultipleMutationResult} from '@sanity/client' +import {MultipleMutationResult, Mutation as SanityMutation} from '@sanity/client' import {APIConfig, Migration} from '../types' import {ndjson} from '../it-utils/ndjson' import {fromExportEndpoint, safeJsonParser} from '../sources/fromExportEndpoint' -import {toMutationEndpoint} from '../destinations/toMutationEndpoint' +import {endpoints} from '../fetch-utils/endpoints' +import {toFetchOptions} from '../fetch-utils/sanityRequestOptions' +import {commitMutations} from '../destinations/commitMutations' import {collectMigrationMutations} from './collectMigrationMutations' import {batchMutations} from './utils/batchMutations' -import {MUTATION_ENDPOINT_MAX_BODY_SIZE} from './constants' +import {DEFAULT_MUTATION_CONCURRENCY, MUTATION_ENDPOINT_MAX_BODY_SIZE} from './constants' import {toSanityMutations} from './utils/toSanityMutations' interface MigrationRunnerOptions { api: APIConfig + concurrency?: number } +export async function* toFetchOptionsIterable( + apiConfig: APIConfig, + mutations: AsyncIterableIterator, +) { + for await (const mut of mutations) { + yield toFetchOptions({ + projectId: apiConfig.projectId, + apiVersion: apiConfig.apiVersion, + token: apiConfig.token, + apiHost: apiConfig.apiHost ?? 'api.sanity.io', + endpoint: endpoints.data.mutate(apiConfig.dataset, {returnIds: true}), + body: JSON.stringify({mutations: mut}), + }) + } +} export async function* run(config: MigrationRunnerOptions, migration: Migration) { const mutations = collectMigrationMutations( migration, @@ -21,10 +39,16 @@ export async function* run(config: MigrationRunnerOptions, migration: Migration) }), ) - for await (const result of toMutationEndpoint( - config.api, - batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE), - )) { + const concurrency = Math.min( + DEFAULT_MUTATION_CONCURRENCY, + config?.concurrency ?? DEFAULT_MUTATION_CONCURRENCY, + ) + + const batches = batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE) + + const commits = commitMutations(toFetchOptionsIterable(config.api, batches), {concurrency}) + + for await (const result of commits) { yield formatMutationResponse(result) } } diff --git a/packages/@sanity/migrate/src/runner/utils/batchMutations.ts b/packages/@sanity/migrate/src/runner/utils/batchMutations.ts index 1cc89c408dc..fc5ab39988b 100644 --- a/packages/@sanity/migrate/src/runner/utils/batchMutations.ts +++ b/packages/@sanity/migrate/src/runner/utils/batchMutations.ts @@ -14,7 +14,7 @@ export async function* batchMutations( mutations: AsyncIterableIterator, maxBatchSize: number, options?: {preserveTransactions: boolean}, -): AsyncIterableIterator { +): AsyncIterableIterator { let currentBatch: SanityMutation[] = [] let currentBatchSize = 0 diff --git a/yarn.lock b/yarn.lock index 88099dcd81f..324aae1094f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -12613,6 +12613,11 @@ p-map@^7.0.0: resolved "https://registry.yarnpkg.com/p-map/-/p-map-7.0.0.tgz#757a189703986134d5d34ef7c16cf2f824d19ebe" integrity sha512-EZl03dLKv3RypkrjlevZoNwQMSy4bAblWcR18zhonktnN4fUs3asFQKSe0awn982omGxamvbejqQKQYDJYHCEg== +p-map@^7.0.1: + version "7.0.1" + resolved "https://registry.yarnpkg.com/p-map/-/p-map-7.0.1.tgz#1faf994e597160f7851882926bfccabc1d226f80" + integrity sha512-2wnaR0XL/FDOj+TgpDuRb2KTjLnu3Fma6b1ZUwGY7LcqenMcvP/YFpjpbPKY6WVGsbuJZRuoUz8iPrt8ORnAFw== + p-pipe@^3.1.0: version "3.1.0" resolved "https://registry.yarnpkg.com/p-pipe/-/p-pipe-3.1.0.tgz#48b57c922aa2e1af6a6404cb7c6bf0eb9cc8e60e"