Skip to content

Commit

Permalink
(WIP - consul like approach) add server_lookup to make dkron independ…
Browse files Browse the repository at this point in the history
…ent from server node IP on raft layer
  • Loading branch information
ivan-kripakov-m10 committed Dec 15, 2023
1 parent f829f54 commit 9ef70de
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 6 deletions.
26 changes: 21 additions & 5 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ type Agent struct {

// peers is used to track the known Dkron servers. This is
// used for region forwarding and clustering.
peers map[string][]*ServerParts
localPeers map[raft.ServerAddress]*ServerParts
peerLock sync.RWMutex
peers map[string][]*ServerParts
localPeers map[raft.ServerAddress]*ServerParts
peerLock sync.RWMutex
serverLookup *ServerLookup

activeExecutions sync.Map

Expand Down Expand Up @@ -316,7 +317,19 @@ func (a *Agent) setupRaft() error {
logger = a.logger.Logger.Writer()
}

transport := raft.NewNetworkTransport(a.raftLayer, 3, raftTimeout, logger)
var serverAddressProvider raft.ServerAddressProvider = a.serverLookup

transConfig := &raft.NetworkTransportConfig{
Stream: a.raftLayer,
MaxPool: 3,
Timeout: 10 * time.Second,
ServerAddressProvider: serverAddressProvider,
}
transport := raft.NewNetworkTransportWithConfig(transConfig)
rpcIP := net.ParseIP(a.config.Tags["rpc_addr"])
port, err := strconv.Atoi(a.config.Tags["port"])
rpcAddr := &net.TCPAddr{IP: rpcIP, Port: port}
a.serverLookup.AddServer(&ServerParts{ID: a.config.NodeName, RPCAddr: rpcAddr})
a.raftTransport = transport

config := raft.DefaultConfig()
Expand Down Expand Up @@ -540,6 +553,7 @@ func (a *Agent) SetConfig(c *Config) {

// StartServer launch a new dkron server process
func (a *Agent) StartServer() {
a.serverLookup = NewServerLookup()
if a.Store == nil {
s, err := NewStore(a.logger)
if err != nil {
Expand Down Expand Up @@ -710,7 +724,9 @@ func (a *Agent) eventLoop() {
a.localMemberEvent(me)
case serf.EventMemberReap:
a.localMemberEvent(me)
case serf.EventMemberUpdate, serf.EventUser, serf.EventQuery: // Ignore
case serf.EventMemberUpdate:
a.logger.WithField("event", e.String()).Info("agent: event member update")
case serf.EventUser, serf.EventQuery: // Ignore
default:
a.logger.WithField("event", e.String()).Warn("agent: Unhandled serf event")
}
Expand Down
3 changes: 2 additions & 1 deletion dkron/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (a *Agent) nodeJoin(me serf.MemberEvent) {
continue
}
a.logger.WithField("server", parts.Name).Info("adding server")

a.serverLookup.AddServer(parts)
// Check if this server is known
found := false
a.peerLock.Lock()
Expand Down Expand Up @@ -174,6 +174,7 @@ func (a *Agent) nodeFailed(me serf.MemberEvent) {
delete(a.localPeers, raft.ServerAddress(parts.Addr.String()))
}
a.peerLock.Unlock()
a.serverLookup.RemoveServer(parts)
}
}

Expand Down
75 changes: 75 additions & 0 deletions dkron/server_lookup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package dkron

import (
"fmt"
"sync"

"github.com/hashicorp/raft"
)

// ServerLookup encapsulates looking up servers by id and address
type ServerLookup struct {
lock sync.RWMutex
addressToServer map[raft.ServerAddress]*ServerParts
idToServer map[raft.ServerID]*ServerParts
}

func NewServerLookup() *ServerLookup {
return &ServerLookup{
addressToServer: make(map[raft.ServerAddress]*ServerParts),
idToServer: make(map[raft.ServerID]*ServerParts),
}
}

func (sl *ServerLookup) AddServer(server *ServerParts) {
sl.lock.Lock()
defer sl.lock.Unlock()
sl.addressToServer[raft.ServerAddress(server.RPCAddr.String())] = server
sl.idToServer[raft.ServerID(server.ID)] = server
}

func (sl *ServerLookup) RemoveServer(server *ServerParts) {
sl.lock.Lock()
defer sl.lock.Unlock()
delete(sl.addressToServer, raft.ServerAddress(server.RPCAddr.String()))
delete(sl.idToServer, raft.ServerID(server.ID))
}

// Implements the ServerAddressProvider interface
func (sl *ServerLookup) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) {
sl.lock.RLock()
defer sl.lock.RUnlock()
svr, ok := sl.idToServer[id]
if !ok {
return "", fmt.Errorf("Could not find address for server id %v", id)
}
return raft.ServerAddress(svr.RPCAddr.String()), nil
}

// Server looks up the server by address, returns a boolean if not found
func (sl *ServerLookup) Server(addr raft.ServerAddress) *ServerParts {
sl.lock.RLock()
defer sl.lock.RUnlock()
return sl.addressToServer[addr]
}

func (sl *ServerLookup) Servers() []*ServerParts {
sl.lock.RLock()
defer sl.lock.RUnlock()
var ret []*ServerParts
for _, svr := range sl.addressToServer {
ret = append(ret, svr)
}
return ret
}

func (sl *ServerLookup) CheckServers(fn func(srv *ServerParts) bool) {
sl.lock.RLock()
defer sl.lock.RUnlock()

for _, srv := range sl.addressToServer {
if !fn(srv) {
return
}
}
}

0 comments on commit 9ef70de

Please sign in to comment.