Skip to content

Commit

Permalink
restore jobs with file
Browse files Browse the repository at this point in the history
  • Loading branch information
wei.huang committed Apr 7, 2020
1 parent c550717 commit b04fecf
Show file tree
Hide file tree
Showing 8 changed files with 754 additions and 0 deletions.
18 changes: 18 additions & 0 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,3 +841,21 @@ func (a *Agent) GetActiveExecutions() ([]*proto.Execution, error) {

return executions, nil
}

func (a *Agent) recursiveSetJob(jobs []*Job) []string {
result := make([]string, 0)
for _, job := range jobs {
err := a.GRPCClient.SetJob(job)
if err != nil {
result = append(result, "fail create "+job.Name)
continue
} else {
result = append(result, "success create "+job.Name)
if len(job.ChildJobs) > 0 {
recursiveResult := a.recursiveSetJob(job.ChildJobs)
result = append(result, recursiveResult...)
}
}
}
return result
}
39 changes: 39 additions & 0 deletions dkron/api.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package dkron

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

"github.com/gin-contrib/expvar"
Expand Down Expand Up @@ -69,6 +71,7 @@ func (h *HTTPTransport) APIRoutes(r *gin.RouterGroup, middleware ...gin.HandlerF
v1.GET("/leader", h.leaderHandler)
v1.GET("/isleader", h.isLeaderHandler)
v1.POST("/leave", h.leaveHandler)
v1.POST("/restore", h.restoreHandler)

v1.GET("/busy", h.busyHandler)

Expand Down Expand Up @@ -218,6 +221,42 @@ func (h *HTTPTransport) jobRunHandler(c *gin.Context) {
renderJSON(c, http.StatusOK, job)
}

// Restore jobs from file.
// Overwrite job if the job is exist.
func (h *HTTPTransport) restoreHandler(c *gin.Context) {
file, _, err := c.Request.FormFile("file")
if err != nil {
c.AbortWithError(http.StatusNotFound, err)
return
}

data, err := ioutil.ReadAll(file)
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
return
}
var jobs []*Job
err = json.Unmarshal(data, &jobs)

if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
return
}

jobTree, err := generateJobTree(jobs)
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
return
}
result := h.agent.recursiveSetJob(jobTree)
resp, err := json.Marshal(result)
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
return
}
renderJSON(c, http.StatusOK, string(resp))
}

func (h *HTTPTransport) executionsHandler(c *gin.Context) {
jobName := c.Param("job")

Expand Down
40 changes: 40 additions & 0 deletions dkron/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
"os"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -275,6 +278,43 @@ func TestAPIJobCreateUpdateJobWithInvalidParentIsNotCreated(t *testing.T) {
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}

func TestAPIJobRestore(t *testing.T) {
port := "8109"
baseURL := fmt.Sprintf("http://localhost:%s/v1/restore", port)
dir, a := setupAPITest(t, port)
defer os.RemoveAll(dir)
defer a.Stop()

bodyBuffer := &bytes.Buffer{}
bodyWriter := multipart.NewWriter(bodyBuffer)

fileWriter, err := bodyWriter.CreateFormFile("file", "testBackupJobs.json")
if err != nil {
t.Fatalf("CreateFormFile error: %s", err)
}

file, err := os.Open("../scripts/testBackupJobs.json")
if err != nil {
t.Fatalf("open job json file error: %s", err)
}
defer file.Close()

io.Copy(fileWriter, file)

contentType := bodyWriter.FormDataContentType()
bodyWriter.Close()

resp, _ := http.Post(baseURL, contentType, bodyBuffer)
respBody, _ := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
rs := string(respBody)
t.Log("restore response: ", rs)
if strings.Contains(rs, "fail") {
t.Fatalf("restore json file request error: %s", rs)
}

}

// postJob POSTs the given json to the jobs endpoint and returns the response
func postJob(t *testing.T, port string, jsonStr []byte) *http.Response {
baseURL := fmt.Sprintf("http://localhost:%s/v1", port)
Expand Down
68 changes: 68 additions & 0 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ type Job struct {
// Jobs that are dependent upon this one will be run after this job runs.
DependentJobs []string `json:"dependent_jobs"`

// Job pointer that are dependent upon this one
ChildJobs []*Job `json:"-"`

// Job id of job that this job is dependent upon.
ParentJob string `json:"parent_job"`

Expand Down Expand Up @@ -365,3 +368,68 @@ func isSlug(candidate string) (bool, string) {
whyNot := illegalCharPattern.FindString(candidate)
return whyNot == "", whyNot
}

// generate Job Tree
func generateJobTree(jobs []*Job) ([]*Job, error) {
length := len(jobs)
j := 0
for i := 0; i < length; i++ {
rejobs, isTopParentNodeFlag, err := findParentJobAndValidateJob(jobs, j)
if err != nil {
return nil, err
}
if isTopParentNodeFlag {
j++
}
jobs = rejobs
}
return jobs, nil
}

// findParentJobAndValidateJob...
func findParentJobAndValidateJob(jobs []*Job, index int) ([]*Job, bool, error) {
childJob := jobs[index]
// Validate job
if err := childJob.Validate(); err != nil {
return nil, false, err
}
if childJob.ParentJob == "" {
return jobs, true, nil
}
for _, parentJob := range jobs {
if parentJob.Name == childJob.Name {
continue
}
if childJob.ParentJob == parentJob.Name {
parentJob.ChildJobs = append(parentJob.ChildJobs, childJob)
jobs = append(jobs[:index], jobs[index+1:]...)
return jobs, false, nil
}
if len(parentJob.ChildJobs) > 0 {
flag := findParentJobInChildJobs(parentJob.ChildJobs, childJob)
if flag {
jobs = append(jobs[:index], jobs[index+1:]...)
return jobs, false, nil
}
}
}
return nil, false, ErrNoParent
}

func findParentJobInChildJobs(jobs []*Job, job *Job) bool {
for _, parentJob := range jobs {
if job.ParentJob == parentJob.Name {
parentJob.ChildJobs = append(parentJob.ChildJobs, job)
return true
} else {
if len(parentJob.ChildJobs) > 0 {
flag := findParentJobInChildJobs(parentJob.ChildJobs, job)
if flag {
return true
}
}

}
}
return false
}
Loading

0 comments on commit b04fecf

Please sign in to comment.