Merge pull request #749 from distribworks/refactor_run
Refactor Run
Victor Castell authored May 7, 2020
2 parents f1e230b + b4e0654 commit 2576a77
Showing 19 changed files with 738 additions and 680 deletions.
199 changes: 108 additions & 91 deletions dkron/agent.go
@@ -2,7 +2,6 @@ package dkron

import (
"crypto/tls"
"encoding/json"
"errors"
"expvar"
"fmt"
@@ -23,12 +22,15 @@ import (
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/serf"
"github.com/juliangruber/go-intersect"
"github.com/sirupsen/logrus"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

const (
raftTimeout = 10 * time.Second
raftTimeout = 30 * time.Second
// raftLogCacheSize is the maximum number of logs to cache in-memory.
// This is used to reduce disk I/O for the recently committed entries.
raftLogCacheSize = 512
@@ -41,6 +43,9 @@ var (
// ErrLeaderNotFound is returned when obtained leader is not found in member list
ErrLeaderNotFound = errors.New("no member leader found in member list")

// ErrNoSuitableServer is returned when no suitable server can be found to send the request to.
ErrNoSuitableServer = errors.New("no suitable server found to send the request, aborting")

runningExecutions sync.Map
)

@@ -101,6 +106,8 @@ type Agent struct {
peers map[string][]*ServerParts
localPeers map[raft.ServerAddress]*ServerParts
peerLock sync.RWMutex

activeExecutions sync.Map
}

// ProcessorFactory is a function type that creates a new instance
@@ -159,19 +166,40 @@ func (a *Agent) Start() error {
// Expose the node name
expNode.Set(a.config.NodeName)

//Use the value of "RPCPort" if AdvertiseRPCPort has not been set
if a.config.AdvertiseRPCPort <= 0 {
a.config.AdvertiseRPCPort = a.config.RPCPort
}

if a.config.Server {
a.StartServer()
} else {
// Create a listener at the desired port.
addr := a.getRPCAddr()
l, err := net.Listen("tcp", addr)
if err != nil {
log.Fatal(err)
}

opts := []grpc.ServerOption{}
if a.TLSConfig != nil {
tc := credentials.NewTLS(a.TLSConfig)
opts = append(opts, grpc.Creds(tc))
}

grpcServer := grpc.NewServer(opts...)
as := NewAgentServer(a)
proto.RegisterAgentServer(grpcServer, as)
go grpcServer.Serve(l)
}

if a.GRPCClient == nil {
a.GRPCClient = NewGRPCClient(nil, a)
}

tags := a.serf.LocalMember().Tags
if a.config.Server {
tags["rpc_addr"] = a.getRPCAddr() // Address that clients will use to RPC to servers
tags["port"] = strconv.Itoa(a.config.AdvertiseRPCPort)
}
tags["rpc_addr"] = a.getRPCAddr() // Address that clients will use to RPC to servers
tags["port"] = strconv.Itoa(a.config.AdvertiseRPCPort)
a.serf.SetTags(tags)

go a.eventLoop()
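
For context rather than as part of the change itself: after this refactor every agent, client or server, serves gRPC and advertises its address through the rpc_addr serf tag, so peers can call it directly instead of going through serf queries. Below is a minimal sketch of dialing such an advertised address; the dialAgent helper and the example address are assumptions for illustration, and the TLS/plain-text switch mirrors the credentials handling in the hunk above.

    // Illustrative sketch only: dial the gRPC endpoint an agent advertises in
    // its "rpc_addr" serf tag. dialAgent and the example address are assumptions.
    package main

    import (
        "crypto/tls"
        "log"

        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials"
    )

    // dialAgent connects to an agent's advertised RPC address, using TLS when a
    // tls.Config is supplied and falling back to a plain-text connection otherwise.
    func dialAgent(rpcAddr string, tlsCfg *tls.Config) (*grpc.ClientConn, error) {
        opts := []grpc.DialOption{grpc.WithInsecure()}
        if tlsCfg != nil {
            opts = []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))}
        }
        return grpc.Dial(rpcAddr, opts...)
    }

    func main() {
        // In practice the address would come from a serf member's "rpc_addr" tag.
        conn, err := dialAgent("10.0.0.5:6868", nil)
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()
    }
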
@@ -447,11 +475,6 @@ func (a *Agent) SetConfig(c *Config) {

// StartServer launch a new dkron server process
func (a *Agent) StartServer() {
//Use the value of "RPCPort" if AdvertiseRPCPort has not been set
if a.config.AdvertiseRPCPort <= 0 {
a.config.AdvertiseRPCPort = a.config.RPCPort
}

if a.Store == nil {
s, err := NewStore()
if err != nil {
@@ -634,62 +657,6 @@ func (a *Agent) eventLoop() {
}
}

if e.EventType() == serf.EventQuery {
query := e.(*serf.Query)

if query.Name == QueryRunJob {
log.WithFields(logrus.Fields{
"query": query.Name,
"payload": string(query.Payload),
"at": query.LTime,
}).Debug("agent: Running job")

var rqp RunQueryParam
if err := json.Unmarshal(query.Payload, &rqp); err != nil {
log.WithField("query", QueryRunJob).Fatal("agent: Error unmarshaling query payload")
}

log.WithFields(logrus.Fields{
"job": rqp.Execution.JobName,
}).Info("agent: Starting job")

// There are two error types to handle here:
// Key not found when the job is removed from store
// Dial tcp error
// In case of deleted job or other error, we should report and break the flow.
// On dial error we should retry with a limit.
i := 0
RetryGetJob:
job, err := a.GRPCClient.GetJob(rqp.RPCAddr, rqp.Execution.JobName)
if err != nil {
if err == ErrRPCDialing {
if i < 10 {
i++
goto RetryGetJob
}
log.WithError(err).Fatal("agent: A working RPC connection to a Dkron server must exists.")
}
log.WithError(err).Error("agent: Error on rpc.GetJob call")
continue
}
log.WithField("job", job.Name).Debug("agent: GetJob by RPC")

ex := rqp.Execution
ex.StartedAt = time.Now()
ex.NodeName = a.config.NodeName

go func() {
if err := a.invokeJob(job, ex); err != nil {
log.WithError(err).Error("agent: Error invoking job")
}
}()

// Respond with the execution JSON though it is not used in the destination
exJSON, _ := json.Marshal(ex)
query.Respond(exJSON)
}
}

case <-serfShutdownCh:
log.Warn("agent: Serf shutdown detected, quitting")
return
@@ -710,9 +677,9 @@ func (a *Agent) join(addrs []string, replay bool) (n int, err error) {
return
}

func (a *Agent) processFilteredNodes(job *Job) ([]string, map[string]string, error) {
var nodes []string
var candidates []string
func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]string, error) {
// candidates will contain a set of candidates by tags
// the final set of nodes will be the intersection of all groups
tags := make(map[string]string)

// Actually copy the map
@@ -724,42 +691,73 @@ func (a *Agent) processFilteredNodes(job *Job) ([]string, map[string]string, err
// on the same region.
tags["region"] = a.config.Region

candidates := [][]string{}
for jtk, jtv := range tags {
var tc []string
if tc = strings.Split(jtv, ":"); len(tc) == 2 {
tv := tc[0]
cans := []string{}
tc := strings.Split(jtv, ":")

// Set original tag to clean tag
tags[jtk] = tv
tv := tc[0]

count, err := strconv.Atoi(tc[1])
if err != nil {
return nil, nil, err
}
// Set original tag to clean tag
tags[jtk] = tv

for _, member := range a.serf.Members() {
if member.Status == serf.StatusAlive {
for mtk, mtv := range member.Tags {
if mtk == jtk && mtv == tv {
candidates = append(candidates, member.Name)
}
for _, member := range a.serf.Members() {
if member.Status == serf.StatusAlive {
for mtk, mtv := range member.Tags {
if mtk == jtk && mtv == tv {
cans = append(cans, member.Name)
}
}
}
}

// In case there is cardinality in the tag, randomize the order and select the amount of nodes
// or else just add all nodes to the result.
if len(tc) == 2 {
f := []string{}
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(candidates), func(i, j int) {
candidates[i], candidates[j] = candidates[j], candidates[i]
rand.Shuffle(len(cans), func(i, j int) {
cans[i], cans[j] = cans[j], cans[i]
})

count, err := strconv.Atoi(tc[1])
if err != nil {
return nil, nil, err
}
for i := 1; i <= count; i++ {
if len(candidates) == 0 {
if len(cans) == 0 {
break
}
nodes = append(nodes, candidates[0])
candidates = candidates[1:]
f = append(f, cans[0])
cans = cans[1:]
}
candidates = nil
cans = f
}
if len(cans) > 0 {
candidates = append(candidates, cans)
}
}

// The final result will be the intersection of all candidates.
nodes := make(map[string]string)
r := candidates[0]
for i := 1; i <= len(candidates)-1; i++ {
isec := intersect.Simple(r, candidates[i]).([]interface{})
// Empty the slice
r = []string{}

// Refill with the intersection
for _, v := range isec {
r = append(r, v.(string))
}
}

for _, n := range r {
for _, m := range a.serf.Members() {
if n == m.Name {
// If the server is missing the rpc_addr tag, default to the serf advertise addr
nodes[n] = m.Tags["rpc_addr"]
}
}
}
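
To make the new selection scheme concrete, here is a standalone sketch (example code, not taken from dkron) of the same idea: a job tag written as "value:N" draws up to N random nodes that carry the tag, and the final target set is the intersection of the per-tag candidate groups.

    package main

    import (
        "fmt"
        "math/rand"
        "strconv"
        "strings"
        "time"
    )

    // pickByTag returns the names of nodes whose tag matches value; a value of
    // the form "want:N" shuffles the matches and keeps at most N of them.
    func pickByTag(nodes map[string]map[string]string, key, value string) []string {
        parts := strings.Split(value, ":")
        want := parts[0]

        matches := []string{}
        for name, tags := range nodes {
            if tags[key] == want {
                matches = append(matches, name)
            }
        }

        if len(parts) == 2 {
            if count, err := strconv.Atoi(parts[1]); err == nil {
                rand.Seed(time.Now().UnixNano())
                rand.Shuffle(len(matches), func(i, j int) {
                    matches[i], matches[j] = matches[j], matches[i]
                })
                if count < len(matches) {
                    matches = matches[:count]
                }
            }
        }
        return matches
    }

    func main() {
        nodes := map[string]map[string]string{
            "node1": {"region": "eu", "role": "web"},
            "node2": {"region": "eu", "role": "web"},
            "node3": {"region": "us", "role": "web"},
        }

        // "role=web:1" draws one random web node; intersecting it with the
        // "region=eu" group keeps it only if it also runs in eu.
        group1 := pickByTag(nodes, "role", "web:1")
        group2 := pickByTag(nodes, "region", "eu")

        result := []string{}
        for _, n := range group1 {
            for _, m := range group2 {
                if n == m {
                    result = append(result, n)
                }
            }
        }
        fmt.Println(result)
    }

The "test:1" and "test_client:1" jobs in the updated test below exercise exactly this cap, each expecting a single node.
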

@@ -844,3 +842,22 @@ func (a *Agent) recursiveSetJob(jobs []*Job) []string {
}
return result
}

// Check if the server is alive and select it
func (a *Agent) checkAndSelectServer() (string, error) {
var peers []string
for _, p := range a.LocalServers() {
peers = append(peers, p.RPCAddr.String())
}

for _, peer := range peers {
log.Debugf("Checking peer: %v", peer)
conn, err := net.DialTimeout("tcp", peer, 1*time.Second)
if err == nil {
conn.Close()
log.Debugf("Found good peer: %v", peer)
return peer, nil
}
}
return "", ErrNoSuitableServer
}
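
The probe in checkAndSelectServer is intentionally cheap: a one-second TCP dial filters out unreachable servers before one is picked to receive the request. The same pattern in isolation, as a rough sketch with placeholder peer addresses:

    package main

    import (
        "errors"
        "fmt"
        "net"
        "time"
    )

    // firstReachable returns the first peer that accepts a TCP connection within
    // one second, or an error when none of them do.
    func firstReachable(peers []string) (string, error) {
        for _, peer := range peers {
            conn, err := net.DialTimeout("tcp", peer, 1*time.Second)
            if err == nil {
                conn.Close()
                return peer, nil
            }
        }
        return "", errors.New("no suitable server found")
    }

    func main() {
        peer, err := firstReachable([]string{"10.0.0.1:6868", "10.0.0.2:6868"})
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Println("selected server:", peer)
    }
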
64 changes: 61 additions & 3 deletions dkron/agent_test.go
@@ -146,6 +146,27 @@ func Test_processFilteredNodes(t *testing.T) {
a2 := NewAgent(c)
a2.Start()

// Start another agent
ip3, returnFn3 := testutil.TakeIP()
defer returnFn3()
a3Addr := ip3.String()

c = DefaultConfig()
c.BindAddr = a3Addr
c.StartJoin = []string{a1Addr + ":8946"}
c.NodeName = "test3"
c.Server = false
c.LogLevel = logLevel
c.Tags = map[string]string{
"tag": "test_client",
"extra": "tag",
}
c.DevMode = true
c.DataDir = dir

a3 := NewAgent(c)
a3.Start()

time.Sleep(2 * time.Second)

job := &Job{
@@ -157,16 +178,53 @@ func Test_processFilteredNodes(t *testing.T) {
}

nodes, tags, err := a1.processFilteredNodes(job)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

assert.Contains(t, nodes, "test1")
assert.Contains(t, nodes, "test2")
assert.Len(t, nodes, 2)
assert.Equal(t, tags["tag"], "test")

job2 := &Job{
Name: "test_job_2",
Tags: map[string]string{
"tag": "test:1",
},
}

nodes, _, err = a1.processFilteredNodes(job2)
require.NoError(t, err)

assert.Len(t, nodes, 1)

job3 := &Job{
Name: "test_job_3",
}

nodes, _, err = a1.processFilteredNodes(job3)
require.NoError(t, err)

assert.Len(t, nodes, 3)
assert.Contains(t, nodes, "test1")
assert.Contains(t, nodes, "test2")
assert.Contains(t, nodes, "test3")

job4 := &Job{
Name: "test_job_4",
Tags: map[string]string{
"tag": "test_client:1",
},
}

nodes, _, err = a1.processFilteredNodes(job4)
require.NoError(t, err)

assert.Len(t, nodes, 1)
assert.Contains(t, nodes, "test3")

a1.Stop()
a2.Stop()
a3.Stop()
}

func TestEncrypt(t *testing.T) {
