Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Castell committed Aug 30, 2015
1 parent efdbbcc commit 4ce4725
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 278 deletions.
38 changes: 16 additions & 22 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type AgentCommand struct {
ShutdownCh <-chan struct{}
serf *serf.Serf
config *Config
etcd *etcdClient
store *Store
eventCh chan serf.Event
sched *Scheduler
Expand Down Expand Up @@ -326,7 +325,6 @@ func (a *AgentCommand) Run(args []string) int {
a.join(a.config.StartJoin, true)

if a.config.Server {
a.etcd = NewEtcdClient(a.config.EtcdMachines, a, a.config.Keyspace)
a.store = NewStore(a.config.EtcdMachines, a, a.config.Keyspace)
a.sched = NewScheduler()

Expand Down Expand Up @@ -398,32 +396,28 @@ func (a *AgentCommand) Synopsis() string {

// Leader election routine
func (a *AgentCommand) ElectLeader() bool {
leaderKey := a.etcd.GetLeader()
leaderKey := a.store.GetLeader()

if leaderKey != "" {
if !a.serverAlive(leaderKey) {
if leaderKey != nil {
if !a.serverAlive(string(leaderKey)) {
log.Debug("Trying to set itself as leader")
res, err := a.etcd.Client.CompareAndSwap(a.config.Keyspace+"/leader", a.config.Tags["key"], 0, leaderKey, 0)
if err != nil {
// res, err := a.etcd.Client.CompareAndSwap(a.config.Keyspace+"/leader", a.config.Tags["key"], 0, leaderKey, 0)
success, err := a.store.TryLeaderSwap(a.config.Tags["key"], string(leaderKey))
if err != nil || success == false {
log.Errorln("Error trying to set itself as leader", err)
return false
}

log.WithFields(logrus.Fields{
"old_leader": res.PrevNode.Value,
"new_leader": res.Node.Value,
}).Debug("Leader Swap")
return true
} else {
log.Printf("The current leader [%s] is active", leaderKey)
}
} else {
log.Debug("Trying to set itself as leader")
res, err := a.etcd.Client.Create(a.config.Keyspace+"/leader", a.config.NodeName, 0)
err := a.store.SetLeader(a.config.Tags["key"])
if err != nil {
log.Error(res, err)
log.Error(err)
}
log.Printf("Successfully set [%s] as leader", a.config.NodeName)
log.Printf("Successfully set [%s] as leader", a.config.Tags["key"])
return true
}

Expand All @@ -443,12 +437,12 @@ func (a *AgentCommand) serverAlive(key string) bool {

// Utility method to check if the node calling the method is the leader.
func (a *AgentCommand) isLeader() bool {
return a.config.Tags["key"] == a.etcd.GetLeader()
return a.config.Tags["key"] == string(a.store.GetLeader())
}

// Utility method to get leader nodename
func (a *AgentCommand) leaderMember() (*serf.Member, error) {
leader := a.etcd.GetLeader()
leader := string(a.store.GetLeader())
for _, member := range a.serf.Members() {
if key, ok := member.Tags["key"]; ok {
if key == leader {
Expand All @@ -473,7 +467,7 @@ func (a *AgentCommand) eventLoop() {
if (e.EventType() == serf.EventMemberFailed || e.EventType() == serf.EventMemberLeave) && a.config.Server {
failed := e.(serf.MemberEvent)
for _, member := range failed.Members {
if member.Tags["key"] == a.etcd.GetLeader() && member.Status != serf.StatusAlive {
if member.Tags["key"] == string(a.store.GetLeader()) && member.Status != serf.StatusAlive {
if a.ElectLeader() {
a.schedule()
}
Expand Down Expand Up @@ -527,7 +521,7 @@ func (a *AgentCommand) eventLoop() {
ex := a.setExecution(query.Payload)

// Save job status
job, err := a.etcd.GetJob(ex.JobName)
job, err := a.store.GetJob(ex.JobName)
if err != nil {
log.Fatal(err)
}
Expand All @@ -539,7 +533,7 @@ func (a *AgentCommand) eventLoop() {
job.ErrorCount = job.ErrorCount + 1
}

if err := a.etcd.SetJob(job); err != nil {
if err := a.store.SetJob(job); err != nil {
log.Fatal(err)
}
query.Respond([]byte("saved"))
Expand All @@ -556,7 +550,7 @@ func (a *AgentCommand) eventLoop() {
// Start or restart scheduler
func (a *AgentCommand) schedule() {
log.Debug("Restarting scheduler")
jobs, err := a.etcd.GetJobs()
jobs, err := a.store.GetJobs()
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -709,7 +703,7 @@ func (a *AgentCommand) setExecution(payload []byte) *Execution {
}

// Save the new execution to etcd
if _, err := a.etcd.SetExecution(&ex); err != nil {
if _, err := a.store.SetExecution(&ex); err != nil {
log.Fatal(err)
}

Expand Down
10 changes: 5 additions & 5 deletions dkron/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (a *AgentCommand) jobCreateOrUpdateHandler(w http.ResponseWriter, r *http.R
}

// Save the new job to etcd
if err = a.etcd.SetJob(&job); err != nil {
if err = a.store.SetJob(&job); err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(422) // unprocessable entity
if err := json.NewEncoder(w).Encode(err); err != nil {
Expand All @@ -115,7 +115,7 @@ func (a *AgentCommand) jobCreateOrUpdateHandler(w http.ResponseWriter, r *http.R
return
}

a.schedulerRestartQuery(a.etcd.GetLeader())
a.schedulerRestartQuery(string(a.store.GetLeader()))

w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusCreated)
Expand All @@ -128,7 +128,7 @@ func (a *AgentCommand) executionsHandler(w http.ResponseWriter, r *http.Request)
vars := mux.Vars(r)
job := vars["job"]

executions, err := a.etcd.GetExecutions(job)
executions, err := a.store.GetExecutions(job)
if err != nil {
log.Error(err)
}
Expand All @@ -152,7 +152,7 @@ func (a *AgentCommand) jobDeleteHandler(w http.ResponseWriter, r *http.Request)
vars := mux.Vars(r)
job := vars["job"]

if err := a.etcd.DeleteJob(job); err != nil {
if err := a.store.DeleteJob(job); err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusNotFound)
if err := json.NewEncoder(w).Encode(err); err != nil {
Expand Down Expand Up @@ -190,7 +190,7 @@ func (a *AgentCommand) jobRunHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
job := vars["job"]

j, err := a.etcd.GetJob(job)
j, err := a.store.GetJob(job)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusNotFound)
Expand Down
56 changes: 28 additions & 28 deletions dkron/dashboard.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package dkron

import (
"encoding/json"
// "encoding/json"
"html/template"
"net/http"

etcdc "github.com/coreos/go-etcd/etcd"
// etcdc "github.com/coreos/go-etcd/etcd"
"github.com/gorilla/mux"
)

Expand Down Expand Up @@ -37,35 +37,35 @@ func (a *AgentCommand) dashboardIndexHandler(w http.ResponseWriter, r *http.Requ
tmpl := template.Must(template.New("dashboard.html.tmpl").ParseFiles(
"templates/dashboard.html.tmpl", "templates/index.html.tmpl", "templates/status.html.tmpl"))

rr := etcdc.NewRawRequest("GET", "../version", nil, nil)
res, err := a.etcd.Client.SendRequest(rr)
if err != nil {
log.Error(err)
}
var version struct {
Etcdserver string `json:"etcdserver"`
Etcdcluster string `json:"etcdcluster"`
}
json.Unmarshal(res.Body, &version)

var ss *EtcdServerStats
rr = etcdc.NewRawRequest("GET", "stats/self", nil, nil)
res, err = a.etcd.Client.SendRequest(rr)
if err != nil {
log.Error(err)
}
json.Unmarshal(res.Body, &ss)
// rr := etcdc.NewRawRequest("GET", "../version", nil, nil)
// res, err := a.etcd.Client.SendRequest(rr)
// if err != nil {
// log.Error(err)
// }
// var version struct {
// Etcdserver string `json:"etcdserver"`
// Etcdcluster string `json:"etcdcluster"`
// }
// json.Unmarshal(res.Body, &version)

// var ss *EtcdServerStats
// rr = etcdc.NewRawRequest("GET", "stats/self", nil, nil)
// res, err = a.etcd.Client.SendRequest(rr)
// if err != nil {
// log.Error(err)
// }
// json.Unmarshal(res.Body, &ss)

data := struct {
Common *commonDashboardData
EtcdVersion string
Stats *EtcdServerStats
StartTime string
// Stats *EtcdServerStats
StartTime string
}{
Common: newCommonDashboardData(a, a.config.NodeName),
EtcdVersion: version.Etcdserver,
Stats: ss,
StartTime: ss.LeaderInfo.StartTime.Format("2/Jan/2006 15:05:05"),
Common: newCommonDashboardData(a, a.config.NodeName),
// EtcdVersion: version.Etcdserver,
// Stats: ss,
// StartTime: ss.LeaderInfo.StartTime.Format("2/Jan/2006 15:05:05"),
}

if err := tmpl.Execute(w, data); err != nil {
Expand All @@ -76,7 +76,7 @@ func (a *AgentCommand) dashboardIndexHandler(w http.ResponseWriter, r *http.Requ
func (a *AgentCommand) dashboardJobsHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html")

jobs, _ := a.etcd.GetJobs()
jobs, _ := a.store.GetJobs()

funcs := template.FuncMap{
"isSuccess": func(job *Job) bool {
Expand Down Expand Up @@ -106,7 +106,7 @@ func (a *AgentCommand) dashboardExecutionsHandler(w http.ResponseWriter, r *http
vars := mux.Vars(r)
job := vars["job"]

execs, _ := a.etcd.GetExecutions(job)
execs, _ := a.store.GetExecutions(job)

tmpl := template.Must(template.New("dashboard.html.tmpl").Funcs(template.FuncMap{
"html": func(value []byte) template.HTML {
Expand Down
Loading

0 comments on commit 4ce4725

Please sign in to comment.