Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposal : unsubscribe all message & conversation event #188

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 30 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,17 +152,20 @@ These conversations include all conversations for a user **regardless of which a
You can also listen for new conversations being started in real-time. This will allow applications to display incoming messages from new contacts.

> **Warning**
> This stream will continue infinitely. To end the stream you can call `cancelStream()`.
> This stream will continue infinitely. To end the stream you can call the method returned by `conversations.stream()`.

```tsx
const stream = await xmtp.conversations.stream()
for await (const conversation of stream) {
console.log(`New conversation started with ${conversation.peerAddress}`)
// Say hello to your new friend
await conversation.send('Hi there!')
// Break from the loop to stop listening
break
}
const cancelStream = await xmtp.conversations.stream(
async (conversation) => {
console.log(`New conversation started with ${conversation.peerAddress}`)

// Say hello to your new friend
await conversation.send('Hi there!')
}
)

// To unsubscribe from the stream
cancelStream()
```

### Start a new conversation
Expand Down Expand Up @@ -222,19 +225,22 @@ You can listen for any new messages (incoming or outgoing) in a conversation by

A successfully received message (that makes it through the decoding and decryption without throwing) can be trusted to be authentic, i.e. that it was sent by the owner of the `message.senderAddress` wallet and that it wasn't modified in transit. The `message.sent` timestamp can be trusted to have been set by the sender.

The Stream returned by the `stream` methods is an asynchronous iterator and as such usable by a for-await-of loop. Note however that it is by its nature infinite, so any looping construct used with it will not terminate, unless the termination is explicitly initiated by calling `cancelStreamMessages()`.
The Stream returned by the `stream` methods is an asynchronous iterator and as such usable by a for-await-of loop. Note however that it is by its nature infinite, so any looping construct used with it will not terminate, unless the termination is explicitly initiated by calling the method returned by `conversation.streamMessages()`.

```tsx
const conversation = await xmtp.conversations.newConversation(
kele-leanes marked this conversation as resolved.
Show resolved Hide resolved
'0x3F11b27F323b62B159D2642964fa27C46C841897'
)
for await (const message of await conversation.streamMessages()) {
const cancelStream = await conversation.streamMessages(async (message) => {
if (message.senderAddress === xmtp.address) {
// This message was sent from me
continue
}
console.log(`New message from ${message.senderAddress}: ${message.content}`)
}
})

// To unsubscribe from the stream
cancelStream()
```

