Skip to content

Commit

Permalink
server: Optimize the ledger synchronization process
Browse files Browse the repository at this point in the history
The block transactions are loaded only when they are required, this
is specially useful while synchronizing bitcoin because it reduces
the workload to the bitcoind on the initial sync.
  • Loading branch information
AlexITC committed Apr 6, 2019
1 parent 13e25fa commit 3ea419b
Showing 1 changed file with 28 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureOps, OptionOps}
import com.alexitc.playsonify.core.{ApplicationResult, FutureApplicationResult}
import com.xsn.explorer.data.async.{BlockFutureDataHandler, LedgerFutureDataHandler}
import com.xsn.explorer.errors.BlockNotFoundError
import com.xsn.explorer.models.TPoSContract
import com.xsn.explorer.models._
import com.xsn.explorer.models.persisted.Block
import com.xsn.explorer.models.transformers._
import com.xsn.explorer.models.values.{Blockhash, Height}
import com.xsn.explorer.models.values._
import com.xsn.explorer.util.Extensions.FutureOrExt
import javax.inject.Inject
import org.scalactic.{Bad, Good}
Expand Down Expand Up @@ -37,15 +37,14 @@ class LedgerSynchronizerService @Inject() (
*/
def synchronize(blockhash: Blockhash): FutureApplicationResult[Unit] = {
val result = for {
data <- getRPCBlock(blockhash).toFutureOr
data <- xsnService.getBlock(blockhash).toFutureOr
_ <- synchronize(data).toFutureOr
} yield ()

result.toFuture
}

private def synchronize(data: BlockData): FutureApplicationResult[Unit] = {
val block = data._1
private def synchronize(block: rpc.Block): FutureApplicationResult[Unit] = {
logger.info(s"Synchronize block ${block.height}, hash = ${block.hash}")

val result = for {
Expand All @@ -56,8 +55,8 @@ class LedgerSynchronizerService @Inject() (
.recoverFrom(BlockNotFoundError)(None)

_ <- latestBlockMaybe
.map { latestBlock => onLatestBlock(latestBlock, data) }
.getOrElse { onEmptyLedger(data) }
.map { latestBlock => onLatestBlock(latestBlock, block) }
.getOrElse { onEmptyLedger(block) }
.toFutureOr
} yield ()

Expand All @@ -69,16 +68,15 @@ class LedgerSynchronizerService @Inject() (
* 1.1. the given block is the genensis block, it is added.
* 1.2. the given block is not the genesis block, sync everything until the given block.
*/
private def onEmptyLedger(data: BlockData): FutureApplicationResult[Unit] = {
val block = data._1
private def onEmptyLedger(block: rpc.Block): FutureApplicationResult[Unit] = {
if (block.height.int == 0) {
logger.info(s"Synchronize genesis block on empty ledger, hash = ${block.hash}")
ledgerDataHandler.push(block, data._2)
appendBlock(block)
} else {
logger.info(s"Synchronize block ${block.height} on empty ledger, hash = ${block.hash}")
val result = for {
_ <- sync(0 until block.height.int).toFutureOr
_ <- synchronize(data).toFutureOr
_ <- synchronize(block).toFutureOr
} yield ()

result.toFuture
Expand All @@ -93,28 +91,27 @@ class LedgerSynchronizerService @Inject() (
* 2.4. if H <= N, if the hash already exists, it is ignored.
* 2.5. if H <= N, if the hash doesn't exists, remove blocks from N to H (included), then, add the new H.
*/
private def onLatestBlock(ledgerBlock: Block, newData: BlockData): FutureApplicationResult[Unit] = {
val newBlock = newData._1
private def onLatestBlock(ledgerBlock: Block, newBlock: rpc.Block): FutureApplicationResult[Unit] = {
if (ledgerBlock.height.int + 1 == newBlock.height.int &&
newBlock.previousBlockhash.contains(ledgerBlock.hash)) {

logger.info(s"Appending block ${newBlock.height}, hash = ${newBlock.hash}")
ledgerDataHandler.push(newBlock, newData._2)
appendBlock(newBlock)
} else if (ledgerBlock.height.int + 1 == newBlock.height.int) {
logger.info(s"Reorganization to push block ${newBlock.height}, hash = ${newBlock.hash}")
val result = for {
blockhash <- newBlock.previousBlockhash.toFutureOr(BlockNotFoundError)
previousBlock <- getRPCBlock(blockhash).toFutureOr
previousBlock <- xsnService.getBlock(blockhash).toFutureOr
_ <- synchronize(previousBlock).toFutureOr
_ <- synchronize(newData).toFutureOr
_ <- synchronize(newBlock).toFutureOr
} yield ()

result.toFuture
} else if (newBlock.height.int > ledgerBlock.height.int) {
logger.info(s"Filling holes to push block ${newBlock.height}, hash = ${newBlock.hash}")
val result = for {
_ <- sync(ledgerBlock.height.int + 1 until newBlock.height.int).toFutureOr
_ <- synchronize(newData).toFutureOr
_ <- synchronize(newBlock).toFutureOr
} yield ()

result.toFuture
Expand All @@ -132,7 +129,7 @@ class LedgerSynchronizerService @Inject() (
.getOrElse {
val x = for {
_ <- trimTo(newBlock.height).toFutureOr
_ <- synchronize(newData).toFutureOr
_ <- synchronize(newBlock).toFutureOr
} yield ()
x.toFuture
}
Expand All @@ -143,6 +140,16 @@ class LedgerSynchronizerService @Inject() (
}
}

private def appendBlock(newBlock: rpc.Block): FutureApplicationResult[Unit] = {
val result = for {
data <- getBlockData(newBlock).toFutureOr
(blockWithTransactions, tposContracts) = data
_ <- ledgerDataHandler.push(blockWithTransactions, tposContracts).toFutureOr
} yield ()

result.toFuture
}

/**
* Sync the given range to our ledger.
*/
Expand All @@ -154,17 +161,16 @@ class LedgerSynchronizerService @Inject() (
val result = for {
_ <- previous.toFutureOr
blockhash <- xsnService.getBlockhash(Height(height)).toFutureOr
data <- getRPCBlock(blockhash).toFutureOr
_ <- synchronize(data).toFutureOr
block <- xsnService.getBlock(blockhash).toFutureOr
_ <- synchronize(block).toFutureOr
} yield ()

result.toFuture
}
}

private def getRPCBlock(blockhash: Blockhash): FutureApplicationResult[BlockData] = {
private def getBlockData(rpcBlock: rpc.Block): FutureApplicationResult[BlockData] = {
val result = for {
rpcBlock <- xsnService.getBlock(blockhash).toFutureOr
extractionMethod <- blockService.extractionMethod(rpcBlock).toFutureOr
data <- transactionRPCService.getTransactions(rpcBlock.transactions).toFutureOr
(transactions, contracts) = data
Expand Down

0 comments on commit 3ea419b

Please sign in to comment.