From 10bc7cad70417aa04f393543de53d7d33869d084 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Thu, 15 Feb 2024 19:46:16 -0800 Subject: [PATCH] Fix: Stream All Streams All (#249) * try and fix the stream all method * add tests for the streaming * bump all the pods --- Package.swift | 2 +- Sources/XMTPiOS/Conversations.swift | 41 ++++++++++++++---------- Tests/XMTPTests/GroupTests.swift | 49 +++++++++++++++++++++++++++++ XMTP.podspec | 4 +-- 4 files changed, 76 insertions(+), 20 deletions(-) diff --git a/Package.swift b/Package.swift index 52392c45..27774629 100644 --- a/Package.swift +++ b/Package.swift @@ -25,7 +25,7 @@ let package = Package( .package(url: "https://github.com/1024jp/GzipSwift", from: "5.2.0"), .package(url: "https://github.com/bufbuild/connect-swift", exact: "0.3.0"), .package(url: "https://github.com/apple/swift-docc-plugin.git", from: "1.0.0"), - .package(url: "https://github.com/xmtp/libxmtp-swift", exact: "0.4.1-beta3"), + .package(url: "https://github.com/xmtp/libxmtp-swift", exact: "0.4.2-beta1"), ], targets: [ // Targets are the basic building blocks of a package. A target can define a module or a test suite. diff --git a/Sources/XMTPiOS/Conversations.swift b/Sources/XMTPiOS/Conversations.swift index e9a30b32..9dd113bc 100644 --- a/Sources/XMTPiOS/Conversations.swift +++ b/Sources/XMTPiOS/Conversations.swift @@ -409,23 +409,30 @@ public actor Conversations { } } } - - public func streamAll() -> AsyncThrowingStream { - AsyncThrowingStream { continuation in - Task { - do { - for try await conversation in streamGroupConversations() { - continuation.yield(conversation) - } - for try await conversation in stream() { - continuation.yield(conversation) - } - } catch { - continuation.finish(throwing: error) - } - } - } - } + + public func streamAll() -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { + do { + var iterator = stream.makeAsyncIterator() + while let element = try await iterator.next() { + continuation.yield(element) + } + continuation.finish() + } catch { + continuation.finish(throwing: error) + } + } + + Task { + await forwardStreamToMerged(stream: stream()) + } + Task { + await forwardStreamToMerged(stream: streamGroupConversations()) + } + } + } + private func makeConversation(from sealedInvitation: SealedInvitation) throws -> ConversationV2 { let unsealed = try sealedInvitation.v1.getInvitation(viewer: client.keys) let conversation = try ConversationV2.create(client: client, invitation: unsealed, header: sealedInvitation.v1.header) diff --git a/Tests/XMTPTests/GroupTests.swift b/Tests/XMTPTests/GroupTests.swift index 57d4cfd5..3ca12f29 100644 --- a/Tests/XMTPTests/GroupTests.swift +++ b/Tests/XMTPTests/GroupTests.swift @@ -306,4 +306,53 @@ class GroupTests: XCTestCase { XCTAssertEqual("sup gang", String(data: Data(aliceMessage.encodedContent.content), encoding: .utf8)) XCTAssertEqual("sup gang", String(data: Data(bobMessage.encodedContent.content), encoding: .utf8)) } + + func testCanStreamGroups() async throws { + let fixtures = try await localFixtures() + + let expectation1 = expectation(description: "got a group") + + Task(priority: .userInitiated) { + for try await _ in try await fixtures.aliceClient.conversations.streamGroups() { + expectation1.fulfill() + } + } + + try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) + + await waitForExpectations(timeout: 3) + } + + func testCanStreamGroupsAndConversationsWorksGroups() async throws { + let fixtures = try await localFixtures() + + let expectation1 = expectation(description: "got a conversation") + + Task(priority: .userInitiated) { + for try await _ in try await fixtures.aliceClient.conversations.streamAll() { + expectation1.fulfill() + } + } + + _ = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) + // _ = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address) + + await waitForExpectations(timeout: 3) + } + + func testCanStreamGroupsAndConversationsWorksConvos() async throws { + let fixtures = try await localFixtures() + + let expectation1 = expectation(description: "got a conversation") + + Task(priority: .userInitiated) { + for try await _ in try await fixtures.aliceClient.conversations.streamAll() { + expectation1.fulfill() + } + } + + _ = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address) + + await waitForExpectations(timeout: 3) + } } diff --git a/XMTP.podspec b/XMTP.podspec index 7a4a6747..309884e2 100644 --- a/XMTP.podspec +++ b/XMTP.podspec @@ -16,7 +16,7 @@ Pod::Spec.new do |spec| # spec.name = "XMTP" - spec.version = "0.8.5" + spec.version = "0.8.6" spec.summary = "XMTP SDK Cocoapod" # This description is used to generate tags and improve search results. @@ -44,5 +44,5 @@ Pod::Spec.new do |spec| spec.dependency "web3.swift" spec.dependency "GzipSwift" spec.dependency "Connect-Swift", "= 0.3.0" - spec.dependency 'LibXMTP', '= 0.4.1-beta3' + spec.dependency 'LibXMTP', '= 0.4.2-beta1' end