From e42d2b2c08f88aed0176ebd7e313d55fc307a6a0 Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Thu, 7 Dec 2017 21:56:52 +0100 Subject: [PATCH] Valkeyrie (#320) Switch to valkeyrie --- Gopkg.lock | 22 ++++++++++++++-------- Gopkg.toml | 5 ++--- dkron/agent.go | 2 +- dkron/agent_test.go | 26 ++++++++++++-------------- dkron/api.go | 2 +- dkron/job.go | 2 +- dkron/job_test.go | 2 +- dkron/queries.go | 2 +- dkron/rpc.go | 2 +- dkron/store.go | 28 ++++++++++++++-------------- dkron/store_test.go | 2 +- 11 files changed, 49 insertions(+), 46 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index ab14b8190..0ef063014 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -19,6 +19,18 @@ revision = "f006c2ac4710855cf0f916dd6b77acf6b048dc6e" version = "v1.0.3" +[[projects]] + branch = "master" + name = "github.com/abronan/leadership" + packages = ["."] + revision = "76df1b7fa3841b453d70d7183e8c02a87538c0ee" + +[[projects]] + branch = "master" + name = "github.com/abronan/valkeyrie" + packages = [".","store","store/consul","store/etcd/v2","store/zookeeper"] + revision = "063d875e3c5fd734fa2aa12fac83829f62acfc70" + [[projects]] branch = "master" name = "github.com/armon/circbuf" @@ -61,15 +73,9 @@ revision = "346938d642f2ec3594ed81d874461961cd0faa76" version = "v1.1.0" -[[projects]] - name = "github.com/docker/leadership" - packages = ["."] - revision = "bfc7753dd48af19513b29deec23c364bf0f274eb" - version = "v0.1.0" - [[projects]] name = "github.com/docker/libkv" - packages = [".","store","store/consul","store/etcd","store/zookeeper"] + packages = ["store"] revision = "aabc039ad04deb721e234f99cd1b4aa28ac71a40" version = "v0.2.1" @@ -374,6 +380,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "f1b404a2ffb7c4e37df5fc3c52f836fe8768833a76377318d8aa1eff236e0ce4" + inputs-digest = "a149d4138acfbe0b0d25685b174b86773ad89b22f9cdaa32e2513c378c867ff9" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 559341bd5..87d856642 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -34,11 +34,10 @@ name = "github.com/armon/go-metrics" [[constraint]] - name = "github.com/docker/leadership" + name = "github.com/abronan/leadership" [[constraint]] - name = "github.com/abronan/libkv" - revision = "5e4bb288a9a74320bb03f5c18d6bdbab0d8049de" + name = "github.com/abronan/valkeyrie" [[constraint]] name = "github.com/gin-contrib/multitemplate" diff --git a/dkron/agent.go b/dkron/agent.go index 77bf763a9..bb77fb654 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -16,8 +16,8 @@ import ( "time" "github.com/Sirupsen/logrus" + "github.com/abronan/leadership" metrics "github.com/armon/go-metrics" - "github.com/docker/leadership" "github.com/hashicorp/memberlist" "github.com/hashicorp/serf/serf" "github.com/mitchellh/cli" diff --git a/dkron/agent_test.go b/dkron/agent_test.go index 16e7cf2e1..69b846d3c 100644 --- a/dkron/agent_test.go +++ b/dkron/agent_test.go @@ -5,8 +5,8 @@ import ( "testing" "time" - "github.com/docker/libkv" - "github.com/docker/libkv/store" + "github.com/abronan/valkeyrie" + "github.com/abronan/valkeyrie/store" "github.com/hashicorp/serf/testutil" "github.com/mitchellh/cli" "github.com/stretchr/testify/assert" @@ -84,7 +84,7 @@ func TestAgentCommand_runForElection(t *testing.T) { ShutdownCh: shutdownCh, } - client, err := libkv.NewStore("etcd", []string{etcdAddr}, &store.Config{}) + client, err := valkeyrie.NewStore("etcd", []string{etcdAddr}, &store.Config{}) if err != nil { panic(err) } @@ -106,7 +106,7 @@ func TestAgentCommand_runForElection(t *testing.T) { go a1.Run(args) // Wait for the first agent to start and set itself as leader - kv1, err := watchOrDie(t, client, "dkron/leader") + kv1, err := watchOrDie(client, "dkron/leader") if err != nil { t.Fatal(err) } @@ -141,37 +141,35 @@ func TestAgentCommand_runForElection(t *testing.T) { // Wait until test2 steps as leader rewatch: - kv2, err := watchOrDie(t, client, "dkron/leader") + kv2, err := watchOrDie(client, "dkron/leader") if err != nil { t.Fatal(err) } if len(kv2.Value) == 0 || string(kv2.Value) == a1Name { goto rewatch } + t.Logf("%s is the current leader", kv2.Value) assert.Equal(t, a2Name, string(kv2.Value)) } -func watchOrDie(t *testing.T, client store.Store, key string) (*store.KVPair, error) { +func watchOrDie(client store.Store, key string) (*store.KVPair, error) { for { - resultCh, err := client.Watch(key, nil) + resultCh, err := client.Watch(key, nil, nil) if err != nil { + if err == store.ErrKeyNotFound { + continue + } return nil, err } - _, more := <-resultCh - if more == false { - // The channel is closed, recreate the watch - continue - } // If the channel worked, read the actual value kv := <-resultCh - t.Logf("Value for key %s: %s", key, string(kv.Value)) return kv, nil } } func Test_processFilteredNodes(t *testing.T) { - client, err := libkv.NewStore("etcd", []string{etcdAddr}, &store.Config{}) + client, err := valkeyrie.NewStore("etcd", []string{etcdAddr}, &store.Config{}) err = client.DeleteTree("dkron") if err != nil { if err == store.ErrNotReachable { diff --git a/dkron/api.go b/dkron/api.go index c1cd01501..1469acb91 100644 --- a/dkron/api.go +++ b/dkron/api.go @@ -6,7 +6,7 @@ import ( "net/http" "github.com/Sirupsen/logrus" - "github.com/docker/libkv/store" + "github.com/abronan/valkeyrie/store" gin "github.com/gin-gonic/gin" ) diff --git a/dkron/job.go b/dkron/job.go index 90a56ec76..7bd51ae56 100644 --- a/dkron/job.go +++ b/dkron/job.go @@ -7,7 +7,7 @@ import ( "time" "github.com/Sirupsen/logrus" - "github.com/docker/libkv/store" + "github.com/abronan/valkeyrie/store" ) const ( diff --git a/dkron/job_test.go b/dkron/job_test.go index d5c1aec65..edfe746cb 100644 --- a/dkron/job_test.go +++ b/dkron/job_test.go @@ -3,7 +3,7 @@ package dkron import ( "testing" - s "github.com/docker/libkv/store" + s "github.com/abronan/valkeyrie/store" "github.com/stretchr/testify/assert" ) diff --git a/dkron/queries.go b/dkron/queries.go index 83c25b548..823507b03 100644 --- a/dkron/queries.go +++ b/dkron/queries.go @@ -4,7 +4,7 @@ import ( "encoding/json" "github.com/Sirupsen/logrus" - "github.com/docker/libkv/store" + "github.com/abronan/valkeyrie/store" "github.com/hashicorp/serf/serf" ) diff --git a/dkron/rpc.go b/dkron/rpc.go index 384b9c138..2fbd341f7 100644 --- a/dkron/rpc.go +++ b/dkron/rpc.go @@ -9,8 +9,8 @@ import ( "time" "github.com/Sirupsen/logrus" + "github.com/abronan/valkeyrie/store" metrics "github.com/armon/go-metrics" - "github.com/docker/libkv/store" "github.com/hashicorp/serf/serf" ) diff --git a/dkron/store.go b/dkron/store.go index 194c86462..898a9315e 100644 --- a/dkron/store.go +++ b/dkron/store.go @@ -6,11 +6,11 @@ import ( "sort" "github.com/Sirupsen/logrus" - "github.com/docker/libkv" - "github.com/docker/libkv/store" - "github.com/docker/libkv/store/consul" - "github.com/docker/libkv/store/etcd" - "github.com/docker/libkv/store/zookeeper" + "github.com/abronan/valkeyrie" + "github.com/abronan/valkeyrie/store" + "github.com/abronan/valkeyrie/store/consul" + etcd "github.com/abronan/valkeyrie/store/etcd/v2" + "github.com/abronan/valkeyrie/store/zookeeper" "github.com/victorcoder/dkron/cron" ) @@ -30,7 +30,7 @@ func init() { } func NewStore(backend string, machines []string, a *AgentCommand, keyspace string) *Store { - s, err := libkv.NewStore(store.Backend(backend), machines, nil) + s, err := valkeyrie.NewStore(store.Backend(backend), machines, nil) if err != nil { log.Error(err) } @@ -41,7 +41,7 @@ func NewStore(backend string, machines []string, a *AgentCommand, keyspace strin "keyspace": keyspace, }).Debug("store: Backend config") - _, err = s.List(keyspace) + _, err = s.List(keyspace, nil) if err != store.ErrKeyNotFound && err != nil { log.WithError(err).Fatal("store: Store backend not reachable") } @@ -181,7 +181,7 @@ func (s *Store) validateJob(job *Job) error { // GetJobs returns all jobs func (s *Store) GetJobs() ([]*Job, error) { - res, err := s.Client.List(s.keyspace + "/jobs/") + res, err := s.Client.List(s.keyspace+"/jobs/", nil) if err != nil { if err == store.ErrKeyNotFound { log.Debug("store: No jobs found") @@ -205,7 +205,7 @@ func (s *Store) GetJobs() ([]*Job, error) { // Get a job func (s *Store) GetJob(name string) (*Job, error) { - res, err := s.Client.Get(s.keyspace + "/jobs/" + name) + res, err := s.Client.Get(s.keyspace+"/jobs/"+name, nil) if err != nil { return nil, err } @@ -244,7 +244,7 @@ func (s *Store) DeleteJob(name string) (*Job, error) { func (s *Store) GetExecutions(jobName string) ([]*Execution, error) { prefix := fmt.Sprintf("%s/executions/%s", s.keyspace, jobName) - res, err := s.Client.List(prefix) + res, err := s.Client.List(prefix, nil) if err != nil { return nil, err } @@ -270,11 +270,11 @@ func (s *Store) GetExecutions(jobName string) ([]*Execution, error) { } func (s *Store) GetLastExecutionGroup(jobName string) ([]*Execution, error) { - res, err := s.Client.List(fmt.Sprintf("%s/executions/%s", s.keyspace, jobName)) + res, err := s.Client.List(fmt.Sprintf("%s/executions/%s", s.keyspace, jobName), nil) if err != nil { return nil, err } - if len(res) == 0{ + if len(res) == 0 { return []*Execution{}, nil } @@ -287,7 +287,7 @@ func (s *Store) GetLastExecutionGroup(jobName string) ([]*Execution, error) { } func (s *Store) GetExecutionGroup(execution *Execution) ([]*Execution, error) { - res, err := s.Client.List(fmt.Sprintf("%s/executions/%s", s.keyspace, execution.JobName)) + res, err := s.Client.List(fmt.Sprintf("%s/executions/%s", s.keyspace, execution.JobName), nil) if err != nil { return nil, err } @@ -376,7 +376,7 @@ func (s *Store) DeleteExecutions(jobName string) error { // Retrieve the leader from the store func (s *Store) GetLeader() []byte { - res, err := s.Client.Get(s.LeaderKey()) + res, err := s.Client.Get(s.LeaderKey(), nil) if err != nil { if err == store.ErrNotReachable { log.Fatal("store: Store not reachable, be sure you have an existing key-value store running is running and is reachable.") diff --git a/dkron/store_test.go b/dkron/store_test.go index 4729a01a8..345a05230 100644 --- a/dkron/store_test.go +++ b/dkron/store_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - s "github.com/docker/libkv/store" + s "github.com/abronan/valkeyrie/store" ) func TestStore(t *testing.T) {