Skip to content

Commit

Permalink
fix: Additional checks on stream callbacks
Browse files Browse the repository at this point in the history
Added checks for ids on JS callbacks
Added tests for streams with mutliple clients
  • Loading branch information
Alex Risch authored and Alex Risch committed Mar 2, 2024
1 parent 622c733 commit 30c7aea
Show file tree
Hide file tree
Showing 7 changed files with 452 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,7 @@ class XMTPModule : Module() {
mapOf(
"clientAddress" to clientAddress,
"message" to DecodedMessageWrapper.encodeMap(message),
"topic" to topic,
)
)
}
Expand All @@ -1125,6 +1126,7 @@ class XMTPModule : Module() {
mapOf(
"clientAddress" to clientAddress,
"message" to DecodedMessageWrapper.encodeMap(message),
"groupId" to id,
)
)
}
Expand Down
305 changes: 303 additions & 2 deletions example/src/tests/groupTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ test('can make a MLS V3 client with encryption key and database path', async ()
await client.conversations.newGroup([anotherClient.address])
assert(
(await client.conversations.listGroups()).length === 1,
`should have a group size of 1 but was ${(await client.conversations.listGroups()).length}`
`should have a group size of 1 but was ${
(await client.conversations.listGroups()).length
}`
)

const bundle = await client.exportKeyBundle()
Expand All @@ -112,7 +114,9 @@ test('can make a MLS V3 client with encryption key and database path', async ()

assert(
(await clientFromBundle.conversations.listGroups()).length === 1,
`should have a group size of 1 but was ${(await clientFromBundle.conversations.listGroups()).length}`
`should have a group size of 1 but was ${
(await clientFromBundle.conversations.listGroups()).length
}`
)
return true
})
Expand Down Expand Up @@ -967,3 +971,300 @@ test('can stream all group messages', async () => {

return true
})

test('can streamAll from multiple clients', async () => {
const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()
const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()
const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()

// Setup stream alls
const allBoConversations: any[] = []
const allAliConversations: any[] = []

await bo.conversations.streamAll(async (conversation) => {
allBoConversations.push(conversation)
})
await alix.conversations.streamAll(async (conversation) => {
allAliConversations.push(conversation)
})

// Start Caro starts a new conversation.
await caro.conversations.newConversation(alix.address)
await delayToPropogate()
if (allBoConversations.length !== 0) {
throw Error(
'Unexpected all conversations count for Bo ' +
allBoConversations.length +
' and Alix had ' +
allAliConversations.length
)
}
if (allAliConversations.length !== 1) {
throw Error(
'Unexpected all conversations count ' + allAliConversations.length
)
}
return true
})

test('can streamAll from multiple clients - swapped orderring', async () => {
const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()
const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()
const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()

// Setup stream alls
const allBoConversations: any[] = []
const allAliConversations: any[] = []

await alix.conversations.streamAll(async (conversation) => {
allAliConversations.push(conversation)
})

await bo.conversations.streamAll(async (conversation) => {
allBoConversations.push(conversation)
})

// Start Caro starts a new conversation.
await caro.conversations.newConversation(alix.address)
await delayToPropogate()
if (allBoConversations.length !== 0) {
throw Error(
'Unexpected all conversations count for Bo ' +
allBoConversations.length +
' and Alix had ' +
allAliConversations.length
)
}
if (allAliConversations.length !== 1) {
throw Error(
'Unexpected all conversations count ' + allAliConversations.length
)
}
return true
})

test('can streamAllMessages from multiple clients', async () => {
const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()
const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()
const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()

// Setup stream
const allBoMessages: any[] = []
const allAliMessages: any[] = []

await bo.conversations.streamAllMessages(async (conversation) => {
allBoMessages.push(conversation)
}, true)
await alix.conversations.streamAllMessages(async (conversation) => {
allAliMessages.push(conversation)
}, true)

// Start Caro starts a new conversation.
const caroConversation = await caro.conversations.newConversation(
alix.address
)
await caroConversation.send({ text: `Message` })
await delayToPropogate()
if (allBoMessages.length !== 0) {
throw Error('Unexpected all messages count for Bo ' + allBoMessages.length)
}

if (allAliMessages.length !== 1) {
throw Error(
'Unexpected all conversations count for Ali ' + allAliMessages.length
)
}

return true
})

