From 2d052ef9742c6a8ad06bc3a312719c3f45ddb3b7 Mon Sep 17 00:00:00 2001 From: MGSousa <31368750+MGSousa@users.noreply.github.com> Date: Wed, 6 Jan 2021 23:05:35 +0000 Subject: [PATCH] Added some query filters on Job executions (#878) * Added query filters on GetExecutions from Job * FIX grpc job executions tests * chore: change struct name JobExecutionsOptions to ExecutionOptions * chore: move job execution timezone into ExecutionOptions struct * Changed job executions marshaling to json Co-authored-by: Miguel Sousa --- dkron/api.go | 15 +++++++++- dkron/dashboard.go | 6 +++- dkron/grpc.go | 6 +++- dkron/grpc_test.go | 4 +-- dkron/storage.go | 7 ++--- dkron/store.go | 70 ++++++++++++++++++++++++++++----------------- dkron/store_test.go | 5 +++- 7 files changed, 76 insertions(+), 37 deletions(-) diff --git a/dkron/api.go b/dkron/api.go index aa857c47e..094fd3170 100644 --- a/dkron/api.go +++ b/dkron/api.go @@ -328,16 +328,29 @@ func (h *HTTPTransport) restoreHandler(c *gin.Context) { func (h *HTTPTransport) executionsHandler(c *gin.Context) { jobName := c.Param("job") + sort := c.DefaultQuery("_sort", "") + if sort == "id" { + sort = "started_at" + } + order := c.DefaultQuery("_order", "DESC") + job, err := h.agent.Store.GetJob(jobName, nil) if err != nil { c.AbortWithError(http.StatusNotFound, err) return } - executions, err := h.agent.Store.GetExecutions(job.Name, job.GetTimeLocation()) + executions, err := h.agent.Store.GetExecutions(job.Name, + &ExecutionOptions{ + Sort: sort, + Order: order, + Timezone: job.GetTimeLocation(), + }, + ) if err != nil { if err == buntdb.ErrNotFound { renderJSON(c, http.StatusOK, &[]Execution{}) + log.Error(err) return } log.Error(err) diff --git a/dkron/dashboard.go b/dkron/dashboard.go index 5d7072ab8..cde0ded0e 100644 --- a/dkron/dashboard.go +++ b/dkron/dashboard.go @@ -97,7 +97,11 @@ func (a *Agent) dashboardExecutionsHandler(c *gin.Context) { jobLocation = job.GetTimeLocation() } - groups, byGroup, err := a.Store.GetGroupedExecutions(jobName, jobLocation) + groups, byGroup, err := a.Store.GetGroupedExecutions(jobName, + &ExecutionOptions{ + Timezone: jobLocation, + }, + ) if err != nil { log.Error(err) } diff --git a/dkron/grpc.go b/dkron/grpc.go index aecc348f8..b93a34589 100644 --- a/dkron/grpc.go +++ b/dkron/grpc.go @@ -221,7 +221,11 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.E }, nil } - exg, err := grpcs.agent.Store.GetExecutionGroup(execution, job.GetTimeLocation()) + exg, err := grpcs.agent.Store.GetExecutionGroup(execution, + &ExecutionOptions{ + Timezone: job.GetTimeLocation(), + }, + ) if err != nil { log.WithError(err).WithField("group", execution.Group).Error("grpc: Error getting execution group.") return nil, err diff --git a/dkron/grpc_test.go b/dkron/grpc_test.go index 83c0e6f76..7e14300d9 100644 --- a/dkron/grpc_test.go +++ b/dkron/grpc_test.go @@ -76,14 +76,14 @@ func TestGRPCExecutionDone(t *testing.T) { rc := NewGRPCClient(nil, a) rc.ExecutionDone(a.advertiseRPCAddr(), testExecution) - execs, err := a.Store.GetExecutions("test", nil) + execs, err := a.Store.GetExecutions("test", &ExecutionOptions{}) require.NoError(t, err) assert.Len(t, execs, 1) assert.Equal(t, string(testExecution.Output), string(execs[0].Output)) // Test run a dependent job - execs, err = a.Store.GetExecutions("child-test", nil) + execs, err = a.Store.GetExecutions("child-test", &ExecutionOptions{}) require.NoError(t, err) assert.Len(t, execs, 1) diff --git a/dkron/storage.go b/dkron/storage.go index 714d2c677..0eb9687e3 100644 --- a/dkron/storage.go +++ b/dkron/storage.go @@ -2,7 +2,6 @@ package dkron import ( "io" - "time" ) // Storage is the interface that should be used by any @@ -16,9 +15,9 @@ type Storage interface { SetExecutionDone(execution *Execution) (bool, error) GetJobs(options *JobOptions) ([]*Job, error) GetJob(name string, options *JobOptions) (*Job, error) - GetExecutions(jobName string, timezone *time.Location) ([]*Execution, error) - GetExecutionGroup(execution *Execution, timezone *time.Location) ([]*Execution, error) - GetGroupedExecutions(jobName string, timezone *time.Location) (map[int64][]*Execution, []int64, error) + GetExecutions(jobName string, opts *ExecutionOptions) ([]*Execution, error) + GetExecutionGroup(execution *Execution, opts *ExecutionOptions) ([]*Execution, error) + GetGroupedExecutions(jobName string, opts *ExecutionOptions) (map[int64][]*Execution, []int64, error) Shutdown() error Snapshot(w io.WriteCloser) error Restore(r io.ReadCloser) error diff --git a/dkron/store.go b/dkron/store.go index 91f98d6e2..c432f8a10 100644 --- a/dkron/store.go +++ b/dkron/store.go @@ -47,6 +47,13 @@ type JobOptions struct { Status string } +// ExecutionOptions additional options like "Sort" will be ready for JSON marshall +type ExecutionOptions struct { + Sort string + Order string + Timezone *time.Location +} + type kv struct { Key string Value []byte @@ -56,6 +63,9 @@ type kv struct { func NewStore() (*Store, error) { db, err := buntdb.Open(":memory:") db.CreateIndex("name", jobsPrefix+":*", buntdb.IndexJSON("name")) + db.CreateIndex("started_at", executionsPrefix+":*", buntdb.IndexJSON("started_at")) + db.CreateIndex("finished_at", executionsPrefix+":*", buntdb.IndexJSON("finished_at")) + db.CreateIndex("attempt", executionsPrefix+":*", buntdb.IndexJSON("attempt")) db.CreateIndex("displayname", jobsPrefix+":*", buntdb.IndexJSON("displayname")) db.CreateIndex("schedule", jobsPrefix+":*", buntdb.IndexJSON("schedule")) db.CreateIndex("success_count", jobsPrefix+":*", buntdb.IndexJSON("success_count")) @@ -416,22 +426,22 @@ func (s *Store) DeleteJob(name string) (*Job, error) { } // GetExecutions returns the executions given a Job name. -func (s *Store) GetExecutions(jobName string, timezone *time.Location) ([]*Execution, error) { +func (s *Store) GetExecutions(jobName string, opts *ExecutionOptions) ([]*Execution, error) { prefix := fmt.Sprintf("%s:%s:", executionsPrefix, jobName) - kvs, err := s.list(prefix, true) + kvs, err := s.list(prefix, true, opts) if err != nil { return nil, err } - return s.unmarshalExecutions(kvs, timezone) + return s.unmarshalExecutions(kvs, opts.Timezone) } -func (s *Store) list(prefix string, checkRoot bool) ([]kv, error) { +func (s *Store) list(prefix string, checkRoot bool, opts *ExecutionOptions) ([]kv, error) { var found bool kvs := []kv{} - err := s.db.View(s.listTxFunc(prefix, &kvs, &found)) + err := s.db.View(s.listTxFunc(prefix, &kvs, &found, opts)) if err == nil && !found && checkRoot { return nil, buntdb.ErrNotFound } @@ -439,26 +449,32 @@ func (s *Store) list(prefix string, checkRoot bool) ([]kv, error) { return kvs, err } -func (*Store) listTxFunc(prefix string, kvs *[]kv, found *bool) func(tx *buntdb.Tx) error { - return func(tx *buntdb.Tx) error { - err := tx.Ascend("", func(key, value string) bool { - if strings.HasPrefix(key, prefix) { - *found = true - // ignore self in listing - if !bytes.Equal(trimDirectoryKey([]byte(key)), []byte(prefix)) { - kv := kv{Key: key, Value: []byte(value)} - *kvs = append(*kvs, kv) - } +func (*Store) listTxFunc(prefix string, kvs *[]kv, found *bool, opts *ExecutionOptions) func(tx *buntdb.Tx) error { + fnc := func(key, value string) bool { + if strings.HasPrefix(key, prefix) { + *found = true + // ignore self in listing + if !bytes.Equal(trimDirectoryKey([]byte(key)), []byte(prefix)) { + kv := kv{Key: key, Value: []byte(value)} + *kvs = append(*kvs, kv) } - return true - }) + } + return true + } + + return func(tx *buntdb.Tx) (err error) { + if opts.Order == "DESC" { + err = tx.Descend(opts.Sort, fnc) + } else { + err = tx.Ascend(opts.Sort, fnc) + } return err } } // GetExecutionGroup returns all executions in the same group of a given execution -func (s *Store) GetExecutionGroup(execution *Execution, timezone *time.Location) ([]*Execution, error) { - res, err := s.GetExecutions(execution.JobName, timezone) +func (s *Store) GetExecutionGroup(execution *Execution, opts *ExecutionOptions) ([]*Execution, error) { + res, err := s.GetExecutions(execution.JobName, opts) if err != nil { return nil, err } @@ -474,8 +490,8 @@ func (s *Store) GetExecutionGroup(execution *Execution, timezone *time.Location) // GetGroupedExecutions returns executions for a job grouped and with an ordered index // to facilitate access. -func (s *Store) GetGroupedExecutions(jobName string, timezone *time.Location) (map[int64][]*Execution, []int64, error) { - execs, err := s.GetExecutions(jobName, timezone) +func (s *Store) GetGroupedExecutions(jobName string, opts *ExecutionOptions) (map[int64][]*Execution, []int64, error) { + execs, err := s.GetExecutions(jobName, opts) if err != nil { return nil, nil, err } @@ -505,7 +521,7 @@ func (*Store) setExecutionTxFunc(key string, pbe *dkronpb.Execution) func(tx *bu // more recent, avoiding non ordered execution set if i != "" { var p dkronpb.Execution - if err := proto.Unmarshal([]byte(i), &p); err != nil { + if err := json.Unmarshal([]byte(i), &p); err != nil { return err } // Compare existing execution @@ -514,7 +530,7 @@ func (*Store) setExecutionTxFunc(key string, pbe *dkronpb.Execution) func(tx *bu } } - eb, err := proto.Marshal(pbe) + eb, err := json.Marshal(pbe) if err != nil { return err } @@ -545,7 +561,7 @@ func (s *Store) SetExecution(execution *Execution) (string, error) { return "", err } - execs, err := s.GetExecutions(execution.JobName, nil) + execs, err := s.GetExecutions(execution.JobName, &ExecutionOptions{}) if err != nil && err != buntdb.ErrNotFound { log.WithError(err). WithField("job", execution.JobName). @@ -620,8 +636,8 @@ func (s *Store) unmarshalExecutions(items []kv, timezone *time.Location) ([]*Exe for _, item := range items { var pbe dkronpb.Execution - if err := proto.Unmarshal(item.Value, &pbe); err != nil { - log.WithError(err).WithField("key", item.Key).Debug("error unmarshaling") + if err := json.Unmarshal(item.Value, &pbe); err != nil { + log.WithError(err).WithField("key", item.Key).Debug("error unmarshaling JSON") return nil, err } execution := NewExecutionFromProto(&pbe) @@ -640,7 +656,7 @@ func (s *Store) computeStatus(jobName string, exGroup int64, tx *buntdb.Tx) (str found := false prefix := fmt.Sprintf("%s:%s:", executionsPrefix, jobName) - if err := s.listTxFunc(prefix, &kvs, &found)(tx); err != nil { + if err := s.listTxFunc(prefix, &kvs, &found, &ExecutionOptions{})(tx); err != nil { return "", err } diff --git a/dkron/store_test.go b/dkron/store_test.go index 6be945e6b..5ee9aa9b8 100644 --- a/dkron/store_test.go +++ b/dkron/store_test.go @@ -69,7 +69,10 @@ func TestStore(t *testing.T) { _, err = s.SetExecution(testExecution2) require.NoError(t, err) - execs, err := s.GetExecutions("test", nil) + execs, err := s.GetExecutions("test", &ExecutionOptions{ + Sort: "started_at", + Order: "DESC", + }) assert.NoError(t, err) testExecution.Id = testExecution.Key()