Skip to content

Commit

Permalink
Valkeyrie (distribworks#320)
Browse files Browse the repository at this point in the history
Switch to valkeyrie
  • Loading branch information
Victor Castell authored Dec 7, 2017
1 parent 3783ba0 commit e42d2b2
Show file tree
Hide file tree
Showing 11 changed files with 49 additions and 46 deletions.
22 changes: 14 additions & 8 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@
name = "github.com/armon/go-metrics"

[[constraint]]
name = "github.com/docker/leadership"
name = "github.com/abronan/leadership"

[[constraint]]
name = "github.com/abronan/libkv"
revision = "5e4bb288a9a74320bb03f5c18d6bdbab0d8049de"
name = "github.com/abronan/valkeyrie"

[[constraint]]
name = "github.com/gin-contrib/multitemplate"
Expand Down
2 changes: 1 addition & 1 deletion dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"time"

"github.com/Sirupsen/logrus"
"github.com/abronan/leadership"
metrics "github.com/armon/go-metrics"
"github.com/docker/leadership"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
"github.com/mitchellh/cli"
Expand Down
26 changes: 12 additions & 14 deletions dkron/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"testing"
"time"

"github.com/docker/libkv"
"github.com/docker/libkv/store"
"github.com/abronan/valkeyrie"
"github.com/abronan/valkeyrie/store"
"github.com/hashicorp/serf/testutil"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestAgentCommand_runForElection(t *testing.T) {
ShutdownCh: shutdownCh,
}

client, err := libkv.NewStore("etcd", []string{etcdAddr}, &store.Config{})
client, err := valkeyrie.NewStore("etcd", []string{etcdAddr}, &store.Config{})
if err != nil {
panic(err)
}
Expand All @@ -106,7 +106,7 @@ func TestAgentCommand_runForElection(t *testing.T) {
go a1.Run(args)

// Wait for the first agent to start and set itself as leader
kv1, err := watchOrDie(t, client, "dkron/leader")
kv1, err := watchOrDie(client, "dkron/leader")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -141,37 +141,35 @@ func TestAgentCommand_runForElection(t *testing.T) {

// Wait until test2 steps as leader
rewatch:
kv2, err := watchOrDie(t, client, "dkron/leader")
kv2, err := watchOrDie(client, "dkron/leader")
if err != nil {
t.Fatal(err)
}
if len(kv2.Value) == 0 || string(kv2.Value) == a1Name {
goto rewatch
}
t.Logf("%s is the current leader", kv2.Value)
assert.Equal(t, a2Name, string(kv2.Value))
}

func watchOrDie(t *testing.T, client store.Store, key string) (*store.KVPair, error) {
func watchOrDie(client store.Store, key string) (*store.KVPair, error) {
for {
resultCh, err := client.Watch(key, nil)
resultCh, err := client.Watch(key, nil, nil)
if err != nil {
if err == store.ErrKeyNotFound {
continue
}
return nil, err
}

_, more := <-resultCh
if more == false {
// The channel is closed, recreate the watch
continue
}
// If the channel worked, read the actual value
kv := <-resultCh
t.Logf("Value for key %s: %s", key, string(kv.Value))
return kv, nil
}
}

func Test_processFilteredNodes(t *testing.T) {
client, err := libkv.NewStore("etcd", []string{etcdAddr}, &store.Config{})
client, err := valkeyrie.NewStore("etcd", []string{etcdAddr}, &store.Config{})
err = client.DeleteTree("dkron")
if err != nil {
if err == store.ErrNotReachable {
Expand Down
2 changes: 1 addition & 1 deletion dkron/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"net/http"

"github.com/Sirupsen/logrus"
"github.com/docker/libkv/store"
"github.com/abronan/valkeyrie/store"
gin "github.com/gin-gonic/gin"
)

Expand Down
2 changes: 1 addition & 1 deletion dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/Sirupsen/logrus"
"github.com/docker/libkv/store"
"github.com/abronan/valkeyrie/store"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion dkron/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package dkron
import (
"testing"

s "github.com/docker/libkv/store"
s "github.com/abronan/valkeyrie/store"
"github.com/stretchr/testify/assert"
)

Expand Down
2 changes: 1 addition & 1 deletion dkron/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"encoding/json"

"github.com/Sirupsen/logrus"
"github.com/docker/libkv/store"
"github.com/abronan/valkeyrie/store"
"github.com/hashicorp/serf/serf"
)

Expand Down
2 changes: 1 addition & 1 deletion dkron/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"time"

"github.com/Sirupsen/logrus"
"github.com/abronan/valkeyrie/store"
metrics "github.com/armon/go-metrics"
"github.com/docker/libkv/store"
"github.com/hashicorp/serf/serf"
)

Expand Down
28 changes: 14 additions & 14 deletions dkron/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"sort"

"github.com/Sirupsen/logrus"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
"github.com/docker/libkv/store/zookeeper"
"github.com/abronan/valkeyrie"
"github.com/abronan/valkeyrie/store"
"github.com/abronan/valkeyrie/store/consul"
etcd "github.com/abronan/valkeyrie/store/etcd/v2"
"github.com/abronan/valkeyrie/store/zookeeper"
"github.com/victorcoder/dkron/cron"
)

Expand All @@ -30,7 +30,7 @@ func init() {
}

func NewStore(backend string, machines []string, a *AgentCommand, keyspace string) *Store {
s, err := libkv.NewStore(store.Backend(backend), machines, nil)
s, err := valkeyrie.NewStore(store.Backend(backend), machines, nil)
if err != nil {
log.Error(err)
}
Expand All @@ -41,7 +41,7 @@ func NewStore(backend string, machines []string, a *AgentCommand, keyspace strin
"keyspace": keyspace,
}).Debug("store: Backend config")

_, err = s.List(keyspace)
_, err = s.List(keyspace, nil)
if err != store.ErrKeyNotFound && err != nil {
log.WithError(err).Fatal("store: Store backend not reachable")
}
Expand Down Expand Up @@ -181,7 +181,7 @@ func (s *Store) validateJob(job *Job) error {

// GetJobs returns all jobs
func (s *Store) GetJobs() ([]*Job, error) {
res, err := s.Client.List(s.keyspace + "/jobs/")
res, err := s.Client.List(s.keyspace+"/jobs/", nil)
if err != nil {
if err == store.ErrKeyNotFound {
log.Debug("store: No jobs found")
Expand All @@ -205,7 +205,7 @@ func (s *Store) GetJobs() ([]*Job, error) {

// Get a job
func (s *Store) GetJob(name string) (*Job, error) {
res, err := s.Client.Get(s.keyspace + "/jobs/" + name)
res, err := s.Client.Get(s.keyspace+"/jobs/"+name, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -244,7 +244,7 @@ func (s *Store) DeleteJob(name string) (*Job, error) {

func (s *Store) GetExecutions(jobName string) ([]*Execution, error) {
prefix := fmt.Sprintf("%s/executions/%s", s.keyspace, jobName)
res, err := s.Client.List(prefix)
res, err := s.Client.List(prefix, nil)
if err != nil {
return nil, err
}
Expand All @@ -270,11 +270,11 @@ func (s *Store) GetExecutions(jobName string) ([]*Execution, error) {
}

func (s *Store) GetLastExecutionGroup(jobName string) ([]*Execution, error) {
res, err := s.Client.List(fmt.Sprintf("%s/executions/%s", s.keyspace, jobName))
res, err := s.Client.List(fmt.Sprintf("%s/executions/%s", s.keyspace, jobName), nil)
if err != nil {
return nil, err
}
if len(res) == 0{
if len(res) == 0 {
return []*Execution{}, nil
}

Expand All @@ -287,7 +287,7 @@ func (s *Store) GetLastExecutionGroup(jobName string) ([]*Execution, error) {
}

func (s *Store) GetExecutionGroup(execution *Execution) ([]*Execution, error) {
res, err := s.Client.List(fmt.Sprintf("%s/executions/%s", s.keyspace, execution.JobName))
res, err := s.Client.List(fmt.Sprintf("%s/executions/%s", s.keyspace, execution.JobName), nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -376,7 +376,7 @@ func (s *Store) DeleteExecutions(jobName string) error {

// Retrieve the leader from the store
func (s *Store) GetLeader() []byte {
res, err := s.Client.Get(s.LeaderKey())
res, err := s.Client.Get(s.LeaderKey(), nil)
if err != nil {
if err == store.ErrNotReachable {
log.Fatal("store: Store not reachable, be sure you have an existing key-value store running is running and is reachable.")
Expand Down
2 changes: 1 addition & 1 deletion dkron/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"
"time"

s "github.com/docker/libkv/store"
s "github.com/abronan/valkeyrie/store"
)

func TestStore(t *testing.T) {
Expand Down

0 comments on commit e42d2b2

Please sign in to comment.