Skip to content

Commit

Permalink
Add GetManifest/GetSnapshotChunk messages
Browse files Browse the repository at this point in the history
refer #1552
  • Loading branch information
knizhnik committed Jan 11, 2022
1 parent a77205f commit f37c72e
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 11 deletions.
4 changes: 3 additions & 1 deletion src/main/scala/org/ergoplatform/ErgoApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ class ErgoApp(args: Args) extends ScorexLogging {
invSpec,
requestModifierSpec,
modifiersSpec,
GetSnapshotsInfoSpec
GetSnapshotsInfoSpec,
new GetManifestSpec,
new GetUtxoSnapshotChunkSpec
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.ergoplatform.network

import akka.actor.SupervisorStrategy.{Restart, Stop}

import java.net.InetSocketAddress

import akka.actor.{Actor, ActorInitializationException, ActorKilledException, ActorRef, ActorRefFactory, DeathPactException, OneForOneStrategy, Props}
import org.ergoplatform.modifiers.history.header.Header
import org.ergoplatform.modifiers.mempool.ErgoTransaction
Expand All @@ -23,6 +23,7 @@ import scorex.core.network.ModifiersStatus.Requested
import scorex.core.{ModifierTypeId, NodeViewModifier, PersistentNodeViewModifier, idsToString}
import scorex.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs}
import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages._
import org.ergoplatform.nodeView.state.UtxoState.{ManifestId, SubtreeId}
import org.ergoplatform.nodeView.state.{ErgoStateReader, UtxoStateReader}
import scorex.core.network.message._
import scorex.core.network._
Expand Down Expand Up @@ -489,7 +490,27 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
val msg = Message(SnapshotsInfoSpec, Right(snapInfo), None)
networkControllerRef ! SendToNetwork(msg, SendToPeer(peer))
}
case _ => log.warn(s"No snapshots avaialble")
case _ => log.warn(s"No snapshots available")
}
}

protected def sendManifest(id: ManifestId, usr: UtxoStateReader, peer: ConnectedPeer): Unit = {
usr.getManifest(id) match {
case Some(manifest) => {
val msg = Message(ManifestSpec, Right(manifest), None)
networkControllerRef ! SendToNetwork(msg, SendToPeer(peer))
}
case _ => log.warn(s"No snapshots available")
}
}

protected def sendUtxoSnapshotChunk(id: SubtreeId, usr: UtxoStateReader, peer: ConnectedPeer): Unit = {
usr.getUtxoSnapshotChunk(id) match {
case Some(snapChunk) => {
val msg = Message(UtxoSnapshotChunkSpec, Right(snapChunk), None)
networkControllerRef ! SendToNetwork(msg, SendToPeer(peer))
}
case _ => log.warn(s"No snapshots available")
}
}

Expand Down Expand Up @@ -752,6 +773,16 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
case Some(usr) => sendSnapshotsInfo(usr, remote)
case None => log.warn(s"Asked for snapshot when UTXO set is not supported, remote: $remote")
}
case (_: GetManifestSpec, id: ManifestId @unchecked, remote) =>
usrOpt match {
case Some(usr) => sendManifest(id, usr, remote)
case None => log.warn(s"Asked for snapshot when UTXO set is not supported, remote: $remote")
}
case (_: GetUtxoSnapshotChunkSpec, id: SubtreeId @unchecked, remote) =>
usrOpt match {
case Some(usr) => sendUtxoSnapshotChunk(id, usr, remote)
case None => log.warn(s"Asked for snapshot when UTXO set is not supported, remote: $remote")
}
}

