diff --git a/be2-scala/src/main/scala/ch/epfl/pop/json/ObjectProtocol.scala b/be2-scala/src/main/scala/ch/epfl/pop/json/ObjectProtocol.scala index d0a258b740..b472fcd6c0 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/json/ObjectProtocol.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/json/ObjectProtocol.scala @@ -95,6 +95,19 @@ object ObjectProtocol extends DefaultJsonProtocol { ) } + implicit object RumorDataFormat extends JsonFormat[RumorData] { + final private val PARAM_RUMOR_IDS: String = "rumor_ids" + + override def read(json: JsValue): RumorData = json.asJsObject.getFields(PARAM_RUMOR_IDS) match { + case Seq(rumorIds @ JsArray(_)) => RumorData(rumorIds.convertTo[List[Int]]) + case _ => throw new IllegalArgumentException(s"Can't parse json value $json to a RumorData object") + } + + override def write(obj: RumorData): JsValue = JsObject( + PARAM_RUMOR_IDS -> obj.rumorIds.toJson + ) + } + implicit val lockScriptFormat: JsonFormat[LockScript] = jsonFormat[String, Address, LockScript](LockScript.apply, "type", "pubkey_hash") implicit val unlockScriptFormat: JsonFormat[UnlockScript] = jsonFormat[String, PublicKey, Base64Data, UnlockScript](UnlockScript.apply, "type", "pubkey", "sig") diff --git a/be2-scala/src/main/scala/ch/epfl/pop/model/network/method/Rumor.scala b/be2-scala/src/main/scala/ch/epfl/pop/model/network/method/Rumor.scala index 6906adf77b..f1a04f07c7 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/model/network/method/Rumor.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/model/network/method/Rumor.scala @@ -12,6 +12,10 @@ final case class Rumor(senderPk: PublicKey, rumorId: Int, messages: Map[Channel, override def hasMessage: Boolean = true + def toJsonString: String = { + this.toJson.toString + } + } object Rumor extends Parsable { diff --git a/be2-scala/src/main/scala/ch/epfl/pop/model/objects/RumorData.scala b/be2-scala/src/main/scala/ch/epfl/pop/model/objects/RumorData.scala new file mode 100644 index 0000000000..b504a8c3ff --- /dev/null +++ b/be2-scala/src/main/scala/ch/epfl/pop/model/objects/RumorData.scala @@ -0,0 +1,29 @@ +package ch.epfl.pop.model.objects + +import ch.epfl.pop.model.network.Parsable +import ch.epfl.pop.json.ObjectProtocol.* +import ch.epfl.pop.model.network.method.Rumor +import spray.json.* + +final case class RumorData( + rumorIds: List[Int] +) { + def toJsonString: String = { + this.toJson.toString + } + + def updateWith(rumorId: Int): RumorData = { + new RumorData((rumorId :: rumorIds).sorted) + } + +} + +object RumorData extends Parsable { + def apply( + rumorIds: List[Int] + ): RumorData = { + new RumorData(rumorIds) + } + + override def buildFromJson(payload: String): RumorData = payload.parseJson.asJsObject.convertTo[RumorData] +} diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala index d37ec5f4e5..82c0913645 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/PublishSubscribe.scala @@ -113,7 +113,8 @@ object PublishSubscribe { val portHeartbeat = 5 val portGetMessagesById = 6 val portGreetServer = 7 - val totalPorts = 8 + val portRumor = 8 + val totalPorts = 9 /* building blocks */ val input = builder.add(Flow[GraphMessage].collect { case msg: GraphMessage => msg }) @@ -131,6 +132,7 @@ object PublishSubscribe { case MethodType.heartbeat => portHeartbeat case MethodType.get_messages_by_id => portGetMessagesById case MethodType.greet_server => portGreetServer + case MethodType.rumor => portRumor case _ => portPipelineError } @@ -145,6 +147,7 @@ object PublishSubscribe { val heartbeatPartition = builder.add(ParamsWithMapHandler.heartbeatHandler(dbActorRef)) val getMessagesByIdPartition = builder.add(ParamsWithMapHandler.getMessagesByIdHandler(dbActorRef)) val greetServerPartition = builder.add(ParamsHandler.greetServerHandler(clientActorRef)) + val rumorPartition = builder.add(ParamsHandler.rumorHandler(dbActorRef)) val merger = builder.add(Merge[GraphMessage](totalPorts)) @@ -159,6 +162,7 @@ object PublishSubscribe { methodPartitioner.out(portHeartbeat) ~> heartbeatPartition ~> merger methodPartitioner.out(portGetMessagesById) ~> getMessagesByIdPartition ~> merger methodPartitioner.out(portGreetServer) ~> greetServerPartition ~> merger + methodPartitioner.out(portRumor) ~> rumorPartition ~> merger /* close the shape */ FlowShape(input.in, merger.out) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ParamsHandler.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ParamsHandler.scala index 280a2dc68d..f8a21eb119 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ParamsHandler.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ParamsHandler.scala @@ -5,9 +5,10 @@ import akka.actor.ActorRef import akka.pattern.AskableActorRef import akka.stream.scaladsl.Flow import ch.epfl.pop.model.network.MethodType -import ch.epfl.pop.model.network.method.GreetServer +import ch.epfl.pop.model.network.method.message.Message +import ch.epfl.pop.model.network.method.{GreetServer, Rumor} import ch.epfl.pop.model.network.{JsonRpcRequest, JsonRpcResponse} -import ch.epfl.pop.model.objects.Channel +import ch.epfl.pop.model.objects.{Channel, PublicKey} import ch.epfl.pop.pubsub.graph.{ErrorCodes, GraphMessage, PipelineError} import ch.epfl.pop.pubsub.{AskPatternConstants, ClientActor, PubSubMediator} @@ -85,4 +86,19 @@ object ParamsHandler extends AskPatternConstants { case graphMessage @ _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, "GreetServerHandler received an unexpected message:" + graphMessage, None)) }.filter(_ => false) + def rumorHandler(dbActorRef: AskableActorRef): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map { + case Right(jsonRpcMessage: JsonRpcRequest) => + jsonRpcMessage.method match { + case MethodType.rumor => + val rumor: Rumor = jsonRpcMessage.getParams.asInstanceOf[Rumor] + val senderPk: PublicKey = rumor.senderPk + val rumorId: Int = rumor.rumorId + val messages: Map[Channel, List[Message]] = rumor.messages + Right(jsonRpcMessage) + case _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, "RumorHandler received a non expected jsonRpcRequest", jsonRpcMessage.id)) + } + case graphMessage @ _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, "RumorHandler received an unexpected message:" + graphMessage, None)) + + } + } diff --git a/be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala b/be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala index 134ce08752..8566aab1db 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala @@ -6,6 +6,7 @@ import akka.pattern.AskableActorRef import ch.epfl.pop.decentralized.ConnectionMediator import ch.epfl.pop.json.MessageDataProtocol import ch.epfl.pop.json.MessageDataProtocol.GreetLaoFormat +import ch.epfl.pop.model.network.method.Rumor import ch.epfl.pop.model.network.method.message.Message import ch.epfl.pop.model.network.method.message.data.lao.GreetLao import ch.epfl.pop.model.network.method.message.data.{ActionType, ObjectType} @@ -19,6 +20,7 @@ import com.google.crypto.tink.subtle.Ed25519Sign import java.util.concurrent.TimeUnit import scala.collection.immutable.HashMap +import scala.collection.mutable.ListBuffer import scala.concurrent.Await import scala.concurrent.duration.{Duration, FiniteDuration} import scala.util.{Failure, Success, Try} @@ -384,6 +386,50 @@ final case class DbActor( heartbeatMap } + private def generateRumorKey(senderPk: PublicKey, rumorId: Int): String = { + s"${storage.RUMOR_KEY}${senderPk.base64Data.data}${Channel.DATA_SEPARATOR}$rumorId" + } + + private def generateRumorDataKey(senderPk: PublicKey): String = { + s"${storage.RUMOR_DATA_KEY}${senderPk.base64Data.data}" + } + + @throws[DbActorNAckException] + private def readRumorData(senderPk: PublicKey): RumorData = { + Try(storage.read(generateRumorDataKey(senderPk))) match { + case Success(Some(json)) => RumorData.buildFromJson(json) + case Success(None) => RumorData(List.empty) + case Failure(ex) => throw ex + } + } + + @throws[DbActorNAckException] + private def readRumors(desiredRumors: Map[PublicKey, List[Int]]): Map[PublicKey, List[Rumor]] = { + desiredRumors.map { case (senderPk, rumorIds) => + val rumorsForSender: List[Rumor] = rumorIds.flatMap { rumorId => + val rumorKey = generateRumorKey(senderPk, rumorId) + Try(storage.read(rumorKey)) match { + case Success(Some(json)) => Some(Rumor.buildFromJson(json)) + case Success(None) => None + case Failure(ex) => throw ex + } + } + senderPk -> rumorsForSender + } + } + + @throws[DbActorNAckException] + private def writeRumor(rumor: Rumor): Unit = { + this.synchronized { + val rumorData: RumorData = Try(readRumorData(rumor.senderPk)) match { + case Success(data) => data + case Failure(_) => RumorData(List.empty) + } + storage.write(generateRumorDataKey(rumor.senderPk) -> rumorData.updateWith(rumor.rumorId).toJsonString) + storage.write(generateRumorKey(rumor.senderPk, rumor.rumorId) -> rumor.toJsonString) + } + } + override def receive: Receive = LoggingReceive { case Write(channel, message) => log.info(s"Actor $self (db) received a WRITE request on channel '$channel'") @@ -565,6 +611,27 @@ final case class DbActor( case failure => sender() ! failure.recover(Status.Failure(_)) } + case WriteRumor(rumor) => + log.info(s"Actor $self (db) received a WriteRumor request") + Try(writeRumor(rumor)) match { + case Success(_) => sender() ! DbActorAck() + case failure => sender() ! failure.recover(Status.Failure(_)) + } + + case ReadRumors(desiredRumors) => + log.info(s"Actor $self (db) received a ReadRumor request") + Try(readRumors(desiredRumors)) match { + case Success(foundRumors) => sender() ! DbActorReadRumors(foundRumors) + case failure => sender() ! failure.recover(Status.Failure(_)) + } + + case ReadRumorData(senderPk) => + log.info(s"Actor $self (db) received a ReadRumorData request") + Try(readRumorData(senderPk)) match { + case Success(foundRumorIds) => sender() ! DbActorReadRumorData(foundRumorIds) + case failure => sender() ! failure.recover(Status.Failure(_)) + } + case m => log.info(s"Actor $self (db) received an unknown message") sender() ! Status.Failure(DbActorNAckException(ErrorCodes.INVALID_ACTION.id, s"database actor received a message '$m' that it could not recognize")) @@ -783,6 +850,24 @@ object DbActor { /** Request to generate a local heartbeat */ final case class GenerateHeartbeat() extends Event + /** Writes the given rumor in Db and updates RumorData accordingly + * @param rumor + * rumor to write in memory + */ + final case class WriteRumor(rumor: Rumor) extends Event + + /** Requests the Db for rumors corresponding to keys {server public key:rumor id} + * @param desiredRumors + * Map of server public keys and list of desired rumor id for each + */ + final case class ReadRumors(desiredRumors: Map[PublicKey, List[Int]]) extends Event + + /** Requests the Db for the list of rumorId received for a senderPk + * @param senderPk + * Public key that we want to request + */ + final case class ReadRumorData(senderPk: PublicKey) extends Event + // DbActor DbActorMessage correspond to messages the actor may emit sealed trait DbActorMessage @@ -864,6 +949,14 @@ object DbActor { */ final case class DbActorGenerateHeartbeatAck(heartbeatMap: HashMap[Channel, Set[Hash]]) extends DbActorMessage + /** Response for a [[ReadRumors]] + */ + final case class DbActorReadRumors(foundRumors: Map[PublicKey, List[Rumor]]) extends DbActorMessage + + /** Response for a [[ReadRumorData]] + */ + final case class DbActorReadRumorData(rumorIds: RumorData) extends DbActorMessage + /** Response for a general db actor ACK */ final case class DbActorAck() extends DbActorMessage diff --git a/be2-scala/src/main/scala/ch/epfl/pop/storage/Storage.scala b/be2-scala/src/main/scala/ch/epfl/pop/storage/Storage.scala index 326d17faba..94756aabd2 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/storage/Storage.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/storage/Storage.scala @@ -4,12 +4,14 @@ trait Storage { /* List of prefix to the keys */ final val CHANNEL_DATA_KEY = "ChannelData:" + final val RUMOR_DATA_KEY = "RumorData:" final val DATA_KEY = "Data:" final val AUTHENTICATED_KEY = "AuthPopTokenClientUser:" final val CREATE_LAO_KEY = "CreateLaoId:" final val SETUP_ELECTION_KEY = "SetupElectionMessageId:" final val SERVER_PUBLIC_KEY = "ServerPublicKey:" final val SERVER_PRIVATE_KEY = "ServerPrivateKey:" + final val RUMOR_KEY = "Rumor:" final val DEFAULT = "Default:" /** Optionally returns the value associated with a key diff --git a/be2-scala/src/test/scala/ch/epfl/pop/model/objects/RumorDataSuite.scala b/be2-scala/src/test/scala/ch/epfl/pop/model/objects/RumorDataSuite.scala new file mode 100644 index 0000000000..c0ba428f58 --- /dev/null +++ b/be2-scala/src/test/scala/ch/epfl/pop/model/objects/RumorDataSuite.scala @@ -0,0 +1,26 @@ +package ch.epfl.pop.model.objects + +import org.scalatest.funsuite.{AnyFunSuite => FunSuite} +import org.scalatest.matchers.should.Matchers +import spray.json._ + +class RumorDataSuite extends FunSuite with Matchers { + + test("Json conversion works for RumorDataSuite") { + val rumorData = RumorData(List(1, 2, 3, 4, 5)) + + val rumorDataJson = RumorData.buildFromJson(rumorData.toJsonString) + + rumorData should equal(rumorDataJson) + } + + test("RumorData updates") { + val rumorData = RumorData(List(1, 2, 3, 4, 5)) + + val updatedRumorData = rumorData.updateWith(6) + + updatedRumorData.rumorIds should equal(List(1, 2, 3, 4, 5, 6)) + + } + +} diff --git a/be2-scala/src/test/scala/ch/epfl/pop/storage/DbActorSuite.scala b/be2-scala/src/test/scala/ch/epfl/pop/storage/DbActorSuite.scala index eea49bb571..66e3346306 100644 --- a/be2-scala/src/test/scala/ch/epfl/pop/storage/DbActorSuite.scala +++ b/be2-scala/src/test/scala/ch/epfl/pop/storage/DbActorSuite.scala @@ -17,6 +17,8 @@ import org.scalatest.funsuite.AnyFunSuiteLike as FunSuiteLike import org.scalatest.matchers.should.Matchers import util.examples.MessageExample import util.examples.RollCall.{CreateRollCallExamples, OpenRollCallExamples} +import util.examples.Rumor.RumorExample +import ch.epfl.pop.model.network.method.Rumor import scala.collection.immutable.HashMap import scala.concurrent.Await @@ -919,4 +921,69 @@ class DbActorSuite extends TestKit(ActorSystem("DbActorSuiteActorSystem")) with heartbeat should equal(expected) } + test("writeRumor() writes correctly rumor") { + + val initialStorage = InMemoryStorage() + val dbActor: AskableActorRef = system.actorOf(Props(DbActor(mediatorRef, MessageRegistry(), initialStorage))) + + val rumor: Rumor = RumorExample.rumorExample + + val write = dbActor ? DbActor.WriteRumor(rumor) + Await.result(write, duration) shouldBe a[DbActor.DbActorAck] + + initialStorage.size should equal(2) + + val rumorDataKey = s"${initialStorage.RUMOR_DATA_KEY}${rumor.senderPk.base64Data.data}" + val rumorDataFound = initialStorage.read(rumorDataKey) + val expectedRumorData = RumorData(List(rumor.rumorId)) + + rumorDataFound shouldBe Some(expectedRumorData.toJsonString) + + val rumorKey = s"${initialStorage.RUMOR_KEY}${rumor.senderPk.base64Data.data}${Channel.DATA_SEPARATOR}${rumor.rumorId}" + val rumorFound = initialStorage.read(rumorKey) + + rumorFound shouldBe Some(rumor.toJsonString) + } + + test("can writeRumor() and then readRumors() correctly from storage") { + + val initialStorage = InMemoryStorage() + val dbActor: AskableActorRef = system.actorOf(Props(DbActor(mediatorRef, MessageRegistry(), initialStorage))) + + val rumor: Rumor = RumorExample.rumorExample + + val write = dbActor ? DbActor.WriteRumor(rumor) + Await.result(write, duration) shouldBe a[DbActor.DbActorAck] + + val desiredRumors: Map[PublicKey, List[Int]] = Map(rumor.senderPk -> List(rumor.rumorId)) + + val read = dbActor ? DbActor.ReadRumors(desiredRumors) + val foundRumors = Await.result(read, duration).asInstanceOf[DbActorReadRumors].foundRumors + + foundRumors.foreach { (serverPk, rumorList) => + desiredRumors.keys should contain(serverPk) + desiredRumors(serverPk) should equal(rumorList.map(_.rumorId)) + rumorList.foreach { rumorFromDb => + rumorFromDb should equal(rumor) + } + } + } + + test("can recover list of rumorId received for a senderPk") { + val initialStorage = InMemoryStorage() + val dbActor: AskableActorRef = system.actorOf(Props(DbActor(mediatorRef, MessageRegistry(), initialStorage))) + + val rumor: Rumor = RumorExample.rumorExample + + val write = dbActor ? DbActor.WriteRumor(rumor) + Await.result(write, duration) shouldBe a[DbActor.DbActorAck] + + val desiredRumorDataKey: PublicKey = rumor.senderPk + val readRumorData = dbActor ? DbActor.ReadRumorData(desiredRumorDataKey) + val foundRumorData = Await.result(readRumorData, duration) + val rumorData = foundRumorData.asInstanceOf[DbActorReadRumorData].rumorIds + + rumorData.rumorIds should equal(List(rumor.rumorId)) + } + } diff --git a/be2-scala/src/test/scala/util/examples/Rumor/RumorExample.scala b/be2-scala/src/test/scala/util/examples/Rumor/RumorExample.scala new file mode 100644 index 0000000000..feb6594a94 --- /dev/null +++ b/be2-scala/src/test/scala/util/examples/Rumor/RumorExample.scala @@ -0,0 +1,15 @@ +package util.examples.Rumor + +import ch.epfl.pop.model.network.method.Rumor +import ch.epfl.pop.model.network.method.message.Message +import ch.epfl.pop.model.objects.{Base64Data, Channel, PublicKey} +import util.examples.MessageExample + +object RumorExample { + + private val senderPk: PublicKey = PublicKey(Base64Data.encode("publicKey")) + private val channel: Channel = Channel(Channel.ROOT_CHANNEL_PREFIX + "rumorExample") + private val messages: List[Message] = (for i <- 0 until 10 yield MessageExample.MESSAGE).toList + val rumorExample: Rumor = Rumor(senderPk, 1, Map(channel -> messages)) + +}