test('can streamAllMessages from multiple clients - swapped', async () => {
const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()
const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()
const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()

// Setup stream
const allBoMessages: any[] = []
const allAliMessages: any[] = []
const caroGroup = await caro.conversations.newGroup([alix.address])

await alix.conversations.streamAllMessages(async (conversation) => {
allAliMessages.push(conversation)
}, true)
await bo.conversations.streamAllMessages(async (conversation) => {
allBoMessages.push(conversation)
}, true)

// Start Caro starts a new conversation.
const caroConvo = await caro.conversations.newConversation(alix.address)
await delayToPropogate()
await caroConvo.send({ text: `Message` })
await caroGroup.send({ text: `Message` })
await delayToPropogate()
if (allBoMessages.length !== 0) {
throw Error(
'Unexpected all conversations count for Bo ' + allBoMessages.length
)
}

if (allAliMessages.length !== 2) {
throw Error(
'Unexpected all conversations count for Ali ' + allAliMessages.length
)
}

return true
})

test('can stream all group Messages from multiple clients', async () => {
const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()
const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()
const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()

// Setup stream
const allAlixMessages: DecodedMessage[] = []
const allBoMessages: DecodedMessage[] = []
const alixGroup = await caro.conversations.newGroup([alix.address])
const boGroup = await caro.conversations.newGroup([bo.address])

await alixGroup.streamGroupMessages(async (message) => {
allAlixMessages.push(message)
})
await boGroup.streamGroupMessages(async (message) => {
allBoMessages.push(message)
})

// Start Caro starts a new conversation.
await delayToPropogate()
await alixGroup.send({ text: `Message` })
await delayToPropogate()
if (allBoMessages.length !== 0) {
throw Error('Unexpected all messages count for Bo ' + allBoMessages.length)
}

if (allAlixMessages.length !== 1) {
throw Error(
'Unexpected all messages count for Ali ' + allAlixMessages.length
)
}

const alixConv = (await alix.conversations.listGroups())[0]
await alixConv.send({ text: `Message` })
await delayToPropogate()
if (allBoMessages.length !== 0) {
throw Error('Unexpected all messages count for Bo ' + allBoMessages.length)
}
// @ts-ignore-next-line
if (allAlixMessages.length !== 2) {
throw Error(
'Unexpected all messages count for Ali ' + allAlixMessages.length
)
}

return true
})

test('can stream all group Messages from multiple clients - swapped', async () => {
const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()
const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()
const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()

// Setup stream
const allAlixMessages: DecodedMessage[] = []
const allBoMessages: DecodedMessage[] = []
const alixGroup = await caro.conversations.newGroup([alix.address])
const boGroup = await caro.conversations.newGroup([bo.address])

await boGroup.streamGroupMessages(async (message) => {
allBoMessages.push(message)
})
await alixGroup.streamGroupMessages(async (message) => {
allAlixMessages.push(message)
})

// Start Caro starts a new conversation.
await delayToPropogate()
await alixGroup.send({ text: `Message` })
await delayToPropogate()
if (allBoMessages.length !== 0) {
throw Error('Unexpected all messages count for Bo ' + allBoMessages.length)
}

if (allAlixMessages.length !== 1) {
throw Error(
'Unexpected all messages count for Ali ' + allAlixMessages.length
)
}

const alixConv = (await alix.conversations.listGroups())[0]
await alixConv.send({ text: `Message` })
await delayToPropogate()
if (allBoMessages.length !== 0) {
throw Error('Unexpected all messages count for Bo ' + allBoMessages.length)
}
// @ts-ignore-next-line
if (allAlixMessages.length !== 2) {
throw Error(
'Unexpected all messages count for Ali ' + allAlixMessages.length
)
}

return true
})

// Commenting this out so it doesn't block people, but nice to have?
// test('can stream messages for a long time', async () => {
// const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true })
// await delayToPropogate()
// const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true })
// await delayToPropogate()
// const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true })
// await delayToPropogate()

// // Setup stream alls
// const allBoMessages: any[] = []
// const allAliMessages: any[] = []

// const group = await caro.conversations.newGroup([alix.address])
// await bo.conversations.streamAllMessages(async (conversation) => {
// allBoMessages.push(conversation)
// }, true)
// await alix.conversations.streamAllMessages(async (conversation) => {
// allAliMessages.push(conversation)
// }, true)

// // Wait for 15 minutes
// await delayToPropogate(15 * 1000 * 60)

// // Start Caro starts a new conversation.
// const convo = await caro.conversations.newConversation(alix.address)
// await group.send({ text: 'hello' })
// await convo.send({ text: 'hello' })
// await delayToPropogate()
// if (allBoMessages.length !== 0) {
// throw Error('Unexpected all conversations count ' + allBoMessages.length)
// }
// if (allAliMessages.length !== 2) {
// throw Error('Unexpected all conversations count ' + allAliMessages.length)
// }

// return true
// })
Loading

0 comments on commit 30c7aea

Please sign in to comment.