Skip to content

Commit

Permalink
@syn-ui/tests: + waitfor_filtered_signals_change
Browse files Browse the repository at this point in the history
  • Loading branch information
btakita committed May 1, 2021
1 parent 3916710 commit add6188
Showing 1 changed file with 98 additions and 99 deletions.
197 changes: 98 additions & 99 deletions tests/unit-test/syn.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import path from 'path'
import { Config, InstallAgentsHapps } from '@holochain/tryorama'
import { delay } from '@holochain/tryorama/lib/util'
import { _neq } from '@ctx-core/function'
import { noop } from '@ctx-core/function'
import { assign } from '@ctx-core/object'
import { I } from '@ctx-core/combinators'
import { derived$, subscribe_wait_timeout, writable$ } from '@ctx-core/store'
import { bufferToBase64, EntryHash } from '@syn-ui/utils'
import { Readable$, subscribe_wait_timeout, writable$ } from '@ctx-core/store'
import { bufferToBase64, console_b, EntryHash } from '@syn-ui/utils'
import { content_b, apply_deltas_b, session_info_b, join_session, leave_session } from '@syn-ui/model'
import {
Commit, Content, Delta, my_tag_b, rpc_commit_b, rpc_get_content_b, rpc_get_folks_b, rpc_get_session_b,
rpc_get_sessions_b, rpc_hash_content_b, rpc_send_change_b, rpc_send_change_request_b, rpc_send_folk_lore_b,
rpc_send_heartbeat_b, rpc_send_sync_request_b, rpc_send_sync_response_b, Signal, StateForSync
} from '@syn-ui/zome-client'
import { AgentPubKey } from '@holochain/conductor-api'

const config = Config.gen()

