Skip to content

Commit

Permalink
Merge branch 'master' into libkv
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Castell committed Sep 15, 2015
2 parents 6edcc08 + a8cf6fc commit 8b373ca
Show file tree
Hide file tree
Showing 27 changed files with 1,234 additions and 158 deletions.
25 changes: 25 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
language: go

go:
- 1.5

# let us have speedy Docker-based Travis workers
# http://docs.travis-ci.com/user/migrating-from-legacy/#tl%3Bdr
sudo: false

before_install:
- go get golang.org/x/tools/cmd/vet
- go get golang.org/x/tools/cmd/cover
- go get github.com/mattn/goveralls
- go get github.com/golang/lint/golint
- go get github.com/GeertJohan/fgt

before_script:
- scripts/travis_etcd.sh 2.1.1

script:
- ./etcd/etcd >/dev/null 2>&1 &
- scripts/validate-gofmt
- go vet ./...
# - fgt golint ./...
- go test -v ./dkron
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
## 0.0.4 (2015-08-23)

- Compiled with Go 1.5
- Includes cluster nodes view in the UI

## 0.0.3 (2015-08-20)

- Initial release
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ all: deps

deps:
go get -d -v ./...

prmd:
prmd doc --prepend docs/docs/overview.md static/schema.json | sed 's/\<a name\=.*a\>//' > docs/docs/api.md

