Skip to content

Commit

Permalink
Fixes cancelStreamAllGroupMessages
Browse files Browse the repository at this point in the history
  • Loading branch information
cameronvoell committed Feb 28, 2024
1 parent 5e65f1d commit 30da179
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 9 deletions.
7 changes: 6 additions & 1 deletion example/src/TestScreen.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,12 @@ export default function TestScreen(): JSX.Element {
const params = route.params as {
testSelection: TestCategory
}
const allTests = tests.concat(groupTests).concat(createdAtTests)
const allTests = [
...tests,
...groupTests,
...createdAtTests,
...restartStreamTests,
]
let activeTests, title
switch (params.testSelection) {
case TestCategory.all:
Expand Down
64 changes: 57 additions & 7 deletions example/src/tests/restartStreamsTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ test('Can cancel a stream and restart', async () => {
// Start stream
let numEvents1 = 0
await alix.conversations.stream(async (_) => {
console.log('stream event 1')
numEvents1++
})
await delayToPropogate()
Expand All @@ -30,7 +29,6 @@ test('Can cancel a stream and restart', async () => {
// Start new stream
let numEvents2 = 0
await alix.conversations.stream(async (_) => {
console.log('stream event 2')
numEvents2++
})
await delayToPropogate()
Expand Down Expand Up @@ -60,7 +58,6 @@ test('Can cancel a streamGroups and restart', async () => {
// Start stream
let numEvents1 = 0
await alix.conversations.streamGroups(async (_) => {
console.log('stream event 1')
numEvents1++
})
await delayToPropogate()
Expand All @@ -77,7 +74,6 @@ test('Can cancel a streamGroups and restart', async () => {
// Start new stream
let numEvents2 = 0
await alix.conversations.streamGroups(async (_) => {
console.log('stream event 2')
numEvents2++
})
await delayToPropogate()
Expand Down Expand Up @@ -111,7 +107,6 @@ test('Can cancel a streamAllMessages and restart', async () => {
// Start stream
let numEvents1 = 0
await alix.conversations.streamAllMessages(async (_) => {
console.log('stream event 1')
numEvents1++
}, true)
await delayToPropogate()
Expand All @@ -134,12 +129,11 @@ test('Can cancel a streamAllMessages and restart', async () => {
await boGroup.send('test')
await boConversation.send('test')
await delayToPropogate()
assert(numEvents1 === 2, 'expected 1 event, first stream after cancel')
assert(numEvents1 === 2, 'expected 2 events, first stream after cancel')

// Start new stream
let numEvents2 = 0
await alix.conversations.streamAllMessages(async (_) => {
console.log('stream event 2')
numEvents2++
}, true)
await delayToPropogate()
Expand All @@ -160,3 +154,59 @@ test('Can cancel a streamAllMessages and restart', async () => {

return true
})

test('Can cancel a streamAllGroupMessages and restart', async () => {
// Create clients
const [alix, bo] = await createClients(2)

// Create a group
await delayToPropogate()
await bo.conversations.newGroup([alix.address])
await delayToPropogate()

// Start stream
let numEvents1 = 0
await alix.conversations.streamAllGroupMessages(async (_) => {
numEvents1++
})
await delayToPropogate()

// Send one Group message and one Conversation Message
const boGroup = (await bo.conversations.listGroups())[0]

await boGroup.send('test')
await delayToPropogate()

assert(
numEvents1 === 1,
'expected 1 events, first stream, but found ' + numEvents1
)

// Cancel stream
alix.conversations.cancelStreamAllGroupMessages()
await boGroup.send('test')
await delayToPropogate()
assert(numEvents1 === 1, 'expected 1 event, first stream after cancel')

// Start new stream
let numEvents2 = 0
await alix.conversations.streamAllGroupMessages(async (_) => {
numEvents2++
})
await delayToPropogate()

await boGroup.send('test')
await delayToPropogate()

// Verify correct number of events from each stream
assert(
numEvents1 === 1,
'expected 1 event, first stream after cancel, but found ' + numEvents1
)
assert(
numEvents2 === 1,
'expected 1 event, second stream, but found ' + numEvents2
)

return true
})
7 changes: 6 additions & 1 deletion src/lib/Conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ export default class Conversations<
callback: (message: DecodedMessage<ContentTypes>) => Promise<void>
): Promise<void> {
XMTPModule.subscribeToAllGroupMessages(this.client.address)
XMTPModule.emitter.addListener(
const subscription = XMTPModule.emitter.addListener(
EventTypes.AllGroupMessage,
async ({
clientAddress,
Expand All @@ -309,6 +309,7 @@ export default class Conversations<
await callback(DecodedMessage.fromObject(message, this.client))
}
)
this.subscriptions[EventTypes.AllGroupMessage] = subscription
}

/**
Expand Down Expand Up @@ -348,6 +349,10 @@ export default class Conversations<
* Cancels the stream for new messages in all groups.
*/
cancelStreamAllGroupMessages() {
if (this.subscriptions[EventTypes.AllGroupMessage]) {
this.subscriptions[EventTypes.AllGroupMessage].remove()
delete this.subscriptions[EventTypes.AllGroupMessage]
}
XMTPModule.unsubscribeFromAllGroupMessages(this.client.address)
}
}

0 comments on commit 30da179

Please sign in to comment.