Skip to content

Commit

Permalink
Merge pull request #1824 from dedis/work-be2-daniel-rumor-validator-db
Browse files Browse the repository at this point in the history
Add Db handle of Rumors
  • Loading branch information
DanielTavaresA authored Apr 29, 2024
2 parents 3cd4457 + 8cf6501 commit 280a87f
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 3 deletions.
13 changes: 13 additions & 0 deletions be2-scala/src/main/scala/ch/epfl/pop/json/ObjectProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,19 @@ object ObjectProtocol extends DefaultJsonProtocol {
)
}

implicit object RumorDataFormat extends JsonFormat[RumorData] {
final private val PARAM_RUMOR_IDS: String = "rumor_ids"

override def read(json: JsValue): RumorData = json.asJsObject.getFields(PARAM_RUMOR_IDS) match {
case Seq(rumorIds @ JsArray(_)) => RumorData(rumorIds.convertTo[List[Int]])
case _ => throw new IllegalArgumentException(s"Can't parse json value $json to a RumorData object")
}

override def write(obj: RumorData): JsValue = JsObject(
PARAM_RUMOR_IDS -> obj.rumorIds.toJson
)
}

implicit val lockScriptFormat: JsonFormat[LockScript] = jsonFormat[String, Address, LockScript](LockScript.apply, "type", "pubkey_hash")
implicit val unlockScriptFormat: JsonFormat[UnlockScript] = jsonFormat[String, PublicKey, Base64Data, UnlockScript](UnlockScript.apply, "type", "pubkey", "sig")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ final case class Rumor(senderPk: PublicKey, rumorId: Int, messages: Map[Channel,

override def hasMessage: Boolean = true

def toJsonString: String = {
this.toJson.toString
}

}

object Rumor extends Parsable {
Expand Down
29 changes: 29 additions & 0 deletions be2-scala/src/main/scala/ch/epfl/pop/model/objects/RumorData.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package ch.epfl.pop.model.objects

import ch.epfl.pop.model.network.Parsable
import ch.epfl.pop.json.ObjectProtocol.*
import ch.epfl.pop.model.network.method.Rumor
import spray.json.*

final case class RumorData(
rumorIds: List[Int]
) {
def toJsonString: String = {
this.toJson.toString
}

def updateWith(rumorId: Int): RumorData = {
new RumorData((rumorId :: rumorIds).sorted)
}

}

object RumorData extends Parsable {
def apply(
rumorIds: List[Int]
): RumorData = {
new RumorData(rumorIds)
}

override def buildFromJson(payload: String): RumorData = payload.parseJson.asJsObject.convertTo[RumorData]
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ object PublishSubscribe {
val portHeartbeat = 5
val portGetMessagesById = 6
val portGreetServer = 7
val totalPorts = 8
val portRumor = 8
val totalPorts = 9

/* building blocks */
val input = builder.add(Flow[GraphMessage].collect { case msg: GraphMessage => msg })
Expand All @@ -131,6 +132,7 @@ object PublishSubscribe {
case MethodType.heartbeat => portHeartbeat
case MethodType.get_messages_by_id => portGetMessagesById
case MethodType.greet_server => portGreetServer
case MethodType.rumor => portRumor
case _ => portPipelineError
}

Expand All @@ -145,6 +147,7 @@ object PublishSubscribe {
val heartbeatPartition = builder.add(ParamsWithMapHandler.heartbeatHandler(dbActorRef))
val getMessagesByIdPartition = builder.add(ParamsWithMapHandler.getMessagesByIdHandler(dbActorRef))
val greetServerPartition = builder.add(ParamsHandler.greetServerHandler(clientActorRef))
val rumorPartition = builder.add(ParamsHandler.rumorHandler(dbActorRef))

val merger = builder.add(Merge[GraphMessage](totalPorts))

Expand All @@ -159,6 +162,7 @@ object PublishSubscribe {
methodPartitioner.out(portHeartbeat) ~> heartbeatPartition ~> merger
methodPartitioner.out(portGetMessagesById) ~> getMessagesByIdPartition ~> merger
methodPartitioner.out(portGreetServer) ~> greetServerPartition ~> merger
methodPartitioner.out(portRumor) ~> rumorPartition ~> merger

/* close the shape */
FlowShape(input.in, merger.out)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import akka.actor.ActorRef
import akka.pattern.AskableActorRef
import akka.stream.scaladsl.Flow
import ch.epfl.pop.model.network.MethodType
import ch.epfl.pop.model.network.method.GreetServer
import ch.epfl.pop.model.network.method.message.Message
import ch.epfl.pop.model.network.method.{GreetServer, Rumor}
import ch.epfl.pop.model.network.{JsonRpcRequest, JsonRpcResponse}
import ch.epfl.pop.model.objects.Channel
import ch.epfl.pop.model.objects.{Channel, PublicKey}
import ch.epfl.pop.pubsub.graph.{ErrorCodes, GraphMessage, PipelineError}
import ch.epfl.pop.pubsub.{AskPatternConstants, ClientActor, PubSubMediator}

Expand Down Expand Up @@ -85,4 +86,19 @@ object ParamsHandler extends AskPatternConstants {
case graphMessage @ _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, "GreetServerHandler received an unexpected message:" + graphMessage, None))
}.filter(_ => false)

def rumorHandler(dbActorRef: AskableActorRef): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map {
case Right(jsonRpcMessage: JsonRpcRequest) =>
jsonRpcMessage.method match {
case MethodType.rumor =>
val rumor: Rumor = jsonRpcMessage.getParams.asInstanceOf[Rumor]
val senderPk: PublicKey = rumor.senderPk
val rumorId: Int = rumor.rumorId
val messages: Map[Channel, List[Message]] = rumor.messages
Right(jsonRpcMessage)
case _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, "RumorHandler received a non expected jsonRpcRequest", jsonRpcMessage.id))
}
case graphMessage @ _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, "RumorHandler received an unexpected message:" + graphMessage, None))

}

}
93 changes: 93 additions & 0 deletions be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import akka.pattern.AskableActorRef
import ch.epfl.pop.decentralized.ConnectionMediator
import ch.epfl.pop.json.MessageDataProtocol
import ch.epfl.pop.json.MessageDataProtocol.GreetLaoFormat
import ch.epfl.pop.model.network.method.Rumor
import ch.epfl.pop.model.network.method.message.Message
import ch.epfl.pop.model.network.method.message.data.lao.GreetLao
import ch.epfl.pop.model.network.method.message.data.{ActionType, ObjectType}
Expand All @@ -19,6 +20,7 @@ import com.google.crypto.tink.subtle.Ed25519Sign

