Skip to content

Commit

Permalink
query other servers before start & add test
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-kripakov-m10 committed Dec 20, 2023
1 parent 6558f56 commit f8e12a8
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 7 deletions.
4 changes: 0 additions & 4 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,6 @@ func (a *Agent) setupRaft() error {
ServerAddressProvider: serverAddressProvider,
}
transport := raft.NewNetworkTransportWithConfig(transConfig)
rpcIP := net.ParseIP(a.config.Tags["rpc_addr"])
port, err := strconv.Atoi(a.config.Tags["port"])
rpcAddr := &net.TCPAddr{IP: rpcIP, Port: port}
a.serverLookup.AddServer(&ServerParts{ID: a.config.NodeName, RPCAddr: rpcAddr})
a.raftTransport = transport

config := raft.DefaultConfig()
Expand Down
94 changes: 93 additions & 1 deletion dkron/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

var (
logLevel = "error"
logLevel = "info"
)

func TestAgentCommand_runForElection(t *testing.T) {
Expand Down Expand Up @@ -636,3 +636,95 @@ func Test_selectNodes(t *testing.T) {
})
}
}

func Test_clusterWillRecoverAfterIpChange(t *testing.T) {
a1, rfn1 := buildAndRunAgent("test8", []string{}, 3)
defer rfn1()
a2, rfn2 := buildAndRunAgent("test9", []string{a1.bindRPCAddr()[:len(a1.bindRPCAddr())-4] + "8946"}, 3)
defer rfn2()
a3, rfn3 := buildAndRunAgent("test10", []string{a1.bindRPCAddr()[:len(a1.bindRPCAddr())-4] + "8946", a2.bindRPCAddr()[:len(a2.bindRPCAddr())-4] + "8946"}, 3)
defer rfn3()
time.Sleep(2 * time.Second)
assert.True(t, a1.IsLeader() || a2.IsLeader() || a3.IsLeader())
servers := a1.raft.GetConfiguration().Configuration().Servers
assert.Equal(t, 3, len(servers))
servers = a2.raft.GetConfiguration().Configuration().Servers
assert.Equal(t, 3, len(servers))
servers = a3.raft.GetConfiguration().Configuration().Servers
assert.Equal(t, 3, len(servers))

_ = a1.Stop()

time.Sleep(30 * time.Second)

assert.True(t, !a1.IsLeader() && (a2.IsLeader() || a3.IsLeader()))

//servers = a2.raft.GetConfiguration().Configuration().Servers
//assert.Equal(t, 2, len(servers))
//servers = a3.raft.GetConfiguration().Configuration().Servers
//assert.Equal(t, 2, len(servers))

_ = a2.Stop()

time.Sleep(20 * time.Second)

assert.True(t, !a1.IsLeader() && !a2.IsLeader() && !a3.IsLeader())

//servers = a3.raft.GetConfiguration().Configuration().Servers
//assert.Equal(t, 1, len(servers))

a1, rfn1 = buildAndRunAgent("test8", []string{a3.bindRPCAddr()[:len(a3.bindRPCAddr())-4] + "8946"}, 3)
defer rfn1()
a2, rfn2 = buildAndRunAgent("test9", []string{a3.bindRPCAddr()[:len(a3.bindRPCAddr())-4] + "8946"}, 3)
defer rfn2()

time.Sleep(10 * time.Second)

assert.True(t, a1.IsLeader() || a2.IsLeader() || a3.IsLeader())
servers = a1.raft.GetConfiguration().Configuration().Servers
assert.Equal(t, 3, len(servers))
servers = a2.raft.GetConfiguration().Configuration().Servers
assert.Equal(t, 3, len(servers))
servers = a3.raft.GetConfiguration().Configuration().Servers
assert.Equal(t, 3, len(servers))
}

func buildAndRunAgent(
nodeName string,
startJoin []string,
bootstrapExpect int,
) (*Agent, func()) {

dir, err := os.MkdirTemp("", fmt.Sprintf("test-%s", nodeName))
if err != nil {
panic(err.Error())
}
defer os.RemoveAll(dir)
ip, returnFn := testutil.TakeIP()
defer returnFn()
addr := ip.String()

// Start another agent
c := DefaultConfig()
c.BindAddr = addr
c.StartJoin = startJoin
c.NodeName = nodeName
c.Server = true
c.LogLevel = logLevel
c.BootstrapExpect = bootstrapExpect
c.DevMode = true
c.DataDir = dir
c.RaftMultiplier = 1

a2 := NewAgent(c)
err = a2.Start()
if err != nil {
panic(err.Error())
}

return a2, func() {
_ = a2.Stop()
returnFn()
_ = os.RemoveAll(dir)
}
}
45 changes: 44 additions & 1 deletion dkron/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dkron

import (
"strings"
"time"

"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
Expand All @@ -11,6 +12,9 @@ const (
// StatusReap is used to update the status of a node if we
// are handling a EventMemberReap
StatusReap = serf.MemberStatus(-1)

// maxPeerRetries limits how many invalidate attempts are made
maxPeerRetries = 6
)

// nodeJoin is used to handle join events on the serf cluster
Expand Down Expand Up @@ -110,7 +114,46 @@ func (a *Agent) maybeBootstrap() {
return
}

// TODO: Query each of the servers and make sure they report no Raft peers.
// Query each of the servers and make sure they report no Raft peers.
for _, server := range servers {
var peers []string

// Retry with exponential backoff to get peer status from this server
for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
configuration, err := a.GRPCClient.RaftGetConfiguration(server.RPCAddr.String())
if err != nil {
nextRetry := (1 << attempt) * time.Second
a.logger.Error("Failed to confirm peer status for server (will retry).",
"server", server.Name,
"retry_interval", nextRetry.String(),
"error", err,
)
time.Sleep(nextRetry)
} else {
for _, peer := range configuration.Servers {
peers = append(peers, peer.Id)
}
break
}
}

// Found a node with some Raft peers, stop bootstrap since there's
// evidence of an existing cluster. We should get folded in by the
// existing servers if that's the case, so it's cleaner to sit as a
// candidate with no peers so we don't cause spurious elections.
// It's OK this is racy, because even with an initial bootstrap
// as long as one peer runs bootstrap things will work, and if we
// have multiple peers bootstrap in the same way, that's OK. We
// just don't want a server added much later to do a live bootstrap
// and interfere with the cluster. This isn't required for Raft's
// correctness because no server in the existing cluster will vote
// for this server, but it makes things much more stable.
if len(peers) > 0 {
a.logger.Info("Existing Raft peers reported by server, disabling bootstrap mode", "server", server.Name)
a.config.BootstrapExpect = 0
return
}
}

// Update the peer set
// Attempt a live bootstrap!
Expand Down
2 changes: 1 addition & 1 deletion dkron/server_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (sl *ServerLookup) AddServer(server *ServerParts) {
sl.lock.Lock()
defer sl.lock.Unlock()
sl.addressToServer[raft.ServerAddress(server.RPCAddr.String())] = server
sl.idToServer[raft.ServerID(server.ID)] = server
sl.idToServer[raft.ServerID(server.Name)] = server
}

func (sl *ServerLookup) RemoveServer(server *ServerParts) {
Expand Down

0 comments on commit f8e12a8

Please sign in to comment.