Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ag-solo kernel reentrancy and fake-chain non-determinism #710

Merged
merged 5 commits into from
Mar 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions packages/SwingSet/src/kernel/kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -364,16 +364,26 @@ export default function buildKernel(kernelEndowments) {
}
}

let processQueueRunning;
async function processQueueMessage(message) {
kdebug(`processQ ${JSON.stringify(message)}`);
if (message.type === 'send') {
await deliverToTarget(message.target, message.msg);
} else if (message.type === 'notify') {
await processNotify(message);
} else {
throw Error(`unable to process message.type ${message.type}`);
if (processQueueRunning) {
console.log(`We're currently already running at`, processQueueRunning);
throw Error(`Kernel reentrancy is forbidden`);
}
try {
processQueueRunning = Error('here');
if (message.type === 'send') {
await deliverToTarget(message.target, message.msg);
} else if (message.type === 'notify') {
await processNotify(message);
} else {
throw Error(`unable to process message.type ${message.type}`);
}
commitCrank();
} finally {
processQueueRunning = undefined;
}
commitCrank();
}

function validateVatSetupFn(setup) {
Expand Down
17 changes: 10 additions & 7 deletions packages/SwingSet/src/kernel/vatManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,11 @@ export default function makeVatManager(
return process(
() => dispatch[dispatchOp](...dispatchArgs),
() => transcriptFinishDispatch(),
err => console.log(`doProcess: ${errmsg}:`, err),
err => {
if (errmsg !== null) {
console.log(`doProcess: ${errmsg}:`, err);
}
},
);
}

Expand Down Expand Up @@ -382,13 +386,12 @@ export default function makeVatManager(
throw replayAbandonShip;
}
playbackSyscalls = Array.from(t.syscalls);
// We really don't care about "failed replays" because they're just
// exceptions that have been raised in a normal event.
//
// If we really fail, replayAbandonShip is set.
// eslint-disable-next-line no-await-in-loop
await doProcess(
t.d,
`Replay failed: [${t.d[0]}, ${t.d[1]}, ${t.d[2]}, ${JSON.stringify(
t.d[3],
)}]`,
);
await doProcess(t.d, null);
}

if (replayAbandonShip) {
Expand Down
35 changes: 25 additions & 10 deletions packages/cosmic-swingset/lib/ag-solo/fake-chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import path from 'path';
import fs from 'fs';
import stringify from '@agoric/swingset-vat/src/kernel/json-stable-stringify';
import { launch } from '../launch-chain';
import makeBlockManager from '../block-manager';

const PRETEND_BLOCK_DELAY = 5;

Expand Down Expand Up @@ -36,33 +37,47 @@ export async function connectToFakeChain(basedir, GCI, role, delay, inbound) {
const argv = [`--role=${role}`, bootAddress];
const stateDBdir = path.join(basedir, `fake-chain-${GCI}-state`);
const s = await launch(stateDBdir, mailboxStorage, vatsdir, argv);
const { deliverInbound, beginBlock, saveChainState, saveOutsideState } = s;

let pretendLast = Date.now();
let blockHeight = 0;
const blockManager = makeBlockManager(s);
const { savedHeight, savedActions } = s;

let blockHeight = savedHeight;
let blockTime =
savedActions.length > 0
? savedActions[0].blockTime
: Math.floor(Date.now() / 1000);
let intoChain = [];
let thisBlock = [];

async function simulateBlock() {
const actualStart = Date.now();
// Gather up the new messages into the latest block.
thisBlock.push(...intoChain);
intoChain = [];

try {
const commitStamp = pretendLast + PRETEND_BLOCK_DELAY * 1000;
const blockTime = Math.floor(commitStamp / 1000);
await beginBlock(blockHeight, blockTime);
blockTime += PRETEND_BLOCK_DELAY;
blockHeight += 1;

await blockManager({ type: 'BEGIN_BLOCK', blockHeight, blockTime });
for (let i = 0; i < thisBlock.length; i += 1) {
const [newMessages, acknum] = thisBlock[i];
await deliverInbound(bootAddress, newMessages, acknum);
await blockManager({
type: 'DELIVER_INBOUND',
peer: bootAddress,
messages: newMessages,
ack: acknum,
blockHeight,
blockTime,
});
}
await blockManager({ type: 'END_BLOCK', blockHeight, blockTime });

// Done processing, "commit the block".
saveChainState();
saveOutsideState();
await blockManager({ type: 'COMMIT_BLOCK', blockHeight, blockTime });
await writeMap(mailboxFile, mailboxStorage);
thisBlock = [];
pretendLast = commitStamp + Date.now() - actualStart;
blockTime = blockTime + Date.now() - actualStart;
blockHeight += 1;
} catch (e) {
console.log(`error fake processing`, e);
Expand Down
43 changes: 43 additions & 0 deletions packages/cosmic-swingset/lib/ag-solo/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import makePromise from '@agoric/make-promise';

// Return a function that can wrap an async or sync method, but
// ensures only one of them (in order) is running at a time.
export const makeWithQueue = () => {
const queue = [];

// Execute the thunk at the front of the queue.
const dequeue = () => {
if (!queue.length) {
return;
}
const [thunk, resolve, reject] = queue[0];
// Run the thunk in a new turn.
Promise.resolve()
.then(thunk)
// Resolve or reject our caller with the thunk's value.
.then(resolve, reject)
// Rerun dequeue() after settling.
.finally(() => {
queue.shift();
dequeue();
});
};

return function withQueue(inner) {
return function queueCall(...args) {
// Curry the arguments into the inner function, and
// resolve/reject with whatever the inner function does.
const thunk = _ => inner(...args);
const pr = makePromise();
queue.push([thunk, pr.res, pr.rej]);

if (queue.length === 1) {
// Start running immediately.
dequeue();
}

// Allow the caller to retrieve our thunk's results.
return pr.p;
};
};
};
94 changes: 54 additions & 40 deletions packages/cosmic-swingset/lib/ag-solo/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import fs from 'fs';
import path from 'path';
import temp from 'temp';
import { promisify } from 'util';
import readlines from 'n-readlines';
// import { createHash } from 'crypto';

// import connect from 'lotion-connect';
Expand All @@ -24,10 +23,10 @@ import {

import { deliver, addDeliveryTarget } from './outbound';
import { makeHTTPListener } from './web';
import { makeWithQueue } from './queue';

import { connectToChain } from './chain-cosmos-sdk';
import { connectToFakeChain } from './fake-chain';
import bundle from './bundle';

// import { makeChainFollower } from './follower';
// import { makeDeliverator } from './deliver-with-ag-cosmos-helper';
Expand Down Expand Up @@ -123,42 +122,61 @@ async function buildSwingset(
}
}

async function deliverInboundToMbx(sender, messages, ack) {
const withInputQueue = makeWithQueue();

// Use the input queue to make sure it doesn't overlap with
// other inbound messages.
const deliverInboundToMbx = withInputQueue(async (sender, messages, ack) => {
if (!(messages instanceof Array)) {
throw new Error(`inbound given non-Array: ${messages}`);
}
// console.log(`deliverInboundToMbx`, messages, ack);
if (mb.deliverInbound(sender, messages, ack, true)) {
await processKernel();
}
}
});

async function deliverInboundCommand(obj) {
// Use the input queue to make sure it doesn't overlap with
// other inbound messages.
const deliverInboundCommand = withInputQueue(async obj => {
// this promise could take an arbitrarily long time to resolve, so don't
// wait on it
const p = cm.inboundCommand(obj);
// TODO: synchronize this somehow, make sure it doesn't overlap with the
// processKernel() call in deliverInbound()

// Register a handler in this turn so that we don't get complaints about
// asynchronously-handled callbacks.
p.catch(_ => {});

// The turn passes...
await processKernel();
return p;
}

const intervalMillis = 1200;
// TODO(hibbert) protect against kernel turns that take too long
// drop calls to moveTimeForward if it's fallen behind, to make sure we don't
// have two copies of controller.run() executing at the same time.
function moveTimeForward() {
// Rethrow any inboundCommand rejection in the new turn so that our
// caller must handle it (or be an unhandledRejection).
return p.catch(e => {
michaelfig marked this conversation as resolved.
Show resolved Hide resolved
throw e;
});
});

let intervalMillis;

// Use the input queue to make sure it doesn't overlap with
// other inbound messages.
const moveTimeForward = withInputQueue(async () => {
const now = Math.floor(Date.now() / intervalMillis);
if (timer.poll(now)) {
const p = processKernel();
p.then(
_ => console.log(`timer-provoked kernel crank complete ${now}`),
err =>
console.log(`timer-provoked kernel crank failed at ${now}:`, err),
);
try {
if (timer.poll(now)) {
await processKernel();
console.log(`timer-provoked kernel crank complete ${now}`);
}
} catch (err) {
console.log(`timer-provoked kernel crank failed at ${now}:`, err);
} finally {
// We only rearm the timeout if moveTimeForward has completed, to
// make sure we don't have two copies of controller.run() executing
// at the same time.
setTimeout(moveTimeForward, intervalMillis);
}
}
setInterval(moveTimeForward, intervalMillis);
});

// now let the bootstrap functions run
await processKernel();
Expand All @@ -167,6 +185,10 @@ async function buildSwingset(
deliverInboundToMbx,
deliverInboundCommand,
deliverOutbound,
startTimer: interval => {
intervalMillis = interval;
setTimeout(moveTimeForward, intervalMillis);
},
};
}

Expand Down Expand Up @@ -199,7 +221,12 @@ export default async function start(basedir, withSES, argv) {
broadcast,
);

const { deliverInboundToMbx, deliverInboundCommand, deliverOutbound } = d;
const {
deliverInboundToMbx,
deliverInboundCommand,
deliverOutbound,
startTimer,
} = d;

await Promise.all(
connections.map(async c => {
Expand Down Expand Up @@ -252,23 +279,10 @@ export default async function start(basedir, withSES, argv) {
}),
);

// Start timer here!
startTimer(1200);

console.log(`swingset running`);
swingSetRunning = true;
deliverOutbound();

// Install the bundles as specified.
const initDir = path.join(basedir, 'init-bundles');
let list = [];
try {
list = await fs.promises.readdir(initDir);
} catch (e) {}
for (const initName of list.sort()) {
console.log('loading init bundle', initName);
const initFile = path.join(initDir, initName);
if (
await bundle(() => '.', ['--evaluate', '--once', '--input', initFile])
) {
return 0;
}
}
}