Expand Down Expand Up @@ -58,6 +58,11 @@ module.exports = (orchestrator)=>{
const bob_port:number = parseInt(bob_player._conductor.appClient.client.socket.url.split(':')[2])

const me_ctx = {}, alice_ctx = {}, bob_ctx = {}
// To enable logs, remove log: noop
const console_overrides = { log: noop }
assign(console_b(me_ctx), console_overrides)
assign(console_b(alice_ctx), console_overrides)
assign(console_b(bob_ctx), console_overrides)
await join_session({ app_port: me_port, app_id: me_happ.hAppId, ctx: me_ctx })
await join_session({ app_port: alice_port, app_id: alice_happ.hAppId, ctx: alice_ctx })
await join_session({ app_port: bob_port, app_id: bob_happ.hAppId, ctx: bob_ctx })
Expand Down Expand Up @@ -99,10 +104,8 @@ module.exports = (orchestrator)=>{

// set signal handlers so we can confirm they get sent and received appropriately
let me_signals = writable$<Signal[]>([])
const me_signals_length = derived$(me_signals, $me_signals=>$me_signals?.length)
let $me_signals_length = me_signals_length.$
me_player.setSignalHandler((signal)=>{
console.log('Received Signal for me:', signal)
console_b(me_ctx).log('Received Signal for me:', signal)
me_signals.update($me_signals=>{
$me_signals.push(signal.data.payload)
return $me_signals
Expand All @@ -111,10 +114,8 @@ module.exports = (orchestrator)=>{

// alice signal handler
const alice_signals = writable$<Signal[]>([])
const alice_signals_length = derived$(alice_signals, $alice_signals=>$alice_signals?.length)
let $alice_signals_length = alice_signals_length.$
alice_player.setSignalHandler((signal)=>{
console.log('Received Signal for alice:', signal)
console_b(alice_ctx).log('Received Signal for alice:', signal)
alice_signals.update($alice_signals=>{
$alice_signals.push(signal.data.payload)
return $alice_signals
Expand All @@ -123,10 +124,8 @@ module.exports = (orchestrator)=>{

// bob signal handler
const bob_signals = writable$<Signal[]>([])
const bob_signals_length = derived$(bob_signals, $bob_signals=>$bob_signals?.length)
let $bob_signals_length = bob_signals_length.$
bob_player.setSignalHandler((signal)=>{
console.log('Received Signal for bob:', signal)
console_b(bob_ctx).log('Received Signal for bob:', signal)
bob_signals.update($bob_signals=>{
$bob_signals.push(signal.data.payload)
return $bob_signals
Expand Down Expand Up @@ -195,6 +194,9 @@ module.exports = (orchestrator)=>{
// check that deltas and snapshot content returned add up to the current real content
await delay(500) // make time for integrating new data
const received_deltas:Delta[] = (jsonDeltas ? alice_session_info.$!.deltas.map(d=>JSON.parse(d)) : alice_session_info.$!.deltas) as Delta[]
console.debug('debug|1', {
received_deltas
})
await apply_deltas_b(alice_ctx)(received_deltas)
t.deepEqual(
me_content.$,
Expand All @@ -220,17 +222,18 @@ module.exports = (orchestrator)=>{
commit_content_hash: new_content_hash_2,
deltas: pending_deltas,
}
$alice_signals_length = alice_signals_length.$
await rpc_send_sync_response_b(me_ctx)({
participant: alice_pubkey,
state,
})
await subscribe_wait_timeout(alice_signals_length, _neq($alice_signals_length), 10_000)
$alice_signals_length = alice_signals_length.$
let [alice_SyncResp_stack] = await waitfor_filtered_signals_change(async ()=>
rpc_send_sync_response_b(me_ctx)({
participant: alice_pubkey,
state,
}),
[alice_signals],
$alice_signals=>filter_signal_name($alice_signals, 'SyncResp')
)

// Alice should have received uncommitted deltas
t.equal(alice_signals.$[$alice_signals_length - 1].signal_name, 'SyncResp')
let receivedState = alice_signals.$[$alice_signals_length - 1].signal_payload
t.equal(alice_SyncResp_stack[0].signal_name, 'SyncResp')
let receivedState = alice_SyncResp_stack[0].signal_payload
t.deepEqual(receivedState, { ...state, deltas: pending_deltas.map(d=>JSON.stringify(d)) }) // deltas, commit, and snapshot match

// bob joins session
Expand All @@ -244,92 +247,73 @@ module.exports = (orchestrator)=>{
const alice_delta:Delta = { type: 'Title', value: 'Alice in Wonderland' }
let delta = jsonDeltas ? JSON.stringify(alice_delta) : alice_delta

let me_ChangeReq_signals_length = filter_signal_name(me_signals.$, 'ChangeReq').length
$me_signals_length = me_signals_length.$
await rpc_send_change_request_b(alice_ctx)({
scribe: alice_session_info.$!.scribe,
index: 1,
deltas: [alice_delta]
})
await subscribe_wait_timeout(
me_signals,
const [me_ChangeReq_stack] = await waitfor_filtered_signals_change(async ()=>
rpc_send_change_request_b(alice_ctx)({
scribe: alice_session_info.$!.scribe,
index: 1,
deltas: [alice_delta]
}),
[me_signals],
$me_signals=>
filter_signal_name($me_signals, 'ChangeReq').length > me_ChangeReq_signals_length,
10_000)
const sig = filter_signal_name(me_signals.$, 'ChangeReq').reverse()[0]
t.deepEqual(sig.signal_name, 'ChangeReq')
const [sig_index, sig_delta] = sig.signal_payload
filter_signal_name($me_signals, 'ChangeReq')
)
t.deepEqual(me_ChangeReq_stack[0].signal_name, 'ChangeReq')
const [sig_index, sig_delta] = me_ChangeReq_stack[0].signal_payload
t.equal(sig_index, 1)
const receiveDelta = jsonDeltas ? JSON.parse(sig_delta) : sig_delta
t.deepEqual(receiveDelta, alice_delta) // delta_matches

let my_deltas:Delta[] = [{ type: 'Add', value: [0, 'Whoops!\n'] }, { type: 'Title', value: 'Alice in Wonderland' }]
deltas = jsonDeltas ? my_deltas.map(d=>JSON.stringify(d)) : deltas
let alice_Change_signals_length = filter_signal_name(alice_signals.$, 'Change').length
let bob_Change_signals_length = filter_signal_name(bob_signals.$, 'Change').length
// I send a change, and alice and bob should receive it.
await rpc_send_change_b(me_ctx)({
participants: [alice_pubkey, bob_pubkey],
index: 2,
deltas: my_deltas,
})
await subscribe_wait_timeout(
alice_signals,
$alice_signals=>filter_signal_name($alice_signals, 'Change').length > alice_Change_signals_length,
10_000)
await subscribe_wait_timeout(
bob_signals,
$bob_signals=>filter_signal_name($bob_signals, 'Change').length > bob_Change_signals_length,
10_000)
let a_sig = filter_signal_name(alice_signals.$, 'Change').reverse()[0]
let b_sig = filter_signal_name(bob_signals.$, 'Change').reverse()[0]
const [alice_Change_stack, bob_Change_stack] = await waitfor_filtered_signals_change(async ()=>
rpc_send_change_b(me_ctx)({
participants: [alice_pubkey, bob_pubkey],
index: 2,
deltas: my_deltas,
}),
[alice_signals, bob_signals],
$signals=>filter_signal_name($signals, 'Change')
)
let a_sig = alice_Change_stack[0]
let b_sig = bob_Change_stack[0]
t.equal(a_sig.signal_name, 'Change')
t.equal(b_sig.signal_name, 'Change')
t.deepEqual(a_sig.signal_payload, [2, deltas]) // delta_matches
t.deepEqual(b_sig.signal_payload, [2, deltas]) // delta_matches

let me_Hearbeat_signal_length = filter_signal_name(me_signals.$, 'Heartbeat').length
await rpc_send_heartbeat_b(alice_ctx)({
scribe: me_pubkey,
data: 'Hello'
})
await subscribe_wait_timeout(
me_signals,
$me_signals=>filter_signal_name($me_signals, 'Heartbeat').length > me_Hearbeat_signal_length,
10_000
const [me_Heartbeat] = await waitfor_filtered_signals_change(async ()=>
rpc_send_heartbeat_b(alice_ctx)({
scribe: me_pubkey,
data: 'Hello'
}),
[me_signals],
$signals=>filter_signal_name($signals, 'Heartbeat')
)
let me_sig = filter_signal_name(me_signals.$, 'Heartbeat').reverse()[0]
let me_sig = me_Heartbeat[0]
t.equal(me_sig.signal_name, 'Heartbeat')
t.deepEqual(me_sig.signal_payload[1], 'Hello')
t.deepEqual(me_sig.signal_payload[0], alice_pubkey)

let alice_FolkLore_length = filter_signal_name(alice_signals.$, 'FolkLore').length
let bob_FolkLore_length = filter_signal_name(bob_signals.$, 'FolkLore').length
await rpc_send_folk_lore_b(me_ctx)({
participants: [alice_pubkey, bob_pubkey],
data: {
participants: {
[alice_pubkey]: {
pubKey: alice_pubkey
},
[bob_pubkey]: {
pubKey: bob_pubkey
},
}
}
})
await subscribe_wait_timeout(
alice_signals,
$signals=>filter_signal_name($signals, 'FolkLore').length > alice_FolkLore_length,
10_000
)
await subscribe_wait_timeout(
bob_signals,
$signals=>filter_signal_name($signals, 'FolkLore').length > bob_FolkLore_length,
10_000
const [alice_FolkLore, bob_FolkLore] = await waitfor_filtered_signals_change(async ()=>
rpc_send_folk_lore_b(me_ctx)({
participants: [alice_pubkey, bob_pubkey],
data: {
participants: {
[alice_pubkey]: {
pubKey: alice_pubkey
},
[bob_pubkey]: {
pubKey: bob_pubkey
},
}
}
}),
[alice_signals, bob_signals],
$signals=>filter_signal_name($signals, 'FolkLore')
)
a_sig = filter_signal_name(alice_signals.$, 'FolkLore').reverse()[0]
b_sig = filter_signal_name(bob_signals.$, 'FolkLore').reverse()[0]
a_sig = alice_FolkLore[0]
b_sig = bob_FolkLore[0]
t.equal(a_sig.signal_name, 'FolkLore')
t.equal(b_sig.signal_name, 'FolkLore')
t.deepEqual(a_sig.signal_payload, JSON.stringify({
Expand All @@ -353,29 +337,44 @@ module.exports = (orchestrator)=>{
}
}))

let me_SyncReq_length = filter_signal_name(me_signals.$, 'SyncReq').length
// alice asks for a sync request
await rpc_send_sync_request_b(alice_ctx)(me_pubkey)
await subscribe_wait_timeout(
me_signals,
$signals=>filter_signal_name($signals, 'SyncReq').length > me_SyncReq_length,
10_000
const [me_SyncReq] = await waitfor_filtered_signals_change(async()=>
rpc_send_sync_request_b(alice_ctx)(me_pubkey),
[me_signals],
$signals=>filter_signal_name($signals, 'SyncReq')
)
me_sig = filter_signal_name(me_signals.$, 'SyncReq').reverse()[0]
me_sig = me_SyncReq[0]
t.equal(me_sig.signal_name, 'SyncReq')

// confirm that all agents got added to the folks anchor
// TODO figure out why init doesn't happen immediately.
let folks = await rpc_get_folks_b(me_ctx)()
t.equal(folks.length, 3)
/**/
} finally {
await leave_session({ ctx: me_ctx })
await leave_session({ ctx: alice_ctx })
await leave_session({ ctx: bob_ctx })
}
/**/
})
}
function filter_signal_name(signals:Signal[], signal_name:string) {
return signals.filter(s=>s.signal_name === signal_name)
function filter_signal_name($signals:Signal[], signal_name:string) {
return $signals.filter(s=>s.signal_name === signal_name)
}
async function waitfor_filtered_signals_change(
fn:()=>Promise<void>,
signals_a1:Readable$<Signal[]>[],
_filtered_signals:($signals:Signal[])=>Signal[],
timeout = 1000
) {
const filtered_signals_a1 = signals_a1.map(signals=>_filtered_signals(signals.$))
await fn()
await Promise.all(signals_a1.map((signals, idx)=>
subscribe_wait_timeout(signals,
$signals=>{
return _filtered_signals($signals).length > filtered_signals_a1[idx].length
}
, timeout)
))
return signals_a1.map(signals=>_filtered_signals(signals.$).reverse())
}

0 comments on commit add6188

Please sign in to comment.