Skip to content

Commit

Permalink
unsubscribe all message & conversation event
Browse files Browse the repository at this point in the history
  • Loading branch information
kele-leanes committed Dec 18, 2023
1 parent f22e116 commit 0483ee6
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 30 deletions.
30 changes: 18 additions & 12 deletions example/src/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -361,15 +361,19 @@ test('can stream messages', async () => {

// Record new conversation stream
const allConversations: Conversation<any>[] = []
await alice.conversations.stream(async (conversation) => {
allConversations.push(conversation)
})
const cancelStream = await alice.conversations.stream(
async (conversation) => {
allConversations.push(conversation)
}
)

// Record message stream across all conversations
const allMessages: DecodedMessage[] = []
await alice.conversations.streamAllMessages(async (message) => {
allMessages.push(message)
})
const cancelStreamAllMessages = await alice.conversations.streamAllMessages(
async (message) => {
allMessages.push(message)
}
)

// Start Bob starts a new conversation.
const bobConvo = await bob.conversations.newConversation(alice.address, {
Expand Down Expand Up @@ -447,8 +451,8 @@ test('can stream messages', async () => {
throw Error('Unexpected convo message topic ' + convoMessages[i].topic)
}
}
alice.conversations.cancelStream()
alice.conversations.cancelStreamAllMessages()
cancelStream()
cancelStreamAllMessages()

return true
})
Expand Down Expand Up @@ -586,9 +590,11 @@ test('can stream all messages', async () => {

// Record message stream across all conversations
const allMessages: DecodedMessage[] = []
await alix.conversations.streamAllMessages(async (message) => {
allMessages.push(message)
})
const cancelStreamAllMessages = await alix.conversations.streamAllMessages(
async (message) => {
allMessages.push(message)
}
)

// Start Bob starts a new conversation.
const boConvo = await bo.conversations.newConversation(alix.address)
Expand Down Expand Up @@ -617,7 +623,7 @@ test('can stream all messages', async () => {
throw Error('Unexpected all messages count ' + allMessages.length)
}

alix.conversations.cancelStreamAllMessages()
cancelStreamAllMessages()

await alix.conversations.streamAllMessages(async (message) => {
allMessages.push(message)
Expand Down
30 changes: 12 additions & 18 deletions src/lib/Conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ export default class Conversations<ContentTypes> {
*/
async stream(
callback: (conversation: Conversation<ContentTypes>) => Promise<void>
) {
): Promise<() => void> {
XMTPModule.subscribeToConversations(this.client.address)
XMTPModule.emitter.addListener(
const conversationSubscription = XMTPModule.emitter.addListener(
'conversation',
async ({
clientAddress,
Expand All @@ -90,6 +90,10 @@ export default class Conversations<ContentTypes> {
await callback(new Conversation(this.client, conversation))
}
)
return () => {
conversationSubscription.remove()
XMTPModule.unsubscribeFromConversations(this.client.address)
}
}

/**
Expand All @@ -101,9 +105,9 @@ export default class Conversations<ContentTypes> {
*/
async streamAllMessages(
callback: (message: DecodedMessage) => Promise<void>
): Promise<void> {
): Promise<() => void> {
XMTPModule.subscribeToAllMessages(this.client.address)
XMTPModule.emitter.addListener(
const messagesSubscription = XMTPModule.emitter.addListener(
'message',
async ({
clientAddress,
Expand All @@ -123,19 +127,9 @@ export default class Conversations<ContentTypes> {
await callback(DecodedMessage.fromObject(message, this.client))
}
)
}

/**
* Cancels the stream for new conversations.
*/
cancelStream() {
XMTPModule.unsubscribeFromConversations(this.client.address)
}

/**
* Cancels the stream for new messages in all conversations.
*/
cancelStreamAllMessages() {
XMTPModule.unsubscribeFromAllMessages(this.client.address)
return () => {
messagesSubscription.remove()
XMTPModule.unsubscribeFromAllMessages(this.client.address)
}
}
}

0 comments on commit 0483ee6

Please sign in to comment.