Polishing bootstrapping with UTXO set snapshot code towards release #1999

Merged
25 commits merged on Jun 1, 2023
Changes from all commits
@@ -11,8 +11,10 @@ import scorex.util.serialization.{Reader, Writer}
class ManifestSerializer(manifestDepth: Byte) extends ErgoSerializer[BatchAVLProverManifest[DigestType]] {
private val nodeSerializer = VersionedLDBAVLStorage.noStoreSerializer

override def serialize(manifest: BatchAVLProverManifest[DigestType], w: Writer): Unit = {
val rootNodeHeight = manifest.rootHeight.toByte
/**
* Serialize a manifest provided as its top subtree and height separately. Used in tests.
*/
def serialize(rootNode: ProverNodes[DigestType], rootNodeHeight: Byte, w: Writer): Unit = {
w.put(rootNodeHeight)
w.put(manifestDepth)

@@ -24,10 +26,15 @@ class ManifestSerializer(manifestDepth: Byte) extends ErgoSerializer[BatchAVLPro
case i: InternalProverNode[DigestType] if level < manifestDepth =>
loop(i.left, level + 1)
loop(i.right, level + 1)
case _: InternalProverNode[DigestType] =>
}
}

loop(manifest.root, level = 1)
loop(rootNode, level = 1)
}

override def serialize(manifest: BatchAVLProverManifest[DigestType], w: Writer): Unit = {
serialize(manifest.root, manifest.rootHeight.toByte, w)
}

override def parse(r: Reader): BatchAVLProverManifest[DigestType] = {
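For orientation, a quick usage sketch of the serializer above, mirroring the "dumping snapshot" test further down in this PR; the depth value and the `prover` handle (assumed to be a BatchAVLProver over DigestType) are illustrative:

import scorex.util.ByteArrayBuilder
import scorex.util.serialization.VLQByteBufferWriter

val serializer = new ManifestSerializer(manifestDepth = 6)
val writer = new VLQByteBufferWriter(new ByteArrayBuilder())
// serialize the top subtree together with the tree height, then parse the manifest back
serializer.serialize(prover.topNode, prover.rootNodeHeight.toByte, writer)
val manifest = serializer.parseBytesTry(writer.result().toBytes).get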
3 changes: 3 additions & 0 deletions avldb/src/main/scala/scorex/db/LDBVersionedStore.scala
@@ -423,7 +423,10 @@ class LDBVersionedStore(protected val dir: File, val initialKeepVersions: Int)
def processSnapshot[T](logic: SnapshotReadInterface => T): Try[T] = {
val ro = new ReadOptions()
try {
lock.writeLock().lock()
ro.snapshot(db.getSnapshot)
lock.writeLock().unlock()

object readInterface extends SnapshotReadInterface {
def get(key: Array[Byte]): Array[Byte] = db.get(key, ro)
}
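The change above takes the store's write lock only around snapshot acquisition, so no version rollback can slip in between getSnapshot and reads done through ro. A minimal hardening sketch, not part of this PR, that releases the lock in a finally block in case db.getSnapshot throws (names reused from the hunk above):

lock.writeLock().lock()
try {
  // take a consistent LevelDB snapshot while holding the write lock
  ro.snapshot(db.getSnapshot)
} finally {
  // the lock is released even if getSnapshot fails
  lock.writeLock().unlock()
}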
@@ -6,23 +6,26 @@ import org.scalatest.Assertion
import org.scalatest.matchers.should.Matchers
import org.scalatest.propspec.AnyPropSpec
import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks
import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage.topNodeHashKey
import scorex.core.serialization.{ManifestSerializer, SubtreeSerializer}
import scorex.crypto.authds.avltree.batch.helpers.TestHelper
import scorex.crypto.authds.{ADDigest, ADKey, ADValue, SerializedAdProof}
import scorex.util.encode.Base16
import scorex.crypto.hash.{Blake2b256, Digest32}
import scorex.db.{LDBFactory, LDBVersionedStore}
import scorex.util.ByteArrayBuilder
import scorex.util.serialization.VLQByteBufferWriter
import scorex.utils.{Random => RandomBytes}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Success, Try}

class VersionedLDBAVLStorageSpecification extends AnyPropSpec
with ScalaCheckPropertyChecks
with Matchers
with TestHelper {
class VersionedLDBAVLStorageSpecification
extends AnyPropSpec
with ScalaCheckPropertyChecks
with Matchers
with TestHelper {

override protected val KL = 32
override protected val VL = 8
@@ -341,14 +344,30 @@ class VersionedLDBAVLStorageSpecification extends AnyPropSpec
}

property("dumping snapshot") {
val manifestDepth: Byte = 6
val manifestSerializer = new ManifestSerializer(manifestDepth)
val prover = createPersistentProver()
blockchainWorkflowTest(prover)

val storage = prover.storage.asInstanceOf[VersionedLDBAVLStorage]
val store = LDBFactory.createKvDb("/tmp/aa")
val store = LDBFactory.createKvDb(getRandomTempDir.getAbsolutePath)

storage.dumpSnapshot(store, 4, prover.digest.dropRight(1))
store.get(topNodeHashKey).sameElements(prover.digest.dropRight(1)) shouldBe true
val rootNodeLabel = storage.dumpSnapshot(store, manifestDepth, prover.digest.dropRight(1)).get
rootNodeLabel.sameElements(prover.digest.dropRight(1)) shouldBe true
val manifestBytes = store.get(rootNodeLabel).get
val manifest = manifestSerializer.parseBytesTry(manifestBytes).get

val writer = new VLQByteBufferWriter(new ByteArrayBuilder())
manifestSerializer.serialize(prover.prover().topNode, prover.prover().rootNodeHeight.toByte, writer)
val altManifestBytes = writer.result().toBytes

manifestBytes.sameElements(altManifestBytes) shouldBe true

val subtreeIds = manifest.subtreesIds
subtreeIds.foreach { sid =>
val chunkBytes = store.get(sid).get
SubtreeSerializer.parseBytesTry(chunkBytes).get.id.sameElements(sid) shouldBe true
}
}

}
4 changes: 2 additions & 2 deletions papers/utxo.md
@@ -23,9 +23,9 @@ UTXO set is authenticated via AVL+ tree. Design principles for tree construction
[https://eprint.iacr.org/2016/994.pdf](https://eprint.iacr.org/2016/994.pdf), the implementation of the
tree is available in [the Scrypto framework](https://github.com/input-output-hk/scrypto).

Time is broken into epochs, 1 epoch = 51,200 blocks (~72 days).
Time is broken into epochs, 1 epoch = 52,224 blocks (~72.5 days).
Snapshot is taken after last block of an epoch, namely, after processing a block with
height *h % 51200 == 51199*.
height *h % 52224 == 52223*.
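As a worked illustration of the epoch arithmetic above (a sketch; the object and the ~2 minute target block interval used for the duration estimate are not part of this PR):

object SnapshotEpochs {
  val EpochLength: Int = 52224            // blocks per epoch
  val BlockIntervalMinutes: Double = 2.0  // Ergo's target block interval

  // a snapshot is taken after the last block of an epoch, i.e. when h % 52224 == 52223
  def isSnapshotHeight(h: Int): Boolean = h % EpochLength == EpochLength - 1

  // 52224 blocks * 2 min = 104448 min ≈ 72.5 days
  def epochDurationDays: Double = EpochLength * BlockIntervalMinutes / 60 / 24
}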

Chunk format
------------
1 change: 1 addition & 0 deletions src/main/resources/application.conf
@@ -23,6 +23,7 @@ ergo {

utxo {
# Download and apply UTXO set snapshot and full-blocks after that
# Implemented in 5.0.12, but it is better to bootstrap starting from version 5.0.13
utxoBootstrap = false

# how many utxo set snapshots to store, 0 means that they are not stored at all
12 changes: 12 additions & 0 deletions src/main/resources/mainnet.conf
@@ -72,6 +72,18 @@ ergo {

# maximum cost of transaction for it to be propagated
maxTransactionCost = 4900000

utxo {
# Download and apply UTXO set snapshot and full-blocks after that
utxoBootstrap = false

# how many utxo set snapshots to store, 0 means that they are not stored at all
storingUtxoSnapshots = 2

# how many utxo set snapshots with the same id for a given height we need to find in the p2p network
# in order to start downloading it
p2pUtxoSnapshots = 2
}
}

}
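For illustration, a node operator opting into snapshot bootstrapping would flip the flag in a local config. A sketch assuming the utxo block sits under ergo.node exactly as declared above, with the remaining settings left at their defaults:

ergo {
  node {
    utxo {
      # download a UTXO set snapshot and full blocks after it instead of replaying the whole chain
      utxoBootstrap = true
    }
  }
}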
@@ -172,6 +172,11 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
*/
private val availableManifests = mutable.Map[ModifierId, (Height, Seq[ConnectedPeer])]()

/**
* How many peers should have a utxo set snapshot to start downloading it
*/
private lazy val MinSnapshots = settings.nodeSettings.utxoSettings.p2pUtxoSnapshots

/**
* To be called when the node is synced and new block arrives, to reset transactions cost counter
*/
@@ -566,7 +571,12 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
// ask all the peers supporting UTXO set snapshots for snapshots they have
val msg = Message(GetSnapshotsInfoSpec, Right(()), None)
val peers = UtxoSetNetworkingFilter.filter(syncTracker.knownPeers()).toSeq
networkControllerRef ! SendToNetwork(msg, SendToPeers(peers))
val peersCount = peers.size
if (peersCount >= MinSnapshots) {
networkControllerRef ! SendToNetwork(msg, SendToPeers(peers))
} else {
log.warn(s"Less UTXO-snapshot supporting peers found than required mininum ($peersCount < $MinSnapshots)")
}
}

private def requestManifest(manifestId: ManifestId, peer: ConnectedPeer): Unit = {
@@ -578,8 +588,10 @@
}

private def requestUtxoSetChunk(subtreeId: SubtreeId, peer: ConnectedPeer): Unit = {
// as we download multiple chunks in parallel and they can be quite large, the delivery timeout is increased
val chunkDeliveryTimeout = 4 * deliveryTimeout
deliveryTracker.setRequested(UtxoSnapshotChunkTypeId.value, ModifierId @@ Algos.encode(subtreeId), peer) { deliveryCheck =>
context.system.scheduler.scheduleOnce(deliveryTimeout, self, deliveryCheck)
context.system.scheduler.scheduleOnce(chunkDeliveryTimeout, self, deliveryCheck)
}
val msg = Message(GetUtxoSnapshotChunkSpec, Right(subtreeId), None)
networkControllerRef ! SendToNetwork(msg, SendToPeer(peer))
@@ -593,7 +605,7 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
maxModifiers = deliveryTracker.modifiersToDownload,
minModifiersPerBucket,
maxModifiersPerBucket
)(getPeersForDownloadingBlocks) { howManyPerType =>
)(getPeersForDownloadingBlocks) { _ =>
// leave only block section ids not touched before
modifiersToFetch.flatMap { case (tid, mids) =>
val updMids = mids.filter { mid =>
@@ -888,14 +900,14 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
val encodedManifestId = ModifierId @@ Algos.encode(manifestId)
val ownId = hr.bestHeaderAtHeight(height).map(_.stateRoot).map(stateDigest => splitDigest(stateDigest)._1)
if (ownId.getOrElse(Array.emptyByteArray).sameElements(manifestId)) {
log.debug(s"Got manifest $encodedManifestId for height $height from $remote")
log.debug(s"Discovered manifest $encodedManifestId for height $height from $remote")
// add manifest to available manifests dictionary if it is not written there yet
val existingOffers = availableManifests.getOrElse(encodedManifestId, (height -> Seq.empty))
if (!existingOffers._2.contains(remote)) {
log.info(s"Found new manifest ${Algos.encode(manifestId)} for height $height at $remote")
availableManifests.put(encodedManifestId, height -> (existingOffers._2 :+ remote))
} else {
log.warn(s"Got manifest $manifestId twice from $remote")
log.warn(s"Double manifest declaration for $manifestId from $remote")
}
} else {
log.error(s"Got wrong manifest id $encodedManifestId from $remote")
@@ -954,14 +966,15 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
case Success(subtree) =>
val chunkId = ModifierId @@ Algos.encode(subtree.id)
deliveryTracker.getRequestedInfo(UtxoSnapshotChunkTypeId.value, chunkId) match {
case Some(ri) if ri.peer == remote =>
case Some(_) =>
log.debug(s"Got utxo snapshot chunk, id: $chunkId, size: ${serializedChunk.length}")
deliveryTracker.setUnknown(chunkId, UtxoSnapshotChunkTypeId.value)
hr.registerDownloadedChunk(subtree.id, serializedChunk)

hr.utxoSetSnapshotDownloadPlan() match {
case Some(downloadPlan) =>
if (downloadPlan.fullyDownloaded) {
log.info("All the UTXO set snapshot chunks downloaded")
// if all the chunks of snapshot are downloaded, initialize UTXO set state with it
if (!hr.isUtxoSnapshotApplied) {
val h = downloadPlan.snapshotHeight
Expand All @@ -983,8 +996,8 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
log.warn(s"No download plan found when processing UTXO set snapshot chunk $chunkId")
}

case _ =>
log.info(s"Penalizing spamming peer $remote sent non-asked UTXO set snapshot $chunkId")
case None =>
log.info(s"Penalizing spamming peer $remote sent non-asked UTXO set snapshot chunk $chunkId")
penalizeSpammingPeer(remote)
}

@@ -1260,7 +1273,6 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
// check if we have enough UTXO set snapshots for some height
// if so, request manifest from a random peer announced it
private def checkUtxoSetManifests(historyReader: ErgoHistory): Unit = {
val MinSnapshots = settings.nodeSettings.utxoSettings.p2pUtxoSnapshots

if (settings.nodeSettings.utxoSettings.utxoBootstrap &&
historyReader.fullBlockHeight == 0 &&
@@ -1397,8 +1409,12 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
logger.debug("Chain is good")

case ChainIsStuck(error) =>
log.warn(s"Chain is stuck! $error\nDelivery tracker State:\n$deliveryTracker\nSync tracker state:\n$syncTracker")
deliveryTracker.reset()
if (historyReader.fullBlockHeight > 0) {
log.warn(s"Chain is stuck! $error\nDelivery tracker State:\n$deliveryTracker\nSync tracker state:\n$syncTracker")
deliveryTracker.reset()
} else {
log.debug("Got ChainIsStuck signal when no full-blocks applied yet")
}
}

/** handlers of messages coming from peers */
@@ -284,17 +284,17 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti
case InitStateFromSnapshot(height, blockId) =>
if (!history().isUtxoSnapshotApplied) {
val store = minimalState().store
history().createPersistentProver(store, blockId) match {
//todo: pass metadata?
history().createPersistentProver(store, history(), height, blockId) match {
case Success(pp) =>
log.info(s"Restoring state from prover with digest ${pp.digest} reconstructed for height $height")
history().utxoSnapshotApplied(height)
val newState = new UtxoState(pp, version = VersionTag @@@ blockId, store, settings)
// todo: apply 10 headers before utxo set snapshot?
updateNodeView(updatedState = Some(newState.asInstanceOf[State]))
case Failure(t) =>
log.error("UTXO set snapshot application failed: ", t)
}
} else {
log.warn("InitStateFromSnapshot arrived when state already initialized")
}

}
@@ -618,7 +618,7 @@ trait ErgoHistoryReader
*/
def estimatedTip(): Option[Height] = {
Try { //error may happen if history not initialized
if(isHeadersChainSynced) {
if (isHeadersChainSynced) {
Some(headersHeight)
} else {
None
@@ -8,7 +8,7 @@ import org.ergoplatform.settings.ErgoSettings
A class that keeps and calculates the minimal height of full blocks starting from which we need to download these full
blocks from the network and keep them in our history.
*/
trait FullBlockPruningProcessor {
trait FullBlockPruningProcessor extends MinimalFullBlockHeightFunctions {

protected def settings: ErgoSettings

@@ -18,7 +18,6 @@ trait FullBlockPruningProcessor {
private def VotingEpochLength = chainSettings.voting.votingLength

@volatile private[history] var isHeadersChainSyncedVar: Boolean = false
@volatile private[history] var minimalFullBlockHeightVar: Int = ErgoHistory.GenesisHeight

private def extensionWithParametersHeight(height: Int): Int = {
require(height >= VotingEpochLength)
@@ -33,7 +32,7 @@

/** Start height to download full blocks from
*/
def minimalFullBlockHeight: Int = minimalFullBlockHeightVar
def minimalFullBlockHeight: Int = readMinimalFullBlockHeight()

/** Check if headers chain is synchronized with the network and modifier is not too old
*/
@@ -47,15 +46,16 @@
* @return minimal height to process best full block
*/
def updateBestFullBlock(header: Header): Int = {
minimalFullBlockHeightVar = if (!nodeConfig.isFullBlocksPruned) {
ErgoHistory.GenesisHeight // keep all blocks in history
} else if (!isHeadersChainSynced && !nodeConfig.stateType.requireProofs) {
// just synced with the headers chain - determine first full block to apply
//TODO start with the height of UTXO snapshot applied. For now we start from genesis until this is implemented
ErgoHistory.GenesisHeight
val minimalFullBlockHeight = if (nodeConfig.blocksToKeep < 0) {
if (nodeConfig.utxoSettings.utxoBootstrap) {
// we have a constant minimal full block height corresponding to the first block after the UTXO set snapshot
readMinimalFullBlockHeight()
} else {
ErgoHistory.GenesisHeight // keep all blocks in history as no pruning is set
}
} else {
// Start from config.blocksToKeep blocks back
val h = Math.max(minimalFullBlockHeight, header.height - nodeConfig.blocksToKeep + 1)
val h = Math.max(readMinimalFullBlockHeight(), header.height - nodeConfig.blocksToKeep + 1)
// ... but not later than the beginning of a voting epoch
if (h > VotingEpochLength) {
Math.min(h, extensionWithParametersHeight(h))
Expand All @@ -64,7 +64,8 @@ trait FullBlockPruningProcessor {
}
}
if (!isHeadersChainSynced) isHeadersChainSyncedVar = true
minimalFullBlockHeightVar
writeMinimalFullBlockHeight(minimalFullBlockHeight)
minimalFullBlockHeight
}

}
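The MinimalFullBlockHeightFunctions trait mixed into FullBlockPruningProcessor above is not shown in this hunk. Judging from its usage (readMinimalFullBlockHeight() and writeMinimalFullBlockHeight(...)), it presumably amounts to a pair of persistent accessors along these lines (a sketch; the actual storage backing is an assumption):

trait MinimalFullBlockHeightFunctions {
  // height of the first full block the node is going to download and keep
  def readMinimalFullBlockHeight(): Int
  // persist a new minimal full-block height, e.g. the first block after an applied UTXO set snapshot
  def writeMinimalFullBlockHeight(height: Int): Unit
}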