Skip to content

Commit

Permalink
Merge branch 'master' into feature/restore_job_with_file
Browse files Browse the repository at this point in the history
  • Loading branch information
vision9527 authored and wei.huang committed Feb 7, 2020
2 parents ad58bed + 7da930c commit d225497
Show file tree
Hide file tree
Showing 37 changed files with 1,115 additions and 28,679 deletions.
34 changes: 33 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,41 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

## [2.0.4] - 2020-01-31

* refactor: Remove dependency of the agent in store and reduce usage in Job (#669)
* chore: Upgrade gin (#669)
* chore: Add helper methods (#669)
* refactor: Move directory creation to the Store instantiation (#669)
* feat: Accept middlewares for API routes (#669)
* doc: ACL

## [2.0.3] - 2020-01-04

### Fixes

- Fix modal indexing in UI (#666)

### Changes

- Bump BadgerDB to 2.0.1 (#667)
- Allow templating of time format in notifications webhook (#665)

## [2.0.2] - 2019-12-11

### Features

- Search all jobs in dashboards with the search box
- Search all jobs in dashboards with the search box (#653)

### Fixes

- Validate empty job names (#659)
- Die on plugin communication error (#658)
- Revert GetStatus with concurrency forbid (#655)

### Changes

- Upgrade Angular to latest (#641)

## [2.0.1] - 2019-12-03

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ clean:

.PHONY: doc apidoc gen test
doc:
scripts/run doc --dir website/content/cli
#scripts/run doc --dir website/content/cli
cd website; hugo -d ../public
ghp-import -p public

Expand Down
30 changes: 16 additions & 14 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Agent struct {
// Pro features
GlobalLock bool
MemberEventHandler func(serf.Event)
ProAppliers LogAppliers

serf *serf.Serf
config *Config
Expand Down Expand Up @@ -295,7 +296,7 @@ func (a *Agent) setupRaft() error {
if err != nil {
return fmt.Errorf("recovery failed to parse peers.json: %v", err)
}
tmpFsm := newFSM(nil)
tmpFsm := newFSM(nil, nil)
if err := raft.RecoverCluster(config, tmpFsm,
logStore, stableStore, snapshots, transport, configuration); err != nil {
return fmt.Errorf("recovery failed: %v", err)
Expand Down Expand Up @@ -331,13 +332,14 @@ func (a *Agent) setupRaft() error {

// Instantiate the Raft systems. The second parameter is a finite state machine
// which stores the actual kv pairs and is operated upon through Apply().
fsm := newFSM(a.Store)
fsm := newFSM(a.Store, a.ProAppliers)
rft, err := raft.NewRaft(config, fsm, logStore, stableStore, snapshots, transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
}
a.leaderCh = rft.LeaderCh()
a.raft = rft

return nil
}

Expand Down Expand Up @@ -451,18 +453,7 @@ func (a *Agent) StartServer() {
}

if a.Store == nil {
dirExists, err := exists(a.config.DataDir)
if err != nil {
log.WithError(err).WithField("dir", a.config.DataDir).Fatal("Invalid Dir")
}
if !dirExists {
// Try to create the directory
err := os.Mkdir(a.config.DataDir, 0700)
if err != nil {
log.WithError(err).WithField("dir", a.config.DataDir).Fatal("Error Creating Dir")
}
}
s, err := NewStore(a, filepath.Join(a.config.DataDir, a.config.NodeName))
s, err := NewStore(filepath.Join(a.config.DataDir, a.config.NodeName))
if err != nil {
log.WithError(err).Fatal("dkron: Error initializing store")
}
Expand Down Expand Up @@ -572,6 +563,11 @@ func (a *Agent) LocalMember() serf.Member {
return a.serf.LocalMember()
}

// Leader is used to return the Raft leader
func (a *Agent) Leader() raft.ServerAddress {
return a.raft.Leader()
}

// Servers returns a list of known server
func (a *Agent) Servers() (members []*ServerParts) {
for _, member := range a.serf.Members() {
Expand Down Expand Up @@ -898,3 +894,9 @@ func (a *Agent) recursiveSetJob(jobs []*Job) []string {
}
return result
}

// RaftApply applies a command to the Raft log
func (a *Agent) RaftApply(cmd []byte) raft.ApplyFuture {
return a.raft.Apply(cmd, raftTimeout)

}
9 changes: 7 additions & 2 deletions dkron/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (h *HTTPTransport) ServeHTTP() {
}

// APIRoutes registers the api routes on the gin RouterGroup.
func (h *HTTPTransport) APIRoutes(r *gin.RouterGroup) {
func (h *HTTPTransport) APIRoutes(r *gin.RouterGroup, middleware ...gin.HandlerFunc) {
r.GET("/debug/vars", expvar.Handler())

h.Engine.GET("/health", func(c *gin.Context) {
Expand All @@ -65,6 +65,7 @@ func (h *HTTPTransport) APIRoutes(r *gin.RouterGroup) {

r.GET("/v1", h.indexHandler)
v1 := r.Group("/v1")
v1.Use(middleware...)
v1.GET("/", h.indexHandler)
v1.GET("/members", h.membersHandler)
v1.GET("/leader", h.leaderHandler)
Expand Down Expand Up @@ -120,7 +121,11 @@ func (h *HTTPTransport) indexHandler(c *gin.Context) {
func (h *HTTPTransport) jobsHandler(c *gin.Context) {
metadata := c.QueryMap("metadata")

jobs, err := h.agent.Store.GetJobs(&JobOptions{Metadata: metadata})
jobs, err := h.agent.Store.GetJobs(
&JobOptions{
Metadata: metadata,
},
)
if err != nil {
log.WithError(err).Error("api: Unable to get jobs, store not reachable.")
return
Expand Down
33 changes: 33 additions & 0 deletions dkron/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,39 @@ func TestAPIJobCreateUpdateValidationValidName(t *testing.T) {
assert.Equal(t, http.StatusCreated, resp.StatusCode)
}

func TestAPIJobCreateUpdateValidationEmptyName(t *testing.T) {
port := "8101"
baseURL := fmt.Sprintf("http://localhost:%s/v1", port)
dir, a := setupAPITest(t, port)
defer os.RemoveAll(dir)
defer a.Stop()

jsonStr := []byte(`{
"name": "testjob1",
"schedule": "@every 1m",
"executor": "shell",
"executor_config": {"command": "date"},
"disabled": true
}`)

resp, err := http.Post(baseURL+"/jobs", "encoding/json", bytes.NewBuffer(jsonStr))
require.NoError(t, err, err)
assert.Equal(t, http.StatusCreated, resp.StatusCode)

jsonStr = []byte(`{
"name": "",
"parent_job": "testjob1",
"schedule": "@every 1m",
"executor": "shell",
"executor_config": {"command": "date"},
"disabled": true
}`)

resp, err = http.Post(baseURL+"/jobs", "encoding/json", bytes.NewBuffer(jsonStr))
require.NoError(t, err, err)
assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
}

func TestAPIJobCreateUpdateValidationBadSchedule(t *testing.T) {
resp := postJob(t, "8097", []byte(`{
"name": "testjob",
Expand Down
34 changes: 13 additions & 21 deletions dkron/assets/assets_vfsdata.go

Large diffs are not rendered by default.

20 changes: 18 additions & 2 deletions dkron/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,26 @@ const (
ExecutionDoneType
)

// LogApplier is the definition of a function that can apply a Raft log
type LogApplier func(buf []byte, index uint64) interface{}

// LogAppliers is a mapping of the Raft MessageType to the appropriate log
// applier
type LogAppliers map[MessageType]LogApplier

type dkronFSM struct {
store Storage
mu sync.Mutex

// proAppliers holds the set of pro only LogAppliers
proAppliers LogAppliers
}

// NewFSM is used to construct a new FSM with a blank state
func newFSM(store Storage) *dkronFSM {
func newFSM(store Storage, logAppliers LogAppliers) *dkronFSM {
return &dkronFSM{
store: store,
store: store,
proAppliers: logAppliers,
}
}

Expand All @@ -56,6 +67,11 @@ func (d *dkronFSM) Apply(l *raft.Log) interface{} {
return d.applySetExecution(buf[1:])
}

// Check enterprise only message types.
if applier, ok := d.proAppliers[msgType]; ok {
return applier(buf[1:], l.Index)
}

return nil
}

Expand Down
3 changes: 1 addition & 2 deletions dkron/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func (grpcs *GRPCServer) SetJob(ctx context.Context, setJobReq *proto.SetJobRequ
}

// DeleteJob broadcast a state change to the cluster members that will delete the job.
// Then restart the scheduler
// This only works on the leader
func (grpcs *GRPCServer) DeleteJob(ctx context.Context, delJobReq *proto.DeleteJobRequest) (*proto.DeleteJobResponse, error) {
defer metrics.MeasureSince([]string{"grpc", "delete_job"}, time.Now())
Expand All @@ -113,7 +112,7 @@ func (grpcs *GRPCServer) DeleteJob(ctx context.Context, delJobReq *proto.DeleteJ
}
jpb := job.ToProto()

// If everything is ok, restart the scheduler
// If everything is ok, remove the job
grpcs.agent.sched.RemoveJob(job)

return &proto.DeleteJobResponse{Job: jpb}, nil
Expand Down
44 changes: 40 additions & 4 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,44 @@ func (j *Job) String() string {
return fmt.Sprintf("\"Job: %s, scheduled at: %s, tags:%v\"", j.Name, j.Schedule, j.Tags)
}

// GetParent returns the parent job of a job
func (j *Job) GetParent() (*Job, error) {
// GetStatus returns the status of a job whether it's running, succeeded or failed
func (j *Job) GetStatus() string {
// Maybe we are testing
if j.Agent == nil {
return nil, ErrNoAgent
return StatusNotSet
}

execs, _ := j.Agent.Store.GetLastExecutionGroup(j.Name)
success := 0
failed := 0
for _, ex := range execs {
if ex.FinishedAt.IsZero() {
return StatusRunning
}
}

var status string
for _, ex := range execs {
if ex.Success {
success = success + 1
} else {
failed = failed + 1
}
}

if failed == 0 {
status = StatusSuccess
} else if failed > 0 && success == 0 {
status = StatusFailed
} else if failed > 0 && success > 0 {
status = StatusPartialyFailed
}

return status
}

// GetParent returns the parent job of a job
func (j *Job) GetParent(store *Store) (*Job, error) {
if j.Name == j.ParentJob {
return nil, ErrSameParent
}
Expand All @@ -260,7 +291,7 @@ func (j *Job) GetParent() (*Job, error) {
return nil, ErrNoParent
}

parentJob, err := j.Agent.Store.GetJob(j.ParentJob, nil)
parentJob, err := store.GetJob(j.ParentJob, nil)
if err != nil {
if err == badger.ErrKeyNotFound {
return nil, ErrParentJobNotFound
Expand Down Expand Up @@ -294,6 +325,7 @@ func (j *Job) isRunnable() bool {

if j.Concurrency == ConcurrencyForbid {
j.Agent.RefreshJobStatus(j.Name)
j.Status = j.GetStatus()
}

if j.Status == StatusRunning && j.Concurrency == ConcurrencyForbid {
Expand All @@ -316,6 +348,10 @@ func (j *Job) isRunnable() bool {

// Validate validates whether all values in the job are acceptable.
func (j *Job) Validate() error {
if j.Name == "" {
return fmt.Errorf("name cannot be empty")
}

if valid, chr := isSlug(j.Name); !valid {
return fmt.Errorf("name contains illegal character '%s'", chr)
}
Expand Down
Loading

0 comments on commit d225497

Please sign in to comment.