diff --git a/packages/SwingSet/src/devices/mailbox-src.js b/packages/SwingSet/src/devices/mailbox-src.js index 69c1a3421402..0cebf9760833 100644 --- a/packages/SwingSet/src/devices/mailbox-src.js +++ b/packages/SwingSet/src/devices/mailbox-src.js @@ -5,50 +5,27 @@ import { assert, details as X } from '@agoric/assert'; export function buildRootDeviceNode(tools) { const { SO, getDeviceState, setDeviceState, endowments } = tools; - const highestInboundDelivered = harden(new Map()); - const highestInboundAck = harden(new Map()); let deliverInboundMessages; let deliverInboundAck; - function inboundCallback(hPeer, hMessages, hAck) { - const peer = `${hPeer}`; + function inboundCallback(peer, messages, ack) { if (!deliverInboundMessages) { throw new Error( `mailbox.inboundCallback(${peer}) called before handler was registered`, ); } - const ack = Nat(hAck); - let didSomething = false; - - let latestMsg = 0; - if (highestInboundDelivered.has(peer)) { - latestMsg = highestInboundDelivered.get(peer); - } - const newMessages = []; - hMessages.forEach(m => { - const [hNum, hMsg] = m; - const num = Nat(hNum); - if (num > latestMsg) { - newMessages.push([num, `${hMsg}`]); - latestMsg = num; - highestInboundDelivered.set(peer, latestMsg); - } + assert.typeof(peer, 'string'); + messages.forEach(m => { + Nat(m[0]); + assert.typeof(m[1], 'string'); }); - if (newMessages.length) { - deliverInboundMessages(peer, harden(newMessages)); - didSomething = true; - } - let latestAck = 0; - if (highestInboundAck.has(peer)) { - latestAck = highestInboundAck.get(peer); - } - if (ack > latestAck) { - highestInboundAck.set(peer, ack); - deliverInboundAck(peer, ack); - didSomething = true; + Nat(ack); + if (messages.length) { + deliverInboundMessages(peer, harden(messages)); } - return didSomething; + deliverInboundAck(peer, ack); + return true; // always didSomething } endowments.registerInboundCallback(inboundCallback); diff --git a/packages/SwingSet/test/device-mailbox/test-device-mailbox.js b/packages/SwingSet/test/device-mailbox/test-device-mailbox.js index 2ffaad2544f7..4a4c516035c0 100644 --- a/packages/SwingSet/test/device-mailbox/test-device-mailbox.js +++ b/packages/SwingSet/test/device-mailbox/test-device-mailbox.js @@ -5,7 +5,6 @@ import { test } from '../../tools/prepare-test-env-ava.js'; import path from 'path'; import bundleSource from '@agoric/bundle-source'; import { provideHostStorage } from '../../src/hostStorage.js'; - import { initializeSwingset, makeSwingsetController, @@ -15,6 +14,7 @@ import { buildMailboxStateMap, buildMailbox, } from '../../src/devices/mailbox.js'; +import { capargs } from '../util.js'; test.before(async t => { const kernelBundles = await buildKernelBundles(); @@ -91,106 +91,140 @@ test('mailbox inbound', async t => { mailbox: { ...mb.endowments }, }; - let rc; - const hostStorage = provideHostStorage(); await initializeSwingset(config, ['mailbox2'], hostStorage, t.context.data); const c = await makeSwingsetController(hostStorage, deviceEndowments); await c.run(); - rc = mb.deliverInbound( - 'peer1', - [ - [1, 'msg1'], - [2, 'msg2'], - ], - 0, - ); - t.truthy(rc); + const m1 = [1, 'msg1']; + const m2 = [2, 'msg2']; + const m3 = [3, 'msg3']; + t.true(mb.deliverInbound('peer1', [m1, m2], 0)); await c.run(); - t.deepEqual(c.dump().log, ['dm-peer1', 'm-1-msg1', 'm-2-msg2']); - - // delivering the same messages should not trigger sends, but the ack is new - rc = mb.deliverInbound( - 'peer1', - [ - [1, 'msg1'], - [2, 'msg2'], - ], - 3, - ); - t.truthy(rc); + const expected = ['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'da-peer1-0']; + t.deepEqual(c.dump().log, expected); + + // all messages/acks should be delivered, even duplicates + t.true(mb.deliverInbound('peer1', [m1, m2], 0)); await c.run(); - t.deepEqual(c.dump().log, ['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'da-peer1-3']); - - // no new messages/acks makes deliverInbound return 'false' - rc = mb.deliverInbound( - 'peer1', - [ - [1, 'msg1'], - [2, 'msg2'], - ], - 3, - ); - t.falsy(rc); + expected.push(...['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'da-peer1-0']); + t.deepEqual(c.dump().log, expected); + + // new messages too + t.true(mb.deliverInbound('peer1', [m1, m2, m3], 0)); await c.run(); - t.deepEqual(c.dump().log, ['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'da-peer1-3']); - - // but new messages should be sent - rc = mb.deliverInbound( - 'peer1', - [ - [1, 'msg1'], - [2, 'msg2'], - [3, 'msg3'], - ], - 3, + expected.push( + ...['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'm-3-msg3', 'da-peer1-0'], ); - t.truthy(rc); + t.deepEqual(c.dump().log, expected); + + // and new ack + t.true(mb.deliverInbound('peer1', [m1, m2, m3], 6)); await c.run(); - t.deepEqual(c.dump().log, [ - 'dm-peer1', - 'm-1-msg1', - 'm-2-msg2', - 'da-peer1-3', - 'dm-peer1', - 'm-3-msg3', - ]); - - // and a higher ack should be sent - rc = mb.deliverInbound( - 'peer1', - [ - [1, 'msg1'], - [2, 'msg2'], - [3, 'msg3'], - ], - 4, + expected.push( + ...['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'm-3-msg3', 'da-peer1-6'], ); - t.truthy(rc); - await c.run(); - t.deepEqual(c.dump().log, [ - 'dm-peer1', - 'm-1-msg1', - 'm-2-msg2', - 'da-peer1-3', - 'dm-peer1', - 'm-3-msg3', - 'da-peer1-4', - ]); - - rc = mb.deliverInbound('peer2', [[4, 'msg4']], 5); - t.truthy(rc); + t.deepEqual(c.dump().log, expected); +}); + +async function initializeMailboxKernel(t) { + const s = buildMailboxStateMap(); + const mb = buildMailbox(s); + const config = { + bootstrap: 'bootstrap', + vats: { + bootstrap: { + bundle: t.context.data.bootstrap, + }, + }, + devices: { + mailbox: { + sourceSpec: require.resolve(mb.srcPath), + }, + }, + }; + const hostStorage = provideHostStorage(); + await initializeSwingset( + config, + ['mailbox-determinism'], + hostStorage, + t.context.data, + ); + return hostStorage; +} + +async function makeMailboxKernel(hostStorage) { + const s = buildMailboxStateMap(); + const mb = buildMailbox(s); + const deviceEndowments = { + mailbox: { ...mb.endowments }, + }; + const c = await makeSwingsetController(hostStorage, deviceEndowments); + c.pinVatRoot('bootstrap'); await c.run(); - t.deepEqual(c.dump().log, [ - 'dm-peer1', - 'm-1-msg1', - 'm-2-msg2', - 'da-peer1-3', - 'dm-peer1', - 'm-3-msg3', - 'da-peer1-4', - 'dm-peer2', - 'm-4-msg4', - 'da-peer2-5', - ]); + return [c, mb]; +} + +test('mailbox determinism', async t => { + // we run two kernels in parallel + const hostStorage1 = await initializeMailboxKernel(t); + const hostStorage2 = await initializeMailboxKernel(t); + const [c1a, mb1a] = await makeMailboxKernel(hostStorage1); + const [c2, mb2] = await makeMailboxKernel(hostStorage2); + + // they get the same inbound message + const msg1 = [[1, 'msg1']]; + t.true(mb1a.deliverInbound('peer1', msg1, 0)); + await c1a.run(); + t.deepEqual(c1a.dump().log, ['comms receive msg1']); + const kp1 = c1a.queueToVatRoot('bootstrap', 'getNumReceived', capargs([])); + await c1a.run(); + t.deepEqual(JSON.parse(c1a.kpResolution(kp1).body), 1); + + t.true(mb2.deliverInbound('peer1', msg1, 0)); + await c2.run(); + t.deepEqual(c2.dump().log, ['comms receive msg1']); + const kp2 = c2.queueToVatRoot('bootstrap', 'getNumReceived', capargs([])); + await c2.run(); + t.deepEqual(JSON.parse(c2.kpResolution(kp2).body), 1); + + // both should have the same number of cranks + t.is( + hostStorage1.kvStore.get('crankNumber'), + hostStorage2.kvStore.get('crankNumber'), + ); + + // then one is restarted, but the other keeps running + const [c1b, mb1b] = await makeMailboxKernel(hostStorage1); + + // Now we repeat delivery of that message to both. The mailbox should send + // it to vattp, even though it's duplicate, because the mailbox doesn't + // have durable state, and cannot correctly (deterministically) tell that + // it's a duplicate. + t.true(mb1b.deliverInbound('peer1', msg1, 0)); + await c1b.run(); + // the testlog is part of the ephemeral kernel state, so it will only have + // a record of messages in the second run, however the vat is replayed + // during the second-run startup, so we expect to see one copy of the + // original message, delivered during the second run + t.deepEqual(c1b.dump().log, ['comms receive msg1']); + // but vattp dedups, so only one message should be delivered to comms + const kp3 = c1b.queueToVatRoot('bootstrap', 'getNumReceived', capargs([])); + await c1b.run(); + t.deepEqual(JSON.parse(c1b.kpResolution(kp3).body), 1); + + t.true(mb2.deliverInbound('peer1', msg1, 0)); + await c2.run(); + // the second kernel still has that ephemeral testlog, however the vat is + // still running, so we only see the original message from the first run + t.deepEqual(c2.dump().log, ['comms receive msg1']); + const kp4 = c2.queueToVatRoot('bootstrap', 'getNumReceived', capargs([])); + await c2.run(); + t.deepEqual(JSON.parse(c2.kpResolution(kp4).body), 1); + + // Both should *still* have the same number of cranks. This is what bug + // #3471 exposed. + t.is( + hostStorage1.kvStore.get('crankNumber'), + hostStorage2.kvStore.get('crankNumber'), + ); });