From 861770800541e6994e1287c3a53c3a974069c156 Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Mon, 22 Feb 2021 20:52:52 +0100 Subject: [PATCH 01/17] Make random function injectable Refactor filterNodes --- dkron/agent.go | 79 ++++++++++++++++++------------------- dkron/agent_test.go | 96 ++++++++++++++++++--------------------------- dkron/run.go | 27 +++++++------ 3 files changed, 93 insertions(+), 109 deletions(-) diff --git a/dkron/agent.go b/dkron/agent.go index bba1b33b0..63c896c4f 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -11,6 +11,7 @@ import ( "net" "os" "path/filepath" + "sort" "strconv" "sync" "time" @@ -692,7 +693,12 @@ func (a *Agent) join(addrs []string, replay bool) (n int, err error) { return } -func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]string, error) { +// The default selector function for processFilteredNodes +func defaultSelector(nodes []serf.Member) int { + return rand.Intn(len(nodes)) +} + +func (a *Agent) processFilteredNodes(job *Job, selectFunc func([]serf.Member) int) ([]serf.Member, error) { // The final set of nodes will be the intersection of all groups tags := make(map[string]string) @@ -705,65 +711,56 @@ func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]st // on the same region. tags["region"] = a.config.Region - // Make a set of all members - allNodes := make(map[string]serf.Member) - for _, member := range a.serf.Members() { - if member.Status == serf.StatusAlive { - allNodes[member.Name] = member + // Get all living members + allNodes := a.serf.Members() + for i := len(allNodes) - 1; i >= 0; i-- { + if allNodes[i].Status != serf.StatusAlive { + allNodes[i] = allNodes[len(allNodes)-1] + allNodes = allNodes[:len(allNodes)-1] } } - execNodes, tags, cardinality, err := filterNodes(allNodes, tags) - if err != nil { - return nil, nil, err - } + // Sort the nodes to make selection from them predictable + sort.Slice(allNodes, func(i, j int) bool { return allNodes[i].Name < allNodes[j].Name }) - // Create an array of node names to aid in computing resulting set based on cardinality - var names []string - for name := range execNodes { - names = append(names, name) + execNodes, cardinality, err := filterNodes(allNodes, tags) + if err != nil { + return nil, err } - nodes := make(map[string]string) + numNodes := len(execNodes) for ; cardinality > 0; cardinality-- { - // Pick a node, any node - randomIndex := rand.Intn(len(names)) - m := execNodes[names[randomIndex]] - - // Store name and address - if addr, ok := m.Tags["rpc_addr"]; ok { - nodes[m.Name] = addr - } else { - nodes[m.Name] = m.Addr.String() - } + // Select a node + chosenIndex := selectFunc(execNodes[:numNodes]) - // Swap picked node with the first one and shorten array, so node can't get picked again - names[randomIndex], names[0] = names[0], names[randomIndex] - names = names[1:] + // Swap picked node with the last one and reduce choices so it can't get picked again + execNodes[numNodes-1], execNodes[chosenIndex] = execNodes[chosenIndex], execNodes[numNodes-1] + numNodes-- } - return nodes, tags, nil + return execNodes[numNodes:], nil } -// filterNodes determines which of the provided nodes have the given tags +// filterNodes determines which of the execNodes have the given tags // Returns: // * the map of allNodes that match the provided tags -// * a clean map of tag values without cardinality -// * cardinality, i.e. the max number of nodes that should be targeted, regardless of the -// number of nodes in the resulting map. +// * cardinality, i.e. the max number of nodes that should be targeted. This can be no higher +// than the number of nodes in the resulting map // * an error if a cardinality was malformed -func filterNodes(allNodes map[string]serf.Member, tags map[string]string) (map[string]serf.Member, map[string]string, int, error) { +func filterNodes(allNodes []serf.Member, tags map[string]string) ([]serf.Member, int, error) { ct, cardinality, err := cleanTags(tags) if err != nil { - return nil, nil, 0, err + return nil, 0, err } - matchingNodes := make(map[string]serf.Member) + matchingNodes := make([]serf.Member, len(allNodes)) + copy(matchingNodes, allNodes) - // Filter nodes that lack tags - for name, member := range allNodes { - if nodeMatchesTags(member, ct) { - matchingNodes[name] = member + // Remove nodes that do not have the selected tags + for i := len(matchingNodes) - 1; i >= 0; i-- { + if !nodeMatchesTags(matchingNodes[i], ct) { + matchingNodes[i] = matchingNodes[len(matchingNodes)-1] + matchingNodes = matchingNodes[:len(matchingNodes)-1] } } @@ -772,7 +769,7 @@ func filterNodes(allNodes map[string]serf.Member, tags map[string]string) (map[s cardinality = len(matchingNodes) } - return matchingNodes, ct, cardinality, nil + return matchingNodes, cardinality, nil } // This function is called when a client request the RPCAddress diff --git a/dkron/agent_test.go b/dkron/agent_test.go index a446de336..79b8e9be8 100644 --- a/dkron/agent_test.go +++ b/dkron/agent_test.go @@ -113,6 +113,10 @@ func TestAgentCommand_runForElection(t *testing.T) { a3.Stop() } +func lastSelector(nodes []serf.Member) int { + return len(nodes) - 1 +} + func Test_processFilteredNodes(t *testing.T) { dir, err := ioutil.TempDir("", "dkron-test") require.NoError(t, err) @@ -200,13 +204,12 @@ func Test_processFilteredNodes(t *testing.T) { }, } - nodes, tags, err := a1.processFilteredNodes(job) + nodes, err := a1.processFilteredNodes(job, lastSelector) require.NoError(t, err) - assert.Contains(t, nodes, "test1") - assert.Contains(t, nodes, "test2") + assert.Exactly(t, "test1", nodes[0].Name) + assert.Exactly(t, "test2", nodes[1].Name) assert.Len(t, nodes, 2) - assert.Equal(t, tags["tag"], "test") // Test cardinality of 1 with two qualified nodes returns 1 node job2 := &Job{ @@ -216,7 +219,7 @@ func Test_processFilteredNodes(t *testing.T) { }, } - nodes, _, err = a1.processFilteredNodes(job2) + nodes, err = a1.processFilteredNodes(job2, defaultSelector) require.NoError(t, err) assert.Len(t, nodes, 1) @@ -226,13 +229,13 @@ func Test_processFilteredNodes(t *testing.T) { Name: "test_job_3", } - nodes, _, err = a1.processFilteredNodes(job3) + nodes, err = a1.processFilteredNodes(job3, lastSelector) require.NoError(t, err) assert.Len(t, nodes, 3) - assert.Contains(t, nodes, "test1") - assert.Contains(t, nodes, "test2") - assert.Contains(t, nodes, "test3") + assert.Exactly(t, "test1", nodes[0].Name) + assert.Exactly(t, "test2", nodes[1].Name) + assert.Exactly(t, "test3", nodes[2].Name) // Test exclusive tag returns correct node job4 := &Job{ @@ -242,11 +245,11 @@ func Test_processFilteredNodes(t *testing.T) { }, } - nodes, _, err = a1.processFilteredNodes(job4) + nodes, err = a1.processFilteredNodes(job4, defaultSelector) require.NoError(t, err) assert.Len(t, nodes, 1) - assert.Contains(t, nodes, "test3") + assert.Exactly(t, "test3", nodes[0].Name) // Test existing tag but no matching value returns no nodes job5 := &Job{ @@ -256,7 +259,7 @@ func Test_processFilteredNodes(t *testing.T) { }, } - nodes, _, err = a1.processFilteredNodes(job5) + nodes, err = a1.processFilteredNodes(job5, defaultSelector) require.NoError(t, err) assert.Len(t, nodes, 0) @@ -270,11 +273,10 @@ func Test_processFilteredNodes(t *testing.T) { }, } - nodes, tags, err = a1.processFilteredNodes(job6) + nodes, err = a1.processFilteredNodes(job6, defaultSelector) require.NoError(t, err) assert.Len(t, nodes, 0) - assert.Equal(t, tags["tag"], "test") // Test matching tags with cardinality of 2 but only 1 matching node returns correct node job7 := &Job{ @@ -285,13 +287,11 @@ func Test_processFilteredNodes(t *testing.T) { }, } - nodes, tags, err = a1.processFilteredNodes(job7) + nodes, err = a1.processFilteredNodes(job7, defaultSelector) require.NoError(t, err) - assert.Contains(t, nodes, "test2") assert.Len(t, nodes, 1) - assert.Equal(t, tags["tag"], "test") - assert.Equal(t, tags["extra"], "tag") + assert.Exactly(t, "test2", nodes[0].Name) // Test two tags matching same 3 servers and cardinality of 1 should always return 1 server @@ -310,32 +310,24 @@ func Test_processFilteredNodes(t *testing.T) { }, } distrib := make(map[string]int) - var sampleSize = 1000 + var sampleSize = 999 for i := 0; i < sampleSize; i++ { - nodes, tags, err = a1.processFilteredNodes(job8) + // round-robin on the selected nodes to come out at an exactly equal distribution + roundRobinSelector := func(nodes []serf.Member) int { return i % len(nodes) } + nodes, err = a1.processFilteredNodes(job8, roundRobinSelector) require.NoError(t, err) assert.Len(t, nodes, 1) - assert.Equal(t, tags["additional"], "value") - assert.Equal(t, tags["additional2"], "value2") - for name := range nodes { - distrib[name] = distrib[name] + 1 - } + distrib[nodes[0].Name]++ } - // Each node must have been chosen between 30% and 36% of the time, - // for the distribution to be considered equal. - // Note: This test should almost never, but still can, fail even if the - // code is fine. To fix this, the randomizer ought to be mocked. + // Each node must have been chosen 1/3 of the time. for name, count := range distrib { - fmt.Println(name, float64(count)/float64(sampleSize)*100.0, "%") + fmt.Println(name, float64(count)/float64(sampleSize)*100.0, "%", count) } - assert.Greater(t, float64(distrib["test1"])/float64(sampleSize), 0.25) - assert.Less(t, float64(distrib["test1"])/float64(sampleSize), 0.40) - assert.Greater(t, float64(distrib["test2"])/float64(sampleSize), 0.25) - assert.Less(t, float64(distrib["test2"])/float64(sampleSize), 0.40) - assert.Greater(t, float64(distrib["test3"])/float64(sampleSize), 0.25) - assert.Less(t, float64(distrib["test3"])/float64(sampleSize), 0.40) + assert.Exactly(t, sampleSize/3, distrib["test1"]) + assert.Exactly(t, sampleSize/3, distrib["test2"]) + assert.Exactly(t, sampleSize/3, distrib["test3"]) // Clean up a1.Stop() @@ -464,8 +456,8 @@ func TestAgentConfig(t *testing.T) { } func Test_filterNodes(t *testing.T) { - nodes := map[string]serf.Member{ - "node1": { + nodes := []serf.Member{ + { Tags: map[string]string{ "region": "global", "tag": "test", @@ -473,7 +465,7 @@ func Test_filterNodes(t *testing.T) { "tagfor2": "2", }, }, - "node2": { + { Tags: map[string]string{ "region": "global", "tag": "test", @@ -481,7 +473,7 @@ func Test_filterNodes(t *testing.T) { "tagfor2": "2", }, }, - "node3": { + { Tags: map[string]string{ "region": "global", "tag": "test", @@ -490,14 +482,13 @@ func Test_filterNodes(t *testing.T) { }, } type args struct { - execNodes map[string]serf.Member + execNodes []serf.Member tags map[string]string } tests := []struct { name string args args - want map[string]serf.Member - want1 map[string]string + want []serf.Member want2 int wantErr bool }{ @@ -508,7 +499,6 @@ func Test_filterNodes(t *testing.T) { tags: map[string]string{"tag": "test"}, }, want: nodes, - want1: map[string]string{"tag": "test"}, want2: 3, wantErr: false, }, @@ -518,8 +508,7 @@ func Test_filterNodes(t *testing.T) { execNodes: nodes, tags: map[string]string{"just1": "value"}, }, - want: map[string]serf.Member{"node1": nodes["node1"]}, - want1: map[string]string{"just1": "value"}, + want: []serf.Member{nodes[0]}, want2: 1, wantErr: false, }, @@ -529,8 +518,7 @@ func Test_filterNodes(t *testing.T) { execNodes: nodes, tags: map[string]string{"just2": "value"}, }, - want: map[string]serf.Member{"node2": nodes["node2"]}, - want1: map[string]string{"just2": "value"}, + want: []serf.Member{nodes[1]}, want2: 1, wantErr: false, }, @@ -540,8 +528,7 @@ func Test_filterNodes(t *testing.T) { execNodes: nodes, tags: map[string]string{"tagfor2": "2"}, }, - want: map[string]serf.Member{"node1": nodes["node1"], "node2": nodes["node2"]}, - want1: map[string]string{"tagfor2": "2"}, + want: []serf.Member{nodes[0], nodes[1]}, want2: 2, wantErr: false, }, @@ -551,8 +538,7 @@ func Test_filterNodes(t *testing.T) { execNodes: nodes, tags: map[string]string{"unknown": "value"}, }, - want: map[string]serf.Member{}, - want1: map[string]string{"unknown": "value"}, + want: []serf.Member{}, want2: 0, wantErr: false, }, @@ -563,14 +549,13 @@ func Test_filterNodes(t *testing.T) { tags: map[string]string{"tag": "test:1"}, }, want: nodes, - want1: map[string]string{"tag": "test"}, want2: 1, wantErr: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, got1, got2, err := filterNodes(tt.args.execNodes, tt.args.tags) + got, got2, err := filterNodes(tt.args.execNodes, tt.args.tags) if (err != nil) != tt.wantErr { t.Errorf("filterNodes() error = %v, wantErr %v", err, tt.wantErr) return @@ -578,9 +563,6 @@ func Test_filterNodes(t *testing.T) { if !reflect.DeepEqual(got, tt.want) { t.Errorf("filterNodes() got = %v, want %v", got, tt.want) } - if !reflect.DeepEqual(got1, tt.want1) { - t.Errorf("filterNodes() got1 = %v, want %v", got1, tt.want1) - } if got2 != tt.want2 { t.Errorf("filterNodes() got2 = %v, want %v", got2, tt.want2) } diff --git a/dkron/run.go b/dkron/run.go index 72af8e0f6..213fc8063 100644 --- a/dkron/run.go +++ b/dkron/run.go @@ -7,7 +7,7 @@ import ( "github.com/hashicorp/serf/serf" ) -// Run call the agents to run a job. Returns a job with it's new status and next schedule. +// Run call the agents to run a job. Returns a job with its new status and next schedule. func (a *Agent) Run(jobName string, ex *Execution) (*Job, error) { job, err := a.Store.GetJob(jobName, nil) if err != nil { @@ -28,35 +28,40 @@ func (a *Agent) Run(jobName string, ex *Execution) (*Job, error) { // In the first execution attempt we build and filter the target nodes // but we use the existing node target in case of retry. - var filterMap map[string]string + var targetNodes []serf.Member if ex.Attempt <= 1 { - filterMap, _, err = a.processFilteredNodes(job) + targetNodes, err = a.processFilteredNodes(job, defaultSelector) if err != nil { return nil, fmt.Errorf("run error processing filtered nodes: %w", err) } } else { - // In case of retrying, find the rpc address of the node or return with an error - var addr string + // In case of retrying, find the node or return with an error for _, m := range a.serf.Members() { if ex.NodeName == m.Name { if m.Status == serf.StatusAlive { - addr = m.Tags["rpc_addr"] + targetNodes = []serf.Member{m} + break } else { return nil, fmt.Errorf("retry node is gone: %s for job %s", ex.NodeName, ex.JobName) } } } - filterMap = map[string]string{ex.NodeName: addr} } // In case no nodes found, return reporting the error - if len(filterMap) < 1 { + if len(targetNodes) < 1 { return nil, fmt.Errorf("no target nodes found to run job %s", ex.JobName) } - a.logger.WithField("nodes", filterMap).Debug("agent: Filtered nodes to run") + a.logger.WithField("nodes", targetNodes).Debug("agent: Filtered nodes to run") var wg sync.WaitGroup - for _, v := range filterMap { + for _, v := range targetNodes { + // Determine node address + addr, ok := v.Tags["rpc_addr"] + if !ok { + addr = v.Addr.String() + } + // Call here client GRPC AgentRun wg.Add(1) go func(node string, wg *sync.WaitGroup) { @@ -73,7 +78,7 @@ func (a *Agent) Run(jobName string, ex *Execution) (*Job, error) { "node": node, }).Error("agent: Error calling AgentRun") } - }(v, &wg) + }(addr, &wg) } wg.Wait() From 8b7a1b8f953704a2a6774f18badc9ab59af6e892 Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Mon, 22 Feb 2021 20:53:02 +0100 Subject: [PATCH 02/17] Build improvements --- Makefile | 3 +++ scripts/test | 1 + 2 files changed, 4 insertions(+) diff --git a/Makefile b/Makefile index 394822874..c33ac8d70 100644 --- a/Makefile +++ b/Makefile @@ -46,6 +46,9 @@ doc: test: @bash --norc -i ./scripts/test +localtest: + go test -v ./... | sed ''/PASS/s//$$(printf "\033[32mPASS\033[0m")/'' | sed ''/FAIL/s//$$(printf "\033[31mFAIL\033[0m")/'' + updatetestcert: wget https://badssl.com/certs/badssl.com-client.p12 -q -O badssl.com-client.p12 openssl pkcs12 -in badssl.com-client.p12 -nocerts -nodes -passin pass:badssl.com -out builtin/bins/dkron-executor-http/testdata/badssl.com-client-key-decrypted.pem diff --git a/scripts/test b/scripts/test index 67afac004..4ffb7d060 100755 --- a/scripts/test +++ b/scripts/test @@ -1,6 +1,7 @@ #!/usr/bin/env bash set -e +set -o pipefail docker build -t dkron . docker run dkron scripts/validate-gofmt docker run dkron go vet From c544c3db9a69340e581928d4c5c0be9522d91c83 Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Thu, 3 Jun 2021 23:26:48 +0200 Subject: [PATCH 03/17] Format help texts so 'dkron help agent' looks better --- dkron/config.go | 101 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 79 insertions(+), 22 deletions(-) diff --git a/dkron/config.go b/dkron/config.go index 746eb5abc..63b946f13 100644 --- a/dkron/config.go +++ b/dkron/config.go @@ -211,28 +211,85 @@ func ConfigFlagSet() *flag.FlagSet { c := DefaultConfig() cmdFlags := flag.NewFlagSet("agent flagset", flag.ContinueOnError) - cmdFlags.Bool("server", false, "This node is running in server mode") - cmdFlags.String("node-name", c.NodeName, "Name of this node. Must be unique in the cluster") - cmdFlags.String("bind-addr", c.BindAddr, "Specifies which address the agent should bind to for network services, including the internal gossip protocol and RPC mechanism. This should be specified in IP format, and can be used to easily bind all network services to the same address. The value supports go-sockaddr/template format.") - cmdFlags.String("advertise-addr", "", "Address used to advertise to other nodes in the cluster. By default, the bind address is advertised. The value supports go-sockaddr/template format.") - cmdFlags.String("http-addr", c.HTTPAddr, "Address to bind the UI web server to. Only used when server. The value supports go-sockaddr/template format.") - cmdFlags.String("profile", c.Profile, "Profile is used to control the timing profiles used") - cmdFlags.StringSlice("join", []string{}, "An initial agent to join with. This flag can be specified multiple times") - cmdFlags.StringSlice("retry-join", []string{}, "Address of an agent to join at start time with retries enabled. Can be specified multiple times.") - cmdFlags.Int("retry-max", 0, "Maximum number of join attempts. Defaults to 0, which will retry indefinitely.") - cmdFlags.String("retry-interval", DefaultRetryInterval.String(), "Time to wait between join attempts.") - cmdFlags.Int("raft-multiplier", c.RaftMultiplier, "An integer multiplier used by servers to scale key Raft timing parameters. Omitting this value or setting it to 0 uses default timing described below. Lower values are used to tighten timing and increase sensitivity while higher values relax timings and reduce sensitivity. Tuning this affects the time it takes to detect leader failures and to perform leader elections, at the expense of requiring more network and CPU resources for better performance. By default, Dkron will use a lower-performance timing that's suitable for minimal Dkron servers, currently equivalent to setting this to a value of 5 (this default may be changed in future versions of Dkron, depending if the target minimum server profile changes). Setting this to a value of 1 will configure Raft to its highest-performance mode is recommended for production Dkron servers. The maximum allowed value is 10.") - cmdFlags.StringSlice("tag", []string{}, "Tag can be specified multiple times to attach multiple key/value tag pairs to the given node, specified as key=value") - cmdFlags.String("encrypt", "", "Key for encrypting network traffic. Must be a base64-encoded 16-byte key") - cmdFlags.String("log-level", c.LogLevel, "Log level (debug|info|warn|error|fatal|panic)") - cmdFlags.Int("rpc-port", c.RPCPort, "RPC Port used to communicate with clients. Only used when server. The RPC IP Address will be the same as the bind address") - cmdFlags.Int("advertise-rpc-port", 0, "Use the value of rpc-port by default") - cmdFlags.Int("bootstrap-expect", 0, "Provides the number of expected servers in the datacenter. Either this value should not be provided or the value must agree with other servers in the cluster. When provided, Dkron waits until the specified number of servers are available and then bootstraps the cluster. This allows an initial leader to be elected automatically. This flag requires server mode.") - cmdFlags.String("data-dir", c.DataDir, "Specifies the directory to use for server-specific data, including the replicated log. By default, this is the top-level data-dir, like [/var/lib/dkron]") - cmdFlags.String("datacenter", c.Datacenter, "Specifies the data center of the local agent. All members of a datacenter should share a local LAN connection.") - cmdFlags.String("region", c.Region, "Specifies the region the Dkron agent is a member of. A region typically maps to a geographic region, for example us, with potentially multiple zones, which map to datacenters such as us-west and us-east") - cmdFlags.String("serf-reconnect-timeout", c.SerfReconnectTimeout, "This is the amount of time to attempt to reconnect to a failed node before giving up and considering it completely gone. In Kubernetes, you might need this to about 5s, because there is no reason to try reconnects for default 24h value. Also Raft behaves oddly if node is not reaped and returned with same ID, but different IP. Format there: https://golang.org/pkg/time/#ParseDuration") - cmdFlags.Bool("ui", true, "Enable the web UI on this node. The node must be server.") + cmdFlags.Bool("server", false, + "This node is running in server mode") + cmdFlags.String("node-name", c.NodeName, + "Name of this node. Must be unique in the cluster") + cmdFlags.String("bind-addr", c.BindAddr, + `Specifies which address the agent should bind to for network services, +including the internal gossip protocol and RPC mechanism. This should be +specified in IP format, and can be used to easily bind all network services +to the same address. The value supports go-sockaddr/template format. +`) + cmdFlags.String("advertise-addr", "", + `Address used to advertise to other nodes in the cluster. By default, +the bind address is advertised. The value supports +go-sockaddr/template format.`) + cmdFlags.String("http-addr", c.HTTPAddr, + `Address to bind the UI web server to. Only used when server. The value +supports go-sockaddr/template format.`) + cmdFlags.String("profile", c.Profile, + "Profile is used to control the timing profiles used") + cmdFlags.StringSlice("join", []string{}, + "An initial agent to join with. This flag can be specified multiple times") + cmdFlags.StringSlice("retry-join", []string{}, + `Address of an agent to join at start time with retries enabled. +Can be specified multiple times.`) + cmdFlags.Int("retry-max", 0, + `Maximum number of join attempts. Defaults to 0, which will retry indefinitely.`) + cmdFlags.String("retry-interval", DefaultRetryInterval.String(), + "Time to wait between join attempts.") + cmdFlags.Int("raft-multiplier", c.RaftMultiplier, + `An integer multiplier used by servers to scale key Raft timing parameters. +Omitting this value or setting it to 0 uses default timing described below. +Lower values are used to tighten timing and increase sensitivity while higher +values relax timings and reduce sensitivity. Tuning this affects the time it +takes to detect leader failures and to perform leader elections, at the expense +of requiring more network and CPU resources for better performance. By default, +Dkron will use a lower-performance timing that's suitable for minimal Dkron +servers, currently equivalent to setting this to a value of 5 (this default +may be changed in future versions of Dkron, depending if the target minimum +server profile changes). Setting this to a value of 1 will configure Raft to +its highest-performance mode is recommended for production Dkron servers. +The maximum allowed value is 10.`) + cmdFlags.StringSlice("tag", []string{}, + `Tag can be specified multiple times to attach multiple key/value tag pairs +to the given node, specified as key=value`) + cmdFlags.String("encrypt", "", + "Key for encrypting network traffic. Must be a base64-encoded 16-byte key") + cmdFlags.String("log-level", c.LogLevel, + "Log level (debug|info|warn|error|fatal|panic)") + cmdFlags.Int("rpc-port", c.RPCPort, + `RPC Port used to communicate with clients. Only used when server. +The RPC IP Address will be the same as the bind address.`) + cmdFlags.Int("advertise-rpc-port", 0, + "Use the value of rpc-port by default") + cmdFlags.Int("bootstrap-expect", 0, + `Provides the number of expected servers in the datacenter. Either this value +should not be provided or the value must agree with other servers in the +cluster. When provided, Dkron waits until the specified number of servers are +available and then bootstraps the cluster. This allows an initial leader to be +elected automatically. This flag requires server mode.`) + cmdFlags.String("data-dir", c.DataDir, + `Specifies the directory to use for server-specific data, including the +replicated log. By default, this is the top-level data-dir, +like [/var/lib/dkron]`) + cmdFlags.String("datacenter", c.Datacenter, + `Specifies the data center of the local agent. All members of a datacenter +should share a local LAN connection.`) + cmdFlags.String("region", c.Region, + `Specifies the region the Dkron agent is a member of. A region typically maps +to a geographic region, for example us, with potentially multiple zones, which +map to datacenters such as us-west and us-east`) + cmdFlags.String("serf-reconnect-timeout", c.SerfReconnectTimeout, + `This is the amount of time to attempt to reconnect to a failed node before +giving up and considering it completely gone. In Kubernetes, you might need +this to about 5s, because there is no reason to try reconnects for default +24h value. Also Raft behaves oddly if node is not reaped and returned with +same ID, but different IP. +Format there: https://golang.org/pkg/time/#ParseDuration`) + cmdFlags.Bool("ui", true, + "Enable the web UI on this node. The node must be server.") // Notifications cmdFlags.String("mail-host", "", "Mail server host address to use for notifications") From 86ab43d3ab6d47346472b588311459ff81513d27 Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Fri, 4 Jun 2021 22:34:05 +0200 Subject: [PATCH 04/17] Simplify non-graceful termination check --- cmd/agent.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/cmd/agent.go b/cmd/agent.go index d2e2792de..98da58201 100644 --- a/cmd/agent.go +++ b/cmd/agent.go @@ -98,14 +98,8 @@ WAIT: goto WAIT } - // Check if we should do a graceful leave - graceful := false - if sig == syscall.SIGTERM || sig == os.Interrupt { - graceful = true - } - // Fail fast if not doing a graceful leave - if !graceful { + if sig != syscall.SIGTERM && sig != os.Interrupt { return 1 } From f34eef2b03c2a522b1cea592e30ae0b37e69b550 Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Sat, 5 Jun 2021 00:53:19 +0200 Subject: [PATCH 05/17] Refactor processFilteredNodes some more --- dkron/agent.go | 78 +++++++++++++-------------------------------- dkron/agent_test.go | 3 +- 2 files changed, 24 insertions(+), 57 deletions(-) diff --git a/dkron/agent.go b/dkron/agent.go index 63c896c4f..5cd3317e7 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -699,77 +699,43 @@ func defaultSelector(nodes []serf.Member) int { } func (a *Agent) processFilteredNodes(job *Job, selectFunc func([]serf.Member) int) ([]serf.Member, error) { - // The final set of nodes will be the intersection of all groups - tags := make(map[string]string) - - // Actually copy the map - for key, val := range job.Tags { - tags[key] = val + ct, cardinality, err := cleanTags(job.Tags) + if err != nil { + return nil, err } - // Always filter by region tag as we currently only target nodes - // on the same region. - tags["region"] = a.config.Region - - // Get all living members - allNodes := a.serf.Members() - for i := len(allNodes) - 1; i >= 0; i-- { - if allNodes[i].Status != serf.StatusAlive { - allNodes[i] = allNodes[len(allNodes)-1] - allNodes = allNodes[:len(allNodes)-1] - } - } + // Determine the usable set of nodes + nodes := filterArray(a.serf.Members(), func(node serf.Member) bool { + return node.Status == serf.StatusAlive && + node.Tags["region"] == a.config.Region && + nodeMatchesTags(node, ct) + }) // Sort the nodes to make selection from them predictable - sort.Slice(allNodes, func(i, j int) bool { return allNodes[i].Name < allNodes[j].Name }) - - execNodes, cardinality, err := filterNodes(allNodes, tags) - if err != nil { - return nil, err - } + sort.Slice(nodes, func(i, j int) bool { return nodes[i].Name < nodes[j].Name }) - numNodes := len(execNodes) - for ; cardinality > 0; cardinality-- { + numNodes := len(nodes) + for ; cardinality > 0 && numNodes > 0; cardinality-- { // Select a node - chosenIndex := selectFunc(execNodes[:numNodes]) + chosenIndex := selectFunc(nodes[:numNodes]) // Swap picked node with the last one and reduce choices so it can't get picked again - execNodes[numNodes-1], execNodes[chosenIndex] = execNodes[chosenIndex], execNodes[numNodes-1] + nodes[numNodes-1], nodes[chosenIndex] = nodes[chosenIndex], nodes[numNodes-1] numNodes-- } - return execNodes[numNodes:], nil + return nodes[numNodes:], nil } -// filterNodes determines which of the execNodes have the given tags -// Returns: -// * the map of allNodes that match the provided tags -// * cardinality, i.e. the max number of nodes that should be targeted. This can be no higher -// than the number of nodes in the resulting map -// * an error if a cardinality was malformed -func filterNodes(allNodes []serf.Member, tags map[string]string) ([]serf.Member, int, error) { - ct, cardinality, err := cleanTags(tags) - if err != nil { - return nil, 0, err - } - - matchingNodes := make([]serf.Member, len(allNodes)) - copy(matchingNodes, allNodes) - - // Remove nodes that do not have the selected tags - for i := len(matchingNodes) - 1; i >= 0; i-- { - if !nodeMatchesTags(matchingNodes[i], ct) { - matchingNodes[i] = matchingNodes[len(matchingNodes)-1] - matchingNodes = matchingNodes[:len(matchingNodes)-1] +// Returns all items from an array for which filterFunc returns true, +func filterArray(arr []serf.Member, filterFunc func(serf.Member) bool) []serf.Member { + for i := len(arr) - 1; i >= 0; i-- { + if !filterFunc(arr[i]) { + arr[i] = arr[len(arr)-1] + arr = arr[:len(arr)-1] } } - - // limit the cardinality to the number of possible nodes - if len(matchingNodes) < cardinality { - cardinality = len(matchingNodes) - } - - return matchingNodes, cardinality, nil + return arr } // This function is called when a client request the RPCAddress diff --git a/dkron/agent_test.go b/dkron/agent_test.go index 79b8e9be8..7b6db307d 100644 --- a/dkron/agent_test.go +++ b/dkron/agent_test.go @@ -5,7 +5,6 @@ import ( "io/ioutil" "log" "os" - "reflect" "testing" "time" @@ -455,6 +454,7 @@ func TestAgentConfig(t *testing.T) { a.Stop() } +/* func Test_filterNodes(t *testing.T) { nodes := []serf.Member{ { @@ -569,3 +569,4 @@ func Test_filterNodes(t *testing.T) { }) } } +*/ From 872ffb7abcecd9fd436b12a3ceded0fd512a478b Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Sat, 5 Jun 2021 00:53:48 +0200 Subject: [PATCH 06/17] Simplify comment --- dkron/tags.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dkron/tags.go b/dkron/tags.go index bf2ec9e5e..00d387476 100644 --- a/dkron/tags.go +++ b/dkron/tags.go @@ -38,7 +38,7 @@ func cleanTags(tags map[string]string) (map[string]string, int, error) { return cleanTags, cardinality, nil } -// nodeMatchesTags encapsulates the logic of testing if a node matches all of the provided tags +// nodeMatchesTags tests if a node matches all of the provided tags func nodeMatchesTags(node serf.Member, tags map[string]string) bool { for k, v := range tags { nodeVal, present := node.Tags[k] From e5fe83875512546a8b526e61bf51e51a975e2b6f Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Tue, 8 Jun 2021 00:14:32 +0200 Subject: [PATCH 07/17] Pass tags instead of job to processFilteredNodes --- dkron/agent.go | 4 +-- dkron/agent_test.go | 80 ++++++++++++++------------------------------- dkron/run.go | 2 +- 3 files changed, 27 insertions(+), 59 deletions(-) diff --git a/dkron/agent.go b/dkron/agent.go index 5cd3317e7..695f44333 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -698,8 +698,8 @@ func defaultSelector(nodes []serf.Member) int { return rand.Intn(len(nodes)) } -func (a *Agent) processFilteredNodes(job *Job, selectFunc func([]serf.Member) int) ([]serf.Member, error) { - ct, cardinality, err := cleanTags(job.Tags) +func (a *Agent) processFilteredNodes(tags map[string]string, selectFunc func([]serf.Member) int) ([]serf.Member, error) { + ct, cardinality, err := cleanTags(tags) if err != nil { return nil, err } diff --git a/dkron/agent_test.go b/dkron/agent_test.go index 7b6db307d..e7215781e 100644 --- a/dkron/agent_test.go +++ b/dkron/agent_test.go @@ -196,14 +196,9 @@ func Test_processFilteredNodes(t *testing.T) { time.Sleep(2 * time.Second) // Test cardinality of 2 returns correct nodes - job := &Job{ - Name: "test_job_1", - Tags: map[string]string{ - "tag": "test:2", - }, - } + tags := map[string]string{"tag": "test:2"} - nodes, err := a1.processFilteredNodes(job, lastSelector) + nodes, err := a1.processFilteredNodes(tags, lastSelector) require.NoError(t, err) assert.Exactly(t, "test1", nodes[0].Name) @@ -211,24 +206,17 @@ func Test_processFilteredNodes(t *testing.T) { assert.Len(t, nodes, 2) // Test cardinality of 1 with two qualified nodes returns 1 node - job2 := &Job{ - Name: "test_job_2", - Tags: map[string]string{ - "tag": "test:1", - }, - } + tags2 := map[string]string{"tag": "test:1"} - nodes, err = a1.processFilteredNodes(job2, defaultSelector) + nodes, err = a1.processFilteredNodes(tags2, defaultSelector) require.NoError(t, err) assert.Len(t, nodes, 1) // Test no cardinality specified, all nodes returned - job3 := &Job{ - Name: "test_job_3", - } + var tags3 map[string]string - nodes, err = a1.processFilteredNodes(job3, lastSelector) + nodes, err = a1.processFilteredNodes(tags3, lastSelector) require.NoError(t, err) assert.Len(t, nodes, 3) @@ -237,56 +225,40 @@ func Test_processFilteredNodes(t *testing.T) { assert.Exactly(t, "test3", nodes[2].Name) // Test exclusive tag returns correct node - job4 := &Job{ - Name: "test_job_4", - Tags: map[string]string{ - "tag": "test_client:1", - }, - } + tags4 := map[string]string{"tag": "test_client:1"} - nodes, err = a1.processFilteredNodes(job4, defaultSelector) + nodes, err = a1.processFilteredNodes(tags4, defaultSelector) require.NoError(t, err) assert.Len(t, nodes, 1) assert.Exactly(t, "test3", nodes[0].Name) // Test existing tag but no matching value returns no nodes - job5 := &Job{ - Name: "test_job_5", - Tags: map[string]string{ - "tag": "no_tag", - }, - } + tags5 := map[string]string{"tag": "no_tag"} - nodes, err = a1.processFilteredNodes(job5, defaultSelector) + nodes, err = a1.processFilteredNodes(tags5, defaultSelector) require.NoError(t, err) assert.Len(t, nodes, 0) // Test 1 matching and 1 not matching tag returns no nodes - job6 := &Job{ - Name: "test_job_6", - Tags: map[string]string{ - "foo": "bar:1", - "tag": "test:2", - }, + tags6 := map[string]string{ + "foo": "bar:1", + "tag": "test:2", } - nodes, err = a1.processFilteredNodes(job6, defaultSelector) + nodes, err = a1.processFilteredNodes(tags6, defaultSelector) require.NoError(t, err) assert.Len(t, nodes, 0) // Test matching tags with cardinality of 2 but only 1 matching node returns correct node - job7 := &Job{ - Name: "test_job_7", - Tags: map[string]string{ - "tag": "test:2", - "extra": "tag:2", - }, + tags7 := map[string]string{ + "tag": "test:2", + "extra": "tag:2", } - nodes, err = a1.processFilteredNodes(job7, defaultSelector) + nodes, err = a1.processFilteredNodes(tags7, defaultSelector) require.NoError(t, err) assert.Len(t, nodes, 1) @@ -298,22 +270,18 @@ func Test_processFilteredNodes(t *testing.T) { // sometimes fail (=return no nodes at all) due to the use of math.rand // Statistically, about 33% should succeed and the rest should fail if // the code is buggy. - // Another bug caused one node to be favored over the others. With a - // large enough number of attempts, each node should be chosen about 1/3 - // of the time. - job8 := &Job{ - Name: "test_job_8", - Tags: map[string]string{ - "additional": "value:1", - "additional2": "value2:1", - }, + // Another bug caused one node to be favored over the others. With a large + // enough number of attempts, each node should be chosen 1/3 of the time. + tags8 := map[string]string{ + "additional": "value:1", + "additional2": "value2:1", } distrib := make(map[string]int) var sampleSize = 999 for i := 0; i < sampleSize; i++ { // round-robin on the selected nodes to come out at an exactly equal distribution roundRobinSelector := func(nodes []serf.Member) int { return i % len(nodes) } - nodes, err = a1.processFilteredNodes(job8, roundRobinSelector) + nodes, err = a1.processFilteredNodes(tags8, roundRobinSelector) require.NoError(t, err) assert.Len(t, nodes, 1) diff --git a/dkron/run.go b/dkron/run.go index 213fc8063..1bf217588 100644 --- a/dkron/run.go +++ b/dkron/run.go @@ -30,7 +30,7 @@ func (a *Agent) Run(jobName string, ex *Execution) (*Job, error) { // but we use the existing node target in case of retry. var targetNodes []serf.Member if ex.Attempt <= 1 { - targetNodes, err = a.processFilteredNodes(job, defaultSelector) + targetNodes, err = a.processFilteredNodes(job.Tags, defaultSelector) if err != nil { return nil, fmt.Errorf("run error processing filtered nodes: %w", err) } From 358adb78af956ef068084c2e2d79f722ba68df48 Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Tue, 8 Jun 2021 22:11:59 +0200 Subject: [PATCH 08/17] Short circuit processFilteredNodes --- dkron/agent.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/dkron/agent.go b/dkron/agent.go index 695f44333..d528d3b48 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -711,11 +711,16 @@ func (a *Agent) processFilteredNodes(tags map[string]string, selectFunc func([]s nodeMatchesTags(node, ct) }) + // Return all nodes immediately if they're all going to be selected + numNodes := len(nodes) + if numNodes <= cardinality { + return nodes, nil + } + // Sort the nodes to make selection from them predictable sort.Slice(nodes, func(i, j int) bool { return nodes[i].Name < nodes[j].Name }) - numNodes := len(nodes) - for ; cardinality > 0 && numNodes > 0; cardinality-- { + for ; cardinality > 0; cardinality-- { // Select a node chosenIndex := selectFunc(nodes[:numNodes]) From 0dbd9394ab72e0a4b5e99bf46f2846de3530175c Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Tue, 8 Jun 2021 23:19:46 +0200 Subject: [PATCH 09/17] Split processFilteredNodes --- dkron/agent.go | 26 +++++++++++++++++++------- dkron/agent_test.go | 3 +++ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/dkron/agent.go b/dkron/agent.go index d528d3b48..d2df73c65 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -693,15 +693,19 @@ func (a *Agent) join(addrs []string, replay bool) (n int, err error) { return } -// The default selector function for processFilteredNodes -func defaultSelector(nodes []serf.Member) int { - return rand.Intn(len(nodes)) +func (a *Agent) processFilteredNodes(tags map[string]string, selectFunc func([]serf.Member) int) ([]serf.Member, error) { + nodes, card, err := a.getQualifyingNodes(tags) + if err != nil { + return nil, err + } + + return selectNodes(nodes, card, selectFunc), nil } -func (a *Agent) processFilteredNodes(tags map[string]string, selectFunc func([]serf.Member) int) ([]serf.Member, error) { +func (a *Agent) getQualifyingNodes(tags map[string]string) ([]serf.Member, int, error) { ct, cardinality, err := cleanTags(tags) if err != nil { - return nil, err + return nil, 0, err } // Determine the usable set of nodes @@ -710,11 +714,19 @@ func (a *Agent) processFilteredNodes(tags map[string]string, selectFunc func([]s node.Tags["region"] == a.config.Region && nodeMatchesTags(node, ct) }) + return nodes, cardinality, nil +} + +// The default selector function for processFilteredNodes +func defaultSelector(nodes []serf.Member) int { + return rand.Intn(len(nodes)) +} +func selectNodes(nodes []serf.Member, cardinality int, selectFunc func([]serf.Member) int) []serf.Member { // Return all nodes immediately if they're all going to be selected numNodes := len(nodes) if numNodes <= cardinality { - return nodes, nil + return nodes } // Sort the nodes to make selection from them predictable @@ -729,7 +741,7 @@ func (a *Agent) processFilteredNodes(tags map[string]string, selectFunc func([]s numNodes-- } - return nodes[numNodes:], nil + return nodes[numNodes:] } // Returns all items from an array for which filterFunc returns true, diff --git a/dkron/agent_test.go b/dkron/agent_test.go index e7215781e..4a5951568 100644 --- a/dkron/agent_test.go +++ b/dkron/agent_test.go @@ -5,6 +5,7 @@ import ( "io/ioutil" "log" "os" + "sort" "testing" "time" @@ -201,6 +202,7 @@ func Test_processFilteredNodes(t *testing.T) { nodes, err := a1.processFilteredNodes(tags, lastSelector) require.NoError(t, err) + sort.Slice(nodes, func(i, j int) bool { return nodes[i].Name < nodes[j].Name }) assert.Exactly(t, "test1", nodes[0].Name) assert.Exactly(t, "test2", nodes[1].Name) assert.Len(t, nodes, 2) @@ -219,6 +221,7 @@ func Test_processFilteredNodes(t *testing.T) { nodes, err = a1.processFilteredNodes(tags3, lastSelector) require.NoError(t, err) + sort.Slice(nodes, func(i, j int) bool { return nodes[i].Name < nodes[j].Name }) assert.Len(t, nodes, 3) assert.Exactly(t, "test1", nodes[0].Name) assert.Exactly(t, "test2", nodes[1].Name) From 9b049e3fb29be12c7a7f49a517f5d8f645446231 Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Wed, 9 Jun 2021 00:31:00 +0200 Subject: [PATCH 10/17] Eliminate error from cleantags --- dkron/agent.go | 19 ++++++------------- dkron/agent_test.go | 33 +++++++++++++++++---------------- dkron/run.go | 5 +---- dkron/tags.go | 9 +++++---- dkron/tags_test.go | 43 ++++++++++++++++++------------------------- 5 files changed, 47 insertions(+), 62 deletions(-) diff --git a/dkron/agent.go b/dkron/agent.go index d2df73c65..130f78620 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -693,20 +693,13 @@ func (a *Agent) join(addrs []string, replay bool) (n int, err error) { return } -func (a *Agent) processFilteredNodes(tags map[string]string, selectFunc func([]serf.Member) int) ([]serf.Member, error) { - nodes, card, err := a.getQualifyingNodes(tags) - if err != nil { - return nil, err - } - - return selectNodes(nodes, card, selectFunc), nil +func (a *Agent) getTargetNodes(tags map[string]string, selectFunc func([]serf.Member) int) []serf.Member { + nodes, card := a.getQualifyingNodes(tags) + return selectNodes(nodes, card, selectFunc) } -func (a *Agent) getQualifyingNodes(tags map[string]string) ([]serf.Member, int, error) { - ct, cardinality, err := cleanTags(tags) - if err != nil { - return nil, 0, err - } +func (a *Agent) getQualifyingNodes(tags map[string]string) ([]serf.Member, int) { + ct, cardinality := cleanTags(tags, a.logger) // Determine the usable set of nodes nodes := filterArray(a.serf.Members(), func(node serf.Member) bool { @@ -714,7 +707,7 @@ func (a *Agent) getQualifyingNodes(tags map[string]string) ([]serf.Member, int, node.Tags["region"] == a.config.Region && nodeMatchesTags(node, ct) }) - return nodes, cardinality, nil + return nodes, cardinality } // The default selector function for processFilteredNodes diff --git a/dkron/agent_test.go b/dkron/agent_test.go index 4a5951568..f95394c77 100644 --- a/dkron/agent_test.go +++ b/dkron/agent_test.go @@ -199,8 +199,7 @@ func Test_processFilteredNodes(t *testing.T) { // Test cardinality of 2 returns correct nodes tags := map[string]string{"tag": "test:2"} - nodes, err := a1.processFilteredNodes(tags, lastSelector) - require.NoError(t, err) + nodes := a1.getTargetNodes(tags, lastSelector) sort.Slice(nodes, func(i, j int) bool { return nodes[i].Name < nodes[j].Name }) assert.Exactly(t, "test1", nodes[0].Name) @@ -210,16 +209,14 @@ func Test_processFilteredNodes(t *testing.T) { // Test cardinality of 1 with two qualified nodes returns 1 node tags2 := map[string]string{"tag": "test:1"} - nodes, err = a1.processFilteredNodes(tags2, defaultSelector) - require.NoError(t, err) + nodes = a1.getTargetNodes(tags2, defaultSelector) assert.Len(t, nodes, 1) // Test no cardinality specified, all nodes returned var tags3 map[string]string - nodes, err = a1.processFilteredNodes(tags3, lastSelector) - require.NoError(t, err) + nodes = a1.getTargetNodes(tags3, lastSelector) sort.Slice(nodes, func(i, j int) bool { return nodes[i].Name < nodes[j].Name }) assert.Len(t, nodes, 3) @@ -230,8 +227,7 @@ func Test_processFilteredNodes(t *testing.T) { // Test exclusive tag returns correct node tags4 := map[string]string{"tag": "test_client:1"} - nodes, err = a1.processFilteredNodes(tags4, defaultSelector) - require.NoError(t, err) + nodes = a1.getTargetNodes(tags4, defaultSelector) assert.Len(t, nodes, 1) assert.Exactly(t, "test3", nodes[0].Name) @@ -239,8 +235,7 @@ func Test_processFilteredNodes(t *testing.T) { // Test existing tag but no matching value returns no nodes tags5 := map[string]string{"tag": "no_tag"} - nodes, err = a1.processFilteredNodes(tags5, defaultSelector) - require.NoError(t, err) + nodes = a1.getTargetNodes(tags5, defaultSelector) assert.Len(t, nodes, 0) @@ -250,8 +245,7 @@ func Test_processFilteredNodes(t *testing.T) { "tag": "test:2", } - nodes, err = a1.processFilteredNodes(tags6, defaultSelector) - require.NoError(t, err) + nodes = a1.getTargetNodes(tags6, defaultSelector) assert.Len(t, nodes, 0) @@ -261,12 +255,20 @@ func Test_processFilteredNodes(t *testing.T) { "extra": "tag:2", } - nodes, err = a1.processFilteredNodes(tags7, defaultSelector) - require.NoError(t, err) + nodes = a1.getTargetNodes(tags7, defaultSelector) assert.Len(t, nodes, 1) assert.Exactly(t, "test2", nodes[0].Name) + // Test invalid cardinality yields 0 nodes + tags9 := map[string]string{ + "tag": "test:invalid", + } + + nodes = a1.getTargetNodes(tags9, defaultSelector) + + assert.Len(t, nodes, 0) + // Test two tags matching same 3 servers and cardinality of 1 should always return 1 server // Do this multiple times: an old bug caused this to sometimes succeed and @@ -284,8 +286,7 @@ func Test_processFilteredNodes(t *testing.T) { for i := 0; i < sampleSize; i++ { // round-robin on the selected nodes to come out at an exactly equal distribution roundRobinSelector := func(nodes []serf.Member) int { return i % len(nodes) } - nodes, err = a1.processFilteredNodes(tags8, roundRobinSelector) - require.NoError(t, err) + nodes = a1.getTargetNodes(tags8, roundRobinSelector) assert.Len(t, nodes, 1) distrib[nodes[0].Name]++ diff --git a/dkron/run.go b/dkron/run.go index 1bf217588..2e834b966 100644 --- a/dkron/run.go +++ b/dkron/run.go @@ -30,10 +30,7 @@ func (a *Agent) Run(jobName string, ex *Execution) (*Job, error) { // but we use the existing node target in case of retry. var targetNodes []serf.Member if ex.Attempt <= 1 { - targetNodes, err = a.processFilteredNodes(job.Tags, defaultSelector) - if err != nil { - return nil, fmt.Errorf("run error processing filtered nodes: %w", err) - } + targetNodes = a.getTargetNodes(job.Tags, defaultSelector) } else { // In case of retrying, find the node or return with an error for _, m := range a.serf.Members() { diff --git a/dkron/tags.go b/dkron/tags.go index 00d387476..3a63b9d2b 100644 --- a/dkron/tags.go +++ b/dkron/tags.go @@ -1,16 +1,16 @@ package dkron import ( - "fmt" "strconv" "strings" "github.com/hashicorp/serf/serf" + "github.com/sirupsen/logrus" ) // cleanTags takes the tag spec and returns strictly key:value pairs // along with the lowest cardinality specified -func cleanTags(tags map[string]string) (map[string]string, int, error) { +func cleanTags(tags map[string]string, logger *logrus.Entry) (map[string]string, int) { cardinality := int(^uint(0) >> 1) // MaxInt cleanTags := make(map[string]string, len(tags)) @@ -26,7 +26,8 @@ func cleanTags(tags map[string]string) (map[string]string, int, error) { tagCard, err := strconv.Atoi(vparts[1]) if err != nil { // Tag value is malformed - return nil, 0, fmt.Errorf("improper cardinality specified for tag %s: %v", k, vparts[1]) + tagCard = 0 + logger.Errorf("improper cardinality specified for tag %s: %v", k, vparts[1]) } if tagCard < cardinality { @@ -35,7 +36,7 @@ func cleanTags(tags map[string]string) (map[string]string, int, error) { } } - return cleanTags, cardinality, nil + return cleanTags, cardinality } // nodeMatchesTags tests if a node matches all of the provided tags diff --git a/dkron/tags_test.go b/dkron/tags_test.go index 61d7160c5..c1f4925cb 100644 --- a/dkron/tags_test.go +++ b/dkron/tags_test.go @@ -18,41 +18,34 @@ func Test_cleanTags(t *testing.T) { wantErr bool }{ { - name: "Clean Tags", - args: args{map[string]string{"key1": "value1", "key2": "value2"}}, - want: map[string]string{"key1": "value1", "key2": "value2"}, - want1: maxInt, - wantErr: false, + name: "Clean Tags", + args: args{map[string]string{"key1": "value1", "key2": "value2"}}, + want: map[string]string{"key1": "value1", "key2": "value2"}, + want1: maxInt, }, { - name: "With Cardinality", - args: args{map[string]string{"key1": "value1", "key2": "value2:5"}}, - want: map[string]string{"key1": "value1", "key2": "value2"}, - want1: 5, - wantErr: false, + name: "With Cardinality", + args: args{map[string]string{"key1": "value1", "key2": "value2:5"}}, + want: map[string]string{"key1": "value1", "key2": "value2"}, + want1: 5, }, { - name: "With Multiple Cardinalities", - args: args{map[string]string{"key1": "value1:2", "key2": "value2:5"}}, - want: map[string]string{"key1": "value1", "key2": "value2"}, - want1: 2, - wantErr: false, + name: "With Multiple Cardinalities", + args: args{map[string]string{"key1": "value1:2", "key2": "value2:5"}}, + want: map[string]string{"key1": "value1", "key2": "value2"}, + want1: 2, }, { - name: "With String Cardinality", - args: args{map[string]string{"key1": "value1", "key2": "value2:cardinality"}}, - want: nil, - want1: 0, - wantErr: true, + name: "With String Cardinality", + args: args{map[string]string{"key1": "value1", "key2": "value2:cardinality"}}, + want: map[string]string{"key1": "value1", "key2": "value2"}, + want1: 0, }, } + logger := getTestLogger() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, got1, err := cleanTags(tt.args.tags) - if (err != nil) != tt.wantErr { - t.Errorf("cleanTags() error = %v, wantErr %v", err, tt.wantErr) - return - } + got, got1 := cleanTags(tt.args.tags, logger) if !reflect.DeepEqual(got, tt.want) { t.Logf("got map: %#v", got) t.Logf("want map: %#v", tt.want) From c9cc3e6c2cdbb2989d450b363b959d3989308f57 Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Wed, 9 Jun 2021 23:59:36 +0200 Subject: [PATCH 11/17] Add tests for selectNodes and filterArray Add type alias for serf.Member --- dkron/agent.go | 18 ++++++---- dkron/agent_test.go | 88 ++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 92 insertions(+), 14 deletions(-) diff --git a/dkron/agent.go b/dkron/agent.go index 130f78620..64f466a3c 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -51,6 +51,9 @@ var ( runningExecutions sync.Map ) +// Node is a shorter, more descriptive name for serf.Member +type Node = serf.Member + // Agent is the main struct that represents a dkron agent type Agent struct { // ProcessorPlugins maps processor plugins @@ -693,16 +696,18 @@ func (a *Agent) join(addrs []string, replay bool) (n int, err error) { return } -func (a *Agent) getTargetNodes(tags map[string]string, selectFunc func([]serf.Member) int) []serf.Member { +func (a *Agent) getTargetNodes(tags map[string]string, selectFunc func([]Node) int) []Node { nodes, card := a.getQualifyingNodes(tags) return selectNodes(nodes, card, selectFunc) } -func (a *Agent) getQualifyingNodes(tags map[string]string) ([]serf.Member, int) { +// getQualifyingNodes returns all nodes in the cluster that are +// alive, in this agent's region and have all given tags +func (a *Agent) getQualifyingNodes(tags map[string]string) ([]Node, int) { ct, cardinality := cleanTags(tags, a.logger) // Determine the usable set of nodes - nodes := filterArray(a.serf.Members(), func(node serf.Member) bool { + nodes := filterArray(a.serf.Members(), func(node Node) bool { return node.Status == serf.StatusAlive && node.Tags["region"] == a.config.Region && nodeMatchesTags(node, ct) @@ -711,11 +716,12 @@ func (a *Agent) getQualifyingNodes(tags map[string]string) ([]serf.Member, int) } // The default selector function for processFilteredNodes -func defaultSelector(nodes []serf.Member) int { +func defaultSelector(nodes []Node) int { return rand.Intn(len(nodes)) } -func selectNodes(nodes []serf.Member, cardinality int, selectFunc func([]serf.Member) int) []serf.Member { +// selectNodes selects at most #cardinality from the given nodes using the selectFunc +func selectNodes(nodes []Node, cardinality int, selectFunc func([]Node) int) []Node { // Return all nodes immediately if they're all going to be selected numNodes := len(nodes) if numNodes <= cardinality { @@ -738,7 +744,7 @@ func selectNodes(nodes []serf.Member, cardinality int, selectFunc func([]serf.Me } // Returns all items from an array for which filterFunc returns true, -func filterArray(arr []serf.Member, filterFunc func(serf.Member) bool) []serf.Member { +func filterArray(arr []Node, filterFunc func(Node) bool) []Node { for i := len(arr) - 1; i >= 0; i-- { if !filterFunc(arr[i]) { arr[i] = arr[len(arr)-1] diff --git a/dkron/agent_test.go b/dkron/agent_test.go index f95394c77..c0166b7fc 100644 --- a/dkron/agent_test.go +++ b/dkron/agent_test.go @@ -113,7 +113,7 @@ func TestAgentCommand_runForElection(t *testing.T) { a3.Stop() } -func lastSelector(nodes []serf.Member) int { +func lastSelector(nodes []Node) int { return len(nodes) - 1 } @@ -428,7 +428,7 @@ func TestAgentConfig(t *testing.T) { /* func Test_filterNodes(t *testing.T) { - nodes := []serf.Member{ + nodes := []Node{ { Tags: map[string]string{ "region": "global", @@ -454,13 +454,13 @@ func Test_filterNodes(t *testing.T) { }, } type args struct { - execNodes []serf.Member + execNodes []Node tags map[string]string } tests := []struct { name string args args - want []serf.Member + want []Node want2 int wantErr bool }{ @@ -480,7 +480,7 @@ func Test_filterNodes(t *testing.T) { execNodes: nodes, tags: map[string]string{"just1": "value"}, }, - want: []serf.Member{nodes[0]}, + want: []Node{nodes[0]}, want2: 1, wantErr: false, }, @@ -490,7 +490,7 @@ func Test_filterNodes(t *testing.T) { execNodes: nodes, tags: map[string]string{"just2": "value"}, }, - want: []serf.Member{nodes[1]}, + want: []Node{nodes[1]}, want2: 1, wantErr: false, }, @@ -500,7 +500,7 @@ func Test_filterNodes(t *testing.T) { execNodes: nodes, tags: map[string]string{"tagfor2": "2"}, }, - want: []serf.Member{nodes[0], nodes[1]}, + want: []Node{nodes[0], nodes[1]}, want2: 2, wantErr: false, }, @@ -510,7 +510,7 @@ func Test_filterNodes(t *testing.T) { execNodes: nodes, tags: map[string]string{"unknown": "value"}, }, - want: []serf.Member{}, + want: []Node{}, want2: 0, wantErr: false, }, @@ -542,3 +542,75 @@ func Test_filterNodes(t *testing.T) { } } */ + +func Test_filterArray(t *testing.T) { + n1 := Node{Name: "node1"} + n2 := Node{Name: "node2"} + n3 := Node{Name: "node3"} + matchAll := func(m Node) bool { return true } + matchNone := func(m Node) bool { return false } + filtertests := []struct { + name string + in []Node + filter func(Node) bool + expect []Node + }{ + {"No items match", []Node{n1, n2, n3}, matchNone, []Node{}}, + {"All items match", []Node{n1, n2, n3}, matchAll, []Node{n1, n2, n3}}, + {"Empty input returns empty output", []Node{}, matchAll, []Node{}}, + {"All but first match", []Node{n1, n2, n3}, func(m Node) bool { return m.Name != "node1" }, []Node{n2, n3}}, + {"All but last match", []Node{n1, n2, n3}, func(m Node) bool { return m.Name != "node3" }, []Node{n1, n2}}, + {"Middle does not match", []Node{n1, n2, n3}, func(m Node) bool { return m.Name != "node2" }, []Node{n1, n3}}, + {"Only middle matches", []Node{n1, n2, n3}, func(m Node) bool { return m.Name == "node2" }, []Node{n2}}, + } + + for _, tt := range filtertests { + t.Run(tt.name, func(t *testing.T) { + actual := filterArray(tt.in, tt.filter) + assert.Len(t, actual, len(tt.expect)) + for _, expectedItem := range tt.expect { + assert.Contains(t, actual, expectedItem) + } + }) + } +} + +func Test_selectNodes(t *testing.T) { + n1 := Node{Name: "node1"} + n2 := Node{Name: "node2"} + n3 := Node{Name: "node3"} + node2Selector := func(nodes []Node) int { + for i, node := range nodes { + if node.Name == "node2" { + return i + } + } + panic("This shouldn't happen") + } + filtertests := []struct { + name string + in []Node + cardinality int + selector func([]Node) int + expect []Node + }{ + {"Cardinality 0 returns none", []Node{n1, n2, n3}, 0, defaultSelector, []Node{}}, + {"Cardinality < 0 returns none", []Node{n1, n2, n3}, -1, defaultSelector, []Node{}}, + {"Cardinality > #nodes returns all", []Node{n1, n2, n3}, 1000, defaultSelector, []Node{n1, n2, n3}}, + {"Cardinality = #nodes returns all", []Node{n1, n2, n3}, 3, defaultSelector, []Node{n1, n2, n3}}, + {"Cardinality = 1 returns one", []Node{n1, n2, n3}, 1, lastSelector, []Node{n3}}, + {"Cardinality = 2 returns two", []Node{n1, n2, n3}, 2, lastSelector, []Node{n2, n3}}, + {"Pick node2", []Node{n1, n2, n3}, 1, node2Selector, []Node{n2}}, + {"No nodes, card>0 returns none", []Node{}, 2, defaultSelector, []Node{}}, + {"No nodes, card=0 returns none", []Node{}, 0, defaultSelector, []Node{}}, + } + for _, tt := range filtertests { + t.Run(tt.name, func(t *testing.T) { + actual := selectNodes(tt.in, tt.cardinality, tt.selector) + assert.Len(t, actual, len(tt.expect)) + for _, expectedItem := range tt.expect { + assert.Contains(t, actual, expectedItem) + } + }) + } +} From 9ac6767317b447b17612331a30f67b9580f32030 Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Thu, 10 Jun 2021 23:58:33 +0200 Subject: [PATCH 12/17] Add tests for getQualifyingsNodes Factor out cleanTags from getQualifyingNodes --- dkron/agent.go | 15 ++-- dkron/agent_test.go | 182 ++++++++++++++++++++++---------------------- 2 files changed, 100 insertions(+), 97 deletions(-) diff --git a/dkron/agent.go b/dkron/agent.go index 64f466a3c..05987b343 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -697,22 +697,21 @@ func (a *Agent) join(addrs []string, replay bool) (n int, err error) { } func (a *Agent) getTargetNodes(tags map[string]string, selectFunc func([]Node) int) []Node { - nodes, card := a.getQualifyingNodes(tags) - return selectNodes(nodes, card, selectFunc) + bareTags, cardinality := cleanTags(tags, a.logger) + nodes := a.getQualifyingNodes(a.serf.Members(), bareTags) + return selectNodes(nodes, cardinality, selectFunc) } // getQualifyingNodes returns all nodes in the cluster that are // alive, in this agent's region and have all given tags -func (a *Agent) getQualifyingNodes(tags map[string]string) ([]Node, int) { - ct, cardinality := cleanTags(tags, a.logger) - +func (a *Agent) getQualifyingNodes(nodes []Node, bareTags map[string]string) []Node { // Determine the usable set of nodes - nodes := filterArray(a.serf.Members(), func(node Node) bool { + qualifiers := filterArray(nodes, func(node Node) bool { return node.Status == serf.StatusAlive && node.Tags["region"] == a.config.Region && - nodeMatchesTags(node, ct) + nodeMatchesTags(node, bareTags) }) - return nodes, cardinality + return qualifiers } // The default selector function for processFilteredNodes diff --git a/dkron/agent_test.go b/dkron/agent_test.go index c0166b7fc..25334ea71 100644 --- a/dkron/agent_test.go +++ b/dkron/agent_test.go @@ -426,122 +426,126 @@ func TestAgentConfig(t *testing.T) { a.Stop() } -/* -func Test_filterNodes(t *testing.T) { - nodes := []Node{ - { - Tags: map[string]string{ - "region": "global", - "tag": "test", - "just1": "value", - "tagfor2": "2", - }, +func Test_getQualifyingNodes(t *testing.T) { + n1 := Node{ + Status: serf.StatusAlive, + Tags: map[string]string{ + "region": "global", + "tag": "test", + "just1": "value", + "tagfor2": "2", }, - { - Tags: map[string]string{ - "region": "global", - "tag": "test", - "just2": "value", - "tagfor2": "2", - }, + } + n2 := Node{ + Status: serf.StatusAlive, + Tags: map[string]string{ + "region": "global", + "tag": "test", + "just2": "value", + "tagfor2": "2", }, - { - Tags: map[string]string{ - "region": "global", - "tag": "test", - "just3": "value", - }, + } + n3 := Node{ + Status: serf.StatusAlive, + Tags: map[string]string{ + "region": "global", + "tag": "test", + "just3": "value", }, } - type args struct { - execNodes []Node - tags map[string]string + n4 := Node{ + Status: serf.StatusNone, + Tags: map[string]string{ + "region": "global", + "dead": "true", + "just1": "value", + }, + } + n5 := Node{ + Status: serf.StatusAlive, + Tags: map[string]string{ + "region": "atlantis", + "just1": "value", + }, } tests := []struct { name string - args args + inNodes []Node + inTags map[string]string want []Node - want2 int - wantErr bool }{ { - name: "All nodes tag", - args: args{ - execNodes: nodes, - tags: map[string]string{"tag": "test"}, - }, - want: nodes, - want2: 3, - wantErr: false, + name: "All nodes match tag", + inNodes: []Node{n1, n2, n3}, + inTags: map[string]string{"tag": "test"}, + want: []Node{n1, n2, n3}, + }, + { + name: "Only node1 matches tag", + inNodes: []Node{n1, n2, n3}, + inTags: map[string]string{"just1": "value"}, + want: []Node{n1}, + }, + { + name: "Only node2 matches tag", + inNodes: []Node{n1, n2, n3}, + inTags: map[string]string{"just2": "value"}, + want: []Node{n2}, }, { - name: "Just node1 tag", - args: args{ - execNodes: nodes, - tags: map[string]string{"just1": "value"}, - }, - want: []Node{nodes[0]}, - want2: 1, - wantErr: false, + name: "Tag matches two nodes", + inNodes: []Node{n1, n2, n3}, + inTags: map[string]string{"tagfor2": "2"}, + want: []Node{n1, n2}, }, { - name: "Just node2 tag", - args: args{ - execNodes: nodes, - tags: map[string]string{"just2": "value"}, - }, - want: []Node{nodes[1]}, - want2: 1, - wantErr: false, + name: "No nodes match tag", + inNodes: []Node{n1, n2, n3}, + inTags: map[string]string{"unknown": "value"}, + want: []Node{}, }, { - name: "Matching 2 nodes", - args: args{ - execNodes: nodes, - tags: map[string]string{"tagfor2": "2"}, - }, - want: []Node{nodes[0], nodes[1]}, - want2: 2, - wantErr: false, + name: "Dead nodes don't match", + inNodes: []Node{n1, n4}, + inTags: map[string]string{}, + want: []Node{n1}, }, { - name: "No matching nodes", - args: args{ - execNodes: nodes, - tags: map[string]string{"unknown": "value"}, - }, + name: "No nodes returns no nodes", + inNodes: []Node{}, + inTags: map[string]string{"just1": "value"}, want: []Node{}, - want2: 0, - wantErr: false, }, { - name: "All nodes low cardinality", - args: args{ - execNodes: nodes, - tags: map[string]string{"tag": "test:1"}, - }, - want: nodes, - want2: 1, - wantErr: false, + name: "No tags matches all nodes", + inNodes: []Node{n1, n2, n3}, + inTags: map[string]string{}, + want: []Node{n1, n2, n3}, + }, + { + name: "Nodes out of region don't match", + inNodes: []Node{n1, n5}, + inTags: map[string]string{}, + want: []Node{n1}, + }, + { + name: "Multiple tags match correct nodes", + inNodes: []Node{n1, n2, n3}, + inTags: map[string]string{"tag": "test", "tagfor2": "2"}, + want: []Node{n1, n2}, }, } + agentStub := NewAgent(DefaultConfig()) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, got2, err := filterNodes(tt.args.execNodes, tt.args.tags) - if (err != nil) != tt.wantErr { - t.Errorf("filterNodes() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("filterNodes() got = %v, want %v", got, tt.want) - } - if got2 != tt.want2 { - t.Errorf("filterNodes() got2 = %v, want %v", got2, tt.want2) + actual := agentStub.getQualifyingNodes(tt.inNodes, tt.inTags) + assert.Len(t, actual, len(tt.want)) + for _, expectedItem := range tt.want { + assert.Contains(t, actual, expectedItem) } }) } } -*/ func Test_filterArray(t *testing.T) { n1 := Node{Name: "node1"} @@ -587,7 +591,7 @@ func Test_selectNodes(t *testing.T) { } panic("This shouldn't happen") } - filtertests := []struct { + selectertests := []struct { name string in []Node cardinality int @@ -604,7 +608,7 @@ func Test_selectNodes(t *testing.T) { {"No nodes, card>0 returns none", []Node{}, 2, defaultSelector, []Node{}}, {"No nodes, card=0 returns none", []Node{}, 0, defaultSelector, []Node{}}, } - for _, tt := range filtertests { + for _, tt := range selectertests { t.Run(tt.name, func(t *testing.T) { actual := selectNodes(tt.in, tt.cardinality, tt.selector) assert.Len(t, actual, len(tt.expect)) From 53e6a382cc8b5d7caa87abf1265242c6486337ed Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Fri, 11 Jun 2021 22:28:21 +0200 Subject: [PATCH 13/17] Rearrange getTargetNodes test Remove sort from selectNodes --- dkron/agent.go | 4 - dkron/agent_test.go | 175 +++++++++++++++++++++++++------------------- 2 files changed, 98 insertions(+), 81 deletions(-) diff --git a/dkron/agent.go b/dkron/agent.go index 05987b343..86656ae85 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -11,7 +11,6 @@ import ( "net" "os" "path/filepath" - "sort" "strconv" "sync" "time" @@ -727,9 +726,6 @@ func selectNodes(nodes []Node, cardinality int, selectFunc func([]Node) int) []N return nodes } - // Sort the nodes to make selection from them predictable - sort.Slice(nodes, func(i, j int) bool { return nodes[i].Name < nodes[j].Name }) - for ; cardinality > 0; cardinality-- { // Select a node chosenIndex := selectFunc(nodes[:numNodes]) diff --git a/dkron/agent_test.go b/dkron/agent_test.go index 25334ea71..c7ccbfbc2 100644 --- a/dkron/agent_test.go +++ b/dkron/agent_test.go @@ -117,7 +117,7 @@ func lastSelector(nodes []Node) int { return len(nodes) - 1 } -func Test_processFilteredNodes(t *testing.T) { +func Test_getTargetNodes(t *testing.T) { dir, err := ioutil.TempDir("", "dkron-test") require.NoError(t, err) defer os.RemoveAll(dir) @@ -196,109 +196,130 @@ func Test_processFilteredNodes(t *testing.T) { time.Sleep(2 * time.Second) - // Test cardinality of 2 returns correct nodes - tags := map[string]string{"tag": "test:2"} + t.Run("Test cardinality of 2 returns correct nodes", func(t *testing.T) { + tags := map[string]string{"tag": "test:2"} - nodes := a1.getTargetNodes(tags, lastSelector) + nodes := a1.getTargetNodes(tags, lastSelector) - sort.Slice(nodes, func(i, j int) bool { return nodes[i].Name < nodes[j].Name }) - assert.Exactly(t, "test1", nodes[0].Name) - assert.Exactly(t, "test2", nodes[1].Name) - assert.Len(t, nodes, 2) + sort.Slice(nodes, func(i, j int) bool { return nodes[i].Name < nodes[j].Name }) + assert.Exactly(t, "test1", nodes[0].Name) + assert.Exactly(t, "test2", nodes[1].Name) + assert.Len(t, nodes, 2) + }) - // Test cardinality of 1 with two qualified nodes returns 1 node - tags2 := map[string]string{"tag": "test:1"} + t.Run("Test cardinality of 1 with two qualified nodes returns 1 node", func(t *testing.T) { + tags2 := map[string]string{"tag": "test:1"} - nodes = a1.getTargetNodes(tags2, defaultSelector) + nodes := a1.getTargetNodes(tags2, defaultSelector) - assert.Len(t, nodes, 1) + assert.Len(t, nodes, 1) + }) - // Test no cardinality specified, all nodes returned - var tags3 map[string]string + t.Run("Test no cardinality specified, all nodes returned", func(t *testing.T) { + var tags3 map[string]string - nodes = a1.getTargetNodes(tags3, lastSelector) + nodes := a1.getTargetNodes(tags3, lastSelector) - sort.Slice(nodes, func(i, j int) bool { return nodes[i].Name < nodes[j].Name }) - assert.Len(t, nodes, 3) - assert.Exactly(t, "test1", nodes[0].Name) - assert.Exactly(t, "test2", nodes[1].Name) - assert.Exactly(t, "test3", nodes[2].Name) + sort.Slice(nodes, func(i, j int) bool { return nodes[i].Name < nodes[j].Name }) + assert.Len(t, nodes, 3) + assert.Exactly(t, "test1", nodes[0].Name) + assert.Exactly(t, "test2", nodes[1].Name) + assert.Exactly(t, "test3", nodes[2].Name) + }) - // Test exclusive tag returns correct node - tags4 := map[string]string{"tag": "test_client:1"} + t.Run("Test exclusive tag returns correct node", func(t *testing.T) { + tags4 := map[string]string{"tag": "test_client:1"} - nodes = a1.getTargetNodes(tags4, defaultSelector) + nodes := a1.getTargetNodes(tags4, defaultSelector) - assert.Len(t, nodes, 1) - assert.Exactly(t, "test3", nodes[0].Name) + assert.Len(t, nodes, 1) + assert.Exactly(t, "test3", nodes[0].Name) + }) - // Test existing tag but no matching value returns no nodes - tags5 := map[string]string{"tag": "no_tag"} + t.Run("Test existing tag but no matching value returns no nodes", func(t *testing.T) { + tags5 := map[string]string{"tag": "no_tag"} - nodes = a1.getTargetNodes(tags5, defaultSelector) + nodes := a1.getTargetNodes(tags5, defaultSelector) - assert.Len(t, nodes, 0) + assert.Len(t, nodes, 0) + }) - // Test 1 matching and 1 not matching tag returns no nodes - tags6 := map[string]string{ - "foo": "bar:1", - "tag": "test:2", - } + t.Run("Test 1 matching and 1 not matching tag returns no nodes", func(t *testing.T) { + tags6 := map[string]string{ + "foo": "bar:1", + "tag": "test:2", + } - nodes = a1.getTargetNodes(tags6, defaultSelector) + nodes := a1.getTargetNodes(tags6, defaultSelector) - assert.Len(t, nodes, 0) + assert.Len(t, nodes, 0) + }) - // Test matching tags with cardinality of 2 but only 1 matching node returns correct node - tags7 := map[string]string{ - "tag": "test:2", - "extra": "tag:2", - } + t.Run("Test matching tags with cardinality of 2 but only 1 matching node returns correct node", func(t *testing.T) { + tags7 := map[string]string{ + "tag": "test:2", + "extra": "tag:2", + } - nodes = a1.getTargetNodes(tags7, defaultSelector) + nodes := a1.getTargetNodes(tags7, defaultSelector) - assert.Len(t, nodes, 1) - assert.Exactly(t, "test2", nodes[0].Name) + assert.Len(t, nodes, 1) + assert.Exactly(t, "test2", nodes[0].Name) + }) - // Test invalid cardinality yields 0 nodes - tags9 := map[string]string{ - "tag": "test:invalid", - } + t.Run("Test invalid cardinality yields 0 nodes", func(t *testing.T) { + tags9 := map[string]string{ + "tag": "test:invalid", + } - nodes = a1.getTargetNodes(tags9, defaultSelector) + nodes := a1.getTargetNodes(tags9, defaultSelector) + + assert.Len(t, nodes, 0) + }) + + t.Run("Test two tags matching same 3 servers and cardinality of 1 should always return 1 server", func(t *testing.T) { + // Do this multiple times: an old bug caused this to sometimes succeed and + // sometimes fail (=return no nodes at all) due to the use of math.rand + // Statistically, about 33% should succeed and the rest should fail if + // the code is buggy. + // Another bug caused one node to be favored over the others. With a large + // enough number of attempts, each node should be chosen 1/3 of the time. + tags8 := map[string]string{ + "additional": "value:1", + "additional2": "value2:1", + } + distrib := make(map[string]int) - assert.Len(t, nodes, 0) + // Modified version of getTargetNodes + faked_getTargetNodes := func(tags map[string]string, selectFunc func(nodes []Node) int) []Node { + bareTags, card := cleanTags(tags, a1.logger) + allNodes := a1.serf.Members() - // Test two tags matching same 3 servers and cardinality of 1 should always return 1 server + // Sort the nodes: serf.Members() doesn't always return the nodes in the same order, which skews the results. + sort.Slice(allNodes, func(i, j int) bool { return allNodes[i].Name < allNodes[j].Name }) - // Do this multiple times: an old bug caused this to sometimes succeed and - // sometimes fail (=return no nodes at all) due to the use of math.rand - // Statistically, about 33% should succeed and the rest should fail if - // the code is buggy. - // Another bug caused one node to be favored over the others. With a large - // enough number of attempts, each node should be chosen 1/3 of the time. - tags8 := map[string]string{ - "additional": "value:1", - "additional2": "value2:1", - } - distrib := make(map[string]int) - var sampleSize = 999 - for i := 0; i < sampleSize; i++ { - // round-robin on the selected nodes to come out at an exactly equal distribution - roundRobinSelector := func(nodes []serf.Member) int { return i % len(nodes) } - nodes = a1.getTargetNodes(tags8, roundRobinSelector) + nodes := a1.getQualifyingNodes(allNodes, bareTags) + return selectNodes(nodes, card, selectFunc) + } - assert.Len(t, nodes, 1) - distrib[nodes[0].Name]++ - } + var sampleSize = 999 + for i := 0; i < sampleSize; i++ { + roundRobinSelector := func(nodes []Node) int { return i % len(nodes) } - // Each node must have been chosen 1/3 of the time. - for name, count := range distrib { - fmt.Println(name, float64(count)/float64(sampleSize)*100.0, "%", count) - } - assert.Exactly(t, sampleSize/3, distrib["test1"]) - assert.Exactly(t, sampleSize/3, distrib["test2"]) - assert.Exactly(t, sampleSize/3, distrib["test3"]) + nodes := faked_getTargetNodes(tags8, roundRobinSelector) + + assert.Len(t, nodes, 1) + distrib[nodes[0].Name]++ + } + + // Each node must have been chosen 1/3 of the time. + for name, count := range distrib { + fmt.Println(name, float64(count)/float64(sampleSize)*100.0, "%", count) + } + assert.Exactly(t, sampleSize/3, distrib["test1"]) + assert.Exactly(t, sampleSize/3, distrib["test2"]) + assert.Exactly(t, sampleSize/3, distrib["test3"]) + }) // Clean up a1.Stop() From f3e763143a87ac3f44e86cd463bbbfebbbba1b0a Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Fri, 11 Jun 2021 23:25:16 +0200 Subject: [PATCH 14/17] Update comment --- dkron/agent.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dkron/agent.go b/dkron/agent.go index 86656ae85..2393263e0 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -713,7 +713,7 @@ func (a *Agent) getQualifyingNodes(nodes []Node, bareTags map[string]string) []N return qualifiers } -// The default selector function for processFilteredNodes +// The default selector function for getTargetNodes/selectNodes func defaultSelector(nodes []Node) int { return rand.Intn(len(nodes)) } From a767b586ba9b8f668bab409137769e2812b5f407 Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Tue, 15 Jun 2021 00:08:17 +0200 Subject: [PATCH 15/17] Use Node in place of serf.Member in Run --- dkron/run.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dkron/run.go b/dkron/run.go index 2e834b966..9e2f44121 100644 --- a/dkron/run.go +++ b/dkron/run.go @@ -28,7 +28,7 @@ func (a *Agent) Run(jobName string, ex *Execution) (*Job, error) { // In the first execution attempt we build and filter the target nodes // but we use the existing node target in case of retry. - var targetNodes []serf.Member + var targetNodes []Node if ex.Attempt <= 1 { targetNodes = a.getTargetNodes(job.Tags, defaultSelector) } else { @@ -36,7 +36,7 @@ func (a *Agent) Run(jobName string, ex *Execution) (*Job, error) { for _, m := range a.serf.Members() { if ex.NodeName == m.Name { if m.Status == serf.StatusAlive { - targetNodes = []serf.Member{m} + targetNodes = []Node{m} break } else { return nil, fmt.Errorf("retry node is gone: %s for job %s", ex.NodeName, ex.JobName) From 19f19cdb84d61b04e6744dcbee06a536a447ebcd Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Tue, 15 Jun 2021 00:08:27 +0200 Subject: [PATCH 16/17] Remove unused code --- dkron/agent.go | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/dkron/agent.go b/dkron/agent.go index 2393263e0..d1bbe93cd 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -594,33 +594,6 @@ func (a *Agent) IsLeader() bool { return a.raft.State() == raft.Leader } -// Members is used to return the members of the serf cluster -func (a *Agent) Members() []serf.Member { - return a.serf.Members() -} - -// LocalMember is used to return the local node -func (a *Agent) LocalMember() serf.Member { - return a.serf.LocalMember() -} - -// Leader is used to return the Raft leader -func (a *Agent) Leader() raft.ServerAddress { - return a.raft.Leader() -} - -// Servers returns a list of known server -func (a *Agent) Servers() (members []*ServerParts) { - for _, member := range a.serf.Members() { - ok, parts := isServer(member) - if !ok || member.Status != serf.StatusAlive { - continue - } - members = append(members, parts) - } - return members -} - // LocalServers returns a list of the local known server func (a *Agent) LocalServers() (members []*ServerParts) { for _, member := range a.serf.Members() { From 5329cf8c37f15bfa49e3f287bf989b10cdaae832 Mon Sep 17 00:00:00 2001 From: Yuri van Oers Date: Tue, 15 Jun 2021 20:55:18 +0200 Subject: [PATCH 17/17] Revert "Remove unused code" This reverts commit 19f19cdb84d61b04e6744dcbee06a536a447ebcd. --- dkron/agent.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/dkron/agent.go b/dkron/agent.go index d1bbe93cd..2393263e0 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -594,6 +594,33 @@ func (a *Agent) IsLeader() bool { return a.raft.State() == raft.Leader } +// Members is used to return the members of the serf cluster +func (a *Agent) Members() []serf.Member { + return a.serf.Members() +} + +// LocalMember is used to return the local node +func (a *Agent) LocalMember() serf.Member { + return a.serf.LocalMember() +} + +// Leader is used to return the Raft leader +func (a *Agent) Leader() raft.ServerAddress { + return a.raft.Leader() +} + +// Servers returns a list of known server +func (a *Agent) Servers() (members []*ServerParts) { + for _, member := range a.serf.Members() { + ok, parts := isServer(member) + if !ok || member.Status != serf.StatusAlive { + continue + } + members = append(members, parts) + } + return members +} + // LocalServers returns a list of the local known server func (a *Agent) LocalServers() (members []*ServerParts) { for _, member := range a.serf.Members() {