Commit
feat(migrate): implement mutation batcher and use when submitting against mutate endpoint
bjoerge committed Jan 23, 2024
1 parent 707d6b7 commit 173baba
Showing 8 changed files with 252 additions and 8 deletions.
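At a glance, the change threads three new utilities between the migration runner and the mutate endpoint. The sketch below condenses the run.ts change further down, with explanatory comments added; the submit wrapper and the inline API options type are illustrative, while the imports and the call chain are taken verbatim from the diff.

import {Mutation} from '@bjoerge/mutiny'
import {toMutationEndpoint} from '../destinations/toMutationEndpoint'
import {batchMutations} from './utils/batchMutations'
import {MUTATION_ENDPOINT_MAX_BODY_SIZE} from './constants'
import {toSanityMutations} from './utils/toSanityMutations'

// Illustrative wrapper: encode mutiny mutations to the wire format, group them
// into batches that stay near the 256KB body budget, then submit each batch.
async function* submit(
  api: {projectId: string; apiVersion: string; token: string; dataset: string; apiHost?: string},
  mutations: AsyncIterableIterator<Mutation | Mutation[]>,
) {
  yield* toMutationEndpoint(
    api,
    batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE),
  )
}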
1 change: 1 addition & 0 deletions packages/@sanity/migrate/package.json
@@ -81,6 +81,7 @@
"fast-fifo": "^1.3.2"
},
"devDependencies": {
"@types/arrify": "^2.0.1",
"rimraf": "^3.0.2"
},
"engines": {
Expand Down
11 changes: 4 additions & 7 deletions packages/@sanity/migrate/src/destinations/toMutationEndpoint.ts
@@ -1,5 +1,4 @@
-import {type Mutation, SanityEncoder} from '@bjoerge/mutiny'
-import {MultipleMutationResult} from '@sanity/client'
+import {MultipleMutationResult, Mutation as SanityMutation} from '@sanity/client'
import {toFetchOptions} from '../fetch-utils/sanityRequestOptions'
import {endpoints} from '../fetch-utils/endpoints'
import {fetchAsyncIterator} from '../fetch-utils/fetchStream'
@@ -17,18 +16,16 @@ interface APIOptions {

export async function* toMutationEndpoint(
options: APIOptions,
-mutations: AsyncIterableIterator<Mutation[] | Mutation>,
+mutations: AsyncIterableIterator<SanityMutation[] | SanityMutation>,
) {
-for await (const mutation of mutations) {
+for await (const mut of mutations) {
const fetchOptions = toFetchOptions({
projectId: options.projectId,
apiVersion: options.apiVersion,
token: options.token,
apiHost: options.apiHost ?? 'api.sanity.io',
endpoint: endpoints.data.mutate(options.dataset, {returnIds: true}),
-body: JSON.stringify({
-mutations: SanityEncoder.encode(Array.isArray(mutation) ? mutation : [mutation]),
-}),
+body: JSON.stringify({mutations: mut}),
})

for await (const result of parseJSON(
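With this change toMutationEndpoint no longer encodes anything itself: every item it receives is already a wire-format batch, so the request body is just that batch in the standard mutate envelope. Roughly, for one batch (the values are illustrative; the shapes match the tests further down):

// Hypothetical batch as produced by the batcher:
const mut = [
  {createIfNotExists: {_id: 'foo', _type: 'something', bar: 'baz'}},
  {patch: {id: 'foo', set: {bar: 'baz'}}},
]
// Request body POSTed to endpoints.data.mutate(dataset, {returnIds: true}):
const body = JSON.stringify({mutations: mut})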
1 change: 1 addition & 0 deletions packages/@sanity/migrate/src/runner/constants.ts
@@ -0,0 +1 @@
export const MUTATION_ENDPOINT_MAX_BODY_SIZE = 1024 * 256 // 256KB
8 changes: 7 additions & 1 deletion packages/@sanity/migrate/src/runner/run.ts
@@ -5,6 +5,9 @@ import {ndjson} from '../it-utils/ndjson'
import {fromExportEndpoint} from '../sources/fromExportEndpoint'
import {toMutationEndpoint} from '../destinations/toMutationEndpoint'
import {collectMigrationMutations} from './collectMigrationMutations'
+import {batchMutations} from './utils/batchMutations'
+import {MUTATION_ENDPOINT_MAX_BODY_SIZE} from './constants'
+import {toSanityMutations} from './utils/toSanityMutations'

interface MigrationRunnerOptions {
api: APIConfig
@@ -16,7 +19,10 @@ export async function* run(config: MigrationRunnerOptions, migration: Migration)
ndjson(await fromExportEndpoint(config.api)) as AsyncIterableIterator<SanityDocument>,
)

-for await (const result of toMutationEndpoint(config.api, mutations)) {
+for await (const result of toMutationEndpoint(
+  config.api,
+  batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE),
+)) {
yield formatMutationResponse(result)
}
}
158 changes: 158 additions & 0 deletions packages/@sanity/migrate/src/runner/utils/__tests__/batchMutations.test.ts
@@ -0,0 +1,158 @@
import {at, createIfNotExists, createOrReplace, patch, set} from '@bjoerge/mutiny'
import {Mutation} from '@sanity/client'
import {batchMutations} from '../batchMutations'

function byteLength(obj: unknown) {
return JSON.stringify(obj).length
}

describe('mutation payload batching', () => {
test('when everything fits into a single mutation request', async () => {
const first = {createIfNotExists: {_id: 'foo', _type: 'something', bar: 'baz'}}
const second = {patch: {id: 'foo', set: {bar: 'baz'}}}
const gen = async function* () {
yield first
yield second
}

const firstSize = JSON.stringify(first).length
const secondSize = JSON.stringify(second).length

const it = batchMutations(gen(), firstSize + secondSize)

expect(await it.next()).toEqual({value: [first, second], done: false})
expect(await it.next()).toEqual({value: undefined, done: true})
})

test('when max batch is not big enough to fit all values', async () => {
const first = {createIfNotExists: {_id: 'foo', _type: 'something', bar: 'baz'}}
const second = {patch: {id: 'foo', set: {bar: 'baz'}}}
const gen = async function* () {
yield first
yield second
}

const firstSize = JSON.stringify(first).length

const it = batchMutations(gen(), firstSize)

expect(await it.next()).toEqual({value: [first], done: false})
expect(await it.next()).toEqual({value: [second], done: false})
expect(await it.next()).toEqual({value: undefined, done: true})
})

test('when each mutation is bigger than the max batch size', async () => {
const first = {createIfNotExists: {_id: 'foo', _type: 'something', bar: 'baz'}}
const second = {patch: {id: 'foo', set: {bar: 'baz'}}}
const gen = async function* () {
yield first
yield second
}

const it = batchMutations(gen(), 1)

expect(await it.next()).toEqual({value: [first], done: false})
expect(await it.next()).toEqual({value: [second], done: false})
expect(await it.next()).toEqual({value: undefined, done: true})
})

test('when some mutations can be chunked, others not', async () => {
const first = {createIfNotExists: {_id: 'foo', _type: 'something', bar: 'baz'}}
const second = {patch: {id: 'foo', set: {bar: 'baz'}}}

// Note: this is an array of mutations and should not be split up as it may be intentional to keep it in a transaction
// todo: is it ok to include other mutations in the same batch as a transaction?
const third = [
{
createOrReplace: {
_id: 'foo',
_type: 'something',
bar: 'baz',
baz: 'qux',
},
},
{patch: {id: 'foo', set: {bar: 'baz'}}},
]

const gen = async function* () {
yield first
yield second
yield third
}
const it = batchMutations(gen(), byteLength(first) + byteLength(second))

expect(await it.next()).toEqual({value: [first, second], done: false})
expect(await it.next()).toEqual({value: third, done: false})
})
})

//todo: verify if this is the default behavior we want
test('transactions are always in the same batch, but might include other mutations if they fit', async () => {
const first = {createIfNotExists: {_id: 'foo', _type: 'something', bar: 'baz'}}
const second = {patch: {id: 'foo', set: {bar: 'baz'}}}

// Note: this is an array of mutations and should not be split up as it may be intentional to keep it in a transaction
const transaction = [
{
createOrReplace: {
_id: 'foo',
_type: 'something',
bar: 'baz',
baz: 'qux',
},
},
{patch: {id: 'foo', set: {bar: 'baz'}}},
]

const fourth = {patch: {id: 'another', set: {this: 'that'}}}

const gen = async function* () {
yield first
yield second
yield transaction
yield fourth
}
const it = batchMutations(
gen(),
[first, second, transaction, fourth].reduce((l, m) => l + byteLength(m), 0),
)

expect(await it.next()).toEqual({value: [first, second, ...transaction, fourth], done: false})
expect(await it.next()).toEqual({value: undefined, done: true})
})

test('transactions are always batched as-is if preserveTransactions: true', async () => {
const first = {createIfNotExists: {_id: 'foo', _type: 'something', bar: 'baz'}}
const second = {patch: {id: 'foo', set: {bar: 'baz'}}}

// Note: this is an array of mutations and should not be split up as it may be intentional to keep it in a transaction
const transaction = [
{
createOrReplace: {
_id: 'foo',
_type: 'something',
bar: 'baz',
baz: 'qux',
},
},
{patch: {id: 'foo', set: {bar: 'baz'}}},
]

const fourth = {patch: {id: 'another', set: {this: 'that'}}}

const gen = async function* () {
yield first
yield second
yield transaction
yield fourth
}
const it = batchMutations(
gen(),
[first, second, transaction, fourth].reduce((l, m) => l + byteLength(m), 0),
{preserveTransactions: true},
)

expect(await it.next()).toEqual({value: [first, second], done: false})
expect(await it.next()).toEqual({value: transaction, done: false})
expect(await it.next()).toEqual({value: [fourth], done: false})
expect(await it.next()).toEqual({value: undefined, done: true})
})
55 changes: 55 additions & 0 deletions packages/@sanity/migrate/src/runner/utils/batchMutations.ts
@@ -0,0 +1,55 @@
import {Mutation as SanityMutation} from '@sanity/client'
import arrify from 'arrify'

// We're working on "raw" mutations, i.e. what will be put into the mutations array in the request body
const PADDING_SIZE = '{"mutations":[]}'.length

/**
 *
 * @param mutations - Async iterable of either single values or arrays of values
 * @param maxBatchSize - Max batch size in bytes
 * Todo: add support for transaction ids too
 */
export async function* batchMutations(
  mutations: AsyncIterableIterator<SanityMutation | SanityMutation[]>,
  maxBatchSize: number,
  options?: {preserveTransactions: boolean},
): AsyncIterableIterator<SanityMutation | SanityMutation[]> {
  let currentBatch: SanityMutation[] = []
  let currentBatchSize = 0

  for await (const mutation of mutations) {
    if (options?.preserveTransactions && Array.isArray(mutation)) {
      // flush any pending batch first, then yield the transaction untouched as its own batch
      if (currentBatch.length) {
        yield currentBatch
      }
      yield mutation
      currentBatch = []
      currentBatchSize = 0
      continue
    }

    // the mutation itself may exceed the payload size, need to handle that
    const mutationSize = JSON.stringify(mutation).length

    if (mutationSize >= maxBatchSize + PADDING_SIZE) {
      // the mutation size itself is bigger than the max batch size, yield it as a single batch and hope for the best (the server has a bigger limit)
      if (currentBatch.length) {
        yield currentBatch
      }
      yield [...arrify(mutation)]
      currentBatch = []
      currentBatchSize = 0
      continue
    }
    currentBatchSize += mutationSize
    if (currentBatchSize >= maxBatchSize + PADDING_SIZE) {
      yield currentBatch
      currentBatch = []
      // carry the size of the mutation that opens the next batch
      currentBatchSize = mutationSize
    }
    currentBatch.push(...arrify(mutation))
  }

  if (currentBatch.length > 0) {
    yield currentBatch
  }
}
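Note that the size accounting is an approximation: PADDING_SIZE is the 16 characters of the '{"mutations":[]}' envelope, and each mutation is measured as JSON.stringify(mutation).length, so the commas joining batch members in the final body are not counted. A minimal usage sketch, with made-up mutations (batchMutations and the size constant are the ones added in this commit):

import {Mutation as SanityMutation} from '@sanity/client'
import {batchMutations} from './batchMutations'
import {MUTATION_ENDPOINT_MAX_BODY_SIZE} from '../constants'

async function* source(): AsyncIterableIterator<SanityMutation | SanityMutation[]> {
  yield {createIfNotExists: {_id: 'a', _type: 'doc'}}
  // an array is treated as a transaction and kept intact when preserveTransactions is set
  yield [{patch: {id: 'a', set: {title: 'hello'}}}]
}

async function main() {
  for await (const batch of batchMutations(source(), MUTATION_ENDPOINT_MAX_BODY_SIZE, {
    preserveTransactions: true,
  })) {
    // each batch becomes one request body: {"mutations": [...]}
    console.log(JSON.stringify(batch).length)
  }
}

main().catch(console.error)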
14 changes: 14 additions & 0 deletions packages/@sanity/migrate/src/runner/utils/toSanityMutations.ts
@@ -0,0 +1,14 @@
import {SanityEncoder, Mutation} from '@bjoerge/mutiny'
import {Mutation as SanityMutation} from '@sanity/client'

export async function* toSanityMutations(
  it: AsyncIterableIterator<Mutation | Mutation[]>,
): AsyncIterableIterator<SanityMutation | SanityMutation[]> {
  for await (const mutation of it) {
    // an array is encoded as a unit so its members stay together (it may be an intentional transaction)
    if (Array.isArray(mutation)) {
      yield SanityEncoder.encode(mutation)
      continue
    }
    yield SanityEncoder.encode([mutation])[0]
  }
}
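For context, this is the seam where @bjoerge/mutiny's mutation objects become @sanity/client wire mutations. A rough sketch using the builders the test file imports; the encoded shape in the comment is an assumption, not verified output:

import {at, patch, set} from '@bjoerge/mutiny'
import {toSanityMutations} from './toSanityMutations'

async function* mutinyMutations() {
  // a mutiny mutation built with the library's helpers
  yield patch('foo', [at('bar', set('baz'))])
}

async function main() {
  for await (const wire of toSanityMutations(mutinyMutations())) {
    // presumably something like {patch: {id: 'foo', set: {bar: 'baz'}}}
    console.log(wire)
  }
}

main().catch(console.error)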
12 changes: 12 additions & 0 deletions yarn.lock
@@ -4076,6 +4076,13 @@
resolved "https://registry.yarnpkg.com/@types/aria-query/-/aria-query-5.0.1.tgz#3286741fb8f1e1580ac28784add4c7a1d49bdfbc"
integrity sha512-XTIieEY+gvJ39ChLcB4If5zHtPxt3Syj5rgZR+e1ctpmK8NjPf0zFqsz4JpLJT0xla9GFDKjy8Cpu331nrmE1Q==

"@types/arrify@^2.0.1":
version "2.0.1"
resolved "https://registry.yarnpkg.com/@types/arrify/-/arrify-2.0.1.tgz#d50abf903b1019b08c2ee21cf3ded40084201512"
integrity sha512-eL0bkcwbr+BXp/PPat6+z8C11Hf6+CcB8aE1lIk+Nwvj7uDA3NUmEUgfKLYqvvSuVmeldmaWvo6+s7q9tC9xUQ==
dependencies:
arrify "*"

"@types/babel__core@^7.1.14", "@types/babel__core@^7.20.4":
version "7.20.4"
resolved "https://registry.yarnpkg.com/@types/babel__core/-/babel__core-7.20.4.tgz#26a87347e6c6f753b3668398e34496d6d9ac6ac0"
@@ -5316,6 +5323,11 @@ arraybuffer.prototype.slice@^1.0.2:
is-array-buffer "^3.0.2"
is-shared-array-buffer "^1.0.2"

+arrify@*:
+  version "3.0.0"
+  resolved "https://registry.yarnpkg.com/arrify/-/arrify-3.0.0.tgz#ccdefb8eaf2a1d2ab0da1ca2ce53118759fd46bc"
+  integrity sha512-tLkvA81vQG/XqE2mjDkGQHoOINtMHtysSnemrmoGe6PydDPMRbVugqyk4A6V/WDWEfm3l+0d8anA9r8cv/5Jaw==

arrify@^1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/arrify/-/arrify-1.0.1.tgz#898508da2226f380df904728456849c1501a4b0d"
