diff --git a/dkron/grpc.go b/dkron/grpc.go index c4df07f63..7be8d5332 100644 --- a/dkron/grpc.go +++ b/dkron/grpc.go @@ -123,6 +123,9 @@ func (grpcs *GRPCServer) DeleteJob(ctx context.Context, delJobReq *proto.DeleteJ // If everything is ok, remove the job grpcs.agent.sched.RemoveJob(job) + if job.Ephemeral { + grpcs.logger.WithField("job", job.Name).Info("grpc: Done deleting ephemeral job") + } return &proto.DeleteJobResponse{Job: jpb}, nil } @@ -252,6 +255,16 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.E } } + if job.Ephemeral && job.Status == StatusSuccess { + if _, err := grpcs.DeleteJob(ctx, &proto.DeleteJobRequest{JobName: job.Name}); err != nil { + return nil, err + } + return &proto.ExecutionDoneResponse{ + From: grpcs.agent.config.NodeName, + Payload: []byte("deleted"), + }, nil + } + return &proto.ExecutionDoneResponse{ From: grpcs.agent.config.NodeName, Payload: []byte("saved"), diff --git a/dkron/grpc_test.go b/dkron/grpc_test.go index 9db5b6ce8..b4197d26a 100644 --- a/dkron/grpc_test.go +++ b/dkron/grpc_test.go @@ -104,4 +104,17 @@ func TestGRPCExecutionDone(t *testing.T) { err = rc.ExecutionDone(a.advertiseRPCAddr(), testExecution) assert.Error(t, err, ErrExecutionDoneForDeletedJob) + + // Test ephemeral jobs + testJob.Ephemeral = true + + err = a.Store.SetJob(testJob, true) + require.NoError(t, err) + + err = rc.ExecutionDone(a.advertiseRPCAddr(), testExecution) + assert.NoError(t, err) + + j, err := a.Store.GetJob("test", nil) + assert.Error(t, err) + assert.Nil(t, j) } diff --git a/dkron/job.go b/dkron/job.go index efb4d60b6..d02c0c5f4 100644 --- a/dkron/job.go +++ b/dkron/job.go @@ -10,9 +10,9 @@ import ( "github.com/distribworks/dkron/v3/ntime" "github.com/distribworks/dkron/v3/plugin" proto "github.com/distribworks/dkron/v3/plugin/types" - "github.com/golang/protobuf/ptypes" "github.com/sirupsen/logrus" "github.com/tidwall/buntdb" + "google.golang.org/protobuf/types/known/timestamppb" ) const ( @@ -66,10 +66,10 @@ type Job struct { // Cron expression for the job. When to run the job. Schedule string `json:"schedule"` - // Owner of the job. + // Arbitrary string indicating the owner of the job. Owner string `json:"owner"` - // Owner email of the job. + // Email address to use for notifications. OwnerEmail string `json:"owner_email"` // Number of successful executions of this job. @@ -108,32 +108,35 @@ type Job struct { // Job id of job that this job is dependent upon. ParentJob string `json:"parent_job"` - // Processors to use for this job + // Processors to use for this job. Processors map[string]plugin.Config `json:"processors"` - // Concurrency policy for this job (allow, forbid) + // Concurrency policy for this job (allow, forbid). Concurrency string `json:"concurrency"` - // Executor plugin to be used in this job + // Executor plugin to be used in this job. Executor string `json:"executor"` - // Executor args + // Configuration arguments for the specific executor. ExecutorConfig plugin.ExecutorPluginConfig `json:"executor_config"` - // Computed job status + // Computed job status. Status string `json:"status"` - // Computed next execution + // Computed next execution. Next time.Time `json:"next"` + // Delete the job after the first successful execution. + Ephemeral bool `json:"ephemeral"` + + // The job will not be executed after this time. ExpiresAt ntime.NullableTime `json:"expires_at"` + logger *logrus.Entry } // NewJobFromProto create a new Job from a PB Job struct func NewJobFromProto(in *proto.Job, logger *logrus.Entry) *Job { - next, _ := ptypes.Timestamp(in.GetNext()) - job := &Job{ ID: in.Name, Name: in.Name, @@ -154,17 +157,22 @@ func NewJobFromProto(in *proto.Job, logger *logrus.Entry) *Job { ExecutorConfig: in.ExecutorConfig, Status: in.Status, Metadata: in.Metadata, - Next: next, + Next: in.GetNext().AsTime(), + Ephemeral: in.Ephemeral, logger: logger, } if in.GetLastSuccess().GetHasValue() { - t, _ := ptypes.Timestamp(in.GetLastSuccess().GetTime()) + t := in.GetLastSuccess().GetTime().AsTime() job.LastSuccess.Set(t) } if in.GetLastError().GetHasValue() { - t, _ := ptypes.Timestamp(in.GetLastError().GetTime()) + t := in.GetLastError().GetTime().AsTime() job.LastError.Set(t) } + if in.GetExpiresAt().GetHasValue() { + t := in.GetExpiresAt().GetTime().AsTime() + job.ExpiresAt.Set(t) + } procs := make(map[string]plugin.Config) for k, v := range in.Processors { @@ -184,15 +192,23 @@ func (j *Job) ToProto() *proto.Job { HasValue: j.LastSuccess.HasValue(), } if j.LastSuccess.HasValue() { - lastSuccess.Time, _ = ptypes.TimestampProto(j.LastSuccess.Get()) + lastSuccess.Time = timestamppb.New(j.LastSuccess.Get()) } lastError := &proto.Job_NullableTime{ HasValue: j.LastError.HasValue(), } if j.LastError.HasValue() { - lastError.Time, _ = ptypes.TimestampProto(j.LastError.Get()) + lastError.Time = timestamppb.New(j.LastError.Get()) + } + + next := timestamppb.New(j.Next) + + expiresAt := &proto.Job_NullableTime{ + HasValue: j.ExpiresAt.HasValue(), + } + if j.ExpiresAt.HasValue() { + expiresAt.Time = timestamppb.New(j.ExpiresAt.Get()) } - next, _ := ptypes.TimestampProto(j.Next) processors := make(map[string]*proto.PluginConfig) for k, v := range j.Processors { @@ -221,6 +237,8 @@ func (j *Job) ToProto() *proto.Job { LastSuccess: lastSuccess, LastError: lastError, Next: next, + Ephemeral: j.Ephemeral, + ExpiresAt: expiresAt, } } @@ -299,7 +317,9 @@ func (j *Job) GetNext() (time.Time, error) { } func (j *Job) isRunnable(logger *logrus.Entry) bool { - if j.Disabled { + if j.Disabled || (j.ExpiresAt.HasValue() && time.Now().After(j.ExpiresAt.Get())) { + logger.WithField("job", j.Name). + Debug("job: Skipping execution because job is disabled or expired") return false } diff --git a/dkron/job_test.go b/dkron/job_test.go index 5a44b68e4..3c7f0155f 100644 --- a/dkron/job_test.go +++ b/dkron/job_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/distribworks/dkron/v3/ntime" "github.com/distribworks/dkron/v3/plugin" proto "github.com/distribworks/dkron/v3/plugin/types" "github.com/hashicorp/serf/testutil" @@ -127,6 +128,9 @@ func Test_isRunnable(t *testing.T) { a.Start() time.Sleep(2 * time.Second) + var exp ntime.NullableTime + exp.Set(time.Now().AddDate(0, 0, -1)) + testCases := []struct { name string job *Job @@ -159,6 +163,24 @@ func Test_isRunnable(t *testing.T) { }, want: true, }, + { + name: "disabled", + job: &Job{ + Name: "test_job", + Disabled: true, + Agent: a, + }, + want: false, + }, + { + name: "expired", + job: &Job{ + Name: "test_job", + ExpiresAt: exp, + Agent: a, + }, + want: false, + }, } log := getTestLogger() diff --git a/dkron/ui-dist/asset-manifest.json b/dkron/ui-dist/asset-manifest.json index 92b5b1755..d3235ed6f 100644 --- a/dkron/ui-dist/asset-manifest.json +++ b/dkron/ui-dist/asset-manifest.json @@ -1,23 +1,23 @@ { "files": { "main.css": "./static/css/main.356b93bb.chunk.css", - "main.js": "./static/js/main.080a47b0.chunk.js", - "main.js.map": "./static/js/main.080a47b0.chunk.js.map", - "runtime-main.js": "./static/js/runtime-main.7a7f3ef5.js", - "runtime-main.js.map": "./static/js/runtime-main.7a7f3ef5.js.map", - "static/js/2.f94c37f3.chunk.js": "./static/js/2.f94c37f3.chunk.js", - "static/js/2.f94c37f3.chunk.js.map": "./static/js/2.f94c37f3.chunk.js.map", - "static/js/3.7b2d85fb.chunk.js": "./static/js/3.7b2d85fb.chunk.js", - "static/js/3.7b2d85fb.chunk.js.map": "./static/js/3.7b2d85fb.chunk.js.map", + "main.js": "./static/js/main.b44783f2.chunk.js", + "main.js.map": "./static/js/main.b44783f2.chunk.js.map", + "runtime-main.js": "./static/js/runtime-main.47b35914.js", + "runtime-main.js.map": "./static/js/runtime-main.47b35914.js.map", + "static/js/2.0df4b2d4.chunk.js": "./static/js/2.0df4b2d4.chunk.js", + "static/js/2.0df4b2d4.chunk.js.map": "./static/js/2.0df4b2d4.chunk.js.map", + "static/js/3.3693b650.chunk.js": "./static/js/3.3693b650.chunk.js", + "static/js/3.3693b650.chunk.js.map": "./static/js/3.3693b650.chunk.js.map", "index.html": "./index.html", "static/css/main.356b93bb.chunk.css.map": "./static/css/main.356b93bb.chunk.css.map", - "static/js/2.f94c37f3.chunk.js.LICENSE.txt": "./static/js/2.f94c37f3.chunk.js.LICENSE.txt", + "static/js/2.0df4b2d4.chunk.js.LICENSE.txt": "./static/js/2.0df4b2d4.chunk.js.LICENSE.txt", "static/media/dkron-logo.cd9d7840.png": "./static/media/dkron-logo.cd9d7840.png" }, "entrypoints": [ - "static/js/runtime-main.7a7f3ef5.js", - "static/js/2.f94c37f3.chunk.js", + "static/js/runtime-main.47b35914.js", + "static/js/2.0df4b2d4.chunk.js", "static/css/main.356b93bb.chunk.css", - "static/js/main.080a47b0.chunk.js" + "static/js/main.b44783f2.chunk.js" ] } \ No newline at end of file diff --git a/dkron/ui-dist/index.html b/dkron/ui-dist/index.html index ff76e73f6..fecae871d 100644 --- a/dkron/ui-dist/index.html +++ b/dkron/ui-dist/index.html @@ -3,4 +3,4 @@ window.DKRON_TOTAL_JOBS={{.DKRON_TOTAL_JOBS}}; window.DKRON_SUCCESSFUL_JOBS={{.DKRON_SUCCESSFUL_JOBS}}; window.DKRON_FAILED_JOBS={{.DKRON_FAILED_JOBS}}; - window.DKRON_UNTRIGGERED_JOBS={{.DKRON_UNTRIGGERED_JOBS}};