Skip to content

Commit

Permalink
feat: add android side of stream all group messages
Browse files Browse the repository at this point in the history
  • Loading branch information
nplasterer committed Feb 21, 2024
1 parent 295487d commit 666a78f
Showing 1 changed file with 40 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -731,9 +731,14 @@ class XMTPModule : Module() {
subscribeToAll(clientAddress = clientAddress)
}

Function("subscribeToAllMessages") { clientAddress: String ->
Function("subscribeToAllMessages") { clientAddress: String, includeGroups: Boolean ->
logV("subscribeToAllMessages")
subscribeToAllMessages(clientAddress = clientAddress)
subscribeToAllMessages(clientAddress = clientAddress, includeGroups = includeGroups)
}

Function("subscribeToAllGroupMessages") { clientAddress: String ->
logV("subscribeToAllGroupMessages")
subscribeToAllGroupMessages(clientAddress = clientAddress)
}

AsyncFunction("subscribeToMessages") { clientAddress: String, topic: String ->
Expand Down Expand Up @@ -767,6 +772,11 @@ class XMTPModule : Module() {
subscriptions[getMessagesKey(clientAddress)]?.cancel()
}

Function("unsubscribeFromAllGroupMessages") { clientAddress: String ->
logV("unsubscribeFromAllGroupMessages")
subscriptions[getGroupMessagesKey(clientAddress)]?.cancel()
}

AsyncFunction("unsubscribeFromMessages") { clientAddress: String, topic: String ->
logV("unsubscribeFromMessages")
unsubscribeFromMessages(
Expand Down Expand Up @@ -979,13 +989,13 @@ class XMTPModule : Module() {
}
}

private fun subscribeToAllMessages(clientAddress: String) {
private fun subscribeToAllMessages(clientAddress: String, includeGroups: Boolean = false) {
val client = clients[clientAddress] ?: throw XMTPException("No client")

subscriptions[getMessagesKey(clientAddress)]?.cancel()
subscriptions[getMessagesKey(clientAddress)] = CoroutineScope(Dispatchers.IO).launch {
try {
client.conversations.streamAllDecryptedMessages().collect { message ->
client.conversations.streamAllDecryptedMessages(includeGroups = includeGroups).collect { message ->
sendEvent(
"message",
mapOf(
Expand All @@ -1001,6 +1011,28 @@ class XMTPModule : Module() {
}
}

private fun subscribeToAllGroupMessages(clientAddress: String) {
val client = clients[clientAddress] ?: throw XMTPException("No client")

subscriptions[getGroupMessagesKey(clientAddress)]?.cancel()
subscriptions[getGroupMessagesKey(clientAddress)] = CoroutineScope(Dispatchers.IO).launch {
try {
client.conversations.streamAllGroupDecryptedMessages().collect { message ->
sendEvent(
"message",
mapOf(
"clientAddress" to clientAddress,
"message" to DecodedMessageWrapper.encodeMap(message),
)
)
}
} catch (e: Exception) {
Log.e("XMTPModule", "Error in all group messages subscription: $e")
subscriptions[getGroupMessagesKey(clientAddress)]?.cancel()
}
}
}

private fun subscribeToMessages(clientAddress: String, topic: String) {
val conversation =
findConversation(
Expand Down Expand Up @@ -1057,6 +1089,10 @@ class XMTPModule : Module() {
return "messages:$clientAddress"
}

private fun getGroupMessagesKey(clientAddress: String): String {
return "groupMessages:$clientAddress"
}

private fun getConversationsKey(clientAddress: String): String {
return "conversations:$clientAddress"
}
Expand Down

0 comments on commit 666a78f

Please sign in to comment.