import java.util.concurrent.TimeUnit
import scala.collection.immutable.HashMap
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -384,6 +386,50 @@ final case class DbActor(
heartbeatMap
}

private def generateRumorKey(senderPk: PublicKey, rumorId: Int): String = {
s"${storage.RUMOR_KEY}${senderPk.base64Data.data}${Channel.DATA_SEPARATOR}$rumorId"
}

private def generateRumorDataKey(senderPk: PublicKey): String = {
s"${storage.RUMOR_DATA_KEY}${senderPk.base64Data.data}"
}

@throws[DbActorNAckException]
private def readRumorData(senderPk: PublicKey): RumorData = {
Try(storage.read(generateRumorDataKey(senderPk))) match {
case Success(Some(json)) => RumorData.buildFromJson(json)
case Success(None) => RumorData(List.empty)
case Failure(ex) => throw ex
}
}

@throws[DbActorNAckException]
private def readRumors(desiredRumors: Map[PublicKey, List[Int]]): Map[PublicKey, List[Rumor]] = {
desiredRumors.map { case (senderPk, rumorIds) =>
val rumorsForSender: List[Rumor] = rumorIds.flatMap { rumorId =>
val rumorKey = generateRumorKey(senderPk, rumorId)
Try(storage.read(rumorKey)) match {
case Success(Some(json)) => Some(Rumor.buildFromJson(json))
case Success(None) => None
case Failure(ex) => throw ex
}
}
senderPk -> rumorsForSender
}
}

@throws[DbActorNAckException]
private def writeRumor(rumor: Rumor): Unit = {
this.synchronized {
val rumorData: RumorData = Try(readRumorData(rumor.senderPk)) match {
case Success(data) => data
case Failure(_) => RumorData(List.empty)
}
storage.write(generateRumorDataKey(rumor.senderPk) -> rumorData.updateWith(rumor.rumorId).toJsonString)
storage.write(generateRumorKey(rumor.senderPk, rumor.rumorId) -> rumor.toJsonString)
}
}

