Skip to content

Commit

Permalink
Merge pull request #1968 from dedis/work-be2-ons-federation-dataExcha…
Browse files Browse the repository at this point in the history
…nge-implementation

Federation data exchange implementation
  • Loading branch information
onsriahi14 authored Jun 30, 2024
2 parents 3c0b81e + b358e61 commit 0d0e614
Show file tree
Hide file tree
Showing 17 changed files with 350 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,36 @@ object MessageDataProtocol extends DefaultJsonProtocol {
}
}

implicit object FederationTokensExchangeFormat extends JsonFormat[FederationTokensExchange] {
final private val PARAM_LAO_ID = "lao_id"
final private val PARAM_ROLL_CALL_ID = "roll_call_id"
final private val PARAM_TOKENS = "tokens"
final private val PARAM_TIMESTAMP = "timestamp"

override def read(json: JsValue): FederationTokensExchange = json.asJsObject().getFields(PARAM_LAO_ID, PARAM_ROLL_CALL_ID, PARAM_TOKENS, PARAM_TIMESTAMP) match {
case Seq(laoId @ JsString(_), rollCallId @ JsString(_), JsArray(tokens), timestamp @ JsNumber(_)) =>
FederationTokensExchange(
laoId.convertTo[Hash],
rollCallId.convertTo[Hash],
tokens.map(_.convertTo[PublicKey]).toList,
timestamp.convertTo[Timestamp]
)
case _ => throw new IllegalArgumentException(s"Can't parse json value $json to a FederationTokensExchange object")
}

override def write(obj: FederationTokensExchange): JsValue = {
var jsObjectContent: ListMap[String, JsValue] = ListMap[String, JsValue](
PARAM_OBJECT -> JsString(obj._object.toString),
PARAM_ACTION -> JsString(obj.action.toString),
PARAM_LAO_ID -> obj.laoId.toJson,
PARAM_ROLL_CALL_ID -> obj.rollCallId.toJson,
PARAM_TOKENS -> obj.tokens.toJson,
PARAM_TIMESTAMP -> obj.timestamp.toJson
)
JsObject(jsObjectContent)
}
}

