diff --git a/packages/vats/src/ibc.js b/packages/vats/src/ibc.js index 11c3d2a8cd2..ac06a25d67f 100644 --- a/packages/vats/src/ibc.js +++ b/packages/vats/src/ibc.js @@ -1,7 +1,6 @@ // @ts-check import { - rethrowUnlessMissing, dataToBase64, base64ToBytes, } from '@agoric/swingset-vat/src/vats/network/index.js'; @@ -15,8 +14,6 @@ import '@agoric/swingset-vat/src/vats/network/types.js'; import { makeWithQueue } from './queue.js'; -const DEFAULT_PACKET_TIMEOUT = 1000; - // CAVEAT: IBC acks cannot be empty, as the Cosmos IAVL tree cannot represent // empty acknowledgements as distinct from unacknowledged packets. const DEFAULT_ACKNOWLEDGEMENT = '\x00'; @@ -74,46 +71,45 @@ export function makeIBCProtocolHandler(E, rawCallIBCDevice) { * @property {Counterparty} counterparty * @property {string} version * - * @typedef {PromiseRecord<[Endpoint, ConnectionHandler]>} OnConnectP - * @typedef {ConnectingInfo & { onConnectP: OnConnectP }} Outbound + * @typedef {PromiseRecord} OnConnectP + * @typedef {Omit & { + * localAddr: Endpoint, onConnectP: OnConnectP + * counterparty: { port_id: string }, + * }} Outbound */ /** - * @type {Store>} + * @type {LegacyMap>} */ - const srcPortToOutbounds = makeStore('SRC-PORT'); + // Legacy because it holds a mutable Javascript Array + const srcPortToOutbounds = makeLegacyMap('SRC-PORT'); /** * @type {Store} */ const channelKeyToInfo = makeStore('CHANNEL:PORT'); - /** - * @type {Store} - */ - const channelKeyToOnConnectP = makeStore('CHANNEL:PORT'); - /** * @type {Store>} */ const channelKeyToAttemptP = makeStore('CHANNEL:PORT'); /** - * @type {Store>>} + * @type {LegacyMap>>} */ - const channelKeyToSeqAck = makeStore('CHANNEL:PORT'); + // Legacy because it holds a LegacyMap + const channelKeyToSeqAck = makeLegacyMap('CHANNEL:PORT'); /** * Send a packet out via the IBC device. * * @param {IBCPacket} packet - * @param {Store>} seqToAck + * @param {LegacyMap>} seqToAck */ async function ibcSendPacket(packet, seqToAck) { // Make a kernel call to do the send. const fullPacket = await callIBCDevice('sendPacket', { packet, - relativeTimeout: DEFAULT_PACKET_TIMEOUT, }); // Extract the actual sequence number from the return. @@ -145,7 +141,8 @@ export function makeIBCProtocolHandler(E, rawCallIBCDevice) { order, ) { const channelKey = `${channelID}:${portID}`; - const seqToAck = makeStore('SEQUENCE'); + // Legacy because it holds a PromiseRecord + const seqToAck = makeLegacyMap('SEQUENCE'); channelKeyToSeqAck.init(channelKey, seqToAck); /** @@ -190,7 +187,7 @@ export function makeIBCProtocolHandler(E, rawCallIBCDevice) { source_port: portID, source_channel: channelID, }; - await callIBCDevice('channelCloseInit', { packet }); + await callIBCDevice('startChannelCloseInit', { packet }); const rejectReason = Error('Connection closed'); for (const ackDeferred of seqToAck.values()) { ackDeferred.reject(rejectReason); @@ -227,47 +224,37 @@ export function makeIBCProtocolHandler(E, rawCallIBCDevice) { * @property {PromiseRecord} deferredHandler */ - /** - * @type {Store} - */ - const portToCircuits = makeStore('Port'); - /** * @type {LegacyMap>>} */ // Legacy because it holds a raw JavaScript Set const portToPendingConns = makeLegacyMap('Port'); - /** @type {Store} */ - const remoteAddrToLocalSuffix = makeStore('endpoint'); - /** * @type {ProtocolHandler} */ - const protocol = Far('ProtocolHandler', { + const protocol = Far('IBCProtocolHandler', { async onCreate(impl, _protocolHandler) { console.debug('IBC onCreate'); protocolImpl = impl; }, + async onInstantiate() { + // The IBC channel is not known until after handshake. + return ''; + }, async generatePortID(_localAddr, _protocolHandler) { lastPortID += 1; return `port-${lastPortID}`; }, async onBind(port, localAddr, _protocolHandler) { const portID = localAddrToPortID(localAddr); - portToCircuits.init(port, []); portToPendingConns.init(port, new Set()); const packet = { source_port: portID, }; return callIBCDevice('bindPort', { packet }); }, - async onInstantiate(_port, _localAddr, remoteAddr, _protocolHandler) { - // we can take advantage of the fact that remoteAddrs are unique (they - // have their own channelID). - return remoteAddrToLocalSuffix.get(remoteAddr); - }, - async onConnect(port, localAddr, remoteAddr, chandler, _protocolHandler) { + async onConnect(port, localAddr, remoteAddr, _chandler, _protocolHandler) { console.debug('IBC onConnect', localAddr, remoteAddr); const portID = localAddrToPortID(localAddr); const pendingConns = portToPendingConns.get(port); @@ -302,9 +289,10 @@ export function makeIBCProtocolHandler(E, rawCallIBCDevice) { const onConnectP = makePromiseKit(); pendingConns.add(onConnectP); + /** @type {Outbound[]} */ let outbounds; if (srcPortToOutbounds.has(portID)) { - outbounds = srcPortToOutbounds.get(portID); + outbounds = [...srcPortToOutbounds.get(portID)]; } else { outbounds = []; srcPortToOutbounds.init(portID, outbounds); @@ -316,9 +304,10 @@ export function makeIBCProtocolHandler(E, rawCallIBCDevice) { order, version, onConnectP, + localAddr, }); - // Get any passive relayers to flow. + // Initialise the channel, which automatic relayers should pick up. const packet = { source_port: portID, destination_port: rPortID, @@ -331,38 +320,6 @@ export function makeIBCProtocolHandler(E, rawCallIBCDevice) { version, }); - if (!chandler) { - // Nothing to explain to the user, so return now. - return onConnectP.promise; - } - - // We explain to the user how to configure a naive relayer. - const q = JSON.stringify; - E( - /** @type {ConnectionHandler&{infoMessage?: (...args: any[]) => void}} */ - (chandler), - ) - .infoMessage( - `\ -# Set up the relayer for this path: -ag-nchainz start-relayer <<'EOF' -{ - "src": { - "connection-id": ${q(hops[0])}, - "port-id": ${q(portID)}, - "order": ${q(order)}, - "version": ${q(version)} - }, - "dst": { - "port-id": ${q(rPortID)}, - "order": ${q(order)} - } -} -EOF -# then your connection will try to proceed. -`, - ) - .catch(rethrowUnlessMissing); return onConnectP.promise; }, async onListen(_port, localAddr, _listenHandler) { @@ -375,7 +332,6 @@ EOF console.debug('IBC onRevoke', localAddr); const pendingConns = portToPendingConns.get(port); portToPendingConns.delete(port); - portToCircuits.delete(port); const revoked = Error(`Port ${localAddr} revoked`); for (const onConnectP of pendingConns.values()) { onConnectP.reject(revoked); @@ -388,50 +344,6 @@ EOF async fromBridge(srcID, obj) { console.info('IBC fromBridge', srcID, obj); switch (obj.event) { - case 'channelOpenInit': { - // This event is sent by a naive relayer that wants to initiate - // a connection. It is only honoured if we already have autonomously - // attempted a connection. - const { - portID, - counterparty: { port_id: rPortID }, - connectionHops: rHops, - } = obj; - - const outbounds = srcPortToOutbounds.has(portID) - ? srcPortToOutbounds.get(portID) - : []; - const oidx = outbounds.findIndex( - ({ counterparty: { port_id: iPortID }, connectionHops: iHops }) => { - if (iPortID !== rPortID) { - return false; - } - for (let i = 0; i < rHops.length; i += 1) { - if (iHops[i] !== rHops[i]) { - return false; - } - } - return true; - }, - ); - - assert(oidx >= 0, X`${portID}: did not expect channelOpenInit`); - - // Continue the handshake by extracting the outbound information. - const { onConnectP, ...chanInfo } = outbounds[oidx]; - outbounds.splice(oidx, 1); - if (outbounds.length === 0) { - srcPortToOutbounds.delete(portID); - } - - // We have more specific information for the outbound connection. - const channelKey = `${obj.channelID}:${portID}`; - channelKeyToInfo.init(channelKey, { ...chanInfo, ...obj }); - channelKeyToOnConnectP.init(channelKey, onConnectP); - break; - } - - case 'attemptChannelOpenTry': case 'channelOpenTry': { // They're (more or less politely) asking if we are listening, so make an attempt. const { @@ -444,33 +356,22 @@ EOF counterpartyVersion: rVersion, } = obj; - const channelKey = `${channelID}:${portID}`; - if (channelKeyToAttemptP.has(channelKey)) { - // We have a pending attempt, so continue the handshake. - break; - } - - const versionSuffix = version ? `/${version}` : ''; - const localAddr = `/ibc-port/${portID}/${order.toLowerCase()}${versionSuffix}`; + const localAddr = `/ibc-port/${portID}/${order.toLowerCase()}/${version}`; const ibcHops = hops.map(hop => `/ibc-hop/${hop}`).join('/'); const remoteAddr = `${ibcHops}/ibc-port/${rPortID}/${order.toLowerCase()}/${rVersion}/ibc-channel/${rChannelID}`; // See if we allow an inbound attempt for this address pair (without // rejecting). - remoteAddrToLocalSuffix.init(remoteAddr, `ibc-channel/${channelID}`); const attemptP = E(protocolImpl).inbound(localAddr, remoteAddr); // Tell what version string we negotiated. - const attemptedLocal = await E(attemptP) - .getLocalAddress() - .finally(() => { - // No matter what, delete the temporary mapping. - remoteAddrToLocalSuffix.delete(remoteAddr); - }); + const attemptedLocal = await E(attemptP).getLocalAddress(); const match = attemptedLocal.match( // Match: ... /ORDER/VERSION ... new RegExp('^(/[^/]+/[^/]+)*/(ordered|unordered)/([^/]+)(/|$)'), ); + + const channelKey = `${channelID}:${portID}`; if (!match) { throw Error( `${channelKey}: cannot determine version from attempted local address ${attemptedLocal}`, @@ -482,23 +383,7 @@ EOF channelKeyToInfo.init(channelKey, obj); try { - if (obj.type === 'attemptChannelOpenTry') { - // We can try to open with the version we wanted. - const packet = { - source_channel: channelID, - source_port: portID, - destination_channel: rChannelID, - destination_port: rPortID, - }; - - await callIBCDevice('continueChannelOpenTry', { - packet, - order, - hops, - version: negotiatedVersion, - counterpartyVersion: rVersion, - }); - } else if (negotiatedVersion !== version) { + if (negotiatedVersion !== version) { // Too late; the relayer gave us a version we didn't like. throw Error( `${channelKey}: negotiated version was ${negotiatedVersion}; rejecting ${version}`, @@ -521,30 +406,51 @@ EOF channelID, counterparty: { port_id: rPortID, channel_id: rChannelID }, counterpartyVersion: rVersion, + connectionHops: rHops, } = obj; - const channelKey = `${channelID}:${portID}`; - assert( - channelKeyToOnConnectP.has(channelKey), - X`${channelKey}: did not expect channelOpenAck`, + const outbounds = srcPortToOutbounds.has(portID) + ? srcPortToOutbounds.get(portID) + : []; + const oidx = outbounds.findIndex( + ({ counterparty: { port_id: iPortID }, connectionHops: iHops }) => { + if (iPortID !== rPortID) { + return false; + } + if (iHops.length !== rHops.length) { + return false; + } + for (let i = 0; i < iHops.length; i += 1) { + if (iHops[i] !== rHops[i]) { + return false; + } + } + return true; + }, ); - const onConnectP = channelKeyToOnConnectP.get(channelKey); - channelKeyToOnConnectP.delete(channelKey); - const { order, connectionHops: rHops } = - channelKeyToInfo.get(channelKey); - channelKeyToInfo.delete(channelKey); + assert(oidx >= 0, X`${portID}: did not expect channelOpenAck`); + const { onConnectP, localAddr, ...chanInfo } = outbounds[oidx]; + outbounds.splice(oidx, 1); + if (outbounds.length === 0) { + srcPortToOutbounds.delete(portID); + } // Finish the outbound connection. const ibcHops = rHops.map(hop => `/ibc-hop/${hop}`).join('/'); - const remoteAddr = `${ibcHops}/ibc-port/${rPortID}/${order.toLowerCase()}/${rVersion}`; + const remoteAddress = `${ibcHops}/ibc-port/${rPortID}/${chanInfo.order.toLowerCase()}/${rVersion}/ibc-channel/${rChannelID}`; + const localAddress = `${localAddr}/ibc-channel/${channelID}`; const rchandler = makeIBCConnectionHandler( channelID, portID, rChannelID, rPortID, - order, + chanInfo.order, ); - onConnectP.resolve([remoteAddr, rchandler]); + onConnectP.resolve({ + localAddress, + remoteAddress, + handler: rchandler, + }); break; } @@ -573,7 +479,11 @@ EOF rPortID, order, ); - E(attemptP).accept(rchandler); + const localAddr = await E(attemptP).getLocalAddress(); + E(attemptP).accept({ + localAddress: `${localAddr}/ibc-channel/${channelID}`, + handler: rchandler, + }); break; } diff --git a/packages/vats/test/test-network.js b/packages/vats/test/test-network.js index 124e0c8b525..1735e6cca36 100644 --- a/packages/vats/test/test-network.js +++ b/packages/vats/test/test-network.js @@ -3,20 +3,26 @@ import { test } from '@agoric/swingset-vat/tools/prepare-test-env-ava.js'; import { E, Far } from '@endo/far'; +import { makeSubscriptionKit } from '@agoric/notifier'; import { buildRootObject as ibcBuildRootObject } from '../src/vat-ibc.js'; import { buildRootObject as networkBuildRootObject } from '../src/vat-network.js'; test('network - ibc', async t => { - t.plan(2); - const networkVat = E(networkBuildRootObject)(); const ibcVat = E(ibcBuildRootObject)(); + const { subscription, publication } = makeSubscriptionKit(); + + const events = subscription[Symbol.asyncIterator](); const callbacks = Far('ibcCallbacks', { downcall: (method, params) => { - t.is(method, 'bindPort'); - t.deepEqual(params, { packet: { source_port: 'port-1' } }); + publication.updateState([method, params]); + if (method === 'sendPacket') { + const { packet } = params; + return { ...packet, sequence: '39' }; + } + return undefined; }, }); @@ -28,5 +34,183 @@ test('network - ibc', async t => { // Actually test the ibc port binding. // TODO: Do more tests on the returned Port object. - await E(networkVat).bind('/ibc-port/'); + const p = E(networkVat).bind('/ibc-port/'); + await p; + const ev1 = await events.next(); + t.assert(!ev1.done); + t.deepEqual(ev1.value, ['bindPort', { packet: { source_port: 'port-1' } }]); + + const testEcho = async () => { + await E(p).addListener( + Far('ibcListener', { + async onAccept(_port, _localAddr, _remoteAddr, _listenHandler) { + /** @type {ConnectionHandler} */ + const handler = Far('plusOne', { + async onReceive(_c, packetBytes) { + return `${packetBytes}1`; + }, + async onOpen(_c, localAddr, remoteAddr, _connectionHandler) { + publication.updateState([ + 'plusOne-open', + { localAddr, remoteAddr }, + ]); + }, + }); + return handler; + }, + async onListen(port, _listenHandler) { + console.debug(`listening on echo port: ${port}`); + }, + }), + ); + + const c = E(p).connect('/ibc-port/port-1/unordered/foo'); + + const ack = await E(c).send('hello198'); + t.is(ack, 'hello1981', 'expected echo'); + await c; + + await E(c).close(); + }; + + await testEcho(); + + const testIBCOutbound = async () => { + const c = E(p).connect( + '/ibc-hop/connection-11/ibc-port/port-98/unordered/bar', + ); + + const evopen = await events.next(); + t.assert(!evopen.done); + t.deepEqual(evopen.value, [ + 'plusOne-open', + { + localAddr: '/ibc-port/port-1/unordered/foo', + remoteAddr: '/ibc-port/port-1', + }, + ]); + + const ev2 = await events.next(); + t.assert(!ev2.done); + t.deepEqual(ev2.value, [ + 'startChannelOpenInit', + { + packet: { source_port: 'port-1', destination_port: 'port-98' }, + order: 'UNORDERED', + hops: ['connection-11'], + version: 'bar', + }, + ]); + + await E(ibcHandler).fromBridge('dontcare', { + event: 'channelOpenAck', + portID: 'port-1', + channelID: 'channel-1', + counterparty: { port_id: 'port-98', channel_id: 'channel-22' }, + counterpartyVersion: 'bar', + connectionHops: ['connection-11'], + }); + + await c; + const ack = E(c).send('some-transfer-message'); + + const ev3 = await events.next(); + t.assert(!ev3.done); + t.deepEqual(ev3.value, [ + 'sendPacket', + { + packet: { + data: 'c29tZS10cmFuc2Zlci1tZXNzYWdl', + destination_channel: 'channel-22', + destination_port: 'port-98', + source_channel: 'channel-1', + source_port: 'port-1', + }, + }, + ]); + + await E(ibcHandler).fromBridge('stilldontcare', { + event: 'acknowledgementPacket', + packet: { + data: 'c29tZS10cmFuc2Zlci1tZXNzYWdl', + destination_channel: 'channel-22', + destination_port: 'port-98', + source_channel: 'channel-1', + source_port: 'port-1', + sequence: '39', + }, + acknowledgement: 'YS10cmFuc2Zlci1yZXBseQ==', + }); + + t.is(await ack, 'a-transfer-reply'); + + await E(c).close(); + }; + + await testIBCOutbound(); + + const testIBCInbound = async () => { + await E(ibcHandler).fromBridge('reallydontcare', { + event: 'channelOpenTry', + channelID: 'channel-2', + portID: 'port-1', + counterparty: { port_id: 'port-99', channel_id: 'channel-23' }, + connectionHops: ['connection-12'], + order: 'ORDERED', + version: 'bazi', + counterpartyVersion: 'bazo', + }); + + await E(ibcHandler).fromBridge('stillreallydontcare', { + event: 'channelOpenConfirm', + portID: 'port-1', + channelID: 'channel-2', + }); + + const evopen = await events.next(); + t.assert(!evopen.done); + t.deepEqual(evopen.value, [ + 'plusOne-open', + { + localAddr: '/ibc-port/port-1/ordered/bazi/ibc-channel/channel-2', + remoteAddr: + '/ibc-hop/connection-12/ibc-port/port-99/ordered/bazo/ibc-channel/channel-23', + }, + ]); + + await E(ibcHandler).fromBridge('notevenyet', { + event: 'receivePacket', + packet: { + data: 'aW5ib3VuZC1tc2c=', + destination_port: 'port-1', + destination_channel: 'channel-2', + source_channel: 'channel-23', + source_port: 'port-99', + }, + }); + + const ev4 = await events.next(); + t.assert(!ev4.done); + t.deepEqual(ev4.value, [ + 'receiveExecuted', + { + ack: 'aW5ib3VuZC1tc2cx', + packet: { + data: 'aW5ib3VuZC1tc2c=', + destination_channel: 'channel-2', + destination_port: 'port-1', + source_channel: 'channel-23', + source_port: 'port-99', + }, + }, + ]); + }; + + await testIBCInbound(); + + // Verify that we consumed all the published events. + publication.finish([]); + const evend = await events.next(); + t.assert(evend.done); + t.deepEqual(evend.value, []); });