diff --git a/src/main/scala/org/ergoplatform/network/ErgoNodeViewSynchronizer.scala b/src/main/scala/org/ergoplatform/network/ErgoNodeViewSynchronizer.scala index b63b390c7d..260fa09f1f 100644 --- a/src/main/scala/org/ergoplatform/network/ErgoNodeViewSynchronizer.scala +++ b/src/main/scala/org/ergoplatform/network/ErgoNodeViewSynchronizer.scala @@ -483,18 +483,13 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, modifiersByStatus.getOrElse(Requested, Map.empty) } - protected def sendSnapshotsInfo(peer: ConnectedPeer): Unit = { - utxoStateReaderOpt match { - case Some(reader) => { - reader.getSnapshotInfo() match { - case Some(snapInfo) => { - val msg = Message(SnapshotsInfoSpec, Right(snapInfo), None) - networkControllerRef ! SendToNetwork(msg, SendToPeer(peer)) - } - case _ => log.warn(s"No snapshots avaialble") - } + protected def sendSnapshotsInfo(usr: UtxoStateReader, peer: ConnectedPeer): Unit = { + usr.getSnapshotInfo() match { + case Some(snapInfo) => { + val msg = Message(SnapshotsInfoSpec, Right(snapInfo), None) + networkControllerRef ! SendToNetwork(msg, SendToPeer(peer)) } - case _ => log.warn(s"Got data from peer while readers are not ready") + case _ => log.warn(s"No snapshots avaialble") } } @@ -674,7 +669,10 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, case Message(spec, Left(msgBytes), Some(source)) => parseAndHandle(msgHandlers, spec, msgBytes, source) } - protected def viewHolderEvents(historyReader: ErgoHistory, mempoolReader: ErgoMemPool, blockAppliedTxsCache: FixedSizeBloomFilterQueue): Receive = { + protected def viewHolderEvents(historyReader: ErgoHistory, + mempoolReader: ErgoMemPool, + utxoStateReaderOpt: Option[UtxoStateReader], + blockAppliedTxsCache: FixedSizeBloomFilterQueue): Receive = { // Requests BlockSections with `Unknown` status that are defined by block headers but not downloaded yet. // Trying to keep size of requested queue equals to `desiredSizeOfExpectingQueue`. @@ -711,14 +709,15 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, deliveryTracker.setInvalid(mod.id, mod.modifierTypeId).foreach(penalizeMisbehavingPeer) case ChangedHistory(newHistoryReader: ErgoHistory) => - context.become(initialized(newHistoryReader, mempoolReader, blockAppliedTxsCache)) + context.become(initialized(newHistoryReader, mempoolReader, utxoStateReaderOpt, blockAppliedTxsCache)) case ChangedMempool(newMempoolReader: ErgoMemPool) => - context.become(initialized(historyReader, newMempoolReader, blockAppliedTxsCache)) + context.become(initialized(historyReader, newMempoolReader, utxoStateReaderOpt, blockAppliedTxsCache)) case ChangedState(reader: ErgoStateReader) => reader match { - case utxoStateReader: UtxoStateReader => utxoStateReaderOpt = Some(utxoStateReader) + case utxoStateReader: UtxoStateReader => + context.become(initialized(historyReader, mempoolReader, Some(utxoStateReader), blockAppliedTxsCache)) case _ => } @@ -731,12 +730,13 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, case BlockAppliedTransactions(transactionIds: Seq[ModifierId]) => // We collect applied TXs to history in order to avoid banning peers that sent these afterwards logger.info("Caching applied transactions") - context.become(initialized(historyReader, mempoolReader, blockAppliedTxsCache.putAll(transactionIds))) + context.become(initialized(historyReader, mempoolReader, utxoStateReaderOpt, blockAppliedTxsCache.putAll(transactionIds))) } /** get handlers of messages coming from peers */ private def msgHandlers(hr: ErgoHistory, mp: ErgoMemPool, + usrOpt: Option[UtxoStateReader], blockAppliedTxsCache: FixedSizeBloomFilterQueue ): PartialFunction[(MessageSpec[_], _, ConnectedPeer), Unit] = { case (_: ErgoSyncInfoMessageSpec.type @unchecked, data: ErgoSyncInfo @unchecked, remote) => @@ -747,14 +747,22 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, modifiersReq(hr, mp, data, remote) case (_: ModifiersSpec, data: ModifiersData, remote) => modifiersFromRemote(hr, data, remote, blockAppliedTxsCache) + case (spec: MessageSpec[_], _, remote) if spec.messageCode == GetSnapshotsInfoSpec.messageCode => + usrOpt match { + case Some(usr) => sendSnapshotsInfo(usr, remote) + case None => log.warn(s"Asked for snapshot when UTXO set is not supported, remote: $remote") + } } - def initialized(hr: ErgoHistory, mp: ErgoMemPool, blockAppliedTxsCache: FixedSizeBloomFilterQueue): PartialFunction[Any, Unit] = { - processDataFromPeer(msgHandlers(hr, mp, blockAppliedTxsCache)) orElse + def initialized(hr: ErgoHistory, + mp: ErgoMemPool, + usr: Option[UtxoStateReader], + blockAppliedTxsCache: FixedSizeBloomFilterQueue): PartialFunction[Any, Unit] = { + processDataFromPeer(msgHandlers(hr, mp, usr, blockAppliedTxsCache)) orElse onDownloadRequest(hr) orElse getLocalSyncInfo(hr) orElse responseFromLocal orElse - viewHolderEvents(hr, mp, blockAppliedTxsCache) orElse + viewHolderEvents(hr, mp, usr, blockAppliedTxsCache) orElse peerManagerEvents orElse checkDelivery orElse { case a: Any => log.error("Strange input: " + a) @@ -762,27 +770,37 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, } /** Wait until both historyReader and mempoolReader instances are received so actor can be operational */ - def initializing(hr: Option[ErgoHistory], mp: Option[ErgoMemPool], blockAppliedTxsCache: FixedSizeBloomFilterQueue): PartialFunction[Any, Unit] = { + def initializing(hr: Option[ErgoHistory], + mp: Option[ErgoMemPool], + usr: Option[UtxoStateReader], + blockAppliedTxsCache: FixedSizeBloomFilterQueue): PartialFunction[Any, Unit] = { case ChangedHistory(historyReader: ErgoHistory) => mp match { case Some(mempoolReader) => - context.become(initialized(historyReader, mempoolReader, blockAppliedTxsCache)) + context.become(initialized(historyReader, mempoolReader, usr, blockAppliedTxsCache)) case _ => - context.become(initializing(Option(historyReader), mp, blockAppliedTxsCache)) + context.become(initializing(Option(historyReader), mp, usr, blockAppliedTxsCache)) } case ChangedMempool(mempoolReader: ErgoMemPool) => hr match { case Some(historyReader) => - context.become(initialized(historyReader, mempoolReader, blockAppliedTxsCache)) + context.become(initialized(historyReader, mempoolReader, usr, blockAppliedTxsCache)) + case _ => + context.become(initializing(hr, Option(mempoolReader), usr, blockAppliedTxsCache)) + } + case ChangedState(reader: ErgoStateReader) => + reader match { + case utxoStateReader: UtxoStateReader => + context.become(initializing(hr, mp, Some(utxoStateReader), blockAppliedTxsCache)) case _ => - context.become(initializing(hr, Option(mempoolReader), blockAppliedTxsCache)) + context.become(initializing(hr, mp, None, blockAppliedTxsCache)) } case msg => // Actor not initialized yet, scheduling message until it is context.system.scheduler.scheduleOnce(1.second, self, msg) } - override def receive: Receive = initializing(None, None, FixedSizeBloomFilterQueue.empty(bloomFilterQueueSize = 5)) + override def receive: Receive = initializing(None, None, None, FixedSizeBloomFilterQueue.empty(bloomFilterQueueSize = 5)) } diff --git a/src/test/scala/scorex/testkit/properties/NodeViewSynchronizerTests.scala b/src/test/scala/scorex/testkit/properties/NodeViewSynchronizerTests.scala index ce79bd2efb..3a27bf461b 100644 --- a/src/test/scala/scorex/testkit/properties/NodeViewSynchronizerTests.scala +++ b/src/test/scala/scorex/testkit/properties/NodeViewSynchronizerTests.scala @@ -237,7 +237,7 @@ trait NodeViewSynchronizerTests[ST <: ErgoState[ST]] extends AnyPropSpec val s = stateGen.sample.get - if(s.isInstanceOf[UtxoStateReader]) { + if (s.isInstanceOf[UtxoStateReader]) { // To initialize utxoStateReaderOpt in ErgoNodeView Synchronizer node ! ChangedState(s)