Skip to content

Commit

Permalink
perf(swing-store): Improve the efficiency of writing snapshots
Browse files Browse the repository at this point in the history
* Use TMPDIR for temporary files.
* Parallelize reading raw snapshot data for hash computation and compression.

Fixes #6225
  • Loading branch information
gibson042 authored and mhofman committed Sep 16, 2022
1 parent 6ad3a6e commit 236d3bd
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 79 deletions.
4 changes: 3 additions & 1 deletion packages/SwingSet/misc-tools/replay-transcript.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { createHash } from 'crypto';
import { pipeline } from 'stream';
import { performance } from 'perf_hooks';
// eslint-disable-next-line import/no-extraneous-dependencies
import { tmpName } from 'tmp';
import { file as tmpFile, tmpName } from 'tmp';
import bundleSource from '@endo/bundle-source';
import { makeMeasureSeconds } from '@agoric/internal';
import { makeSnapStore } from '@agoric/swing-store';
Expand Down Expand Up @@ -63,11 +63,13 @@ function makeSnapStoreIO() {
return {
createReadStream: fs.createReadStream,
createWriteStream: fs.createWriteStream,
fsync: fs.fsync,
measureSeconds: makeMeasureSeconds(performance.now),
open: fs.promises.open,
rename: fs.promises.rename,
resolve: path.resolve,
stat: fs.promises.stat,
tmpFile,
tmpName,
unlink: fs.promises.unlink,
};
Expand Down
2 changes: 0 additions & 2 deletions packages/SwingSet/src/kernel/state/vatKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,6 @@ export function makeVatKeeper(
newFile,
rawByteCount,
rawSaveSeconds,
hashSeconds,
compressedByteCount,
compressSeconds,
} = info;
Expand All @@ -617,7 +616,6 @@ export function makeVatKeeper(
newFile,
rawByteCount,
rawSaveSeconds,
hashSeconds,
compressedByteCount,
compressSeconds,
endPosition,
Expand Down
3 changes: 0 additions & 3 deletions packages/SwingSet/test/snapshots/test-xsnap-store.js.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ Generated by [AVA](https://avajs.dev).
{
compressSeconds: 0,
hash: '8a0e3873976c50462d1b1dac59c912152b0e5cad5eeb9deca0ca64a087b4a873',
hashSeconds: 0,
newFile: true,
rawByteCount: 167887,
rawSaveSeconds: 0,
Expand All @@ -22,7 +21,6 @@ Generated by [AVA](https://avajs.dev).
{
compressSeconds: 0,
hash: 'd66baf70b553d5c31f3396130fbf91b573fb35e4d0d2b4fe7e9e32f86bee08df',
hashSeconds: 0,
newFile: true,
rawByteCount: 782991,
rawSaveSeconds: 0,
Expand All @@ -33,7 +31,6 @@ Generated by [AVA](https://avajs.dev).
{
compressSeconds: 0,
hash: '00d679bb0d85b74351286c37cc5602d900b0797efda6632c9989174aaee47779',
hashSeconds: 0,
newFile: true,
rawByteCount: 784895,
rawSaveSeconds: 0,
Expand Down
187 changes: 116 additions & 71 deletions packages/swing-store/src/snapStore.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// @ts-check
import { createHash } from 'crypto';
import { pipeline } from 'stream';
import { finished as finishedCallback, pipeline } from 'stream';
import { promisify } from 'util';
import { createGzip, createGunzip } from 'zlib';
import { assert, details as d } from '@agoric/assert';
import { aggregateTryFinally, PromiseAllOrErrors } from '@agoric/internal';

/**
* @typedef {object} SnapshotInfo
Expand All @@ -12,7 +13,6 @@ import { assert, details as d } from '@agoric/assert';
* @property {boolean} newFile true if the compressed snapshot is new, false otherwise
* @property {number} rawByteCount size of (uncompressed) snapshot
* @property {number} rawSaveSeconds time to save (uncompressed) snapshot
* @property {number} hashSeconds time to calculate snapshot hash
* @property {number} compressedByteCount size of (compressed) snapshot
* @property {number} compressSeconds time to compress and save snapshot (0 if the file is not new)
*/
Expand All @@ -28,6 +28,7 @@ import { assert, details as d } from '@agoric/assert';
*/

const pipe = promisify(pipeline);
const finished = promisify(finishedCallback);

const { freeze } = Object;

Expand Down Expand Up @@ -76,11 +77,13 @@ export const fsStreamReady = stream =>
* @param {{
* createReadStream: typeof import('fs').createReadStream,
* createWriteStream: typeof import('fs').createWriteStream,
* fsync: typeof import('fs').fsync,
* measureSeconds: ReturnType<typeof import('@agoric/internal').makeMeasureSeconds>,
* open: typeof import('fs').promises.open,
* rename: typeof import('fs').promises.rename,
* resolve: typeof import('path').resolve,
* stat: typeof import('fs').promises.stat,
* tmpFile: typeof import('tmp').file,
* tmpName: typeof import('tmp').tmpName,
* unlink: typeof import('fs').promises.unlink,
* }} io
Expand All @@ -94,10 +97,12 @@ export function makeSnapStore(
createReadStream,
createWriteStream,
measureSeconds,
fsync,
open,
rename,
resolve: pathResolve,
stat,
tmpFile,
tmpName,
unlink,
},
Expand All @@ -106,6 +111,9 @@ export function makeSnapStore(
/** @type {(opts: unknown) => Promise<string>} */
const ptmpName = promisify(tmpName);

/** @type {(fd: number) => Promise<void>} */
const pfsync = promisify(fsync);

/**
* Returns the result of calling a function with the name
* of a temp file that exists only for the duration of
Expand Down Expand Up @@ -134,35 +142,6 @@ export function makeSnapStore(
return result;
}

/**
 * Atomically creates `root/baseName` by writing into a temp file
 * (via the `writeContents` callback) and renaming it into place.
 *
 * @param {string} baseName relative-to-root name of file to be written
 * @param { (name: string) => Promise<void> } writeContents populates the temp file
 * @returns {Promise<void>}
 */
async function atomicWriteInRoot(baseName, writeContents) {
  // rename(2) is only atomic within a single filesystem,
  // so the scratch file must live in the root directory as well.
  assert(!baseName.includes('/'));
  const scratchPath = await ptmpName({
    template: 'atomic-XXXXXX',
    tmpdir: root,
  });
  try {
    await writeContents(scratchPath);
    await rename(scratchPath, resolve(root, baseName));
  } finally {
    try {
      await unlink(scratchPath);
    } catch (_err) {
      // Best-effort cleanup only: after a successful rename the
      // scratch file no longer exists and unlink fails with ENOENT.
    }
  }
}

/**
* Populates destPath by streaming the contents of srcPath through a transform.
*
Expand Down Expand Up @@ -208,6 +187,22 @@ export function makeSnapStore(
return hash.digest('hex');
}

// Create a file for the compressed snapshot in-place
// to be atomically renamed once we know its name
// from the hash of raw snapshot contents.
// TODO: hoist.
/**
 * Promise adapter for `tmp.file`, whose success path yields three
 * values (path, fd, cleanup callback) and therefore cannot go
 * through `util.promisify`.
 *
 * @param {object} [options] options forwarded to `tmp.file`
 * @returns {Promise<{ path: string, fd: number, cleanup: () => void }>}
 */
const ptmpFile = (options = {}) =>
  new Promise((accept, decline) => {
    tmpFile(options, (err, path, fd, cleanupCallback) => {
      if (err) {
        decline(err);
        return;
      }
      accept({ path, fd, cleanup: cleanupCallback });
    });
  });

/** @param {string} hash */
function baseNameFromHash(hash) {
assert.typeof(hash, 'string');
Expand All @@ -227,56 +222,106 @@ export function makeSnapStore(
* Creates a new gzipped snapshot file in the `root` directory
* and reports information about the process,
* including file size and timing metrics.
* Note that timing metrics exclude file open.
*
* @param {(filePath: string) => Promise<void>} saveRaw
* @returns {Promise<SnapshotInfo>}
*/
async function save(saveRaw) {
return withTempName(async tmpSnapPath => {
const { duration: rawSaveSeconds } = await measureSeconds(() =>
saveRaw(tmpSnapPath),
);
const { size: rawByteCount } = await stat(tmpSnapPath);
const { result: hash, duration: hashSeconds } = await measureSeconds(() =>
fileHash(tmpSnapPath),
);
toDelete.delete(hash);
const baseName = baseNameFromHash(hash);
const filePath = pathResolve(root, baseName);
const infoBase = {
filePath,
hash,
rawByteCount,
rawSaveSeconds,
hashSeconds,
};
const fileStat = await stat(filePath).catch(err => {
if (err.code !== 'ENOENT') {
throw err;
const cleanup = [];
return aggregateTryFinally(
async () => {
const tmpSnapPath = await ptmpName({ template: 'save-raw-XXXXXX.xss' });
cleanup.push(() => unlink(tmpSnapPath));
const { duration: rawSaveSeconds } = await measureSeconds(async () =>
saveRaw(tmpSnapPath),
);
const { size: rawByteCount } = await stat(tmpSnapPath);

// Perform operations that read snapshot data in parallel.
// We still serialize the stat and opening of tmpSnapPath
// and creation of tmpGzPath for readability, but we could
// parallelize those as well if the cost is significant.
const snapReader = createReadStream(tmpSnapPath);
cleanup.push(() => {
snapReader.destroy();
});

const {
path: tmpGzPath,
fd: tmpGzFd,
cleanup: tmpGzCleanup,
} = await ptmpFile({ tmpdir: root });
const gzWriter = createWriteStream(noPath, {
fd: tmpGzFd,
autoClose: false,
});
cleanup.push(tmpGzCleanup);
cleanup.push(() => gzWriter.close());

await Promise.all([
fsStreamReady(snapReader),
fsStreamReady(tmpGzCleanup),
]);

const hashStream = createHash('sha256');
const gzip = createGzip();

const { result: hash, duration: compressSeconds } =
await measureSeconds(async () => {
snapReader.pipe(hashStream);
snapReader.pipe(gzip).pipe(gzWriter);

await Promise.all([finished(snapReader), finished(gzWriter)]);

const h = hashStream.digest('hex');
await pfsync(tmpGzFd);

const tmpGzClose = cleanup.pop();
tmpGzClose();

return h;
});

toDelete.delete(hash);
const baseName = baseNameFromHash(hash);
const filePath = pathResolve(root, baseName);
const infoBase = {
filePath,
hash,
rawByteCount,
rawSaveSeconds,
compressSeconds,
};
const fileStat = await stat(filePath).catch(err => {
if (err.code !== 'ENOENT') {
throw err;
}
});
if (fileStat) {
const { size: compressedByteCount } = fileStat;
return freeze({
...infoBase,
newFile: false,
compressedByteCount,
});
}
});
if (fileStat) {
const { size: compressedByteCount } = fileStat;

// Atomically rename.
await rename(tmpGzPath, filePath);
const { size: compressedByteCount } = await stat(filePath);
return freeze({
...infoBase,
newFile: false,
compressSeconds: 0,
newFile: true,
compressedByteCount,
});
}
const { duration: compressSeconds } = await measureSeconds(() =>
atomicWriteInRoot(baseName, tmpGzPath =>
filter(tmpSnapPath, createGzip(), tmpGzPath, { flush: true }),
),
);
const { size: compressedByteCount } = await stat(filePath);
return freeze({
...infoBase,
newFile: true,
compressSeconds,
compressedByteCount,
});
}, 'save-raw');
},
async () => {
await PromiseAllOrErrors(
cleanup.reverse().map(fn => Promise.resolve().then(() => fn())),
);
},
);
}

/**
Expand Down
4 changes: 3 additions & 1 deletion packages/swing-store/src/swingStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import fs from 'fs';
import path from 'path';
import { performance } from 'perf_hooks';
import { tmpName } from 'tmp';
import { file as tmpFile, tmpName } from 'tmp';

import { open as lmdbOpen, ABORT as lmdbAbort } from 'lmdb';
import sqlite3 from 'better-sqlite3';
Expand All @@ -22,11 +22,13 @@ export function makeSnapStoreIO() {
return {
createReadStream: fs.createReadStream,
createWriteStream: fs.createWriteStream,
fsync: fs.fsync,
measureSeconds: makeMeasureSeconds(performance.now),
open: fs.promises.open,
rename: fs.promises.rename,
resolve: path.resolve,
stat: fs.promises.stat,
tmpFile,
tmpName,
unlink: fs.promises.unlink,
};
Expand Down
3 changes: 2 additions & 1 deletion packages/swing-store/test/test-snapstore.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ test('build temp file; compress to cache file', async t => {
await fs.promises.mkdir(pool.name, { recursive: true });
const store = makeSnapStore(pool.name, {
...tmp,
tmpFile: tmp.file,
...path,
...fs,
...fs.promises,
Expand All @@ -38,7 +39,6 @@ test('build temp file; compress to cache file', async t => {
filePath: path.resolve(pool.name, `${expectedHash}.gz`),
rawByteCount: 3,
rawSaveSeconds: 0,
hashSeconds: 0,
compressSeconds: 0,
});
t.is(await store.has(hash), true);
Expand All @@ -62,6 +62,7 @@ test('snapStore prepare / commit delete is robust', async t => {

const io = {
...tmp,
tmpFile: tmp.file,
...path,
...fs,
...fs.promises,
Expand Down

0 comments on commit 236d3bd

Please sign in to comment.