From 666a78f6e3811fd22854ebd83a4473e25aed857e Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Tue, 20 Feb 2024 23:44:01 -0800 Subject: [PATCH 1/6] feat: add android side of stream all group messages --- .../modules/xmtpreactnativesdk/XMTPModule.kt | 44 +++++++++++++++++-- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt b/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt index efd8582b6..d3b97ef05 100644 --- a/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt +++ b/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt @@ -731,9 +731,14 @@ class XMTPModule : Module() { subscribeToAll(clientAddress = clientAddress) } - Function("subscribeToAllMessages") { clientAddress: String -> + Function("subscribeToAllMessages") { clientAddress: String, includeGroups: Boolean -> logV("subscribeToAllMessages") - subscribeToAllMessages(clientAddress = clientAddress) + subscribeToAllMessages(clientAddress = clientAddress, includeGroups = includeGroups) + } + + Function("subscribeToAllGroupMessages") { clientAddress: String -> + logV("subscribeToAllGroupMessages") + subscribeToAllGroupMessages(clientAddress = clientAddress) } AsyncFunction("subscribeToMessages") { clientAddress: String, topic: String -> @@ -767,6 +772,11 @@ class XMTPModule : Module() { subscriptions[getMessagesKey(clientAddress)]?.cancel() } + Function("unsubscribeFromAllGroupMessages") { clientAddress: String -> + logV("unsubscribeFromAllGroupMessages") + subscriptions[getGroupMessagesKey(clientAddress)]?.cancel() + } + AsyncFunction("unsubscribeFromMessages") { clientAddress: String, topic: String -> logV("unsubscribeFromMessages") unsubscribeFromMessages( @@ -979,13 +989,13 @@ class XMTPModule : Module() { } } - private fun subscribeToAllMessages(clientAddress: String) { + private fun subscribeToAllMessages(clientAddress: String, includeGroups: Boolean = false) { val client = clients[clientAddress] ?: throw XMTPException("No client") subscriptions[getMessagesKey(clientAddress)]?.cancel() subscriptions[getMessagesKey(clientAddress)] = CoroutineScope(Dispatchers.IO).launch { try { - client.conversations.streamAllDecryptedMessages().collect { message -> + client.conversations.streamAllDecryptedMessages(includeGroups = includeGroups).collect { message -> sendEvent( "message", mapOf( @@ -1001,6 +1011,28 @@ class XMTPModule : Module() { } } + private fun subscribeToAllGroupMessages(clientAddress: String) { + val client = clients[clientAddress] ?: throw XMTPException("No client") + + subscriptions[getGroupMessagesKey(clientAddress)]?.cancel() + subscriptions[getGroupMessagesKey(clientAddress)] = CoroutineScope(Dispatchers.IO).launch { + try { + client.conversations.streamAllGroupDecryptedMessages().collect { message -> + sendEvent( + "message", + mapOf( + "clientAddress" to clientAddress, + "message" to DecodedMessageWrapper.encodeMap(message), + ) + ) + } + } catch (e: Exception) { + Log.e("XMTPModule", "Error in all group messages subscription: $e") + subscriptions[getGroupMessagesKey(clientAddress)]?.cancel() + } + } + } + private fun subscribeToMessages(clientAddress: String, topic: String) { val conversation = findConversation( @@ -1057,6 +1089,10 @@ class XMTPModule : Module() { return "messages:$clientAddress" } + private fun getGroupMessagesKey(clientAddress: String): String { + return "groupMessages:$clientAddress" + } + private fun getConversationsKey(clientAddress: String): String { return "conversations:$clientAddress" } From 56682300dea250f37d240033031f235c0a83268a Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Tue, 20 Feb 2024 23:49:15 -0800 Subject: [PATCH 2/6] add the RN side of the feature --- src/index.ts | 15 ++++++++++++-- src/lib/Conversations.ts | 44 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/src/index.ts b/src/index.ts index 0f31ff4ff..e9b4721fd 100644 --- a/src/index.ts +++ b/src/index.ts @@ -506,8 +506,15 @@ export function subscribeToGroups(clientAddress: string) { return XMTPModule.subscribeToGroups(clientAddress) } -export function subscribeToAllMessages(clientAddress: string) { - return XMTPModule.subscribeToAllMessages(clientAddress) +export function subscribeToAllMessages( + clientAddress: string, + includeGroups: boolean +) { + return XMTPModule.subscribeToAllMessages(clientAddress, includeGroups) +} + +export function subscribeToAllGroupMessages(clientAddress: string) { + return XMTPModule.subscribeToAllGroupMessages(clientAddress) } export async function subscribeToMessages( @@ -529,6 +536,10 @@ export function unsubscribeFromAllMessages(clientAddress: string) { return XMTPModule.unsubscribeFromAllMessages(clientAddress) } +export function unsubscribeFromAllGroupMessages(clientAddress: string) { + return XMTPModule.unsubscribeFromAllGroupMessages(clientAddress) +} + export async function unsubscribeFromMessages( clientAddress: string, topic: string diff --git a/src/lib/Conversations.ts b/src/lib/Conversations.ts index 2c04a3de1..77337cc44 100644 --- a/src/lib/Conversations.ts +++ b/src/lib/Conversations.ts @@ -242,13 +242,48 @@ export default class Conversations< * Listen for new messages in all conversations. * * This method subscribes to all conversations in real-time and listens for incoming and outgoing messages. + * @param {boolean} includeGroups - Whether or not to include group messages in the stream. * @param {Function} callback - A callback function that will be invoked when a message is sent or received. * @returns {Promise} A Promise that resolves when the stream is set up. */ async streamAllMessages( + includeGroups: boolean = false, callback: (message: DecodedMessage) => Promise ): Promise { - XMTPModule.subscribeToAllMessages(this.client.address) + XMTPModule.subscribeToAllMessages(this.client.address, includeGroups) + XMTPModule.emitter.addListener( + 'message', + async ({ + clientAddress, + message, + }: { + clientAddress: string + message: DecodedMessage + }) => { + if (clientAddress !== this.client.address) { + return + } + if (this.known[message.id]) { + return + } + + this.known[message.id] = true + await callback(DecodedMessage.fromObject(message, this.client)) + } + ) + } + + /** + * Listen for new messages in all groups. + * + * This method subscribes to all groups in real-time and listens for incoming and outgoing messages. + * @param {Function} callback - A callback function that will be invoked when a message is sent or received. + * @returns {Promise} A Promise that resolves when the stream is set up. + */ + async streamAllGroupMessages( + callback: (message: DecodedMessage) => Promise + ): Promise { + XMTPModule.subscribeToAllGroupMessages(this.client.address) XMTPModule.emitter.addListener( 'message', async ({ @@ -291,4 +326,11 @@ export default class Conversations< cancelStreamAllMessages() { XMTPModule.unsubscribeFromAllMessages(this.client.address) } + + /** + * Cancels the stream for new messages in all groups. + */ + cancelStreamAllGroupMessages() { + XMTPModule.unsubscribeFromAllGroupMessages(this.client.address) + } } From 5e9b4d6f2cb2a3797afc60a45ef4e8424ddb4bdc Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Tue, 20 Feb 2024 23:50:43 -0800 Subject: [PATCH 3/6] bump the pod --- example/ios/Podfile.lock | 8 ++++---- ios/XMTPReactNative.podspec | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/example/ios/Podfile.lock b/example/ios/Podfile.lock index 867c88bcf..e35aac798 100644 --- a/example/ios/Podfile.lock +++ b/example/ios/Podfile.lock @@ -442,7 +442,7 @@ PODS: - GenericJSON (~> 2.0) - Logging (~> 1.0.0) - secp256k1.swift (~> 0.1) - - XMTP (0.8.9): + - XMTP (0.8.10): - Connect-Swift (= 0.3.0) - GzipSwift - LibXMTP (= 0.4.2-beta3) @@ -451,7 +451,7 @@ PODS: - ExpoModulesCore - MessagePacker - secp256k1.swift - - XMTP (= 0.8.9) + - XMTP (= 0.8.10) - Yoga (1.14.0) DEPENDENCIES: @@ -744,8 +744,8 @@ SPEC CHECKSUMS: secp256k1.swift: a7e7a214f6db6ce5db32cc6b2b45e5c4dd633634 SwiftProtobuf: b02b5075dcf60c9f5f403000b3b0c202a11b6ae1 web3.swift: 2263d1e12e121b2c42ffb63a5a7beb1acaf33959 - XMTP: 4bc651602abefdb119a7ac2e8a12e9bf67aa3447 - XMTPReactNative: 4c0ae41719aa36ca90c3f65ff998977c3011bca8 + XMTP: 2c9f62b75b68e28ce91b67a7fb56ca71b7e22e87 + XMTPReactNative: 734154aef1c07f656641a587e9cc31e8cf8b587b Yoga: e71803b4c1fff832ccf9b92541e00f9b873119b9 PODFILE CHECKSUM: 95d6ace79946933ecf80684613842ee553dd76a2 diff --git a/ios/XMTPReactNative.podspec b/ios/XMTPReactNative.podspec index bcd8f30d1..97b1f096a 100644 --- a/ios/XMTPReactNative.podspec +++ b/ios/XMTPReactNative.podspec @@ -26,5 +26,5 @@ Pod::Spec.new do |s| s.source_files = "**/*.{h,m,swift}" s.dependency 'secp256k1.swift' s.dependency "MessagePacker" - s.dependency "XMTP", "= 0.8.9" + s.dependency "XMTP", "= 0.8.10" end From 6264f288626c38fb0a358651301660158c6ba9aa Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Tue, 20 Feb 2024 23:54:13 -0800 Subject: [PATCH 4/6] add the iOS side of the feature --- ios/XMTPModule.swift | 46 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/ios/XMTPModule.swift b/ios/XMTPModule.swift index 3f1dd4172..b89a19ac1 100644 --- a/ios/XMTPModule.swift +++ b/ios/XMTPModule.swift @@ -684,8 +684,12 @@ public class XMTPModule: Module { try await subscribeToConversations(clientAddress: clientAddress) } - AsyncFunction("subscribeToAllMessages") { (clientAddress: String) in - try await subscribeToAllMessages(clientAddress: clientAddress) + AsyncFunction("subscribeToAllMessages") { (clientAddress: String, includeGroups: Bool) in + try await subscribeToAllMessages(clientAddress: clientAddress, includeGroups: includeGroups) + } + + AsyncFunction("subscribeToAllGroupMessages") { (clientAddress: String) in + try await subscribeToAllGroupMessages(clientAddress: clientAddress) } AsyncFunction("subscribeToMessages") { (clientAddress: String, topic: String) in @@ -711,6 +715,11 @@ public class XMTPModule: Module { AsyncFunction("unsubscribeFromAllMessages") { (clientAddress: String) in await subscriptionsManager.get(getMessagesKey(clientAddress: clientAddress))?.cancel() } + + AsyncFunction("unsubscribeFromAllGroupMessages") { (clientAddress: String) in + await subscriptionsManager.get(getGroupMessagesKey(clientAddress: clientAddress))?.cancel() + } + AsyncFunction("unsubscribeFromMessages") { (clientAddress: String, topic: String) in try await unsubscribeFromMessages(clientAddress: clientAddress, topic: topic) @@ -916,7 +925,7 @@ public class XMTPModule: Module { }) } - func subscribeToAllMessages(clientAddress: String) async throws { + func subscribeToAllMessages(clientAddress: String, includeGroups: Bool = false) async throws { guard let client = await clientsManager.getClient(key: clientAddress) else { return } @@ -924,7 +933,32 @@ public class XMTPModule: Module { await subscriptionsManager.get(getMessagesKey(clientAddress: clientAddress))?.cancel() await subscriptionsManager.set(getMessagesKey(clientAddress: clientAddress), Task { do { - for try await message in try await client.conversations.streamAllDecryptedMessages() { + for try await message in await client.conversations.streamAllDecryptedMessages(includeGroups: includeGroups) { + do { + try sendEvent("message", [ + "clientAddress": clientAddress, + "message": DecodedMessageWrapper.encodeToObj(message, client: client), + ]) + } catch { + print("discarding message, unable to encode wrapper \(message.id)") + } + } + } catch { + print("Error in all messages subscription: \(error)") + await subscriptionsManager.get(getMessagesKey(clientAddress: clientAddress))?.cancel() + } + }) + } + + func subscribeToAllGroupMessages(clientAddress: String) async throws { + guard let client = await clientsManager.getClient(key: clientAddress) else { + return + } + + await subscriptionsManager.get(getGroupMessagesKey(clientAddress: clientAddress))?.cancel() + await subscriptionsManager.set(getGroupMessagesKey(clientAddress: clientAddress), Task { + do { + for try await message in await client.conversations.streamAllGroupDecryptedMessages() { do { try sendEvent("message", [ "clientAddress": clientAddress, @@ -1060,6 +1094,10 @@ public class XMTPModule: Module { func getMessagesKey(clientAddress: String) -> String { return "messages:\(clientAddress)" } + + func getGroupMessagesKey(clientAddress: String) -> String { + return "groupMessages:\(clientAddress)" + } func getConversationsKey(clientAddress: String) -> String { return "conversations:\(clientAddress)" From 6d9bd153d3417f46ce944f7c8d7bf52e54317665 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Wed, 21 Feb 2024 00:06:53 -0800 Subject: [PATCH 5/6] add tests for it --- example/src/tests/groupTests.ts | 70 ++++++++++++++++++++++++++++++--- src/lib/Conversations.ts | 4 +- 2 files changed, 67 insertions(+), 7 deletions(-) diff --git a/example/src/tests/groupTests.ts b/example/src/tests/groupTests.ts index ffc059535..e082648b5 100644 --- a/example/src/tests/groupTests.ts +++ b/example/src/tests/groupTests.ts @@ -673,9 +673,9 @@ test('can stream group messages', async () => { }) test('can stream all messages', async () => { - const bo = await Client.createRandom({ env: 'local' }) + const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true }) await delayToPropogate() - const alix = await Client.createRandom({ env: 'local' }) + const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true }) await delayToPropogate() // Record message stream across all conversations @@ -699,11 +699,13 @@ test('can stream all messages', async () => { } // Starts a new conversation. - const caro = await Client.createRandom({ env: 'local' }) + const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true }) const caroConvo = await caro.conversations.newConversation(alix.address) + const caroGroup = await caro.conversations.newGroup([alix.address]) await delayToPropogate() for (let i = 0; i < 5; i++) { await caroConvo.send({ text: `Message ${i}` }) + await caroGroup.send({ text: `Message ${i}` }) await delayToPropogate() } @@ -715,13 +717,14 @@ test('can stream all messages', async () => { await alix.conversations.streamAllMessages(async (message) => { allMessages.push(message) - }) + }, true) for (let i = 0; i < 5; i++) { await boConvo.send({ text: `Message ${i}` }) + await caroGroup.send({ text: `Message ${i}` }) await delayToPropogate() } - if (allMessages.length <= 10) { + if (allMessages.length <= 15) { throw Error('Unexpected all messages count ' + allMessages.length) } @@ -800,3 +803,60 @@ test('can paginate group messages', async () => { return true }) + +test('can stream all group messages', async () => { + const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + await delayToPropogate() + + // Start Bob starts a new group. + const boGroup = await bo.conversations.newGroup([alix.address]) + await delayToPropogate() + + // Starts a new conversation. + const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true }) + const caroGroup = await caro.conversations.newGroup([alix.address]) + + // Record message stream across all conversations + const allMessages: DecodedMessage[] = [] + await alix.conversations.streamAllGroupMessages(async (message) => { + allMessages.push(message) + }) + + for (let i = 0; i < 5; i++) { + await boGroup.send({ text: `Message ${i}` }) + await delayToPropogate() + } + + const count = allMessages.length + if (count !== 5) { + throw Error('Unexpected all messages count first' + allMessages.length) + } + + await delayToPropogate() + for (let i = 0; i < 5; i++) { + await caroGroup.send({ text: `Message ${i}` }) + await delayToPropogate() + } + + if (allMessages.length !== 10) { + throw Error('Unexpected all messages count second' + allMessages.length) + } + + alix.conversations.cancelStreamAllGroupMessages() + + await alix.conversations.streamAllGroupMessages(async (message) => { + allMessages.push(message) + }) + + for (let i = 0; i < 5; i++) { + await boGroup.send({ text: `Message ${i}` }) + await delayToPropogate() + } + if (allMessages.length <= 10) { + throw Error('Unexpected all messages count ' + allMessages.length) + } + + return true +}) diff --git a/src/lib/Conversations.ts b/src/lib/Conversations.ts index 77337cc44..6e371a454 100644 --- a/src/lib/Conversations.ts +++ b/src/lib/Conversations.ts @@ -247,8 +247,8 @@ export default class Conversations< * @returns {Promise} A Promise that resolves when the stream is set up. */ async streamAllMessages( - includeGroups: boolean = false, - callback: (message: DecodedMessage) => Promise + callback: (message: DecodedMessage) => Promise, + includeGroups: boolean = false ): Promise { XMTPModule.subscribeToAllMessages(this.client.address, includeGroups) XMTPModule.emitter.addListener( From 6c211788276822f63c3820e76640541a9141e667 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Wed, 21 Feb 2024 07:54:01 -0800 Subject: [PATCH 6/6] Update example/src/tests/groupTests.ts Co-authored-by: Cameron Voell --- example/src/tests/groupTests.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/src/tests/groupTests.ts b/example/src/tests/groupTests.ts index e082648b5..6314943a0 100644 --- a/example/src/tests/groupTests.ts +++ b/example/src/tests/groupTests.ts @@ -845,7 +845,7 @@ test('can stream all group messages', async () => { } alix.conversations.cancelStreamAllGroupMessages() - + await delayToPropogate() await alix.conversations.streamAllGroupMessages(async (message) => { allMessages.push(message) })