4 changes: 2 additions & 2 deletions Procfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
etc: bin/etcd -name dcron1
dkron2: godep go run *.go agent -server -node=dkron2 -join=127.0.0.1:5002 -bind=127.0.0.1:5001 -http-addr=:8081
dkron3: godep go run *.go agent -server -node=dkron3 -join=127.0.0.1:5001 -bind=127.0.0.1:5002 -http-addr=:8082
dkron2: godep go run *.go agent -server -node=dkron2 -join=127.0.0.1:5002 -bind=127.0.0.1:5001 -http-addr=:8080
dkron3: godep go run *.go agent -server -node=dkron3 -join=127.0.0.1:5001 -bind=127.0.0.1:5002 -http-addr=:8081
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Dkron - Distributed, fault tolerant job scheduling system [![GoDoc](https://godoc.org/github.com/victorcoder/dkron?status.svg)](https://godoc.org/github.com/victorcoder/dkron)
# Dkron - Distributed, fault tolerant job scheduling system [![GoDoc](https://godoc.org/github.com/victorcoder/dkron?status.svg)](https://godoc.org/github.com/victorcoder/dkron) [![Build Status](https://travis-ci.org/victorcoder/dkron.svg?branch=master)](https://travis-ci.org/victorcoder/dkron)

Website: http://dkron.io/

Expand Down
2 changes: 1 addition & 1 deletion cron/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestSpecSchedule(t *testing.T) {
t.Error(err)
}
if !reflect.DeepEqual(actual, c.expected) {
t.Errorf("%s => (expected) %b != %b (actual)", c.expr, c.expected, actual)
t.Errorf("%s => (expected) %v != %v (actual)", c.expr, c.expected, actual)
}
}
}
2 changes: 1 addition & 1 deletion dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (a *AgentCommand) readConfig(args []string) *Config {
cmdFlags.Usage = func() { a.Ui.Output(a.Help()) }
cmdFlags.String("node", hostname, "node name")
viper.SetDefault("node_name", cmdFlags.Lookup("node").Value)
cmdFlags.String("bind", fmt.Sprintf("0.0.0.0:%s", DefaultBindPort), "address to bind listeners to")
cmdFlags.String("bind", fmt.Sprintf("0.0.0.0:%d", DefaultBindPort), "address to bind listeners to")
viper.SetDefault("bind_addr", cmdFlags.Lookup("bind").Value)
cmdFlags.String("http-addr", ":8080", "HTTP address")
viper.SetDefault("http_addr", cmdFlags.Lookup("http-addr").Value)
Expand Down
36 changes: 22 additions & 14 deletions dkron/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestAgentCommandRun(t *testing.T) {
}

func TestAgentCommandElectLeader(t *testing.T) {
log.Level = logrus.FatalLevel
log.Level = logrus.ErrorLevel

shutdownCh := make(chan struct{})
defer close(shutdownCh)
Expand Down Expand Up @@ -90,6 +90,11 @@ func TestAgentCommandElectLeader(t *testing.T) {
resultCh <- a.Run(args)
}()

// Listen for leader key changes or timeout
receiver := make(chan *etcdc.Response)
stop := make(chan bool)
go etcd.Watch("/dkron/leader", 0, false, receiver, stop)

// Wait for the first agent to start and set itself as leader
time.Sleep(2 * time.Second)
test1Key := a.config.Tags["key"]
Expand Down Expand Up @@ -124,29 +129,32 @@ func TestAgentCommandElectLeader(t *testing.T) {
// Send a shutdown request
shutdownCh <- struct{}{}

receiver := make(chan *etcdc.Response)
stop := make(chan bool)
time.Sleep(2 * time.Second)

go etcd.Watch("/dkron/leader", 0, false, receiver, stop)

// Verify it runs "forever"
select {
case res := <-receiver:
if res.Node.Value == test2Key {
t.Logf("Leader changed: %s", res.Node.Value)
for exit := false; exit == false; {
select {
case res := <-receiver:
if res.Node.Value == test2Key {
t.Logf("Leader changed: %s", res.Node.Value)
stop <- true
exit = true
}
if res.Node.Value == test1Key {
t.Logf("Leader set to agent1: %s", res.Node.Value)
}
case <-time.After(10 * time.Second):
t.Fatal("No leader swap occurred")
stop <- true
exit = true
}
stop <- true
case <-time.After(10 * time.Second):
t.Fatal("No leader swap occurred")
stop <- true
}

shutdownCh2 <- struct{}{}
}

func Test_processFilteredNodes(t *testing.T) {
log.Level = logrus.FatalLevel
log.Level = logrus.ErrorLevel

shutdownCh := make(chan struct{})
defer close(shutdownCh)
Expand Down
142 changes: 76 additions & 66 deletions dkron/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,37 +38,52 @@ func (a *AgentCommand) ServeHTTP() {
}

func (a *AgentCommand) apiRoutes(r *mux.Router) {
r.Path("/v1").HandlerFunc(a.indexHandler)
subver := r.PathPrefix("/v1").Subrouter()
subver.HandleFunc("/", a.indexHandler)
subver.HandleFunc("/members", a.membersHandler)
subver.HandleFunc("/leader", a.leaderHandler)

subver.Path("/jobs").HandlerFunc(a.jobCreateOrUpdateHandler).Methods("POST", "PATCH")
subver.Path("/jobs").HandlerFunc(a.jobsHandler).Methods("GET")
sub := subver.PathPrefix("/jobs").Subrouter()
sub.HandleFunc("/", a.jobCreateOrUpdateHandler).Methods("POST", "PUT")
sub.HandleFunc("/", a.jobsHandler).Methods("GET")
sub.HandleFunc("/{job}", a.jobGetHandler).Methods("GET")
sub.HandleFunc("/{job}", a.jobDeleteHandler).Methods("DELETE")
sub.HandleFunc("/{job}", a.jobRunHandler).Methods("POST", "PUT")
sub.HandleFunc("/{job}", a.jobRunHandler).Methods("POST")

subex := subver.PathPrefix("/executions").Subrouter()
subex.HandleFunc("/{job}", a.executionsHandler).Methods("GET")
}

func (a *AgentCommand) indexHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusOK)
func printJson(w http.ResponseWriter, r *http.Request, v interface{}) error {
pretty := r.URL.Query().Get("pretty")
if pretty != "" {
j, _ := json.MarshalIndent(v, "", "\t")
if _, err := fmt.Fprintf(w, string(j)); err != nil {
return err
}
} else {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(v); err != nil {
return err
}
}

return nil
}

func (a *AgentCommand) indexHandler(w http.ResponseWriter, r *http.Request) {
local := a.serf.LocalMember()
stats := map[string]map[string]string{
"agent": map[string]string{
"agent": {
"name": local.Name,
"version": a.Version,
},
"serf": a.serf.Stats(),
"tags": local.Tags,
}

statsJson, _ := json.MarshalIndent(stats, "", "\t")
if _, err := fmt.Fprintf(w, string(statsJson)); err != nil {
if err := printJson(w, r, stats); err != nil {
log.Fatal(err)
}
}
Expand All @@ -79,9 +94,22 @@ func (a *AgentCommand) jobsHandler(w http.ResponseWriter, r *http.Request) {
log.Fatal(err)
}
log.Debug(jobs)
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(jobs); err != nil {

if err := printJson(w, r, jobs); err != nil {
log.Fatal(err)
}
}

func (a *AgentCommand) jobGetHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
jobName := vars["job"]

job, err := a.store.GetJob(jobName)
if err != nil {
log.Error(err)
}

if err := printJson(w, r, job); err != nil {
log.Fatal(err)
}
}
Expand Down Expand Up @@ -117,42 +145,17 @@ func (a *AgentCommand) jobCreateOrUpdateHandler(w http.ResponseWriter, r *http.R

a.schedulerRestartQuery(a.store.GetLeader())

w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusCreated)
if _, err := fmt.Fprintf(w, `{"result": "ok"}`); err != nil {
log.Fatal(err)
}
}

func (a *AgentCommand) executionsHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
job := vars["job"]

executions, err := a.store.GetExecutions(job)
if err != nil {
log.Error(err)
}
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(executions); err != nil {
panic(err)
}
}

func (a *AgentCommand) membersHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusOK)

if err := json.NewEncoder(w).Encode(a.serf.Members()); err != nil {
if err := printJson(w, r, job); err != nil {
log.Fatal(err)
}
}

func (a *AgentCommand) jobDeleteHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
job := vars["job"]
jobName := vars["job"]

if err := a.store.DeleteJob(job); err != nil {
job, err := a.store.DeleteJob(jobName)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusNotFound)
if err := json.NewEncoder(w).Encode(err); err != nil {
Expand All @@ -161,50 +164,57 @@ func (a *AgentCommand) jobDeleteHandler(w http.ResponseWriter, r *http.Request)
return
}

w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusOK)
if _, err := fmt.Fprintf(w, `{"result": "ok"}`); err != nil {
if err := printJson(w, r, job); err != nil {
log.Fatal(err)
}
}

func (a *AgentCommand) leaderHandler(w http.ResponseWriter, r *http.Request) {
member, err := a.leaderMember()
if err == nil {
func (a *AgentCommand) jobRunHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
jobName := vars["job"]

job, err := a.store.GetJob(jobName)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(member); err != nil {
w.WriteHeader(http.StatusNotFound)
if err := json.NewEncoder(w).Encode(err); err != nil {
log.Fatal(err)
}
return
}

w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusNotFound)
if err := json.NewEncoder(w).Encode(err); err != nil {
a.RunQuery(job)

if err := printJson(w, r, job); err != nil {
log.Fatal(err)
}
}

func (a *AgentCommand) jobRunHandler(w http.ResponseWriter, r *http.Request) {
func (a *AgentCommand) executionsHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
job := vars["job"]
jobName := vars["job"]

j, err := a.store.GetJob(job)
executions, err := a.store.GetExecutions(jobName)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusNotFound)
if err := json.NewEncoder(w).Encode(err); err != nil {
log.Fatal(err)
}
return
log.Error(err)
}

a.RunQuery(j)
if err := printJson(w, r, executions); err != nil {
log.Fatal(err)
}
}

