Skip to content

Commit

Permalink
feat(agent): implement send messages
Browse files Browse the repository at this point in the history
Fixes ATL-2708
  • Loading branch information
goncalo-frade-iohk committed Dec 15, 2022
1 parent c62bb2a commit 07c8fe4
Show file tree
Hide file tree
Showing 30 changed files with 465 additions and 54 deletions.
3 changes: 1 addition & 2 deletions Castor/Tests/PeerDIDCreationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,11 @@ final class PeerDIDCreationTests: XCTestCase {
let mypeerDID = DID(
schema: "did",
method: "peer",
methodId: "2.Ez6LScuRnbZAJaVthcL5RELq75EBK2sBmBxsSs98LKNeriHQJ.Vz6MkpeaW7DeptJW7pi2qNXTdCXeQV4EYpZnAouH1LrfHj6uf.SeyJ0IjoiZG0iLCJzIjoiaHR0cHM6Ly9rOHMtZGV2LmF0YWxhcHJpc20uaW8vcHJpc20tYWdlbnQvZGlkY29tbSIsInIiOltdLCJhIjpbImRpZGNvbW0vdjIiXX0"
methodId: "2.Ez6LSfuJvTtcmcFrjNYYSAuD32tMZWQUD2HYDfXrJqy3ui6MQ.Vz6MkoPyvuxAecSezqmL4ERE7eW2XPbiUEHRH9aqay6LA8Eqr.SeyJ0IjoiZG0iLCJzIjoiaHR0cDovL2hvc3QuZG9ja2VyLmludGVybmFsOjgwL2RpZGNvbW0vIiwiciI6W10sImEiOlsiZGlkY29tbS92MiJdfQ"
)

let apollo = ApolloImpl()
let castor = CastorImpl(apollo: apollo)
let document = try await castor.resolveDID(did: mypeerDID)
print()
}
}
14 changes: 14 additions & 0 deletions Core/Sources/Helpers/First+AsyncAwait.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import Combine

extension Publishers {
struct MissingOutputError: Error {}
}

public extension Publishers.First where Failure == Error {
func await() async throws -> Output {
for try await output in values {
return output
}
throw Publishers.MissingOutputError()
}
}
File renamed without changes.
6 changes: 4 additions & 2 deletions Domain/Sources/BBs/Pluto.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ public protocol Pluto {
) -> AnyPublisher<Void, Error>
func storePeerDID(did: DID, privateKeys: [PrivateKey]) -> AnyPublisher<Void, Error>
func storeDIDPair(holder: DID, other: DID, name: String) -> AnyPublisher<Void, Error>
func storeMessage(message: Message) -> AnyPublisher<Void, Error>
func storeMessages(messages: [Message]) -> AnyPublisher<Void, Error>
func storeMessage(message: Message, direction: Message.Direction) -> AnyPublisher<Void, Error>
func storeMessages(messages: [(Message, Message.Direction)]) -> AnyPublisher<Void, Error>
func storeMediator(peer: DID, routingDID: DID, mediatorDID: DID) -> AnyPublisher<Void, Error>
func storeCredential(credential: VerifiableCredential) -> AnyPublisher<Void, Error>

