Skip to content

Commit

Permalink
merging w. remote
Browse files Browse the repository at this point in the history
  • Loading branch information
kushti committed Dec 30, 2021
2 parents 2d21df5 + 84bd7a3 commit 58ec522
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 11 deletions.
33 changes: 33 additions & 0 deletions src/main/resources/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ components:
bytes:
$ref: '#/components/schemas/HexString'

SnapshotsInfo:
type: object
required:
- availableManifests
properties:
availableManifests:
description: Map of available manifests height -> manifestId
type: array
items:
type: object

ErgoTransactionOutput:
type: object
required:
Expand Down Expand Up @@ -4192,6 +4203,28 @@ paths:
schema:
$ref: '#/components/schemas/ApiError'

/utxo/getSnapshotsInfo:
get:
summary: Get information about saved UTXO snapshots
operationId: getSnapshotsInfo
tags:
- utxo
responses:
'200':
description: A list of saved snapshots
content:
application/json:
schema:
$ref: '#/components/schemas/SnapshotsInfo'
default:
description: Error
content:
application/json:
schema:
$ref: '#/components/schemas/ApiError'



/utxo/genesis:
get:
summary: Get genesis boxes (boxes existed before the very first block)
Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/org/ergoplatform/ErgoApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ class ErgoApp(args: Args) extends ScorexLogging {
new PeersSpec(featureSerializers, scorexSettings.network.maxPeerSpecObjects),
invSpec,
requestModifierSpec,
modifiersSpec
modifiersSpec,
GetSnapshotsInfoSpec
)
}

Expand Down
16 changes: 16 additions & 0 deletions src/main/scala/org/ergoplatform/http/api/ApiCodecs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import sigmastate.interpreter._
import sigmastate.interpreter.CryptoConstants.EcPointType
import io.circe.syntax._
import org.ergoplatform.http.api.requests.{CryptoResult, ExecuteRequest, HintExtractionRequest}
import org.ergoplatform.nodeView.state.SnapshotsInfo
import org.ergoplatform.nodeView.state.UtxoState.ManifestId
import org.ergoplatform.wallet.interface4j.SecretString
import scorex.crypto.authds.{LeafData, Side}
import scorex.crypto.authds.merkle.MerkleProof
Expand Down Expand Up @@ -401,6 +403,20 @@ trait ApiCodecs extends JsonCodecs {
} yield TransactionHintsBag(secretHints.mapValues(HintsBag.apply), publicHints.mapValues(HintsBag.apply))
}

implicit val SnapshotInfoEncoder: Encoder[SnapshotsInfo] = { si =>
Json.obj(
"availableManifests" -> si.availableManifests.map { case (height, manifest) =>
height -> manifest
}.asJson
)
}

implicit val SnapshotInfoDecoder: Decoder[SnapshotsInfo] = { cursor =>
for {
availableManifests <- Decoder.decodeMap[Int, ManifestId].tryDecode(cursor.downField("availableManifests"))
} yield SnapshotsInfo(availableManifests)
}

