Skip to content

Commit

Permalink
Fix linting issues (#1336)
Browse files Browse the repository at this point in the history
Gardening
  • Loading branch information
Victor Castell authored May 28, 2023
1 parent 04b9efc commit 07ac49f
Show file tree
Hide file tree
Showing 18 changed files with 146 additions and 114 deletions.
30 changes: 21 additions & 9 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ func (a *Agent) Start() error {
rand.Seed(time.Now().UnixNano())

// Normalize configured addresses
a.config.normalizeAddrs()
if err := a.config.normalizeAddrs(); err != nil {
return err
}

s, err := a.setupSerf()
if err != nil {
Expand Down Expand Up @@ -213,7 +215,11 @@ func (a *Agent) Start() error {
grpcServer := grpc.NewServer(opts...)
as := NewAgentServer(a, a.logger)
proto.RegisterAgentServer(grpcServer, as)
go grpcServer.Serve(l)
go func() {
if err := grpcServer.Serve(l); err != nil {
a.logger.Fatal(err)
}
}()
}

if a.GRPCClient == nil {
Expand All @@ -223,7 +229,9 @@ func (a *Agent) Start() error {
tags := a.serf.LocalMember().Tags
tags["rpc_addr"] = a.advertiseRPCAddr() // Address that clients will use to RPC to servers
tags["port"] = strconv.Itoa(a.config.AdvertiseRPCPort)
a.serf.SetTags(tags)
if err := a.serf.SetTags(tags); err != nil {
return fmt.Errorf("agent: Error setting tags: %w", err)
}

go a.eventLoop()
a.ready = true
Expand Down Expand Up @@ -253,13 +261,17 @@ func (a *Agent) JoinLAN(addrs []string) (int, error) {
func (a *Agent) Stop() error {
a.logger.Info("agent: Called member stop, now stopping")

if a.config.Server && a.sched.Started() {
<-a.sched.Stop().Done()
}

if a.config.Server {
a.raft.Shutdown()
a.Store.Shutdown()
if a.sched.Started() {
<-a.sched.Stop().Done()
}

// TODO: Check why Shutdown().Error() is not working
_ = a.raft.Shutdown()

if err := a.Store.Shutdown(); err != nil {
return err
}
}

if err := a.serf.Leave(); err != nil {
Expand Down
51 changes: 24 additions & 27 deletions dkron/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ func TestAgentCommand_runForElection(t *testing.T) {
c.DataDir = dir

a1 := NewAgent(c)
if err := a1.Start(); err != nil {
t.Fatal(err)
}
err = a1.Start()
require.NoError(t, err)

time.Sleep(2 * time.Second)

Expand All @@ -75,9 +74,8 @@ func TestAgentCommand_runForElection(t *testing.T) {
c.DataDir = dir

a2 := NewAgent(c)
if err := a2.Start(); err != nil {
t.Fatal(err)
}
err = a2.Start()
require.NoError(t, err)

time.Sleep(2 * time.Second)

Expand All @@ -95,22 +93,21 @@ func TestAgentCommand_runForElection(t *testing.T) {
c.DataDir = dir

a3 := NewAgent(c)
if err := a3.Start(); err != nil {
t.Fatal(err)
}
err = a3.Start()
require.NoError(t, err)

time.Sleep(2 * time.Second)

// Send a shutdown request
a1.Stop()
_ = a1.Stop()

// Wait until a follower steps as leader
time.Sleep(2 * time.Second)
assert.True(t, (a2.IsLeader() || a3.IsLeader()))
log.Println(a3.IsLeader())

a2.Stop()
a3.Stop()
// _ = a2.Stop()
// _ = a3.Stop()
}

func lastSelector(nodes []Node) int {
Expand Down Expand Up @@ -146,7 +143,7 @@ func Test_getTargetNodes(t *testing.T) {
c.DataDir = dir

a1 := NewAgent(c)
a1.Start()
_ = a1.Start()

time.Sleep(2 * time.Second)

Expand All @@ -168,7 +165,7 @@ func Test_getTargetNodes(t *testing.T) {
c.DataDir = dir

a2 := NewAgent(c)
a2.Start()
_ = a2.Start()

// Start another agent
ip3, returnFn3 := testutil.TakeIP()
Expand All @@ -192,7 +189,7 @@ func Test_getTargetNodes(t *testing.T) {
c.DataDir = dir

a3 := NewAgent(c)
a3.Start()
_ = a3.Start()

time.Sleep(2 * time.Second)

Expand Down Expand Up @@ -322,9 +319,9 @@ func Test_getTargetNodes(t *testing.T) {
})

// Clean up
a1.Stop()
a2.Stop()
a3.Stop()
_ = a1.Stop()
_ = a2.Stop()
_ = a3.Stop()
}

func TestEncrypt(t *testing.T) {
Expand All @@ -346,12 +343,12 @@ func TestEncrypt(t *testing.T) {
c.DataDir = dir

a := NewAgent(c)
a.Start()
_ = a.Start()

time.Sleep(2 * time.Second)

assert.True(t, a.serf.EncryptionEnabled())
a.Stop()
_ = a.Stop()
}

func Test_advertiseRPCAddr(t *testing.T) {
Expand All @@ -374,7 +371,7 @@ func Test_advertiseRPCAddr(t *testing.T) {
c.DataDir = dir

a := NewAgent(c)
a.Start()
_ = a.Start()

time.Sleep(2 * time.Second)

Expand All @@ -383,7 +380,7 @@ func Test_advertiseRPCAddr(t *testing.T) {

assert.Equal(t, exRPCAddr, advertiseRPCAddr)

a.Stop()
_ = a.Stop()
}

func Test_bindRPCAddr(t *testing.T) {
Expand All @@ -403,17 +400,17 @@ func Test_bindRPCAddr(t *testing.T) {
c.LogLevel = logLevel
c.DevMode = true
c.DataDir = dir
c.AdvertiseAddr = c.BindAddr

a := NewAgent(c)
a.Start()
err = a.Start()
require.NoError(t, err)

time.Sleep(2 * time.Second)

bindRPCAddr := a.bindRPCAddr()
exRPCAddr := a1Addr + ":6868"

assert.Equal(t, exRPCAddr, bindRPCAddr)
a.Stop()
}

func TestAgentConfig(t *testing.T) {
Expand All @@ -436,15 +433,15 @@ func TestAgentConfig(t *testing.T) {
c.DevMode = true

a := NewAgent(c)
a.Start()
_ = a.Start()

time.Sleep(2 * time.Second)

assert.NotEqual(t, a.config.AdvertiseAddr, a.config.BindAddr)
assert.NotEmpty(t, a.config.AdvertiseAddr)
assert.Equal(t, advAddr+":8946", a.config.AdvertiseAddr)

a.Stop()
_ = a.Stop()
}

func Test_getQualifyingNodes(t *testing.T) {
Expand Down
46 changes: 27 additions & 19 deletions dkron/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ func (h *HTTPTransport) ServeHTTP() {
"address": h.agent.config.HTTPAddr,
}).Info("api: Running HTTP server")

go h.Engine.Run(h.agent.config.HTTPAddr)
go func() {
if err := h.Engine.Run(h.agent.config.HTTPAddr); err != nil {
h.logger.WithError(err).Error("api: Error starting HTTP server")
}
}()
}

// APIRoutes registers the api routes on the gin RouterGroup.
Expand Down Expand Up @@ -233,15 +237,15 @@ func (h *HTTPTransport) jobCreateOrUpdateHandler(c *gin.Context) {

// Parse values from JSON
if err := c.BindJSON(&job); err != nil {
c.Writer.WriteString(fmt.Sprintf("Unable to parse payload: %s.", err))
_, _ = c.Writer.WriteString(fmt.Sprintf("Unable to parse payload: %s.", err))
h.logger.Error(err)
return
}

// Validate job
if err := job.Validate(); err != nil {
c.AbortWithStatus(http.StatusBadRequest)
c.Writer.WriteString(fmt.Sprintf("Job contains invalid value: %s.", err))
_, _ = c.Writer.WriteString(fmt.Sprintf("Job contains invalid value: %s.", err))
return
}

Expand All @@ -253,13 +257,17 @@ func (h *HTTPTransport) jobCreateOrUpdateHandler(c *gin.Context) {
} else {
c.AbortWithStatus(http.StatusInternalServerError)
}
c.Writer.WriteString(s.Message())
_, _ = c.Writer.WriteString(s.Message())
return
}

// Immediately run the job if so requested
if _, exists := c.GetQuery("runoncreate"); exists {
go h.agent.GRPCClient.RunJob(job.Name)
go func() {
if _, err := h.agent.GRPCClient.RunJob(job.Name); err != nil {
h.logger.WithError(err).Error("api: Unable to run job.")
}
}()
}

c.Header("Location", fmt.Sprintf("%s/%s", c.Request.RequestURI, job.Name))
Expand All @@ -272,7 +280,7 @@ func (h *HTTPTransport) jobDeleteHandler(c *gin.Context) {
// Call gRPC DeleteJob
job, err := h.agent.GRPCClient.DeleteJob(jobName)
if err != nil {
c.AbortWithError(http.StatusNotFound, err)
_ = c.AbortWithError(http.StatusNotFound, err)
return
}
renderJSON(c, http.StatusOK, job)
Expand All @@ -284,7 +292,7 @@ func (h *HTTPTransport) jobRunHandler(c *gin.Context) {
// Call gRPC RunJob
job, err := h.agent.GRPCClient.RunJob(jobName)
if err != nil {
c.AbortWithError(http.StatusNotFound, err)
_ = c.AbortWithError(http.StatusNotFound, err)
return
}

Expand All @@ -298,32 +306,32 @@ func (h *HTTPTransport) jobRunHandler(c *gin.Context) {
func (h *HTTPTransport) restoreHandler(c *gin.Context) {
file, _, err := c.Request.FormFile("file")
if err != nil {
c.AbortWithError(http.StatusNotFound, err)
_ = c.AbortWithError(http.StatusNotFound, err)
return
}

data, err := ioutil.ReadAll(file)
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
_ = c.AbortWithError(http.StatusBadRequest, err)
return
}
var jobs []*Job
err = json.Unmarshal(data, &jobs)

if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
_ = c.AbortWithError(http.StatusBadRequest, err)
return
}

jobTree, err := generateJobTree(jobs)
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
_ = c.AbortWithError(http.StatusBadRequest, err)
return
}
result := h.agent.recursiveSetJob(jobTree)
resp, err := json.Marshal(result)
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
_ = c.AbortWithError(http.StatusBadRequest, err)
return
}
renderJSON(c, http.StatusOK, string(resp))
Expand All @@ -349,7 +357,7 @@ func (h *HTTPTransport) executionsHandler(c *gin.Context) {

job, err := h.agent.Store.GetJob(jobName, nil)
if err != nil {
c.AbortWithError(http.StatusNotFound, err)
_ = c.AbortWithError(http.StatusNotFound, err)
return
}

Expand Down Expand Up @@ -390,7 +398,7 @@ func (h *HTTPTransport) executionHandler(c *gin.Context) {

job, err := h.agent.Store.GetJob(jobName, nil)
if err != nil {
c.AbortWithError(http.StatusNotFound, err)
_ = c.AbortWithError(http.StatusNotFound, err)
return
}

Expand Down Expand Up @@ -436,7 +444,7 @@ func (h *HTTPTransport) membersHandler(c *gin.Context) {
func (h *HTTPTransport) leaderHandler(c *gin.Context) {
member, err := h.agent.leaderMember()
if err != nil {
c.AbortWithError(http.StatusInternalServerError, err)
_ = c.AbortWithError(http.StatusInternalServerError, err)
}
if member == nil {
c.AbortWithStatus(http.StatusNotFound)
Expand All @@ -455,7 +463,7 @@ func (h *HTTPTransport) isLeaderHandler(c *gin.Context) {

func (h *HTTPTransport) leaveHandler(c *gin.Context) {
if err := h.agent.Stop(); err != nil {
c.AbortWithError(http.StatusInternalServerError, err)
_ = c.AbortWithError(http.StatusInternalServerError, err)
}
renderJSON(c, http.StatusOK, h.agent.peers)
}
Expand All @@ -465,7 +473,7 @@ func (h *HTTPTransport) jobToggleHandler(c *gin.Context) {

job, err := h.agent.Store.GetJob(jobName, nil)
if err != nil {
c.AbortWithError(http.StatusNotFound, err)
_ = c.AbortWithError(http.StatusNotFound, err)
return
}

Expand All @@ -474,7 +482,7 @@ func (h *HTTPTransport) jobToggleHandler(c *gin.Context) {

// Call gRPC SetJob
if err := h.agent.GRPCClient.SetJob(job); err != nil {
c.AbortWithError(http.StatusUnprocessableEntity, err)
_ = c.AbortWithError(http.StatusUnprocessableEntity, err)
return
}

Expand All @@ -487,7 +495,7 @@ func (h *HTTPTransport) busyHandler(c *gin.Context) {

exs, err := h.agent.GetActiveExecutions()
if err != nil {
c.AbortWithError(http.StatusInternalServerError, err)
_ = c.AbortWithError(http.StatusInternalServerError, err)
return
}

Expand Down
Loading

0 comments on commit 07ac49f

Please sign in to comment.