refactor: Use gRPC streaming calls to run jobs
Serf queries work over UDP, sending messages to nodes and expecting a
response; there is no guarantee or verification of message delivery.
This was causing some job executions to be lost, around 1%-2% in edge
cases.

Solution

Agents will listen for gRPC calls from the servers, using server-side
streaming; this reverses the current call direction, while agents keep
reporting Execution progress as they do now.

Servers will actively order nodes to run the jobs, so we can verify the
job is actually being executed or report an error in case it's not.

This maintains the same guarantees in job status reporting and
streaming, and it also opens the possibility of cancelling an execution
in the future.
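
A rough sketch of the flow described above, not the literal code in this
commit: the server dials the agent's advertised RPC address, orders it to
run a job over a server-streaming call, and reads the streamed Execution
updates until the stream closes. The generated identifiers
(proto.AgentClient, AgentRun, AgentRunRequest) and the import path are
assumptions used only to illustrate the idea.

// Hedged sketch, not the commit's actual code: a server ordering an agent
// to run a job over a server-streaming gRPC call and consuming the
// streamed Execution updates. The generated identifiers (proto.AgentClient,
// AgentRun, AgentRunRequest) and the import path are assumed for
// illustration only.
package main

import (
	"context"
	"io"
	"log"

	"github.com/distribworks/dkron/proto" // assumed path of the generated stubs
	"google.golang.org/grpc"
)

func runOnNode(ctx context.Context, rpcAddr string, req *proto.AgentRunRequest) error {
	conn, err := grpc.Dial(rpcAddr, grpc.WithInsecure())
	if err != nil {
		// Dialing failed: the execution can be reported as failed instead of silently lost.
		return err
	}
	defer conn.Close()

	stream, err := proto.NewAgentClient(conn).AgentRun(ctx, req)
	if err != nil {
		// The node never acknowledged the order to run the job.
		return err
	}

	for {
		update, err := stream.Recv()
		if err == io.EOF {
			// The agent finished streaming progress for this execution.
			return nil
		}
		if err != nil {
			return err
		}
		log.Printf("execution update: %s", update.String())
	}
}

Since every step either succeeds or surfaces an error, a run that never
starts shows up as an explicit failure rather than a silently dropped
Serf message.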
Victor Castell committed May 6, 2020
1 parent 90d7caa commit a9cf417
Showing 18 changed files with 738 additions and 549 deletions.
197 changes: 107 additions & 90 deletions dkron/agent.go
@@ -2,7 +2,6 @@ package dkron

import (
"crypto/tls"
"encoding/json"
"errors"
"expvar"
"fmt"
@@ -23,8 +22,11 @@ import (
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/serf"
"github.com/juliangruber/go-intersect"
"github.com/sirupsen/logrus"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

const (
@@ -41,6 +43,9 @@ var (
// ErrLeaderNotFound is returned when obtained leader is not found in member list
ErrLeaderNotFound = errors.New("no member leader found in member list")

// ErrNoSuitableServer is returned when no suitable server to send the request is found.
ErrNoSuitableServer = errors.New("no suitable server found to send the request, aborting")

runningExecutions sync.Map
)

@@ -101,6 +106,8 @@ type Agent struct {
peers map[string][]*ServerParts
localPeers map[raft.ServerAddress]*ServerParts
peerLock sync.RWMutex

activeExecutions sync.Map
}

// ProcessorFactory is a function type that creates a new instance
@@ -159,19 +166,40 @@ func (a *Agent) Start() error {
// Expose the node name
expNode.Set(a.config.NodeName)

//Use the value of "RPCPort" if AdvertiseRPCPort has not been set
if a.config.AdvertiseRPCPort <= 0 {
a.config.AdvertiseRPCPort = a.config.RPCPort
}

if a.config.Server {
a.StartServer()
} else {
// Create a listener at the desired port.
addr := a.getRPCAddr()
l, err := net.Listen("tcp", addr)
if err != nil {
log.Fatal(err)
}

opts := []grpc.ServerOption{}
if a.TLSConfig != nil {
tc := credentials.NewTLS(a.TLSConfig)
opts = append(opts, grpc.Creds(tc))
}

grpcServer := grpc.NewServer(opts...)
as := NewAgentServer(a)
proto.RegisterAgentServer(grpcServer, as)
go grpcServer.Serve(l)
}

if a.GRPCClient == nil {
a.GRPCClient = NewGRPCClient(nil, a)
}

tags := a.serf.LocalMember().Tags
if a.config.Server {
tags["rpc_addr"] = a.getRPCAddr() // Address that clients will use to RPC to servers
tags["port"] = strconv.Itoa(a.config.AdvertiseRPCPort)
}
tags["rpc_addr"] = a.getRPCAddr() // Address that clients will use to RPC to servers
tags["port"] = strconv.Itoa(a.config.AdvertiseRPCPort)
a.serf.SetTags(tags)

go a.eventLoop()
@@ -447,11 +475,6 @@ func (a *Agent) SetConfig(c *Config) {

// StartServer launch a new dkron server process
func (a *Agent) StartServer() {
//Use the value of "RPCPort" if AdvertiseRPCPort has not been set
if a.config.AdvertiseRPCPort <= 0 {
a.config.AdvertiseRPCPort = a.config.RPCPort
}

if a.Store == nil {
s, err := NewStore()
if err != nil {
@@ -634,62 +657,6 @@ func (a *Agent) eventLoop() {
}
}

if e.EventType() == serf.EventQuery {
query := e.(*serf.Query)

if query.Name == QueryRunJob {
log.WithFields(logrus.Fields{
"query": query.Name,
"payload": string(query.Payload),
"at": query.LTime,
}).Debug("agent: Running job")

var rqp RunQueryParam
if err := json.Unmarshal(query.Payload, &rqp); err != nil {
log.WithField("query", QueryRunJob).Fatal("agent: Error unmarshaling query payload")
}

log.WithFields(logrus.Fields{
"job": rqp.Execution.JobName,
}).Info("agent: Starting job")

// There are two error types to handle here:
// Key not found when the job is removed from store
// Dial tcp error
// In case of deleted job or other error, we should report and break the flow.
// On dial error we should retry with a limit.
i := 0
RetryGetJob:
job, err := a.GRPCClient.GetJob(rqp.RPCAddr, rqp.Execution.JobName)
if err != nil {
if err == ErrRPCDialing {
if i < 10 {
i++
goto RetryGetJob
}
log.WithError(err).Fatal("agent: A working RPC connection to a Dkron server must exists.")
}
log.WithError(err).Error("agent: Error on rpc.GetJob call")
continue
}
log.WithField("job", job.Name).Debug("agent: GetJob by RPC")

ex := rqp.Execution
ex.StartedAt = time.Now()
ex.NodeName = a.config.NodeName

go func() {
if err := a.invokeJob(job, ex); err != nil {
log.WithError(err).Error("agent: Error invoking job")
}
}()

// Respond with the execution JSON though it is not used in the destination
exJSON, _ := json.Marshal(ex)
query.Respond(exJSON)
}
}

case <-serfShutdownCh:
log.Warn("agent: Serf shutdown detected, quitting")
return
@@ -710,9 +677,9 @@ func (a *Agent) join(addrs []string, replay bool) (n int, err error) {
return
}

func (a *Agent) processFilteredNodes(job *Job) ([]string, map[string]string, error) {
var nodes []string
var candidates []string
func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]string, error) {
// candidates will contain a set of candidates grouped by tag;
// the final set of nodes will be the intersection of all groups
tags := make(map[string]string)

// Actually copy the map
@@ -724,42 +691,73 @@ func (a *Agent) processFilteredNodes(job *Job) ([]string, map[string]string, error) {
// on the same region.
tags["region"] = a.config.Region

candidates := [][]string{}
for jtk, jtv := range tags {
var tc []string
if tc = strings.Split(jtv, ":"); len(tc) == 2 {
tv := tc[0]
cans := []string{}
tc := strings.Split(jtv, ":")

// Set original tag to clean tag
tags[jtk] = tv
tv := tc[0]

count, err := strconv.Atoi(tc[1])
if err != nil {
return nil, nil, err
}
// Set original tag to clean tag
tags[jtk] = tv

for _, member := range a.serf.Members() {
if member.Status == serf.StatusAlive {
for mtk, mtv := range member.Tags {
if mtk == jtk && mtv == tv {
candidates = append(candidates, member.Name)
}
for _, member := range a.serf.Members() {
if member.Status == serf.StatusAlive {
for mtk, mtv := range member.Tags {
if mtk == jtk && mtv == tv {
cans = append(cans, member.Name)
}
}
}
}

// In case there is cardinality in the tag, randomize the order and select that number of nodes,
// or else just add all nodes to the result.
if len(tc) == 2 {
f := []string{}
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(candidates), func(i, j int) {
candidates[i], candidates[j] = candidates[j], candidates[i]
rand.Shuffle(len(cans), func(i, j int) {
cans[i], cans[j] = cans[j], cans[i]
})

count, err := strconv.Atoi(tc[1])
if err != nil {
return nil, nil, err
}
for i := 1; i <= count; i++ {
if len(candidates) == 0 {
if len(cans) == 0 {
break
}
nodes = append(nodes, candidates[0])
candidates = candidates[1:]
f = append(f, cans[0])
cans = cans[1:]
}
candidates = nil
cans = f
}
if len(cans) > 0 {
candidates = append(candidates, cans)
}
}

// The final result will be the intersection of all candidates.
nodes := make(map[string]string)
r := candidates[0]
for i := 1; i <= len(candidates)-1; i++ {
isec := intersect.Simple(r, candidates[i]).([]interface{})
// Empty the slice
r = []string{}

// Refill with the intersection
for _, v := range isec {
r = append(r, v.(string))
}
}

for _, n := range r {
for _, m := range a.serf.Members() {
if n == m.Name {
// If the server is missing the rpc_addr tag, default to the serf advertise addr
nodes[n] = m.Tags["rpc_addr"]
}
}
}

@@ -844,3 +842,22 @@ func (a *Agent) recursiveSetJob(jobs []*Job) []string {
}
return result
}

// Check if the server is alive and select it
func (a *Agent) checkAndSelectServer() (string, error) {
var peers []string
for _, p := range a.LocalServers() {
peers = append(peers, p.RPCAddr.String())
}

for _, peer := range peers {
log.Debugf("Checking peer: %v", peer)
conn, err := net.DialTimeout("tcp", peer, 1*time.Second)
if err == nil {
conn.Close()
log.Debugf("Found good peer: %v", peer)
return peer, nil
}
}
return "", ErrNoSuitableServer
}
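
A rough, standalone illustration of the node selection scheme introduced
above in processFilteredNodes, not the commit's code: each job tag value
may carry a count suffix (for example "web:1"), candidates are collected
per tag, shuffled and trimmed to that count, and the final node set is
the intersection of all per-tag groups. All names and member data below
are invented for the example.

// Hedged, standalone illustration of the node selection scheme used by the
// new processFilteredNodes: per-tag candidate groups, an optional count
// suffix on the tag value ("web:1"), and an intersection of all groups.
// This is not the commit's code; names and data are made up for the example.
package main

import (
	"fmt"
	"math/rand"
	"strconv"
	"strings"
)

func selectNodes(jobTags map[string]string, members map[string]map[string]string) ([]string, error) {
	groups := [][]string{}

	for key, val := range jobTags {
		parts := strings.Split(val, ":")
		cleanValue := parts[0]

		// Collect every member whose tag matches the clean value.
		group := []string{}
		for name, tags := range members {
			if tags[key] == cleanValue {
				group = append(group, name)
			}
		}

		// If the tag carries a count ("web:1"), shuffle and keep that many nodes.
		if len(parts) == 2 {
			count, err := strconv.Atoi(parts[1])
			if err != nil {
				return nil, err
			}
			rand.Shuffle(len(group), func(i, j int) { group[i], group[j] = group[j], group[i] })
			if count < len(group) {
				group = group[:count]
			}
		}
		groups = append(groups, group)
	}

	if len(groups) == 0 {
		return nil, nil
	}

	// The final set of nodes is the intersection of all per-tag groups.
	result := groups[0]
	for _, group := range groups[1:] {
		keep := map[string]bool{}
		for _, name := range group {
			keep[name] = true
		}
		next := []string{}
		for _, name := range result {
			if keep[name] {
				next = append(next, name)
			}
		}
		result = next
	}
	return result, nil
}

func main() {
	members := map[string]map[string]string{
		"node1": {"region": "global", "role": "web"},
		"node2": {"region": "global", "role": "web"},
		"node3": {"region": "global", "role": "db"},
	}
	nodes, err := selectNodes(map[string]string{"region": "global", "role": "web:1"}, members)
	if err != nil {
		panic(err)
	}
	fmt.Println(nodes) // e.g. [node1] or [node2]
}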
64 changes: 61 additions & 3 deletions dkron/agent_test.go
@@ -146,6 +146,27 @@ func Test_processFilteredNodes(t *testing.T) {
a2 := NewAgent(c)
a2.Start()

// Start another agent
ip3, returnFn3 := testutil.TakeIP()
defer returnFn3()
a3Addr := ip3.String()

c = DefaultConfig()
c.BindAddr = a3Addr
c.StartJoin = []string{a1Addr + ":8946"}
c.NodeName = "test3"
c.Server = false
c.LogLevel = logLevel
c.Tags = map[string]string{
"tag": "test_client",
"extra": "tag",
}
c.DevMode = true
c.DataDir = dir

a3 := NewAgent(c)
a3.Start()

time.Sleep(2 * time.Second)

job := &Job{
@@ -157,16 +178,53 @@ }
}

nodes, tags, err := a1.processFilteredNodes(job)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

assert.Contains(t, nodes, "test1")
assert.Contains(t, nodes, "test2")
assert.Len(t, nodes, 2)
assert.Equal(t, tags["tag"], "test")

job2 := &Job{
Name: "test_job_2",
Tags: map[string]string{
"tag": "test:1",
},
}

nodes, _, err = a1.processFilteredNodes(job2)
require.NoError(t, err)

assert.Len(t, nodes, 1)

job3 := &Job{
Name: "test_job_3",
}

nodes, _, err = a1.processFilteredNodes(job3)
require.NoError(t, err)

assert.Len(t, nodes, 3)
assert.Contains(t, nodes, "test1")
assert.Contains(t, nodes, "test2")
assert.Contains(t, nodes, "test3")

job4 := &Job{
Name: "test_job_4",
Tags: map[string]string{
"tag": "test_client:1",
},
}

nodes, _, err = a1.processFilteredNodes(job4)
require.NoError(t, err)

assert.Len(t, nodes, 1)
assert.Contains(t, nodes, "test3")

a1.Stop()
a2.Stop()
a3.Stop()
}

func TestEncrypt(t *testing.T) {