implicit val transactionSigningRequestEncoder: Encoder[TransactionSigningRequest] = { tsr =>
Json.obj(
"tx" -> tsr.unsignedTx.asJson,
Expand Down
7 changes: 7 additions & 0 deletions src/main/scala/org/ergoplatform/http/api/UtxoApiRoute.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,11 @@ case class UtxoApiRoute(readersHolder: ActorRef, override val settings: RESTApiS
ApiResponse(getState.map(_.genesisBoxes))
}

def getSnapshotsInfo: Route = (get & path("getSnapshotsInfo")) {
ApiResponse(getState.map {
case usr: UtxoStateReader =>
usr.getSnapshotInfo()
case _ => None
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.net.InetSocketAddress
import akka.actor.{Actor, ActorInitializationException, ActorKilledException, ActorRef, ActorRefFactory, DeathPactException, OneForOneStrategy, Props}
import org.ergoplatform.modifiers.history.header.Header
import org.ergoplatform.modifiers.mempool.ErgoTransaction
import org.ergoplatform.modifiers.{ErgoFullBlock, BlockSection}
import org.ergoplatform.modifiers.{BlockSection, ErgoFullBlock}
import org.ergoplatform.nodeView.history.{ErgoSyncInfoV1, ErgoSyncInfoV2}
import org.ergoplatform.nodeView.history._
import org.ergoplatform.network.ErgoNodeViewSynchronizer.{CheckModifiersToDownload, PeerSyncState}
Expand All @@ -23,11 +23,10 @@ import scorex.core.network.ModifiersStatus.Requested
import scorex.core.{ModifierTypeId, NodeViewModifier, PersistentNodeViewModifier, idsToString}
import scorex.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs}
import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages._
import org.ergoplatform.nodeView.state.ErgoStateReader
import scorex.core.network.message.{InvSpec, MessageSpec, ModifiersSpec, RequestModifierSpec}
import org.ergoplatform.nodeView.state.{ErgoStateReader, UtxoStateReader}
import scorex.core.network.message._
import scorex.core.network._
import scorex.core.network.NetworkController.ReceivableMessages.SendToNetwork
import scorex.core.network.message.{InvData, Message, ModifiersData}
import scorex.core.network.{ConnectedPeer, ModifiersStatus, SendToPeer, SendToPeers}
import scorex.core.serialization.ScorexSerializer
import scorex.core.settings.NetworkSettings
Expand Down Expand Up @@ -95,7 +94,7 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
*/
override def preStart(): Unit = {
// subscribe for history and mempool changes
viewHolderRef ! GetNodeViewChanges(history = true, state = false, vault = false, mempool = true)
viewHolderRef ! GetNodeViewChanges(history = true, state = true, vault = false, mempool = true)

val toDownloadCheckInterval = networkSettings.syncInterval

Expand All @@ -110,6 +109,8 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
// subscribe for all the node view holder events involving modifiers and transactions
context.system.eventStream.subscribe(self, classOf[ChangedHistory[ErgoHistoryReader]])
context.system.eventStream.subscribe(self, classOf[ChangedMempool[ErgoMemPoolReader]])
context.system.eventStream.subscribe(self, classOf[ChangedState])

context.system.eventStream.subscribe(self, classOf[ModificationOutcome])
context.system.eventStream.subscribe(self, classOf[DownloadRequest])
context.system.eventStream.subscribe(self, classOf[BlockAppliedTransactions])
Expand Down Expand Up @@ -291,6 +292,7 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,

/**
* Headers should be downloaded from an Older node, it is triggered by received sync message from an older node
*
* @param callingPeer that can be used to download headers, it must be Older
* @return available peers to download headers from together with the state/origin of the peer
*/
Expand All @@ -303,6 +305,7 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,

/**
* Other persistent modifiers besides headers should be downloaded from either Older or Equal node, with fallback to Unknown or Fork
*
* @return available peers to download persistent modifiers from together with the state/origin of the peer
*/
private def getPeersForDownloadingBlocks: Option[(PeerSyncState, Iterable[ConnectedPeer])] = {
Expand All @@ -320,11 +323,12 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
/**
* Modifier download method that is given min/max constraints for modifiers to download from peers.
* It sends requests for modifiers to given peers in optimally sized batches.
* @param maxModifiers maximum modifiers to download
*
* @param maxModifiers maximum modifiers to download
* @param minModifiersPerBucket minimum modifiers to download per bucket
* @param maxModifiersPerBucket maximum modifiers to download per bucket
* @param getPeersOpt optionally get peers to download from, all peers have the same PeerSyncState
* @param fetchMax function that fetches modifiers, it is passed how many of them tops
* @param getPeersOpt optionally get peers to download from, all peers have the same PeerSyncState
* @param fetchMax function that fetches modifiers, it is passed how many of them tops
*/
protected def requestDownload(maxModifiers: Int, minModifiersPerBucket: Int, maxModifiersPerBucket: Int)
(getPeersOpt: => Option[(PeerSyncState, Iterable[ConnectedPeer])])
Expand Down Expand Up @@ -479,6 +483,21 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
modifiersByStatus.getOrElse(Requested, Map.empty)
}

protected def sendSnapshotsInfo(peer: ConnectedPeer): Unit = {
utxoStateReaderOpt match {
case Some(reader) => {
reader.getSnapshotInfo() match {
case Some(snapInfo) => {
val msg = Message(SnapshotsInfoSpec, Right(snapInfo), None)
networkControllerRef ! SendToNetwork(msg, SendToPeer(peer))
}
case _ => log.warn(s"No snapshots avaialble")
}
}
case _ => log.warn(s"Got data from peer while readers are not ready")
}
}

/**
* Object ids coming from other node.
* Filter out modifier ids that are already in process (requested, received or applied),
Expand Down Expand Up @@ -697,6 +716,12 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
case ChangedMempool(newMempoolReader: ErgoMemPool) =>
context.become(initialized(historyReader, newMempoolReader, blockAppliedTxsCache))

case ChangedState(reader: ErgoStateReader) =>
reader match {
case utxoStateReader: UtxoStateReader => utxoStateReaderOpt = Some(utxoStateReader)
case _ =>
}

case ModifiersProcessingResult(_: Seq[BlockSection], cleared: Seq[BlockSection]) =>
// stop processing for cleared modifiers
// applied modifiers state was already changed at `SyntacticallySuccessfulModifier`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,8 @@ trait UtxoStateReader extends ErgoStateReader with TransactionValidation {
*/
def withMempool(mp: ErgoMemPoolReader): UtxoState = withTransactions(mp.getAll)

def getSnapshotInfo(): Option[SnapshotsInfo] = {
val snapshotsDb = SnapshotsDb.create(constants.settings) //todo: move out (to constants?)
snapshotsDb.readSnapshotsInfo
}
}
3 changes: 2 additions & 1 deletion src/main/scala/scorex/core/app/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ trait Application extends ScorexLogging {
new PeersSpec(featureSerializers, settings.network.maxPeerSpecObjects),
invSpec,
requestModifierSpec,
modifiersSpec
modifiersSpec,
GetSnapshotsInfoSpec
)
}

Expand Down
52 changes: 52 additions & 0 deletions src/main/scala/scorex/core/network/message/BasicMessagesRepo.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package scorex.core.network.message


import org.ergoplatform.nodeView.state.SnapshotsInfo
import org.ergoplatform.wallet.Constants
import scorex.core.consensus.SyncInfo
import scorex.core.network._
import scorex.core.network.message.Message.MessageCode
import scorex.core.serialization.ScorexSerializer
import scorex.core.{ModifierTypeId, NodeViewModifier}
import scorex.crypto.hash.Digest32
import scorex.util.Extensions._
import scorex.util.serialization.{Reader, Writer}
import scorex.util.{ModifierId, ScorexLogging, bytesToId, idToBytes}
Expand Down Expand Up @@ -273,3 +276,52 @@ class HandshakeSpec(featureSerializers: PeerFeature.Serializers, sizeLimit: Int)
Handshake(data, t)
}
}


/**
* The `GetSnapshotsInfo` message requests an `SnapshotsInfo` message from the receiving node
*/
object GetSnapshotsInfoSpec extends MessageSpecV1[Unit] {
private val SizeLimit = 100

override val messageCode: MessageCode = 76: Byte

override val messageName: String = "GetSnapshotsInfo"

override def serialize(obj: Unit, w: Writer): Unit = {
}

override def parse(r: Reader): Unit = {
require(r.remaining < SizeLimit, "Too big GetSnapshotsInfo message")
}
}

/**
* The `SnapshotsInfo` message is a reply to a `GetSnapshotsInfo` message.
*/
object SnapshotsInfoSpec extends MessageSpecV1[SnapshotsInfo] {
private val SizeLimit = 1000

override val messageCode: MessageCode = 77: Byte

override val messageName: String = "SnapshotsInfo"

override def serialize(si: SnapshotsInfo, w: Writer): Unit = {
w.putUInt(si.availableManifests.size)
for ((height, manifest) <- si.availableManifests) {
w.putInt(height)
w.putBytes(manifest)
}
}

override def parse(r: Reader): SnapshotsInfo = {
require(r.remaining <= SizeLimit, s"Too big SnapshotsInfo message.")

val length = r.getUInt().toIntExact
SnapshotsInfo((0 until length).map { _ =>
val height = r.getInt()
val manifest = Digest32 @@ r.getBytes(Constants.ModifierIdLength)
height -> manifest
}.toMap)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.{GetNodeV
import scorex.core.consensus.SyncInfo
import scorex.core.network.NetworkController.ReceivableMessages.{PenalizePeer, SendToNetwork}
import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages._
import org.ergoplatform.nodeView.state.ErgoState
import org.ergoplatform.nodeView.state.{ErgoState, SnapshotsDb, SnapshotsInfo, UtxoStateReader}
import org.ergoplatform.settings.Algos
import scorex.core.network._
import scorex.core.network.message._
import scorex.core.network.peer.PenaltyType
import scorex.core.serialization.{BytesSerializable, ScorexSerializer}
import scorex.crypto.hash.Digest32
import scorex.testkit.generators.{SyntacticallyTargetedModifierProducer, TotallyValidModifierProducer}
import scorex.testkit.utils.AkkaFixture
import scorex.util.ScorexLogging
Expand All @@ -27,6 +29,7 @@ import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random

@SuppressWarnings(Array("org.wartremover.warts.IsInstanceOf"))
trait NodeViewSynchronizerTests[ST <: ErgoState[ST]] extends AnyPropSpec
Expand All @@ -38,6 +41,8 @@ trait NodeViewSynchronizerTests[ST <: ErgoState[ST]] extends AnyPropSpec
val historyGen: Gen[ErgoHistory]
val memPool: ErgoMemPool

val stateGen: Gen[ST]

def nodeViewSynchronizer(implicit system: ActorSystem):
(ActorRef, ErgoSyncInfo, BlockSection, ErgoTransaction, ConnectedPeer, TestProbe, TestProbe, TestProbe, TestProbe, ScorexSerializer[BlockSection])

Expand Down Expand Up @@ -226,4 +231,34 @@ trait NodeViewSynchronizerTests[ST <: ErgoState[ST]] extends AnyPropSpec
}
}

property("NodeViewSynchronizer: GetSnapshotInfo") {
withFixture { ctx =>
import ctx._

val s = stateGen.sample.get

if(s.isInstanceOf[UtxoStateReader]) {
// To initialize utxoStateReaderOpt in ErgoNodeView Synchronizer
node ! ChangedState(s)

// First, store snapshots info in DB
val m = (0 until 100).map { _ =>
Random.nextInt(1000000) -> (Digest32 @@ Algos.decode(mod.id).get)
}.toMap
val si = SnapshotsInfo(m)
val db = SnapshotsDb.create(s.constants.settings)
db.writeSnapshotsInfo(si)

// Then send message to request it
node ! Message[Unit](GetSnapshotsInfoSpec, Left(Array.empty[Byte]), Option(peer))
ncProbe.fishForMessage(5 seconds) {
case stn: SendToNetwork if stn.message.spec.isInstanceOf[SnapshotsInfoSpec.type] => true
case _: Any => false
}
} else {
log.info("Snapshots not supported by digest-state")
}
}
}

}

0 comments on commit 58ec522

Please sign in to comment.