Skip to content

Commit

Permalink
Merge pull request #3 from simple-rules/stopConsensus
Browse files Browse the repository at this point in the history
adding message to stop running consensus
  • Loading branch information
alajko authored Jun 16, 2018
2 parents d292737 + fbe3d42 commit b68be08
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 40 deletions.
44 changes: 36 additions & 8 deletions aws-code/transaction_generator.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package main

import (
"harmony-benchmark/blockchain"
"math/rand"
"time"
"bufio"
"flag"
"fmt"
"harmony-benchmark/blockchain"
"harmony-benchmark/node"
"harmony-benchmark/p2p"
"math/rand"
"os"
"strings"
"time"
)

func newRandTransaction() blockchain.Transaction {
Expand All @@ -18,16 +22,35 @@ func newRandTransaction() blockchain.Transaction {
return tx
}

func getPeers(Ip, Port, iplist string) []p2p.Peer {
file, _ := os.Open(iplist)
fscanner := bufio.NewScanner(file)
var peerList []p2p.Peer
for fscanner.Scan() {
p := strings.Split(fscanner.Text(), " ")
ip, port, status := p[0], p[1], p[2]
if status == "leader" || ip == Ip && port == Port {
continue
}
peer := p2p.Peer{Port: port, Ip: ip}
peerList = append(peerList, peer)
}
return peerList
}
func main() {

ip := flag.String("ip", "127.0.0.1", "IP of the leader")
port := flag.String("port", "9000", "port of the leader.")
txToSend := flag.Int("tx_count", 100, "number of transaction")

ipfile := flag.String("ipfile", "local_iplist.txt", "file containing all ip addresses")
//getLeader to get ip,port and get totaltime I want to run
start := time.Now()
totalTime := 60.0
txs := make([]blockchain.Transaction, 10)
txCount := 0
for true {
if txCount >= *txToSend {
t := time.Now()
if t.Sub(start).Seconds() >= totalTime {
fmt.Println(int(t.Sub(start)), start, totalTime)
break
}
for i := range txs {
Expand All @@ -36,7 +59,12 @@ func main() {
}
msg := node.ConstructTransactionListMessage(txs)
p2p.SendMessage(p2p.Peer{*ip, *port, "n/a"}, msg)
txCount += len(txs)
time.Sleep(1 * time.Second) // 10 transactions per second
time.Sleep(1 * time.Second) // 10 transactions per second
}
msg := node.ConstructStopMessage()
var leaderPeer p2p.Peer
leaderPeer.Ip = *ip
leaderPeer.Port = *port
peers := append(getPeers(*ip, *port, *ipfile), leaderPeer)
p2p.BroadcastMessage(peers, msg)
}
25 changes: 12 additions & 13 deletions consensus/consensus_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@ import (
"log"
"sync"

"harmony-benchmark/p2p"
"bytes"
"encoding/binary"
"errors"
"fmt"
"harmony-benchmark/blockchain"
"harmony-benchmark/p2p"
)

var mutex = &sync.Mutex{}


func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) {
for { // keep waiting for new blocks
newBlock := <- blockChannel
newBlock := <-blockChannel
// TODO: think about potential race condition
if consensus.state == READY {
consensus.startConsensus(&newBlock)
Expand Down Expand Up @@ -134,23 +133,23 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
//#### Read payload data
offset := 0
// 4 byte consensus id
consensusId := binary.BigEndian.Uint32(payload[offset:offset+4])
consensusId := binary.BigEndian.Uint32(payload[offset : offset+4])
offset += 4

// 32 byte block hash
blockHash := payload[offset:offset+32]
blockHash := payload[offset : offset+32]
offset += 32

// 2 byte validator id
validatorId := string(payload[offset:offset+2])
validatorId := string(payload[offset : offset+2])
offset += 2

// 33 byte commit
commit := payload[offset:offset+33]
commit := payload[offset : offset+33]
offset += 33

// 64 byte of signature on previous data
signature := payload[offset:offset+64]
signature := payload[offset : offset+64]
offset += 64
//#### END: Read payload data

Expand Down Expand Up @@ -240,23 +239,23 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
//#### Read payload data
offset := 0
// 4 byte consensus id
consensusId := binary.BigEndian.Uint32(payload[offset:offset+4])
consensusId := binary.BigEndian.Uint32(payload[offset : offset+4])
offset += 4

// 32 byte block hash
blockHash := payload[offset:offset+32]
blockHash := payload[offset : offset+32]
offset += 32

// 2 byte validator id
validatorId := string(payload[offset:offset+2])
validatorId := string(payload[offset : offset+2])
offset += 2

// 32 byte response
response := payload[offset:offset+32]
response := payload[offset : offset+32]
offset += 32

// 64 byte of signature on previous data
signature := payload[offset:offset+64]
signature := payload[offset : offset+64]
offset += 64
//#### END: Read payload data

Expand Down
3 changes: 2 additions & 1 deletion consensus/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ func (msgType MessageType) String() string {
"COMMIT",
"CHALLENGE",
"RESPONSE",
"START_CONSENSUS"}
"START_CONSENSUS",
}

if msgType < ANNOUNCE || msgType > START_CONSENSUS {
return "Unknown"
Expand Down
2 changes: 1 addition & 1 deletion deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ while read ip port mode; do
go run ./benchmark_main.go -ip $ip -port $port -ipfile $ipfile&
done < $ipfile

go run ./aws-code/transaction_generator.go
go run ./aws-code/transaction_generator.go -ipfile $ipfile
9 changes: 5 additions & 4 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ n - 2 bytes - actual message payload
const NODE_TYPE_BYTES = 1
const ACTION_TYPE_BYTES = 1


// The category of messages
type MessageCategory byte

const (
COMMITTEE MessageCategory = iota
NODE
Expand All @@ -37,20 +37,21 @@ const (

// The specific types of message under committee category
type CommitteeMessageType byte

const (
CONSENSUS CommitteeMessageType = iota
// TODO: add more types
)


// The specific types of message under node category
type NodeMessageType byte

const (
TRANSACTION NodeMessageType = iota
CONTROL
// TODO: add more types
)


// Get the message category from the p2p message content
func GetMessageCategory(message []byte) (MessageCategory, error) {
if len(message) < NODE_TYPE_BYTES {
Expand All @@ -61,7 +62,7 @@ func GetMessageCategory(message []byte) (MessageCategory, error) {

// Get the action type from the p2p message content
func GetMessageType(message []byte) (byte, error) {
if len(message) < NODE_TYPE_BYTES + ACTION_TYPE_BYTES {
if len(message) < NODE_TYPE_BYTES+ACTION_TYPE_BYTES {
return 0, errors.New("Failed to get action type: no data available.")
}
return byte(message[NODE_TYPE_BYTES+ACTION_TYPE_BYTES-1]), nil
Expand Down
21 changes: 18 additions & 3 deletions node/message.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package node

import (
"harmony-benchmark/blockchain"
"bytes"
"harmony-benchmark/message"
"encoding/gob"
"harmony-benchmark/blockchain"
"harmony-benchmark/message"
)

type TransactionMessageType int
Expand All @@ -13,11 +13,26 @@ const (
SEND TransactionMessageType = iota
)

type ControlMessageType int

const (
STOP ControlMessageType = iota
)

//ConstructTransactionListMessage constructs serialized transactions
func ConstructTransactionListMessage(transactions []blockchain.Transaction) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(message.NODE)})
byteBuffer.WriteByte(byte(message.TRANSACTION))
byteBuffer.WriteByte(byte(SEND))
encoder := gob.NewEncoder(byteBuffer)
encoder.Encode(transactions)
return byteBuffer.Bytes()
}
}

//ConstructStopMessage is STOP message
func ConstructStopMessage() []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(message.NODE)})
byteBuffer.WriteByte(byte(message.CONTROL))
byteBuffer.WriteByte(byte(STOP))
return byteBuffer.Bytes()
}
27 changes: 17 additions & 10 deletions node/node.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
package node

import (
"bytes"
"encoding/gob"
"harmony-benchmark/blockchain"
"harmony-benchmark/consensus"
"harmony-benchmark/message"
"harmony-benchmark/p2p"
"log"
"net"
"os"
"harmony-benchmark/p2p"
"harmony-benchmark/consensus"
"harmony-benchmark/message"
"harmony-benchmark/blockchain"
"bytes"
"encoding/gob"
"time"
)

// A node represents a program (machine) participating in the network
type Node struct {
consensus *consensus.Consensus
BlockChannel chan blockchain.Block
consensus *consensus.Consensus
consensus *consensus.Consensus
BlockChannel chan blockchain.Block
pendingTransactions []blockchain.Transaction
}

Expand Down Expand Up @@ -114,14 +115,20 @@ func (node *Node) NodeHandler(conn net.Conn) {
}
node.pendingTransactions = append(node.pendingTransactions, *txList...)
log.Println(len(node.pendingTransactions))
case message.CONTROL:
controlType := msgPayload[0]
if ControlMessageType(controlType) == STOP {
log.Println("Stopping Node")
os.Exit(0)
}

}
}
}

func (node *Node) WaitForConsensusReady(readySignal chan int) {
for { // keep waiting for consensus ready
<- readySignal
<-readySignal
// create a new block
newBlock := new(blockchain.Block)
for {
Expand Down Expand Up @@ -149,4 +156,4 @@ func NewNode(consensus *consensus.Consensus) Node {
node.consensus = consensus
node.BlockChannel = make(chan blockchain.Block)
return node
}
}

0 comments on commit b68be08

Please sign in to comment.