Skip to content

Commit

Permalink
Merge pull request #280 from xmtp/np/stream-all-group-messages
Browse files Browse the repository at this point in the history
Stream All Group Messages
  • Loading branch information
nplasterer authored Feb 21, 2024
2 parents 295487d + 6c21178 commit f6e091c
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -731,9 +731,14 @@ class XMTPModule : Module() {
subscribeToAll(clientAddress = clientAddress)
}

Function("subscribeToAllMessages") { clientAddress: String ->
Function("subscribeToAllMessages") { clientAddress: String, includeGroups: Boolean ->
logV("subscribeToAllMessages")
subscribeToAllMessages(clientAddress = clientAddress)
subscribeToAllMessages(clientAddress = clientAddress, includeGroups = includeGroups)
}

Function("subscribeToAllGroupMessages") { clientAddress: String ->
logV("subscribeToAllGroupMessages")
subscribeToAllGroupMessages(clientAddress = clientAddress)
}

AsyncFunction("subscribeToMessages") { clientAddress: String, topic: String ->
Expand Down Expand Up @@ -767,6 +772,11 @@ class XMTPModule : Module() {
subscriptions[getMessagesKey(clientAddress)]?.cancel()
}

Function("unsubscribeFromAllGroupMessages") { clientAddress: String ->
logV("unsubscribeFromAllGroupMessages")
subscriptions[getGroupMessagesKey(clientAddress)]?.cancel()
}

AsyncFunction("unsubscribeFromMessages") { clientAddress: String, topic: String ->
logV("unsubscribeFromMessages")
unsubscribeFromMessages(
Expand Down Expand Up @@ -979,13 +989,13 @@ class XMTPModule : Module() {
}
}

private fun subscribeToAllMessages(clientAddress: String) {
private fun subscribeToAllMessages(clientAddress: String, includeGroups: Boolean = false) {
val client = clients[clientAddress] ?: throw XMTPException("No client")

subscriptions[getMessagesKey(clientAddress)]?.cancel()
subscriptions[getMessagesKey(clientAddress)] = CoroutineScope(Dispatchers.IO).launch {
try {
client.conversations.streamAllDecryptedMessages().collect { message ->
client.conversations.streamAllDecryptedMessages(includeGroups = includeGroups).collect { message ->
sendEvent(
"message",
mapOf(
Expand All @@ -1001,6 +1011,28 @@ class XMTPModule : Module() {
}
}

private fun subscribeToAllGroupMessages(clientAddress: String) {
val client = clients[clientAddress] ?: throw XMTPException("No client")

subscriptions[getGroupMessagesKey(clientAddress)]?.cancel()
subscriptions[getGroupMessagesKey(clientAddress)] = CoroutineScope(Dispatchers.IO).launch {
try {
client.conversations.streamAllGroupDecryptedMessages().collect { message ->
sendEvent(
"message",
mapOf(
"clientAddress" to clientAddress,
"message" to DecodedMessageWrapper.encodeMap(message),
)
)
}
} catch (e: Exception) {
Log.e("XMTPModule", "Error in all group messages subscription: $e")
subscriptions[getGroupMessagesKey(clientAddress)]?.cancel()
}
}
}

private fun subscribeToMessages(clientAddress: String, topic: String) {
val conversation =
findConversation(
Expand Down Expand Up @@ -1057,6 +1089,10 @@ class XMTPModule : Module() {
return "messages:$clientAddress"
}

private fun getGroupMessagesKey(clientAddress: String): String {
return "groupMessages:$clientAddress"
}

private fun getConversationsKey(clientAddress: String): String {
return "conversations:$clientAddress"
}
Expand Down
8 changes: 4 additions & 4 deletions example/ios/Podfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ PODS:
- GenericJSON (~> 2.0)
- Logging (~> 1.0.0)
- secp256k1.swift (~> 0.1)
- XMTP (0.8.9):
- XMTP (0.8.10):
- Connect-Swift (= 0.3.0)
- GzipSwift
- LibXMTP (= 0.4.2-beta3)
Expand All @@ -451,7 +451,7 @@ PODS:
- ExpoModulesCore
- MessagePacker
- secp256k1.swift
- XMTP (= 0.8.9)
- XMTP (= 0.8.10)
- Yoga (1.14.0)

DEPENDENCIES:
Expand Down Expand Up @@ -744,8 +744,8 @@ SPEC CHECKSUMS:
secp256k1.swift: a7e7a214f6db6ce5db32cc6b2b45e5c4dd633634
SwiftProtobuf: b02b5075dcf60c9f5f403000b3b0c202a11b6ae1
web3.swift: 2263d1e12e121b2c42ffb63a5a7beb1acaf33959
XMTP: 4bc651602abefdb119a7ac2e8a12e9bf67aa3447
XMTPReactNative: 4c0ae41719aa36ca90c3f65ff998977c3011bca8
XMTP: 2c9f62b75b68e28ce91b67a7fb56ca71b7e22e87
XMTPReactNative: 734154aef1c07f656641a587e9cc31e8cf8b587b
Yoga: e71803b4c1fff832ccf9b92541e00f9b873119b9

PODFILE CHECKSUM: 95d6ace79946933ecf80684613842ee553dd76a2
Expand Down
70 changes: 65 additions & 5 deletions example/src/tests/groupTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -673,9 +673,9 @@ test('can stream group messages', async () => {
})

test('can stream all messages', async () => {
const bo = await Client.createRandom({ env: 'local' })
const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()
const alix = await Client.createRandom({ env: 'local' })
const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()

// Record message stream across all conversations
Expand All @@ -699,11 +699,13 @@ test('can stream all messages', async () => {
}

// Starts a new conversation.
const caro = await Client.createRandom({ env: 'local' })
const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true })
const caroConvo = await caro.conversations.newConversation(alix.address)
const caroGroup = await caro.conversations.newGroup([alix.address])
await delayToPropogate()
for (let i = 0; i < 5; i++) {
await caroConvo.send({ text: `Message ${i}` })
await caroGroup.send({ text: `Message ${i}` })
await delayToPropogate()
}

Expand All @@ -715,13 +717,14 @@ test('can stream all messages', async () => {

await alix.conversations.streamAllMessages(async (message) => {
allMessages.push(message)
})
}, true)

for (let i = 0; i < 5; i++) {
await boConvo.send({ text: `Message ${i}` })
await caroGroup.send({ text: `Message ${i}` })
await delayToPropogate()
}
if (allMessages.length <= 10) {
if (allMessages.length <= 15) {
throw Error('Unexpected all messages count ' + allMessages.length)
}

Expand Down Expand Up @@ -800,3 +803,60 @@ test('can paginate group messages', async () => {

return true
})

test('can stream all group messages', async () => {
const bo = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()
const alix = await Client.createRandom({ env: 'local', enableAlphaMls: true })
await delayToPropogate()

// Start Bob starts a new group.
const boGroup = await bo.conversations.newGroup([alix.address])
await delayToPropogate()

// Starts a new conversation.
const caro = await Client.createRandom({ env: 'local', enableAlphaMls: true })
const caroGroup = await caro.conversations.newGroup([alix.address])

// Record message stream across all conversations
const allMessages: DecodedMessage[] = []
await alix.conversations.streamAllGroupMessages(async (message) => {
allMessages.push(message)
})

for (let i = 0; i < 5; i++) {
await boGroup.send({ text: `Message ${i}` })
await delayToPropogate()
}

const count = allMessages.length
if (count !== 5) {
throw Error('Unexpected all messages count first' + allMessages.length)
}

await delayToPropogate()
for (let i = 0; i < 5; i++) {
await caroGroup.send({ text: `Message ${i}` })
await delayToPropogate()
}

if (allMessages.length !== 10) {
throw Error('Unexpected all messages count second' + allMessages.length)
}

alix.conversations.cancelStreamAllGroupMessages()
await delayToPropogate()
await alix.conversations.streamAllGroupMessages(async (message) => {
allMessages.push(message)
})

for (let i = 0; i < 5; i++) {
await boGroup.send({ text: `Message ${i}` })
await delayToPropogate()
}
if (allMessages.length <= 10) {
throw Error('Unexpected all messages count ' + allMessages.length)
}

return true
})
46 changes: 42 additions & 4 deletions ios/XMTPModule.swift
Original file line number Diff line number Diff line change
Expand Up @@ -684,8 +684,12 @@ public class XMTPModule: Module {
try await subscribeToConversations(clientAddress: clientAddress)
}

AsyncFunction("subscribeToAllMessages") { (clientAddress: String) in
try await subscribeToAllMessages(clientAddress: clientAddress)
AsyncFunction("subscribeToAllMessages") { (clientAddress: String, includeGroups: Bool) in
try await subscribeToAllMessages(clientAddress: clientAddress, includeGroups: includeGroups)
}

AsyncFunction("subscribeToAllGroupMessages") { (clientAddress: String) in
try await subscribeToAllGroupMessages(clientAddress: clientAddress)
}

AsyncFunction("subscribeToMessages") { (clientAddress: String, topic: String) in
Expand All @@ -711,6 +715,11 @@ public class XMTPModule: Module {
AsyncFunction("unsubscribeFromAllMessages") { (clientAddress: String) in
await subscriptionsManager.get(getMessagesKey(clientAddress: clientAddress))?.cancel()
}

AsyncFunction("unsubscribeFromAllGroupMessages") { (clientAddress: String) in
await subscriptionsManager.get(getGroupMessagesKey(clientAddress: clientAddress))?.cancel()
}


AsyncFunction("unsubscribeFromMessages") { (clientAddress: String, topic: String) in
try await unsubscribeFromMessages(clientAddress: clientAddress, topic: topic)
Expand Down Expand Up @@ -916,15 +925,40 @@ public class XMTPModule: Module {
})
}

func subscribeToAllMessages(clientAddress: String) async throws {
func subscribeToAllMessages(clientAddress: String, includeGroups: Bool = false) async throws {
guard let client = await clientsManager.getClient(key: clientAddress) else {
return
}

await subscriptionsManager.get(getMessagesKey(clientAddress: clientAddress))?.cancel()
await subscriptionsManager.set(getMessagesKey(clientAddress: clientAddress), Task {
do {
for try await message in try await client.conversations.streamAllDecryptedMessages() {
for try await message in await client.conversations.streamAllDecryptedMessages(includeGroups: includeGroups) {
do {
try sendEvent("message", [
"clientAddress": clientAddress,
"message": DecodedMessageWrapper.encodeToObj(message, client: client),
])
} catch {
print("discarding message, unable to encode wrapper \(message.id)")
}
}
} catch {
print("Error in all messages subscription: \(error)")
await subscriptionsManager.get(getMessagesKey(clientAddress: clientAddress))?.cancel()
}
})
}

func subscribeToAllGroupMessages(clientAddress: String) async throws {
guard let client = await clientsManager.getClient(key: clientAddress) else {
return
}

await subscriptionsManager.get(getGroupMessagesKey(clientAddress: clientAddress))?.cancel()
await subscriptionsManager.set(getGroupMessagesKey(clientAddress: clientAddress), Task {
do {
for try await message in await client.conversations.streamAllGroupDecryptedMessages() {
do {
try sendEvent("message", [
"clientAddress": clientAddress,
Expand Down Expand Up @@ -1060,6 +1094,10 @@ public class XMTPModule: Module {
func getMessagesKey(clientAddress: String) -> String {
return "messages:\(clientAddress)"
}

func getGroupMessagesKey(clientAddress: String) -> String {
return "groupMessages:\(clientAddress)"
}

func getConversationsKey(clientAddress: String) -> String {
return "conversations:\(clientAddress)"
Expand Down
2 changes: 1 addition & 1 deletion ios/XMTPReactNative.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ Pod::Spec.new do |s|
s.source_files = "**/*.{h,m,swift}"
s.dependency 'secp256k1.swift'
s.dependency "MessagePacker"
s.dependency "XMTP", "= 0.8.9"
s.dependency "XMTP", "= 0.8.10"
end
15 changes: 13 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,15 @@ export function subscribeToGroups(clientAddress: string) {
return XMTPModule.subscribeToGroups(clientAddress)
}

export function subscribeToAllMessages(clientAddress: string) {
return XMTPModule.subscribeToAllMessages(clientAddress)
export function subscribeToAllMessages(
clientAddress: string,
includeGroups: boolean
) {
return XMTPModule.subscribeToAllMessages(clientAddress, includeGroups)
}

export function subscribeToAllGroupMessages(clientAddress: string) {
return XMTPModule.subscribeToAllGroupMessages(clientAddress)
}

export async function subscribeToMessages(
Expand All @@ -529,6 +536,10 @@ export function unsubscribeFromAllMessages(clientAddress: string) {
return XMTPModule.unsubscribeFromAllMessages(clientAddress)
}

export function unsubscribeFromAllGroupMessages(clientAddress: string) {
return XMTPModule.unsubscribeFromAllGroupMessages(clientAddress)
}

export async function unsubscribeFromMessages(
clientAddress: string,
topic: string
Expand Down
44 changes: 43 additions & 1 deletion src/lib/Conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,48 @@ export default class Conversations<
* Listen for new messages in all conversations.
*
* This method subscribes to all conversations in real-time and listens for incoming and outgoing messages.
* @param {boolean} includeGroups - Whether or not to include group messages in the stream.
* @param {Function} callback - A callback function that will be invoked when a message is sent or received.
* @returns {Promise<void>} A Promise that resolves when the stream is set up.
*/
async streamAllMessages(
callback: (message: DecodedMessage<ContentTypes>) => Promise<void>,
includeGroups: boolean = false
): Promise<void> {
XMTPModule.subscribeToAllMessages(this.client.address, includeGroups)
XMTPModule.emitter.addListener(
'message',
async ({
clientAddress,
message,
}: {
clientAddress: string
message: DecodedMessage
}) => {
if (clientAddress !== this.client.address) {
return
}
if (this.known[message.id]) {
return
}

this.known[message.id] = true
await callback(DecodedMessage.fromObject(message, this.client))
}
)
}

/**
* Listen for new messages in all groups.
*
* This method subscribes to all groups in real-time and listens for incoming and outgoing messages.
* @param {Function} callback - A callback function that will be invoked when a message is sent or received.
* @returns {Promise<void>} A Promise that resolves when the stream is set up.
*/
async streamAllGroupMessages(
callback: (message: DecodedMessage<ContentTypes>) => Promise<void>
): Promise<void> {
XMTPModule.subscribeToAllMessages(this.client.address)
XMTPModule.subscribeToAllGroupMessages(this.client.address)
XMTPModule.emitter.addListener(
'message',
async ({
Expand Down Expand Up @@ -291,4 +326,11 @@ export default class Conversations<
cancelStreamAllMessages() {
XMTPModule.unsubscribeFromAllMessages(this.client.address)
}

/**
* Cancels the stream for new messages in all groups.
*/
cancelStreamAllGroupMessages() {
XMTPModule.unsubscribeFromAllGroupMessages(this.client.address)
}
}

0 comments on commit f6e091c

Please sign in to comment.