From 371b081ca53d6228b8071db2eaf2436f9b01e502 Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Tue, 27 Feb 2024 08:50:46 -0700 Subject: [PATCH 1/4] fix: Remove conversation event emitters when unsubscribing streams --- example/src/tests/tests.ts | 45 ++++++++++++++++++++++++++++++++++++++ src/lib/Conversations.ts | 8 ++++++- 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/example/src/tests/tests.ts b/example/src/tests/tests.ts index 89fd7f1cf..3afa96d03 100644 --- a/example/src/tests/tests.ts +++ b/example/src/tests/tests.ts @@ -929,3 +929,48 @@ test('instantiate frames client correctly', async () => { } return true }) + +test('Can cancel a stream and start a new one', async () => { + // Creat clients + const alix = await Client.createRandom({ env: 'local' }) + await delayToPropogate() + const bo = await Client.createRandom({ env: 'local' }) + await delayToPropogate() + const caro = await Client.createRandom({ env: 'local' }) + await delayToPropogate() + const davon = await Client.createRandom({ env: 'local' }) + await delayToPropogate() + + // Start stream + let numEvents1 = 0 + await alix.conversations.stream(async (_) => { + console.log('stream event 1') + numEvents1++ + }) + await delayToPropogate() + const convo1 = await alix.conversations.newConversation(bo.address) + await delayToPropogate() + assert(numEvents1 === 1, 'expected 1 event, first stream') + + // Cancel stream + alix.conversations.cancelStream() + const convo2 = await alix.conversations.newConversation(caro.address) + assert(numEvents1 === 1, 'expected 1 event, first stream after cancel') + + // Start new stream + let numEvents2 = 0 + await alix.conversations.stream(async (_) => { + console.log('stream event 2') + numEvents2++ + }) + await delayToPropogate() + + const convo3 = await alix.conversations.newConversation(davon.address) + await delayToPropogate() + + // Verify correct number of events from each stream + assert(numEvents1 === 1, 'expected 1 event, first stream after cancel, but found ' + numEvents1) + assert(numEvents2 === 1, 'expected 1 event, second stream') + + return true +}) diff --git a/src/lib/Conversations.ts b/src/lib/Conversations.ts index f7a47e152..f36b82803 100644 --- a/src/lib/Conversations.ts +++ b/src/lib/Conversations.ts @@ -17,6 +17,7 @@ export default class Conversations< > { client: Client private known = {} as { [topic: string]: boolean } + private subscriptions: { [key: string]: { remove: () => void } } = {} constructor(client: Client) { this.client = client @@ -168,7 +169,7 @@ export default class Conversations< callback: (conversation: Conversation) => Promise ) { XMTPModule.subscribeToConversations(this.client.address) - XMTPModule.emitter.addListener( + const subscription = XMTPModule.emitter.addListener( EventTypes.Conversation, async ({ clientAddress, @@ -188,6 +189,7 @@ export default class Conversations< await callback(new Conversation(this.client, conversation)) } ) + this.subscriptions[EventTypes.Conversation] = subscription } /** @@ -311,6 +313,10 @@ export default class Conversations< * Cancels the stream for new conversations. */ cancelStream() { + if (this.subscriptions[EventTypes.Conversation]) { + this.subscriptions[EventTypes.Conversation].remove() + delete this.subscriptions[EventTypes.Conversation] + } XMTPModule.unsubscribeFromConversations(this.client.address) } From de30e2efc7699dfa25ed4e2a23a697d4ca2d4f40 Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Wed, 28 Feb 2024 07:36:02 -0700 Subject: [PATCH 2/4] New test category. Fixed cancelStreamGroups --- example/src/Navigation.tsx | 1 + example/src/TestScreen.tsx | 6 ++ example/src/tests/groupTests.ts | 6 +- example/src/tests/restartStreamsTests.ts | 99 ++++++++++++++++++++++++ example/src/tests/tests.ts | 45 ----------- src/lib/Conversations.ts | 5 ++ 6 files changed, 114 insertions(+), 48 deletions(-) create mode 100644 example/src/tests/restartStreamsTests.ts diff --git a/example/src/Navigation.tsx b/example/src/Navigation.tsx index f6c7b3dec..5858cf2c5 100644 --- a/example/src/Navigation.tsx +++ b/example/src/Navigation.tsx @@ -1,4 +1,5 @@ import { createNativeStackNavigator } from '@react-navigation/native-stack' + import { TestCategory } from './TestScreen' export type NavigationParamList = { diff --git a/example/src/TestScreen.tsx b/example/src/TestScreen.tsx index 2cef4e1b0..a1a3b5464 100644 --- a/example/src/TestScreen.tsx +++ b/example/src/TestScreen.tsx @@ -4,6 +4,7 @@ import { View, Text, Button, ScrollView } from 'react-native' import { createdAtTests } from './tests/createdAtTests' import { groupTests } from './tests/groupTests' +import { restartStreamTests } from './tests/restartStreamsTests' import { Test } from './tests/test-utils' import { tests } from './tests/tests' @@ -104,6 +105,7 @@ export enum TestCategory { tests = 'tests', group = 'group', createdAt = 'createdAt', + restartStreans = 'restartStreams', } export default function TestScreen(): JSX.Element { @@ -131,6 +133,10 @@ export default function TestScreen(): JSX.Element { activeTests = createdAtTests title = 'Created At Unit Tests' break + case TestCategory.restartStreans: + activeTests = restartStreamTests + title = 'Restart Streams Unit Tests' + break } return ( diff --git a/example/src/tests/groupTests.ts b/example/src/tests/groupTests.ts index 53ed00e33..f583ff873 100644 --- a/example/src/tests/groupTests.ts +++ b/example/src/tests/groupTests.ts @@ -132,13 +132,13 @@ test('can make a MLS V3 client from bundle', async () => { test('production MLS V3 client creation throws error', async () => { try { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - const client = await Client.createRandom({ + await Client.createRandom({ env: 'production', appVersion: 'Testing/0.0.0', enableAlphaMls: true, }) - } catch (error: any) { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + } catch (error) { return true } throw new Error( diff --git a/example/src/tests/restartStreamsTests.ts b/example/src/tests/restartStreamsTests.ts new file mode 100644 index 000000000..c091b30cd --- /dev/null +++ b/example/src/tests/restartStreamsTests.ts @@ -0,0 +1,99 @@ +import { Test, assert, createClients, delayToPropogate } from './test-utils' + +export const restartStreamTests: Test[] = [] + +function test(name: string, perform: () => Promise) { + restartStreamTests.push({ name, run: perform }) +} + +test('Can cancel a stream and start a new one', async () => { + // Create clients + const [alix, bo, caro, davon] = await createClients(4) + + // Start stream + let numEvents1 = 0 + await alix.conversations.stream(async (_) => { + console.log('stream event 1') + numEvents1++ + }) + await delayToPropogate() + await alix.conversations.newConversation(bo.address) + await delayToPropogate() + assert(numEvents1 === 1, 'expected 1 event, first stream') + + // Cancel stream + alix.conversations.cancelStream() + await alix.conversations.newConversation(caro.address) + await delayToPropogate() + assert(numEvents1 === 1, 'expected 1 event, first stream after cancel') + + // Start new stream + let numEvents2 = 0 + await alix.conversations.stream(async (_) => { + console.log('stream event 2') + numEvents2++ + }) + await delayToPropogate() + + await alix.conversations.newConversation(davon.address) + await delayToPropogate() + + // Verify correct number of events from each stream + assert( + numEvents1 === 1, + 'expected 1 event, first stream after cancel, but found ' + numEvents1 + ) + assert( + numEvents2 === 1, + 'expected 1 event, second stream, but found ' + numEvents2 + ) + + return true +}) + +// Existing issue, client who started stream, creating groups will not +// be streamed +test('Can cancel a streamGroups and start a new one', async () => { + // Create clients + const [alix, bo, caro, davon] = await createClients(4) + + // Start stream + let numEvents1 = 0 + await alix.conversations.streamGroups(async (_) => { + console.log('stream event 1') + numEvents1++ + }) + await delayToPropogate() + await bo.conversations.newGroup([alix.address]) + await delayToPropogate() + assert(numEvents1 === 1, 'expected 1 event, first stream') + + // Cancel stream + alix.conversations.cancelStreamGroups() + await caro.conversations.newGroup([alix.address]) + await delayToPropogate() + assert(numEvents1 === 1, 'expected 1 event, first stream after cancel') + + // Start new stream + let numEvents2 = 0 + await alix.conversations.streamGroups(async (_) => { + console.log('stream event 2') + numEvents2++ + }) + await delayToPropogate() + + await davon.conversations.newGroup([alix.address]) + await delayToPropogate() + + // Verify correct number of events from each stream + assert( + numEvents1 === 1, + 'expected 1 event, first stream after cancel, but found ' + numEvents1 + ) + assert( + numEvents2 === 1, + 'expected 1 event, second stream, but found ' + numEvents2 + ) + + return true +}) diff --git a/example/src/tests/tests.ts b/example/src/tests/tests.ts index 3afa96d03..89fd7f1cf 100644 --- a/example/src/tests/tests.ts +++ b/example/src/tests/tests.ts @@ -929,48 +929,3 @@ test('instantiate frames client correctly', async () => { } return true }) - -test('Can cancel a stream and start a new one', async () => { - // Creat clients - const alix = await Client.createRandom({ env: 'local' }) - await delayToPropogate() - const bo = await Client.createRandom({ env: 'local' }) - await delayToPropogate() - const caro = await Client.createRandom({ env: 'local' }) - await delayToPropogate() - const davon = await Client.createRandom({ env: 'local' }) - await delayToPropogate() - - // Start stream - let numEvents1 = 0 - await alix.conversations.stream(async (_) => { - console.log('stream event 1') - numEvents1++ - }) - await delayToPropogate() - const convo1 = await alix.conversations.newConversation(bo.address) - await delayToPropogate() - assert(numEvents1 === 1, 'expected 1 event, first stream') - - // Cancel stream - alix.conversations.cancelStream() - const convo2 = await alix.conversations.newConversation(caro.address) - assert(numEvents1 === 1, 'expected 1 event, first stream after cancel') - - // Start new stream - let numEvents2 = 0 - await alix.conversations.stream(async (_) => { - console.log('stream event 2') - numEvents2++ - }) - await delayToPropogate() - - const convo3 = await alix.conversations.newConversation(davon.address) - await delayToPropogate() - - // Verify correct number of events from each stream - assert(numEvents1 === 1, 'expected 1 event, first stream after cancel, but found ' + numEvents1) - assert(numEvents2 === 1, 'expected 1 event, second stream') - - return true -}) diff --git a/src/lib/Conversations.ts b/src/lib/Conversations.ts index f36b82803..62d14ac7a 100644 --- a/src/lib/Conversations.ts +++ b/src/lib/Conversations.ts @@ -127,6 +127,7 @@ export default class Conversations< await callback(new Group(this.client, group)) } ) + this.subscriptions[EventTypes.Group] = groupsSubscription return () => { groupsSubscription.remove() XMTPModule.unsubscribeFromGroups(this.client.address) @@ -324,6 +325,10 @@ export default class Conversations< * Cancels the stream for new conversations. */ cancelStreamGroups() { + if (this.subscriptions[EventTypes.Group]) { + this.subscriptions[EventTypes.Group].remove() + delete this.subscriptions[EventTypes.Group] + } XMTPModule.unsubscribeFromGroups(this.client.address) } From 5e65f1d71338a5bfb01582829bd4915a0542dd34 Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Wed, 28 Feb 2024 07:55:07 -0700 Subject: [PATCH 3/4] Fixed cancelStreamAllMessages --- example/src/tests/restartStreamsTests.ts | 67 +++++++++++++++++++++++- src/lib/Conversations.ts | 7 ++- 2 files changed, 71 insertions(+), 3 deletions(-) diff --git a/example/src/tests/restartStreamsTests.ts b/example/src/tests/restartStreamsTests.ts index c091b30cd..9ed1480ba 100644 --- a/example/src/tests/restartStreamsTests.ts +++ b/example/src/tests/restartStreamsTests.ts @@ -6,7 +6,7 @@ function test(name: string, perform: () => Promise) { restartStreamTests.push({ name, run: perform }) } -test('Can cancel a stream and start a new one', async () => { +test('Can cancel a stream and restart', async () => { // Create clients const [alix, bo, caro, davon] = await createClients(4) @@ -53,7 +53,7 @@ test('Can cancel a stream and start a new one', async () => { // Existing issue, client who started stream, creating groups will not // be streamed -test('Can cancel a streamGroups and start a new one', async () => { +test('Can cancel a streamGroups and restart', async () => { // Create clients const [alix, bo, caro, davon] = await createClients(4) @@ -97,3 +97,66 @@ test('Can cancel a streamGroups and start a new one', async () => { return true }) + +test('Can cancel a streamAllMessages and restart', async () => { + // Create clients + const [alix, bo] = await createClients(2) + + // Create a group + await delayToPropogate() + await bo.conversations.newGroup([alix.address]) + await bo.conversations.newConversation(alix.address) + await delayToPropogate() + + // Start stream + let numEvents1 = 0 + await alix.conversations.streamAllMessages(async (_) => { + console.log('stream event 1') + numEvents1++ + }, true) + await delayToPropogate() + + // Send one Group message and one Conversation Message + const boGroup = (await bo.conversations.listGroups())[0] + const boConversation = (await bo.conversations.list())[0] + + await boGroup.send('test') + await boConversation.send('test') + await delayToPropogate() + + assert( + numEvents1 === 2, + 'expected 2 events, first stream, but found ' + numEvents1 + ) + + // Cancel stream + alix.conversations.cancelStreamAllMessages() + await boGroup.send('test') + await boConversation.send('test') + await delayToPropogate() + assert(numEvents1 === 2, 'expected 1 event, first stream after cancel') + + // Start new stream + let numEvents2 = 0 + await alix.conversations.streamAllMessages(async (_) => { + console.log('stream event 2') + numEvents2++ + }, true) + await delayToPropogate() + + await boGroup.send('test') + await boConversation.send('test') + await delayToPropogate() + + // Verify correct number of events from each stream + assert( + numEvents1 === 2, + 'expected 2 events, first stream after cancel, but found ' + numEvents1 + ) + assert( + numEvents2 === 2, + 'expected 2 events, second stream, but found ' + numEvents2 + ) + + return true +}) diff --git a/src/lib/Conversations.ts b/src/lib/Conversations.ts index 62d14ac7a..545446697 100644 --- a/src/lib/Conversations.ts +++ b/src/lib/Conversations.ts @@ -255,7 +255,7 @@ export default class Conversations< includeGroups: boolean = false ): Promise { XMTPModule.subscribeToAllMessages(this.client.address, includeGroups) - XMTPModule.emitter.addListener( + const subscription = XMTPModule.emitter.addListener( EventTypes.Message, async ({ clientAddress, @@ -275,6 +275,7 @@ export default class Conversations< await callback(DecodedMessage.fromObject(message, this.client)) } ) + this.subscriptions[EventTypes.Message] = subscription } /** @@ -336,6 +337,10 @@ export default class Conversations< * Cancels the stream for new messages in all conversations. */ cancelStreamAllMessages() { + if (this.subscriptions[EventTypes.Message]) { + this.subscriptions[EventTypes.Message].remove() + delete this.subscriptions[EventTypes.Message] + } XMTPModule.unsubscribeFromAllMessages(this.client.address) } From 30da1799b924c3852ee1502706ca5d395a6308f2 Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Wed, 28 Feb 2024 08:14:33 -0700 Subject: [PATCH 4/4] Fixes cancelStreamAllGroupMessages --- example/src/TestScreen.tsx | 7 ++- example/src/tests/restartStreamsTests.ts | 64 +++++++++++++++++++++--- src/lib/Conversations.ts | 7 ++- 3 files changed, 69 insertions(+), 9 deletions(-) diff --git a/example/src/TestScreen.tsx b/example/src/TestScreen.tsx index a1a3b5464..3eea134c1 100644 --- a/example/src/TestScreen.tsx +++ b/example/src/TestScreen.tsx @@ -114,7 +114,12 @@ export default function TestScreen(): JSX.Element { const params = route.params as { testSelection: TestCategory } - const allTests = tests.concat(groupTests).concat(createdAtTests) + const allTests = [ + ...tests, + ...groupTests, + ...createdAtTests, + ...restartStreamTests, + ] let activeTests, title switch (params.testSelection) { case TestCategory.all: diff --git a/example/src/tests/restartStreamsTests.ts b/example/src/tests/restartStreamsTests.ts index 9ed1480ba..db71a4b80 100644 --- a/example/src/tests/restartStreamsTests.ts +++ b/example/src/tests/restartStreamsTests.ts @@ -13,7 +13,6 @@ test('Can cancel a stream and restart', async () => { // Start stream let numEvents1 = 0 await alix.conversations.stream(async (_) => { - console.log('stream event 1') numEvents1++ }) await delayToPropogate() @@ -30,7 +29,6 @@ test('Can cancel a stream and restart', async () => { // Start new stream let numEvents2 = 0 await alix.conversations.stream(async (_) => { - console.log('stream event 2') numEvents2++ }) await delayToPropogate() @@ -60,7 +58,6 @@ test('Can cancel a streamGroups and restart', async () => { // Start stream let numEvents1 = 0 await alix.conversations.streamGroups(async (_) => { - console.log('stream event 1') numEvents1++ }) await delayToPropogate() @@ -77,7 +74,6 @@ test('Can cancel a streamGroups and restart', async () => { // Start new stream let numEvents2 = 0 await alix.conversations.streamGroups(async (_) => { - console.log('stream event 2') numEvents2++ }) await delayToPropogate() @@ -111,7 +107,6 @@ test('Can cancel a streamAllMessages and restart', async () => { // Start stream let numEvents1 = 0 await alix.conversations.streamAllMessages(async (_) => { - console.log('stream event 1') numEvents1++ }, true) await delayToPropogate() @@ -134,12 +129,11 @@ test('Can cancel a streamAllMessages and restart', async () => { await boGroup.send('test') await boConversation.send('test') await delayToPropogate() - assert(numEvents1 === 2, 'expected 1 event, first stream after cancel') + assert(numEvents1 === 2, 'expected 2 events, first stream after cancel') // Start new stream let numEvents2 = 0 await alix.conversations.streamAllMessages(async (_) => { - console.log('stream event 2') numEvents2++ }, true) await delayToPropogate() @@ -160,3 +154,59 @@ test('Can cancel a streamAllMessages and restart', async () => { return true }) + +test('Can cancel a streamAllGroupMessages and restart', async () => { + // Create clients + const [alix, bo] = await createClients(2) + + // Create a group + await delayToPropogate() + await bo.conversations.newGroup([alix.address]) + await delayToPropogate() + + // Start stream + let numEvents1 = 0 + await alix.conversations.streamAllGroupMessages(async (_) => { + numEvents1++ + }) + await delayToPropogate() + + // Send one Group message and one Conversation Message + const boGroup = (await bo.conversations.listGroups())[0] + + await boGroup.send('test') + await delayToPropogate() + + assert( + numEvents1 === 1, + 'expected 1 events, first stream, but found ' + numEvents1 + ) + + // Cancel stream + alix.conversations.cancelStreamAllGroupMessages() + await boGroup.send('test') + await delayToPropogate() + assert(numEvents1 === 1, 'expected 1 event, first stream after cancel') + + // Start new stream + let numEvents2 = 0 + await alix.conversations.streamAllGroupMessages(async (_) => { + numEvents2++ + }) + await delayToPropogate() + + await boGroup.send('test') + await delayToPropogate() + + // Verify correct number of events from each stream + assert( + numEvents1 === 1, + 'expected 1 event, first stream after cancel, but found ' + numEvents1 + ) + assert( + numEvents2 === 1, + 'expected 1 event, second stream, but found ' + numEvents2 + ) + + return true +}) diff --git a/src/lib/Conversations.ts b/src/lib/Conversations.ts index 545446697..39a206abb 100644 --- a/src/lib/Conversations.ts +++ b/src/lib/Conversations.ts @@ -289,7 +289,7 @@ export default class Conversations< callback: (message: DecodedMessage) => Promise ): Promise { XMTPModule.subscribeToAllGroupMessages(this.client.address) - XMTPModule.emitter.addListener( + const subscription = XMTPModule.emitter.addListener( EventTypes.AllGroupMessage, async ({ clientAddress, @@ -309,6 +309,7 @@ export default class Conversations< await callback(DecodedMessage.fromObject(message, this.client)) } ) + this.subscriptions[EventTypes.AllGroupMessage] = subscription } /** @@ -348,6 +349,10 @@ export default class Conversations< * Cancels the stream for new messages in all groups. */ cancelStreamAllGroupMessages() { + if (this.subscriptions[EventTypes.AllGroupMessage]) { + this.subscriptions[EventTypes.AllGroupMessage].remove() + delete this.subscriptions[EventTypes.AllGroupMessage] + } XMTPModule.unsubscribeFromAllGroupMessages(this.client.address) } }