-
Notifications
You must be signed in to change notification settings - Fork 0
/
node.go
116 lines (94 loc) · 2.32 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package dkvs
import (
"encoding/json"
"fmt"
"log"
"math/rand"
"sync"
"time"
)
// Node is a single participant in the key-value store cluster. A node counts
// as the master when its MasterID equals its own ID (see IsMaster) and as a
// slave otherwise.
//
// Only ID, MasterID and Address are exported and carry JSON tags; the
// remaining fields are internal to the node.
type Node struct {
	// ID identifies this node; newNode fills it with a random 16-character string.
	ID string `json:"id"`
	// MasterID is the ID of the node regarded as master.
	MasterID string `json:"master"`
	// Address is the address handed to newNode when the node was created.
	Address string `json:"addr"`

	// nodes is the table of known nodes (presumably keyed by node ID - confirm
	// against the code that populates it).
	nodes map[string]*Node
	// nMutex is held while the nodes table is iterated.
	nMutex sync.RWMutex

	// storage is the key-value store consulted by the read methods.
	storage Storage
	// transport is started by newNode and stopped by Close.
	transport Transport
}
// ReadValue looks up key in the node's storage and hands back exactly what
// the storage reports: the value bytes together with any lookup error.
func (n *Node) ReadValue(key string) ([]byte, error) {
	value, err := n.storage.Get(key)
	return value, err
}
// ReadMultipleValues looks up every key in keys and returns the results as a
// JSON array holding one object per key, in the order the keys were given.
//
// Each object has the form {"k": <key>, "v": <value as string>, "e": <error>}.
// "e" is null when the lookup succeeded and the error's message otherwise. A
// failed lookup does not abort the call; it is reported in that key's entry.
// Calling it without keys yields an empty JSON array.
//
// The returned error is the one produced by the JSON encoding, if any.
func (n *Node) ReadMultipleValues(keys ...string) ([]byte, error) {
	type payload struct {
		Key   string `json:"k"`
		Value string `json:"v"`
		// Error carries err.Error() instead of the error value itself:
		// encoding/json encodes ordinary error values (which expose no
		// exported fields) as "{}", silently dropping the message.
		Error interface{} `json:"e"`
	}

	results := make([]*payload, 0, len(keys))
	for _, k := range keys {
		v, err := n.storage.Get(k)
		entry := &payload{Key: k, Value: string(v)}
		if err != nil {
			entry.Error = err.Error()
		}
		results = append(results, entry)
	}
	return json.Marshal(results)
}
// ListNodes returns a newly allocated slice containing every node currently
// in this node's table. The order of the result is unspecified because it
// follows map iteration order. An empty table yields an empty, non-nil slice.
//
// The returned error is always nil at present.
func (n *Node) ListNodes() ([]*Node, error) {
	// TODO: if the node list is empty (for example, in a slave that just got
	// started), fetch the list from the master.

	// Listing only reads the table, so the shared read lock is enough and
	// lets concurrent readers proceed; every access to n.nodes happens while
	// the lock is held.
	n.nMutex.RLock()
	defer n.nMutex.RUnlock()

	nodes := make([]*Node, 0, len(n.nodes))
	for _, node := range n.nodes {
		nodes = append(nodes, node)
	}
	return nodes, nil
}
// IsMaster reports whether this node is the master, i.e. whether its own ID
// matches the master ID it has recorded.
func (n *Node) IsMaster() bool {
	own, master := n.ID, n.MasterID
	return own == master
}
// allowedCharacters is the alphabet node IDs are drawn from: the lowercase
// letters, the uppercase letters and the decimal digits.
const allowedCharacters = "abcdefghijklmnopqrstuvwxyz" +
	"ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
	"0123456789"

// init seeds the shared math/rand generator from the current time in
// nanoseconds.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
// newID returns a pseudo-random identifier of n bytes, each picked from
// allowedCharacters using the shared math/rand generator.
func newID(n int) string {
	alphabetLen := len(allowedCharacters)
	id := make([]byte, n)
	for pos := 0; pos < n; pos++ {
		id[pos] = allowedCharacters[rand.Intn(alphabetLen)]
	}
	return string(id)
}
// newNode builds a node for addr with a fresh 16-character random ID, an
// empty node table, a new store and a new HTTP transport.
//
// The transport is started in a background goroutine; if starting it returns
// an error, that goroutine panics. newNode does not wait for the transport
// and currently always returns a nil error.
func newNode(addr string) (*Node, error) {
	node := &Node{
		ID:        newID(16),
		nodes:     make(map[string]*Node),
		Address:   addr,
		storage:   NewStore(),
		transport: NewHTTPTransport(),
	}

	go func() {
		if err := node.transport.Start(node); err != nil {
			panic(fmt.Sprintf("failed to start transport with error: %v", err))
		}
	}()

	log.Printf("Created node with id %s\n", node.ID)
	return node, nil
}
// Close shuts the node down by stopping its transport and returns whatever
// error the transport reports.
func (n *Node) Close() error {
	// TODO: send a message to master indicating that the node shut down
	err := n.transport.Stop()
	return err
}