Skip to content

Commit

Permalink
fixing getsnapshot processing
Browse files Browse the repository at this point in the history
  • Loading branch information
kushti committed Dec 30, 2021
1 parent 58ec522 commit a77205f
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -483,18 +483,13 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
modifiersByStatus.getOrElse(Requested, Map.empty)
}

protected def sendSnapshotsInfo(peer: ConnectedPeer): Unit = {
utxoStateReaderOpt match {
case Some(reader) => {
reader.getSnapshotInfo() match {
case Some(snapInfo) => {
val msg = Message(SnapshotsInfoSpec, Right(snapInfo), None)
networkControllerRef ! SendToNetwork(msg, SendToPeer(peer))
}
case _ => log.warn(s"No snapshots avaialble")
}
protected def sendSnapshotsInfo(usr: UtxoStateReader, peer: ConnectedPeer): Unit = {
usr.getSnapshotInfo() match {
case Some(snapInfo) => {
val msg = Message(SnapshotsInfoSpec, Right(snapInfo), None)
networkControllerRef ! SendToNetwork(msg, SendToPeer(peer))
}
case _ => log.warn(s"Got data from peer while readers are not ready")
case _ => log.warn(s"No snapshots avaialble")
}
}

Expand Down Expand Up @@ -674,7 +669,10 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
case Message(spec, Left(msgBytes), Some(source)) => parseAndHandle(msgHandlers, spec, msgBytes, source)
}

protected def viewHolderEvents(historyReader: ErgoHistory, mempoolReader: ErgoMemPool, blockAppliedTxsCache: FixedSizeBloomFilterQueue): Receive = {
protected def viewHolderEvents(historyReader: ErgoHistory,
mempoolReader: ErgoMemPool,
utxoStateReaderOpt: Option[UtxoStateReader],
blockAppliedTxsCache: FixedSizeBloomFilterQueue): Receive = {
// Requests BlockSections with `Unknown` status that are defined by block headers but not downloaded yet.
// Trying to keep size of requested queue equals to `desiredSizeOfExpectingQueue`.

Expand Down Expand Up @@ -711,14 +709,15 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
deliveryTracker.setInvalid(mod.id, mod.modifierTypeId).foreach(penalizeMisbehavingPeer)

case ChangedHistory(newHistoryReader: ErgoHistory) =>
context.become(initialized(newHistoryReader, mempoolReader, blockAppliedTxsCache))
context.become(initialized(newHistoryReader, mempoolReader, utxoStateReaderOpt, blockAppliedTxsCache))

case ChangedMempool(newMempoolReader: ErgoMemPool) =>
context.become(initialized(historyReader, newMempoolReader, blockAppliedTxsCache))
context.become(initialized(historyReader, newMempoolReader, utxoStateReaderOpt, blockAppliedTxsCache))

case ChangedState(reader: ErgoStateReader) =>
reader match {
case utxoStateReader: UtxoStateReader => utxoStateReaderOpt = Some(utxoStateReader)
case utxoStateReader: UtxoStateReader =>
context.become(initialized(historyReader, mempoolReader, Some(utxoStateReader), blockAppliedTxsCache))
case _ =>
}

Expand All @@ -731,12 +730,13 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
case BlockAppliedTransactions(transactionIds: Seq[ModifierId]) =>
// We collect applied TXs to history in order to avoid banning peers that sent these afterwards
logger.info("Caching applied transactions")
context.become(initialized(historyReader, mempoolReader, blockAppliedTxsCache.putAll(transactionIds)))
context.become(initialized(historyReader, mempoolReader, utxoStateReaderOpt, blockAppliedTxsCache.putAll(transactionIds)))
}

/** get handlers of messages coming from peers */
private def msgHandlers(hr: ErgoHistory,
mp: ErgoMemPool,
usrOpt: Option[UtxoStateReader],
blockAppliedTxsCache: FixedSizeBloomFilterQueue
): PartialFunction[(MessageSpec[_], _, ConnectedPeer), Unit] = {
case (_: ErgoSyncInfoMessageSpec.type @unchecked, data: ErgoSyncInfo @unchecked, remote) =>
Expand All @@ -747,42 +747,60 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef,
modifiersReq(hr, mp, data, remote)
case (_: ModifiersSpec, data: ModifiersData, remote) =>
modifiersFromRemote(hr, data, remote, blockAppliedTxsCache)
case (spec: MessageSpec[_], _, remote) if spec.messageCode == GetSnapshotsInfoSpec.messageCode =>
usrOpt match {
case Some(usr) => sendSnapshotsInfo(usr, remote)
case None => log.warn(s"Asked for snapshot when UTXO set is not supported, remote: $remote")
}
}

def initialized(hr: ErgoHistory, mp: ErgoMemPool, blockAppliedTxsCache: FixedSizeBloomFilterQueue): PartialFunction[Any, Unit] = {
processDataFromPeer(msgHandlers(hr, mp, blockAppliedTxsCache)) orElse
def initialized(hr: ErgoHistory,
mp: ErgoMemPool,
usr: Option[UtxoStateReader],
blockAppliedTxsCache: FixedSizeBloomFilterQueue): PartialFunction[Any, Unit] = {
processDataFromPeer(msgHandlers(hr, mp, usr, blockAppliedTxsCache)) orElse
onDownloadRequest(hr) orElse
getLocalSyncInfo(hr) orElse
responseFromLocal orElse
viewHolderEvents(hr, mp, blockAppliedTxsCache) orElse
viewHolderEvents(hr, mp, usr, blockAppliedTxsCache) orElse
peerManagerEvents orElse
checkDelivery orElse {
case a: Any => log.error("Strange input: " + a)
}
}

/** Wait until both historyReader and mempoolReader instances are received so actor can be operational */
def initializing(hr: Option[ErgoHistory], mp: Option[ErgoMemPool], blockAppliedTxsCache: FixedSizeBloomFilterQueue): PartialFunction[Any, Unit] = {
def initializing(hr: Option[ErgoHistory],
mp: Option[ErgoMemPool],
usr: Option[UtxoStateReader],
blockAppliedTxsCache: FixedSizeBloomFilterQueue): PartialFunction[Any, Unit] = {
case ChangedHistory(historyReader: ErgoHistory) =>
mp match {
case Some(mempoolReader) =>
context.become(initialized(historyReader, mempoolReader, blockAppliedTxsCache))
context.become(initialized(historyReader, mempoolReader, usr, blockAppliedTxsCache))
case _ =>
context.become(initializing(Option(historyReader), mp, blockAppliedTxsCache))
context.become(initializing(Option(historyReader), mp, usr, blockAppliedTxsCache))
}
case ChangedMempool(mempoolReader: ErgoMemPool) =>
hr match {
case Some(historyReader) =>
context.become(initialized(historyReader, mempoolReader, blockAppliedTxsCache))
context.become(initialized(historyReader, mempoolReader, usr, blockAppliedTxsCache))
case _ =>
context.become(initializing(hr, Option(mempoolReader), usr, blockAppliedTxsCache))
}
case ChangedState(reader: ErgoStateReader) =>
reader match {
case utxoStateReader: UtxoStateReader =>
context.become(initializing(hr, mp, Some(utxoStateReader), blockAppliedTxsCache))
case _ =>
context.become(initializing(hr, Option(mempoolReader), blockAppliedTxsCache))
context.become(initializing(hr, mp, None, blockAppliedTxsCache))
}
case msg =>
// Actor not initialized yet, scheduling message until it is
context.system.scheduler.scheduleOnce(1.second, self, msg)
}

override def receive: Receive = initializing(None, None, FixedSizeBloomFilterQueue.empty(bloomFilterQueueSize = 5))
override def receive: Receive = initializing(None, None, None, FixedSizeBloomFilterQueue.empty(bloomFilterQueueSize = 5))

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ trait NodeViewSynchronizerTests[ST <: ErgoState[ST]] extends AnyPropSpec

val s = stateGen.sample.get

if(s.isInstanceOf[UtxoStateReader]) {
if (s.isInstanceOf[UtxoStateReader]) {
// To initialize utxoStateReaderOpt in ErgoNodeView Synchronizer
node ! ChangedState(s)

Expand Down

0 comments on commit a77205f

Please sign in to comment.