implicit object NumberOfChirpsReactionsDataFormat extends JsonFormat[NumberOfChirpsReactionsData] {
final private val PARAM_NUMBER_OF_CHIRPS_REACTIONS: String = "numberOfChirpsReactions"

Expand All @@ -622,6 +652,7 @@ object MessageDataProtocol extends DefaultJsonProtocol {
override def write(obj: NumberOfChirpsReactionsData): JsValue = JsObject(
PARAM_NUMBER_OF_CHIRPS_REACTIONS -> obj.numberOfChirpsReactions.toJson
)

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ enum ActionType(val action: String):
case expect extends ActionType("expect")
case challenge_request extends ActionType("challenge_request")
case challenge extends ActionType("challenge")
case tokens_exchange extends ActionType("tokens_exchange")
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ch.epfl.pop.model.network.method.message.data.federation

import ch.epfl.pop.json.MessageDataProtocol.*
import ch.epfl.pop.model.network.Parsable
import ch.epfl.pop.model.network.method.message.data.{ActionType, MessageData, ObjectType}
import ch.epfl.pop.model.objects.{Hash, PublicKey, Timestamp}
import spray.json.*

final case class FederationTokensExchange(
laoId: Hash,
rollCallId: Hash,
tokens: List[PublicKey],
timestamp: Timestamp
) extends MessageData {

override val _object: ObjectType = ObjectType.federation
override val action: ActionType = ActionType.tokens_exchange

}

object FederationTokensExchange extends Parsable {
def apply(laoId: Hash, rollCallId: Hash, tokens: List[PublicKey], timestamp: Timestamp): FederationTokensExchange = {
new FederationTokensExchange(laoId, rollCallId, tokens, timestamp)
}

override def buildFromJson(payload: String): FederationTokensExchange = payload.parseJson.asJsObject.convertTo[FederationTokensExchange]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ch.epfl.pop.pubsub
import ch.epfl.pop.model.network.JsonRpcRequest
import ch.epfl.pop.model.network.method.message.data.coin.PostTransaction
import ch.epfl.pop.model.network.method.message.data.election.*
import ch.epfl.pop.model.network.method.message.data.federation.{FederationChallenge, FederationChallengeRequest, FederationExpect, FederationInit, FederationResult}
import ch.epfl.pop.model.network.method.message.data.federation.{FederationChallenge, FederationChallengeRequest, FederationExpect, FederationInit, FederationResult, FederationTokensExchange}
import ch.epfl.pop.model.network.method.message.data.lao.{CreateLao, GreetLao, StateLao, UpdateLao}
import ch.epfl.pop.model.network.method.message.data.meeting.{CreateMeeting, StateMeeting}
import ch.epfl.pop.model.network.method.message.data.popcha.Authenticate
Expand Down Expand Up @@ -285,6 +285,14 @@ object MessageRegistry {
FederationHandler.handleFederationResult
)

register.add(
(ObjectType.federation, ActionType.tokens_exchange),
createSchemaVerifier("dataFederationTokensExchange.json"),
FederationTokensExchange.buildFromJson,
FederationValidator.validateFederationTokensExchange,
FederationHandler.handleFederationTokensExchange
)

new MessageRegistry(register.get)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import ch.epfl.pop.model.network.JsonRpcRequest
import ch.epfl.pop.model.network.MethodType.publish
import ch.epfl.pop.model.network.method.ParamsWithMessage
import ch.epfl.pop.model.network.method.message.Message
import ch.epfl.pop.model.network.method.message.data.federation.{FederationChallenge, FederationExpect, FederationInit, FederationResult}
import ch.epfl.pop.model.network.method.message.data.federation.{FederationChallenge, FederationExpect, FederationInit, FederationResult, FederationTokensExchange}
import ch.epfl.pop.model.objects.*
import ch.epfl.pop.pubsub.ClientActor.ClientAnswer
import ch.epfl.pop.pubsub.PubSubMediator
Expand Down Expand Up @@ -39,6 +39,8 @@ object FederationHandler extends MessageHandler {

def handleFederationResult(rpcMessage: JsonRpcRequest): GraphMessage = handlerInstance.handleFederationResult(rpcMessage)

def handleFederationTokensExchange(rpcMessage: JsonRpcRequest): GraphMessage = handlerInstance.handleFederationTokensExchange(rpcMessage)

}

class FederationHandler(dbRef: => AskableActorRef, mediatorRef: => AskableActorRef, connectionMediatorRef: => AskableActorRef) extends MessageHandler {
Expand Down Expand Up @@ -127,6 +129,7 @@ class FederationHandler(dbRef: => AskableActorRef, mediatorRef: => AskableActorR
val askWrite = dbActor ? DbActor.WriteFederationInit(federationChannel, laoId, message)
Await.ready(askWrite, duration).value match {
case Some(Success(_)) =>
// get the other lao's server ref to send him the challenge
val getServer = connectionMediator ? GetFederationServer(serverAddress)
Await.result(getServer, duration) match {
case ConnectionMediator.GetFederationServerAck(federationServerRef) =>
Expand Down Expand Up @@ -164,6 +167,7 @@ class FederationHandler(dbRef: => AskableActorRef, mediatorRef: => AskableActorR
Await.ready(ask, duration).value match {
case Some(Success(DbActor.DbActorReadAck(Some(message)))) =>
val expect: FederationExpect = FederationExpect.buildFromJson(message.data.decodeToString())
// extract the challenge message from the FederationExpect message
val challengeMessage: Message = expect.challenge
val expectedChallenge: FederationChallenge = FederationChallenge.buildFromJson(challengeMessage.data.decodeToString())
val serverAddress = expect.serverAddress
Expand All @@ -175,6 +179,7 @@ class FederationHandler(dbRef: => AskableActorRef, mediatorRef: => AskableActorR

Await.ready(ask, duration).value match {
case Some(Success(data)) =>
// check if the challenge received matches the challenge extracted from FederationExpect message
if (data.value.equals(expectedChallenge.value) && data.validUntil == expectedChallenge.validUntil)
federationResult = FederationResult(STATUS._1, expect.publicKey, challengeMessage)
else
Expand Down Expand Up @@ -215,7 +220,7 @@ class FederationHandler(dbRef: => AskableActorRef, mediatorRef: => AskableActorR

Await.ready(combined, duration).value match {
case Some(Success(_)) => Right(rpcMessage)
case _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"handleChallengeResult unknown error", rpcMessage.getId))
case _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"handleFederationResult unknown error", rpcMessage.getId))
}
case _ => Left(PipelineError(
ErrorCodes.SERVER_ERROR.id,
Expand All @@ -226,6 +231,35 @@ class FederationHandler(dbRef: => AskableActorRef, mediatorRef: => AskableActorR
}
}

def handleFederationTokensExchange(rpcMessage: JsonRpcRequest): GraphMessage = {

val ask = for {
case (_, message, Some(data)) <- extractParameters[FederationTokensExchange](rpcMessage, serverUnexpectedAnswer)
} yield (message, data)

Await.ready(ask, duration).value match {
case Some(Success(message, data)) =>
val federationChannel: Channel = rpcMessage.getParamsChannel
val laoId: Hash = rpcMessage.extractLaoId
val combined = for {
_ <- mediator ? PubSubMediator.Propagate(rpcMessage.getParamsChannel, message)
// store the message in the db
_ <- dbActor ? DbActor.WriteFederationTokensExchange(federationChannel, laoId, message)
} yield ()

Await.ready(combined, duration).value match {
case Some(Success(_)) => Right(rpcMessage)
case _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"handleTokensExchange unknown error", rpcMessage.getId))
}
case _ => Left(PipelineError(
ErrorCodes.SERVER_ERROR.id,
s"Couldn't extract federationTokensExchange parameters",
rpcMessage.getId
))

}
}

private def generateRandomBytes(numBytes: Int): Try[Array[Byte]] = {
val randomBytes = new Array[Byte](numBytes)
val secureRandom = new SecureRandom()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ object FederationValidator extends MessageDataContentValidator {

def validateFederationResult(rpcMessage: JsonRpcRequest): GraphMessage = federationValidator.validateFederationResult(rpcMessage)

def validateFederationTokensExchange(rpcMessage: JsonRpcRequest): GraphMessage = federationValidator.validateFederationTokensExchange(rpcMessage)

}

sealed class FederationValidator(dbActorRef: => AskableActorRef) extends MessageDataContentValidator {
Expand Down Expand Up @@ -239,4 +241,33 @@ sealed class FederationValidator(dbActorRef: => AskableActorRef) extends Message
}
}

def validateFederationTokensExchange(rpcMessage: JsonRpcRequest): GraphMessage = {
def validationError(reason: String): PipelineError = super.validationError(reason, "FederationTokensExchange", rpcMessage.id)

rpcMessage.getParamsMessage match {
case Some(message: Message) =>
val (federationTokensExchange, laoId, senderPk, channel) = extractData[FederationTokensExchange](rpcMessage)

runChecks(
checkChannelType(
rpcMessage,
ObjectType.federation,
channel,
dbActorRef,
validationError(s"trying to send a federationTokensExchange message on a wrong type of channel $channel")
),
checkOwner(
rpcMessage,
senderPk,
channel,
dbActorRef,
validationError(s"Sender $senderPk of the federationTokensExchange is not the organizer")
)
)

case _ => Left(validationErrorNoMessage(rpcMessage.id))

}

}
}
64 changes: 48 additions & 16 deletions be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -857,15 +857,9 @@ final case class DbActor(

@throws[DbActorNAckException]
private def readFederationMessages(channel: Channel, key: String): Option[Message] = {
channel.extractLaoChannel match {
case Some(mainLaoChannel) =>
storage.read(storage.DATA_KEY + key + channel.toString) match {
case Some(msgId) => read(mainLaoChannel, Hash(Base64Data(msgId)))
case _ => None
}
case _ =>
log.info("Error : Trying to read a federationMessage from an invalid channel")
None
storage.read(storage.DATA_KEY + key + channel.toString) match {
case Some(msgId) => read(channel, Hash(Base64Data(msgId)))
case _ => None
}
}

Expand All @@ -889,16 +883,17 @@ final case class DbActor(
readFederationMessages(channel, storage.FEDERATION_RESULT_KEY + laoId.toString)
}

@throws[DbActorNAckException]
private def readFederationTokensExchange(channel: Channel, laoId: Hash): Option[Message] = {
readFederationMessages(channel, storage.FEDERATION_TOKENS_EXCHANGE_KEY + laoId.toString)
}

@throws[DbActorNAckException]
private def writeFederationMessages(channel: Channel, key: String, message: Message): Unit = {
channel.extractLaoChannel match {
case Some(mainLaoChannel) =>
createChannel(channel, ObjectType.federation)
storage.write((storage.DATA_KEY + key + channel.toString, message.message_id.toString()))
writeAndPropagate(mainLaoChannel, message)
createChannel(channel, ObjectType.federation)
storage.write((storage.DATA_KEY + key + channel.toString, message.message_id.toString()))
write(channel, message)

case _ => log.info("Error : Trying to write a federationMessage on an invalid channel")
}
}

@throws[DbActorNAckException]
Expand All @@ -921,6 +916,11 @@ final case class DbActor(
writeFederationMessages(channel, storage.FEDERATION_RESULT_KEY + laoId.toString, message)
}

@throws[DbActorNAckException]
private def writeFederationTokensExchange(channel: Channel, laoId: Hash, message: Message): Unit = {
writeFederationMessages(channel, storage.FEDERATION_TOKENS_EXCHANGE_KEY + laoId.toString, message)
}

@throws[DbActorNAckException]
private def deleteFederationChallenge(channel: Channel, laoId: Hash): Unit = {
val key = storage.DATA_KEY + storage.FEDERATION_CHALLENGE_KEY + laoId.toString + channel.toString
Expand Down Expand Up @@ -1186,6 +1186,13 @@ final case class DbActor(
case failure => sender() ! failure.recover(Status.Failure(_))
}

case ReadFederationTokensExchange(channel, laoId) =>
log.info(s"Actor $self (db) received a ReadFederationTokensExchange request")
Try(readFederationTokensExchange(channel, laoId)) match {
case Success(message) => sender() ! DbActorReadAck(message)
case failure => sender() ! failure.recover(Status.Failure(_))
}

case WriteFederationChallenge(channel, laoId, message) =>
log.info(s"Actor $self (db) received a WriteFederationChallenge request")
Try(writeFederationChallenge(channel, laoId, message)) match {
Expand Down Expand Up @@ -1214,6 +1221,13 @@ final case class DbActor(
case failure => sender() ! failure.recover(Status.Failure(_))
}

case WriteFederationTokensExchange(channel, laoId, message) =>
log.info(s"Actor $self (db) received a WriteFederationTokensExchange request")
Try(writeFederationTokensExchange(channel, laoId, message)) match {
case Success(_) => sender() ! DbActorAck()
case failure => sender() ! failure.recover(Status.Failure(_))
}

case DeleteFederationChallenge(channel, laoId) =>
log.info(s"Actor $self (db) received a DeleteFederationChallenge request")
Try(deleteFederationChallenge(channel, laoId)) match {
Expand Down Expand Up @@ -1509,6 +1523,14 @@ object DbActor {
*/
final case class ReadFederationResult(channel: Channel, laoId: Hash) extends Event

/** Requests the Db for the federationTokensExchange message
* @param channel
* the channel in which the federationTokensExchange is being sent
* @param laoId
* the id of the lao in which the federationTokensExchange message is
*/
final case class ReadFederationTokensExchange(channel: Channel, laoId: Hash) extends Event

/** Requests the Db to write the challenge
* @param channel
* the channel in which the challenge is being sent
Expand Down Expand Up @@ -1552,6 +1574,16 @@ object DbActor {
*/
final case class WriteFederationResult(channel: Channel, laoId: Hash, message: Message) extends Event

/** Requests the Db to write the federationTokensExchange
* @param channel
* the channel in which the federationTokensExchange is being sent
* @param laoId
* the id of the lao in which the federationTokensExchange message is
* @param message
* the federationTokensExchange message
*/
final case class WriteFederationTokensExchange(channel: Channel, laoId: Hash, message: Message) extends Event

/** Requests the Db to delete the challenge
* @param channel
* the channel in which the challenge is being sent
Expand Down
1 change: 1 addition & 0 deletions be2-scala/src/main/scala/ch/epfl/pop/storage/Storage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ trait Storage {
final val FEDERATION_EXPECT_KEY = "FederationExpect:"
final val FEDERATION_INIT_KEY = "FederationInit:"
final val FEDERATION_RESULT_KEY = "FederationResult:"
final val FEDERATION_TOKENS_EXCHANGE_KEY = "FederationTokensExchange:"
final val NUMBER_OF_REACTIONS_KEY = "NumberOfReactions:"

/** Optionally returns the value associated with a key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import org.scalatest.matchers.should.Matchers
import scala.io.{BufferedSource, Source}
import ch.epfl.pop.model.network.method.message.data.coin.PostTransaction
import ch.epfl.pop.model.network.method.message.data.election.VersionType.*
import ch.epfl.pop.model.network.method.message.data.federation.{FederationChallenge, FederationChallengeRequest, FederationExpect, FederationInit, FederationResult}
import ch.epfl.pop.model.network.method.message.data.federation.{FederationChallenge, FederationChallengeRequest, FederationExpect, FederationInit, FederationResult, FederationTokensExchange}
import ch.epfl.pop.model.network.method.message.data.{ActionType, ObjectType}
import spray.json.*
import util.examples.Federation.FederationChallengeExample.{CHALLENGE, CHALLENGE_1}
import util.examples.Federation.FederationChallengeRequestExample.CHALLENGE_REQUEST
import util.examples.Federation.FederationExpectExample.{EXPECT, EXPECT_1}
import util.examples.Federation.FederationInitExample.{INIT, INIT_1}
import util.examples.Federation.FederationResultExample.{RESULT_1, RESULT_1_1, RESULT_2, RESULT_2_2}
import util.examples.Federation.FederationTokensExchangeExample.TOKENS_EXCHANGE

class MessageDataProtocolSuite extends FunSuite with Matchers {

Expand Down Expand Up @@ -322,6 +323,20 @@ class MessageDataProtocolSuite extends FunSuite with Matchers {
federationResultFromBuiltJson_2 should equal(expectedFederationResult_2)
}

test("Parser correctly encodes and decodes FederationTokensExchange") {
val example = getExampleMessage("messageData/federation_tokens_exchange/federation_tokens_exchange.json")
val expectedFederationTokensExchange = TOKENS_EXCHANGE

val federationTokensExchangeFromExample = FederationTokensExchange.buildFromJson(example)
val buildTokensExchangeJson = MessageDataProtocol.FederationTokensExchangeFormat.write(expectedFederationTokensExchange)
val federationTokensExchangeFromBuiltJson = MessageDataProtocol.FederationTokensExchangeFormat.read(buildTokensExchangeJson)

federationTokensExchangeFromBuiltJson shouldBe a[FederationTokensExchange]
federationTokensExchangeFromExample shouldBe a[FederationTokensExchange]
federationTokensExchangeFromExample should equal(expectedFederationTokensExchange)
federationTokensExchangeFromBuiltJson should equal(expectedFederationTokensExchange)
}

test("Parser correctly encodes and decodes ObjectType") {
ObjectType.values.foreach(obj => {
val fromJson = MessageDataProtocol.objectTypeFormat.write(obj)
Expand Down
Loading

0 comments on commit 0d0e614

Please sign in to comment.