From f37c72ee5d5074346d6ca9846c9ad52cb7953cf1 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 11 Jan 2022 17:43:34 +0300 Subject: [PATCH] Add GetManifest/GetSnapshotChunk messages refer #1552 --- src/main/scala/org/ergoplatform/ErgoApp.scala | 4 +- .../network/ErgoNodeViewSynchronizer.scala | 35 +++++++- .../nodeView/state/SnapshotsInfo.scala | 16 +++- .../nodeView/state/UtxoState.scala | 4 +- .../nodeView/state/UtxoStateReader.scala | 13 ++- .../scala/scorex/core/app/Application.scala | 4 +- .../network/message/BasicMessagesRepo.scala | 88 +++++++++++++++++++ .../NodeViewSynchronizerTests.scala | 73 +++++++++++++++ 8 files changed, 226 insertions(+), 11 deletions(-) diff --git a/src/main/scala/org/ergoplatform/ErgoApp.scala b/src/main/scala/org/ergoplatform/ErgoApp.scala index cbcbbfa2e3..709948f8f3 100644 --- a/src/main/scala/org/ergoplatform/ErgoApp.scala +++ b/src/main/scala/org/ergoplatform/ErgoApp.scala @@ -68,7 +68,9 @@ class ErgoApp(args: Args) extends ScorexLogging { invSpec, requestModifierSpec, modifiersSpec, - GetSnapshotsInfoSpec + GetSnapshotsInfoSpec, + new GetManifestSpec, + new GetUtxoSnapshotChunkSpec ) } diff --git a/src/main/scala/org/ergoplatform/network/ErgoNodeViewSynchronizer.scala b/src/main/scala/org/ergoplatform/network/ErgoNodeViewSynchronizer.scala index 260fa09f1f..e130524b95 100644 --- a/src/main/scala/org/ergoplatform/network/ErgoNodeViewSynchronizer.scala +++ b/src/main/scala/org/ergoplatform/network/ErgoNodeViewSynchronizer.scala @@ -1,8 +1,8 @@ package org.ergoplatform.network import akka.actor.SupervisorStrategy.{Restart, Stop} - import java.net.InetSocketAddress + import akka.actor.{Actor, ActorInitializationException, ActorKilledException, ActorRef, ActorRefFactory, DeathPactException, OneForOneStrategy, Props} import org.ergoplatform.modifiers.history.header.Header import org.ergoplatform.modifiers.mempool.ErgoTransaction @@ -23,6 +23,7 @@ import scorex.core.network.ModifiersStatus.Requested import scorex.core.{ModifierTypeId, NodeViewModifier, PersistentNodeViewModifier, idsToString} import scorex.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs} import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages._ +import org.ergoplatform.nodeView.state.UtxoState.{ManifestId, SubtreeId} import org.ergoplatform.nodeView.state.{ErgoStateReader, UtxoStateReader} import scorex.core.network.message._ import scorex.core.network._ @@ -489,7 +490,27 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, val msg = Message(SnapshotsInfoSpec, Right(snapInfo), None) networkControllerRef ! SendToNetwork(msg, SendToPeer(peer)) } - case _ => log.warn(s"No snapshots avaialble") + case _ => log.warn(s"No snapshots available") + } + } + + protected def sendManifest(id: ManifestId, usr: UtxoStateReader, peer: ConnectedPeer): Unit = { + usr.getManifest(id) match { + case Some(manifest) => { + val msg = Message(ManifestSpec, Right(manifest), None) + networkControllerRef ! SendToNetwork(msg, SendToPeer(peer)) + } + case _ => log.warn(s"No snapshots available") + } + } + + protected def sendUtxoSnapshotChunk(id: SubtreeId, usr: UtxoStateReader, peer: ConnectedPeer): Unit = { + usr.getUtxoSnapshotChunk(id) match { + case Some(snapChunk) => { + val msg = Message(UtxoSnapshotChunkSpec, Right(snapChunk), None) + networkControllerRef ! SendToNetwork(msg, SendToPeer(peer)) + } + case _ => log.warn(s"No snapshots available") } } @@ -752,6 +773,16 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, case Some(usr) => sendSnapshotsInfo(usr, remote) case None => log.warn(s"Asked for snapshot when UTXO set is not supported, remote: $remote") } + case (_: GetManifestSpec, id: ManifestId @unchecked, remote) => + usrOpt match { + case Some(usr) => sendManifest(id, usr, remote) + case None => log.warn(s"Asked for snapshot when UTXO set is not supported, remote: $remote") + } + case (_: GetUtxoSnapshotChunkSpec, id: SubtreeId @unchecked, remote) => + usrOpt match { + case Some(usr) => sendUtxoSnapshotChunk(id, usr, remote) + case None => log.warn(s"Asked for snapshot when UTXO set is not supported, remote: $remote") + } } def initialized(hr: ErgoHistory, diff --git a/src/main/scala/org/ergoplatform/nodeView/state/SnapshotsInfo.scala b/src/main/scala/org/ergoplatform/nodeView/state/SnapshotsInfo.scala index 63c336b9fe..249d836610 100644 --- a/src/main/scala/org/ergoplatform/nodeView/state/SnapshotsInfo.scala +++ b/src/main/scala/org/ergoplatform/nodeView/state/SnapshotsInfo.scala @@ -91,12 +91,20 @@ class SnapshotsDb(store: LDBKVStore) extends ScorexLogging { writeSnapshotsInfo(si) } - def readManifestBytes(id: ManifestId): Option[BatchAVLProverManifest[Digest32]] = { - store.get(id).flatMap(bs => serializer.manifestFromBytes(bs, Constants.ModifierIdLength).toOption) + def readManifest(id: ManifestId): Option[BatchAVLProverManifest[Digest32]] = { + readManifestBytes(id).flatMap(bs => serializer.manifestFromBytes(bs, Constants.ModifierIdLength).toOption) } - def readSubtreeBytes(id: SubtreeId): Option[BatchAVLProverSubtree[Digest32]] = { - store.get(id).flatMap(bs => serializer.subtreeFromBytes(bs, Constants.ModifierIdLength).toOption) + def readSubtree(id: SubtreeId): Option[BatchAVLProverSubtree[Digest32]] = { + readSubtreeBytes(id).flatMap(bs => serializer.subtreeFromBytes(bs, Constants.ModifierIdLength).toOption) + } + + def readManifestBytes(id: ManifestId): Option[Array[Byte]] = { + store.get(id) + } + + def readSubtreeBytes(id: SubtreeId): Option[Array[Byte]] = { + store.get(id) } } diff --git a/src/main/scala/org/ergoplatform/nodeView/state/UtxoState.scala b/src/main/scala/org/ergoplatform/nodeView/state/UtxoState.scala index b7135e2109..dfab8a6b77 100644 --- a/src/main/scala/org/ergoplatform/nodeView/state/UtxoState.scala +++ b/src/main/scala/org/ergoplatform/nodeView/state/UtxoState.scala @@ -253,9 +253,9 @@ object UtxoState extends ScorexLogging { case Some(snapshotsInfo) => val (h, manifestId) = snapshotsInfo.availableManifests.maxBy(_._1) log.info(s"Reading snapshot from height $h") - val manifest = snapshotDb.readManifestBytes(manifestId).get + val manifest = snapshotDb.readManifest(manifestId).get val subtreeIds = manifest.subtreesIds - val subtrees = subtreeIds.map(sid => snapshotDb.readSubtreeBytes(sid).get) + val subtrees = subtreeIds.map(sid => snapshotDb.readSubtree(sid).get) val serializer = new BatchAVLProverSerializer[Digest32, HF]()(ErgoAlgos.hash) val prover = serializer.combine(manifest -> subtrees, Algos.hash.DigestSize, None).get diff --git a/src/main/scala/org/ergoplatform/nodeView/state/UtxoStateReader.scala b/src/main/scala/org/ergoplatform/nodeView/state/UtxoStateReader.scala index 9bc8b4f881..898e5a4f34 100644 --- a/src/main/scala/org/ergoplatform/nodeView/state/UtxoStateReader.scala +++ b/src/main/scala/org/ergoplatform/nodeView/state/UtxoStateReader.scala @@ -5,6 +5,7 @@ import org.ergoplatform.modifiers.ErgoFullBlock import org.ergoplatform.modifiers.history.ADProofs import org.ergoplatform.modifiers.mempool.ErgoTransaction import org.ergoplatform.nodeView.mempool.ErgoMemPoolReader +import org.ergoplatform.nodeView.state.UtxoState.{ManifestId, SubtreeId} import org.ergoplatform.settings.Algos import org.ergoplatform.settings.Algos.HF import org.ergoplatform.wallet.boxes.ErgoBoxSerializer @@ -25,7 +26,7 @@ trait UtxoStateReader extends ErgoStateReader with TransactionValidation { private lazy val np = NodeParameters(keySize = 32, valueSize = None, labelSize = 32) protected lazy val storage = new VersionedLDBAVLStorage(store, np) - protected val persistentProver: PersistentBatchAVLProver[Digest32, HF] + val persistentProver: PersistentBatchAVLProver[Digest32, HF] /** * Validate transaction against provided state context, if specified, @@ -149,4 +150,14 @@ trait UtxoStateReader extends ErgoStateReader with TransactionValidation { val snapshotsDb = SnapshotsDb.create(constants.settings) //todo: move out (to constants?) snapshotsDb.readSnapshotsInfo } + + def getManifest(id: ManifestId): Option[Array[Byte]] = { + val snapshotsDb = SnapshotsDb.create(constants.settings) //todo: move out (to constants?) + snapshotsDb.readManifestBytes(id) + } + + def getUtxoSnapshotChunk(id: SubtreeId): Option[Array[Byte]] = { + val snapshotsDb = SnapshotsDb.create(constants.settings) //todo: move out (to constants?) + snapshotsDb.readSubtreeBytes(id) + } } diff --git a/src/main/scala/scorex/core/app/Application.scala b/src/main/scala/scorex/core/app/Application.scala index 8ecbdba9a3..c9fa94f634 100644 --- a/src/main/scala/scorex/core/app/Application.scala +++ b/src/main/scala/scorex/core/app/Application.scala @@ -52,7 +52,9 @@ trait Application extends ScorexLogging { invSpec, requestModifierSpec, modifiersSpec, - GetSnapshotsInfoSpec + GetSnapshotsInfoSpec, + new GetManifestSpec, + new GetUtxoSnapshotChunkSpec ) } diff --git a/src/main/scala/scorex/core/network/message/BasicMessagesRepo.scala b/src/main/scala/scorex/core/network/message/BasicMessagesRepo.scala index 3e669a98a4..23712b6828 100644 --- a/src/main/scala/scorex/core/network/message/BasicMessagesRepo.scala +++ b/src/main/scala/scorex/core/network/message/BasicMessagesRepo.scala @@ -2,6 +2,7 @@ package scorex.core.network.message import org.ergoplatform.nodeView.state.SnapshotsInfo +import org.ergoplatform.nodeView.state.UtxoState.{ManifestId, SubtreeId} import org.ergoplatform.wallet.Constants import scorex.core.consensus.SyncInfo import scorex.core.network._ @@ -325,3 +326,90 @@ object SnapshotsInfoSpec extends MessageSpecV1[SnapshotsInfo] { }.toMap) } } + +/** + * The `GetManifest` sends manifest (BatchAVLProverManifest) identifier + */ +class GetManifestSpec extends MessageSpecV1[ManifestId] { + private val SizeLimit = 100 + + override val messageCode: MessageCode = 78: Byte + override val messageName: String = "GetManifest" + + override def serialize(id: ManifestId, w: Writer): Unit = { + w.putBytes(id) + } + + override def parse(r: Reader): ManifestId = { + require(r.remaining < SizeLimit, "Too big GetManifest message") + Digest32 @@ r.getBytes(Constants.ModifierIdLength) + } +} + +/** + * The `Manifest` message is a reply to a `GetManifest` message. + */ +object ManifestSpec extends MessageSpecV1[Array[Byte]] { + private val SizeLimit = 1000 + + override val messageCode: MessageCode = 79: Byte + + override val messageName: String = "Manifest" + + override def serialize(manifestBytes: Array[Byte], w: Writer): Unit = { + w.putUInt(manifestBytes.size) + w.putBytes(manifestBytes) + } + + override def parse(r: Reader): Array[Byte] = { + require(r.remaining <= SizeLimit, s"Too big Manifest message.") + + val length = r.getUInt().toIntExact + r.getBytes(length) + } +} + +/** + * The `GetManifest` sends send utxo subtree (BatchAVLProverSubtree) identifier + */ +class GetUtxoSnapshotChunkSpec() extends MessageSpecV1[SubtreeId] { + private val SizeLimit = 100 + + override val messageCode: MessageCode = 80: Byte + + override val messageName: String = "GetUtxoSnapshotChunk" + + override def serialize(id: SubtreeId, w: Writer): Unit = { + w.putBytes(id) + } + + override def parse(r: Reader): SubtreeId = { + require(r.remaining < SizeLimit, "Too big GetManifest message") + Digest32 @@ r.getBytes(Constants.ModifierIdLength) + } +} + +/** + * The `Manifest` message is a reply to a `GetManifest` message. + */ +object UtxoSnapshotChunkSpec extends MessageSpecV1[Array[Byte]] { + private val SizeLimit = 1000 + + override val messageCode: MessageCode = 81: Byte + + override val messageName: String = "UtxoSnapshotChunk" + + override def serialize(subtree: Array[Byte], w: Writer): Unit = { + w.putUInt(subtree.size) + w.putBytes(subtree) + } + + override def parse(r: Reader): Array[Byte] = { + require(r.remaining <= SizeLimit, s"Too big UtxoSnapshotChunk message.") + + val length = r.getUInt().toIntExact + r.getBytes(length) + } +} + + diff --git a/src/test/scala/scorex/testkit/properties/NodeViewSynchronizerTests.scala b/src/test/scala/scorex/testkit/properties/NodeViewSynchronizerTests.scala index 3a27bf461b..fa2fc43201 100644 --- a/src/test/scala/scorex/testkit/properties/NodeViewSynchronizerTests.scala +++ b/src/test/scala/scorex/testkit/properties/NodeViewSynchronizerTests.scala @@ -13,12 +13,15 @@ import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.{GetNodeV import scorex.core.consensus.SyncInfo import scorex.core.network.NetworkController.ReceivableMessages.{PenalizePeer, SendToNetwork} import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages._ +import org.ergoplatform.nodeView.state.UtxoState.ManifestId import org.ergoplatform.nodeView.state.{ErgoState, SnapshotsDb, SnapshotsInfo, UtxoStateReader} import org.ergoplatform.settings.Algos +import org.ergoplatform.settings.Algos.HF import scorex.core.network._ import scorex.core.network.message._ import scorex.core.network.peer.PenaltyType import scorex.core.serialization.{BytesSerializable, ScorexSerializer} +import scorex.crypto.authds.avltree.batch.serialization.BatchAVLProverSerializer import scorex.crypto.hash.Digest32 import scorex.testkit.generators.{SyntacticallyTargetedModifierProducer, TotallyValidModifierProducer} import scorex.testkit.utils.AkkaFixture @@ -261,4 +264,74 @@ trait NodeViewSynchronizerTests[ST <: ErgoState[ST]] extends AnyPropSpec } } + property("NodeViewSynchronizer: GetManifest") { + withFixture { ctx => + import ctx._ + + val s = stateGen.sample.get + + s match { + case usr: UtxoStateReader => { + // To initialize utxoStateReaderOpt in ErgoNodeView Synchronizer + node ! ChangedState(s) + + // Generate some snapshot + val height = 1 + usr.applyModifier(mod, Some(height)) + usr.persistentProver.generateProofAndUpdateStorage() + implicit val hf: HF = Algos.hash + val serializer = new BatchAVLProverSerializer[Digest32, HF] + val (manifest, subtrees) = serializer.slice(usr.persistentProver.avlProver, subtreeDepth = 12) + + val db = SnapshotsDb.create(s.constants.settings) + db.writeSnapshot(height, manifest, subtrees) + + // Then send message to request it + node ! Message[ManifestId](new GetManifestSpec, Left(manifest.id), Option(peer)) + ncProbe.fishForMessage(5 seconds) { + case stn: SendToNetwork if stn.message.spec.isInstanceOf[ManifestSpec.type] => true + case _: Any => false + } + } + case _ => + log.info("Snapshots not supported by digest-state") + } + } + } + + + property("NodeViewSynchronizer: GetSnapshotChunk") { + withFixture { ctx => + import ctx._ + + val s = stateGen.sample.get + + s match { + case usr: UtxoStateReader => { + // To initialize utxoStateReaderOpt in ErgoNodeView Synchronizer + node ! ChangedState(s) + + // Generate some snapshot + val height = 1 + usr.applyModifier(mod, Some(height)) + usr.persistentProver.generateProofAndUpdateStorage() + implicit val hf: HF = Algos.hash + val serializer = new BatchAVLProverSerializer[Digest32, HF] + val (manifest, subtrees) = serializer.slice(usr.persistentProver.avlProver, subtreeDepth = 12) + + val db = SnapshotsDb.create(s.constants.settings) + db.writeSnapshot(height, manifest, subtrees) + + // Then send message to request it + node ! Message[ManifestId](new GetUtxoSnapshotChunkSpec, Left(subtrees.last.id), Option(peer)) + ncProbe.fishForMessage(5 seconds) { + case stn: SendToNetwork if stn.message.spec.isInstanceOf[UtxoSnapshotChunkSpec.type] => true + case _: Any => false + } + } + case _ => + log.info("Snapshots not supported by digest-state") + } + } + } }