override def receive: Receive = LoggingReceive {
case Write(channel, message) =>
log.info(s"Actor $self (db) received a WRITE request on channel '$channel'")
Expand Down Expand Up @@ -565,6 +611,27 @@ final case class DbActor(
case failure => sender() ! failure.recover(Status.Failure(_))
}

case WriteRumor(rumor) =>
log.info(s"Actor $self (db) received a WriteRumor request")
Try(writeRumor(rumor)) match {
case Success(_) => sender() ! DbActorAck()
case failure => sender() ! failure.recover(Status.Failure(_))
}

case ReadRumors(desiredRumors) =>
log.info(s"Actor $self (db) received a ReadRumor request")
Try(readRumors(desiredRumors)) match {
case Success(foundRumors) => sender() ! DbActorReadRumors(foundRumors)
case failure => sender() ! failure.recover(Status.Failure(_))
}

case ReadRumorData(senderPk) =>
log.info(s"Actor $self (db) received a ReadRumorData request")
Try(readRumorData(senderPk)) match {
case Success(foundRumorIds) => sender() ! DbActorReadRumorData(foundRumorIds)
case failure => sender() ! failure.recover(Status.Failure(_))
}

case m =>
log.info(s"Actor $self (db) received an unknown message")
sender() ! Status.Failure(DbActorNAckException(ErrorCodes.INVALID_ACTION.id, s"database actor received a message '$m' that it could not recognize"))
Expand Down Expand Up @@ -783,6 +850,24 @@ object DbActor {
/** Request to generate a local heartbeat */
final case class GenerateHeartbeat() extends Event

/** Writes the given rumor in Db and updates RumorData accordingly
* @param rumor
* rumor to write in memory
*/
final case class WriteRumor(rumor: Rumor) extends Event

/** Requests the Db for rumors corresponding to keys {server public key:rumor id}
* @param desiredRumors
* Map of server public keys and list of desired rumor id for each
*/
final case class ReadRumors(desiredRumors: Map[PublicKey, List[Int]]) extends Event

/** Requests the Db for the list of rumorId received for a senderPk
* @param senderPk
* Public key that we want to request
*/
final case class ReadRumorData(senderPk: PublicKey) extends Event

// DbActor DbActorMessage correspond to messages the actor may emit
sealed trait DbActorMessage

Expand Down Expand Up @@ -864,6 +949,14 @@ object DbActor {
*/
final case class DbActorGenerateHeartbeatAck(heartbeatMap: HashMap[Channel, Set[Hash]]) extends DbActorMessage

/** Response for a [[ReadRumors]]
*/
final case class DbActorReadRumors(foundRumors: Map[PublicKey, List[Rumor]]) extends DbActorMessage

/** Response for a [[ReadRumorData]]
*/
final case class DbActorReadRumorData(rumorIds: RumorData) extends DbActorMessage

/** Response for a general db actor ACK
*/
final case class DbActorAck() extends DbActorMessage
Expand Down
2 changes: 2 additions & 0 deletions be2-scala/src/main/scala/ch/epfl/pop/storage/Storage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ trait Storage {

/* List of prefix to the keys */
final val CHANNEL_DATA_KEY = "ChannelData:"
final val RUMOR_DATA_KEY = "RumorData:"
final val DATA_KEY = "Data:"
final val AUTHENTICATED_KEY = "AuthPopTokenClientUser:"
final val CREATE_LAO_KEY = "CreateLaoId:"
final val SETUP_ELECTION_KEY = "SetupElectionMessageId:"
final val SERVER_PUBLIC_KEY = "ServerPublicKey:"
final val SERVER_PRIVATE_KEY = "ServerPrivateKey:"
final val RUMOR_KEY = "Rumor:"
final val DEFAULT = "Default:"

/** Optionally returns the value associated with a key
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package ch.epfl.pop.model.objects

import org.scalatest.funsuite.{AnyFunSuite => FunSuite}
import org.scalatest.matchers.should.Matchers
import spray.json._

class RumorDataSuite extends FunSuite with Matchers {

test("Json conversion works for RumorDataSuite") {
val rumorData = RumorData(List(1, 2, 3, 4, 5))

val rumorDataJson = RumorData.buildFromJson(rumorData.toJsonString)

rumorData should equal(rumorDataJson)
}

test("RumorData updates") {
val rumorData = RumorData(List(1, 2, 3, 4, 5))

val updatedRumorData = rumorData.updateWith(6)

updatedRumorData.rumorIds should equal(List(1, 2, 3, 4, 5, 6))

}

}
67 changes: 67 additions & 0 deletions be2-scala/src/test/scala/ch/epfl/pop/storage/DbActorSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import org.scalatest.funsuite.AnyFunSuiteLike as FunSuiteLike
import org.scalatest.matchers.should.Matchers
import util.examples.MessageExample
import util.examples.RollCall.{CreateRollCallExamples, OpenRollCallExamples}
import util.examples.Rumor.RumorExample
import ch.epfl.pop.model.network.method.Rumor

import scala.collection.immutable.HashMap
import scala.concurrent.Await
Expand Down Expand Up @@ -919,4 +921,69 @@ class DbActorSuite extends TestKit(ActorSystem("DbActorSuiteActorSystem")) with
heartbeat should equal(expected)
}

test("writeRumor() writes correctly rumor") {

val initialStorage = InMemoryStorage()
val dbActor: AskableActorRef = system.actorOf(Props(DbActor(mediatorRef, MessageRegistry(), initialStorage)))

val rumor: Rumor = RumorExample.rumorExample

val write = dbActor ? DbActor.WriteRumor(rumor)
Await.result(write, duration) shouldBe a[DbActor.DbActorAck]

initialStorage.size should equal(2)

val rumorDataKey = s"${initialStorage.RUMOR_DATA_KEY}${rumor.senderPk.base64Data.data}"
val rumorDataFound = initialStorage.read(rumorDataKey)
val expectedRumorData = RumorData(List(rumor.rumorId))

rumorDataFound shouldBe Some(expectedRumorData.toJsonString)

val rumorKey = s"${initialStorage.RUMOR_KEY}${rumor.senderPk.base64Data.data}${Channel.DATA_SEPARATOR}${rumor.rumorId}"
val rumorFound = initialStorage.read(rumorKey)

rumorFound shouldBe Some(rumor.toJsonString)
}

test("can writeRumor() and then readRumors() correctly from storage") {

val initialStorage = InMemoryStorage()
val dbActor: AskableActorRef = system.actorOf(Props(DbActor(mediatorRef, MessageRegistry(), initialStorage)))

val rumor: Rumor = RumorExample.rumorExample

val write = dbActor ? DbActor.WriteRumor(rumor)
Await.result(write, duration) shouldBe a[DbActor.DbActorAck]

val desiredRumors: Map[PublicKey, List[Int]] = Map(rumor.senderPk -> List(rumor.rumorId))

val read = dbActor ? DbActor.ReadRumors(desiredRumors)
val foundRumors = Await.result(read, duration).asInstanceOf[DbActorReadRumors].foundRumors

foundRumors.foreach { (serverPk, rumorList) =>
desiredRumors.keys should contain(serverPk)
desiredRumors(serverPk) should equal(rumorList.map(_.rumorId))
rumorList.foreach { rumorFromDb =>
rumorFromDb should equal(rumor)
}
}
}

test("can recover list of rumorId received for a senderPk") {
val initialStorage = InMemoryStorage()
val dbActor: AskableActorRef = system.actorOf(Props(DbActor(mediatorRef, MessageRegistry(), initialStorage)))

val rumor: Rumor = RumorExample.rumorExample

val write = dbActor ? DbActor.WriteRumor(rumor)
Await.result(write, duration) shouldBe a[DbActor.DbActorAck]

val desiredRumorDataKey: PublicKey = rumor.senderPk
val readRumorData = dbActor ? DbActor.ReadRumorData(desiredRumorDataKey)
val foundRumorData = Await.result(readRumorData, duration)
val rumorData = foundRumorData.asInstanceOf[DbActorReadRumorData].rumorIds

rumorData.rumorIds should equal(List(rumor.rumorId))
}

}
15 changes: 15 additions & 0 deletions be2-scala/src/test/scala/util/examples/Rumor/RumorExample.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package util.examples.Rumor

import ch.epfl.pop.model.network.method.Rumor
import ch.epfl.pop.model.network.method.message.Message
import ch.epfl.pop.model.objects.{Base64Data, Channel, PublicKey}
import util.examples.MessageExample

object RumorExample {

private val senderPk: PublicKey = PublicKey(Base64Data.encode("publicKey"))
private val channel: Channel = Channel(Channel.ROOT_CHANNEL_PREFIX + "rumorExample")
private val messages: List[Message] = (for i <- 0 until 10 yield MessageExample.MESSAGE).toList
val rumorExample: Rumor = Rumor(senderPk, 1, Map(channel -> messages))

}

0 comments on commit 280a87f

Please sign in to comment.