Skip to content

Commit

Permalink
server: Add the BlockParallelChunkSynchronizer
Browse files Browse the repository at this point in the history
This synchronizer allows syncing a single block in parallel.
  • Loading branch information
AlexITC committed Jul 8, 2019
1 parent 63adb40 commit c920db6
Show file tree
Hide file tree
Showing 6 changed files with 515 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.xsn.explorer.services.synchronizer

import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps
import com.xsn.explorer.models.TPoSContract
import com.xsn.explorer.models.persisted.Block
import com.xsn.explorer.models.values.Blockhash
import com.xsn.explorer.services.synchronizer.operations.BlockParallelChunkAddOps
import com.xsn.explorer.services.synchronizer.repository.BlockChunkRepository
import javax.inject.Inject
import org.scalactic.Good
import org.slf4j.LoggerFactory

import scala.concurrent.{ExecutionContext, Future}

class BlockParallelChunkSynchronizer @Inject()(
    blockChunkRepository: BlockChunkRepository.FutureImpl,
    addOps: BlockParallelChunkAddOps
)(
    implicit ec: ExecutionContext
) {

  private val logger = LoggerFactory.getLogger(this.getClass)

  /**
   * Synchronize the given block, resuming from the last recorded step when the
   * block was already partially synchronized.
   *
   * The current synchronization state is looked up first; when there is none,
   * synchronization starts from the initial [[BlockSynchronizationState.StoringBlock]] step.
   */
  def sync(block: Block.HasTransactions, tposContracts: List[TPoSContract]): FutureApplicationResult[Unit] = {
    val startedAt = System.currentTimeMillis()
    blockChunkRepository
      .findSyncState(block.hash)
      .toFutureOr
      .flatMap { stateMaybe =>
        val currentState = stateMaybe.getOrElse(BlockSynchronizationState.StoringBlock)
        addOps.continueFromState(currentState, block, tposContracts).toFutureOr
      }
      .map { _ =>
        val elapsed = System.currentTimeMillis() - startedAt
        logger.debug(s"Synced ${block.height}, took $elapsed ms")
      }
      .toFuture
  }

  /**
   * Roll back the given block.
   *
   * When the block has no recorded synchronization state there is nothing to undo,
   * so the call succeeds after logging a warning; otherwise the stored progress is
   * rolled back atomically.
   */
  def rollback(blockhash: Blockhash): FutureApplicationResult[Unit] = {
    blockChunkRepository
      .findSyncState(blockhash)
      .toFutureOr
      .flatMap {
        case Some(state) =>
          logger.warn(s"The block $blockhash is going to be rolled back from the $state state")
          blockChunkRepository.atomicRollback(blockhash).toFutureOr

        case None =>
          logger.warn(s"The block $blockhash is supposed to be rolled back but it is not syncing")
          Future.successful(Good(())).toFutureOr
      }
      .toFuture
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package com.xsn.explorer.services.synchronizer.operations

import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps
import com.xsn.explorer.gcs.GolombEncoding
import com.xsn.explorer.models.TPoSContract
import com.xsn.explorer.models.persisted.Block
import com.xsn.explorer.services.synchronizer.BlockSynchronizationState
import com.xsn.explorer.services.synchronizer.repository.BlockChunkRepository
import com.xsn.explorer.util.Extensions.FutureApplicationResultListExt
import com.xsn.explorer.util.{TransactionAddressesHelper, TransactionBalancesHelper}
import javax.inject.Inject
import org.scalactic.Good

import scala.concurrent.{ExecutionContext, Future}

class BlockParallelChunkAddOps @Inject()(blockChunkRepository: BlockChunkRepository.FutureImpl)(
    implicit ec: ExecutionContext
) {

  /**
   * Runs the block synchronization state machine starting at the given state.
   *
   * Each step performs its work, persists the next state, and then recursively
   * invokes this method, so a crashed synchronization can be resumed from the
   * last persisted state (see the caller, BlockParallelChunkSynchronizer).
   */
  def continueFromState(
      state: BlockSynchronizationState,
      block: Block.HasTransactions,
      tposContracts: List[TPoSContract]
  ): FutureApplicationResult[Unit] = state match {
    case BlockSynchronizationState.StoringBlock => storeBlock(block, tposContracts)
    case BlockSynchronizationState.StoringBlockData => storeBlockData(block, tposContracts)
    case BlockSynchronizationState.StoringTransactions => storeTransactions(block, tposContracts)
    case BlockSynchronizationState.StoringOutputs => storeOutputs(block, tposContracts)
    case BlockSynchronizationState.StoringInputs => storeInputs(block, tposContracts)
    case BlockSynchronizationState.SpendingOutputs => spendOutputs(block, tposContracts)
    case BlockSynchronizationState.StoringAddressTransactionDetails =>
      storeAddressTransactionDetails(block, tposContracts)
    case BlockSynchronizationState.UpdatingTPoSContracts => updateTPoSContracts(block, tposContracts)
    case BlockSynchronizationState.UpdatingBalances => updateBalances(block)
  }

  /**
   * - Marks the block as being synchronized.
   * - Creates or updates the block on the database.
   * - Marks the next state to continue the synchronization.
   */
  private def storeBlock(
      block: Block.HasTransactions,
      tposContracts: List[TPoSContract]
  ): FutureApplicationResult[Unit] = {
    val initialState = BlockSynchronizationState.StoringBlock
    val nextState = BlockSynchronizationState.StoringBlockData

    val result = for {
      // persist the initial state first so an interrupted sync is detectable and resumable
      _ <- blockChunkRepository.upsertSyncState(block.hash, initialState).toFutureOr
      _ <- blockChunkRepository.upsertBlock(block.block).toFutureOr
      _ <- blockChunkRepository.upsertSyncState(block.hash, nextState).toFutureOr
      _ <- continueFromState(nextState, block, tposContracts).toFutureOr
    } yield ()

    result.toFuture
  }

  /**
   * In parallel:
   * - Generate and store the block filter.
   * - Link the previous block to the new one.
   *
   * Then, mark the next state to continue synchronizing.
   */
  private def storeBlockData(
      block: Block.HasTransactions,
      tposContracts: List[TPoSContract]
  ): FutureApplicationResult[Unit] = {
    val nextState = BlockSynchronizationState.StoringTransactions
    // filterF and linkF are created before the for-comprehension, so both futures
    // start eagerly and run concurrently; the for only awaits their results
    val filterF = Future { GolombEncoding.encode(block) }
      .flatMap {
        case None => Future.successful(Good(()))
        case Some(filter) => blockChunkRepository.upsertFilter(block.hash, filter)
      }

    val linkF = block.previousBlockhash match {
      // a block without a previous hash (e.g. the genesis block) has nothing to link
      case None => Future.successful(Good(()))
      case Some(previousBlockhash) => blockChunkRepository.setNextBlockhash(previousBlockhash, block.hash)
    }

    val result = for {
      _ <- filterF.toFutureOr
      _ <- linkF.toFutureOr
      _ <- blockChunkRepository.upsertSyncState(block.hash, nextState).toFutureOr
      _ <- continueFromState(nextState, block, tposContracts).toFutureOr
    } yield ()

    result.toFuture
  }

  /**
   * Store all transactions on the transactions table in parallel.
   *
   * Then, mark the next state to continue synchronizing.
   */
  private def storeTransactions(
      block: Block.HasTransactions,
      tposContracts: List[TPoSContract]
  ): FutureApplicationResult[Unit] = {
    val nextState = BlockSynchronizationState.StoringOutputs
    // mapping eagerly starts one future per transaction; sequence awaits them all,
    // the index preserves the transaction's position within the block
    val storeTransactionsF = block.transactions.zipWithIndex.map {
      case (tx, index) =>
        blockChunkRepository.upsertTransaction(index, tx.transaction)
    }.sequence

    val result = for {
      _ <- storeTransactionsF.toFutureOr
      _ <- blockChunkRepository.upsertSyncState(block.hash, nextState).toFutureOr
      _ <- continueFromState(nextState, block, tposContracts).toFutureOr
    } yield ()

    result.toFuture
  }

  /**
   * Store the transaction outputs on the transaction_outputs table in parallel.
   *
   * Then, mark the next state to continue synchronizing.
   */
  private def storeOutputs(
      block: Block.HasTransactions,
      tposContracts: List[TPoSContract]
  ): FutureApplicationResult[Unit] = {
    val nextState = BlockSynchronizationState.StoringInputs
    // one eager future per output across every transaction in the block
    val storeOutputsF =
      for (tx <- block.transactions; output <- tx.outputs)
        yield blockChunkRepository.upsertOutput(output)

    val result = for {
      _ <- storeOutputsF.sequence.toFutureOr
      _ <- blockChunkRepository.upsertSyncState(block.hash, nextState).toFutureOr
      _ <- continueFromState(nextState, block, tposContracts).toFutureOr
    } yield ()

    result.toFuture
  }

  /**
   * Store the inputs on the transaction_inputs table in parallel.
   *
   * Then, mark the next state to continue synchronizing.
   */
  private def storeInputs(
      block: Block.HasTransactions,
      tposContracts: List[TPoSContract]
  ): FutureApplicationResult[Unit] = {
    val nextState = BlockSynchronizationState.SpendingOutputs
    // one eager future per input across every transaction in the block
    val storeInputsF =
      for (tx <- block.transactions; input <- tx.inputs)
        yield blockChunkRepository.upsertInput(tx.id, input)

    val result = for {
      _ <- storeInputsF.sequence.toFutureOr
      _ <- blockChunkRepository.upsertSyncState(block.hash, nextState).toFutureOr
      _ <- continueFromState(nextState, block, tposContracts).toFutureOr
    } yield ()

    result.toFuture
  }

  /**
   * Spend the transaction inputs in parallel, marking the outputs they
   * reference as spent.
   *
   * Then, mark the next state to continue synchronizing.
   */
  private def spendOutputs(
      block: Block.HasTransactions,
      tposContracts: List[TPoSContract]
  ): FutureApplicationResult[Unit] = {
    val nextState = BlockSynchronizationState.StoringAddressTransactionDetails
    val spendOutputsF =
      for (tx <- block.transactions; input <- tx.inputs)
        yield blockChunkRepository.spendOutput(input.fromTxid, input.fromOutputIndex, tx.id)

    val result = for {
      _ <- spendOutputsF.sequence.toFutureOr
      _ <- blockChunkRepository.upsertSyncState(block.hash, nextState).toFutureOr
      _ <- continueFromState(nextState, block, tposContracts).toFutureOr
    } yield ()

    result.toFuture
  }

  /**
   * Store the transaction details for each involved address in parallel.
   *
   * Then, mark the next state to continue synchronizing.
   */
  private def storeAddressTransactionDetails(
      block: Block.HasTransactions,
      tposContracts: List[TPoSContract]
  ): FutureApplicationResult[Unit] = {
    val nextState = BlockSynchronizationState.UpdatingTPoSContracts
    val storeDetailsF =
      for (tx <- block.transactions; details <- TransactionAddressesHelper.computeDetails(tx))
        yield blockChunkRepository.upsertAddressTransactionDetails(details)

    val result = for {
      _ <- storeDetailsF.sequence.toFutureOr
      _ <- blockChunkRepository.upsertSyncState(block.hash, nextState).toFutureOr
      _ <- continueFromState(nextState, block, tposContracts).toFutureOr
    } yield ()

    result.toFuture
  }

  /**
   * Creates the new contracts, and closes the old ones in parallel.
   *
   * Then, mark the next state to continue synchronizing.
   */
  private def updateTPoSContracts(
      block: Block.HasTransactions,
      tposContracts: List[TPoSContract]
  ): FutureApplicationResult[Unit] = {
    val nextState = BlockSynchronizationState.UpdatingBalances

    val createContractsF = tposContracts.map { contract =>
      blockChunkRepository.upsertContract(contract)
    }

    // a block may open and close the same contract, so the close futures must not
    // start before every create has completed; the lazy val defers their creation
    // until the second generator of the for-comprehension below is evaluated,
    // which happens inside the flatMap callback after createContractsF finishes
    lazy val closeContractsF = for {
      tx <- block.transactions
      // a contract requires 1 XSN
      input <- tx.inputs if input.value == 1
    } yield {
      val id = TPoSContract.Id(input.fromTxid, input.fromOutputIndex)
      blockChunkRepository.closeContract(id, tx.id)
    }

    val result = for {
      _ <- createContractsF.sequence.toFutureOr
      _ <- closeContractsF.sequence.toFutureOr
      _ <- blockChunkRepository.upsertSyncState(block.hash, nextState).toFutureOr
      _ <- continueFromState(nextState, block, tposContracts).toFutureOr
    } yield ()

    result.toFuture
  }

  /**
   * Updates the address balances, available coins and completes the block
   * synchronization.
   *
   * This is the final state: the repository call is atomic and is expected to
   * clear the block's synchronization progress, so no next state is persisted.
   */
  private def updateBalances(block: Block.HasTransactions): FutureApplicationResult[Unit] = {
    val balanceList = TransactionBalancesHelper.computeBalances(block.transactions).toList
    blockChunkRepository.atomicUpdateBalances(block.hash, balanceList)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ trait PostgresDataHandlerSpec

protected def clearDatabase() = {
database.withConnection { implicit conn =>
_root_.anorm.SQL("""DELETE FROM tpos_contracts""").execute()
_root_.anorm.SQL("""DELETE FROM transaction_inputs""").execute()
_root_.anorm.SQL("""DELETE FROM transaction_outputs""").execute()
_root_.anorm.SQL("""DELETE FROM address_transaction_details""").execute()
Expand All @@ -67,6 +68,8 @@ trait PostgresDataHandlerSpec
_root_.anorm.SQL("""DELETE FROM blocks""").execute()
_root_.anorm.SQL("""DELETE FROM balances""").execute()
_root_.anorm.SQL("""DELETE FROM hidden_addresses""").execute()
_root_.anorm.SQL("""DELETE FROM aggregated_amounts""").execute()
_root_.anorm.SQL("""DELETE FROM block_synchronization_progress""").execute()
}
}
}
28 changes: 22 additions & 6 deletions server/test/com/xsn/explorer/helpers/DataGenerator.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.xsn.explorer.helpers

import com.xsn.explorer.models.TPoSContract
import com.xsn.explorer.models.persisted.Transaction
import com.xsn.explorer.models.rpc.Block
import com.xsn.explorer.models.values.{TransactionId, _}
Expand Down Expand Up @@ -65,18 +66,23 @@ trait DataGenerator {
)
}

def randomOutput: Transaction.Output = {
def randomOutput(txid: TransactionId = randomTransactionId, index: Int = nextInt(100)): Transaction.Output = {
Transaction.Output(
txid = randomTransactionId,
index = nextInt(100),
txid = txid,
index = index,
value = nextInt(1000000),
address = randomAddress,
script = randomHexString(8)
)
}

def randomOutputs(n: Int = nextInt(5) + 1): List[Transaction.Output] = {
(0 until n).map(x => randomOutput.copy(index = x)).toList
/**
 * Generates a list of random outputs sharing the given txid.
 *
 * The indexes are sequential from 0 until howMany (not random), so the
 * generated outputs never collide on (txid, index).
 */
def randomOutputs(
    howMany: Int = nextInt(5) + 1,
    txid: TransactionId = randomTransactionId
): List[Transaction.Output] = {
  (0 until howMany).map { index =>
    randomOutput(txid, index)
  }.toList
}

def randomInput(utxos: List[Transaction.Output]): Transaction.Input = {
Expand Down Expand Up @@ -131,10 +137,20 @@ trait DataGenerator {
fromOutputIndex = output.index,
index = index,
value = output.value,
address = randomAddress
addresses = output.addresses
)
}
}

/**
 * Generates a random TPoS contract identified by the given (txid, index) pair,
 * with a random state, random owner/merchant addresses and a random commission
 * in the range 1 to 50, timestamped with the current system time.
 */
def randomTPoSContract(
    txid: TransactionId = randomTransactionId,
    index: Int = scala.util.Random.nextInt(100)
): TPoSContract = {
  val state = randomItem(TPoSContract.State.values.toList)
  // .get assumes Commission.from accepts any value in 1..50 — TODO confirm its valid range
  val commission = TPoSContract.Commission.from(scala.util.Random.nextInt(50) + 1).get
  val details = TPoSContract.Details(randomAddress, randomAddress, commission)
  TPoSContract(TPoSContract.Id(txid, index), details = details, time = System.currentTimeMillis(), state)
}
}

object DataGenerator extends DataGenerator
Loading

0 comments on commit c920db6

Please sign in to comment.