Skip to content

Commit

Permalink
server: Refactor the TransactionCollectorService
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexITC committed Apr 26, 2019
1 parent 364fe6a commit 6961c76
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 71 deletions.
135 changes: 71 additions & 64 deletions server/app/com/xsn/explorer/services/TransactionCollectorService.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package com.xsn.explorer.services

import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps
import com.alexitc.playsonify.core.{ApplicationResult, FutureApplicationResult}
import com.xsn.explorer.data.async.TransactionFutureDataHandler
import com.xsn.explorer.errors.TransactionNotFoundError
import com.xsn.explorer.models._
import com.xsn.explorer.models.values._
import com.xsn.explorer.util.Extensions.FutureOrExt
import javax.inject.Inject
import org.scalactic.{Bad, Good, One, Or}

Expand Down Expand Up @@ -39,7 +38,7 @@ class TransactionCollectorService @Inject() (
def collect(txidList: List[TransactionId], excludedTransactions: List[TransactionId]): FutureApplicationResult[Result] = {
val futureOr = for {
rpcTransactions <- getRPCTransactions(txidList).toFutureOr
completeTransactions <- getRPCTransactionsWithValues(rpcTransactions, excludedTransactions).toFutureOr
completeTransactions <- completeValues(rpcTransactions, excludedTransactions).toFutureOr
} yield {
val result = completeTransactions.map(persisted.Transaction.fromRPC)
val contracts = result.flatMap(_._2)
Expand All @@ -50,7 +49,7 @@ class TransactionCollectorService @Inject() (
futureOr.toFuture
}

private[services] def getRPCTransactionsWithValues(rpcTransactions: RPCTransactionList, excludedTransactions: List[TransactionId]): FutureApplicationResult[RPCTransactionListWithValues] = {
private[services] def completeValues(rpcTransactions: List[RPCTransaction], excludedTransactions: List[TransactionId]): FutureApplicationResult[List[RPCCompleteTransaction]] = {
val neutral: FutureApplicationResult[List[rpc.Transaction[rpc.TransactionVIN.HasValues]]] = Future.successful(Good(List.empty))
val future = rpcTransactions.foldLeft(neutral) { case (acc, tx) =>
val result = for {
Expand All @@ -76,46 +75,46 @@ class TransactionCollectorService @Inject() (
* - The ones that weren't retrieved are retried sequentially using the RPC API.
*/
private[services] def getRPCTransactionVIN(vinList: List[rpc.TransactionVIN], excludedTransactions: List[TransactionId]): FutureApplicationResult[List[rpc.TransactionVIN.HasValues]] = {
val futureList = Future.sequence {
vinList
.filterNot(excludedTransactions contains _.txid)
.map { vin =>
transactionDataHandler
.getOutput(vin.txid, vin.voutIndex)
.toFutureOr
.map { output =>
vin.withValues(value = output.value, address = output.address)
}
.recoverWith(TransactionNotFoundError) {
getRPCTransactionVINWithValues(vin).toFutureOr
}
.toFuture
.map(vin -> _)
val filtered = vinList.filterNot(excludedTransactions contains _.txid)
getDBPartialVINList(filtered)
.flatMap(completeRPCVINSequentially)
}

private[services] def getDBPartialVINList(vinList: List[rpc.TransactionVIN]): Future[List[PartialTransactionVIN]] = {
val futures = for (vin <- vinList) yield {
transactionDataHandler
.getOutput(vin.txid, vin.voutIndex)
.toFutureOr
.map { output =>
vin.withValues(value = output.value, address = output.address)
}
.toFuture
.map(vin -> _)
}

val future = futureList
.flatMap { resultList =>
val neutral: FutureApplicationResult[List[rpc.TransactionVIN.HasValues]] = Future.successful(Good(List.empty))
resultList.foldLeft(neutral) {
case (acc, (vin, Bad(_))) =>
val result = for {
ready <- acc.toFutureOr
newVIN <- getRPCTransactionVINWithValues(vin).toFutureOr
} yield newVIN :: ready
Future.sequence(futures)
}

result.toFuture
private[services] def completeRPCVINSequentially(partial: List[PartialTransactionVIN]): FutureApplicationResult[List[rpc.TransactionVIN.HasValues]] = {
val neutral: FutureApplicationResult[List[rpc.TransactionVIN.HasValues]] = Future.successful(Good(List.empty))
val result = partial.foldLeft(neutral) {
case (acc, (vin, Bad(_))) =>
val result = for {
ready <- acc.toFutureOr
newVIN <- getRPCTransactionVINWithValues(vin).toFutureOr
} yield newVIN :: ready

case (acc, (_, Good(newVIN))) =>
val result = for {
ready <- acc.toFutureOr
} yield newVIN :: ready
result.toFuture

result.toFuture
}
}
case (acc, (_, Good(newVIN))) =>
val result = for {
ready <- acc.toFutureOr
} yield newVIN :: ready

future
result.toFuture
}

result
.toFutureOr
.map(_.reverse)
.toFuture
Expand All @@ -126,37 +125,41 @@ class TransactionCollectorService @Inject() (
* - Try to get them all from the RPC API in parallel
* - Retry the ones that weren't retrieved in parallel by retrieving them sequentially.
*/
private[services] def getRPCTransactions(txidList: List[TransactionId]): FutureApplicationResult[RPCTransactionList] = {
val futureList = Future.sequence {
txidList.map { txid =>
for {
r <- xsnService.getTransaction(txid)
} yield txid -> r
}
private[services] def getRPCTransactions(txidList: List[TransactionId]): FutureApplicationResult[List[RPCTransaction]] = {
getPartialRPCTransactions(txidList)
.flatMap(completeRPCTransactionsSequentially)
}

private[services] def getPartialRPCTransactions(txidList: List[TransactionId]): Future[List[PartialRPCTransaction]] = {
val futures = for (txid <- txidList) yield {
for {
r <- xsnService.getTransaction(txid)
} yield txid -> r
}

val future = futureList
.flatMap { resultList =>
val neutral: FutureApplicationResult[RPCTransactionList] = Future.successful(Good(List.empty))
resultList.foldLeft(neutral) {
case (acc, (txid, Bad(_))) =>
val result = for {
ready <- acc.toFutureOr
tx <- xsnService.getTransaction(txid).toFutureOr
} yield tx :: ready
Future.sequence(futures)
}

result.toFuture
private[services] def completeRPCTransactionsSequentially(partial: List[PartialRPCTransaction]): FutureApplicationResult[List[RPCTransaction]] = {
val neutral: FutureApplicationResult[List[RPCTransaction]] = Future.successful(Good(List.empty))
val result = partial.foldLeft(neutral) {
case (acc, (txid, Bad(_))) =>
val result = for {
ready <- acc.toFutureOr
tx <- xsnService.getTransaction(txid).toFutureOr
} yield tx :: ready

case (acc, (_, Good(tx))) =>
val result = for {
ready <- acc.toFutureOr
} yield tx :: ready
result.toFuture

result.toFuture
}
}
case (acc, (_, Good(tx))) =>
val result = for {
ready <- acc.toFutureOr
} yield tx :: ready

future
result.toFuture
}

result
.toFutureOr
.map(_.reverse)
.toFuture
Expand Down Expand Up @@ -191,6 +194,10 @@ object TransactionCollectorService {

type Result = (List[persisted.Transaction.HasIO], List[TPoSContract])

private type RPCTransactionList = List[rpc.Transaction[rpc.TransactionVIN]]
private type RPCTransactionListWithValues = List[rpc.Transaction[rpc.TransactionVIN.HasValues]]
private type RPCTransaction = rpc.Transaction[rpc.TransactionVIN]
private type RPCCompleteTransaction = rpc.Transaction[rpc.TransactionVIN.HasValues]

private type PartialTransactionVIN = (rpc.TransactionVIN, ApplicationResult[rpc.TransactionVIN.HasValues])
private type PartialRPCTransaction = (TransactionId, ApplicationResult[RPCTransaction])

}
Loading

0 comments on commit 6961c76

Please sign in to comment.