Skip to content

Commit

Permalink
feat: Store outbound messages (#84)
Browse files Browse the repository at this point in the history
Store all outbound messages
Minor bug fix also:
- The reply must only be in one way

Signed-off-by: Fabio Pinheiro <[email protected]>

Signed-off-by: Shailesh Patil <[email protected]>
  • Loading branch information
FabioPinheiro authored and mineme0110 committed May 1, 2024
1 parent 2c6d032 commit e97cd1d
Show file tree
Hide file tree
Showing 8 changed files with 337 additions and 19 deletions.
6 changes: 5 additions & 1 deletion initdb.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@ db.createUser({
const database = 'mediator';
const collectionDidAccount = 'user.account';
const collectionMessages = 'messages';
const collectionMessagesSend = 'messages.outbound';

// The current database to use.
use(database);

// Create collections.
db.createCollection(collectionDidAccount);
db.createCollection(collectionMessages);
db.createCollection(collectionMessagesSend);

//create index
db.getCollection(collectionDidAccount).createIndex({ 'did': 1 }, { unique: true });
// Only enforce uniqueness on non-empty arrays
db.getCollection(collectionDidAccount).createIndex({ 'alias': 1 }, { unique: true , partialFilterExpression: { "alias.0": { $exists: true } }});
db.getCollection(collectionDidAccount).createIndex({ 'alias': 1 }, { unique: true, partialFilterExpression: { "alias.0": { $exists: true } } });
db.getCollection(collectionDidAccount).createIndex({ "messagesRef.hash": 1, "messagesRef.recipient": 1 });
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ object ActionUtils {
def packResponse(
originalMessage: Option[PlaintextMessage],
action: Action
): ZIO[Operations & Agent & Resolver & MessageDispatcher, MediatorError, Option[EncryptedMessage]] =
): ZIO[
Operations & Agent & Resolver & MessageDispatcher & OutboxMessageRepo,
MediatorError,
Option[EncryptedMessage]
] =
action match {
case _: NoReply.type => ZIO.succeed(None)
case action: AnyReply =>
Expand All @@ -31,6 +35,7 @@ object ActionUtils {
case Some(value) => authEncrypt(reply)
case None => anonEncrypt(reply)
}.mapError(fail => MediatorDidError(fail))
outboxRepo <- ZIO.service[OutboxMessageRepo]
// TODO forward message
maybeSyncReplyMsg <- reply.to.map(_.toSeq) match // TODO improve
case None => ZIO.logWarning("Have a reply but the field 'to' is missing") *> ZIO.none
Expand All @@ -41,6 +46,7 @@ object ActionUtils {
val job: ZIO[MessageDispatcher & (Resolver & Any), MediatorError, Matchable] = for {
messageDispatcher <- ZIO.service[MessageDispatcher]
resolver <- ZIO.service[Resolver]

doc <- resolver
.didDocument(to)
.mapError(fail => MediatorDidError(fail))
Expand All @@ -56,8 +62,9 @@ object ActionUtils {
jobToRun <- mURL match
case None => ZIO.logWarning(s"No url to send message")
case Some(url) => {
ZIO.log(s"Send to url: $url") *>
messageDispatcher
for {
_ <- ZIO.log(s"Send to url: $url")
response <- messageDispatcher
.send(
msg,
url,
Expand All @@ -69,19 +76,58 @@ object ActionUtils {
// case _ => None
)
.catchAll { case DispatcherError(error) => ZIO.logWarning(s"Dispatch Error: $error") }

_ <- outboxRepo
.insert(
SentMessageItem(
msg = msg,
plaintext = reply,
recipient = Set(to),
distination = Some(url),
sendMethod = MessageSendMethod.HTTPS_POST,
result = response match
case str: String => Some(str)
case _: Unit => None
,
)
) // Maybe fork
.catchAll { case error => ZIO.logError(s"Store Outbox Error: $error") }
} yield ()
}

} yield (jobToRun)
action match
case Reply(_) => job
case Reply(_) =>
job
.when( // this is +- the opposite condition as below
originalMessage
.map { oMsg => oMsg.return_route.isEmpty || oMsg.return_route.contains(ReturnRoute.none) }
.getOrElse(true) // If originalMessage is None
)
case SyncReplyOnly(_) => ZIO.unit
case AsyncReplyOnly(_) => job
) *> ZIO
.succeed(msg)
.tap(msg =>
outboxRepo
.insert(
SentMessageItem(
msg = msg,
plaintext = reply,
recipient = reply.to.getOrElse(Set.empty),
distination = None,
sendMethod = MessageSendMethod.INLINE_REPLY,
result = None,
)
)
.catchAll { case error => ZIO.logError(s"Store Outbox Error: $error") }
)
.when(
originalMessage
.map { oMsg =>
oMsg.return_route.contains(ReturnRoute.all) && // Should replies use the same transport channel?
{ // Should replies use the same transport channel?
oMsg.return_route.contains(ReturnRoute.all) || oMsg.return_route.contains(ReturnRoute.thread)
} &&
oMsg.from.map(_.asTO).exists(send2DIDs.contains) // Is the reply back to the original sender?
}
.getOrElse(false) // If originalMessage is None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ trait ProtocolExecuter[-R, +E] { // <: MediatorError | StorageError] {
}

object ProtocolExecuter {
type Services = Resolver & Agent & Operations & MessageDispatcher
type Services = Resolver & Agent & Operations & MessageDispatcher & OutboxMessageRepo
type Erros = MediatorError | StorageError
}
case class ProtocolExecuterCollection[-R <: Agent, +E](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ case class MediatorAgent(
// val resolverLayer: ULayer[DynamicResolver] =
// DynamicResolver.resolverLayer(didSocketManager)

type Services = Resolver & Agent & Operations & MessageDispatcher & UserAccountRepo & MessageItemRepo
val protocolHandlerLayer
: URLayer[UserAccountRepo & MessageItemRepo, ProtocolExecuter[Services, MediatorError | StorageError]] =
type Services = Resolver & Agent & Operations & MessageDispatcher & UserAccountRepo & MessageItemRepo &
OutboxMessageRepo
val protocolHandlerLayer: URLayer[UserAccountRepo & MessageItemRepo & OutboxMessageRepo, ProtocolExecuter[
Services,
MediatorError | StorageError
]] =
ZLayer.succeed(
ProtocolExecuterCollection[Services, MediatorError | StorageError](
BasicMessageExecuter,
Expand Down Expand Up @@ -88,7 +91,7 @@ case class MediatorAgent(
data: String,
mSocketID: Option[SocketID],
): ZIO[
Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo,
Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo & OutboxMessageRepo,
MediatorError | StorageError,
Option[EncryptedMessage]
] =
Expand All @@ -109,7 +112,7 @@ case class MediatorAgent(
msg: EncryptedMessage,
mSocketID: Option[SocketID]
): ZIO[
Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo,
Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo & OutboxMessageRepo,
MediatorError | StorageError,
Option[EncryptedMessage]
] =
Expand Down Expand Up @@ -203,7 +206,7 @@ case class MediatorAgent(
def createSocketApp(
annotationMap: Seq[LogAnnotation]
): ZIO[
MediatorAgent & Resolver & Operations & MessageDispatcher & MessageItemRepo & UserAccountRepo,
MediatorAgent & Resolver & Operations & MessageDispatcher & MessageItemRepo & UserAccountRepo & OutboxMessageRepo,
Nothing,
zio.http.Response
] = {
Expand Down Expand Up @@ -373,7 +376,7 @@ object MediatorAgent {
} yield ret
}
}: Http[
Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo,
Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo & OutboxMessageRepo,
Throwable,
Request,
Response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ object MediatorStandalone extends ZIOAppDefault {
Runtime.removeDefaultLoggers >>> SLF4J.slf4j(mediatorColorFormat)

val app: HttpApp[ // type HttpApp[-R, +Err] = Http[R, Err, Request, Response]
Hub[String] & Operations & MessageDispatcher & MediatorAgent & Resolver & MessageItemRepo & UserAccountRepo,
Hub[String] & Operations & MessageDispatcher & MediatorAgent & Resolver & MessageItemRepo & UserAccountRepo &
OutboxMessageRepo,
Throwable
] = MediatorAgent.didCommApp
++ Http
Expand Down Expand Up @@ -117,7 +118,7 @@ object MediatorStandalone extends ZIOAppDefault {
.provideSomeLayer(
AsyncDriverResource.layer
>>> ReactiveMongoApi.layer(mediatorDbConfig.connectionString)
>>> MessageItemRepo.layer.and(UserAccountRepo.layer)
>>> MessageItemRepo.layer.and(UserAccountRepo.layer).and(OutboxMessageRepo.layer)
)
.provideSomeLayer(Operations.layerDefault)
.provideSomeLayer(client >>> MessageDispatcherJVM.layer)
Expand Down
160 changes: 158 additions & 2 deletions mediator/src/main/scala/io/iohk/atala/mediator/db/BsonImplicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package io.iohk.atala.mediator.db
import fmgp.crypto.*
import fmgp.did.*
import fmgp.did.comm.*
import fmgp.did.comm.extension.*
import fmgp.util.*
import reactivemongo.api.bson.*
import zio.json.*
import zio.json.ast.Json
import reactivemongo.api.bson.*

import scala.util.*

Expand Down Expand Up @@ -230,7 +232,7 @@ given BSONDocumentReader[AuthProtectedHeader] =
Macros.reader[AuthProtectedHeader]

given BSONDocumentWriter[ProtectedHeader] =
Macros.writer[ProtectedHeader]
Macros.writer[ProtectedHeader] // TODO FIX The encoder for ProtectedHeader MUST not have the field "className"
given BSONDocumentReader[ProtectedHeader] =
Macros.reader[ProtectedHeader]

Expand All @@ -248,3 +250,157 @@ given BSONDocumentReader[EncryptedMessage] with {
override def readDocument(doc: BSONDocument): Try[EncryptedMessage] =
aux.readDocument(doc)
}

// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1

given BSONWriter[MsgID] with {
import MsgID.*
def writeTry(obj: MsgID): Try[BSONValue] = Try(BSONString(obj.value))
}
given BSONReader[MsgID] with {
def readTry(bson: BSONValue): Try[MsgID] = bson.asTry[String].map(v => MsgID(v))
}

given BSONWriter[PIURI] with {
import PIURI.*
def writeTry(obj: PIURI): Try[BSONValue] = Try(BSONString(obj.value))
}
given BSONReader[PIURI] with {
def readTry(bson: BSONValue): Try[PIURI] = bson.asTry[String].map(v => PIURI(v))
}

given BSONWriter[TO] with {
import TO.*
def writeTry(obj: TO): Try[BSONValue] = Try(BSONString(obj.value))
}
given BSONReader[TO] with {
def readTry(bson: BSONValue): Try[TO] = bson.asTry[String].map(v => TO(v))
}

given BSONWriter[FROM] with {
import FROM.*
def writeTry(obj: FROM): Try[BSONValue] = Try(BSONString(obj.value))
}
given BSONReader[FROM] with {
def readTry(bson: BSONValue): Try[FROM] = bson.asTry[String].map(v => FROM(v))
}

//JSON_RFC7159
def sequenceTrys[T](trySequence: Seq[_ <: Try[_ <: T]]): Try[Seq[T]] = {
trySequence.foldLeft(Try(Seq.empty[T])) { (acc, tryElement) =>
acc.flatMap(accSeq => tryElement.map(success => accSeq :+ success))
}
}

def toBSON(j: Json): Try[BSONValue] = j match
case Json.Obj(fields) => sequenceTrys(fields.map(e => toBSON(e._2).map(e2 => (e._1, e2)))).map(BSONDocument(_))
case Json.Arr(elements) => sequenceTrys(elements.map(toBSON(_))).map(BSONArray(_))
case Json.Bool(value) => Try(BSONBoolean(value))
case Json.Str(value) => Try(BSONString(value))
case Json.Num(value) => BSONDecimal.fromBigDecimal(value)
case zio.json.ast.Json.Null => Try(BSONNull)

def toJson(b: BSONValue): Try[Json] = b match
case doc: BSONDocument =>
sequenceTrys(doc.toMap.toSeq.map(e => toJson(e._2).map(e2 => (e._1, e2)))).map(Json.Obj(_: _*))
case array: BSONArray => sequenceTrys(array.values.map(toJson(_))).map(Json.Arr(_: _*))
case e: BSONDouble => e.toDouble.map(Json.Num(_))
case e: BSONInteger => e.toDouble.map(Json.Num(_))
case e: BSONLong => e.toDouble.map(Json.Num(_))
case e: BSONDecimal => e.toDouble.map(Json.Num(_))
case e: BSONString => Try(Json.Str(e.value))
case e: BSONBoolean => Try(Json.Bool(e.value))
case BSONUndefined => Try(Json.Null)
case BSONNull => Try(Json.Null)
// case _: BSONBinary =>
// case _: BSONDateTime =>
// case _: BSONRegex =>
// case _: BSONJavaScript =>
// case _: BSONSymbol =>
// case _: BSONJavaScriptWS =>
// case BSONMinKey =>
// case BSONMaxKey =>
case _ => ??? // FIXME

given BSONWriter[JSON_RFC7159] with {

def writeTry(obj: JSON_RFC7159): Try[BSONDocument] =
sequenceTrys(obj.fields.map(e => toBSON(e._2).map(e2 => (e._1, e2)))).map(BSONDocument(_))
}
given BSONReader[JSON_RFC7159] with {
def readTry(bson: BSONValue): Try[JSON_RFC7159] =
bson.asTry[BSONDocument].flatMap { doc =>
sequenceTrys(doc.toMap.toSeq.map(e => toJson(e._2).map(e2 => (e._1, e2)))).map(Json.Obj(_: _*))
}
}

given BSONWriter[Json] with {
def writeTry(obj: Json): Try[BSONValue] = toBSON(obj)
}
given BSONReader[Json] with {
def readTry(bson: BSONValue): Try[Json] = toJson(bson)
}

given BSONDocumentWriter[AttachmentDataJWS] = Macros.writer[AttachmentDataJWS]
given BSONDocumentReader[AttachmentDataJWS] = Macros.reader[AttachmentDataJWS]
given BSONDocumentWriter[AttachmentDataLinks] = Macros.writer[AttachmentDataLinks]
given BSONDocumentReader[AttachmentDataLinks] = Macros.reader[AttachmentDataLinks]
given BSONDocumentWriter[AttachmentDataBase64] = Macros.writer[AttachmentDataBase64]
given BSONDocumentReader[AttachmentDataBase64] = Macros.reader[AttachmentDataBase64]
given BSONDocumentWriter[AttachmentDataJson] = Macros.writer[AttachmentDataJson]
given BSONDocumentReader[AttachmentDataJson] = Macros.reader[AttachmentDataJson]
given BSONDocumentWriter[AttachmentDataAny] = Macros.writer[AttachmentDataAny]
given BSONDocumentReader[AttachmentDataAny] = Macros.reader[AttachmentDataAny]
given BSONDocumentWriter[AttachmentData] = Macros.writer[AttachmentData]
given BSONDocumentReader[AttachmentData] = Macros.reader[AttachmentData]
given BSONDocumentWriter[Attachment] = Macros.writer[Attachment]
given BSONDocumentReader[Attachment] = Macros.reader[Attachment]

given BSONWriter[ReturnRoute] with {
def writeTry(obj: ReturnRoute): Try[BSONValue] = Try(BSONString(obj.toString()))
}
given BSONReader[ReturnRoute] with {
def readTry(bson: BSONValue): Try[ReturnRoute] = bson.asTry[String].map(v => ReturnRoute.valueOf(v))
}

given BSONDocumentWriter[L10nInline] = Macros.writer[L10nInline]
given BSONDocumentReader[L10nInline] = Macros.reader[L10nInline]

given BSONDocumentWriter[L10n] = Macros.writer[L10n]
given BSONDocumentReader[L10n] = Macros.reader[L10n]

given BSONWriter[SenderOrder] with {
import SenderOrder.*
def writeTry(obj: SenderOrder): Try[BSONInteger] = Try(BSONInteger(obj.value))
}
given BSONReader[SenderOrder] with {
def readTry(bson: BSONValue): Try[SenderOrder] = bson.asTry[BSONInteger].flatMap(_.asInt.map(SenderOrder(_)))
}

given BSONWriter[SentCount] with {
import SentCount.*
def writeTry(obj: SentCount): Try[BSONInteger] = Try(BSONInteger(obj.value))
}
given BSONReader[SentCount] with {
def readTry(bson: BSONValue): Try[SentCount] = bson.asTry[BSONInteger].flatMap(_.asInt.map(SentCount(_)))
}

given BSONDocumentWriter[ReceivedOrdersElement] = Macros.writer[ReceivedOrdersElement]
given BSONDocumentReader[ReceivedOrdersElement] = Macros.reader[ReceivedOrdersElement]

// fmgp.did.comm.extension.AdvancedSequencingpackage.SenderOrder

given BSONDocumentWriter[PlaintextMessage] with {
val aux = Macros.writer[PlaintextMessageClass]
override def writeTry(obj: PlaintextMessage): Try[BSONDocument] =
obj match {
case msg: PlaintextMessageClass => aux.writeTry(msg) // Success(msg): Try[reactivemongo.api.bson.BSONDocument]
case _ => Failure(RuntimeException("Only support PlaintextMessageClass"))
}

}
given BSONDocumentReader[PlaintextMessage] with {
val aux = Macros.reader[PlaintextMessageClass]
override def readDocument(doc: BSONDocument): Try[PlaintextMessage] =
aux.readDocument(doc)
}
Loading

0 comments on commit e97cd1d

Please sign in to comment.