Expand All @@ -31,6 +31,8 @@ public protocol Pluto {

func getAllMessages() -> AnyPublisher<[Message], Error>
func getAllMessages(did: DID) -> AnyPublisher<[Message], Error>
func getAllMessagesSent() -> AnyPublisher<[Message], Error>
func getAllMessagesReceived() -> AnyPublisher<[Message], Error>
func getAllMessagesSentTo(did: DID) -> AnyPublisher<[Message], Error>
func getAllMessagesReceivedFrom(did: DID) -> AnyPublisher<[Message], Error>
func getAllMessagesOfType(type: String, relatedWithDID: DID?) -> AnyPublisher<[Message], Error>
Expand Down
10 changes: 9 additions & 1 deletion Domain/Sources/Models/Message.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import Foundation

public struct Message {
public enum Direction: String {
case sent
case received
}

public let id: String
public let piuri: String
public let from: DID?
Expand All @@ -14,6 +19,7 @@ public struct Message {
public let thid: String?
public let pthid: String?
public let ack: [String]
public let direction: Direction

public init(
id: String = UUID().uuidString,
Expand All @@ -28,7 +34,8 @@ public struct Message {
attachments: [AttachmentDescriptor] = [],
thid: String? = nil,
pthid: String? = nil,
ack: [String] = []
ack: [String] = [],
direction: Direction = .received
) {
self.id = id
self.piuri = piuri
Expand All @@ -43,5 +50,6 @@ public struct Message {
self.thid = thid
self.pthid = pthid
self.ack = ack
self.direction = direction
}
}
2 changes: 1 addition & 1 deletion Mercury/Sources/Helpers/DIDCommMessage+DomainParse.swift
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ extension DIDCommxSwift.AttachmentData {
case let .links(value):
return AttachmentLinkData(links: value.links, hash: value.hash)
case let .json(value):
guard let jsonData = Data(fromBase64URL: value.json) else {
guard let jsonData = value.json.data(using: .utf8) else {
throw MercuryError.unknownAttachmentDataError
}
return AttachmentJsonData(data: jsonData)
Expand Down
5 changes: 3 additions & 2 deletions Mercury/Sources/Helpers/Session.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ struct SessionManager {
headers: [String: String] = [:],
parameters: [String: String] = [:]
) async throws -> Data? {
try await call(request: try makeRequest(url: url, method: .post, body: body, parameters: parameters))
try await call(request: try makeRequest(url: url, method: .post, body: body, headers: headers, parameters: parameters))
}

private func call(request: URLRequest) async throws -> Data? {
Expand All @@ -45,7 +45,8 @@ struct SessionManager {
headers: [String: String] = [:],
parameters: [String: String]
) throws -> URLRequest {
var composition = URLComponents(url: url, resolvingAgainstBaseURL: true)
let urlParsed = URL(string: url.absoluteString.replacingOccurrences(of: "http://host.docker.internal:8080", with: "http://localhost:8080"))!
var composition = URLComponents(url: urlParsed, resolvingAgainstBaseURL: true)
if !parameters.isEmpty {
composition?.queryItems = parameters.map { URLQueryItem(name: $0, value: $1) }
}
Expand Down
2 changes: 2 additions & 0 deletions Pluto/Sources/Domain/Providers/MessageProvider.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import Foundation
protocol MessageProvider {
func getAll() -> AnyPublisher<[Message], Error>
func getAllFor(did: DID) -> AnyPublisher<[Message], Error>
func getAllSent() -> AnyPublisher<[Message], Error>
func getAllReceived() -> AnyPublisher<[Message], Error>
func getAllSentTo(did: DID) -> AnyPublisher<[Message], Error>
func getAllReceivedFrom(did: DID) -> AnyPublisher<[Message], Error>
func getAllOfType(type: String, relatedWithDID: DID?) -> AnyPublisher<[Message], Error>
Expand Down
4 changes: 2 additions & 2 deletions Pluto/Sources/Domain/Stores/MessageStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import Domain
import Foundation

protocol MessageStore {
func addMessages(messages: [Message]) -> AnyPublisher<Void, Error>
func addMessage(msg: Message) -> AnyPublisher<Void, Error>
func addMessages(messages: [(Message, Message.Direction)]) -> AnyPublisher<Void, Error>
func addMessage(msg: Message, direction: Message.Direction) -> AnyPublisher<Void, Error>
func removeMessage(id: String) -> AnyPublisher<Void, Error>
func removeAll() -> AnyPublisher<Void, Error>
}
7 changes: 6 additions & 1 deletion Pluto/Sources/Helpers/Message+Codable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ struct CodableMessage: Codable {
case pthid
case ack
case body
case direction
}

let message: Message
Expand All @@ -30,6 +31,7 @@ struct CodableMessage: Codable {
try container.encode(message.expiresTimePlus, forKey: .expiresTimePlus)
try container.encode(message.attachments, forKey: .attachments)
try container.encode(message.ack, forKey: .ack)
try container.encode(message.direction.rawValue, forKey: .direction)
try message.from.map { try container.encode(CodableDID(did: $0), forKey: .from) }
try message.to.map { try container.encode(CodableDID(did: $0), forKey: .to) }
try message.fromPrior.map { try container.encode($0, forKey: .fromPrior) }
Expand All @@ -56,6 +58,8 @@ struct CodableMessage: Codable {
let fromPrior = try? container.decode(String.self, forKey: .fromPrior)
let thid = try? container.decode(String.self, forKey: .thid)
let pthid = try? container.decode(String.self, forKey: .pthid)
let directionRaw = try? container.decode(String.self, forKey: .direction)
let direction = directionRaw.flatMap { Message.Direction(rawValue: $0) }

self.init(message: .init(
id: id,
Expand All @@ -70,7 +74,8 @@ struct CodableMessage: Codable {
attachments: attachments,
thid: thid,
pthid: pthid,
ack: ack
ack: ack,
direction: direction ?? .sent
))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,24 @@ extension CDMessageDAO: MessageProvider {
.eraseToAnyPublisher()
}

func getAllSent() -> AnyPublisher<[Message], Error> {
fetchController(
predicate: NSPredicate(format: "(direction == %@)", "sent"),
context: readContext
)
.tryMap { try $0.map { try $0.toDomain() } }
.eraseToAnyPublisher()
}

func getAllReceived() -> AnyPublisher<[Message], Error> {
fetchController(
predicate: NSPredicate(format: "(direction == %@)", "received"),
context: readContext
)
.tryMap { try $0.map { try $0.toDomain() } }
.eraseToAnyPublisher()
}

func getAllSentTo(did: DID) -> AnyPublisher<[Message], Error> {
fetchController(
predicate: NSPredicate(format: "to == %@", did.string),
Expand Down
25 changes: 10 additions & 15 deletions Pluto/Sources/PersistentStorage/DAO/CDMessageDAO+MessageStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,22 @@ import CoreData
import Domain

extension CDMessageDAO: MessageStore {
func addMessages(messages: [Message]) -> AnyPublisher<Void, Error> {
func addMessages(messages: [(Message, Message.Direction)]) -> AnyPublisher<Void, Error> {
messages
.publisher
.flatMap { self.addMessage(msg: $0) }
.flatMap { self.addMessage(msg: $0.0, direction: $0.1) }
.eraseToAnyPublisher()
}

func addMessage(msg: Message) -> AnyPublisher<Void, Error> {
guard
let fromDID = msg.from,
let toDID = msg.to
else {
return Fail(error: PlutoError.messageMissingFromOrToDIDError).eraseToAnyPublisher()
}
func addMessage(msg: Message, direction: Message.Direction) -> AnyPublisher<Void, Error> {
return pairDAO
.fetchController(
predicate: NSPredicate(
format: "(holderDID.did == %@) OR (holderDID.did == %@) OR (did == %@) OR (did == %@)",
fromDID.string,
toDID.string,
fromDID.string,
toDID.string
msg.from?.string ?? "",
msg.to?.string ?? "",
msg.from?.string ?? "",
msg.to?.string ?? ""
),
context: writeContext
)
Expand All @@ -35,7 +29,7 @@ extension CDMessageDAO: MessageStore {
msg.id,
context: writeContext
) { cdobj, _ in
try cdobj.fromDomain(msg: msg, pair: pair)
try cdobj.fromDomain(msg: msg, direction: direction, pair: pair)
}
}
.map { _ in }
Expand All @@ -52,13 +46,14 @@ extension CDMessageDAO: MessageStore {
}

private extension CDMessage {
func fromDomain(msg: Message, pair: CDDIDPair?) throws {
func fromDomain(msg: Message, direction: Message.Direction, pair: CDDIDPair?) throws {
self.messageId = msg.id
self.from = msg.from?.string
self.to = msg.to?.string
self.type = msg.piuri
self.dataJson = try JSONEncoder().encode(CodableMessage(message: msg))
self.createdTime = msg.createdTime
self.pair = pair
self.direction = direction.rawValue
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ extension CDMessage {
@NSManaged var from: String?
@NSManaged var to: String?
@NSManaged var thid: String?
@NSManaged var direction: String?
@NSManaged var pair: CDDIDPair?
}

Expand Down
14 changes: 11 additions & 3 deletions Pluto/Sources/PlutoImpl+Public.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ extension PlutoImpl: Pluto {
pairDIDDao.addDIDPair(holder: holder, other: other, name: name)
}

public func storeMessage(message: Message) -> AnyPublisher<Void, Error> {
messageDao.addMessage(msg: message)
public func storeMessage(message: Message, direction: Message.Direction) -> AnyPublisher<Void, Error> {
messageDao.addMessage(msg: message, direction: direction)
}

public func storeMessages(messages: [Message]) -> AnyPublisher<Void, Error> {
public func storeMessages(messages: [(Message, Message.Direction)]) -> AnyPublisher<Void, Error> {
messageDao.addMessages(messages: messages)
}

Expand Down Expand Up @@ -90,6 +90,14 @@ extension PlutoImpl: Pluto {
messageDao.getAllFor(did: did)
}

public func getAllMessagesSent() -> AnyPublisher<[Message], Error> {
messageDao.getAllSent()
}

public func getAllMessagesReceived() -> AnyPublisher<[Message], Error> {
messageDao.getAllReceived()
}

public func getAllMessagesSentTo(did: DID) -> AnyPublisher<[Message], Error> {
messageDao.getAllSentTo(did: did)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
<entity name="CDMessage" representedClassName="CDMessage" syncable="YES">
<attribute name="createdTime" attributeType="Date" usesScalarValueType="NO"/>
<attribute name="dataJson" attributeType="Binary"/>
<attribute name="direction" optional="YES" attributeType="String"/>
<attribute name="from" optional="YES" attributeType="String"/>
<attribute name="messageId" attributeType="String"/>
<attribute name="thid" optional="YES" attributeType="String"/>
Expand Down
10 changes: 5 additions & 5 deletions Pluto/Tests/CDMessagesDAOTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ final class CDMessagesDAOTests: XCTestCase {
)
}
.flatMap {
dao.addMessage(msg: testMessage)
dao.addMessage(msg: testMessage, direction: .received)
}
.flatMap {
dao.getMessage(id: testMessage.id).first()
Expand Down Expand Up @@ -98,10 +98,10 @@ final class CDMessagesDAOTests: XCTestCase {
)
}
.flatMap {
dao.addMessage(msg: testMessage)
dao.addMessage(msg: testMessage, direction: .received)
}
.flatMap {
dao.addMessage(msg: testMessage)
dao.addMessage(msg: testMessage, direction: .received)
}
.flatMap {
dao.getAll().first()
Expand Down Expand Up @@ -167,10 +167,10 @@ final class CDMessagesDAOTests: XCTestCase {
)
}
.flatMap {
dao.addMessage(msg: testMessage1)
dao.addMessage(msg: testMessage1, direction: .received)
}
.flatMap {
dao.addMessage(msg: testMessage2)
dao.addMessage(msg: testMessage2, direction: .received)
}
.flatMap {
dao.getAllFor(did: testHolderDID2).first()
Expand Down
39 changes: 38 additions & 1 deletion PrismAgent/Sources/ConnectionsManager/ConnectionsManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,17 @@ class ConnectionsManagerImpl: ConnectionsManager {
}

extension ConnectionsManagerImpl: DIDCommConnection {
func sendMessage(_ message: Message) async throws -> Message? {
let mercury = self.mercury
return try await pluto
.storeMessage(message: message, direction: .sent)
.flatMap {
Future { try await mercury.sendMessageParseMessage(msg: message) }
}
.first()
.await()
}

func awaitMessages() async throws -> [Message] {
let stream: AnyPublisher<[Message], Error> = try awaitMessages()

Expand Down Expand Up @@ -164,9 +175,12 @@ extension ConnectionsManagerImpl: DIDCommConnection {
}
.flatMap { messages in
pluto
.storeMessages(messages: messages)
.storeMessages(messages: messages.map { ($0.message, .received) })
.map { messages }
}
.flatMap { messages in
pickupReceivedMessages(messages: messages, mediator: mediator, mercury: mercury)
}
.eraseToAnyPublisher()
}

Expand Down Expand Up @@ -227,3 +241,26 @@ extension ConnectionsManagerImpl: DIDCommConnection {
.eraseToAnyPublisher()
}
}

private func pickupReceivedMessages(
messages: [(Message, String)],
mediator: ConnectionsManagerImpl.Mediator,
mercury: Mercury
) -> AnyPublisher<[Message], Error> {
if messages.count > 0 {
return Future<Data?, Error> {
let message = try PickUpReceived(
from: mediator.peerDID,
to: mediator.mediatorDID,
body: .init(messageIdList: messages.map { $0.1 })
).makeMessage()
return try await mercury.sendMessage(msg: message)
}
.map { _ in messages.map { $0.0 } }
.eraseToAnyPublisher()
} else {
return Just(messages.map { $0.0 })
.tryMap { $0 }
.eraseToAnyPublisher()
}
}
Loading

0 comments on commit 07c8fe4

Please sign in to comment.