diff --git a/dkron/agent.go b/dkron/agent.go index 756c89de6..010806d8b 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -163,7 +163,9 @@ func (a *Agent) Start() error { rand.Seed(time.Now().UnixNano()) // Normalize configured addresses - a.config.normalizeAddrs() + if err := a.config.normalizeAddrs(); err != nil { + return err + } s, err := a.setupSerf() if err != nil { @@ -213,7 +215,11 @@ func (a *Agent) Start() error { grpcServer := grpc.NewServer(opts...) as := NewAgentServer(a, a.logger) proto.RegisterAgentServer(grpcServer, as) - go grpcServer.Serve(l) + go func() { + if err := grpcServer.Serve(l); err != nil { + a.logger.Fatal(err) + } + }() } if a.GRPCClient == nil { @@ -223,7 +229,9 @@ func (a *Agent) Start() error { tags := a.serf.LocalMember().Tags tags["rpc_addr"] = a.advertiseRPCAddr() // Address that clients will use to RPC to servers tags["port"] = strconv.Itoa(a.config.AdvertiseRPCPort) - a.serf.SetTags(tags) + if err := a.serf.SetTags(tags); err != nil { + return fmt.Errorf("agent: Error setting tags: %w", err) + } go a.eventLoop() a.ready = true @@ -253,13 +261,17 @@ func (a *Agent) JoinLAN(addrs []string) (int, error) { func (a *Agent) Stop() error { a.logger.Info("agent: Called member stop, now stopping") - if a.config.Server && a.sched.Started() { - <-a.sched.Stop().Done() - } - if a.config.Server { - a.raft.Shutdown() - a.Store.Shutdown() + if a.sched.Started() { + <-a.sched.Stop().Done() + } + + // TODO: Check why Shutdown().Error() is not working + _ = a.raft.Shutdown() + + if err := a.Store.Shutdown(); err != nil { + return err + } } if err := a.serf.Leave(); err != nil { diff --git a/dkron/agent_test.go b/dkron/agent_test.go index c7ccbfbc2..37ddc4ba1 100644 --- a/dkron/agent_test.go +++ b/dkron/agent_test.go @@ -47,9 +47,8 @@ func TestAgentCommand_runForElection(t *testing.T) { c.DataDir = dir a1 := NewAgent(c) - if err := a1.Start(); err != nil { - t.Fatal(err) - } + err = a1.Start() + require.NoError(t, err) time.Sleep(2 * time.Second) @@ -75,9 +74,8 @@ func TestAgentCommand_runForElection(t *testing.T) { c.DataDir = dir a2 := NewAgent(c) - if err := a2.Start(); err != nil { - t.Fatal(err) - } + err = a2.Start() + require.NoError(t, err) time.Sleep(2 * time.Second) @@ -95,22 +93,21 @@ func TestAgentCommand_runForElection(t *testing.T) { c.DataDir = dir a3 := NewAgent(c) - if err := a3.Start(); err != nil { - t.Fatal(err) - } + err = a3.Start() + require.NoError(t, err) time.Sleep(2 * time.Second) // Send a shutdown request - a1.Stop() + _ = a1.Stop() // Wait until a follower steps as leader time.Sleep(2 * time.Second) assert.True(t, (a2.IsLeader() || a3.IsLeader())) log.Println(a3.IsLeader()) - a2.Stop() - a3.Stop() + // _ = a2.Stop() + // _ = a3.Stop() } func lastSelector(nodes []Node) int { @@ -146,7 +143,7 @@ func Test_getTargetNodes(t *testing.T) { c.DataDir = dir a1 := NewAgent(c) - a1.Start() + _ = a1.Start() time.Sleep(2 * time.Second) @@ -168,7 +165,7 @@ func Test_getTargetNodes(t *testing.T) { c.DataDir = dir a2 := NewAgent(c) - a2.Start() + _ = a2.Start() // Start another agent ip3, returnFn3 := testutil.TakeIP() @@ -192,7 +189,7 @@ func Test_getTargetNodes(t *testing.T) { c.DataDir = dir a3 := NewAgent(c) - a3.Start() + _ = a3.Start() time.Sleep(2 * time.Second) @@ -322,9 +319,9 @@ func Test_getTargetNodes(t *testing.T) { }) // Clean up - a1.Stop() - a2.Stop() - a3.Stop() + _ = a1.Stop() + _ = a2.Stop() + _ = a3.Stop() } func TestEncrypt(t *testing.T) { @@ -346,12 +343,12 @@ func TestEncrypt(t *testing.T) { c.DataDir = dir a := NewAgent(c) - a.Start() + _ = a.Start() time.Sleep(2 * time.Second) assert.True(t, a.serf.EncryptionEnabled()) - a.Stop() + _ = a.Stop() } func Test_advertiseRPCAddr(t *testing.T) { @@ -374,7 +371,7 @@ func Test_advertiseRPCAddr(t *testing.T) { c.DataDir = dir a := NewAgent(c) - a.Start() + _ = a.Start() time.Sleep(2 * time.Second) @@ -383,7 +380,7 @@ func Test_advertiseRPCAddr(t *testing.T) { assert.Equal(t, exRPCAddr, advertiseRPCAddr) - a.Stop() + _ = a.Stop() } func Test_bindRPCAddr(t *testing.T) { @@ -403,17 +400,17 @@ func Test_bindRPCAddr(t *testing.T) { c.LogLevel = logLevel c.DevMode = true c.DataDir = dir + c.AdvertiseAddr = c.BindAddr a := NewAgent(c) - a.Start() + err = a.Start() + require.NoError(t, err) time.Sleep(2 * time.Second) bindRPCAddr := a.bindRPCAddr() exRPCAddr := a1Addr + ":6868" - assert.Equal(t, exRPCAddr, bindRPCAddr) - a.Stop() } func TestAgentConfig(t *testing.T) { @@ -436,7 +433,7 @@ func TestAgentConfig(t *testing.T) { c.DevMode = true a := NewAgent(c) - a.Start() + _ = a.Start() time.Sleep(2 * time.Second) @@ -444,7 +441,7 @@ func TestAgentConfig(t *testing.T) { assert.NotEmpty(t, a.config.AdvertiseAddr) assert.Equal(t, advAddr+":8946", a.config.AdvertiseAddr) - a.Stop() + _ = a.Stop() } func Test_getQualifyingNodes(t *testing.T) { diff --git a/dkron/api.go b/dkron/api.go index 1e8bffb16..b382b2b05 100644 --- a/dkron/api.go +++ b/dkron/api.go @@ -74,7 +74,11 @@ func (h *HTTPTransport) ServeHTTP() { "address": h.agent.config.HTTPAddr, }).Info("api: Running HTTP server") - go h.Engine.Run(h.agent.config.HTTPAddr) + go func() { + if err := h.Engine.Run(h.agent.config.HTTPAddr); err != nil { + h.logger.WithError(err).Error("api: Error starting HTTP server") + } + }() } // APIRoutes registers the api routes on the gin RouterGroup. @@ -233,7 +237,7 @@ func (h *HTTPTransport) jobCreateOrUpdateHandler(c *gin.Context) { // Parse values from JSON if err := c.BindJSON(&job); err != nil { - c.Writer.WriteString(fmt.Sprintf("Unable to parse payload: %s.", err)) + _, _ = c.Writer.WriteString(fmt.Sprintf("Unable to parse payload: %s.", err)) h.logger.Error(err) return } @@ -241,7 +245,7 @@ func (h *HTTPTransport) jobCreateOrUpdateHandler(c *gin.Context) { // Validate job if err := job.Validate(); err != nil { c.AbortWithStatus(http.StatusBadRequest) - c.Writer.WriteString(fmt.Sprintf("Job contains invalid value: %s.", err)) + _, _ = c.Writer.WriteString(fmt.Sprintf("Job contains invalid value: %s.", err)) return } @@ -253,13 +257,17 @@ func (h *HTTPTransport) jobCreateOrUpdateHandler(c *gin.Context) { } else { c.AbortWithStatus(http.StatusInternalServerError) } - c.Writer.WriteString(s.Message()) + _, _ = c.Writer.WriteString(s.Message()) return } // Immediately run the job if so requested if _, exists := c.GetQuery("runoncreate"); exists { - go h.agent.GRPCClient.RunJob(job.Name) + go func() { + if _, err := h.agent.GRPCClient.RunJob(job.Name); err != nil { + h.logger.WithError(err).Error("api: Unable to run job.") + } + }() } c.Header("Location", fmt.Sprintf("%s/%s", c.Request.RequestURI, job.Name)) @@ -272,7 +280,7 @@ func (h *HTTPTransport) jobDeleteHandler(c *gin.Context) { // Call gRPC DeleteJob job, err := h.agent.GRPCClient.DeleteJob(jobName) if err != nil { - c.AbortWithError(http.StatusNotFound, err) + _ = c.AbortWithError(http.StatusNotFound, err) return } renderJSON(c, http.StatusOK, job) @@ -284,7 +292,7 @@ func (h *HTTPTransport) jobRunHandler(c *gin.Context) { // Call gRPC RunJob job, err := h.agent.GRPCClient.RunJob(jobName) if err != nil { - c.AbortWithError(http.StatusNotFound, err) + _ = c.AbortWithError(http.StatusNotFound, err) return } @@ -298,32 +306,32 @@ func (h *HTTPTransport) jobRunHandler(c *gin.Context) { func (h *HTTPTransport) restoreHandler(c *gin.Context) { file, _, err := c.Request.FormFile("file") if err != nil { - c.AbortWithError(http.StatusNotFound, err) + _ = c.AbortWithError(http.StatusNotFound, err) return } data, err := ioutil.ReadAll(file) if err != nil { - c.AbortWithError(http.StatusBadRequest, err) + _ = c.AbortWithError(http.StatusBadRequest, err) return } var jobs []*Job err = json.Unmarshal(data, &jobs) if err != nil { - c.AbortWithError(http.StatusBadRequest, err) + _ = c.AbortWithError(http.StatusBadRequest, err) return } jobTree, err := generateJobTree(jobs) if err != nil { - c.AbortWithError(http.StatusBadRequest, err) + _ = c.AbortWithError(http.StatusBadRequest, err) return } result := h.agent.recursiveSetJob(jobTree) resp, err := json.Marshal(result) if err != nil { - c.AbortWithError(http.StatusBadRequest, err) + _ = c.AbortWithError(http.StatusBadRequest, err) return } renderJSON(c, http.StatusOK, string(resp)) @@ -349,7 +357,7 @@ func (h *HTTPTransport) executionsHandler(c *gin.Context) { job, err := h.agent.Store.GetJob(jobName, nil) if err != nil { - c.AbortWithError(http.StatusNotFound, err) + _ = c.AbortWithError(http.StatusNotFound, err) return } @@ -390,7 +398,7 @@ func (h *HTTPTransport) executionHandler(c *gin.Context) { job, err := h.agent.Store.GetJob(jobName, nil) if err != nil { - c.AbortWithError(http.StatusNotFound, err) + _ = c.AbortWithError(http.StatusNotFound, err) return } @@ -436,7 +444,7 @@ func (h *HTTPTransport) membersHandler(c *gin.Context) { func (h *HTTPTransport) leaderHandler(c *gin.Context) { member, err := h.agent.leaderMember() if err != nil { - c.AbortWithError(http.StatusInternalServerError, err) + _ = c.AbortWithError(http.StatusInternalServerError, err) } if member == nil { c.AbortWithStatus(http.StatusNotFound) @@ -455,7 +463,7 @@ func (h *HTTPTransport) isLeaderHandler(c *gin.Context) { func (h *HTTPTransport) leaveHandler(c *gin.Context) { if err := h.agent.Stop(); err != nil { - c.AbortWithError(http.StatusInternalServerError, err) + _ = c.AbortWithError(http.StatusInternalServerError, err) } renderJSON(c, http.StatusOK, h.agent.peers) } @@ -465,7 +473,7 @@ func (h *HTTPTransport) jobToggleHandler(c *gin.Context) { job, err := h.agent.Store.GetJob(jobName, nil) if err != nil { - c.AbortWithError(http.StatusNotFound, err) + _ = c.AbortWithError(http.StatusNotFound, err) return } @@ -474,7 +482,7 @@ func (h *HTTPTransport) jobToggleHandler(c *gin.Context) { // Call gRPC SetJob if err := h.agent.GRPCClient.SetJob(job); err != nil { - c.AbortWithError(http.StatusUnprocessableEntity, err) + _ = c.AbortWithError(http.StatusUnprocessableEntity, err) return } @@ -487,7 +495,7 @@ func (h *HTTPTransport) busyHandler(c *gin.Context) { exs, err := h.agent.GetActiveExecutions() if err != nil { - c.AbortWithError(http.StatusInternalServerError, err) + _ = c.AbortWithError(http.StatusInternalServerError, err) return } diff --git a/dkron/api_test.go b/dkron/api_test.go index 2c0d19a1c..43c243dd6 100644 --- a/dkron/api_test.go +++ b/dkron/api_test.go @@ -36,7 +36,7 @@ func setupAPITest(t *testing.T, port string) (dir string, a *Agent) { c.DataDir = dir a = NewAgent(c) - a.Start() + _ = a.Start() for { if a.IsLeader() { @@ -170,7 +170,7 @@ func TestAPIJobCreateUpdateValidationEmptyName(t *testing.T) { baseURL := fmt.Sprintf("http://localhost:%s/v1", port) dir, a := setupAPITest(t, port) defer os.RemoveAll(dir) - defer a.Stop() + defer a.Stop() // nolint: errcheck jsonStr := []byte(`{ "name": "testjob1", @@ -253,7 +253,7 @@ func TestAPIGetNonExistentJobReturnsNotFound(t *testing.T) { baseURL := fmt.Sprintf("http://localhost:%s/v1", port) dir, a := setupAPITest(t, port) defer os.RemoveAll(dir) - defer a.Stop() + defer a.Stop() // nolint: errcheck resp, _ := http.Get(baseURL + "/jobs/notajob") @@ -265,7 +265,7 @@ func TestAPIJobCreateUpdateJobWithInvalidParentIsNotCreated(t *testing.T) { baseURL := fmt.Sprintf("http://localhost:%s/v1", port) dir, a := setupAPITest(t, port) defer os.RemoveAll(dir) - defer a.Stop() + defer a.Stop() // nolint: errcheck jsonStr := []byte(`{ "name": "test_job", @@ -295,7 +295,7 @@ func TestAPIJobRestore(t *testing.T) { baseURL := fmt.Sprintf("http://localhost:%s/v1/restore", port) dir, a := setupAPITest(t, port) defer os.RemoveAll(dir) - defer a.Stop() + defer a.Stop() // nolint: errcheck bodyBuffer := &bytes.Buffer{} bodyWriter := multipart.NewWriter(bodyBuffer) @@ -311,7 +311,7 @@ func TestAPIJobRestore(t *testing.T) { } defer file.Close() - io.Copy(fileWriter, file) + _, _ = io.Copy(fileWriter, file) contentType := bodyWriter.FormDataContentType() bodyWriter.Close() @@ -426,7 +426,7 @@ func postJob(t *testing.T, port string, jsonStr []byte) *http.Response { baseURL := fmt.Sprintf("http://localhost:%s/v1", port) dir, a := setupAPITest(t, port) defer os.RemoveAll(dir) - defer a.Stop() + defer a.Stop() // nolint: errcheck resp, err := http.Post(baseURL+"/jobs", "encoding/json", bytes.NewBuffer(jsonStr)) require.NoError(t, err, err) diff --git a/dkron/config.go b/dkron/config.go index 44f6a6aff..00c493a97 100644 --- a/dkron/config.go +++ b/dkron/config.go @@ -147,7 +147,7 @@ type Config struct { PreWebhookHeaders []string `mapstructure:"pre-webhook-headers"` // WebhookEndpoint is the URL to call for notifications. - WebhookEndpoint string `mapstructure:"webhook-endpoint" mapstructure:"webhook-url"` + WebhookEndpoint string `mapstructure:"webhook-endpoint"` // WebhookPayload is the body template of the request for notifications. WebhookPayload string `mapstructure:"webhook-payload"` diff --git a/dkron/execution.go b/dkron/execution.go index 001f252f1..8f6b7642b 100644 --- a/dkron/execution.go +++ b/dkron/execution.go @@ -6,7 +6,7 @@ import ( "time" proto "github.com/distribworks/dkron/v3/plugin/types" - "github.com/golang/protobuf/ptypes" + "google.golang.org/protobuf/types/known/timestamppb" ) // Execution type holds all of the details of a specific Execution. @@ -50,8 +50,8 @@ func NewExecution(jobName string) *Execution { // NewExecutionFromProto maps a proto.ExecutionDoneRequest to an Execution object func NewExecutionFromProto(e *proto.Execution) *Execution { - startedAt, _ := ptypes.Timestamp(e.GetStartedAt()) - finishedAt, _ := ptypes.Timestamp(e.GetFinishedAt()) + startedAt := e.GetStartedAt().AsTime() + finishedAt := e.GetFinishedAt().AsTime() return &Execution{ Id: e.Key(), JobName: e.JobName, @@ -68,8 +68,8 @@ func NewExecutionFromProto(e *proto.Execution) *Execution { // ToProto returns the protobuf struct corresponding to // the representation of the current execution. func (e *Execution) ToProto() *proto.Execution { - startedAt, _ := ptypes.TimestampProto(e.StartedAt) - finishedAt, _ := ptypes.TimestampProto(e.FinishedAt) + startedAt := timestamppb.New(e.StartedAt) + finishedAt := timestamppb.New(e.FinishedAt) return &proto.Execution{ JobName: e.JobName, Success: e.Success, diff --git a/dkron/fsm.go b/dkron/fsm.go index 6bb1d660d..6f711df00 100644 --- a/dkron/fsm.go +++ b/dkron/fsm.go @@ -4,9 +4,9 @@ import ( "io" dkronpb "github.com/distribworks/dkron/v3/plugin/types" - "github.com/golang/protobuf/proto" "github.com/hashicorp/raft" "github.com/sirupsen/logrus" + "google.golang.org/protobuf/proto" ) // MessageType is the type to encode FSM commands. @@ -149,7 +149,7 @@ type dkronSnapshot struct { func (d *dkronSnapshot) Persist(sink raft.SnapshotSink) error { if err := d.store.Snapshot(sink); err != nil { - sink.Cancel() + _ = sink.Cancel() return err } diff --git a/dkron/grpc.go b/dkron/grpc.go index 55ff64f4f..01329d44f 100644 --- a/dkron/grpc.go +++ b/dkron/grpc.go @@ -11,13 +11,13 @@ import ( metrics "github.com/armon/go-metrics" "github.com/distribworks/dkron/v3/plugin" proto "github.com/distribworks/dkron/v3/plugin/types" - pb "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/empty" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" "github.com/sirupsen/logrus" "golang.org/x/net/context" "google.golang.org/grpc" + pb "google.golang.org/protobuf/proto" ) var ( diff --git a/dkron/grpc_agent.go b/dkron/grpc_agent.go index a44110c4d..59c146215 100644 --- a/dkron/grpc_agent.go +++ b/dkron/grpc_agent.go @@ -7,8 +7,8 @@ import ( "github.com/armon/circbuf" metrics "github.com/armon/go-metrics" "github.com/distribworks/dkron/v3/plugin/types" - "github.com/golang/protobuf/ptypes" "github.com/sirupsen/logrus" + "google.golang.org/protobuf/types/known/timestamppb" ) const ( @@ -67,7 +67,7 @@ func (as *AgentServer) AgentRun(req *types.AgentRunRequest, stream types.Agent_A exc := job.ExecutorConfig // Send the first update with the initial execution state to be stored in the server - execution.StartedAt = ptypes.TimestampNow() + execution.StartedAt = timestamppb.Now() execution.NodeName = as.agent.config.NodeName if err := stream.Send(&types.AgentRunStream{ @@ -98,20 +98,20 @@ func (as *AgentServer) AgentRun(req *types.AgentRunRequest, stream types.Agent_A if err != nil { as.logger.WithError(err).WithField("job", job.Name).WithField("plugin", executor).Error("grpc_agent: command error output") success = false - output.Write([]byte(err.Error() + "\n")) + _, _ = output.Write([]byte(err.Error() + "\n")) } else { success = true } if out != nil { - output.Write(out.Output) + _, _ = output.Write(out.Output) } } else { as.logger.WithField("executor", jex).Error("grpc_agent: Specified executor is not present") - output.Write([]byte("grpc_agent: Specified executor is not present")) + _, _ = output.Write([]byte("grpc_agent: Specified executor is not present")) } - execution.FinishedAt = ptypes.TimestampNow() + execution.FinishedAt = timestamppb.Now() execution.Success = success execution.Output = output.Bytes() diff --git a/dkron/grpc_client.go b/dkron/grpc_client.go index 2661b96c2..a6eb2988e 100644 --- a/dkron/grpc_client.go +++ b/dkron/grpc_client.go @@ -7,11 +7,11 @@ import ( metrics "github.com/armon/go-metrics" proto "github.com/distribworks/dkron/v3/plugin/types" - "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/empty" "github.com/sirupsen/logrus" "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/timestamppb" ) // DkronGRPCClient defines the interface that any gRPC client for @@ -425,7 +425,7 @@ func (grpcc *GRPCClient) AgentRun(addr string, job *proto.Job, execution *proto. // Error received from the stream if err != nil { // At this point the execution status will be unknown, set the FinishedAt time and an explanatory message - execution.FinishedAt = ptypes.TimestampNow() + execution.FinishedAt = timestamppb.Now() execution.Output = []byte(ErrBrokenStream.Error() + ": " + err.Error()) grpcc.logger.WithError(err).Error(ErrBrokenStream) diff --git a/dkron/grpc_test.go b/dkron/grpc_test.go index bd98fa5fb..bbaeb786b 100644 --- a/dkron/grpc_test.go +++ b/dkron/grpc_test.go @@ -33,7 +33,7 @@ func TestGRPCExecutionDone(t *testing.T) { c.DataDir = dir a := NewAgent(c) - a.Start() + _ = a.Start() for { if a.IsLeader() { diff --git a/dkron/job_test.go b/dkron/job_test.go index 3c7f0155f..83d6c6710 100644 --- a/dkron/job_test.go +++ b/dkron/job_test.go @@ -25,7 +25,7 @@ func getTestLogger() *logrus.Entry { func TestJobGetParent(t *testing.T) { log := getTestLogger() s, err := NewStore(log) - defer s.Shutdown() + defer s.Shutdown() // nolint: errcheck require.NoError(t, err) parentTestJob := &Job{ @@ -125,7 +125,7 @@ func Test_isRunnable(t *testing.T) { a := NewAgent(c) a.GRPCClient = &gRPCClientMock{} - a.Start() + _ = a.Start() time.Sleep(2 * time.Second) var exp ntime.NullableTime diff --git a/dkron/leader.go b/dkron/leader.go index 992e05703..1605cbe6e 100644 --- a/dkron/leader.go +++ b/dkron/leader.go @@ -171,7 +171,9 @@ WAIT: case <-interval: goto RECONCILE case member := <-reconcileCh: - a.reconcileMember(member) + if err := a.reconcileMember(member); err != nil { + a.logger.WithError(err).Error("dkron: failed to reconcile member") + } } } } diff --git a/dkron/metrics.go b/dkron/metrics.go index 4cfa1884e..89fc2ed0b 100644 --- a/dkron/metrics.go +++ b/dkron/metrics.go @@ -54,9 +54,13 @@ func initMetrics(a *Agent) error { // Initialize the global sink if len(fanout) > 0 { fanout = append(fanout, inm) - metrics.NewGlobal(metrics.DefaultConfig("dkron"), fanout) + if _, err := metrics.NewGlobal(metrics.DefaultConfig("dkron"), fanout); err != nil { + return err + } } else { - metrics.NewGlobal(metrics.DefaultConfig("dkron"), inm) + if _, err := metrics.NewGlobal(metrics.DefaultConfig("dkron"), inm); err != nil { + return err + } } return nil diff --git a/dkron/notifier_test.go b/dkron/notifier_test.go index 509fcc1a0..b8b9ff332 100644 --- a/dkron/notifier_test.go +++ b/dkron/notifier_test.go @@ -13,7 +13,7 @@ import ( func TestNotifier_callExecutionWebhook(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - io.Copy(w, r.Body) + _, _ = io.Copy(w, r.Body) })) defer ts.Close() @@ -32,7 +32,7 @@ func TestNotifier_callExecutionWebhookHostHeader(t *testing.T) { var got string var exp = "dkron.io" ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - io.Copy(w, r.Body) + _, _ = io.Copy(w, r.Body) got = r.Host })) defer ts.Close() diff --git a/dkron/scheduler_test.go b/dkron/scheduler_test.go index 4010cc4cd..0a622801c 100644 --- a/dkron/scheduler_test.go +++ b/dkron/scheduler_test.go @@ -8,6 +8,7 @@ import ( "github.com/distribworks/dkron/v3/extcron" "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestSchedule(t *testing.T) { @@ -25,7 +26,8 @@ func TestSchedule(t *testing.T) { Owner: "John Dough", OwnerEmail: "foo@bar.com", } - sched.Start([]*Job{testJob1}, &Agent{}) + err := sched.Start([]*Job{testJob1}, &Agent{}) + require.NoError(t, err) assert.True(t, sched.started) assert.True(t, sched.Started()) @@ -62,7 +64,8 @@ func TestClearCron(t *testing.T) { Owner: "John Dough", OwnerEmail: "foo@bar.com", } - sched.AddJob(testJob) + err := sched.AddJob(testJob) + require.NoError(t, err) assert.Len(t, sched.Cron.Entries(), 1) sched.ClearCron() @@ -80,7 +83,7 @@ func TestTimezoneAwareJob(t *testing.T) { Executor: "shell", ExecutorConfig: map[string]string{"command": "echo 'test1'", "shell": "true"}, } - sched.Start([]*Job{tzJob}, &Agent{}) + _ = sched.Start([]*Job{tzJob}, &Agent{}) assert.True(t, sched.started) assert.True(t, sched.Started()) @@ -93,10 +96,11 @@ func TestScheduleStop(t *testing.T) { sched := NewScheduler(log) sched.Cron = cron.New(cron.WithParser(extcron.NewParser())) - sched.Cron.AddFunc("@every 2s", func() { + _, err := sched.Cron.AddFunc("@every 2s", func() { time.Sleep(time.Second * 5) fmt.Println("function done") }) + require.NoError(t, err) sched.Cron.Start() sched.started = true @@ -108,7 +112,7 @@ func TestScheduleStop(t *testing.T) { Owner: "John Dough", OwnerEmail: "foo@bar.com", } - err := sched.Start([]*Job{testJob1}, &Agent{}) + err = sched.Start([]*Job{testJob1}, &Agent{}) assert.Error(t, err) // Wait for the job to start diff --git a/dkron/store.go b/dkron/store.go index 3221d9c79..77786e35d 100644 --- a/dkron/store.go +++ b/dkron/store.go @@ -13,9 +13,9 @@ import ( "time" dkronpb "github.com/distribworks/dkron/v3/plugin/types" - "github.com/golang/protobuf/proto" "github.com/sirupsen/logrus" "github.com/tidwall/buntdb" + "google.golang.org/protobuf/proto" ) const ( @@ -66,20 +66,20 @@ type kv struct { // NewStore creates a new Storage instance. func NewStore(logger *logrus.Entry) (*Store, error) { db, err := buntdb.Open(":memory:") - db.CreateIndex("name", jobsPrefix+":*", buntdb.IndexJSON("name")) - db.CreateIndex("started_at", executionsPrefix+":*", buntdb.IndexJSON("started_at")) - db.CreateIndex("finished_at", executionsPrefix+":*", buntdb.IndexJSON("finished_at")) - db.CreateIndex("attempt", executionsPrefix+":*", buntdb.IndexJSON("attempt")) - db.CreateIndex("displayname", jobsPrefix+":*", buntdb.IndexJSON("displayname")) - db.CreateIndex("schedule", jobsPrefix+":*", buntdb.IndexJSON("schedule")) - db.CreateIndex("success_count", jobsPrefix+":*", buntdb.IndexJSON("success_count")) - db.CreateIndex("error_count", jobsPrefix+":*", buntdb.IndexJSON("error_count")) - db.CreateIndex("last_success", jobsPrefix+":*", buntdb.IndexJSON("last_success")) - db.CreateIndex("last_error", jobsPrefix+":*", buntdb.IndexJSON("last_error")) - db.CreateIndex("next", jobsPrefix+":*", buntdb.IndexJSON("next")) if err != nil { return nil, err } + _ = db.CreateIndex("name", jobsPrefix+":*", buntdb.IndexJSON("name")) + _ = db.CreateIndex("started_at", executionsPrefix+":*", buntdb.IndexJSON("started_at")) + _ = db.CreateIndex("finished_at", executionsPrefix+":*", buntdb.IndexJSON("finished_at")) + _ = db.CreateIndex("attempt", executionsPrefix+":*", buntdb.IndexJSON("attempt")) + _ = db.CreateIndex("displayname", jobsPrefix+":*", buntdb.IndexJSON("displayname")) + _ = db.CreateIndex("schedule", jobsPrefix+":*", buntdb.IndexJSON("schedule")) + _ = db.CreateIndex("success_count", jobsPrefix+":*", buntdb.IndexJSON("success_count")) + _ = db.CreateIndex("error_count", jobsPrefix+":*", buntdb.IndexJSON("error_count")) + _ = db.CreateIndex("last_success", jobsPrefix+":*", buntdb.IndexJSON("last_success")) + _ = db.CreateIndex("last_error", jobsPrefix+":*", buntdb.IndexJSON("last_error")) + _ = db.CreateIndex("next", jobsPrefix+":*", buntdb.IndexJSON("next")) store := &Store{ db: db, @@ -174,7 +174,10 @@ func (s *Store) SetJob(job *Job, copyDependentJobs bool) error { } pbj := job.ToProto() - s.setJobTxFunc(pbj)(tx) + if err := s.setJobTxFunc(pbj)(tx); err != nil { + return err + } + return nil }) if err != nil { @@ -613,12 +616,14 @@ func (s *Store) deleteExecutionsTxFunc(jobName string) func(tx *buntdb.Tx) error return func(tx *buntdb.Tx) error { var delkeys []string prefix := fmt.Sprintf("%s:%s", executionsPrefix, jobName) - tx.Ascend("", func(key, value string) bool { + if err := tx.Ascend("", func(key, value string) bool { if strings.HasPrefix(key, prefix) { delkeys = append(delkeys, key) } return true - }) + }); err != nil { + return err + } for _, k := range delkeys { _, _ = tx.Delete(k) diff --git a/dkron/store_test.go b/dkron/store_test.go index 629840f5b..9e529749f 100644 --- a/dkron/store_test.go +++ b/dkron/store_test.go @@ -13,7 +13,7 @@ func TestStore(t *testing.T) { log := getTestLogger() s, err := NewStore(log) require.NoError(t, err) - defer s.Shutdown() + defer s.Shutdown() // nolint: errcheck testJob := &Job{ Name: "test", @@ -218,7 +218,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode1", Group: 1, } - s.SetExecution(ex1) + _, _ = s.SetExecution(ex1) ex2 := &Execution{ JobName: "test", @@ -229,7 +229,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode2", Group: 1, } - s.SetExecution(ex2) + _, _ = s.SetExecution(ex2) ex3 := &Execution{ JobName: "test", @@ -240,7 +240,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode1", Group: 2, } - s.SetExecution(ex3) + _, _ = s.SetExecution(ex3) ex4 := &Execution{ JobName: "test", @@ -251,7 +251,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode1", Group: 2, } - s.SetExecution(ex4) + _, _ = s.SetExecution(ex4) ex5 := &Execution{ JobName: "test", @@ -261,7 +261,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode1", Group: 3, } - s.SetExecution(ex5) + _, _ = s.SetExecution(ex5) ex6 := &Execution{ JobName: "test", @@ -270,7 +270,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode1", Group: 4, } - s.SetExecution(ex6) + _, _ = s.SetExecution(ex6) // Tests status err = s.db.View(func(tx *buntdb.Tx) error {