Skip to content

Commit

Permalink
Added some query filters on Job executions (#878)
Browse files Browse the repository at this point in the history
* Added query filters on GetExecutions from Job

* FIX grpc job executions tests

* chore: change struct name JobExecutionsOptions to ExecutionOptions

* chore: move job execution timezone into ExecutionOptions struct

* Changed job executions marshaling to json

Co-authored-by: Miguel Sousa <[email protected]>
  • Loading branch information
MGSousa and Miguel Sousa authored Jan 6, 2021
1 parent d050db5 commit 2d052ef
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 37 deletions.
15 changes: 14 additions & 1 deletion dkron/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,16 +328,29 @@ func (h *HTTPTransport) restoreHandler(c *gin.Context) {
func (h *HTTPTransport) executionsHandler(c *gin.Context) {
jobName := c.Param("job")

sort := c.DefaultQuery("_sort", "")
if sort == "id" {
sort = "started_at"
}
order := c.DefaultQuery("_order", "DESC")

job, err := h.agent.Store.GetJob(jobName, nil)
if err != nil {
c.AbortWithError(http.StatusNotFound, err)
return
}

executions, err := h.agent.Store.GetExecutions(job.Name, job.GetTimeLocation())
executions, err := h.agent.Store.GetExecutions(job.Name,
&ExecutionOptions{
Sort: sort,
Order: order,
Timezone: job.GetTimeLocation(),
},
)
if err != nil {
if err == buntdb.ErrNotFound {
renderJSON(c, http.StatusOK, &[]Execution{})
log.Error(err)
return
}
log.Error(err)
Expand Down
6 changes: 5 additions & 1 deletion dkron/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ func (a *Agent) dashboardExecutionsHandler(c *gin.Context) {
jobLocation = job.GetTimeLocation()
}

groups, byGroup, err := a.Store.GetGroupedExecutions(jobName, jobLocation)
groups, byGroup, err := a.Store.GetGroupedExecutions(jobName,
&ExecutionOptions{
Timezone: jobLocation,
},
)
if err != nil {
log.Error(err)
}
Expand Down
6 changes: 5 additions & 1 deletion dkron/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,11 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.E
}, nil
}

exg, err := grpcs.agent.Store.GetExecutionGroup(execution, job.GetTimeLocation())
exg, err := grpcs.agent.Store.GetExecutionGroup(execution,
&ExecutionOptions{
Timezone: job.GetTimeLocation(),
},
)
if err != nil {
log.WithError(err).WithField("group", execution.Group).Error("grpc: Error getting execution group.")
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions dkron/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ func TestGRPCExecutionDone(t *testing.T) {

rc := NewGRPCClient(nil, a)
rc.ExecutionDone(a.advertiseRPCAddr(), testExecution)
execs, err := a.Store.GetExecutions("test", nil)
execs, err := a.Store.GetExecutions("test", &ExecutionOptions{})
require.NoError(t, err)

assert.Len(t, execs, 1)
assert.Equal(t, string(testExecution.Output), string(execs[0].Output))

// Test run a dependent job
execs, err = a.Store.GetExecutions("child-test", nil)
execs, err = a.Store.GetExecutions("child-test", &ExecutionOptions{})
require.NoError(t, err)

assert.Len(t, execs, 1)
Expand Down
7 changes: 3 additions & 4 deletions dkron/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package dkron

import (
"io"
"time"
)

// Storage is the interface that should be used by any
Expand All @@ -16,9 +15,9 @@ type Storage interface {
SetExecutionDone(execution *Execution) (bool, error)
GetJobs(options *JobOptions) ([]*Job, error)
GetJob(name string, options *JobOptions) (*Job, error)
GetExecutions(jobName string, timezone *time.Location) ([]*Execution, error)
GetExecutionGroup(execution *Execution, timezone *time.Location) ([]*Execution, error)
GetGroupedExecutions(jobName string, timezone *time.Location) (map[int64][]*Execution, []int64, error)
GetExecutions(jobName string, opts *ExecutionOptions) ([]*Execution, error)
GetExecutionGroup(execution *Execution, opts *ExecutionOptions) ([]*Execution, error)
GetGroupedExecutions(jobName string, opts *ExecutionOptions) (map[int64][]*Execution, []int64, error)
Shutdown() error
Snapshot(w io.WriteCloser) error
Restore(r io.ReadCloser) error
Expand Down
70 changes: 43 additions & 27 deletions dkron/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ type JobOptions struct {
Status string
}

// ExecutionOptions additional options like "Sort" will be ready for JSON marshall
type ExecutionOptions struct {
Sort string
Order string
Timezone *time.Location
}

type kv struct {
Key string
Value []byte
Expand All @@ -56,6 +63,9 @@ type kv struct {
func NewStore() (*Store, error) {
db, err := buntdb.Open(":memory:")
db.CreateIndex("name", jobsPrefix+":*", buntdb.IndexJSON("name"))
db.CreateIndex("started_at", executionsPrefix+":*", buntdb.IndexJSON("started_at"))
db.CreateIndex("finished_at", executionsPrefix+":*", buntdb.IndexJSON("finished_at"))
db.CreateIndex("attempt", executionsPrefix+":*", buntdb.IndexJSON("attempt"))
db.CreateIndex("displayname", jobsPrefix+":*", buntdb.IndexJSON("displayname"))
db.CreateIndex("schedule", jobsPrefix+":*", buntdb.IndexJSON("schedule"))
db.CreateIndex("success_count", jobsPrefix+":*", buntdb.IndexJSON("success_count"))
Expand Down Expand Up @@ -416,49 +426,55 @@ func (s *Store) DeleteJob(name string) (*Job, error) {
}

// GetExecutions returns the executions given a Job name.
func (s *Store) GetExecutions(jobName string, timezone *time.Location) ([]*Execution, error) {
func (s *Store) GetExecutions(jobName string, opts *ExecutionOptions) ([]*Execution, error) {
prefix := fmt.Sprintf("%s:%s:", executionsPrefix, jobName)

kvs, err := s.list(prefix, true)
kvs, err := s.list(prefix, true, opts)
if err != nil {
return nil, err
}

return s.unmarshalExecutions(kvs, timezone)
return s.unmarshalExecutions(kvs, opts.Timezone)
}

func (s *Store) list(prefix string, checkRoot bool) ([]kv, error) {
func (s *Store) list(prefix string, checkRoot bool, opts *ExecutionOptions) ([]kv, error) {
var found bool
kvs := []kv{}

err := s.db.View(s.listTxFunc(prefix, &kvs, &found))
err := s.db.View(s.listTxFunc(prefix, &kvs, &found, opts))
if err == nil && !found && checkRoot {
return nil, buntdb.ErrNotFound
}

return kvs, err
}

func (*Store) listTxFunc(prefix string, kvs *[]kv, found *bool) func(tx *buntdb.Tx) error {
return func(tx *buntdb.Tx) error {
err := tx.Ascend("", func(key, value string) bool {
if strings.HasPrefix(key, prefix) {
*found = true
// ignore self in listing
if !bytes.Equal(trimDirectoryKey([]byte(key)), []byte(prefix)) {
kv := kv{Key: key, Value: []byte(value)}
*kvs = append(*kvs, kv)
}
func (*Store) listTxFunc(prefix string, kvs *[]kv, found *bool, opts *ExecutionOptions) func(tx *buntdb.Tx) error {
fnc := func(key, value string) bool {
if strings.HasPrefix(key, prefix) {
*found = true
// ignore self in listing
if !bytes.Equal(trimDirectoryKey([]byte(key)), []byte(prefix)) {
kv := kv{Key: key, Value: []byte(value)}
*kvs = append(*kvs, kv)
}
return true
})
}
return true
}

return func(tx *buntdb.Tx) (err error) {
if opts.Order == "DESC" {
err = tx.Descend(opts.Sort, fnc)
} else {
err = tx.Ascend(opts.Sort, fnc)
}
return err
}
}

// GetExecutionGroup returns all executions in the same group of a given execution
func (s *Store) GetExecutionGroup(execution *Execution, timezone *time.Location) ([]*Execution, error) {
res, err := s.GetExecutions(execution.JobName, timezone)
func (s *Store) GetExecutionGroup(execution *Execution, opts *ExecutionOptions) ([]*Execution, error) {
res, err := s.GetExecutions(execution.JobName, opts)
if err != nil {
return nil, err
}
Expand All @@ -474,8 +490,8 @@ func (s *Store) GetExecutionGroup(execution *Execution, timezone *time.Location)

// GetGroupedExecutions returns executions for a job grouped and with an ordered index
// to facilitate access.
func (s *Store) GetGroupedExecutions(jobName string, timezone *time.Location) (map[int64][]*Execution, []int64, error) {
execs, err := s.GetExecutions(jobName, timezone)
func (s *Store) GetGroupedExecutions(jobName string, opts *ExecutionOptions) (map[int64][]*Execution, []int64, error) {
execs, err := s.GetExecutions(jobName, opts)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -505,7 +521,7 @@ func (*Store) setExecutionTxFunc(key string, pbe *dkronpb.Execution) func(tx *bu
// more recent, avoiding non ordered execution set
if i != "" {
var p dkronpb.Execution
if err := proto.Unmarshal([]byte(i), &p); err != nil {
if err := json.Unmarshal([]byte(i), &p); err != nil {
return err
}
// Compare existing execution
Expand All @@ -514,7 +530,7 @@ func (*Store) setExecutionTxFunc(key string, pbe *dkronpb.Execution) func(tx *bu
}
}

eb, err := proto.Marshal(pbe)
eb, err := json.Marshal(pbe)
if err != nil {
return err
}
Expand Down Expand Up @@ -545,7 +561,7 @@ func (s *Store) SetExecution(execution *Execution) (string, error) {
return "", err
}

execs, err := s.GetExecutions(execution.JobName, nil)
execs, err := s.GetExecutions(execution.JobName, &ExecutionOptions{})
if err != nil && err != buntdb.ErrNotFound {
log.WithError(err).
WithField("job", execution.JobName).
Expand Down Expand Up @@ -620,8 +636,8 @@ func (s *Store) unmarshalExecutions(items []kv, timezone *time.Location) ([]*Exe
for _, item := range items {
var pbe dkronpb.Execution

if err := proto.Unmarshal(item.Value, &pbe); err != nil {
log.WithError(err).WithField("key", item.Key).Debug("error unmarshaling")
if err := json.Unmarshal(item.Value, &pbe); err != nil {
log.WithError(err).WithField("key", item.Key).Debug("error unmarshaling JSON")
return nil, err
}
execution := NewExecutionFromProto(&pbe)
Expand All @@ -640,7 +656,7 @@ func (s *Store) computeStatus(jobName string, exGroup int64, tx *buntdb.Tx) (str
found := false
prefix := fmt.Sprintf("%s:%s:", executionsPrefix, jobName)

if err := s.listTxFunc(prefix, &kvs, &found)(tx); err != nil {
if err := s.listTxFunc(prefix, &kvs, &found, &ExecutionOptions{})(tx); err != nil {
return "", err
}

Expand Down
5 changes: 4 additions & 1 deletion dkron/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ func TestStore(t *testing.T) {
_, err = s.SetExecution(testExecution2)
require.NoError(t, err)

execs, err := s.GetExecutions("test", nil)
execs, err := s.GetExecutions("test", &ExecutionOptions{
Sort: "started_at",
Order: "DESC",
})
assert.NoError(t, err)

testExecution.Id = testExecution.Key()
Expand Down

0 comments on commit 2d052ef

Please sign in to comment.