Skip to content

Commit

Permalink
Added gregroupedexecutions func
Browse files Browse the repository at this point in the history
Added logic for removing > 100 executions
  • Loading branch information
Victor Castell committed Dec 28, 2015
1 parent 3dd6001 commit 46b8fe6
Showing 1 changed file with 46 additions and 2 deletions.
48 changes: 46 additions & 2 deletions dkron/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dkron
import (
"encoding/json"
"fmt"
"sort"

"github.com/Sirupsen/logrus"
"github.com/docker/libkv"
Expand Down Expand Up @@ -84,6 +85,7 @@ func (s *Store) GetJobs() ([]*Job, error) {
func (s *Store) GetJob(name string) (*Job, error) {
res, err := s.Client.Get(s.keyspace + "/jobs/" + name)
if err != nil {
log.Panic("")
return nil, err
}

Expand Down Expand Up @@ -173,10 +175,32 @@ func (s *Store) GetExecutionGroup(execution *Execution) ([]*Execution, error) {
return executions, nil
}

// Returns executions for a job grouped and with an ordered index
// to facilitate access.
func (s *Store) GetGroupedExecutions(jobName string) (map[int64][]*Execution, []int64, error) {
execs, err := s.GetExecutions(jobName)
if err != nil {
return nil, nil, err
}
groups := make(map[int64][]*Execution)
for _, exec := range execs {
groups[exec.Group] = append(groups[exec.Group], exec)
}

// Build a separate data structure to show in order
var byGroup int64arr
for key := range groups {
byGroup = append(byGroup, key)
}
sort.Sort(byGroup)

return groups, byGroup, nil
}

// Save a new execution and returns the key of the new saved item or an error.
func (s *Store) SetExecution(execution *Execution) (string, error) {
exJson, _ := json.Marshal(execution)
key := fmt.Sprintf("%d-%s", execution.StartedAt.UnixNano(), execution.NodeName)
key := execution.Key()

log.WithFields(logrus.Fields{
"job": execution.JobName,
Expand All @@ -188,6 +212,26 @@ func (s *Store) SetExecution(execution *Execution) (string, error) {
return "", err
}

execs, err := s.GetExecutions(execution.JobName)
if err != nil {
log.Errorf("store: No executions found for job %s", execution.JobName)
}

// Get and ordered array of all execution groups
var byGroup int64arr
for _, ex := range execs {
byGroup = append(byGroup, ex.Group)
}
sort.Sort(byGroup)

// Delete all execution results over the limit, starting from olders
for i := range byGroup[99:] {
err := s.Client.Delete(fmt.Sprintf("%s/executions/%s/%s", s.keyspace, execs[i].JobName, execs[i].Key()))
if err != nil {
log.Errorf("store: Trying to delete overflowed execution %s", execs[i].Key())
}
}

return key, nil
}

Expand All @@ -200,7 +244,7 @@ func (s *Store) GetLeader() *Leader {
res, err := s.Client.Get(s.keyspace + "/leader")
if err != nil {
if err == store.ErrNotReachable {
log.Fatal("Store not reachable, be sure you have an existing key-value store running is running and is reachable.")
log.Fatal("store: Store not reachable, be sure you have an existing key-value store running is running and is reachable.")
} else if err != store.ErrKeyNotFound {
log.Error(err)
}
Expand Down

0 comments on commit 46b8fe6

Please sign in to comment.