diff --git a/benchmark_main.go b/benchmark_main.go index 841df3aedc..03cb6e8366 100644 --- a/benchmark_main.go +++ b/benchmark_main.go @@ -47,6 +47,18 @@ func getPeers(myIp, myPort, myShardId string, config *[][]string) []p2p.Peer { return peerList } +func getClientPeer(config *[][]string) *p2p.Peer { + for _, node := range *config { + ip, port, status := node[0], node[1], node[2] + if status == "client" { + continue + } + peer := p2p.Peer{Port: port, Ip: ip} + return &peer + } + return nil +} + func readConfigFile(configFile string) [][]string { file, _ := os.Open(configFile) fscanner := bufio.NewScanner(file) @@ -84,6 +96,12 @@ func main() { node := node.NewNode(&consensus) + clientPeer := getClientPeer(&config) + // If there is a client configured in the node list. + if clientPeer != nil { + node.ClientPeer = clientPeer + } + // Assign closure functions to the consensus object consensus.BlockVerifier = node.VerifyNewBlock consensus.OnConsensusDone = node.PostConsensusProcessing diff --git a/node/message.go b/node/message.go index 9ff8789c32..d3bf7d9154 100644 --- a/node/message.go +++ b/node/message.go @@ -13,6 +13,7 @@ type TransactionMessageType int const ( SEND TransactionMessageType = iota REQUEST + CROSS_TX_PROOF // The proof of accept or reject returned by the leader to the cross shard transaction client. ) // The types of messages used for NODE/CONTROL @@ -22,7 +23,7 @@ const ( STOP ControlMessageType = iota ) -//ConstructTransactionListMessage constructs serialized transactions +// Constructs serialized transactions func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)}) byteBuffer.WriteByte(byte(common.TRANSACTION)) @@ -37,7 +38,7 @@ func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []b return byteBuffer.Bytes() } -//ConstructTransactionListMessage constructs serialized transactions +// Constructs serialized transactions func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)}) byteBuffer.WriteByte(byte(common.TRANSACTION)) @@ -48,10 +49,18 @@ func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte { return byteBuffer.Bytes() } -//ConstructStopMessage is STOP message +// Constructs STOP message for node to stop func ConstructStopMessage() []byte { byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)}) byteBuffer.WriteByte(byte(common.CONTROL)) byteBuffer.WriteByte(byte(STOP)) return byteBuffer.Bytes() } + +//ConstructStopMessage is STOP message +func ConstructProofOfAcceptOrRejectMessage() []byte { + byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)}) + byteBuffer.WriteByte(byte(common.TRANSACTION)) + byteBuffer.WriteByte(byte(CROSS_TX_PROOF)) + return byteBuffer.Bytes() +} diff --git a/node/node.go b/node/node.go index f4c37b59f8..bbde77dc87 100644 --- a/node/node.go +++ b/node/node.go @@ -4,6 +4,7 @@ import ( "harmony-benchmark/blockchain" "harmony-benchmark/consensus" "harmony-benchmark/log" + "harmony-benchmark/p2p" "net" "os" "strconv" @@ -25,6 +26,8 @@ type Node struct { pendingTxMutex sync.Mutex crossTxToReturnMutex sync.Mutex + + ClientPeer *p2p.Peer // The peer for the benchmark tx generator client, used to return proof-of-accept } // Add new crossTx and proofs to the list of crossTx that needs to be sent back to client diff --git a/node/node_handler.go b/node/node_handler.go index 32aca5f94c..0a9d58bd77 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -108,8 +108,9 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) { txToReturn = append(txToReturn, tx) } } - // TODO: return the transaction list to requester + case CROSS_TX_PROOF: + // TODO: implement this } } @@ -160,6 +161,13 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) { } } +// This is called by consensus participants to verify the block they are running consensus on +func (node *Node) SendBackProofOfAcceptOrReject() { + if node.ClientPeer != nil { + p2p.SendMessage(*node.ClientPeer, ConstructProofOfAcceptOrRejectMessage()) + } +} + // This is called by consensus participants to verify the block they are running consensus on func (node *Node) VerifyNewBlock(newBlock *blockchain.Block) bool { return node.UtxoPool.VerifyTransactions(newBlock.Transactions) @@ -184,5 +192,7 @@ func (node *Node) PostConsensusProcessing(newBlock *blockchain.Block) { } node.addCrossTxsToReturn(node.CrossTxsInConsensus) node.CrossTxsInConsensus = []*blockchain.CrossShardTxAndProof{} + + node.SendBackProofOfAcceptOrReject() } }