def initialized(hr: ErgoHistory,
Expand Down
16 changes: 12 additions & 4 deletions src/main/scala/org/ergoplatform/nodeView/state/SnapshotsInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,20 @@ class SnapshotsDb(store: LDBKVStore) extends ScorexLogging {
writeSnapshotsInfo(si)
}

def readManifestBytes(id: ManifestId): Option[BatchAVLProverManifest[Digest32]] = {
store.get(id).flatMap(bs => serializer.manifestFromBytes(bs, Constants.ModifierIdLength).toOption)
def readManifest(id: ManifestId): Option[BatchAVLProverManifest[Digest32]] = {
readManifestBytes(id).flatMap(bs => serializer.manifestFromBytes(bs, Constants.ModifierIdLength).toOption)
}

def readSubtreeBytes(id: SubtreeId): Option[BatchAVLProverSubtree[Digest32]] = {
store.get(id).flatMap(bs => serializer.subtreeFromBytes(bs, Constants.ModifierIdLength).toOption)
def readSubtree(id: SubtreeId): Option[BatchAVLProverSubtree[Digest32]] = {
readSubtreeBytes(id).flatMap(bs => serializer.subtreeFromBytes(bs, Constants.ModifierIdLength).toOption)
}

def readManifestBytes(id: ManifestId): Option[Array[Byte]] = {
store.get(id)
}

def readSubtreeBytes(id: SubtreeId): Option[Array[Byte]] = {
store.get(id)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,9 @@ object UtxoState extends ScorexLogging {
case Some(snapshotsInfo) =>
val (h, manifestId) = snapshotsInfo.availableManifests.maxBy(_._1)
log.info(s"Reading snapshot from height $h")
val manifest = snapshotDb.readManifestBytes(manifestId).get
val manifest = snapshotDb.readManifest(manifestId).get
val subtreeIds = manifest.subtreesIds
val subtrees = subtreeIds.map(sid => snapshotDb.readSubtreeBytes(sid).get)
val subtrees = subtreeIds.map(sid => snapshotDb.readSubtree(sid).get)
val serializer = new BatchAVLProverSerializer[Digest32, HF]()(ErgoAlgos.hash)
val prover = serializer.combine(manifest -> subtrees, Algos.hash.DigestSize, None).get

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.ergoplatform.modifiers.ErgoFullBlock
import org.ergoplatform.modifiers.history.ADProofs
import org.ergoplatform.modifiers.mempool.ErgoTransaction
import org.ergoplatform.nodeView.mempool.ErgoMemPoolReader
import org.ergoplatform.nodeView.state.UtxoState.{ManifestId, SubtreeId}
import org.ergoplatform.settings.Algos
import org.ergoplatform.settings.Algos.HF
import org.ergoplatform.wallet.boxes.ErgoBoxSerializer
Expand All @@ -25,7 +26,7 @@ trait UtxoStateReader extends ErgoStateReader with TransactionValidation {
private lazy val np = NodeParameters(keySize = 32, valueSize = None, labelSize = 32)
protected lazy val storage = new VersionedLDBAVLStorage(store, np)

protected val persistentProver: PersistentBatchAVLProver[Digest32, HF]
val persistentProver: PersistentBatchAVLProver[Digest32, HF]

/**
* Validate transaction against provided state context, if specified,
Expand Down Expand Up @@ -149,4 +150,14 @@ trait UtxoStateReader extends ErgoStateReader with TransactionValidation {
val snapshotsDb = SnapshotsDb.create(constants.settings) //todo: move out (to constants?)
snapshotsDb.readSnapshotsInfo
}

def getManifest(id: ManifestId): Option[Array[Byte]] = {
val snapshotsDb = SnapshotsDb.create(constants.settings) //todo: move out (to constants?)
snapshotsDb.readManifestBytes(id)
}

def getUtxoSnapshotChunk(id: SubtreeId): Option[Array[Byte]] = {
val snapshotsDb = SnapshotsDb.create(constants.settings) //todo: move out (to constants?)
snapshotsDb.readSubtreeBytes(id)
}
}
4 changes: 3 additions & 1 deletion src/main/scala/scorex/core/app/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ trait Application extends ScorexLogging {
invSpec,
requestModifierSpec,
modifiersSpec,
GetSnapshotsInfoSpec
GetSnapshotsInfoSpec,
new GetManifestSpec,
new GetUtxoSnapshotChunkSpec
)
}

Expand Down
88 changes: 88 additions & 0 deletions src/main/scala/scorex/core/network/message/BasicMessagesRepo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scorex.core.network.message


import org.ergoplatform.nodeView.state.SnapshotsInfo
import org.ergoplatform.nodeView.state.UtxoState.{ManifestId, SubtreeId}
import org.ergoplatform.wallet.Constants
import scorex.core.consensus.SyncInfo
import scorex.core.network._
Expand Down Expand Up @@ -325,3 +326,90 @@ object SnapshotsInfoSpec extends MessageSpecV1[SnapshotsInfo] {
}.toMap)
}
}

/**
* The `GetManifest` sends manifest (BatchAVLProverManifest) identifier
*/
class GetManifestSpec extends MessageSpecV1[ManifestId] {
private val SizeLimit = 100

override val messageCode: MessageCode = 78: Byte
override val messageName: String = "GetManifest"

override def serialize(id: ManifestId, w: Writer): Unit = {
w.putBytes(id)
}

override def parse(r: Reader): ManifestId = {
require(r.remaining < SizeLimit, "Too big GetManifest message")
Digest32 @@ r.getBytes(Constants.ModifierIdLength)
}
}

/**
* The `Manifest` message is a reply to a `GetManifest` message.
*/
object ManifestSpec extends MessageSpecV1[Array[Byte]] {
private val SizeLimit = 1000

override val messageCode: MessageCode = 79: Byte

override val messageName: String = "Manifest"

override def serialize(manifestBytes: Array[Byte], w: Writer): Unit = {
w.putUInt(manifestBytes.size)
w.putBytes(manifestBytes)
}

override def parse(r: Reader): Array[Byte] = {
require(r.remaining <= SizeLimit, s"Too big Manifest message.")

val length = r.getUInt().toIntExact
r.getBytes(length)
}
}

/**
* The `GetManifest` sends send utxo subtree (BatchAVLProverSubtree) identifier
*/
class GetUtxoSnapshotChunkSpec() extends MessageSpecV1[SubtreeId] {
private val SizeLimit = 100

override val messageCode: MessageCode = 80: Byte

override val messageName: String = "GetUtxoSnapshotChunk"

override def serialize(id: SubtreeId, w: Writer): Unit = {
w.putBytes(id)
}

override def parse(r: Reader): SubtreeId = {
require(r.remaining < SizeLimit, "Too big GetManifest message")
Digest32 @@ r.getBytes(Constants.ModifierIdLength)
}
}

/**
* The `Manifest` message is a reply to a `GetManifest` message.
*/
object UtxoSnapshotChunkSpec extends MessageSpecV1[Array[Byte]] {
private val SizeLimit = 1000

override val messageCode: MessageCode = 81: Byte

override val messageName: String = "UtxoSnapshotChunk"

override def serialize(subtree: Array[Byte], w: Writer): Unit = {
w.putUInt(subtree.size)
w.putBytes(subtree)
}

override def parse(r: Reader): Array[Byte] = {
require(r.remaining <= SizeLimit, s"Too big UtxoSnapshotChunk message.")

val length = r.getUInt().toIntExact
r.getBytes(length)
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.{GetNodeV
import scorex.core.consensus.SyncInfo
import scorex.core.network.NetworkController.ReceivableMessages.{PenalizePeer, SendToNetwork}
import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages._
import org.ergoplatform.nodeView.state.UtxoState.ManifestId
import org.ergoplatform.nodeView.state.{ErgoState, SnapshotsDb, SnapshotsInfo, UtxoStateReader}
import org.ergoplatform.settings.Algos
import org.ergoplatform.settings.Algos.HF
import scorex.core.network._
import scorex.core.network.message._
import scorex.core.network.peer.PenaltyType
import scorex.core.serialization.{BytesSerializable, ScorexSerializer}
import scorex.crypto.authds.avltree.batch.serialization.BatchAVLProverSerializer
import scorex.crypto.hash.Digest32
import scorex.testkit.generators.{SyntacticallyTargetedModifierProducer, TotallyValidModifierProducer}
import scorex.testkit.utils.AkkaFixture
Expand Down Expand Up @@ -261,4 +264,74 @@ trait NodeViewSynchronizerTests[ST <: ErgoState[ST]] extends AnyPropSpec
}
}

property("NodeViewSynchronizer: GetManifest") {
withFixture { ctx =>
import ctx._

val s = stateGen.sample.get

s match {
case usr: UtxoStateReader => {
// To initialize utxoStateReaderOpt in ErgoNodeView Synchronizer
node ! ChangedState(s)

// Generate some snapshot
val height = 1
usr.applyModifier(mod, Some(height))
usr.persistentProver.generateProofAndUpdateStorage()
implicit val hf: HF = Algos.hash
val serializer = new BatchAVLProverSerializer[Digest32, HF]
val (manifest, subtrees) = serializer.slice(usr.persistentProver.avlProver, subtreeDepth = 12)

val db = SnapshotsDb.create(s.constants.settings)
db.writeSnapshot(height, manifest, subtrees)

// Then send message to request it
node ! Message[ManifestId](new GetManifestSpec, Left(manifest.id), Option(peer))
ncProbe.fishForMessage(5 seconds) {
case stn: SendToNetwork if stn.message.spec.isInstanceOf[ManifestSpec.type] => true
case _: Any => false
}
}
case _ =>
log.info("Snapshots not supported by digest-state")
}
}
}


property("NodeViewSynchronizer: GetSnapshotChunk") {
withFixture { ctx =>
import ctx._

val s = stateGen.sample.get

s match {
case usr: UtxoStateReader => {
// To initialize utxoStateReaderOpt in ErgoNodeView Synchronizer
node ! ChangedState(s)

// Generate some snapshot
val height = 1
usr.applyModifier(mod, Some(height))
usr.persistentProver.generateProofAndUpdateStorage()
implicit val hf: HF = Algos.hash
val serializer = new BatchAVLProverSerializer[Digest32, HF]
val (manifest, subtrees) = serializer.slice(usr.persistentProver.avlProver, subtreeDepth = 12)

val db = SnapshotsDb.create(s.constants.settings)
db.writeSnapshot(height, manifest, subtrees)

// Then send message to request it
node ! Message[ManifestId](new GetUtxoSnapshotChunkSpec, Left(subtrees.last.id), Option(peer))
ncProbe.fishForMessage(5 seconds) {
case stn: SendToNetwork if stn.message.spec.isInstanceOf[UtxoSnapshotChunkSpec.type] => true
case _: Any => false
}
}
case _ =>
log.info("Snapshots not supported by digest-state")
}
}
}
}

0 comments on commit f37c72e

Please sign in to comment.