From f5d420641ab8ec577a685e446d7ae7b45e25e125 Mon Sep 17 00:00:00 2001 From: jellymlg Date: Sun, 26 Nov 2023 14:38:00 +0100 Subject: [PATCH 01/12] Added explicit state to indexer, added block caching, optimized IndexedErgoTransaction --- src/main/resources/application.conf | 16 + src/main/scala/org/ergoplatform/ErgoApp.scala | 14 +- .../org/ergoplatform/GlobalConstants.scala | 2 + .../org/ergoplatform/http/api/ApiCodecs.scala | 2 +- .../http/api/BlockchainApiRoute.scala | 5 +- .../nodeView/history/extra/ExtraIndexer.scala | 343 ++++++++++-------- .../extra/IndexedErgoTransaction.scala | 49 ++- .../nodeView/history/extra/IndexedToken.scala | 16 +- .../nodeView/history/extra/IndexerState.scala | 50 +++ .../nodeView/history/extra/Segment.scala | 22 +- .../history/storage/HistoryStorage.scala | 1 - 11 files changed, 320 insertions(+), 200 deletions(-) create mode 100644 src/main/scala/org/ergoplatform/nodeView/history/extra/IndexerState.scala diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index d578aeb9ee..2e262fcc39 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -606,4 +606,20 @@ api-dispatcher { # processed per actor before the thread jumps to the next actor. # Set to 1 for as fair as possible. throughput = 4 +} + +indexer-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 1 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 16 + } } \ No newline at end of file diff --git a/src/main/scala/org/ergoplatform/ErgoApp.scala b/src/main/scala/org/ergoplatform/ErgoApp.scala index 88021c75b2..fdffb58b8d 100644 --- a/src/main/scala/org/ergoplatform/ErgoApp.scala +++ b/src/main/scala/org/ergoplatform/ErgoApp.scala @@ -108,13 +108,13 @@ class ErgoApp(args: Args) extends ScorexLogging { None } - if(ergoSettings.nodeSettings.extraIndex) - require( - ergoSettings.nodeSettings.stateType.holdsUtxoSet && !ergoSettings.nodeSettings.isFullBlocksPruned, - "Node must store full UTXO set and all blocks to run extra indexer." - ) // Create an instance of ExtraIndexer actor (will start if "extraIndex = true" in config) - private val indexer: ActorRef = ExtraIndexer(ergoSettings.chainSettings, ergoSettings.cacheSettings) + private val indexerOpt: Option[ActorRef] = + if (ergoSettings.nodeSettings.extraIndex) { + Some(ExtraIndexer(ergoSettings.chainSettings, ergoSettings.cacheSettings)) + } else { + None + } private val syncTracker = ErgoSyncTracker(scorexSettings.network) @@ -184,7 +184,7 @@ class ErgoApp(args: Args) extends ScorexLogging { private val apiRoutes: Seq[ApiRoute] = Seq( EmissionApiRoute(ergoSettings), ErgoUtilsApiRoute(ergoSettings), - BlockchainApiRoute(readersHolderRef, ergoSettings, indexer), + BlockchainApiRoute(readersHolderRef, ergoSettings, indexerOpt), ErgoPeersApiRoute( peerManagerRef, networkControllerRef, diff --git a/src/main/scala/org/ergoplatform/GlobalConstants.scala b/src/main/scala/org/ergoplatform/GlobalConstants.scala index 0025ba4fc9..5d396394ba 100644 --- a/src/main/scala/org/ergoplatform/GlobalConstants.scala +++ b/src/main/scala/org/ergoplatform/GlobalConstants.scala @@ -10,4 +10,6 @@ object GlobalConstants { * (to avoid clashing between blockchain processing and API actors) */ val ApiDispatcher = "api-dispatcher" + + val IndexerDispatcher = "indexer-dispatcher" } diff --git a/src/main/scala/org/ergoplatform/http/api/ApiCodecs.scala b/src/main/scala/org/ergoplatform/http/api/ApiCodecs.scala index 424c279c99..fdd7014a30 100644 --- a/src/main/scala/org/ergoplatform/http/api/ApiCodecs.scala +++ b/src/main/scala/org/ergoplatform/http/api/ApiCodecs.scala @@ -535,7 +535,7 @@ trait ApiCodecs extends JsonCodecs { "inputs" -> iEt.inputs.asJson, "dataInputs" -> iEt.dataInputs.asJson, "outputs" -> iEt.outputs.asJson, - "size" -> iEt.txSize.asJson + "size" -> iEt.size.asJson ) } diff --git a/src/main/scala/org/ergoplatform/http/api/BlockchainApiRoute.scala b/src/main/scala/org/ergoplatform/http/api/BlockchainApiRoute.scala index e5cb090761..0897904fc2 100644 --- a/src/main/scala/org/ergoplatform/http/api/BlockchainApiRoute.scala +++ b/src/main/scala/org/ergoplatform/http/api/BlockchainApiRoute.scala @@ -28,13 +28,14 @@ import scala.concurrent.duration.{Duration, SECONDS} import scala.concurrent.{Await, Future} import scala.util.Success -case class BlockchainApiRoute(readersHolder: ActorRef, ergoSettings: ErgoSettings, indexer: ActorRef) +case class BlockchainApiRoute(readersHolder: ActorRef, ergoSettings: ErgoSettings, indexerOpt: Option[ActorRef]) (implicit val context: ActorRefFactory) extends ErgoBaseApiRoute with ApiCodecs { val settings: RESTApiSettings = ergoSettings.scorexSettings.restApi - private implicit val segmentTreshold: Int = + private implicit val segmentTreshold: Int = indexerOpt.map { indexer => Await.result[Int]((indexer ? GetSegmentTreshold).asInstanceOf[Future[Int]], Duration(3, SECONDS)) + }.getOrElse(0) private val paging: Directive[(Int, Int)] = parameters("offset".as[Int] ? 0, "limit".as[Int] ? 5) diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala index f0b0e4e34c..8e5c1c6084 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala @@ -1,15 +1,15 @@ package org.ergoplatform.nodeView.history.extra -import akka.actor.{Actor, ActorRef, ActorSystem, Props} +import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash} import org.ergoplatform.ErgoBox.TokenId -import org.ergoplatform.{ErgoAddress, ErgoAddressEncoder, Pay2SAddress} +import org.ergoplatform.{ErgoAddress, ErgoAddressEncoder, GlobalConstants, Pay2SAddress} import org.ergoplatform.modifiers.history.BlockTransactions import org.ergoplatform.modifiers.history.header.Header import org.ergoplatform.modifiers.mempool.ErgoTransaction import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages.{FullBlockApplied, Rollback} -import org.ergoplatform.nodeView.history.extra.ExtraIndexer.{GlobalBoxIndexKey, GlobalTxIndexKey, IndexedHeightKey, getIndex} +import org.ergoplatform.nodeView.history.extra.ExtraIndexer._ import org.ergoplatform.nodeView.history.{ErgoHistory, ErgoHistoryReader} -import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages.{GetSegmentTreshold, StartExtraIndexer} +import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages._ import org.ergoplatform.nodeView.history.extra.IndexedErgoAddressSerializer.hashErgoTree import org.ergoplatform.nodeView.history.extra.IndexedTokenSerializer.uniqueId import org.ergoplatform.nodeView.history.storage.HistoryStorage @@ -22,32 +22,18 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import spire.syntax.all.cfor +import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable +import scala.collection.concurrent +import scala.concurrent.{ExecutionContextExecutor, Future} +import scala.jdk.CollectionConverters._ /** * Base trait for extra indexer actor and its test. */ -trait ExtraIndexerBase extends ScorexLogging { +trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { - /** - * Indexed block height - */ - protected var indexedHeight: Int = 0 - - /** - * Indexed transaction count - */ - protected var globalTxIndex: Long = 0L - - /** - * Indexed box count - */ - protected var globalBoxIndex: Long = 0L - - /** - * Last block height when buffer contents were saved to database - */ - protected var lastWroteToDB: Int = 0 + private implicit val ec: ExecutionContextExecutor = context.dispatcher /** * Max buffer size (determined by config) @@ -64,16 +50,6 @@ trait ExtraIndexerBase extends ScorexLogging { */ protected implicit val addressEncoder: ErgoAddressEncoder - /** - * Flag to signal when indexer has reached current block height - */ - protected var caughtUp: Boolean = false - - /** - * Flag to signal a rollback - */ - protected var rollback: Boolean = false - /** * Database handle */ @@ -91,10 +67,39 @@ trait ExtraIndexerBase extends ScorexLogging { protected val segments: mutable.HashMap[ModifierId,Segment[_]] = mutable.HashMap.empty[ModifierId,Segment[_]] /** - * Input tokens in a transaction + * Input tokens in a transaction, cleared after every transaction */ private val inputTokens: ArrayBuffer[(TokenId, Long)] = ArrayBuffer.empty[(TokenId, Long)] + /** + * Holds upcoming blocks to be indexed, and when empty, it is filled back from multiple threads + */ + private val blockCache: concurrent.Map[Int,BlockTransactions] = new ConcurrentHashMap[Int,BlockTransactions]().asScala + private var readingUpTo: Int = 0 + + /** + * Get transactions for specified height, preferably from cache, or from database. + * If indexer is getting close to emptying cache, asynchronously reads 1000 blocks into it + * @param height - blockheight to get transations from + * @return transactions at height + */ + private def getBlockTransactionsAt(height: Int): BlockTransactions = { + val txs = blockCache.remove(height).getOrElse(history.bestBlockTransactionsAt(height).get) + if(height % 1000 == 0) blockCache.keySet.filter(_ < height).map(blockCache.remove) + if (readingUpTo - height < 300 && chainHeight - height > 1000) { + readingUpTo = math.min(height + 1001, chainHeight) + val blockNums = height + 1 to readingUpTo by 50 + blockNums.zip(blockNums.tail).map { range => // ranges of 50 blocks for each thread to read + Future { + (range._1 until range._2).foreach { blockNum => + blockCache.put(blockNum, history.bestBlockTransactionsAt(blockNum).get) + } + } + } + } + txs + } + /** * Spend an IndexedErgoBox from buffer or database. Also record tokens for later use in balance tracking logic. * @@ -125,24 +130,24 @@ trait ExtraIndexerBase extends ScorexLogging { * @param id - hash of the (ergotree) address * @param spendOrReceive - IndexedErgoBox to receive (Right) or spend (Left) */ - private def findAndUpdateTree(id: ModifierId, spendOrReceive: Either[IndexedErgoBox, IndexedErgoBox]): Unit = { + private def findAndUpdateTree(id: ModifierId, spendOrReceive: Either[IndexedErgoBox, IndexedErgoBox])(implicit state: IndexerState): Unit = { trees.get(id).map { tree => spendOrReceive match { - case Left(iEb) => tree.addTx(globalTxIndex).spendBox(iEb, Some(history)) // spend box - case Right(iEb) => tree.addTx(globalTxIndex).addBox(iEb) // receive box + case Left(iEb) => tree.addTx(state.globalTxIndex).spendBox(iEb, Some(history)) // spend box + case Right(iEb) => tree.addTx(state.globalTxIndex).addBox(iEb) // receive box } return } history.typedExtraIndexById[IndexedErgoAddress](id) match { // address not found in last saveLimit modifiers case Some(x) => spendOrReceive match { - case Left(iEb) => trees.put(id, x.addTx(globalTxIndex).spendBox(iEb, Some(history))) // spend box - case Right(iEb) => trees.put(id, x.addTx(globalTxIndex).addBox(iEb)) // receive box + case Left(iEb) => trees.put(id, x.addTx(state.globalTxIndex).spendBox(iEb, Some(history))) // spend box + case Right(iEb) => trees.put(id, x.addTx(state.globalTxIndex).addBox(iEb)) // receive box } case None => // address not found at all spendOrReceive match { case Left(iEb) => log.warn(s"Unknown address spent box ${bytesToId(iEb.box.id)}") // spend box should never happen by an unknown address - case Right(iEb) => trees.put(id, IndexedErgoAddress(id).initBalance.addTx(globalTxIndex).addBox(iEb)) // receive box, new address + case Right(iEb) => trees.put(id, IndexedErgoAddress(id).initBalance.addTx(state.globalTxIndex).addBox(iEb)) // receive box, new address } } } @@ -178,37 +183,34 @@ trait ExtraIndexerBase extends ScorexLogging { private def modCount: Int = general.length + boxes.size + trees.size + tokens.size /** - * Write buffered indexes to database and clear buffers. - */ - private def saveProgress(writeLog: Boolean = true): Unit = { + * Write buffered indexes to database and clear buffers. + */ + private def saveProgress(state: IndexerState, writeLog: Boolean = true): Unit = { val start: Long = System.currentTimeMillis // perform segmentation on big addresses and save their internal segment buffer trees.values.foreach { tree => - if(tree.buffer.nonEmpty) { - tree.buffer.values.foreach(seg => segments.put(seg.id, seg)) - tree.buffer.clear() - } - if(tree.txs.length > segmentTreshold || tree.boxes.length > segmentTreshold) - tree.splitToSegments.foreach(seg => segments.put(seg.id, seg)) + tree.buffer.values.foreach(seg => segments.put(seg.id, seg)) + tree.splitToSegments.foreach(seg => segments.put(seg.id, seg)) } // perform segmentation on big tokens and save their internal segment buffer tokens.values.foreach { token => - if(token.buffer.nonEmpty) { - token.buffer.values.foreach(seg => segments.put(seg.id, seg)) - token.buffer.clear() - } - if(token.boxes.length > segmentTreshold) - token.splitToSegments.foreach(seg => segments.put(seg.id, seg)) + token.buffer.values.foreach(seg => segments.put(seg.id, seg)) + token.splitToSegments.foreach(seg => segments.put(seg.id, seg)) } // insert modifiers and progress info to db - historyStorage.insertExtra(Array((IndexedHeightKey, ByteBuffer.allocate(4).putInt(indexedHeight).array), - (GlobalTxIndexKey, ByteBuffer.allocate(8).putLong(globalTxIndex).array), - (GlobalBoxIndexKey,ByteBuffer.allocate(8).putLong(globalBoxIndex).array)), - ((((general ++= boxes.values) ++= trees.values) ++= tokens.values) ++= segments.values).toArray) + historyStorage.insertExtra( + Array( + (IndexedHeightKey, ByteBuffer.allocate(4).putInt(state.indexedHeight).array), + (GlobalTxIndexKey, ByteBuffer.allocate(8).putLong(state.globalTxIndex).array), + (GlobalBoxIndexKey, ByteBuffer.allocate(8).putLong(state.globalBoxIndex).array), + (RollbackToKey, ByteBuffer.allocate(4).putInt(state.rollbackTo).array) + ), + ((((general ++= boxes.values) ++= trees.values) ++= tokens.values) ++= segments.values).toArray + ) if (writeLog) log.info(s"Processed ${trees.size} ErgoTrees with ${boxes.size} boxes and inserted them to database in ${System.currentTimeMillis - start}ms") @@ -219,29 +221,30 @@ trait ExtraIndexerBase extends ScorexLogging { trees.clear() tokens.clear() segments.clear() - - lastWroteToDB = indexedHeight } /** * Process a batch of BlockTransactions into memory and occasionally write them to database. * - * @param bt - BlockTransaction to process - * @param height - height of the block containing the transactions + * @param state - current indexer state + * @param headerOpt - header to index blocktransactions of (used after caught up with chain) */ - protected def index(bt: BlockTransactions, height: Int): Unit = { + protected def index(state: IndexerState, headerOpt: Option[Header] = None): IndexerState = { - if (rollback || // rollback in progress - (caughtUp && height <= indexedHeight)) // do not process older blocks again after caught up (due to actor message queue) - return + val bt = headerOpt.map { header => + history.typedModifierById[BlockTransactions](header.transactionsId).get + }.getOrElse(getBlockTransactionsAt(state.indexedHeight)) + val height = headerOpt.map(_.height).getOrElse(state.indexedHeight) var boxCount: Int = 0 + implicit var newState: IndexerState = state // record transactions and boxes cfor(0)(_ < bt.txs.length, _ + 1) { n => val tx: ErgoTransaction = bt.txs(n) val inputs: Array[Long] = Array.ofDim[Long](tx.inputs.length) + val outputs: Array[Long] = Array.ofDim[Long](tx.outputs.length) inputTokens.clear() @@ -262,92 +265,57 @@ trait ExtraIndexerBase extends ScorexLogging { //process transaction outputs cfor(0)(_ < tx.outputs.size, _ + 1) { i => - val iEb: IndexedErgoBox = new IndexedErgoBox(height, None, None, tx.outputs(i), globalBoxIndex) + val iEb: IndexedErgoBox = new IndexedErgoBox(height, None, None, tx.outputs(i), newState.globalBoxIndex) boxes.put(iEb.id, iEb) // box by id - general += NumericBoxIndex(globalBoxIndex, iEb.id) // box id by global box number + general += NumericBoxIndex(newState.globalBoxIndex, iEb.id) // box id by global box number + outputs(i) = iEb.globalIndex // box by address findAndUpdateTree(hashErgoTree(iEb.box.ergoTree), Right(boxes(iEb.id))) // check if box is creating new tokens, if yes record them cfor(0)(_ < iEb.box.additionalTokens.length, _ + 1) { j => - if (!inputTokens.exists(x => x._1 == iEb.box.additionalTokens(j)._1)) { + if (!inputTokens.exists(x => java.util.Arrays.equals(x._1.toArray, iEb.box.additionalTokens(j)._1.toArray))) { val token = IndexedToken.fromBox(iEb, j) tokens.get(token.tokenId) match { case Some(t) => // same new token created in multiple boxes -> add amounts - val newToken = IndexedToken(t.tokenId, t.boxId, t.amount + token.amount, t.name, t.description, t.decimals, t.boxes) - newToken.buffer ++= t.buffer - tokens.put(token.tokenId, newToken) + tokens.put(token.tokenId, t.addEmissionAmount(token.amount)) case None => tokens.put(token.tokenId, token) // new token } } findAndUpdateToken(iEb.box.additionalTokens(j)._1.toModifierId, Right(iEb)) } - globalBoxIndex += 1 + newState = newState.incrementBoxIndex boxCount += 1 } //process transaction - general += IndexedErgoTransaction(tx.id, height, globalTxIndex, inputs) - general += NumericTxIndex(globalTxIndex, tx.id) + general += IndexedErgoTransaction.fromTx(tx, n, height, newState.globalTxIndex, inputs, outputs) + general += NumericTxIndex(newState.globalTxIndex, tx.id) - globalTxIndex += 1 + newState = newState.incrementTxIndex } log.info(s"Buffered block $height / $chainHeight [txs: ${bt.txs.length}, boxes: $boxCount] (buffer: $modCount / $saveLimit)") - if (caughtUp) { - - indexedHeight = height // update height here after caught up with chain - - if (modCount >= saveLimit || // modifier limit reached to write to db - history.fullBlockHeight == history.headersHeight) // write to db every block after chain synced - saveProgress() - - } else if (modCount >= saveLimit) - saveProgress() // active syncing, write to db after modifier limit - - } - - /** - * Main indexer loop that tries to catch up with the already present blocks in database. - */ - protected def run(): Unit = { - - indexedHeight = getIndex(IndexedHeightKey, history).getInt - globalTxIndex = getIndex(GlobalTxIndexKey, history).getLong - globalBoxIndex = getIndex(GlobalBoxIndexKey, history).getLong - - log.info(s"Started extra indexer at height $indexedHeight") - - while (indexedHeight < chainHeight && !rollback) { - indexedHeight += 1 - index(history.bestBlockTransactionsAt(indexedHeight).get, indexedHeight) - } - - saveProgress(false) // flush any remaining data - - if (rollback) - log.info("Stopping indexer to perform rollback") - else { - caughtUp = true - log.info("Indexer caught up with chain") - } - + newState.updateCaughtUp(headerOpt.map(_.height).getOrElse(chainHeight)) } /** * Remove all indexes after a given height and revert address balances. * + * @param state - current state of indexer * @param height - starting height */ - protected def removeAfter(height: Int): Unit = { + protected def removeAfter(state: IndexerState, height: Int): IndexerState = { + + var newState: IndexerState = state - saveProgress(false) - log.info(s"Rolling back indexes from $indexedHeight to $height") + saveProgress(newState, writeLog = false) + log.info(s"Rolling back indexes from ${state.indexedHeight} to $height") val lastTxToKeep: ErgoTransaction = history.bestBlockTransactionsAt(height).get.txs.last val txTarget: Long = history.typedExtraIndexById[IndexedErgoTransaction](lastTxToKeep.id).get.globalIndex @@ -355,9 +323,9 @@ trait ExtraIndexerBase extends ScorexLogging { val toRemove: ArrayBuffer[ModifierId] = ArrayBuffer.empty[ModifierId] // remove all tx indexes - globalTxIndex -= 1 - while(globalTxIndex > txTarget) { - val tx: IndexedErgoTransaction = NumericTxIndex.getTxByNumber(history, globalTxIndex).get + newState = newState.decrementTxIndex + while(newState.globalTxIndex > txTarget) { + val tx: IndexedErgoTransaction = NumericTxIndex.getTxByNumber(history, newState.globalTxIndex).get tx.inputNums.map(NumericBoxIndex.getBoxByNumber(history, _).get).foreach { iEb => // undo all spendings iEb.spendingHeightOpt = None @@ -375,20 +343,21 @@ trait ExtraIndexerBase extends ScorexLogging { } } toRemove += tx.id // tx by id - toRemove += bytesToId(NumericTxIndex.indexToBytes(globalTxIndex)) // tx id by number - globalTxIndex -= 1 + toRemove += bytesToId(NumericTxIndex.indexToBytes(newState.globalTxIndex)) // tx id by number + newState = newState.decrementTxIndex } - globalTxIndex += 1 + newState = newState.incrementTxIndex // remove all box indexes, tokens and address balances - globalBoxIndex -= 1 - while(globalBoxIndex > boxTarget) { - val iEb: IndexedErgoBox = NumericBoxIndex.getBoxByNumber(history, globalBoxIndex).get + newState = newState.decrementBoxIndex + while(newState.globalBoxIndex > boxTarget) { + val iEb: IndexedErgoBox = NumericBoxIndex.getBoxByNumber(history, newState.globalBoxIndex).get cfor(0)(_ < iEb.box.additionalTokens.length, _ + 1) { i => history.typedExtraIndexById[IndexedToken](IndexedToken.fromBox(iEb, i).id).map { token => - if(token.boxId == iEb.id) // token created, delete + if(token.boxId == iEb.id) { // token created, delete toRemove += token.id - else // no token created, update + log.info(s"Removing token ${token.tokenId} created in box ${iEb.id} at height ${iEb.inclusionHeight}") + } else // no token created, update toRemove ++= token.rollback(txTarget, boxTarget, _history) } } @@ -397,21 +366,66 @@ trait ExtraIndexerBase extends ScorexLogging { toRemove ++= address.rollback(txTarget, boxTarget, _history) } toRemove += iEb.id // box by id - toRemove += bytesToId(NumericBoxIndex.indexToBytes(globalBoxIndex)) // box id by number - globalBoxIndex -= 1 + toRemove += bytesToId(NumericBoxIndex.indexToBytes(newState.globalBoxIndex)) // box id by number + newState = newState.decrementBoxIndex } - globalBoxIndex += 1 - - // Reset indexer flags - indexedHeight = height - caughtUp = false - rollback = false + newState = newState.incrementBoxIndex // Save changes - saveProgress(false) + newState = newState.copy(indexedHeight = height, rollbackTo = 0).updateCaughtUp(chainHeight) historyStorage.removeExtra(toRemove.toArray) + saveProgress(newState, writeLog = false) + + newState + } + + protected def loaded(state: IndexerState): Receive = { + + case Index() if !state.caughtUp && !state.rollbackInProgress => + val newState = index(state.incrementIndexedHeight) + if (modCount >= saveLimit) saveProgress(newState) + context.become(loaded(newState)) + self ! Index() + + case Index() if state.caughtUp => + if(modCount > 0) saveProgress(state) + blockCache.clear() + log.info("Indexer caught up with chain") + + // after the indexer caught up with the chain, stay up to date + case FullBlockApplied(header: Header) if state.caughtUp => + if (!state.rollbackInProgress) { + val newState: IndexerState = index(state.incrementIndexedHeight, Some(header)) + saveProgress(newState) + context.become(loaded(newState)) + } else + stash() + + case Rollback(branchPoint: ModifierId) => + if (state.rollbackInProgress) { + log.warn(s"Rollback already in progress") + stash() + } else { + val branchHeight: Int = history.heightOf(branchPoint).get + if (branchHeight < state.indexedHeight) { + context.become(loaded(state.copy(rollbackTo = branchHeight))) + self ! RemoveAfter(branchHeight) + } + } + + case RemoveAfter(branchHeight: Int) if state.rollbackInProgress => + blockCache.clear() + readingUpTo = 0 + val newState = removeAfter(state, branchHeight) + context.become(loaded(newState)) + log.info(s"Successfully rolled back indexes to $branchHeight") + unstashAll() + + case GetSegmentTreshold => + sender ! segmentTreshold + + case _ => - log.info(s"Successfully rolled back indexes to $height") } } @@ -424,7 +438,7 @@ trait ExtraIndexerBase extends ScorexLogging { */ class ExtraIndexer(cacheSettings: CacheSettings, ae: ErgoAddressEncoder) - extends Actor with ExtraIndexerBase { + extends ExtraIndexerBase { override val saveLimit: Int = cacheSettings.history.extraCacheSize * 20 @@ -438,28 +452,23 @@ class ExtraIndexer(cacheSettings: CacheSettings, context.system.eventStream.subscribe(self, classOf[StartExtraIndexer]) } - override def postStop(): Unit = - log.info(s"Stopped extra indexer at height ${if(lastWroteToDB > 0) lastWroteToDB else indexedHeight}") + override def postStop(): Unit = { + log.info(s"Stopped extra indexer") + super.postStop() + } override def receive: Receive = { - case FullBlockApplied(header: Header) if caughtUp => - index(history.typedModifierById[BlockTransactions](header.transactionsId).get, header.height) // after the indexer caught up with the chain, stay up to date - - case Rollback(branchPoint: ModifierId) if _history != null => // only rollback if indexing is enabled - val branchHeight: Int = history.heightOf(branchPoint).get - rollback = branchHeight < indexedHeight - if(rollback) { - removeAfter(branchHeight) - run() // restart indexer - } - case StartExtraIndexer(history: ErgoHistory) => _history = history - run() + val state = IndexerState.fromHistory(history) + context.become(loaded(state)) + log.info(s"Started extra indexer at height ${state.indexedHeight}") + self ! Index() + unstashAll() + + case _ => stash() - case GetSegmentTreshold => - sender ! segmentTreshold } } @@ -478,6 +487,17 @@ object ExtraIndexer { * Retreive the currently used segment treshold */ case class GetSegmentTreshold() + + /** + * Index block at current indexer height + */ + case class Index() + + /** + * Remove and roll back all indexes after branchHeight + * @param branchHeight - height of last block to keep + */ + case class RemoveAfter(branchHeight: Int) } /** @@ -514,12 +534,13 @@ object ExtraIndexer { /** * Current newest database schema version. Used to force extra database resync. */ - val NewestVersion: Int = 4 + val NewestVersion: Int = 5 val NewestVersionBytes: Array[Byte] = ByteBuffer.allocate(4).putInt(NewestVersion).array val IndexedHeightKey: Array[Byte] = Algos.hash("indexed height") val GlobalTxIndexKey: Array[Byte] = Algos.hash("txns height") val GlobalBoxIndexKey: Array[Byte] = Algos.hash("boxes height") + val RollbackToKey: Array[Byte] = Algos.hash("rollback to") val SchemaVersionKey: Array[Byte] = Algos.hash("schema version") def getIndex(key: Array[Byte], history: HistoryStorage): ByteBuffer = @@ -527,8 +548,10 @@ object ExtraIndexer { def getIndex(key: Array[Byte], history: ErgoHistoryReader): ByteBuffer = getIndex(key, history.historyStorage) - def apply(chainSettings: ChainSettings, cacheSettings: CacheSettings)(implicit system: ActorSystem): ActorRef = - system.actorOf(Props.create(classOf[ExtraIndexer], cacheSettings, chainSettings.addressEncoder)) + def apply(chainSettings: ChainSettings, cacheSettings: CacheSettings)(implicit system: ActorSystem): ActorRef = { + val props = Props.create(classOf[ExtraIndexer], cacheSettings, chainSettings.addressEncoder) + system.actorOf(props.withDispatcher(GlobalConstants.IndexerDispatcher)) + } } /** diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/IndexedErgoTransaction.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/IndexedErgoTransaction.scala index 8b35587686..2705b76df5 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/extra/IndexedErgoTransaction.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/IndexedErgoTransaction.scala @@ -3,9 +3,10 @@ package org.ergoplatform.nodeView.history.extra import org.ergoplatform.modifiers.history.header.Header import org.ergoplatform.nodeView.history.ErgoHistoryReader import org.ergoplatform.DataInput -import org.ergoplatform.modifiers.history.BlockTransactions +import org.ergoplatform.modifiers.mempool.ErgoTransaction import org.ergoplatform.nodeView.history.extra.ExtraIndexer.{ExtraIndexTypeId, fastIdToBytes} import scorex.core.serialization.ErgoSerializer +import scorex.crypto.authds.ADKey import scorex.util.serialization.{Reader, Writer} import scorex.util.{ModifierId, bytesToId} import spire.implicits.cfor @@ -13,14 +14,21 @@ import spire.implicits.cfor /** * Minimum general information for transaction. Not storing the whole transation is done to save space. * @param txid - id of this transaction + * @param index - index of transaction in parent block * @param height - height of the block which includes this transaction + * @param size - size of this transaction in bytes * @param globalIndex - serial number of this transaction counting from block 1 - * @param inputNums - list of transaction inputs (needed for rollback) + * @param inputNums - list of transaction inputs + * @param outputNums - list of transaction outputs */ case class IndexedErgoTransaction(txid: ModifierId, + index: Int, height: Int, + size: Int, globalIndex: Long, - inputNums: Array[Long]) extends ExtraIndex { + inputNums: Array[Long], + outputNums: Array[Long], + dataInputs: Array[DataInput]) extends ExtraIndex { override lazy val id: ModifierId = txid override def serializedId: Array[Byte] = fastIdToBytes(id) @@ -28,22 +36,16 @@ case class IndexedErgoTransaction(txid: ModifierId, private var _blockId: ModifierId = ModifierId @@ "" private var _inclusionHeight: Int = 0 private var _timestamp: Header.Timestamp = 0L - private var _index: Int = 0 private var _numConfirmations: Int = 0 private var _inputs: IndexedSeq[IndexedErgoBox] = IndexedSeq.empty[IndexedErgoBox] - private var _dataInputs: IndexedSeq[DataInput] = IndexedSeq.empty[DataInput] private var _outputs: IndexedSeq[IndexedErgoBox] = IndexedSeq.empty[IndexedErgoBox] - private var _txSize: Int = 0 def blockId: ModifierId = _blockId def inclusionHeight: Int = _inclusionHeight def timestamp: Header.Timestamp = _timestamp - def index: Int = _index def numConfirmations: Int = _numConfirmations def inputs: IndexedSeq[IndexedErgoBox] = _inputs - def dataInputs: IndexedSeq[DataInput] = _dataInputs def outputs: IndexedSeq[IndexedErgoBox] = _outputs - def txSize: Int = _txSize /** * Get all information related to this transaction from database. @@ -53,17 +55,13 @@ case class IndexedErgoTransaction(txid: ModifierId, def retrieveBody(history: ErgoHistoryReader): IndexedErgoTransaction = { val header: Header = history.typedModifierById[Header](history.bestHeaderIdAtHeight(height).get).get - val blockTxs: BlockTransactions = history.typedModifierById[BlockTransactions](header.transactionsId).get _blockId = header.id _inclusionHeight = height _timestamp = header.timestamp - _index = blockTxs.txs.indices.find(blockTxs.txs(_).id == txid).get - _numConfirmations = history.bestFullBlockOpt.get.height - height - _inputs = blockTxs.txs(_index).inputs.map(input => history.typedExtraIndexById[IndexedErgoBox](bytesToId(input.boxId)).get) - _dataInputs = blockTxs.txs(_index).dataInputs - _outputs = blockTxs.txs(_index).outputs.map(output => history.typedExtraIndexById[IndexedErgoBox](bytesToId(output.id)).get) - _txSize = blockTxs.txs(_index).size + _numConfirmations = history.fullBlockHeight - height + _inputs = inputNums.flatMap(NumericBoxIndex.getBoxByNumber(history, _)) + _outputs = outputNums.flatMap(NumericBoxIndex.getBoxByNumber(history, _)) this } @@ -74,24 +72,41 @@ object IndexedErgoTransactionSerializer extends ErgoSerializer[IndexedErgoTransa override def serialize(iTx: IndexedErgoTransaction, w: Writer): Unit = { w.putUByte(iTx.serializedId.length) w.putBytes(iTx.serializedId) + w.putInt(iTx.index) w.putInt(iTx.height) + w.putInt(iTx.size) w.putLong(iTx.globalIndex) w.putUShort(iTx.inputNums.length) cfor(0)(_ < iTx.inputNums.length, _ + 1) { i => w.putLong(iTx.inputNums(i)) } + w.putUShort(iTx.outputNums.length) + cfor(0)(_ < iTx.outputNums.length, _ + 1) { i => w.putLong(iTx.outputNums(i)) } + w.putUShort(iTx.dataInputs.length) + cfor(0)(_ < iTx.dataInputs.length, _ + 1) { i => w.putBytes(iTx.dataInputs(i).boxId) } } override def parse(r: Reader): IndexedErgoTransaction = { val idLen = r.getUByte() val id = bytesToId(r.getBytes(idLen)) + val index = r.getInt() val height = r.getInt() + val size = r.getInt() val globalIndex = r.getLong() val inputCount: Int = r.getUShort() val inputNums: Array[Long] = Array.ofDim[Long](inputCount) cfor(0)(_ < inputCount, _ + 1) { i => inputNums(i) = r.getLong() } - IndexedErgoTransaction(id, height, globalIndex, inputNums) + val outputCount: Int = r.getUShort() + val outputNums: Array[Long] = Array.ofDim[Long](outputCount) + cfor(0)(_ < outputCount, _ + 1) { i => outputNums(i) = r.getLong() } + val dataInputsCount = r.getUShort() + val dataInputs: Array[DataInput] = Array.ofDim[DataInput](dataInputsCount) + cfor(0)(_ < dataInputsCount, _ + 1) { i => dataInputs(i) = DataInput(ADKey @@ r.getBytes(32)) } + IndexedErgoTransaction(id, index, height, size, globalIndex, inputNums, outputNums, dataInputs) } } object IndexedErgoTransaction { val extraIndexTypeId: ExtraIndexTypeId = 10.toByte + + def fromTx(tx: ErgoTransaction, index: Int, height: Int, globalIndex: Long, inputs: Array[Long], outputs: Array[Long]): IndexedErgoTransaction = + IndexedErgoTransaction(tx.id, index, height, tx.size, globalIndex, inputs, outputs, tx.dataInputs.toArray) } diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/IndexedToken.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/IndexedToken.scala index ec00019729..6e6850658b 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/extra/IndexedToken.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/IndexedToken.scala @@ -49,9 +49,10 @@ case class IndexedToken(tokenId: ModifierId, val toRemove: ArrayBuffer[ModifierId] = rollbackState(txTarget, boxTarget, history.getReader) - if (boxCount == 0) + if (boxCount == 0) { toRemove += id // all segments empty after rollback, delete parent - else + log.info(s"Removing token $tokenId because no more boxes are associated with it") + } else history.historyStorage.insertExtra(Array.empty, Array(this)) // save the changes made to this address toRemove.toArray @@ -100,6 +101,17 @@ case class IndexedToken(tokenId: ModifierId, override private[extra] def filterMempool(boxes: Seq[ErgoBox]): Seq[ErgoBox] = boxes.filter(_.additionalTokens.exists(_._1.toModifierId == tokenId)) + /** + * Increase emission amount of this token. Sometimes tokens get created in multiple boxes. + * @param plus - emission amount to add + * @return updated token + */ + private[extra] def addEmissionAmount(plus: Long): IndexedToken = { + val updated = IndexedToken(tokenId, boxId, amount + plus, name, description, decimals, boxes) + updated.buffer ++= buffer + updated + } + } object IndexedTokenSerializer extends ErgoSerializer[IndexedToken] { diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/IndexerState.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/IndexerState.scala new file mode 100644 index 0000000000..808e2d5d9b --- /dev/null +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/IndexerState.scala @@ -0,0 +1,50 @@ +package org.ergoplatform.nodeView.history.extra + +import org.ergoplatform.nodeView.history.ErgoHistory +import org.ergoplatform.nodeView.history.extra.ExtraIndexer._ + +/** + * An immutable state for extra indexer + * @param indexedHeight - Indexed block height + * @param globalTxIndex - Indexed transaction count + * @param globalBoxIndex - Indexed box count + * @param rollbackTo - blockheight to rollback to, 0 if no rollback is in progress + * @param caughtUp - flag to indicate if the indexer is caught up with the chain and is listening for updates + */ +case class IndexerState(indexedHeight: Int, + globalTxIndex: Long, + globalBoxIndex: Long, + rollbackTo: Int, + caughtUp: Boolean) { + + def rollbackInProgress: Boolean = rollbackTo > 0 + + def updateCaughtUp(chainHeight: Int): IndexerState = copy(caughtUp = indexedHeight == chainHeight) + + def incrementIndexedHeight: IndexerState = copy(indexedHeight = indexedHeight + 1) + + def incrementTxIndex: IndexerState = copy(globalTxIndex = globalTxIndex + 1) + def incrementBoxIndex: IndexerState = copy(globalBoxIndex = globalBoxIndex + 1) + + def decrementTxIndex: IndexerState = copy(globalTxIndex = globalTxIndex - 1) + def decrementBoxIndex: IndexerState = copy(globalBoxIndex = globalBoxIndex - 1) + +} + +object IndexerState { + + def fromHistory(history: ErgoHistory): IndexerState = { + val indexedHeight = getIndex(IndexedHeightKey, history).getInt + val globalTxIndex = getIndex(GlobalTxIndexKey, history).getLong + val globalBoxIndex = getIndex(GlobalBoxIndexKey, history).getLong + val rollbackTo = getIndex(RollbackToKey, history).getInt + IndexerState( + indexedHeight, + globalTxIndex, + globalBoxIndex, + rollbackTo, + caughtUp = indexedHeight == history.fullBlockHeight + ) + } + +} diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/Segment.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/Segment.scala index e1f82c7d8b..40339f3ba2 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/extra/Segment.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/Segment.scala @@ -307,8 +307,8 @@ abstract class Segment[T <: Segment[_] : ClassTag](val parentId: ModifierId, txs.clear() txs ++= tmp if (txs.isEmpty && txSegmentCount > 0) { // entire current tx set removed, retrieving more from database if possible - val segmentId = txSegmentId(parentId, txSegmentCount - 1) - history.typedExtraIndexById[T](idMod(segmentId)).get.txs ++=: txs + val segmentId = idMod(txSegmentId(parentId, txSegmentCount - 1)) + history.typedExtraIndexById[T](segmentId).get.txs ++=: txs toRemove += segmentId txSegmentCount -= 1 } @@ -320,8 +320,8 @@ abstract class Segment[T <: Segment[_] : ClassTag](val parentId: ModifierId, boxes.clear() boxes ++= tmp if (boxes.isEmpty && boxSegmentCount > 0) { // entire current box set removed, retrieving more from database if possible - val segmentId = boxSegmentId(parentId, boxSegmentCount - 1) - history.typedExtraIndexById[T](idMod(segmentId)).get.boxes ++=: boxes + val segmentId = idMod(boxSegmentId(parentId, boxSegmentCount - 1)) + history.typedExtraIndexById[T](segmentId).get.boxes ++=: boxes toRemove += segmentId boxSegmentCount -= 1 } @@ -418,19 +418,21 @@ object SegmentSerializer { } def serialize(s: Segment[_], w: Writer): Unit = { - w.putUInt(s.txs.length) + w.putInt(s.txs.length) cfor(0)(_ < s.txs.length, _ + 1) { i => w.putLong(s.txs(i)) } - w.putUInt(s.boxes.length) + w.putInt(s.boxes.length) cfor(0)(_ < s.boxes.length, _ + 1) { i => w.putLong(s.boxes(i)) } w.putInt(s.boxSegmentCount) w.putInt(s.txSegmentCount) } def parse(r: Reader, s: Segment[_]): Unit = { - val txnsLen: Long = r.getUInt() - cfor(0)(_ < txnsLen, _ + 1) { _ => s.txs.+=(r.getLong()) } - val boxesLen: Long = r.getUInt() - cfor(0)(_ < boxesLen, _ + 1) { _ => s.boxes.+=(r.getLong()) } + val txnsLen: Int = r.getInt() + s.txs.sizeHint(txnsLen) + cfor(0)(_ < txnsLen, _ + 1) { _ => s.txs += r.getLong() } + val boxesLen: Int = r.getInt() + s.boxes.sizeHint(boxesLen) + cfor(0)(_ < boxesLen, _ + 1) { _ => s.boxes += r.getLong() } s.boxSegmentCount = r.getInt() s.txSegmentCount = r.getInt() } diff --git a/src/main/scala/org/ergoplatform/nodeView/history/storage/HistoryStorage.scala b/src/main/scala/org/ergoplatform/nodeView/history/storage/HistoryStorage.scala index 2b4559af84..e7347cfef3 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/storage/HistoryStorage.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/storage/HistoryStorage.scala @@ -154,7 +154,6 @@ class HistoryStorage private(indexStore: LDBKVStore, objectsStore: LDBKVStore, e objectsToInsert.map(mod => mod.serializedId), objectsToInsert.map(mod => ExtraIndexSerializer.toBytes(mod)) ) - cfor(0)(_ < objectsToInsert.length, _ + 1) { i => val ei = objectsToInsert(i); extraCache.put(ei.id, ei)} cfor(0)(_ < indexesToInsert.length, _ + 1) { i => extraStore.insert(indexesToInsert(i)._1, indexesToInsert(i)._2)} } From 4dc432a0dc109cec49e7f6782097a05b96f384e5 Mon Sep 17 00:00:00 2001 From: jellymlg Date: Mon, 27 Nov 2023 22:28:48 +0100 Subject: [PATCH 02/12] Fixed indexer not keeping up after rollback --- .../ergoplatform/nodeView/history/extra/ExtraIndexer.scala | 5 +++-- .../ergoplatform/nodeView/history/extra/IndexerState.scala | 2 -- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala index 8e5c1c6084..bb3c65aa98 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala @@ -301,7 +301,8 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { log.info(s"Buffered block $height / $chainHeight [txs: ${bt.txs.length}, boxes: $boxCount] (buffer: $modCount / $saveLimit)") - newState.updateCaughtUp(headerOpt.map(_.height).getOrElse(chainHeight)) + val maxHeight = headerOpt.map(_.height).getOrElse(chainHeight) + newState.copy(caughtUp = newState.indexedHeight == maxHeight) } /** @@ -372,7 +373,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { newState = newState.incrementBoxIndex // Save changes - newState = newState.copy(indexedHeight = height, rollbackTo = 0).updateCaughtUp(chainHeight) + newState = newState.copy(indexedHeight = height, rollbackTo = 0) historyStorage.removeExtra(toRemove.toArray) saveProgress(newState, writeLog = false) diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/IndexerState.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/IndexerState.scala index 808e2d5d9b..05dec8d177 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/extra/IndexerState.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/IndexerState.scala @@ -19,8 +19,6 @@ case class IndexerState(indexedHeight: Int, def rollbackInProgress: Boolean = rollbackTo > 0 - def updateCaughtUp(chainHeight: Int): IndexerState = copy(caughtUp = indexedHeight == chainHeight) - def incrementIndexedHeight: IndexerState = copy(indexedHeight = indexedHeight + 1) def incrementTxIndex: IndexerState = copy(globalTxIndex = globalTxIndex + 1) From 6ce263584f1b359182d5262abff1b71f1cb75be0 Mon Sep 17 00:00:00 2001 From: jellymlg Date: Thu, 14 Dec 2023 21:00:36 +0100 Subject: [PATCH 03/12] Added temporary fix for weird box spending bug --- .../org/ergoplatform/nodeView/history/extra/Segment.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/Segment.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/Segment.scala index 40339f3ba2..4f4ec0ee93 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/extra/Segment.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/Segment.scala @@ -96,9 +96,13 @@ abstract class Segment[T <: Segment[_] : ClassTag](val parentId: ModifierId, buffer.get(segmentId) match { case Some(segment) => val i: Int = binarySearch(segment.boxes, boxNumAbs) - segment.boxes(i) = -segment.boxes(i) + if(i >= 0) { + segment.boxes(i) = -segment.boxes(i) + }else { + log.error(s"Box $boxNum not found in predicted segment of parent: ${segment.boxes.mkString("[", ",", "]")}") + } case None => - log.warn(s"Box $boxNum not found in any segment of parent when trying to spend") + log.error(s"Box $boxNum not found in any segment of parent") } } } From ae1a2ef7b8f9159f88d316a17f17ee8cee1f5784 Mon Sep 17 00:00:00 2001 From: jellymlg Date: Thu, 14 Dec 2023 21:13:23 +0100 Subject: [PATCH 04/12] Fixed imports --- src/main/scala/org/ergoplatform/http/api/ApiExtraCodecs.scala | 2 +- .../org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/org/ergoplatform/http/api/ApiExtraCodecs.scala b/src/main/scala/org/ergoplatform/http/api/ApiExtraCodecs.scala index 414ccd77ea..250d8d88a4 100644 --- a/src/main/scala/org/ergoplatform/http/api/ApiExtraCodecs.scala +++ b/src/main/scala/org/ergoplatform/http/api/ApiExtraCodecs.scala @@ -46,7 +46,7 @@ trait ApiExtraCodecs extends JsonCodecs { "inputs" -> iEt.inputs.asJson, "dataInputs" -> iEt.dataInputs.asJson, "outputs" -> iEt.outputs.asJson, - "size" -> iEt.txSize.asJson + "size" -> iEt.size.asJson ) } diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala index 2be7c85ada..24d395a8c2 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala @@ -6,7 +6,7 @@ import org.ergoplatform.{ErgoAddress, ErgoAddressEncoder, GlobalConstants, Pay2S import org.ergoplatform.modifiers.history.BlockTransactions import org.ergoplatform.modifiers.history.header.Header import org.ergoplatform.modifiers.mempool.ErgoTransaction -import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages.{FullBlockApplied, Rollback} +import org.ergoplatform.network.ErgoNodeViewSynchronizerMessages.{FullBlockApplied, Rollback} import org.ergoplatform.nodeView.history.extra.ExtraIndexer._ import org.ergoplatform.nodeView.history.{ErgoHistory, ErgoHistoryReader} import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages._ From c93dbc04d484f341f17aa5f079cd41cd8e04cd4b Mon Sep 17 00:00:00 2001 From: Alexander Chepurnoy Date: Thu, 14 Dec 2023 23:51:17 +0300 Subject: [PATCH 05/12] preRestart added --- .../ergoplatform/nodeView/history/extra/ExtraIndexer.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala index 24d395a8c2..6108ae2c95 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala @@ -454,10 +454,15 @@ class ExtraIndexer(cacheSettings: CacheSettings, } override def postStop(): Unit = { - log.info(s"Stopped extra indexer") + log.error(s"Stopped extra indexer") super.postStop() } + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + log.error(s"Attempted extra indexer restart due to ${reason.getMessage} ", reason) + super.preRestart(reason, message) + } + override def receive: Receive = { case StartExtraIndexer(history: ErgoHistory) => From ff694d7c6a63081101a7fc0602938285f344ff31 Mon Sep 17 00:00:00 2001 From: Alexander Chepurnoy Date: Fri, 15 Dec 2023 14:41:54 +0300 Subject: [PATCH 06/12] .get eliminated outside removeAfter --- .../nodeView/history/extra/ExtraIndexer.scala | 122 ++++++++++-------- 1 file changed, 69 insertions(+), 53 deletions(-) diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala index 6108ae2c95..57be46258c 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala @@ -36,56 +36,59 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { private implicit val ec: ExecutionContextExecutor = context.dispatcher /** - * Max buffer size (determined by config) - */ + * Max buffer size (determined by config) + */ protected val saveLimit: Int /** - * Number of transaction/box numberic indexes object segments contain - */ + * Number of transaction/box numberic indexes object segments contain + */ protected implicit val segmentTreshold: Int /** - * Address encoder instance - */ + * Address encoder instance + */ protected implicit val addressEncoder: ErgoAddressEncoder /** - * Database handle - */ + * Database handle + */ protected var _history: ErgoHistory = _ protected def chainHeight: Int = _history.fullBlockHeight + protected def history: ErgoHistoryReader = _history.getReader + protected def historyStorage: HistoryStorage = _history.historyStorage // fast access buffers protected val general: ArrayBuffer[ExtraIndex] = ArrayBuffer.empty[ExtraIndex] - protected val boxes: mutable.HashMap[ModifierId,IndexedErgoBox] = mutable.HashMap.empty[ModifierId,IndexedErgoBox] - protected val trees: mutable.HashMap[ModifierId,IndexedErgoAddress] = mutable.HashMap.empty[ModifierId,IndexedErgoAddress] - protected val tokens: mutable.HashMap[ModifierId,IndexedToken] = mutable.HashMap.empty[ModifierId,IndexedToken] - protected val segments: mutable.HashMap[ModifierId,Segment[_]] = mutable.HashMap.empty[ModifierId,Segment[_]] + protected val boxes: mutable.HashMap[ModifierId, IndexedErgoBox] = mutable.HashMap.empty[ModifierId, IndexedErgoBox] + protected val trees: mutable.HashMap[ModifierId, IndexedErgoAddress] = mutable.HashMap.empty[ModifierId, IndexedErgoAddress] + protected val tokens: mutable.HashMap[ModifierId, IndexedToken] = mutable.HashMap.empty[ModifierId, IndexedToken] + protected val segments: mutable.HashMap[ModifierId, Segment[_]] = mutable.HashMap.empty[ModifierId, Segment[_]] /** - * Input tokens in a transaction, cleared after every transaction - */ + * Input tokens in a transaction, cleared after every transaction + */ private val inputTokens: ArrayBuffer[(TokenId, Long)] = ArrayBuffer.empty[(TokenId, Long)] /** - * Holds upcoming blocks to be indexed, and when empty, it is filled back from multiple threads - */ - private val blockCache: concurrent.Map[Int,BlockTransactions] = new ConcurrentHashMap[Int,BlockTransactions]().asScala + * Holds upcoming blocks to be indexed, and when empty, it is filled back from multiple threads + */ + private val blockCache: concurrent.Map[Int, BlockTransactions] = new ConcurrentHashMap[Int, BlockTransactions]().asScala private var readingUpTo: Int = 0 /** - * Get transactions for specified height, preferably from cache, or from database. - * If indexer is getting close to emptying cache, asynchronously reads 1000 blocks into it - * @param height - blockheight to get transations from - * @return transactions at height - */ + * Get transactions for specified height, preferably from cache, or from database. + * If indexer is getting close to emptying cache, asynchronously reads 1000 blocks into it + * + * @param height - blockheight to get transations from + * @return transactions at height + */ private def getBlockTransactionsAt(height: Int): BlockTransactions = { val txs = blockCache.remove(height).getOrElse(history.bestBlockTransactionsAt(height).get) - if(height % 1000 == 0) blockCache.keySet.filter(_ < height).map(blockCache.remove) + if (height % 1000 == 0) blockCache.keySet.filter(_ < height).map(blockCache.remove) if (readingUpTo - height < 300 && chainHeight - height > 1000) { readingUpTo = math.min(height + 1001, chainHeight) val blockNums = height + 1 to readingUpTo by 50 @@ -153,11 +156,11 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { } /** - * Add or subtract a box from a token in the buffer or in database. - * - * @param id - token id - * @param spendOrReceive - IndexedErgoBox to receive (Right) or spend (Left) - */ + * Add or subtract a box from a token in the buffer or in database. + * + * @param id - token id + * @param spendOrReceive - IndexedErgoBox to receive (Right) or spend (Left) + */ private def findAndUpdateToken(id: ModifierId, spendOrReceive: Either[IndexedErgoBox, IndexedErgoBox]): Unit = { tokens.get(id).map { token => spendOrReceive match { @@ -183,8 +186,8 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { private def modCount: Int = general.length + boxes.size + trees.size + tokens.size /** - * Write buffered indexes to database and clear buffers. - */ + * Write buffered indexes to database and clear buffers. + */ private def saveProgress(state: IndexerState, writeLog: Boolean = true): Unit = { val start: Long = System.currentTimeMillis @@ -230,9 +233,8 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { * @param headerOpt - header to index blocktransactions of (used after caught up with chain) */ protected def index(state: IndexerState, headerOpt: Option[Header] = None): IndexerState = { - - val bt = headerOpt.map { header => - history.typedModifierById[BlockTransactions](header.transactionsId).get + val bt = headerOpt.flatMap { header => + history.typedModifierById[BlockTransactions](header.transactionsId) }.getOrElse(getBlockTransactionsAt(state.indexedHeight)) val height = headerOpt.map(_.height).getOrElse(state.indexedHeight) @@ -308,7 +310,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { /** * Remove all indexes after a given height and revert address balances. * - * @param state - current state of indexer + * @param state - current state of indexer * @param height - starting height */ protected def removeAfter(state: IndexerState, height: Int): IndexerState = { @@ -325,7 +327,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { // remove all tx indexes newState = newState.decrementTxIndex - while(newState.globalTxIndex > txTarget) { + while (newState.globalTxIndex > txTarget) { val tx: IndexedErgoTransaction = NumericTxIndex.getTxByNumber(history, newState.globalTxIndex).get tx.inputNums.map(NumericBoxIndex.getBoxByNumber(history, _).get).foreach { iEb => // undo all spendings @@ -351,11 +353,11 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { // remove all box indexes, tokens and address balances newState = newState.decrementBoxIndex - while(newState.globalBoxIndex > boxTarget) { + while (newState.globalBoxIndex > boxTarget) { val iEb: IndexedErgoBox = NumericBoxIndex.getBoxByNumber(history, newState.globalBoxIndex).get cfor(0)(_ < iEb.box.additionalTokens.length, _ + 1) { i => history.typedExtraIndexById[IndexedToken](IndexedToken.fromBox(iEb, i).id).map { token => - if(token.boxId == iEb.id) { // token created, delete + if (token.boxId == iEb.id) { // token created, delete toRemove += token.id log.info(s"Removing token ${token.tokenId} created in box ${iEb.id} at height ${iEb.inclusionHeight}") } else // no token created, update @@ -389,7 +391,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { self ! Index() case Index() if state.caughtUp => - if(modCount > 0) saveProgress(state) + if (modCount > 0) saveProgress(state) blockCache.clear() log.info("Indexer caught up with chain") @@ -407,10 +409,18 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { log.warn(s"Rollback already in progress") stash() } else { - val branchHeight: Int = history.heightOf(branchPoint).get - if (branchHeight < state.indexedHeight) { - context.become(loaded(state.copy(rollbackTo = branchHeight))) - self ! RemoveAfter(branchHeight) + history.heightOf(branchPoint) match { + case Some(branchHeight) => + if (branchHeight < state.indexedHeight) { + context.become (loaded (state.copy (rollbackTo = branchHeight) ) ) + self ! RemoveAfter (branchHeight) + } + case None => + log.error(s"No rollback height found for $branchPoint") + // todo: we try to continue scanning without doing rollback, could this be done at all? + val newState = state.copy(rollbackTo = 0) + context.become(loaded(newState)) + unstashAll() } } @@ -434,8 +444,9 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { /** * Actor that constructs an index of database elements. + * * @param cacheSettings - cacheSettings to use for saveLimit size - * @param ae - ergo address encoder to use for handling addresses + * @param ae - ergo address encoder to use for handling addresses */ class ExtraIndexer(cacheSettings: CacheSettings, ae: ErgoAddressEncoder) @@ -485,6 +496,7 @@ object ExtraIndexer { object ReceivableMessages { /** * Initialize ExtraIndexer and start indexing. + * * @param history - handle to database */ case class StartExtraIndexer(history: ErgoHistory) @@ -495,14 +507,15 @@ object ExtraIndexer { case class GetSegmentTreshold() /** - * Index block at current indexer height - */ + * Index block at current indexer height + */ case class Index() /** - * Remove and roll back all indexes after branchHeight - * @param branchHeight - height of last block to keep - */ + * Remove and roll back all indexes after branchHeight + * + * @param branchHeight - height of last block to keep + */ case class RemoveAfter(branchHeight: Int) } @@ -520,7 +533,7 @@ object ExtraIndexer { "0123456789abcdef".toCharArray.zipWithIndex.foreach { case (c, i) => index(c) = i.toByte } - "abcdef".toCharArray.foreach{ c => + "abcdef".toCharArray.foreach { c => index(c.toUpper) = index(c) } index @@ -528,18 +541,19 @@ object ExtraIndexer { /** * Faster id to bytes - no safety checks + * * @param id - ModifierId to convert to byte representation * @return an array of bytes */ private[extra] def fastIdToBytes(id: ModifierId): Array[Byte] = { val x: Array[Byte] = new Array[Byte](id.length / 2) - cfor(0)(_ < id.length, _ + 2) {i => x(i / 2) = ((hexIndex(id(i)) << 4) | hexIndex(id(i + 1))).toByte} + cfor(0)(_ < id.length, _ + 2) { i => x(i / 2) = ((hexIndex(id(i)) << 4) | hexIndex(id(i + 1))).toByte } x } /** - * Current newest database schema version. Used to force extra database resync. - */ + * Current newest database schema version. Used to force extra database resync. + */ val NewestVersion: Int = 5 val NewestVersionBytes: Array[Byte] = ByteBuffer.allocate(4).putInt(NewestVersion).array @@ -550,7 +564,9 @@ object ExtraIndexer { val SchemaVersionKey: Array[Byte] = Algos.hash("schema version") def getIndex(key: Array[Byte], history: HistoryStorage): ByteBuffer = - ByteBuffer.wrap(history.modifierBytesById(bytesToId(key)).getOrElse(Array.fill[Byte](8){0})) + ByteBuffer.wrap(history.modifierBytesById(bytesToId(key)).getOrElse(Array.fill[Byte](8) { + 0 + })) def getIndex(key: Array[Byte], history: ErgoHistoryReader): ByteBuffer = getIndex(key, history.historyStorage) From eb4f6c2866953fed4e7df001a2272abb7198d102 Mon Sep 17 00:00:00 2001 From: jellymlg Date: Fri, 15 Dec 2023 18:35:20 +0100 Subject: [PATCH 07/12] Fixed possible duplicate block indexing --- .../nodeView/history/extra/ExtraIndexer.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala index 57be46258c..7260e70e98 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala @@ -375,7 +375,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { newState = newState.incrementBoxIndex // Save changes - newState = newState.copy(indexedHeight = height, rollbackTo = 0) + newState = newState.copy(indexedHeight = height, rollbackTo = 0, caughtUp = true) historyStorage.removeExtra(toRemove.toArray) saveProgress(newState, writeLog = false) @@ -396,13 +396,16 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { log.info("Indexer caught up with chain") // after the indexer caught up with the chain, stay up to date - case FullBlockApplied(header: Header) if state.caughtUp => - if (!state.rollbackInProgress) { + case FullBlockApplied(header: Header) if state.caughtUp && !state.rollbackInProgress => + if (header.height == state.indexedHeight + 1) { // applied block is next in line val newState: IndexerState = index(state.incrementIndexedHeight, Some(header)) saveProgress(newState) context.become(loaded(newState)) - } else - stash() + } else if(header.height > state.indexedHeight + 1) { // applied block is ahead of indexer + context.become(loaded(state.copy(caughtUp = false))) + self ! Index() + } else // applied block has already been indexed, skipping duplicate + log.warn(s"Skipping block ${header.id} applied at height ${header.height}, indexed height is ${state.indexedHeight}") case Rollback(branchPoint: ModifierId) => if (state.rollbackInProgress) { @@ -417,7 +420,6 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { } case None => log.error(s"No rollback height found for $branchPoint") - // todo: we try to continue scanning without doing rollback, could this be done at all? val newState = state.copy(rollbackTo = 0) context.become(loaded(newState)) unstashAll() From e8b7a7be90d70aeace9f705228ae04d7d48a0a23 Mon Sep 17 00:00:00 2001 From: jellymlg Date: Sat, 16 Dec 2023 20:51:12 +0100 Subject: [PATCH 08/12] Use append instead of prepend on empty buffer --- .../org/ergoplatform/nodeView/history/extra/Segment.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/Segment.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/Segment.scala index c5f32bbabc..cffc3ab030 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/extra/Segment.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/Segment.scala @@ -312,7 +312,7 @@ abstract class Segment[T <: Segment[_] : ClassTag](val parentId: ModifierId, txs ++= tmp if (txs.isEmpty && txSegmentCount > 0) { // entire current tx set removed, retrieving more from database if possible val segmentId = idMod(txSegmentId(parentId, txSegmentCount - 1)) - history.typedExtraIndexById[T](segmentId).get.txs ++=: txs + txs ++= history.typedExtraIndexById[T](segmentId).get.txs toRemove += segmentId txSegmentCount -= 1 } @@ -325,7 +325,7 @@ abstract class Segment[T <: Segment[_] : ClassTag](val parentId: ModifierId, boxes ++= tmp if (boxes.isEmpty && boxSegmentCount > 0) { // entire current box set removed, retrieving more from database if possible val segmentId = idMod(boxSegmentId(parentId, boxSegmentCount - 1)) - history.typedExtraIndexById[T](segmentId).get.boxes ++=: boxes + boxes ++= history.typedExtraIndexById[T](segmentId).get.boxes toRemove += segmentId boxSegmentCount -= 1 } From 430a524f337b7d34a326e6b50e9e745c706c9ff5 Mon Sep 17 00:00:00 2001 From: Alexander Chepurnoy Date: Mon, 18 Dec 2023 14:11:26 +0300 Subject: [PATCH 09/12] less indexing load in indexer-dispatcher settings --- src/main/resources/application.conf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 2e262fcc39..1295b7acb6 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -618,8 +618,8 @@ indexer-dispatcher { # Min number of threads to cap factor-based parallelism number to parallelism-min = 1 # Parallelism (threads) ... ceil(available processors * factor) - parallelism-factor = 2.0 + parallelism-factor = 1.0 # Max number of threads to cap factor-based parallelism number to - parallelism-max = 16 + parallelism-max = 4 } } \ No newline at end of file From 2681adf029a4b07d759b31699fbb5fed4758133c Mon Sep 17 00:00:00 2001 From: Alexander Chepurnoy Date: Mon, 18 Dec 2023 15:00:43 +0300 Subject: [PATCH 10/12] formatting, naming --- .../ergoplatform/http/api/BlockchainApiRoute.scala | 4 ++-- .../nodeView/history/extra/ExtraIndexer.scala | 12 ++++++------ .../nodeView/history/extra/Segment.scala | 6 +++--- .../history/extra/ExtraIndexerSpecification.scala | 2 +- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/main/scala/org/ergoplatform/http/api/BlockchainApiRoute.scala b/src/main/scala/org/ergoplatform/http/api/BlockchainApiRoute.scala index eadc213014..39c913bdee 100644 --- a/src/main/scala/org/ergoplatform/http/api/BlockchainApiRoute.scala +++ b/src/main/scala/org/ergoplatform/http/api/BlockchainApiRoute.scala @@ -10,7 +10,7 @@ import org.ergoplatform.http.api.SortDirection.{ASC, DESC, Direction, INVALID} import org.ergoplatform.{ErgoAddress, ErgoAddressEncoder} import org.ergoplatform.nodeView.ErgoReadersHolder.{GetDataFromHistory, GetReaders, Readers} import org.ergoplatform.nodeView.history.ErgoHistoryReader -import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages.GetSegmentTreshold +import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages.GetSegmentThreshold import org.ergoplatform.nodeView.history.extra.ExtraIndexer.{GlobalBoxIndexKey, GlobalTxIndexKey, getIndex} import org.ergoplatform.nodeView.history.extra.IndexedErgoAddressSerializer.hashErgoTree import org.ergoplatform.nodeView.history.extra.IndexedTokenSerializer.uniqueId @@ -33,7 +33,7 @@ case class BlockchainApiRoute(readersHolder: ActorRef, ergoSettings: ErgoSetting val settings: RESTApiSettings = ergoSettings.scorexSettings.restApi private implicit val segmentTreshold: Int = indexerOpt.map { indexer => - Await.result[Int]((indexer ? GetSegmentTreshold).asInstanceOf[Future[Int]], Duration(3, SECONDS)) + Await.result[Int]((indexer ? GetSegmentThreshold).asInstanceOf[Future[Int]], Duration(3, SECONDS)) }.getOrElse(0) private val paging: Directive[(Int, Int)] = parameters("offset".as[Int] ? 0, "limit".as[Int] ? 5) diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala index 7260e70e98..3d5783e231 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala @@ -41,9 +41,9 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { protected val saveLimit: Int /** - * Number of transaction/box numberic indexes object segments contain + * Number of transaction/box numeric indexes object segments contain */ - protected implicit val segmentTreshold: Int + protected implicit val segmentThreshold: Int /** * Address encoder instance @@ -434,8 +434,8 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { log.info(s"Successfully rolled back indexes to $branchHeight") unstashAll() - case GetSegmentTreshold => - sender ! segmentTreshold + case GetSegmentThreshold => + sender ! segmentThreshold case _ => @@ -456,7 +456,7 @@ class ExtraIndexer(cacheSettings: CacheSettings, override val saveLimit: Int = cacheSettings.history.extraCacheSize * 20 - override implicit val segmentTreshold: Int = 512 + override implicit val segmentThreshold: Int = 512 override implicit val addressEncoder: ErgoAddressEncoder = ae @@ -506,7 +506,7 @@ object ExtraIndexer { /** * Retreive the currently used segment treshold */ - case class GetSegmentTreshold() + case class GetSegmentThreshold() /** * Index block at current indexer height diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/Segment.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/Segment.scala index cffc3ab030..c45a5c9595 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/extra/Segment.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/Segment.scala @@ -96,9 +96,9 @@ abstract class Segment[T <: Segment[_] : ClassTag](val parentId: ModifierId, buffer.get(segmentId) match { case Some(segment) => val i: Int = binarySearch(segment.boxes, boxNumAbs) - if(i >= 0) { + if (i >= 0) { segment.boxes(i) = -segment.boxes(i) - }else { + } else { log.error(s"Box $boxNum not found in predicted segment of parent: ${segment.boxes.mkString("[", ",", "]")}") } case None => @@ -108,7 +108,7 @@ abstract class Segment[T <: Segment[_] : ClassTag](val parentId: ModifierId, } /** - * Create an array of parent objects each containing [[ExtraIndexerBase.segmentTreshold]] number of transaction/box indexes. + * Create an array of parent objects each containing [[ExtraIndexerBase.segmentThreshold]] number of transaction/box indexes. * These objects have their ids calculated by "txSegmentId" and "boxSegmentId" respectively. * * @return array of parent objects diff --git a/src/test/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexerSpecification.scala b/src/test/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexerSpecification.scala index 4d2ca3d5ab..bf4578aa93 100644 --- a/src/test/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexerSpecification.scala +++ b/src/test/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexerSpecification.scala @@ -39,7 +39,7 @@ class ExtraIndexerSpecification extends ErgoPropertyTest with ExtraIndexerBase w type ID_LL = mutable.HashMap[ModifierId,(Long,Long)] override protected val saveLimit: Int = 1 // save every block - override protected implicit val segmentTreshold: Int = 8 // split to smaller segments + override protected implicit val segmentThreshold: Int = 8 // split to smaller segments override protected implicit val addressEncoder: ErgoAddressEncoder = initSettings.chainSettings.addressEncoder val nodeSettings: NodeConfigurationSettings = NodeConfigurationSettings(StateType.Utxo, verifyTransactions = true, From 04f085fdf6d9144ccb749dc8b311880c1052c95c Mon Sep 17 00:00:00 2001 From: Alexander Chepurnoy Date: Mon, 18 Dec 2023 16:23:53 +0300 Subject: [PATCH 11/12] wrapping exceptions in removeAfter --- .../nodeView/history/extra/ExtraIndexer.scala | 101 +++++++++--------- 1 file changed, 53 insertions(+), 48 deletions(-) diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala index 3d5783e231..dbda566f34 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala @@ -215,8 +215,9 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { ((((general ++= boxes.values) ++= trees.values) ++= tokens.values) ++= segments.values).toArray ) - if (writeLog) + if (writeLog) { log.info(s"Processed ${trees.size} ErgoTrees with ${boxes.size} boxes and inserted them to database in ${System.currentTimeMillis - start}ms") + } // clear buffers for next batch general.clear() @@ -320,64 +321,68 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { saveProgress(newState, writeLog = false) log.info(s"Rolling back indexes from ${state.indexedHeight} to $height") - val lastTxToKeep: ErgoTransaction = history.bestBlockTransactionsAt(height).get.txs.last - val txTarget: Long = history.typedExtraIndexById[IndexedErgoTransaction](lastTxToKeep.id).get.globalIndex - val boxTarget: Long = history.typedExtraIndexById[IndexedErgoBox](bytesToId(lastTxToKeep.outputs.last.id)).get.globalIndex - val toRemove: ArrayBuffer[ModifierId] = ArrayBuffer.empty[ModifierId] + try { + val lastTxToKeep: ErgoTransaction = history.bestBlockTransactionsAt(height).get.txs.last + val txTarget: Long = history.typedExtraIndexById[IndexedErgoTransaction](lastTxToKeep.id).get.globalIndex + val boxTarget: Long = history.typedExtraIndexById[IndexedErgoBox](bytesToId(lastTxToKeep.outputs.last.id)).get.globalIndex + val toRemove: ArrayBuffer[ModifierId] = ArrayBuffer.empty[ModifierId] + + // remove all tx indexes + newState = newState.decrementTxIndex + while (newState.globalTxIndex > txTarget) { + val tx: IndexedErgoTransaction = NumericTxIndex.getTxByNumber(history, newState.globalTxIndex).get + tx.inputNums.map(NumericBoxIndex.getBoxByNumber(history, _).get).foreach { iEb => // undo all spendings - // remove all tx indexes - newState = newState.decrementTxIndex - while (newState.globalTxIndex > txTarget) { - val tx: IndexedErgoTransaction = NumericTxIndex.getTxByNumber(history, newState.globalTxIndex).get - tx.inputNums.map(NumericBoxIndex.getBoxByNumber(history, _).get).foreach { iEb => // undo all spendings + iEb.spendingHeightOpt = None + iEb.spendingTxIdOpt = None - iEb.spendingHeightOpt = None - iEb.spendingTxIdOpt = None + val address = history.typedExtraIndexById[IndexedErgoAddress](hashErgoTree(iEb.box.ergoTree)).get.addBox(iEb, record = false) + address.findAndModBox(iEb.globalIndex, history) + historyStorage.insertExtra(Array.empty, Array[ExtraIndex](iEb, address) ++ address.buffer.values) - val address = history.typedExtraIndexById[IndexedErgoAddress](hashErgoTree(iEb.box.ergoTree)).get.addBox(iEb, record = false) - address.findAndModBox(iEb.globalIndex, history) - historyStorage.insertExtra(Array.empty, Array[ExtraIndex](iEb, address) ++ address.buffer.values) + cfor(0)(_ < iEb.box.additionalTokens.length, _ + 1) { i => + history.typedExtraIndexById[IndexedToken](IndexedToken.fromBox(iEb, i).id).map { token => + token.findAndModBox(iEb.globalIndex, history) + historyStorage.insertExtra(Array.empty, Array[ExtraIndex](token) ++ token.buffer.values) + } + } + } + toRemove += tx.id // tx by id + toRemove += bytesToId(NumericTxIndex.indexToBytes(newState.globalTxIndex)) // tx id by number + newState = newState.decrementTxIndex + } + newState = newState.incrementTxIndex + // remove all box indexes, tokens and address balances + newState = newState.decrementBoxIndex + while (newState.globalBoxIndex > boxTarget) { + val iEb: IndexedErgoBox = NumericBoxIndex.getBoxByNumber(history, newState.globalBoxIndex).get cfor(0)(_ < iEb.box.additionalTokens.length, _ + 1) { i => history.typedExtraIndexById[IndexedToken](IndexedToken.fromBox(iEb, i).id).map { token => - token.findAndModBox(iEb.globalIndex, history) - historyStorage.insertExtra(Array.empty, Array[ExtraIndex](token) ++ token.buffer.values) + if (token.boxId == iEb.id) { // token created, delete + toRemove += token.id + log.info(s"Removing token ${token.tokenId} created in box ${iEb.id} at height ${iEb.inclusionHeight}") + } else // no token created, update + toRemove ++= token.rollback(txTarget, boxTarget, _history) } } - } - toRemove += tx.id // tx by id - toRemove += bytesToId(NumericTxIndex.indexToBytes(newState.globalTxIndex)) // tx id by number - newState = newState.decrementTxIndex - } - newState = newState.incrementTxIndex - - // remove all box indexes, tokens and address balances - newState = newState.decrementBoxIndex - while (newState.globalBoxIndex > boxTarget) { - val iEb: IndexedErgoBox = NumericBoxIndex.getBoxByNumber(history, newState.globalBoxIndex).get - cfor(0)(_ < iEb.box.additionalTokens.length, _ + 1) { i => - history.typedExtraIndexById[IndexedToken](IndexedToken.fromBox(iEb, i).id).map { token => - if (token.boxId == iEb.id) { // token created, delete - toRemove += token.id - log.info(s"Removing token ${token.tokenId} created in box ${iEb.id} at height ${iEb.inclusionHeight}") - } else // no token created, update - toRemove ++= token.rollback(txTarget, boxTarget, _history) + history.typedExtraIndexById[IndexedErgoAddress](hashErgoTree(iEb.box.ergoTree)).map { address => + address.spendBox(iEb) + toRemove ++= address.rollback(txTarget, boxTarget, _history) } + toRemove += iEb.id // box by id + toRemove += bytesToId(NumericBoxIndex.indexToBytes(newState.globalBoxIndex)) // box id by number + newState = newState.decrementBoxIndex } - history.typedExtraIndexById[IndexedErgoAddress](hashErgoTree(iEb.box.ergoTree)).map { address => - address.spendBox(iEb) - toRemove ++= address.rollback(txTarget, boxTarget, _history) - } - toRemove += iEb.id // box by id - toRemove += bytesToId(NumericBoxIndex.indexToBytes(newState.globalBoxIndex)) // box id by number - newState = newState.decrementBoxIndex + newState = newState.incrementBoxIndex + + // Save changes + newState = newState.copy(indexedHeight = height, rollbackTo = 0, caughtUp = true) + historyStorage.removeExtra(toRemove.toArray) + saveProgress(newState, writeLog = false) + } catch { + case t: Throwable => log.error(s"removeAfter during rollback failed due to: ${t.getMessage}", t) } - newState = newState.incrementBoxIndex - - // Save changes - newState = newState.copy(indexedHeight = height, rollbackTo = 0, caughtUp = true) - historyStorage.removeExtra(toRemove.toArray) - saveProgress(newState, writeLog = false) newState } From c9b38db7b32709359132f03ef4632ce1a9f44873 Mon Sep 17 00:00:00 2001 From: jellymlg Date: Mon, 18 Dec 2023 18:31:07 +0100 Subject: [PATCH 12/12] Reworked tests to use actor features --- .../history/extra/ChainGenerator.scala | 208 +++++++++++ .../extra/ExtraIndexerSpecification.scala | 331 +++--------------- .../history/extra/ExtraIndexerTestActor.scala | 55 +++ 3 files changed, 320 insertions(+), 274 deletions(-) create mode 100644 src/test/scala/org/ergoplatform/nodeView/history/extra/ChainGenerator.scala create mode 100644 src/test/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexerTestActor.scala diff --git a/src/test/scala/org/ergoplatform/nodeView/history/extra/ChainGenerator.scala b/src/test/scala/org/ergoplatform/nodeView/history/extra/ChainGenerator.scala new file mode 100644 index 0000000000..144305ab33 --- /dev/null +++ b/src/test/scala/org/ergoplatform/nodeView/history/extra/ChainGenerator.scala @@ -0,0 +1,208 @@ +package org.ergoplatform.nodeView.history.extra + +import org.ergoplatform.ErgoBox.TokenId +import org.ergoplatform.ErgoLikeContext.Height +import org.ergoplatform.mining.difficulty.DifficultySerializer +import org.ergoplatform.mining.{AutolykosPowScheme, CandidateBlock, CandidateGenerator} +import org.ergoplatform.modifiers.ErgoFullBlock +import org.ergoplatform.modifiers.history.extension.{Extension, ExtensionCandidate} +import org.ergoplatform.modifiers.history.header.Header +import org.ergoplatform.modifiers.history.popow.NipopowAlgos +import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnsignedErgoTransaction} +import org.ergoplatform.nodeView.history.ErgoHistory +import org.ergoplatform.nodeView.history.ErgoHistoryUtils.GenesisHeight +import org.ergoplatform.nodeView.state.{ErgoState, ErgoStateContext, UtxoState, UtxoStateReader} +import org.ergoplatform.utils.ErgoTestHelpers +import org.ergoplatform._ +import scorex.util.ModifierId +import sigma.{Coll, Colls} +import sigmastate.Values +import sigmastate.crypto.DLogProtocol.ProveDlog +import sigmastate.eval.Extensions._ +import sigmastate.eval._ + +import java.io.File +import scala.annotation.tailrec +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.util.{Random, Try} + +object ChainGenerator extends ErgoTestHelpers { + + val pow: AutolykosPowScheme = new AutolykosPowScheme(powScheme.k, powScheme.n) + val blockInterval: FiniteDuration = 2.minute + val EmissionTxCost: Long = 20000 + val MinTxAmount: Long = 2000000 + val RewardDelay: Int = initSettings.chainSettings.monetary.minerRewardDelay + val MaxTxsPerBlock: Int = 10 + val minerPk: ProveDlog = defaultProver.hdKeys.head.publicImage + val selfAddressScript: Values.ErgoTree = P2PKAddress(minerPk).script + val minerProp: Values.ErgoTree = ErgoTreePredef.rewardOutputScript(RewardDelay, minerPk) + val votingEpochLength: Height = votingSettings.votingLength + val protocolVersion: Byte = initSettings.chainSettings.protocolVersion + val minimalSuffix = 2 + val txCostLimit: Height = initSettings.nodeSettings.maxTransactionCost + val txSizeLimit: Height = initSettings.nodeSettings.maxTransactionSize + + var startTime: Long = 0 + + def generate(length: Int, dir: File)(history: ErgoHistory): Unit = { + val stateDir = new File(s"${dir.getAbsolutePath}/state") + stateDir.mkdirs() + val (state, _) = ErgoState.generateGenesisUtxoState(stateDir, initSettings) + System.out.println(s"Going to generate a chain at ${dir.getAbsolutePath} starting from ${history.bestFullBlockOpt}") + startTime = System.currentTimeMillis() - (blockInterval * (length - 1)).toMillis + val chain = loop(state, None, None, Seq())(history) + System.out.println(s"Chain of length ${chain.length} generated") + history.bestHeaderOpt shouldBe history.bestFullBlockOpt.map(_.header) + history.bestFullBlockOpt.get.id shouldBe chain.last + System.out.println("History was generated successfully") + } + + @tailrec + private def loop(state: UtxoState, + initBox: Option[ErgoBox], + last: Option[Header], + acc: Seq[ModifierId])(history: ErgoHistory): Seq[ModifierId] = { + val time: Long = last.map(_.timestamp + blockInterval.toMillis).getOrElse(startTime) + if (time < System.currentTimeMillis()) { + val (txs, lastOut) = genTransactions(last.map(_.height).getOrElse(GenesisHeight), + initBox, state.stateContext) + + val candidate = genCandidate(defaultProver.hdPubKeys.head.key, last, time, txs, state)(history) + val block = proveCandidate(candidate.get) + + history.append(block.header).get + block.blockSections.foreach(s => if (!history.contains(s)) history.append(s).get) + + val outToPassNext = if (last.isEmpty) { + block.transactions.flatMap(_.outputs).find(_.ergoTree == minerProp) + } else { + lastOut + } + + assert(outToPassNext.isDefined) + + log.info( + s"Block ${block.id} with ${block.transactions.size} transactions at height ${block.header.height} generated") + + loop(state.applyModifier(block, None)(_ => ()).get, outToPassNext, Some(block.header), acc :+ block.id)(history) + } else { + acc + } + } + + private def moveTokens(inOpt: Option[ErgoBox], cond: Boolean): Coll[(TokenId, Long)] = { + val tokens: ArrayBuffer[(TokenId, Long)] = ArrayBuffer.empty[(TokenId, Long)] + inOpt match { + case Some(input) if cond => + tokens += Tuple2(input.id.toTokenId, math.abs(Random.nextInt())) + case Some(tokenBox) if !cond => + tokenBox.additionalTokens.toArray.foreach(tokens += _) + case _ => + } + Colls.fromArray(tokens.toArray) + } + + private def genTransactions(height: Height, + inOpt: Option[ErgoBox], + ctx: ErgoStateContext): (Seq[ErgoTransaction], Option[ErgoBox]) = { + inOpt + .find { bx => + val canUnlock = (bx.creationHeight + RewardDelay <= height) || (bx.ergoTree != minerProp) + canUnlock && bx.ergoTree != initSettings.chainSettings.monetary.emissionBoxProposition && bx.value >= MinTxAmount + } + .map { input => + val qty = MaxTxsPerBlock + val amount = input.value + val outs = (0 until qty).map(i => new ErgoBoxCandidate(amount, selfAddressScript, height, moveTokens(inOpt, i == 0))) + var i = 0 + val x = outs + .foldLeft((Seq.empty[ErgoTransaction], input)) { case ((acc, in), out) => + val inputs = IndexedSeq(in) + val newOut = + if (i > 0) + new ErgoBoxCandidate(amount, selfAddressScript, height, moveTokens(acc.lastOption.map(_.outputs.head), cond = false)) + else + out + val unsignedTx = UnsignedErgoTransaction(inputs.map(box => new UnsignedInput(box.id)), IndexedSeq(newOut)) + i += 1 + defaultProver.sign(unsignedTx, inputs, emptyDataBoxes, ctx) + .fold(_ => acc -> in, tx => (acc :+ ErgoTransaction(tx)) -> unsignedTx.outputs.head) + } + ._1 + (x, Some(x.last.outputs.head)) + } + .getOrElse(Seq.empty -> inOpt) + } + + private def genCandidate(minerPk: ProveDlog, + lastHeaderOpt: Option[Header], + ts: Long, + txsFromPool: Seq[ErgoTransaction], + state: UtxoStateReader)(history: ErgoHistory): Try[CandidateBlock] = Try { + val stateContext = state.stateContext + val nBits: Long = lastHeaderOpt + .map(parent => history.requiredDifficultyAfter(parent)) + .map(d => DifficultySerializer.encodeCompactBits(d)) + .getOrElse(settings.chainSettings.initialNBits) + + val interlinks = lastHeaderOpt + .flatMap { h => + history.typedModifierById[Extension](h.extensionId) + .flatMap(ext => NipopowAlgos.unpackInterlinks(ext.fields).toOption) + .map(nipopowAlgos.updateInterlinks(h, _)) + } + .getOrElse(Seq.empty) + val interlinksExtension = nipopowAlgos.interlinksToExtension(interlinks) + + val (extensionCandidate, votes: Array[Byte], version: Byte) = lastHeaderOpt.map { header => + val newHeight = header.height + 1 + val currentParams = stateContext.currentParameters + val betterVersion = protocolVersion > header.version + val votingFinishHeight: Option[Height] = currentParams.softForkStartingHeight + .map(_ + votingSettings.votingLength * votingSettings.softForkEpochs) + val forkVotingAllowed = votingFinishHeight.forall(fh => newHeight < fh) + val forkOrdered = settings.votingTargets.softFork != 0 + val voteForFork = betterVersion && forkOrdered && forkVotingAllowed + + if (newHeight % votingEpochLength == 0 && newHeight > 0) { + val (newParams, _) = currentParams.update(newHeight, voteForFork, stateContext.votingData.epochVotes, emptyVSUpdate, votingSettings) + (newParams.toExtensionCandidate ++ interlinksExtension, + newParams.suggestVotes(settings.votingTargets.targets, voteForFork), + newParams.blockVersion) + } else { + (nipopowAlgos.interlinksToExtension(interlinks), + currentParams.vote(settings.votingTargets.targets, stateContext.votingData.epochVotes, voteForFork), + currentParams.blockVersion) + } + }.getOrElse((interlinksExtension, Array(0: Byte, 0: Byte, 0: Byte), Header.InitialVersion)) + + val emissionTxOpt = CandidateGenerator.collectEmission(state, minerPk, emptyStateContext) + val txs = emissionTxOpt.toSeq ++ txsFromPool + + state.proofsForTransactions(txs).map { case (adProof, adDigest) => + CandidateBlock(lastHeaderOpt, version, nBits, adDigest, adProof, txs, ts, extensionCandidate, votes) + } + }.flatten + + @tailrec + private def proveCandidate(candidate: CandidateBlock): ErgoFullBlock = { + log.info(s"Trying to prove block with parent ${candidate.parentOpt.map(_.encodedId)} and timestamp ${candidate.timestamp}") + + pow.proveCandidate(candidate, defaultProver.hdKeys.head.privateInput.w) match { + case Some(fb) => fb + case _ => + val interlinks = candidate.parentOpt + .map(nipopowAlgos.updateInterlinks(_, NipopowAlgos.unpackInterlinks(candidate.extension.fields).get)) + .getOrElse(Seq.empty) + val minerTag = scorex.utils.Random.randomBytes(Extension.FieldKeySize) + proveCandidate { + candidate.copy( + extension = ExtensionCandidate(Seq(Array(0: Byte, 2: Byte) -> minerTag)) ++ nipopowAlgos.interlinksToExtension(interlinks) + ) + } + } + } + +} diff --git a/src/test/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexerSpecification.scala b/src/test/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexerSpecification.scala index bf4578aa93..b18e2e7536 100644 --- a/src/test/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexerSpecification.scala +++ b/src/test/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexerSpecification.scala @@ -1,81 +1,33 @@ package org.ergoplatform.nodeView.history.extra -import org.ergoplatform.ErgoBox.TokenId -import org.ergoplatform.ErgoLikeContext.Height -import org.ergoplatform._ -import org.ergoplatform.mining.difficulty.DifficultySerializer -import org.ergoplatform.mining.{AutolykosPowScheme, CandidateBlock, CandidateGenerator} -import org.ergoplatform.modifiers.ErgoFullBlock -import org.ergoplatform.modifiers.history.extension.{Extension, ExtensionCandidate} -import org.ergoplatform.modifiers.history.header.Header -import org.ergoplatform.modifiers.history.popow.NipopowAlgos -import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnsignedErgoTransaction} -import org.ergoplatform.nodeView.history.ErgoHistory -import org.ergoplatform.nodeView.history.ErgoHistoryUtils._ +import akka.actor.{ActorRef, ActorSystem, Props} +import org.ergoplatform.network.ErgoNodeViewSynchronizerMessages.{FullBlockApplied, Rollback} +import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages.Index import org.ergoplatform.nodeView.history.extra.IndexedErgoAddressSerializer.hashErgoTree import org.ergoplatform.nodeView.history.extra.SegmentSerializer.{boxSegmentId, txSegmentId} -import org.ergoplatform.nodeView.mempool.ErgoMemPoolUtils.SortingOption -import org.ergoplatform.nodeView.state._ -import org.ergoplatform.settings.{ErgoSettings, NetworkType, NipopowSettings, NodeConfigurationSettings, UtxoSettings} -import org.ergoplatform.utils.{ErgoPropertyTest, ErgoTestHelpers, HistoryTestHelpers} +import org.ergoplatform.nodeView.history.{ErgoHistory, ErgoHistoryReader} +import org.ergoplatform.utils.HistoryTestHelpers import scorex.util.{ModifierId, bytesToId} -import sigmastate.Values -import sigmastate.crypto.DLogProtocol.ProveDlog -import sigmastate.eval.Extensions._ -import sigmastate.eval._ -import sigma.{Coll, Colls} import spire.implicits.cfor -import java.io.File -import scala.annotation.tailrec import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer -import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.reflect.ClassTag -import scala.util.{Random, Try} -class ExtraIndexerSpecification extends ErgoPropertyTest with ExtraIndexerBase with HistoryTestHelpers { +class ExtraIndexerSpecification extends HistoryTestHelpers { - type ID_LL = mutable.HashMap[ModifierId,(Long,Long)] - - override protected val saveLimit: Int = 1 // save every block - override protected implicit val segmentThreshold: Int = 8 // split to smaller segments - override protected implicit val addressEncoder: ErgoAddressEncoder = initSettings.chainSettings.addressEncoder + case class CreateDB() - val nodeSettings: NodeConfigurationSettings = NodeConfigurationSettings(StateType.Utxo, verifyTransactions = true, - -1, UtxoSettings(utxoBootstrap = false, 0, 2), NipopowSettings(nipopowBootstrap = false, 1), mining = false, - ChainGenerator.txCostLimit, ChainGenerator.txSizeLimit, useExternalMiner = false, internalMinersCount = 1, - internalMinerPollingInterval = 1.second, miningPubKeyHex = None, offlineGeneration = false, - 200, 5.minutes, 100000, 1.minute, mempoolSorting = SortingOption.FeePerByte, rebroadcastCount = 20, - 1000000, 100, adProofsSuffixLength = 112 * 1024, extraIndex = false) + type ID_LL = mutable.HashMap[ModifierId,(Long,Long)] val HEIGHT: Int = 50 val BRANCHPOINT: Int = HEIGHT / 2 + implicit val segmentThreshold: Int = 8 - def createDB(): Unit = { - val dir: File = createTempDir - dir.mkdirs() - - val fullHistorySettings: ErgoSettings = ErgoSettings(dir.getAbsolutePath, NetworkType.TestNet, initSettings.chainSettings, - nodeSettings, settings.scorexSettings, settings.walletSettings, settings.cacheSettings) + val system: ActorSystem = ActorSystem.create("indexer-test") + val indexer: ActorRef = system.actorOf(Props.create(classOf[ExtraIndexerTestActor], this)) - _history = ErgoHistory.readOrGenerate(fullHistorySettings)(null) - - ChainGenerator.generate(HEIGHT, dir)(_history) - - // reset all variables - indexedHeight = 0 - globalTxIndex = 0L - globalBoxIndex = 0L - lastWroteToDB = 0 - caughtUp = false - rollback = false - general.clear() - boxes.clear() - trees.clear() - tokens.clear() - segments.clear() - } + var _history: ErgoHistory = _ + def history: ErgoHistoryReader = _history.getReader def manualIndex(limit: Int): (ID_LL, // address -> (erg,tokenSum) ID_LL, // tokenId -> (boxesCount,_) @@ -83,8 +35,8 @@ class ExtraIndexerSpecification extends ErgoPropertyTest with ExtraIndexerBase w Int) = { // boxes indexed var txsIndexed = 0 var boxesIndexed = 0 - val addresses: ID_LL = mutable.HashMap[ModifierId,(Long,Long)]() - val indexedTokens: ID_LL = mutable.HashMap[ModifierId,(Long,Long)]() + val addresses: ID_LL = mutable.HashMap[ModifierId, (Long, Long)]() + val indexedTokens: ID_LL = mutable.HashMap[ModifierId, (Long, Long)]() cfor(1)(_ <= limit, _ + 1) { i => _history.getReader.bestBlockTransactionsAt(i).get.txs.foreach { tx => txsIndexed += 1 @@ -120,18 +72,18 @@ class ExtraIndexerSpecification extends ErgoPropertyTest with ExtraIndexerBase w segmentables.foreach { segmentable => history.typedExtraIndexById[T](segmentable._1) match { case Some(obj: T) => - if(isChild) { // this is a segment + if (isChild) { // this is a segment // check tx segments - val txSegments: ID_LL = mutable.HashMap.empty[ModifierId,(Long,Long)] + val txSegments: ID_LL = mutable.HashMap.empty[ModifierId, (Long, Long)] txSegments ++= (0 until obj.txSegmentCount).map(n => obj.idMod(txSegmentId(obj.parentId, n))).map(Tuple2(_, (0L, 0L))) checkSegmentables(txSegments, isChild = true, check) shouldBe 0 // check box segments - val boxSegments: ID_LL = mutable.HashMap.empty[ModifierId,(Long,Long)] + val boxSegments: ID_LL = mutable.HashMap.empty[ModifierId, (Long, Long)] boxSegments ++= (0 until obj.boxSegmentCount).map(n => obj.idMod(boxSegmentId(obj.parentId, n))).map(Tuple2(_, (0L, 0L))) checkSegmentables(boxSegments, isChild = true, check) shouldBe 0 - }else { // this is the parent object + } else { // this is the parent object // check properties of object - if(!check((obj, segmentable._2))) + if (!check((obj, segmentable._2))) errors += 1 // check boxes in memory obj.boxes.foreach { boxNum => @@ -170,9 +122,11 @@ class ExtraIndexerSpecification extends ErgoPropertyTest with ExtraIndexerBase w }) property("extra indexer transactions") { - createDB() - run() - cfor(0)(_ < globalTxIndex, _ + 1) {n => + indexer ! CreateDB() + indexer ! Index() + Thread.sleep(5000) + val state = IndexerState.fromHistory(_history) + cfor(0)(_ < state.globalTxIndex, _ + 1) { n => val id = history.typedExtraIndexById[NumericTxIndex](bytesToId(NumericTxIndex.indexToBytes(n))) id shouldNot be(empty) history.typedExtraIndexById[IndexedErgoTransaction](id.get.m) shouldNot be(empty) @@ -180,9 +134,11 @@ class ExtraIndexerSpecification extends ErgoPropertyTest with ExtraIndexerBase w } property("extra indexer boxes") { - createDB() - run() - cfor(0)(_ < globalBoxIndex, _ + 1) { n => + indexer ! CreateDB() + indexer ! Index() + Thread.sleep(5000) + val state = IndexerState.fromHistory(_history) + cfor(0)(_ < state.globalBoxIndex, _ + 1) { n => val id = history.typedExtraIndexById[NumericBoxIndex](bytesToId(NumericBoxIndex.indexToBytes(n))) id shouldNot be(empty) history.typedExtraIndexById[IndexedErgoBox](id.get.m) shouldNot be(empty) @@ -190,32 +146,37 @@ class ExtraIndexerSpecification extends ErgoPropertyTest with ExtraIndexerBase w } property("extra indexer addresses") { - createDB() - run() + indexer ! CreateDB() + indexer ! Index() + Thread.sleep(5000) val (addresses, _, _, _) = manualIndex(HEIGHT) checkAddresses(addresses) shouldBe 0 } property("extra indexer tokens") { - createDB() - run() + indexer ! CreateDB() + indexer ! Index() + Thread.sleep(5000) val (_, indexedTokens, _, _) = manualIndex(HEIGHT) checkTokens(indexedTokens) shouldBe 0 } property("extra indexer rollback") { - createDB() + indexer ! CreateDB() + indexer ! Index() + Thread.sleep(5000) + var state = IndexerState.fromHistory(_history) - run() - - val txIndexBefore = globalTxIndex - val boxIndexBefore = globalBoxIndex + val txIndexBefore = state.globalTxIndex + val boxIndexBefore = state.globalBoxIndex // manually count balances val (addresses, indexedTokens, txsIndexed, boxesIndexed) = manualIndex(BRANCHPOINT) // perform rollback - removeAfter(BRANCHPOINT) + indexer ! Rollback(history.bestHeaderIdAtHeight(BRANCHPOINT).get) + Thread.sleep(5000) + state = IndexerState.fromHistory(_history) // address balances checkAddresses(addresses) shouldBe 0 @@ -224,13 +185,13 @@ class ExtraIndexerSpecification extends ErgoPropertyTest with ExtraIndexerBase w checkTokens(indexedTokens) shouldBe 0 // check indexnumbers - globalTxIndex shouldBe txsIndexed - globalBoxIndex shouldBe boxesIndexed + state.globalTxIndex shouldBe txsIndexed + state.globalBoxIndex shouldBe boxesIndexed // check txs - cfor(0)(_ < txIndexBefore, _ + 1) {txNum => + cfor(0)(_ < txIndexBefore, _ + 1) { txNum => val txOpt = history.typedExtraIndexById[NumericTxIndex](bytesToId(NumericTxIndex.indexToBytes(txNum))) - if(txNum < globalTxIndex) + if (txNum < state.globalTxIndex) txOpt shouldNot be(empty) else txOpt shouldBe None @@ -239,7 +200,7 @@ class ExtraIndexerSpecification extends ErgoPropertyTest with ExtraIndexerBase w // check boxes cfor(0)(_ < boxIndexBefore, _ + 1) { boxNum => val boxOpt = history.typedExtraIndexById[NumericBoxIndex](bytesToId(NumericBoxIndex.indexToBytes(boxNum))) - if (boxNum < globalBoxIndex) + if (boxNum < state.globalBoxIndex) boxOpt shouldNot be(empty) else boxOpt shouldBe None @@ -247,7 +208,11 @@ class ExtraIndexerSpecification extends ErgoPropertyTest with ExtraIndexerBase w // ------------------------------------------------------------------- // restart indexer to catch up - run() + cfor(BRANCHPOINT)(_ <= HEIGHT, _ + 1) { i => + indexer ! FullBlockApplied(history.bestHeaderAtHeight(i).get) + } + Thread.sleep(5000) + state = IndexerState.fromHistory(_history) // Check addresses again val (addresses2, indexedTokens2, _, _) = manualIndex(HEIGHT) @@ -255,8 +220,8 @@ class ExtraIndexerSpecification extends ErgoPropertyTest with ExtraIndexerBase w checkTokens(indexedTokens2) shouldBe 0 // check indexnumbers again - globalTxIndex shouldBe txIndexBefore - globalBoxIndex shouldBe boxIndexBefore + state.globalTxIndex shouldBe txIndexBefore + state.globalBoxIndex shouldBe boxIndexBefore // check txs after caught up cfor(0)(_ < txIndexBefore, _ + 1) { txNum => @@ -267,187 +232,5 @@ class ExtraIndexerSpecification extends ErgoPropertyTest with ExtraIndexerBase w cfor(0)(_ < boxIndexBefore, _ + 1) { boxNum => history.typedExtraIndexById[NumericBoxIndex](bytesToId(NumericBoxIndex.indexToBytes(boxNum))) shouldNot be(empty) } - - } - -} - -object ChainGenerator extends ErgoTestHelpers { - - val pow: AutolykosPowScheme = new AutolykosPowScheme(powScheme.k, powScheme.n) - val blockInterval: FiniteDuration = 2.minute - val EmissionTxCost: Long = 20000 - val MinTxAmount: Long = 2000000 - val RewardDelay: Int = initSettings.chainSettings.monetary.minerRewardDelay - val MaxTxsPerBlock: Int = 10 - val minerPk: ProveDlog = defaultProver.hdKeys.head.publicImage - val selfAddressScript: Values.ErgoTree = P2PKAddress(minerPk).script - val minerProp: Values.ErgoTree = ErgoTreePredef.rewardOutputScript(RewardDelay, minerPk) - val votingEpochLength: Height = votingSettings.votingLength - val protocolVersion: Byte = initSettings.chainSettings.protocolVersion - val minimalSuffix = 2 - val txCostLimit: Height = initSettings.nodeSettings.maxTransactionCost - val txSizeLimit: Height = initSettings.nodeSettings.maxTransactionSize - - var startTime: Long = 0 - - def generate(length: Int, dir: File)(history: ErgoHistory): Unit = { - val stateDir = new File(s"${dir.getAbsolutePath}/state") - stateDir.mkdirs() - val (state, _) = ErgoState.generateGenesisUtxoState(stateDir, initSettings) - System.out.println(s"Going to generate a chain at ${dir.getAbsolutePath} starting from ${history.bestFullBlockOpt}") - startTime = System.currentTimeMillis() - (blockInterval * (length - 1)).toMillis - val chain = loop(state, None, None, Seq())(history) - System.out.println(s"Chain of length ${chain.length} generated") - history.bestHeaderOpt shouldBe history.bestFullBlockOpt.map(_.header) - history.bestFullBlockOpt.get.id shouldBe chain.last - System.out.println("History was generated successfully") } - - @tailrec - private def loop(state: UtxoState, - initBox: Option[ErgoBox], - last: Option[Header], - acc: Seq[ModifierId])(history: ErgoHistory): Seq[ModifierId] = { - val time: Long = last.map(_.timestamp + blockInterval.toMillis).getOrElse(startTime) - if (time < System.currentTimeMillis()) { - val (txs, lastOut) = genTransactions(last.map(_.height).getOrElse(GenesisHeight), - initBox, state.stateContext) - - val candidate = genCandidate(defaultProver.hdPubKeys.head.key, last, time, txs, state)(history) - val block = proveCandidate(candidate.get) - - history.append(block.header).get - block.blockSections.foreach(s => if (!history.contains(s)) history.append(s).get) - - val outToPassNext = if (last.isEmpty) { - block.transactions.flatMap(_.outputs).find(_.ergoTree == minerProp) - } else { - lastOut - } - - assert(outToPassNext.isDefined) - - log.info( - s"Block ${block.id} with ${block.transactions.size} transactions at height ${block.header.height} generated") - - loop(state.applyModifier(block, None)(_ => ()).get, outToPassNext, Some(block.header), acc :+ block.id)(history) - } else { - acc - } - } - - private def moveTokens(inOpt: Option[ErgoBox], cond: Boolean): Coll[(TokenId, Long)] = { - val tokens: ArrayBuffer[(TokenId, Long)] = ArrayBuffer.empty[(TokenId, Long)] - inOpt match { - case Some(input) if cond => - tokens += Tuple2(input.id.toTokenId, math.abs(Random.nextInt())) - case Some(tokenBox) if !cond => - tokenBox.additionalTokens.toArray.foreach(tokens += _) - case _ => - } - Colls.fromArray(tokens.toArray) - } - - private def genTransactions(height: Height, - inOpt: Option[ErgoBox], - ctx: ErgoStateContext): (Seq[ErgoTransaction], Option[ErgoBox]) = { - inOpt - .find { bx => - val canUnlock = (bx.creationHeight + RewardDelay <= height) || (bx.ergoTree != minerProp) - canUnlock && bx.ergoTree != initSettings.chainSettings.monetary.emissionBoxProposition && bx.value >= MinTxAmount - } - .map { input => - val qty = MaxTxsPerBlock - val amount = input.value - val outs = (0 until qty).map(i => new ErgoBoxCandidate(amount, selfAddressScript, height, moveTokens(inOpt, i == 0))) - var i = 0 - val x = outs - .foldLeft((Seq.empty[ErgoTransaction], input)) { case ((acc, in), out) => - val inputs = IndexedSeq(in) - val newOut = - if (i > 0) - new ErgoBoxCandidate(amount, selfAddressScript, height, moveTokens(acc.lastOption.map(_.outputs.head), cond = false)) - else - out - val unsignedTx = UnsignedErgoTransaction(inputs.map(box => new UnsignedInput(box.id)), IndexedSeq(newOut)) - i += 1 - defaultProver.sign(unsignedTx, inputs, emptyDataBoxes, ctx) - .fold(_ => acc -> in, tx => (acc :+ ErgoTransaction(tx)) -> unsignedTx.outputs.head) - } - ._1 - (x, Some(x.last.outputs.head)) - } - .getOrElse(Seq.empty -> inOpt) - } - - private def genCandidate(minerPk: ProveDlog, - lastHeaderOpt: Option[Header], - ts: Long, - txsFromPool: Seq[ErgoTransaction], - state: UtxoStateReader)(history: ErgoHistory): Try[CandidateBlock] = Try { - val stateContext = state.stateContext - val nBits: Long = lastHeaderOpt - .map(parent => history.requiredDifficultyAfter(parent)) - .map(d => DifficultySerializer.encodeCompactBits(d)) - .getOrElse(settings.chainSettings.initialNBits) - - val interlinks = lastHeaderOpt - .flatMap { h => - history.typedModifierById[Extension](h.extensionId) - .flatMap(ext => NipopowAlgos.unpackInterlinks(ext.fields).toOption) - .map(nipopowAlgos.updateInterlinks(h, _)) - } - .getOrElse(Seq.empty) - val interlinksExtension = nipopowAlgos.interlinksToExtension(interlinks) - - val (extensionCandidate, votes: Array[Byte], version: Byte) = lastHeaderOpt.map { header => - val newHeight = header.height + 1 - val currentParams = stateContext.currentParameters - val betterVersion = protocolVersion > header.version - val votingFinishHeight: Option[Height] = currentParams.softForkStartingHeight - .map(_ + votingSettings.votingLength * votingSettings.softForkEpochs) - val forkVotingAllowed = votingFinishHeight.forall(fh => newHeight < fh) - val forkOrdered = settings.votingTargets.softFork != 0 - val voteForFork = betterVersion && forkOrdered && forkVotingAllowed - - if (newHeight % votingEpochLength == 0 && newHeight > 0) { - val (newParams, _) = currentParams.update(newHeight, voteForFork, stateContext.votingData.epochVotes, emptyVSUpdate, votingSettings) - (newParams.toExtensionCandidate ++ interlinksExtension, - newParams.suggestVotes(settings.votingTargets.targets, voteForFork), - newParams.blockVersion) - } else { - (nipopowAlgos.interlinksToExtension(interlinks), - currentParams.vote(settings.votingTargets.targets, stateContext.votingData.epochVotes, voteForFork), - currentParams.blockVersion) - } - }.getOrElse((interlinksExtension, Array(0: Byte, 0: Byte, 0: Byte), Header.InitialVersion)) - - val emissionTxOpt = CandidateGenerator.collectEmission(state, minerPk, emptyStateContext) - val txs = emissionTxOpt.toSeq ++ txsFromPool - - state.proofsForTransactions(txs).map { case (adProof, adDigest) => - CandidateBlock(lastHeaderOpt, version, nBits, adDigest, adProof, txs, ts, extensionCandidate, votes) - } - }.flatten - - @tailrec - private def proveCandidate(candidate: CandidateBlock): ErgoFullBlock = { - log.info(s"Trying to prove block with parent ${candidate.parentOpt.map(_.encodedId)} and timestamp ${candidate.timestamp}") - - pow.proveCandidate(candidate, defaultProver.hdKeys.head.privateInput.w) match { - case Some(fb) => fb - case _ => - val interlinks = candidate.parentOpt - .map(nipopowAlgos.updateInterlinks(_, NipopowAlgos.unpackInterlinks(candidate.extension.fields).get)) - .getOrElse(Seq.empty) - val minerTag = scorex.utils.Random.randomBytes(Extension.FieldKeySize) - proveCandidate { - candidate.copy( - extension = ExtensionCandidate(Seq(Array(0: Byte, 2: Byte) -> minerTag)) ++ nipopowAlgos.interlinksToExtension(interlinks) - ) - } - } - } - } diff --git a/src/test/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexerTestActor.scala b/src/test/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexerTestActor.scala new file mode 100644 index 0000000000..e32c3c647f --- /dev/null +++ b/src/test/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexerTestActor.scala @@ -0,0 +1,55 @@ +package org.ergoplatform.nodeView.history.extra + +import org.ergoplatform._ +import org.ergoplatform.nodeView.history.ErgoHistory +import org.ergoplatform.nodeView.mempool.ErgoMemPoolUtils.SortingOption +import org.ergoplatform.nodeView.state._ +import org.ergoplatform.settings._ +import org.ergoplatform.wallet.utils.FileUtils +import scorex.util.ModifierId + +import java.io.File +import scala.collection.mutable +import scala.concurrent.duration.DurationInt + +class ExtraIndexerTestActor(test: ExtraIndexerSpecification) extends ExtraIndexerBase with FileUtils { + + override def receive: Receive = { + case test.CreateDB() => createDB() + } + + type ID_LL = mutable.HashMap[ModifierId,(Long,Long)] + + override protected val saveLimit: Int = 1 // save every block + override protected implicit val segmentThreshold: Int = 8 // split to smaller segments + override protected implicit val addressEncoder: ErgoAddressEncoder = test.initSettings.chainSettings.addressEncoder + + val nodeSettings: NodeConfigurationSettings = NodeConfigurationSettings(StateType.Utxo, verifyTransactions = true, + -1, UtxoSettings(utxoBootstrap = false, 0, 2), NipopowSettings(nipopowBootstrap = false, 1), mining = false, + ChainGenerator.txCostLimit, ChainGenerator.txSizeLimit, useExternalMiner = false, internalMinersCount = 1, + internalMinerPollingInterval = 1.second, miningPubKeyHex = None, offlineGeneration = false, + 200, 5.minutes, 100000, 1.minute, mempoolSorting = SortingOption.FeePerByte, rebroadcastCount = 20, + 1000000, 100, adProofsSuffixLength = 112 * 1024, extraIndex = false) + + def createDB(): Unit = { + val dir: File = createTempDir + dir.mkdirs() + + val fullHistorySettings: ErgoSettings = ErgoSettings(dir.getAbsolutePath, NetworkType.TestNet, test.initSettings.chainSettings, + nodeSettings, test.initSettings.scorexSettings, test.initSettings.walletSettings, test.initSettings.cacheSettings) + + _history = ErgoHistory.readOrGenerate(fullHistorySettings)(null) + + ChainGenerator.generate(test.HEIGHT, dir)(_history) + test._history = _history + + // reset all variables + general.clear() + boxes.clear() + trees.clear() + tokens.clear() + segments.clear() + context.become(receive.orElse(loaded(IndexerState(0, 0, 0, 0, caughtUp = false)))) + } + +}