Skip to content

Commit

Permalink
feat: implement swingStore data export/import in support of state sync
Browse files Browse the repository at this point in the history
Closes #6773
  • Loading branch information
FUDCo committed Mar 16, 2023
1 parent efcd1c3 commit 268e62f
Show file tree
Hide file tree
Showing 32 changed files with 2,228 additions and 463 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ if (!dirPath) {
if (!isSwingStore(dirPath)) {
throw Error(`${dirPath} does not appear to be a swingstore (no ./data.mdb)`);
}
const { kvStore, streamStore } = openSwingStore(dirPath).kernelStorage;
const { kvStore, transcriptStore } = openSwingStore(dirPath).kernelStorage;
function get(key) {
return kvStore.get(key);
}
Expand Down Expand Up @@ -98,7 +98,7 @@ if (!vatName) {
fs.writeSync(fd, JSON.stringify(first));
fs.writeSync(fd, '\n');

// The streamStore holds concatenated transcripts from all upgraded
// The transcriptStore holds concatenated transcripts from all upgraded
// versions. For each old version, it holds every delivery from
// `startVat` through `stopVat`. For the current version, it holds
// every delivery from `startVat` up through the last delivery
Expand All @@ -123,9 +123,8 @@ if (!vatName) {
console.log(`${transcriptLength} transcript entries`);

let deliveryNum = 0;
const transcriptStream = `transcript-${vatID}`;
const stream = streamStore.readStream(transcriptStream, startPos, endPos);
for (const entry of stream) {
const transcript = transcriptStore.readSpan(vatID, startPos, endPos);
for (const entry of transcript) {
// entry is JSON.stringify({ d, syscalls }), syscall is { d, response }
const t = { transcriptNum, ...JSON.parse(entry) };
// console.log(`t.${deliveryNum} : ${t}`);
Expand Down
2 changes: 1 addition & 1 deletion packages/SwingSet/misc-tools/replay-transcript.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async function replay(transcriptFile) {
return loadRaw(snapFile);
},
}
: makeSnapStore(process.cwd(), makeSnapStoreIO());
: makeSnapStore(process.cwd(), () => {}, makeSnapStoreIO());
const testLog = undefined;
const meterControl = makeDummyMeterControl();
const gcTools = harden({
Expand Down
29 changes: 5 additions & 24 deletions packages/SwingSet/src/kernel/state/kernelKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ const enableKernelGC = true;
* @typedef { import('../../types-external.js').KernelSlog } KernelSlog
* @typedef { import('../../types-external.js').ManagerType } ManagerType
* @typedef { import('../../types-external.js').SnapStore } SnapStore
* @typedef { import('../../types-external.js').StreamPosition } StreamPosition
* @typedef { import('../../types-external.js').StreamStore } StreamStore
* @typedef { import('../../types-external.js').TranscriptStore } TranscriptStore
* @typedef { import('../../types-external.js').VatKeeper } VatKeeper
* @typedef { import('../../types-external.js').VatManager } VatManager
*/
Expand Down Expand Up @@ -86,8 +85,6 @@ const enableKernelGC = true;
// $vatSlot is one of: o+$NN/o-$NN/p+$NN/p-$NN/d+$NN/d-$NN
// v$NN.c.$vatSlot = $kernelSlot = ko$NN/kp$NN/kd$NN
// v$NN.nextDeliveryNum = $NN
// v$NN.t.startPosition = $NN // inclusive
// v$NN.t.endPosition = $NN // exclusive
// v$NN.vs.$key = string
// v$NN.meter = m$NN // XXX does this exist?
// v$NN.reapInterval = $NN or 'never'
Expand Down Expand Up @@ -174,7 +171,7 @@ const FIRST_METER_ID = 1n;
* @param {KernelSlog|null} kernelSlog
*/
export default function makeKernelKeeper(kernelStorage, kernelSlog) {
const { kvStore, streamStore, snapStore } = kernelStorage;
const { kvStore, transcriptStore, snapStore } = kernelStorage;

insistStorageAPI(kvStore);

Expand Down Expand Up @@ -787,7 +784,7 @@ export default function makeKernelKeeper(kernelStorage, kernelSlog) {
const promisePrefix = `${vatID}.c.p`;
const kernelPromisesToReject = [];

vatKeeper.deleteSnapshots();
vatKeeper.deleteSnapshotsAndTranscript();

// Note: ASCII order is "+,-./", and we rely upon this to split the
// keyspace into the various o+NN/o-NN/etc spaces. If we were using a
Expand Down Expand Up @@ -1297,11 +1294,11 @@ export default function makeKernelKeeper(kernelStorage, kernelSlog) {
return found;
}
if (!kvStore.has(`${vatID}.o.nextID`)) {
initializeVatState(kvStore, streamStore, vatID);
initializeVatState(kvStore, transcriptStore, vatID);
}
const vk = makeVatKeeper(
kvStore,
streamStore,
transcriptStore,
kernelSlog,
vatID,
addKernelObject,
Expand All @@ -1328,20 +1325,6 @@ export default function makeKernelKeeper(kernelStorage, kernelSlog) {
}

/**
* Cease writing to the vat's transcript.
*
* @param {string} vatID
*/
function closeVatTranscript(vatID) {
insistVatID(vatID);
const transcriptStream = `transcript-${vatID}`;
streamStore.closeStream(transcriptStream);
}

/**
* NOTE: caller is responsible to closeVatTranscript()
* before evicting a VatKeeper.
*
* @param {string} vatID
*/
function evictVatKeeper(vatID) {
Expand Down Expand Up @@ -1448,7 +1431,6 @@ export default function makeKernelKeeper(kernelStorage, kernelSlog) {
if (vk) {
// TODO: find some way to expose the liveSlots internal tables, the
// kernel doesn't see them
closeVatTranscript(vatID);
const vatTable = {
vatID,
state: { transcript: Array.from(vk.getTranscript()) },
Expand Down Expand Up @@ -1615,7 +1597,6 @@ export default function makeKernelKeeper(kernelStorage, kernelSlog) {
provideVatKeeper,
vatIsAlive,
evictVatKeeper,
closeVatTranscript,
cleanupAfterTerminatedVat,
addDynamicVatID,
getDynamicVats,
Expand Down
64 changes: 22 additions & 42 deletions packages/SwingSet/src/kernel/state/vatKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ import { enumeratePrefixedKeys } from './storageHelper.js';
* @typedef { import('../../types-external.js').ManagerOptions } ManagerOptions
* @typedef { import('../../types-external.js').SnapStore } SnapStore
* @typedef { import('../../types-external.js').SourceOfBundle } SourceOfBundle
* @typedef { import('../../types-external.js').StreamPosition } StreamPosition
* @typedef { import('../../types-external.js').StreamStore } StreamStore
* @typedef { import('../../types-external.js').TranscriptStore } TranscriptStore
* @typedef { import('../../types-external.js').VatManager } VatManager
* @typedef { import('../../types-internal.js').RecordedVatOptions } RecordedVatOptions
* @typedef { import('../../types-external.js').TranscriptEntry } TranscriptEntry
Expand All @@ -36,25 +35,24 @@ const FIRST_DEVICE_ID = 70n;
* Establish a vat's state.
*
* @param {*} kvStore The key-value store in which the persistent state will be kept
* @param {*} streamStore Accompanying stream store
* @param {*} transcriptStore Accompanying transcript store
* @param {string} vatID The vat ID string of the vat in question
* TODO: consider making this part of makeVatKeeper
*/
export function initializeVatState(kvStore, streamStore, vatID) {
export function initializeVatState(kvStore, transcriptStore, vatID) {
kvStore.set(`${vatID}.o.nextID`, `${FIRST_OBJECT_ID}`);
kvStore.set(`${vatID}.p.nextID`, `${FIRST_PROMISE_ID}`);
kvStore.set(`${vatID}.d.nextID`, `${FIRST_DEVICE_ID}`);
kvStore.set(`${vatID}.nextDeliveryNum`, `0`);
kvStore.set(`${vatID}.incarnationNumber`, `1`);
kvStore.set(`${vatID}.t.startPosition`, `${streamStore.STREAM_START}`);
kvStore.set(`${vatID}.t.endPosition`, `${streamStore.STREAM_START}`);
transcriptStore.initTranscript(vatID);
}

/**
* Produce a vat keeper for a vat.
*
* @param {KVStore} kvStore The keyValue store in which the persistent state will be kept
* @param {StreamStore} streamStore Accompanying stream store, for the transcripts
* @param {TranscriptStore} transcriptStore Accompanying transcript store, for the transcripts
* @param {*} kernelSlog
* @param {string} vatID The vat ID string of the vat in question
* @param {*} addKernelObject Kernel function to add a new object to the kernel's
Expand All @@ -76,7 +74,7 @@ export function initializeVatState(kvStore, streamStore, vatID) {
*/
export function makeVatKeeper(
kvStore,
streamStore,
transcriptStore,
kernelSlog,
vatID,
addKernelObject,
Expand All @@ -94,7 +92,6 @@ export function makeVatKeeper(
snapStore = undefined,
) {
insistVatID(vatID);
const transcriptStream = `transcript-${vatID}`;

function getRequired(key) {
const value = kvStore.get(key);
Expand Down Expand Up @@ -475,20 +472,12 @@ export function makeVatKeeper(
/**
* Generator function to return the vat's transcript, one entry at a time.
*
* @param {StreamPosition} [startPos] Optional position to begin reading from
* @param {number} [startPos] Optional position to begin reading from
*
* @yields { TranscriptEntry } a stream of transcript entries
*/
function* getTranscript(startPos) {
if (startPos === undefined) {
startPos = Number(getRequired(`${vatID}.t.startPosition`));
}
const endPos = Number(getRequired(`${vatID}.t.endPosition`));
for (const entry of streamStore.readStream(
transcriptStream,
/** @type { StreamPosition } */ (startPos),
endPos,
)) {
for (const entry of transcriptStore.readSpan(vatID, startPos)) {
yield /** @type { TranscriptEntry } */ (JSON.parse(entry));
}
}
Expand All @@ -499,21 +488,13 @@ export function makeVatKeeper(
* @param {object} entry The transcript entry to append.
*/
function addToTranscript(entry) {
const oldPos = Number(getRequired(`${vatID}.t.endPosition`));
const newPos = streamStore.writeStreamItem(
transcriptStream,
JSON.stringify(entry),
oldPos,
);
kvStore.set(`${vatID}.t.endPosition`, `${newPos}`);
transcriptStore.addItem(vatID, JSON.stringify(entry));
}

/** @returns {StreamPosition} */
/** @returns {number} */
function getTranscriptEndPosition() {
const endPosition =
kvStore.get(`${vatID}.t.endPosition`) ||
assert.fail('missing endPosition');
return Number(endPosition);
const { endPos } = transcriptStore.getCurrentSpanBounds(vatID);
return endPos;
}

function getSnapshotInfo() {
Expand All @@ -540,6 +521,7 @@ export function makeVatKeeper(

const endPosition = getTranscriptEndPosition();
const info = await manager.makeSnapshot(endPosition, snapStore);
transcriptStore.rolloverSpan(vatID);
const {
hash,
uncompressedSize,
Expand All @@ -560,19 +542,18 @@ export function makeVatKeeper(
return true;
}

function deleteSnapshots() {
function deleteSnapshotsAndTranscript() {
if (snapStore) {
snapStore.deleteVatSnapshots(vatID);
}
transcriptStore.deleteVatTranscripts(vatID);
}

function removeSnapshotAndTranscript() {
function dropSnapshotAndResetTranscript() {
if (snapStore) {
snapStore.deleteVatSnapshots(vatID);
snapStore.stopUsingLastSnapshot(vatID);
}

const endPos = getRequired(`${vatID}.t.endPosition`);
kvStore.set(`${vatID}.t.startPosition`, endPos);
transcriptStore.rolloverSpan(vatID);
}

function vatStats() {
Expand All @@ -584,9 +565,8 @@ export function makeVatKeeper(
const objectCount = getCount(`${vatID}.o.nextID`, FIRST_OBJECT_ID);
const promiseCount = getCount(`${vatID}.p.nextID`, FIRST_PROMISE_ID);
const deviceCount = getCount(`${vatID}.d.nextID`, FIRST_DEVICE_ID);
const startCount = Number(getRequired(`${vatID}.t.startPosition`));
const endCount = Number(getRequired(`${vatID}.t.endPosition`));
const transcriptCount = endCount - startCount;
const { startPos, endPos } = transcriptStore.getCurrentSpanBounds(vatID);
const transcriptCount = endPos - startPos;

// TODO: Fix the downstream JSON.stringify to allow the counts to be BigInts
return harden({
Expand Down Expand Up @@ -645,8 +625,8 @@ export function makeVatKeeper(
vatStats,
dumpState,
saveSnapshot,
deleteSnapshots,
getSnapshotInfo,
removeSnapshotAndTranscript,
deleteSnapshotsAndTranscript,
dropSnapshotAndResetTranscript,
});
}
3 changes: 1 addition & 2 deletions packages/SwingSet/src/kernel/vat-warehouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ export function makeVatWarehouse(kernelKeeper, vatLoader, policyOptions) {
}
ephemeral.vats.delete(vatID);
xlate.delete(vatID);
kernelKeeper.closeVatTranscript(vatID);
kernelKeeper.evictVatKeeper(vatID);

// console.log('evict: shutting down', vatID);
Expand Down Expand Up @@ -383,7 +382,7 @@ export function makeVatWarehouse(kernelKeeper, vatLoader, policyOptions) {
async function resetWorker(vatID) {
await evict(vatID);
const vatKeeper = kernelKeeper.provideVatKeeper(vatID);
vatKeeper.removeSnapshotAndTranscript();
vatKeeper.dropSnapshotAndResetTranscript();
}

/**
Expand Down
3 changes: 1 addition & 2 deletions packages/SwingSet/src/types-ambient.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@
*/
/**
* @typedef { import('@agoric/swing-store').KVStore } KVStore
* @typedef { import('@agoric/swing-store').StreamStore } StreamStore
* @typedef { import('@agoric/swing-store').StreamPosition } StreamPosition
* @typedef { import('@agoric/swing-store').TranscriptStore } TranscriptStore
* @typedef { import('@agoric/swing-store').SwingStore } SwingStore
* @typedef { import('@agoric/swing-store').SwingStoreKernelStorage } SwingStoreKernelStorage
* @typedef { import('@agoric/swing-store').SwingStoreHostStorage } SwingStoreHostStorage
Expand Down
5 changes: 2 additions & 3 deletions packages/SwingSet/src/types-external.js
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ export {};
* vatSyscallHandler: unknown) => Promise<VatManager>,
* } } VatManagerFactory
* @typedef { { deliver: (delivery: VatDeliveryObject) => Promise<VatDeliveryResult>,
* replayTranscript: (startPos: StreamPosition | undefined) => Promise<number?>,
* replayTranscript: (startPos: number | undefined) => Promise<number?>,
* makeSnapshot?: (endPos: number, ss: SnapStore) => Promise<SnapshotResult>,
* shutdown: () => Promise<void>,
* } } VatManager
Expand Down Expand Up @@ -273,8 +273,7 @@ export {};
* @typedef { import('@agoric/swing-store').KVStore } KVStore
* @typedef { import('@agoric/swing-store').SnapStore } SnapStore
* @typedef { import('@agoric/swing-store').SnapshotResult } SnapshotResult
* @typedef { import('@agoric/swing-store').StreamStore } StreamStore
* @typedef { import('@agoric/swing-store').StreamPosition } StreamPosition
* @typedef { import('@agoric/swing-store').TranscriptStore } TranscriptStore
* @typedef { import('@agoric/swing-store').SwingStore } SwingStore
* @typedef { import('@agoric/swing-store').SwingStoreKernelStorage } SwingStoreKernelStorage
* @typedef { import('@agoric/swing-store').SwingStoreHostStorage } SwingStoreHostStorage
Expand Down
6 changes: 6 additions & 0 deletions packages/SwingSet/test/device-plugin/test-device.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ test.serial('plugin first time', async t => {
]);
});

// NOTE: the following test CANNOT be run standalone. It requires execution of
// the prior test to establish its necessary starting state. This is a bad
// practice and should be fixed. It's not bad enough to warrant fixing right
// now, but worth flagging with this comment as a help to anyone else who gets
// tripped up by it.

test.serial('plugin after restart', async t => {
const { bridge, cycle, dump, plugin, queueThunkForKernel } =
await setupVatController(t);
Expand Down
16 changes: 9 additions & 7 deletions packages/SwingSet/test/test-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,15 @@ function duplicateKeeper(serialize) {

test('kernelStorage param guards', async t => {
const { kvStore } = buildKeeperStorageInMemory();
const exp = { message: /true must be a string/ };
t.throws(() => kvStore.set('foo', true), exp);
t.throws(() => kvStore.set(true, 'foo'), exp);
t.throws(() => kvStore.has(true), exp);
t.throws(() => kvStore.getNextKey(true), exp);
t.throws(() => kvStore.get(true), exp);
t.throws(() => kvStore.delete(true), exp);
const expv = { message: /value must be a string/ };
const expk = { message: /key must be a string/ };
const exppk = { message: /previousKey must be a string/ };
t.throws(() => kvStore.set('foo', true), expv);
t.throws(() => kvStore.set(true, 'foo'), expk);
t.throws(() => kvStore.has(true), expk);
t.throws(() => kvStore.getNextKey(true), exppk);
t.throws(() => kvStore.get(true), expk);
t.throws(() => kvStore.delete(true), expk);
});

test('kernel state', async t => {
Expand Down
2 changes: 1 addition & 1 deletion packages/SwingSet/test/test-xsnap-metering.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ function checkMetered(t, args, metered) {

async function doTest(t, metered) {
const db = sqlite3(':memory:');
const store = makeSnapStore(db, makeSnapStoreIO());
const store = makeSnapStore(db, () => {}, makeSnapStoreIO());

const { p: p1, startXSnap: start1 } = make(store);
const worker1 = await start1('vat', 'name', handleCommand, metered, false);
Expand Down
Loading

0 comments on commit 268e62f

Please sign in to comment.