Skip to content

Commit

Permalink
Merge pull request #290 from xmtp/cv/fix-stream-unsubscribe
Browse files Browse the repository at this point in the history
Fix stream unsubscribe, remove event emitters
  • Loading branch information
cameronvoell authored Mar 1, 2024
2 parents 114122d + 30da179 commit 622c733
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 7 deletions.
1 change: 1 addition & 0 deletions example/src/Navigation.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { createNativeStackNavigator } from '@react-navigation/native-stack'

import { TestCategory } from './TestScreen'

export type NavigationParamList = {
Expand Down
13 changes: 12 additions & 1 deletion example/src/TestScreen.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { View, Text, Button, ScrollView } from 'react-native'

import { createdAtTests } from './tests/createdAtTests'
import { groupTests } from './tests/groupTests'
import { restartStreamTests } from './tests/restartStreamsTests'
import { Test } from './tests/test-utils'
import { tests } from './tests/tests'

Expand Down Expand Up @@ -104,6 +105,7 @@ export enum TestCategory {
tests = 'tests',
group = 'group',
createdAt = 'createdAt',
restartStreans = 'restartStreams',
}

export default function TestScreen(): JSX.Element {
Expand All @@ -112,7 +114,12 @@ export default function TestScreen(): JSX.Element {
const params = route.params as {
testSelection: TestCategory
}
const allTests = tests.concat(groupTests).concat(createdAtTests)
const allTests = [
...tests,
...groupTests,
...createdAtTests,
...restartStreamTests,
]
let activeTests, title
switch (params.testSelection) {
case TestCategory.all:
Expand All @@ -131,6 +138,10 @@ export default function TestScreen(): JSX.Element {
activeTests = createdAtTests
title = 'Created At Unit Tests'
break
case TestCategory.restartStreans:
activeTests = restartStreamTests
title = 'Restart Streams Unit Tests'
break
}

return (
Expand Down
6 changes: 3 additions & 3 deletions example/src/tests/groupTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,13 @@ test('can make a MLS V3 client from bundle', async () => {

test('production MLS V3 client creation throws error', async () => {
try {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const client = await Client.createRandom({
await Client.createRandom({
env: 'production',
appVersion: 'Testing/0.0.0',
enableAlphaMls: true,
})
} catch (error: any) {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
} catch (error) {
return true
}
throw new Error(
Expand Down
212 changes: 212 additions & 0 deletions example/src/tests/restartStreamsTests.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import { Test, assert, createClients, delayToPropogate } from './test-utils'

export const restartStreamTests: Test[] = []

function test(name: string, perform: () => Promise<boolean>) {
restartStreamTests.push({ name, run: perform })
}

test('Can cancel a stream and restart', async () => {
// Create clients
const [alix, bo, caro, davon] = await createClients(4)

// Start stream
let numEvents1 = 0
await alix.conversations.stream(async (_) => {
numEvents1++
})
await delayToPropogate()
await alix.conversations.newConversation(bo.address)
await delayToPropogate()
assert(numEvents1 === 1, 'expected 1 event, first stream')

// Cancel stream
alix.conversations.cancelStream()
await alix.conversations.newConversation(caro.address)
await delayToPropogate()
assert(numEvents1 === 1, 'expected 1 event, first stream after cancel')

// Start new stream
let numEvents2 = 0
await alix.conversations.stream(async (_) => {
numEvents2++
})
await delayToPropogate()

await alix.conversations.newConversation(davon.address)
await delayToPropogate()

// Verify correct number of events from each stream
assert(
numEvents1 === 1,
'expected 1 event, first stream after cancel, but found ' + numEvents1
)
assert(
numEvents2 === 1,
'expected 1 event, second stream, but found ' + numEvents2
)

return true
})

// Existing issue, client who started stream, creating groups will not
// be streamed
test('Can cancel a streamGroups and restart', async () => {
// Create clients
const [alix, bo, caro, davon] = await createClients(4)

// Start stream
let numEvents1 = 0
await alix.conversations.streamGroups(async (_) => {
numEvents1++
})
await delayToPropogate()
await bo.conversations.newGroup([alix.address])
await delayToPropogate()
assert(numEvents1 === 1, 'expected 1 event, first stream')

// Cancel stream
alix.conversations.cancelStreamGroups()
await caro.conversations.newGroup([alix.address])
await delayToPropogate()
assert(numEvents1 === 1, 'expected 1 event, first stream after cancel')

// Start new stream
let numEvents2 = 0
await alix.conversations.streamGroups(async (_) => {
numEvents2++
})
await delayToPropogate()

await davon.conversations.newGroup([alix.address])
await delayToPropogate()

// Verify correct number of events from each stream
assert(
numEvents1 === 1,
'expected 1 event, first stream after cancel, but found ' + numEvents1
)
assert(
numEvents2 === 1,
'expected 1 event, second stream, but found ' + numEvents2
)

return true
})

test('Can cancel a streamAllMessages and restart', async () => {
// Create clients
const [alix, bo] = await createClients(2)

// Create a group
await delayToPropogate()
await bo.conversations.newGroup([alix.address])
await bo.conversations.newConversation(alix.address)
await delayToPropogate()

// Start stream
let numEvents1 = 0
await alix.conversations.streamAllMessages(async (_) => {
numEvents1++
}, true)
await delayToPropogate()

// Send one Group message and one Conversation Message
const boGroup = (await bo.conversations.listGroups())[0]
const boConversation = (await bo.conversations.list())[0]

await boGroup.send('test')
await boConversation.send('test')
await delayToPropogate()

assert(
numEvents1 === 2,
'expected 2 events, first stream, but found ' + numEvents1
)

// Cancel stream
alix.conversations.cancelStreamAllMessages()
await boGroup.send('test')
await boConversation.send('test')
await delayToPropogate()
assert(numEvents1 === 2, 'expected 2 events, first stream after cancel')

// Start new stream
let numEvents2 = 0
await alix.conversations.streamAllMessages(async (_) => {
numEvents2++
}, true)
await delayToPropogate()

await boGroup.send('test')
await boConversation.send('test')
await delayToPropogate()

// Verify correct number of events from each stream
assert(
numEvents1 === 2,
'expected 2 events, first stream after cancel, but found ' + numEvents1
)
assert(
numEvents2 === 2,
'expected 2 events, second stream, but found ' + numEvents2
)

return true
})

test('Can cancel a streamAllGroupMessages and restart', async () => {
// Create clients
const [alix, bo] = await createClients(2)

// Create a group
await delayToPropogate()
await bo.conversations.newGroup([alix.address])
await delayToPropogate()

// Start stream
let numEvents1 = 0
await alix.conversations.streamAllGroupMessages(async (_) => {
numEvents1++
})
await delayToPropogate()

// Send one Group message and one Conversation Message
const boGroup = (await bo.conversations.listGroups())[0]

await boGroup.send('test')
await delayToPropogate()

assert(
numEvents1 === 1,
'expected 1 events, first stream, but found ' + numEvents1
)

// Cancel stream
alix.conversations.cancelStreamAllGroupMessages()
await boGroup.send('test')
await delayToPropogate()
assert(numEvents1 === 1, 'expected 1 event, first stream after cancel')

// Start new stream
let numEvents2 = 0
await alix.conversations.streamAllGroupMessages(async (_) => {
numEvents2++
})
await delayToPropogate()

await boGroup.send('test')
await delayToPropogate()

// Verify correct number of events from each stream
assert(
numEvents1 === 1,
'expected 1 event, first stream after cancel, but found ' + numEvents1
)
assert(
numEvents2 === 1,
'expected 1 event, second stream, but found ' + numEvents2
)

return true
})
27 changes: 24 additions & 3 deletions src/lib/Conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export default class Conversations<
> {
client: Client<ContentTypes>
private known = {} as { [topic: string]: boolean }
private subscriptions: { [key: string]: { remove: () => void } } = {}

constructor(client: Client<ContentTypes>) {
this.client = client
Expand Down Expand Up @@ -126,6 +127,7 @@ export default class Conversations<
await callback(new Group(this.client, group))
}
)
this.subscriptions[EventTypes.Group] = groupsSubscription
return () => {
groupsSubscription.remove()
XMTPModule.unsubscribeFromGroups(this.client.address)
Expand Down Expand Up @@ -168,7 +170,7 @@ export default class Conversations<
callback: (conversation: Conversation<ContentTypes>) => Promise<void>
) {
XMTPModule.subscribeToConversations(this.client.address)
XMTPModule.emitter.addListener(
const subscription = XMTPModule.emitter.addListener(
EventTypes.Conversation,
async ({
clientAddress,
Expand All @@ -188,6 +190,7 @@ export default class Conversations<
await callback(new Conversation(this.client, conversation))
}
)
this.subscriptions[EventTypes.Conversation] = subscription
}

/**
Expand Down Expand Up @@ -252,7 +255,7 @@ export default class Conversations<
includeGroups: boolean = false
): Promise<void> {
XMTPModule.subscribeToAllMessages(this.client.address, includeGroups)
XMTPModule.emitter.addListener(
const subscription = XMTPModule.emitter.addListener(
EventTypes.Message,
async ({
clientAddress,
Expand All @@ -272,6 +275,7 @@ export default class Conversations<
await callback(DecodedMessage.fromObject(message, this.client))
}
)
this.subscriptions[EventTypes.Message] = subscription
}

/**
Expand All @@ -285,7 +289,7 @@ export default class Conversations<
callback: (message: DecodedMessage<ContentTypes>) => Promise<void>
): Promise<void> {
XMTPModule.subscribeToAllGroupMessages(this.client.address)
XMTPModule.emitter.addListener(
const subscription = XMTPModule.emitter.addListener(
EventTypes.AllGroupMessage,
async ({
clientAddress,
Expand All @@ -305,33 +309,50 @@ export default class Conversations<
await callback(DecodedMessage.fromObject(message, this.client))
}
)
this.subscriptions[EventTypes.AllGroupMessage] = subscription
}

/**
* Cancels the stream for new conversations.
*/
cancelStream() {
if (this.subscriptions[EventTypes.Conversation]) {
this.subscriptions[EventTypes.Conversation].remove()
delete this.subscriptions[EventTypes.Conversation]
}
XMTPModule.unsubscribeFromConversations(this.client.address)
}

/**
* Cancels the stream for new conversations.
*/
cancelStreamGroups() {
if (this.subscriptions[EventTypes.Group]) {
this.subscriptions[EventTypes.Group].remove()
delete this.subscriptions[EventTypes.Group]
}
XMTPModule.unsubscribeFromGroups(this.client.address)
}

/**
* Cancels the stream for new messages in all conversations.
*/
cancelStreamAllMessages() {
if (this.subscriptions[EventTypes.Message]) {
this.subscriptions[EventTypes.Message].remove()
delete this.subscriptions[EventTypes.Message]
}
XMTPModule.unsubscribeFromAllMessages(this.client.address)
}

/**
* Cancels the stream for new messages in all groups.
*/
cancelStreamAllGroupMessages() {
if (this.subscriptions[EventTypes.AllGroupMessage]) {
this.subscriptions[EventTypes.AllGroupMessage].remove()
delete this.subscriptions[EventTypes.AllGroupMessage]
}
XMTPModule.unsubscribeFromAllGroupMessages(this.client.address)
}
}

0 comments on commit 622c733

Please sign in to comment.