Skip to content

Commit

Permalink
fix(migrate): buffer exports through file
Browse files Browse the repository at this point in the history
  • Loading branch information
bjoerge committed Jan 30, 2024
1 parent ed7c904 commit dd47e2b
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 47 deletions.
8 changes: 6 additions & 2 deletions packages/@sanity/migrate/src/fetch-utils/fetchStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ export async function assert2xx(res: Response): Promise<void> {
}
}

export async function fetchAsyncIterator({url, init}: FetchOptions) {
export async function fetchStream({url, init}: FetchOptions) {
const response = await fetch(url, init)
await assert2xx(response)
if (response.body === null) throw new Error('No response received')
return streamAsyncIterator(response.body)
return response.body
}

export async function fetchAsyncIterator(options: FetchOptions) {
return streamAsyncIterator(await fetchStream(options))
}
18 changes: 12 additions & 6 deletions packages/@sanity/migrate/src/runner/dryRun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,31 @@ import {SanityDocument} from '@sanity/types'
import {APIConfig, Migration} from '../types'
import {ndjson} from '../it-utils/ndjson'
import {fromExportEndpoint, safeJsonParser} from '../sources/fromExportEndpoint'
import {streamAsyncIterator} from '../utils/streamToAsyncIterator'
import {bufferThroughFile} from '../fs-webstream/bufferThroughFile'
import {collectMigrationMutations} from './collectMigrationMutations'
import {getBufferFilePath} from './utils/getBufferFile'

interface MigrationRunnerOptions {
api: APIConfig
}

export async function* dryRun(config: MigrationRunnerOptions, migration: Migration) {
const exportStream = bufferThroughFile(
await fromExportEndpoint({...config.api, documentTypes: migration.documentTypes}),
getBufferFilePath(),
)

const mutations = collectMigrationMutations(
migration,
ndjson<SanityDocument>(
await fromExportEndpoint({...config.api, documentTypes: migration.documentTypes}),
{
parse: safeJsonParser,
},
),
ndjson<SanityDocument>(streamAsyncIterator(exportStream), {
parse: safeJsonParser,
}),
)

for await (const mutation of mutations) {
if (!mutation) continue
yield CompactFormatter.format(Array.isArray(mutation) ? mutation : [mutation])
}
await exportStream.cancel()
}
16 changes: 12 additions & 4 deletions packages/@sanity/migrate/src/runner/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import {lastValueFrom} from '../it-utils/lastValueFrom'
import {decodeText, parseJSON} from '../it-utils'
import {concatStr} from '../it-utils/concatStr'
import {fetchAsyncIterator, FetchOptions} from '../fetch-utils/fetchStream'
import {bufferThroughFile} from '../fs-webstream/bufferThroughFile'
import {streamAsyncIterator} from '../utils/streamToAsyncIterator'
import {toSanityMutations} from './utils/toSanityMutations'
import {
DEFAULT_MUTATION_CONCURRENCY,
Expand All @@ -21,6 +23,7 @@ import {
} from './constants'
import {batchMutations} from './utils/batchMutations'
import {collectMigrationMutations} from './collectMigrationMutations'
import {getBufferFilePath} from './utils/getBufferFile'

type MigrationProgress = {
documents: number
Expand Down Expand Up @@ -61,11 +64,14 @@ export async function run(config: MigrationRunnerOptions, migration: Migration)
completedTransactions: [],
currentMutations: [],
}

const exportStream = bufferThroughFile(
await fromExportEndpoint({...config.api, documentTypes: migration.documentTypes}),
getBufferFilePath(),
)

const documents = tap(
ndjson<SanityDocument>(
await fromExportEndpoint({...config.api, documentTypes: migration.documentTypes}),
{parse: safeJsonParser},
),
ndjson<SanityDocument>(streamAsyncIterator(exportStream), {parse: safeJsonParser}),
() => {
config.onProgress?.({...stats, documents: ++stats.documents})
},
Expand Down Expand Up @@ -109,6 +115,8 @@ export async function run(config: MigrationRunnerOptions, migration: Migration)
...stats,
})
}
// Cancel export/buffer stream, it's not needed anymore
await exportStream.cancel()
config.onProgress?.({
...stats,
done: true,
Expand Down
6 changes: 6 additions & 0 deletions packages/@sanity/migrate/src/runner/utils/getBufferFile.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import {tmpdir} from 'node:os'
import path from 'node:path'

export function getBufferFilePath() {
return path.join(tmpdir(), `/export-buffer-${Date.now()}.tmp`)
}
8 changes: 3 additions & 5 deletions packages/@sanity/migrate/src/sources/fromExportEndpoint.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import {createSafeJsonParser} from '@sanity/util/createSafeJsonParser'
import {SanityDocument} from '@sanity/types'
import {fetchAsyncIterator} from '../fetch-utils/fetchStream'
import {fetchStream} from '../fetch-utils/fetchStream'
import {toFetchOptions} from '../fetch-utils/sanityRequestOptions'
import {endpoints} from '../fetch-utils/endpoints'
import {ExportAPIConfig} from '../types'

export function fromExportEndpoint(
options: ExportAPIConfig,
): Promise<AsyncGenerator<Uint8Array, void, unknown>> {
return fetchAsyncIterator(
export function fromExportEndpoint(options: ExportAPIConfig) {
return fetchStream(
toFetchOptions({
projectId: options.projectId,
apiVersion: options.apiVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ import path from 'path'
import type {CliCommandContext, CliCommandDefinition} from '@sanity/cli'
import {register} from 'esbuild-register/dist/node'
import {
APIConfig,
collectMigrationMutations,
DEFAULT_MUTATION_CONCURRENCY,
dryRun,
fromExportArchive,
fromExportEndpoint,
safeJsonParser,
MAX_MUTATION_CONCURRENCY,
DEFAULT_MUTATION_CONCURRENCY,
Migration,
ndjson,
run,
Expand Down Expand Up @@ -227,30 +225,4 @@ async function runFromArchive(

output.print('Done!')
}

interface MigrationRunnerOptions {
api: APIConfig
}

async function* dryRun(
config: MigrationRunnerOptions,
migration: Migration,
{chalk}: CliCommandContext,
) {
const mutations = collectMigrationMutations(
migration,
ndjson<SanityDocument>(
await fromExportEndpoint({...config.api, documentTypes: migration.documentTypes}),
{
parse: safeJsonParser,
},
),
)

for await (const mutation of mutations) {
if (!mutation) continue
yield format(chalk, Array.isArray(mutation) ? mutation : ([mutation] as any))
}
}

export default createMigrationCommand

0 comments on commit dd47e2b

Please sign in to comment.