From 371b081ca53d6228b8071db2eaf2436f9b01e502 Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Tue, 27 Feb 2024 08:50:46 -0700 Subject: [PATCH] fix: Remove conversation event emitters when unsubscribing streams --- example/src/tests/tests.ts | 45 ++++++++++++++++++++++++++++++++++++++ src/lib/Conversations.ts | 8 ++++++- 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/example/src/tests/tests.ts b/example/src/tests/tests.ts index 89fd7f1cf..3afa96d03 100644 --- a/example/src/tests/tests.ts +++ b/example/src/tests/tests.ts @@ -929,3 +929,48 @@ test('instantiate frames client correctly', async () => { } return true }) + +test('Can cancel a stream and start a new one', async () => { + // Creat clients + const alix = await Client.createRandom({ env: 'local' }) + await delayToPropogate() + const bo = await Client.createRandom({ env: 'local' }) + await delayToPropogate() + const caro = await Client.createRandom({ env: 'local' }) + await delayToPropogate() + const davon = await Client.createRandom({ env: 'local' }) + await delayToPropogate() + + // Start stream + let numEvents1 = 0 + await alix.conversations.stream(async (_) => { + console.log('stream event 1') + numEvents1++ + }) + await delayToPropogate() + const convo1 = await alix.conversations.newConversation(bo.address) + await delayToPropogate() + assert(numEvents1 === 1, 'expected 1 event, first stream') + + // Cancel stream + alix.conversations.cancelStream() + const convo2 = await alix.conversations.newConversation(caro.address) + assert(numEvents1 === 1, 'expected 1 event, first stream after cancel') + + // Start new stream + let numEvents2 = 0 + await alix.conversations.stream(async (_) => { + console.log('stream event 2') + numEvents2++ + }) + await delayToPropogate() + + const convo3 = await alix.conversations.newConversation(davon.address) + await delayToPropogate() + + // Verify correct number of events from each stream + assert(numEvents1 === 1, 'expected 1 event, first stream after cancel, but found ' + numEvents1) + assert(numEvents2 === 1, 'expected 1 event, second stream') + + return true +}) diff --git a/src/lib/Conversations.ts b/src/lib/Conversations.ts index f7a47e152..f36b82803 100644 --- a/src/lib/Conversations.ts +++ b/src/lib/Conversations.ts @@ -17,6 +17,7 @@ export default class Conversations< > { client: Client private known = {} as { [topic: string]: boolean } + private subscriptions: { [key: string]: { remove: () => void } } = {} constructor(client: Client) { this.client = client @@ -168,7 +169,7 @@ export default class Conversations< callback: (conversation: Conversation) => Promise ) { XMTPModule.subscribeToConversations(this.client.address) - XMTPModule.emitter.addListener( + const subscription = XMTPModule.emitter.addListener( EventTypes.Conversation, async ({ clientAddress, @@ -188,6 +189,7 @@ export default class Conversations< await callback(new Conversation(this.client, conversation)) } ) + this.subscriptions[EventTypes.Conversation] = subscription } /** @@ -311,6 +313,10 @@ export default class Conversations< * Cancels the stream for new conversations. */ cancelStream() { + if (this.subscriptions[EventTypes.Conversation]) { + this.subscriptions[EventTypes.Conversation].remove() + delete this.subscriptions[EventTypes.Conversation] + } XMTPModule.unsubscribeFromConversations(this.client.address) }