diff --git a/README.md b/README.md index 9085d3151..b1e98b9e2 100644 --- a/README.md +++ b/README.md @@ -152,17 +152,20 @@ These conversations include all conversations for a user **regardless of which a You can also listen for new conversations being started in real-time. This will allow applications to display incoming messages from new contacts. > **Warning** -> This stream will continue infinitely. To end the stream you can call `cancelStream()`. +> This stream will continue infinitely. To end the stream you can call the method returned by `conversations.stream()`. ```tsx -const stream = await xmtp.conversations.stream() -for await (const conversation of stream) { - console.log(`New conversation started with ${conversation.peerAddress}`) - // Say hello to your new friend - await conversation.send('Hi there!') - // Break from the loop to stop listening - break -} +const cancelStream = await xmtp.conversations.stream( + async (conversation) => { + console.log(`New conversation started with ${conversation.peerAddress}`) + + // Say hello to your new friend + await conversation.send('Hi there!') + } +) + +// To unsubscribe from the stream +cancelStream() ``` ### Start a new conversation @@ -222,19 +225,22 @@ You can listen for any new messages (incoming or outgoing) in a conversation by A successfully received message (that makes it through the decoding and decryption without throwing) can be trusted to be authentic, i.e. that it was sent by the owner of the `message.senderAddress` wallet and that it wasn't modified in transit. The `message.sent` timestamp can be trusted to have been set by the sender. -The Stream returned by the `stream` methods is an asynchronous iterator and as such usable by a for-await-of loop. Note however that it is by its nature infinite, so any looping construct used with it will not terminate, unless the termination is explicitly initiated by calling `cancelStreamMessages()`. +The Stream returned by the `stream` methods is an asynchronous iterator and as such usable by a for-await-of loop. Note however that it is by its nature infinite, so any looping construct used with it will not terminate, unless the termination is explicitly initiated by calling the method returned by `conversation.streamMessages()`. ```tsx const conversation = await xmtp.conversations.newConversation( '0x3F11b27F323b62B159D2642964fa27C46C841897' ) -for await (const message of await conversation.streamMessages()) { +const cancelStream = await conversation.streamMessages(async (message) => { if (message.senderAddress === xmtp.address) { // This message was sent from me continue } console.log(`New message from ${message.senderAddress}: ${message.content}`) -} +}) + +// To unsubscribe from the stream +cancelStream() ``` ### Listen for new messages in all conversations @@ -245,16 +251,21 @@ To listen for any new messages from _all_ conversations, use `conversations.stre > There is a chance this stream can miss messages if multiple new conversations are received in the time it takes to update the stream to include a new conversation. > **Warning** -> This stream will continue infinitely. To end the stream you can call `cancelStreamAllMessages()`. +> This stream will continue infinitely. To end the stream you can call the method returned by `conversations.streamAllMessages()`. ```tsx -for await (const message of await xmtp.conversations.streamAllMessages()) { - if (message.senderAddress === xmtp.address) { - // This message was sent from me - continue +const cancelAllMessagesStream = await xmtp.conversations.streamAllMessages( + async (message) => { + if (message.senderAddress === xmtp.address) { + // This message was sent from me + continue + } + console.log(`New message from ${message.senderAddress}: ${message.content}`) } - console.log(`New message from ${message.senderAddress}: ${message.content}`) -} +) + +// To unsubscribe from the all messages stream +cancelAllMessagesStream() ``` ## Request and respect user consent diff --git a/example/src/tests.ts b/example/src/tests.ts index 0c59eb60e..8b388f6f7 100644 --- a/example/src/tests.ts +++ b/example/src/tests.ts @@ -361,15 +361,19 @@ test('can stream messages', async () => { // Record new conversation stream const allConversations: Conversation[] = [] - await alice.conversations.stream(async (conversation) => { - allConversations.push(conversation) - }) + const cancelStream = await alice.conversations.stream( + async (conversation) => { + allConversations.push(conversation) + } + ) // Record message stream across all conversations const allMessages: DecodedMessage[] = [] - await alice.conversations.streamAllMessages(async (message) => { - allMessages.push(message) - }) + const cancelStreamAllMessages = await alice.conversations.streamAllMessages( + async (message) => { + allMessages.push(message) + } + ) // Start Bob starts a new conversation. const bobConvo = await bob.conversations.newConversation(alice.address, { @@ -447,8 +451,8 @@ test('can stream messages', async () => { throw Error('Unexpected convo message topic ' + convoMessages[i].topic) } } - alice.conversations.cancelStream() - alice.conversations.cancelStreamAllMessages() + cancelStream() + cancelStreamAllMessages() return true }) @@ -586,9 +590,11 @@ test('can stream all messages', async () => { // Record message stream across all conversations const allMessages: DecodedMessage[] = [] - await alix.conversations.streamAllMessages(async (message) => { - allMessages.push(message) - }) + const cancelStreamAllMessages = await alix.conversations.streamAllMessages( + async (message) => { + allMessages.push(message) + } + ) // Start Bob starts a new conversation. const boConvo = await bo.conversations.newConversation(alix.address) @@ -617,7 +623,7 @@ test('can stream all messages', async () => { throw Error('Unexpected all messages count ' + allMessages.length) } - alix.conversations.cancelStreamAllMessages() + cancelStreamAllMessages() await alix.conversations.streamAllMessages(async (message) => { allMessages.push(message) diff --git a/src/lib/Conversations.ts b/src/lib/Conversations.ts index 4b13f7352..1e6226086 100644 --- a/src/lib/Conversations.ts +++ b/src/lib/Conversations.ts @@ -63,14 +63,13 @@ export default class Conversations { * This method subscribes to conversations in real-time and listens for incoming conversation events. * When a new conversation is detected, the provided callback function is invoked with the details of the conversation. * @param {Function} callback - A callback function that will be invoked with the new Conversation when a conversation is started. - * @returns {Promise} A Promise that resolves when the stream is set up. - * @warning This stream will continue infinitely. To end the stream, you can call {@linkcode Conversations.cancelStream | cancelStream()}. + * @returns {Promise} A function that, when called, unsubscribes from the message stream and ends real-time updates.. */ async stream( callback: (conversation: Conversation) => Promise - ) { + ): Promise<() => void> { XMTPModule.subscribeToConversations(this.client.address) - XMTPModule.emitter.addListener( + const conversationSubscription = XMTPModule.emitter.addListener( 'conversation', async ({ clientAddress, @@ -90,6 +89,10 @@ export default class Conversations { await callback(new Conversation(this.client, conversation)) } ) + return () => { + conversationSubscription.remove() + XMTPModule.unsubscribeFromConversations(this.client.address) + } } /** @@ -97,13 +100,13 @@ export default class Conversations { * * This method subscribes to all conversations in real-time and listens for incoming and outgoing messages. * @param {Function} callback - A callback function that will be invoked when a message is sent or received. - * @returns {Promise} A Promise that resolves when the stream is set up. + * @returns {Promise} A function that, when called, unsubscribes from all the messages stream and ends real-time updates. */ async streamAllMessages( callback: (message: DecodedMessage) => Promise - ): Promise { + ): Promise<() => void> { XMTPModule.subscribeToAllMessages(this.client.address) - XMTPModule.emitter.addListener( + const messagesSubscription = XMTPModule.emitter.addListener( 'message', async ({ clientAddress, @@ -123,19 +126,9 @@ export default class Conversations { await callback(DecodedMessage.fromObject(message, this.client)) } ) - } - - /** - * Cancels the stream for new conversations. - */ - cancelStream() { - XMTPModule.unsubscribeFromConversations(this.client.address) - } - - /** - * Cancels the stream for new messages in all conversations. - */ - cancelStreamAllMessages() { - XMTPModule.unsubscribeFromAllMessages(this.client.address) + return () => { + messagesSubscription.remove() + XMTPModule.unsubscribeFromAllMessages(this.client.address) + } } }