Skip to content

Commit

Permalink
Merge pull request #1826 from ergoplatform/v4.0.43
Browse files Browse the repository at this point in the history
Candidate for 4.0.43
  • Loading branch information
kushti authored Sep 17, 2022
2 parents 3ef580f + b68c4da commit d5cb8e1
Show file tree
Hide file tree
Showing 33 changed files with 488 additions and 442 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ To run specific Ergo version `<VERSION>` as a service with custom config `/path/
-e MAX_HEAP=3G \
ergoplatform/ergo:<VERSION> --<networkId> -c /etc/myergo.conf

Available versions can be found on [Ergo Docker image page](https://hub.docker.com/r/ergoplatform/ergo/tags), for example, `v4.0.42`.
Available versions can be found on [Ergo Docker image page](https://hub.docker.com/r/ergoplatform/ergo/tags), for example, `v4.0.43`.

This will connect to the Ergo mainnet or testnet following your configuration passed in `myergo.conf` and network flag `--<networkId>`. Every default config value would be overwritten with corresponding value in `myergo.conf`. `MAX_HEAP` variable can be used to control how much memory can the node consume.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package org.ergoplatform.bench
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import org.ergoplatform.Utils
import org.ergoplatform.Utils.BenchReport
import org.ergoplatform.modifiers.ErgoFullBlock
import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages.SemanticallySuccessfulModifier
import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages.FullBlockApplied
import scorex.util.ScorexLogging

import scala.concurrent.ExecutionContext
Expand All @@ -22,7 +21,7 @@ class BenchActor(threshold: Int) extends Actor with ScorexLogging {
val timeout: FiniteDuration = 2 hours

override def preStart(): Unit = {
context.system.eventStream.subscribe(self, classOf[SemanticallySuccessfulModifier])
context.system.eventStream.subscribe(self, classOf[FullBlockApplied])
context.system.scheduler.scheduleOnce(timeout, self, BenchActor.Timeout)
()
}
Expand All @@ -31,7 +30,7 @@ class BenchActor(threshold: Int) extends Actor with ScorexLogging {
case BenchActor.Start =>
start = System.nanoTime()
log.info(s"Starting bench..")
case SemanticallySuccessfulModifier(_: ErgoFullBlock) =>
case FullBlockApplied(_) =>
self ! BenchActor.Inc
case BenchActor.Inc =>
counter += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,21 @@ package org.ergoplatform.bench

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import org.ergoplatform.bench.misc.CrawlerConfig
import org.ergoplatform.modifiers.ErgoFullBlock
import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages.SemanticallySuccessfulModifier
import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages.FullBlockApplied
import scorex.util.ScorexLogging

class CrawlerActor(c: CrawlerConfig) extends Actor with ScorexLogging {

override def preStart(): Unit = {
context.system.eventStream.subscribe(self, classOf[SemanticallySuccessfulModifier])
context.system.eventStream.subscribe(self, classOf[FullBlockApplied])
()
}

override def receive: Receive = {
case SemanticallySuccessfulModifier(mod: ErgoFullBlock) =>
val height = mod.header.height
case FullBlockApplied(header) =>
val height = header.height
if (height % 100 == 0) logger.info(s"Got $height modifiers")
if (mod.header.height >= c.threshold) {
if (header.height >= c.threshold) {
log.error("Got enough modifiers.")
log.warn("Exiting benchmark..")
System.exit(0)
Expand Down
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ libraryDependencies ++= Seq(
"com.iheart" %% "ficus" % "1.4.7",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"com.google.guava" % "guava" % "21.0",
"com.github.ben-manes.caffeine" % "caffeine" % "2.9.3", // use 3.x only for java 11+
"com.github.scopt" %% "scopt" % "4.0.1",

"org.scala-lang.modules" %% "scala-async" % "0.9.7" % "test",
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/api/openapi.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
openapi: "3.0.2"

info:
version: "4.0.42"
version: "4.0.43"
title: Ergo Node API
description: API docs for Ergo Node. Models are shared between all Ergo products
contact:
Expand Down
15 changes: 13 additions & 2 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ akka {

scorex {

# Execution context for all tasks, except of akka actors.
# Execution context used in tests
executionContext {
type = Dispatcher
executor = "thread-pool-executor"
Expand Down Expand Up @@ -400,7 +400,7 @@ scorex {
nodeName = "ergo-node"

# Network protocol version to be sent in handshakes
appVersion = 4.0.42
appVersion = 4.0.43

# Network agent name. May contain information about client code
# stack, starting from core code-base up to the end graphical interface.
Expand Down Expand Up @@ -553,9 +553,20 @@ scorex {
# server answer timeout
timeout = 30s
}
}


# dispatcher which is used for block candidate generator and NodeViewHolder actors only
critical-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 2
}
throughput = 1
}

# dispatcher for some API-related actors
api-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/mainnet.conf
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ scorex {
network {
magicBytes = [1, 0, 2, 4]
bindAddress = "0.0.0.0:9030"
nodeName = "ergo-mainnet-4.0.42"
nodeName = "ergo-mainnet-4.0.43"
nodeName = ${?NODENAME}
knownPeers = [
"213.239.193.208:9030",
Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/org/ergoplatform/ErgoApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,10 @@ object ErgoApp extends ScorexLogging {
(implicit system: ActorSystem): Future[Done] =
CoordinatedShutdown(system).run(reason)

def main(args: Array[String]): Unit =
def main(args: Array[String]): Unit = {
argParser.parse(args, Args()).foreach { argsParsed =>
new ErgoApp(argsParsed).run()
}
}

}
12 changes: 4 additions & 8 deletions src/main/scala/org/ergoplatform/local/CleanupWorker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import org.ergoplatform.nodeView.mempool.ErgoMemPoolReader
import org.ergoplatform.nodeView.state.UtxoStateReader
import org.ergoplatform.settings.NodeConfigurationSettings
import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.{EliminateTransactions, RecheckedTransactions}
import scorex.core.transaction.state.TransactionValidation
import scorex.util.{ModifierId, ScorexLogging}

import scala.annotation.tailrec
Expand Down Expand Up @@ -41,7 +40,7 @@ class CleanupWorker(nodeViewHolderRef: ActorRef,
case a: Any => log.warn(s"Strange input: $a")
}

private def runCleanup(validator: TransactionValidation,
private def runCleanup(validator: UtxoStateReader,
mempool: ErgoMemPoolReader): Unit = {
val (validated, toEliminate) = validatePool(validator, mempool)

Expand All @@ -61,7 +60,7 @@ class CleanupWorker(nodeViewHolderRef: ActorRef,
*
* @return - updated valid transactions and invalidated transaction ids
*/
private def validatePool(validator: TransactionValidation,
private def validatePool(validator: UtxoStateReader,
mempool: ErgoMemPoolReader): (Seq[UnconfirmedTransaction], Seq[ModifierId]) = {

val now = System.currentTimeMillis()
Expand All @@ -75,10 +74,7 @@ class CleanupWorker(nodeViewHolderRef: ActorRef,

// Take into account other transactions from the pool.
// This provides possibility to validate transactions which are spending off-chain outputs.
val state = validator match {
case u: UtxoStateReader => u.withUnconfirmedTransactions(allPoolTxs)
case _ => validator
}
val state = validator.withUnconfirmedTransactions(allPoolTxs)

//internal loop function validating transactions, returns validated and invalidated transaction ids
@tailrec
Expand Down Expand Up @@ -127,6 +123,6 @@ object CleanupWorker {
* @param validator - a state implementation which provides transaction validation
* @param mempool - mempool reader instance
*/
case class RunCleanup(validator: TransactionValidation, mempool: ErgoMemPoolReader)
case class RunCleanup(validator: UtxoStateReader, mempool: ErgoMemPoolReader)

}
10 changes: 4 additions & 6 deletions src/main/scala/org/ergoplatform/local/ErgoStatsCollector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ErgoStatsCollector(readersHolder: ActorRef,
context.system.eventStream.subscribe(self, classOf[ChangedHistory])
context.system.eventStream.subscribe(self, classOf[ChangedState])
context.system.eventStream.subscribe(self, classOf[ChangedMempool])
context.system.eventStream.subscribe(self, classOf[SemanticallySuccessfulModifier])
context.system.eventStream.subscribe(self, classOf[FullBlockApplied])
context.system.scheduler.scheduleAtFixedRate(10.seconds, 20.seconds, networkController, GetConnectedPeers)(ec, self)
context.system.scheduler.scheduleAtFixedRate(45.seconds, 30.seconds, networkController, GetPeersStatus)(ec, self)
}
Expand Down Expand Up @@ -143,12 +143,10 @@ class ErgoStatsCollector(readersHolder: ActorRef,
}

def onSemanticallySuccessfulModification: Receive = {
case SemanticallySuccessfulModifier(fb: ErgoFullBlock) =>
case FullBlockApplied(header) =>
nodeInfo = nodeInfo.copy(
stateRoot = Some(Algos.encode(fb.header.stateRoot)),
stateVersion = Some(fb.encodedId))
case SemanticallySuccessfulModifier(_) =>
// Ignore other modifiers
stateRoot = Some(Algos.encode(header.stateRoot)),
stateVersion = Some(header.encodedId))
}

}
Expand Down
34 changes: 9 additions & 25 deletions src/main/scala/org/ergoplatform/local/MempoolAuditor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,15 @@ import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.actor.{Actor, ActorInitializationException, ActorKilledException, ActorRef, ActorRefFactory, DeathPactException, OneForOneStrategy, Props}
import org.ergoplatform.local.CleanupWorker.RunCleanup
import org.ergoplatform.local.MempoolAuditor.CleanupDone
import org.ergoplatform.modifiers.ErgoFullBlock
import org.ergoplatform.modifiers.history.header.Header
import org.ergoplatform.modifiers.mempool.UnconfirmedTransaction
import org.ergoplatform.nodeView.mempool.ErgoMemPoolReader
import org.ergoplatform.settings.ErgoSettings
import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.GetNodeViewChanges
import scorex.core.network.Broadcast
import scorex.core.network.NetworkController.ReceivableMessages.SendToNetwork
import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages.{ChangedMempool, ChangedState, SemanticallySuccessfulModifier}
import org.ergoplatform.nodeView.state.UtxoStateReader
import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages.RecheckMempool
import org.ergoplatform.nodeView.state.{ErgoStateReader, UtxoStateReader}
import scorex.core.network.message.{InvData, InvSpec, Message}
import scorex.core.transaction.Transaction
import scorex.core.transaction.state.TransactionValidation
import scorex.util.ScorexLogging

import scala.concurrent.duration._
Expand Down Expand Up @@ -53,49 +49,37 @@ class MempoolAuditor(nodeViewHolderRef: ActorRef,
Restart
}

private var stateReaderOpt: Option[TransactionValidation] = None
private var poolReaderOpt: Option[ErgoMemPoolReader] = None
private var stateReaderOpt: Option[ErgoStateReader] = None

private val worker: ActorRef =
context.actorOf(Props(new CleanupWorker(nodeViewHolderRef, settings.nodeSettings)))

override def preStart(): Unit = {
context.system.eventStream.subscribe(self, classOf[SemanticallySuccessfulModifier])
context.system.eventStream.subscribe(self, classOf[RecheckMempool])
}

override def receive: Receive = awaiting

private def awaiting: Receive = {
case SemanticallySuccessfulModifier(_: ErgoFullBlock) | SemanticallySuccessfulModifier(_: Header) =>
stateReaderOpt = None
poolReaderOpt = None
nodeViewHolderRef ! GetNodeViewChanges(history = false, state = true, mempool = true, vault = false)

case ChangedMempool(mp: ErgoMemPoolReader) =>
poolReaderOpt = Some(mp)
stateReaderOpt.foreach(st => initiateCleanup(st, mp))

case ChangedState(st: TransactionValidation) =>
case RecheckMempool(st: UtxoStateReader, mp: ErgoMemPoolReader) =>
stateReaderOpt = Some(st)
poolReaderOpt.foreach(mp => initiateCleanup(st, mp))

case ChangedState(_) | ChangedMempool(_) => // do nothing
poolReaderOpt = Some(mp)
initiateCleanup(st, mp)
}

private def working: Receive = {
case CleanupDone =>
log.info("Cleanup done. Switching to awaiting mode")
//rebroadcast transactions
rebroadcastTransactions()
stateReaderOpt = None
poolReaderOpt = None
context become awaiting

case _ => // ignore other triggers until work is done
}

private def initiateCleanup(validator: TransactionValidation, mempool: ErgoMemPoolReader): Unit = {
log.info("Initiating cleanup. Switching to working mode")
private def initiateCleanup(validator: UtxoStateReader, mempool: ErgoMemPoolReader): Unit = {
log.info("Initiating mempool cleanup")
worker ! RunCleanup(validator, mempool)
context become working // ignore other triggers until work is done
}
Expand Down
28 changes: 13 additions & 15 deletions src/main/scala/org/ergoplatform/mining/CandidateGenerator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import org.ergoplatform.modifiers.history.header.{Header, HeaderWithoutPow}
import org.ergoplatform.modifiers.history.popow.NipopowAlgos
import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnconfirmedTransaction}
import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages
import ReceivableMessages.{ChangedHistory, ChangedMempool, ChangedState, NodeViewChange, SemanticallySuccessfulModifier}
import ReceivableMessages.{ChangedHistory, ChangedMempool, ChangedState, NodeViewChange, FullBlockApplied}
import org.ergoplatform.nodeView.ErgoReadersHolder.{GetReaders, Readers}
import org.ergoplatform.nodeView.history.ErgoHistory.Height
import org.ergoplatform.nodeView.history.{ErgoHistory, ErgoHistoryReader}
Expand Down Expand Up @@ -99,7 +99,7 @@ class CandidateGenerator(
)
self ! GenerateCandidate(txsToInclude = Seq.empty, reply = false)
context.system.eventStream
.subscribe(self, classOf[SemanticallySuccessfulModifier])
.subscribe(self, classOf[FullBlockApplied])
context.system.eventStream.subscribe(self, classOf[NodeViewChange])
case Readers(_, _, _, _) =>
log.error("Invalid readers state, mining is possible in UTXO mode only")
Expand All @@ -123,12 +123,12 @@ class CandidateGenerator(
* When new block is applied, either one mined by us or received from peers isn't equal to our candidate's parent,
* we need to generate new candidate and possibly also discard existing solution if it is also behind
*/
case SemanticallySuccessfulModifier(mod: ErgoFullBlock) =>
case FullBlockApplied(header) =>
log.info(
s"Preparing new candidate on getting new block at ${mod.height}"
s"Preparing new candidate on getting new block at ${header.height}"
)
if (needNewCandidate(state.cache, mod)) {
if (needNewSolution(state.solvedBlock, mod))
if (needNewCandidate(state.cache, header)) {
if (needNewSolution(state.solvedBlock, header.id))
context.become(initialized(state.copy(cache = None, solvedBlock = None)))
else
context.become(initialized(state.copy(cache = None)))
Expand All @@ -137,9 +137,6 @@ class CandidateGenerator(
context.become(initialized(state))
}

case SemanticallySuccessfulModifier(_) =>
// Just ignore all other modifiers.

case gen @ GenerateCandidate(txsToInclude, reply) =>
val senderOpt = if (reply) Some(sender()) else None
if (cachedFor(state.cache, txsToInclude)) {
Expand Down Expand Up @@ -269,7 +266,7 @@ object CandidateGenerator extends ScorexLogging {
timeProvider,
ergoSettings
)
),
).withDispatcher("critical-dispatcher"),
s"CandidateGenerator-${Random.alphanumeric.take(5).mkString}"
)

Expand All @@ -288,18 +285,19 @@ object CandidateGenerator extends ScorexLogging {
/** we need new candidate if given block is not parent of our cached block */
def needNewCandidate(
cache: Option[Candidate],
bestFullBlock: ErgoFullBlock
bestFullBlockHeader: Header
): Boolean = {
val parentHeaderIdOpt = cache.map(_.candidateBlock).flatMap(_.parentOpt).map(_.id)
!parentHeaderIdOpt.contains(bestFullBlock.header.id)
!parentHeaderIdOpt.contains(bestFullBlockHeader.id)
}

/** Solution is valid only if bestFullBlock on the chain is its parent */
def needNewSolution(
solvedBlock: Option[ErgoFullBlock],
bestFullBlock: ErgoFullBlock
): Boolean =
solvedBlock.nonEmpty && !solvedBlock.map(_.parentId).contains(bestFullBlock.id)
bestFullBlockId: ModifierId
): Boolean = {
solvedBlock.nonEmpty && !solvedBlock.map(_.parentId).contains(bestFullBlockId)
}

/** Calculate average mining time from latest block header timestamps */
def getBlockMiningTimeAvg(
Expand Down
Loading

0 comments on commit d5cb8e1

Please sign in to comment.