Skip to content

Commit

Permalink
feat(State Channels): Ping every 10 seconds to persist connection (#324)
Browse files Browse the repository at this point in the history
* Improve channel rpc usage

* Fix lint error

* Remove unreachable code

* Make sure that sign function is correctly called

* Improve error handling for update method

* Add missing channel tx serializations

* Add channel close solo and settle tx serialization

* Add channel slash tx serialization

* Add proof of inclusion tx serialization

* Add basic merkle patricia tree implementation

* Add merkle patricia tree serialization and verify function

* fix(schema.js): Fix linter error

* Improve channel tests and error handling (#276)

* Make sure that sign function is correctly called

* Improve error handling for update method

* Improve state channel params handling. Fixes #299 (#300)

* Compiler improvements (#303)

* refactor(Chain and Contract): Fix Chain.getAccount. Omprove Compiler

Add ability to get account/balance on specific block hash/height. Add test. Add changeCompilerUrl to
Compiler stamp

#302

* fix(Crypto): Fix name hash function arguments parsing

* refactor(Compiler): Remove async for changeCompilerUrl function

* Channel contracts (#279)

* Add support for contracts in state channels

* Remove console.log

* Remove console.log

* Improve channel rpc usage (#275)

* Improve channel rpc usage

* Fix lint error

* Remove unreachable code

* Improve channel tests and error handling (#276)

* Make sure that sign function is correctly called

* Improve error handling for update method

* Improve state channel params handling. Fixes #299 (#300)

* Fix channel tests

* feat(State Channels): Ping every 10 seconds to persist connection

Connection can be manually closed with new "disconnect()" method

* Register pong timeout after ping has been sent

* Add contract call tx serialization

* Add channel tx serialization

* Add missing tree tx serializations

* Add channel snapshot solo tx serialization

* feat(State Channels): Ping every 10 seconds to persist connection

Connection can be manually closed with new "disconnect()" method

* Register pong timeout after ping has been sent

* Handle unexpected messages

* Fix lint error
  • Loading branch information
mpowaga authored and nduchak committed Apr 24, 2019
1 parent 2ddeea8 commit 6d0e156
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 17 deletions.
31 changes: 29 additions & 2 deletions es/channel/handlers.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,21 @@ import {
changeState,
send,
emit,
channelId
channelId,
disconnect
} from './internal'
import { unpackTx } from '../tx/builder'

function handleUnexpectedMessage (channel, message, state) {
if (state.reject) {
state.reject(Object.assign(
Error(`Unexpected message received:\n\n${JSON.stringify(message)}`),
{ wsMessage: message }
))
}
return { handler: channelOpen }
}

export function awaitingConnection (channel, message, state) {
if (message.method === 'channels.info') {
if (['channel_accept', 'funding_created'].includes(message.params.data.event)) {
Expand Down Expand Up @@ -159,6 +170,7 @@ export async function awaitingOffChainTx (channel, message, state) {
}
return { handler: channelOpen }
}
return handleUnexpectedMessage(channel, message, state)
}

export function awaitingOffChainUpdate (channel, message, state) {
Expand All @@ -175,6 +187,7 @@ export function awaitingOffChainUpdate (channel, message, state) {
state.reject(new Error(message.error.message))
return { handler: channelOpen }
}
return handleUnexpectedMessage(channel, message, state)
}

export async function awaitingTxSignRequest (channel, message, state) {
Expand All @@ -198,6 +211,7 @@ export async function awaitingTxSignRequest (channel, message, state) {
})
return { handler: awaitingUpdateConflict }
}
return handleUnexpectedMessage(channel, message, state)
}

export function awaitingUpdateConflict (channel, message, state) {
Expand All @@ -207,6 +221,7 @@ export function awaitingUpdateConflict (channel, message, state) {
if (message.method === 'channels.conflict') {
return { handler: channelOpen }
}
return handleUnexpectedMessage(channel, message, state)
}

export async function awaitingShutdownTx (channel, message, state) {
Expand All @@ -215,24 +230,28 @@ export async function awaitingShutdownTx (channel, message, state) {
send(channel, { jsonrpc: '2.0', method: 'channels.shutdown_sign', params: { tx: signedTx } })
return { handler: awaitingShutdownOnChainTx, state }
}
return handleUnexpectedMessage(channel, message, state)
}

export function awaitingShutdownOnChainTx (channel, message, state) {
if (message.method === 'channels.on_chain_tx') {
state.resolveShutdownPromise(message.params.data.tx)
state.resolve(message.params.data.tx)
return { handler: channelClosed }
}
return handleUnexpectedMessage(channel, message, state)
}

export function awaitingLeave (channel, message, state) {
if (message.method === 'channels.leave') {
state.resolve({ channelId: message.params.channel_id, signedTx: message.params.data.state })
disconnect(channel)
return { handler: channelClosed }
}
if (message.method === 'channels.error') {
state.reject(new Error(message.data.message))
return { handler: channelOpen }
}
return handleUnexpectedMessage(channel, message, state)
}

export async function awaitingWithdrawTx (channel, message, state) {
Expand All @@ -241,6 +260,7 @@ export async function awaitingWithdrawTx (channel, message, state) {
send(channel, { jsonrpc: '2.0', method: 'channels.withdraw_tx', params: { tx: signedTx } })
return { handler: awaitingWithdrawCompletion, state }
}
return handleUnexpectedMessage(channel, message, state)
}

export function awaitingWithdrawCompletion (channel, message, state) {
Expand Down Expand Up @@ -271,6 +291,7 @@ export function awaitingWithdrawCompletion (channel, message, state) {
state.resolve({ accepted: false })
return { handler: channelOpen }
}
return handleUnexpectedMessage(channel, message, state)
}

export async function awaitingDepositTx (channel, message, state) {
Expand All @@ -279,6 +300,7 @@ export async function awaitingDepositTx (channel, message, state) {
send(channel, { jsonrpc: '2.0', method: 'channels.deposit_tx', params: { tx: signedTx } })
return { handler: awaitingDepositCompletion, state }
}
return handleUnexpectedMessage(channel, message, state)
}

export function awaitingDepositCompletion (channel, message, state) {
Expand Down Expand Up @@ -309,6 +331,7 @@ export function awaitingDepositCompletion (channel, message, state) {
state.resolve({ accepted: false })
return { handler: channelOpen }
}
return handleUnexpectedMessage(channel, message, state)
}

export async function awaitingNewContractTx (channel, message, state) {
Expand All @@ -317,6 +340,7 @@ export async function awaitingNewContractTx (channel, message, state) {
send(channel, { jsonrpc: '2.0', method: 'channels.update', params: { tx: signedTx } })
return { handler: awaitingNewContractCompletion, state }
}
return handleUnexpectedMessage(channel, message, state)
}

export function awaitingNewContractCompletion (channel, message, state) {
Expand All @@ -339,6 +363,7 @@ export function awaitingNewContractCompletion (channel, message, state) {
state.resolve({ accepted: false })
return { handler: channelOpen }
}
return handleUnexpectedMessage(channel, message, state)
}

export async function awaitingCallContractUpdateTx (channel, message, state) {
Expand All @@ -347,6 +372,7 @@ export async function awaitingCallContractUpdateTx (channel, message, state) {
send(channel, { jsonrpc: '2.0', method: 'channels.update', params: { tx: signedTx } })
return { handler: awaitingCallContractCompletion, state }
}
return handleUnexpectedMessage(channel, message, state)
}

export function awaitingCallContractCompletion (channel, message, state) {
Expand All @@ -359,6 +385,7 @@ export function awaitingCallContractCompletion (channel, message, state) {
state.resolve({ accepted: false })
return { handler: channelOpen }
}
return handleUnexpectedMessage(channel, message, state)
}

export function awaitingCallsPruned (channels, message, state) {
Expand Down
35 changes: 24 additions & 11 deletions es/channel/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import {
enqueueAction,
send,
channelId,
call
call,
disconnect as channelDisconnect
} from './internal'
import * as R from 'ramda'

Expand Down Expand Up @@ -62,6 +63,13 @@ function on (event, callback) {
eventEmitters.get(this).on(event, callback)
}

/**
* Close the connection
*/
function disconnect () {
return channelDisconnect(this)
}

/**
* Get current status
*
Expand Down Expand Up @@ -205,15 +213,15 @@ async function balances (accounts) {
* })
*/
function leave () {
return new Promise((resolve) => {
return new Promise((resolve, reject) => {
enqueueAction(
this,
(channel, state) => state.handler === handlers.channelOpen,
(channel, state) => {
send(channel, { jsonrpc: '2.0', method: 'channels.leave', params: {} })
return {
handler: handlers.awaitingLeave,
state: { resolve }
state: { resolve, reject }
}
})
})
Expand All @@ -232,7 +240,7 @@ function leave () {
* ).then(tx => console.log('on_chain_tx', tx))
*/
function shutdown (sign) {
return new Promise((resolve) => {
return new Promise((resolve, reject) => {
enqueueAction(
this,
(channel, state) => state.handler === handlers.channelOpen,
Expand All @@ -242,7 +250,8 @@ function shutdown (sign) {
handler: handlers.awaitingShutdownTx,
state: {
sign,
resolveShutdownPromise: resolve
resolve,
reject
}
}
}
Expand Down Expand Up @@ -297,7 +306,7 @@ function shutdown (sign) {
* })
*/
function withdraw (amount, sign, { onOnChainTx, onOwnWithdrawLocked, onWithdrawLocked } = {}) {
return new Promise((resolve) => {
return new Promise((resolve, reject) => {
enqueueAction(
this,
(channel, state) => state.handler === handlers.channelOpen,
Expand All @@ -308,6 +317,7 @@ function withdraw (amount, sign, { onOnChainTx, onOwnWithdrawLocked, onWithdrawL
state: {
sign,
resolve,
reject,
onOnChainTx,
onOwnWithdrawLocked,
onWithdrawLocked
Expand Down Expand Up @@ -366,7 +376,7 @@ function withdraw (amount, sign, { onOnChainTx, onOwnWithdrawLocked, onWithdrawL
* })
*/
function deposit (amount, sign, { onOnChainTx, onOwnDepositLocked, onDepositLocked } = {}) {
return new Promise((resolve) => {
return new Promise((resolve, reject) => {
enqueueAction(
this,
(channel, state) => state.handler === handlers.channelOpen,
Expand All @@ -377,6 +387,7 @@ function deposit (amount, sign, { onOnChainTx, onOwnDepositLocked, onDepositLock
state: {
sign,
resolve,
reject,
onOnChainTx,
onOwnDepositLocked,
onDepositLocked
Expand Down Expand Up @@ -420,7 +431,7 @@ function deposit (amount, sign, { onOnChainTx, onOwnDepositLocked, onDepositLock
* })
*/
function createContract ({ code, callData, deposit, vmVersion, abiVersion }, sign) {
return new Promise((resolve) => {
return new Promise((resolve, reject) => {
enqueueAction(
this,
(channel, state) => state.handler === handlers.channelOpen,
Expand All @@ -440,7 +451,8 @@ function createContract ({ code, callData, deposit, vmVersion, abiVersion }, sig
handler: handlers.awaitingNewContractTx,
state: {
sign,
resolve
resolve,
reject
}
}
}
Expand Down Expand Up @@ -485,7 +497,7 @@ function createContract ({ code, callData, deposit, vmVersion, abiVersion }, sig
* })
*/
function callContract ({ amount, callData, contract, abiVersion }, sign) {
return new Promise((resolve) => {
return new Promise((resolve, reject) => {
enqueueAction(
this,
(channel, state) => state.handler === handlers.channelOpen,
Expand All @@ -502,7 +514,7 @@ function callContract ({ amount, callData, contract, abiVersion }, sign) {
})
return {
handler: handlers.awaitingCallContractUpdateTx,
state: { resolve, sign }
state: { resolve, reject, sign }
}
}
)
Expand Down Expand Up @@ -717,6 +729,7 @@ const Channel = AsyncInit.compose({
callContractStatic,
getContractCall,
getContractState,
disconnect,
cleanContractCalls
}
})
Expand Down
37 changes: 34 additions & 3 deletions es/channel/internal.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import * as R from 'ramda'
import { pascalToSnake } from '../utils/string'
import { awaitingConnection } from './handlers'

const PING_TIMEOUT_MS = 10000
const PONG_TIMEOUT_MS = 5000

const options = new WeakMap()
const status = new WeakMap()
const state = new WeakMap()
Expand All @@ -34,6 +37,8 @@ const actionQueueLocked = new WeakMap()
const sequence = new WeakMap()
const channelId = new WeakMap()
const rpcCallbacks = new WeakMap()
const pingTimeoutId = new WeakMap()
const pongTimeoutId = new WeakMap()

function channelURL (url, params, endpoint = 'channel') {
const paramString = R.join('&', R.values(R.mapObjIndexed((value, key) =>
Expand Down Expand Up @@ -154,6 +159,24 @@ function call (channel, method, params) {
})
}

function ping (channel) {
const ws = websockets.get(channel)
if (ws.readyState === ws.OPEN) {
ws._connection.ping()
clearTimeout(pongTimeoutId.get(channel))
pongTimeoutId.set(channel, setTimeout(() => ws._connection.drop(), PONG_TIMEOUT_MS))
}
}

function disconnect (channel) {
const ws = websockets.get(channel)
if (ws.readyState === ws.OPEN) {
ws._connection.close()
clearTimeout(pongTimeoutId.get(channel))
clearTimeout(pingTimeoutId.get(channel))
}
}

function WebSocket (url, callbacks) {
function fireOnce (target, key, always) {
target[key] = (...args) => {
Expand Down Expand Up @@ -185,11 +208,18 @@ async function initialize (channel, channelOptions) {
eventEmitters.set(channel, new EventEmitter())
sequence.set(channel, 0)
rpcCallbacks.set(channel, new Map())
websockets.set(channel, await WebSocket(wsUrl, {
const ws = await WebSocket(wsUrl, {
onopen: () => changeStatus(channel, 'connected'),
onclose: () => changeStatus(channel, 'disconnected'),
onmessage: ({ data }) => onMessage(channel, data)
}))
})
ws._connection.on('pong', () => {
clearTimeout(pongTimeoutId.get(channel))
clearTimeout(pingTimeoutId.get(channel))
pingTimeoutId.set(channel, setTimeout(() => ping(channel), PING_TIMEOUT_MS))
})
pingTimeoutId.set(channel, setTimeout(() => ping(channel), PING_TIMEOUT_MS))
websockets.set(channel, ws)
}

export {
Expand All @@ -204,5 +234,6 @@ export {
send,
enqueueAction,
channelId,
call
call,
disconnect
}
8 changes: 7 additions & 1 deletion test/integration/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ describe('Channel', function () {
await initiator.spend('6000000000000000', await responder.address())
})

after(() => {
initiatorCh.disconnect()
responderCh.disconnect()
})

beforeEach(() => {
responderShouldRejectUpdate = false
})
Expand Down Expand Up @@ -403,7 +408,8 @@ describe('Channel', function () {
await Promise.all([waitForChannel(initiatorCh), waitForChannel(responderCh)])
sinon.assert.notCalled(initiatorSign)
sinon.assert.notCalled(responderSign)
await initiatorCh.leave()
initiatorCh.disconnect()
responderCh.disconnect()
})

it('can solo close a channel', async () => {
Expand Down

0 comments on commit 6d0e156

Please sign in to comment.