w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusOK)
if _, err := fmt.Fprintf(w, `{"result": "ok"}`); err != nil {
func (a *AgentCommand) membersHandler(w http.ResponseWriter, r *http.Request) {
if err := printJson(w, r, a.serf.Members()); err != nil {
log.Fatal(err)
}
}

func (a *AgentCommand) leaderHandler(w http.ResponseWriter, r *http.Request) {
member, err := a.leaderMember()
if err == nil {
if err := printJson(w, r, member); err != nil {
log.Fatal(err)
}
}
}
6 changes: 3 additions & 3 deletions dkron/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func setupAPITest(t *testing.T) (chan<- struct{}, <-chan int) {
return shutdownCh, resultCh
}

func TestAPIJobReschedule(t *testing.T) {
func TestAPIJobCreate(t *testing.T) {
shutdownCh, _ := setupAPITest(t)

var jsonStr = []byte(`{"name": "test_job", "schedule": "@every 2s", "command": "date", "owner": "mec", "owner_email": "[email protected]", "disabled": true}`)
Expand All @@ -51,8 +51,8 @@ func TestAPIJobReschedule(t *testing.T) {
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)

if string(body) != `{"result": "ok"}` {
t.Fatalf("error saving job: %", string(body))
if bytes.Equal(body, jsonStr) {
t.Fatalf("error saving job: %s", string(body))
}

// Send a shutdown request
Expand Down
4 changes: 2 additions & 2 deletions dkron/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func newCommonDashboardData(a *AgentCommand, nodeName string) *commonDashboardDa
}

func (a *AgentCommand) dashboardRoutes(r *mux.Router) {
r.Path("/dashboard").HandlerFunc(a.dashboardIndexHandler).Methods("GET")
subui := r.PathPrefix("/dashboard").Subrouter()
subui.HandleFunc("/", a.dashboardIndexHandler).Methods("GET")
subui.HandleFunc("/jobs", a.dashboardJobsHandler).Methods("GET")
subui.HandleFunc("/jobs/{job}/executions", a.dashboardExecutionsHandler).Methods("GET")
}
Expand Down Expand Up @@ -117,7 +117,7 @@ func (a *AgentCommand) dashboardExecutionsHandler(w http.ResponseWriter, r *http
}).ParseFiles("templates/dashboard.html.tmpl", "templates/executions.html.tmpl"))

if len(execs) > 100 {
execs = execs[len(execs)-100 : len(execs)]
execs = execs[len(execs)-100:]
}

data := struct {
Expand Down
Loading

0 comments on commit 8b373ca

Please sign in to comment.