-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
146 lines (131 loc) · 4.22 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package http2raft
import (
"errors"
"flag"
"fmt"
"net/http"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/lni/dragonboat/v3"
"github.com/lni/dragonboat/v3/config"
"github.com/lni/dragonboat/v3/logger"
"github.com/lni/dragonboat/v3/statemachine"
)
// Fallback raft identifiers used as defaults for the raft_node_id and
// raft_cluster_id flags.
const defaultNodeID, defaultClusterID = 1, 1

// errBadPeersSyntax is returned when the value of the 'peers' flag cannot be
// parsed.
var errBadPeersSyntax = errors.New("bad syntax for 'peers' param")

// Command-line flags that configure the raft node and the request timeouts.
var (
	// identity of this node inside the raft cluster
	clusterID = flag.Int("raft_cluster_id", defaultClusterID, "raft cluster ID to use")
	nodeID    = flag.Int("raft_node_id", defaultNodeID, "raft node ID to use")

	// networking and cluster membership
	raftAddr = flag.String("raft_addr", "", "raft node address")
	peersStr = flag.String(
		"peers",
		"",
		"raft peers list, comma separated list of values in format nodeID:host:port",
	)
	join = flag.Bool("join", false, "joining a new node")

	// on-disk storage
	dataDir = flag.String("data_dir", "./", "raft node data directory")

	// per-operation time limits
	readTimeout  = flag.Duration("read_timeout", 3*time.Second, "time out for read operations")
	writeTimeout = flag.Duration("write_timeout", 3*time.Second, "time out for write operations")
)
// Start starts the raft node, sets up HTTP routing and runs the HTTP listener
// on addr. The call blocks while the listener is serving.
//
// smFactoryFunc is passed to the raft node, which uses it to create the state
// machine for the cluster.
//
// Start panics if the raft node cannot be started. It also panics if the HTTP
// listener stops with an error; in that case the raft node is stopped (via
// defer) before the panic propagates.
func Start(addr string, smFactoryFunc func(clusterID, nodeID uint64) statemachine.IStateMachine) {
	// start raft node
	raftNode, err := startRaftNode(smFactoryFunc)
	if err != nil {
		// there is no reason to continue if no raft
		panic(err)
	}
	defer raftNode.Stop()

	// prepare controller
	cluster := uint64(*clusterID)
	keys := keysController{
		readTimeOut:   *readTimeout,
		writeTimeOut:  *writeTimeout,
		clusterID:     cluster,
		raftNode:      raftNode,
		clientSession: raftNode.GetNoOPSession(cluster),
	}

	// setup routing
	// catch all traffic as for now, the URL path will act as key name (query string is ignored)
	keyHandler := func(w http.ResponseWriter, req *http.Request) {
		switch req.Method {
		case http.MethodGet:
			// GET requests translate to raft SyncRead
			keys.readKey(w, req)
		case http.MethodPost, http.MethodPut, http.MethodDelete, http.MethodPatch:
			// POST, PUT, DELETE, PATCH requests translate to raft SyncPropose
			keys.writeKey(w, req)
		default:
			http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
		}
	}
	http.HandleFunc("/", keyHandler)

	// start http listener; ListenAndServe always returns a non-nil error, so
	// reaching this branch means the listener could not start or has died.
	// Surface it instead of returning silently.
	if err := http.ListenAndServe(addr, nil); err != nil {
		panic(err)
	}
}
// startRaftNode builds the raft configuration from the command-line flags,
// creates a dragonboat NodeHost and starts (or joins, when the 'join' flag is
// set) the raft cluster on it.
//
// The 'peers' flag, when set, is parsed as a comma separated list of
// nodeID:host:port entries. If 'raft_addr' is empty, the address registered
// for this node's ID in that list is used and stored back into the flag value.
//
// It returns errBadPeersSyntax (after printing flag usage) when the peers list
// is malformed, an error when the node/cluster IDs are negative or no raft
// address could be determined, and any error reported by dragonboat while
// creating the NodeHost or starting the cluster. On a StartCluster failure the
// NodeHost is stopped before the error is returned.
func startRaftNode(smFactoryFunc func(clusterID, nodeID uint64) statemachine.IStateMachine) (*dragonboat.NodeHost, error) {
	// adjust logging level
	logger.GetLogger("raft").SetLevel(logger.WARNING)
	logger.GetLogger("rsm").SetLevel(logger.WARNING)
	logger.GetLogger("transport").SetLevel(logger.WARNING)

	// the IDs are converted to uint64 below; a negative value would silently
	// wrap around to a huge ID, so reject it up front
	if *nodeID < 0 || *clusterID < 0 {
		return nil, errors.New("raft_node_id and raft_cluster_id must not be negative")
	}

	// populate peers list (if any specified in param)
	peers := make(map[uint64]string)
	if *peersStr != "" {
		for _, pStr := range strings.Split(*peersStr, ",") {
			// exactly nodeID:host:port; extra parts would otherwise be
			// dropped silently and yield a wrong address
			peerParts := strings.Split(pStr, ":")
			if len(peerParts) != 3 || peerParts[0] == "" || peerParts[1] == "" || peerParts[2] == "" {
				flag.Usage()
				return nil, errBadPeersSyntax
			}
			// ParseUint rejects negative IDs, which Atoi followed by a
			// uint64 conversion would have wrapped around
			peerNodeID, err := strconv.ParseUint(peerParts[0], 10, 64)
			if err != nil {
				flag.Usage()
				return nil, errBadPeersSyntax
			}
			peers[peerNodeID] = peerParts[1] + ":" + peerParts[2]
		}
		// set current raft node addr from peers if it wasn't passed via raft_addr param
		// (stays empty when this node's ID is not in the list)
		if *raftAddr == "" {
			*raftAddr = peers[uint64(*nodeID)]
		}
	}
	if *raftAddr == "" {
		return nil, errors.New("raft_addr parameter is required")
	}

	// prepare configs
	// TODO: move hardcoded numbers to CLI params
	raftConfig := config.Config{
		NodeID:             uint64(*nodeID),
		ClusterID:          uint64(*clusterID),
		ElectionRTT:        10,
		HeartbeatRTT:       1,
		CheckQuorum:        true,
		SnapshotEntries:    10,
		CompactionOverhead: 5,
	}
	// <data_dir>/http2raft/cluster-<id>/node-<id>
	dataDirPath := filepath.Join(
		*dataDir,
		"http2raft",
		fmt.Sprintf("cluster-%d", *clusterID),
		fmt.Sprintf("node-%d", *nodeID),
	)
	nodeConfig := config.NodeHostConfig{
		WALDir:         dataDirPath,
		NodeHostDir:    dataDirPath,
		RTTMillisecond: 200,
		RaftAddress:    *raftAddr,
		EnableMetrics:  true,
	}

	// create node and start/join the cluster
	raftNode, err := dragonboat.NewNodeHost(nodeConfig)
	if err != nil {
		return nil, err
	}
	if err := raftNode.StartCluster(peers, *join, smFactoryFunc, raftConfig); err != nil {
		// do not leak the already created NodeHost when the cluster cannot start
		raftNode.Stop()
		return nil, err
	}
	return raftNode, nil
}