diff --git a/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt b/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt index efd8582b6..d3b97ef05 100644 --- a/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt +++ b/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt @@ -731,9 +731,14 @@ class XMTPModule : Module() { subscribeToAll(clientAddress = clientAddress) } - Function("subscribeToAllMessages") { clientAddress: String -> + Function("subscribeToAllMessages") { clientAddress: String, includeGroups: Boolean -> logV("subscribeToAllMessages") - subscribeToAllMessages(clientAddress = clientAddress) + subscribeToAllMessages(clientAddress = clientAddress, includeGroups = includeGroups) + } + + Function("subscribeToAllGroupMessages") { clientAddress: String -> + logV("subscribeToAllGroupMessages") + subscribeToAllGroupMessages(clientAddress = clientAddress) } AsyncFunction("subscribeToMessages") { clientAddress: String, topic: String -> @@ -767,6 +772,11 @@ class XMTPModule : Module() { subscriptions[getMessagesKey(clientAddress)]?.cancel() } + Function("unsubscribeFromAllGroupMessages") { clientAddress: String -> + logV("unsubscribeFromAllGroupMessages") + subscriptions[getGroupMessagesKey(clientAddress)]?.cancel() + } + AsyncFunction("unsubscribeFromMessages") { clientAddress: String, topic: String -> logV("unsubscribeFromMessages") unsubscribeFromMessages( @@ -979,13 +989,13 @@ class XMTPModule : Module() { } } - private fun subscribeToAllMessages(clientAddress: String) { + private fun subscribeToAllMessages(clientAddress: String, includeGroups: Boolean = false) { val client = clients[clientAddress] ?: throw XMTPException("No client") subscriptions[getMessagesKey(clientAddress)]?.cancel() subscriptions[getMessagesKey(clientAddress)] = CoroutineScope(Dispatchers.IO).launch { try { - client.conversations.streamAllDecryptedMessages().collect { message -> + client.conversations.streamAllDecryptedMessages(includeGroups = includeGroups).collect { message -> sendEvent( "message", mapOf( @@ -1001,6 +1011,28 @@ class XMTPModule : Module() { } } + private fun subscribeToAllGroupMessages(clientAddress: String) { + val client = clients[clientAddress] ?: throw XMTPException("No client") + + subscriptions[getGroupMessagesKey(clientAddress)]?.cancel() + subscriptions[getGroupMessagesKey(clientAddress)] = CoroutineScope(Dispatchers.IO).launch { + try { + client.conversations.streamAllGroupDecryptedMessages().collect { message -> + sendEvent( + "message", + mapOf( + "clientAddress" to clientAddress, + "message" to DecodedMessageWrapper.encodeMap(message), + ) + ) + } + } catch (e: Exception) { + Log.e("XMTPModule", "Error in all group messages subscription: $e") + subscriptions[getGroupMessagesKey(clientAddress)]?.cancel() + } + } + } + private fun subscribeToMessages(clientAddress: String, topic: String) { val conversation = findConversation( @@ -1057,6 +1089,10 @@ class XMTPModule : Module() { return "messages:$clientAddress" } + private fun getGroupMessagesKey(clientAddress: String): String { + return "groupMessages:$clientAddress" + } + private fun getConversationsKey(clientAddress: String): String { return "conversations:$clientAddress" }