### Listen for new messages in all conversations
Expand All @@ -245,16 +251,21 @@ To listen for any new messages from _all_ conversations, use `conversations.stre
> There is a chance this stream can miss messages if multiple new conversations are received in the time it takes to update the stream to include a new conversation.

> **Warning**
> This stream will continue infinitely. To end the stream you can call `cancelStreamAllMessages()`.
> This stream will continue infinitely. To end the stream you can call the method returned by `conversations.streamAllMessages()`.

```tsx
for await (const message of await xmtp.conversations.streamAllMessages()) {
if (message.senderAddress === xmtp.address) {
// This message was sent from me
continue
const cancelAllMessagesStream = await xmtp.conversations.streamAllMessages(
async (message) => {
if (message.senderAddress === xmtp.address) {
// This message was sent from me
continue
}
console.log(`New message from ${message.senderAddress}: ${message.content}`)
}
console.log(`New message from ${message.senderAddress}: ${message.content}`)
}
)

// To unsubscribe from the all messages stream
cancelAllMessagesStream()
```

## Request and respect user consent
Expand Down
30 changes: 18 additions & 12 deletions example/src/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -361,15 +361,19 @@ test('can stream messages', async () => {

// Record new conversation stream
const allConversations: Conversation<any>[] = []
await alice.conversations.stream(async (conversation) => {
allConversations.push(conversation)
})
const cancelStream = await alice.conversations.stream(
async (conversation) => {
allConversations.push(conversation)
}
)

// Record message stream across all conversations
const allMessages: DecodedMessage[] = []
await alice.conversations.streamAllMessages(async (message) => {
allMessages.push(message)
})
const cancelStreamAllMessages = await alice.conversations.streamAllMessages(
async (message) => {
allMessages.push(message)
}
)

// Start Bob starts a new conversation.
const bobConvo = await bob.conversations.newConversation(alice.address, {
Expand Down Expand Up @@ -447,8 +451,8 @@ test('can stream messages', async () => {
throw Error('Unexpected convo message topic ' + convoMessages[i].topic)
}
}
alice.conversations.cancelStream()
alice.conversations.cancelStreamAllMessages()
cancelStream()
cancelStreamAllMessages()

return true
})
Expand Down Expand Up @@ -586,9 +590,11 @@ test('can stream all messages', async () => {

// Record message stream across all conversations
const allMessages: DecodedMessage[] = []
await alix.conversations.streamAllMessages(async (message) => {
allMessages.push(message)
})
const cancelStreamAllMessages = await alix.conversations.streamAllMessages(
async (message) => {
allMessages.push(message)
}
)

// Start Bob starts a new conversation.
const boConvo = await bo.conversations.newConversation(alix.address)
Expand Down Expand Up @@ -617,7 +623,7 @@ test('can stream all messages', async () => {
throw Error('Unexpected all messages count ' + allMessages.length)
}

alix.conversations.cancelStreamAllMessages()
cancelStreamAllMessages()

await alix.conversations.streamAllMessages(async (message) => {
allMessages.push(message)
Expand Down
35 changes: 14 additions & 21 deletions src/lib/Conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,13 @@ export default class Conversations<ContentTypes> {
* This method subscribes to conversations in real-time and listens for incoming conversation events.
* When a new conversation is detected, the provided callback function is invoked with the details of the conversation.
* @param {Function} callback - A callback function that will be invoked with the new Conversation when a conversation is started.
* @returns {Promise<void>} A Promise that resolves when the stream is set up.
* @warning This stream will continue infinitely. To end the stream, you can call {@linkcode Conversations.cancelStream | cancelStream()}.
* @returns {Promise<void>} A function that, when called, unsubscribes from the message stream and ends real-time updates..
*/
async stream(
callback: (conversation: Conversation<ContentTypes>) => Promise<void>
) {
): Promise<() => void> {
XMTPModule.subscribeToConversations(this.client.address)
XMTPModule.emitter.addListener(
const conversationSubscription = XMTPModule.emitter.addListener(
'conversation',
async ({
clientAddress,
Expand All @@ -90,20 +89,24 @@ export default class Conversations<ContentTypes> {
await callback(new Conversation(this.client, conversation))
}
)
return () => {
conversationSubscription.remove()
XMTPModule.unsubscribeFromConversations(this.client.address)
}
}

/**
* Listen for new messages in all conversations.
*
* This method subscribes to all conversations in real-time and listens for incoming and outgoing messages.
* @param {Function} callback - A callback function that will be invoked when a message is sent or received.
* @returns {Promise<void>} A Promise that resolves when the stream is set up.
* @returns {Promise<void>} A function that, when called, unsubscribes from all the messages stream and ends real-time updates.
*/
async streamAllMessages(
callback: (message: DecodedMessage) => Promise<void>
): Promise<void> {
): Promise<() => void> {
XMTPModule.subscribeToAllMessages(this.client.address)
XMTPModule.emitter.addListener(
const messagesSubscription = XMTPModule.emitter.addListener(
'message',
async ({
clientAddress,
Expand All @@ -123,19 +126,9 @@ export default class Conversations<ContentTypes> {
await callback(DecodedMessage.fromObject(message, this.client))
}
)
}

/**
* Cancels the stream for new conversations.
*/
cancelStream() {
XMTPModule.unsubscribeFromConversations(this.client.address)
}

/**
* Cancels the stream for new messages in all conversations.
*/
cancelStreamAllMessages() {
XMTPModule.unsubscribeFromAllMessages(this.client.address)
return () => {
messagesSubscription.remove()
XMTPModule.unsubscribeFromAllMessages(this.client.address)
}
}
}
Loading