Skip to content

Commit

Permalink
feature: Implement ephemeral and expires at feature (#972)
Browse files Browse the repository at this point in the history
* Implement ephemeral and expires at feature

Ephemeral jobs will be deleted after the first successful execution
Expiring jobs won't be executed after the indicated datetime
  • Loading branch information
Victor Castell authored May 29, 2021
1 parent 426c8a3 commit 2538be5
Show file tree
Hide file tree
Showing 26 changed files with 446 additions and 337 deletions.
13 changes: 13 additions & 0 deletions dkron/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ func (grpcs *GRPCServer) DeleteJob(ctx context.Context, delJobReq *proto.DeleteJ

// If everything is ok, remove the job
grpcs.agent.sched.RemoveJob(job)
if job.Ephemeral {
grpcs.logger.WithField("job", job.Name).Info("grpc: Done deleting ephemeral job")
}

return &proto.DeleteJobResponse{Job: jpb}, nil
}
Expand Down Expand Up @@ -252,6 +255,16 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.E
}
}

if job.Ephemeral && job.Status == StatusSuccess {
if _, err := grpcs.DeleteJob(ctx, &proto.DeleteJobRequest{JobName: job.Name}); err != nil {
return nil, err
}
return &proto.ExecutionDoneResponse{
From: grpcs.agent.config.NodeName,
Payload: []byte("deleted"),
}, nil
}

return &proto.ExecutionDoneResponse{
From: grpcs.agent.config.NodeName,
Payload: []byte("saved"),
Expand Down
13 changes: 13 additions & 0 deletions dkron/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,17 @@ func TestGRPCExecutionDone(t *testing.T) {
err = rc.ExecutionDone(a.advertiseRPCAddr(), testExecution)

assert.Error(t, err, ErrExecutionDoneForDeletedJob)

// Test ephemeral jobs
testJob.Ephemeral = true

err = a.Store.SetJob(testJob, true)
require.NoError(t, err)

err = rc.ExecutionDone(a.advertiseRPCAddr(), testExecution)
assert.NoError(t, err)

j, err := a.Store.GetJob("test", nil)
assert.Error(t, err)
assert.Nil(t, j)
}
56 changes: 38 additions & 18 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"github.com/distribworks/dkron/v3/ntime"
"github.com/distribworks/dkron/v3/plugin"
proto "github.com/distribworks/dkron/v3/plugin/types"
"github.com/golang/protobuf/ptypes"
"github.com/sirupsen/logrus"
"github.com/tidwall/buntdb"
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
Expand Down Expand Up @@ -66,10 +66,10 @@ type Job struct {
// Cron expression for the job. When to run the job.
Schedule string `json:"schedule"`

// Owner of the job.
// Arbitrary string indicating the owner of the job.
Owner string `json:"owner"`

// Owner email of the job.
// Email address to use for notifications.
OwnerEmail string `json:"owner_email"`

// Number of successful executions of this job.
Expand Down Expand Up @@ -108,32 +108,35 @@ type Job struct {
// Job id of job that this job is dependent upon.
ParentJob string `json:"parent_job"`

// Processors to use for this job
// Processors to use for this job.
Processors map[string]plugin.Config `json:"processors"`

// Concurrency policy for this job (allow, forbid)
// Concurrency policy for this job (allow, forbid).
Concurrency string `json:"concurrency"`

// Executor plugin to be used in this job
// Executor plugin to be used in this job.
Executor string `json:"executor"`

// Executor args
// Configuration arguments for the specific executor.
ExecutorConfig plugin.ExecutorPluginConfig `json:"executor_config"`

// Computed job status
// Computed job status.
Status string `json:"status"`

// Computed next execution
// Computed next execution.
Next time.Time `json:"next"`

// Delete the job after the first successful execution.
Ephemeral bool `json:"ephemeral"`

// The job will not be executed after this time.
ExpiresAt ntime.NullableTime `json:"expires_at"`

logger *logrus.Entry
}

// NewJobFromProto create a new Job from a PB Job struct
func NewJobFromProto(in *proto.Job, logger *logrus.Entry) *Job {
next, _ := ptypes.Timestamp(in.GetNext())

job := &Job{
ID: in.Name,
Name: in.Name,
Expand All @@ -154,17 +157,22 @@ func NewJobFromProto(in *proto.Job, logger *logrus.Entry) *Job {
ExecutorConfig: in.ExecutorConfig,
Status: in.Status,
Metadata: in.Metadata,
Next: next,
Next: in.GetNext().AsTime(),
Ephemeral: in.Ephemeral,
logger: logger,
}
if in.GetLastSuccess().GetHasValue() {
t, _ := ptypes.Timestamp(in.GetLastSuccess().GetTime())
t := in.GetLastSuccess().GetTime().AsTime()
job.LastSuccess.Set(t)
}
if in.GetLastError().GetHasValue() {
t, _ := ptypes.Timestamp(in.GetLastError().GetTime())
t := in.GetLastError().GetTime().AsTime()
job.LastError.Set(t)
}
if in.GetExpiresAt().GetHasValue() {
t := in.GetExpiresAt().GetTime().AsTime()
job.ExpiresAt.Set(t)
}

procs := make(map[string]plugin.Config)
for k, v := range in.Processors {
Expand All @@ -184,15 +192,23 @@ func (j *Job) ToProto() *proto.Job {
HasValue: j.LastSuccess.HasValue(),
}
if j.LastSuccess.HasValue() {
lastSuccess.Time, _ = ptypes.TimestampProto(j.LastSuccess.Get())
lastSuccess.Time = timestamppb.New(j.LastSuccess.Get())
}
lastError := &proto.Job_NullableTime{
HasValue: j.LastError.HasValue(),
}
if j.LastError.HasValue() {
lastError.Time, _ = ptypes.TimestampProto(j.LastError.Get())
lastError.Time = timestamppb.New(j.LastError.Get())
}

next := timestamppb.New(j.Next)

expiresAt := &proto.Job_NullableTime{
HasValue: j.ExpiresAt.HasValue(),
}
if j.ExpiresAt.HasValue() {
expiresAt.Time = timestamppb.New(j.ExpiresAt.Get())
}
next, _ := ptypes.TimestampProto(j.Next)

processors := make(map[string]*proto.PluginConfig)
for k, v := range j.Processors {
Expand Down Expand Up @@ -221,6 +237,8 @@ func (j *Job) ToProto() *proto.Job {
LastSuccess: lastSuccess,
LastError: lastError,
Next: next,
Ephemeral: j.Ephemeral,
ExpiresAt: expiresAt,
}
}

Expand Down Expand Up @@ -299,7 +317,9 @@ func (j *Job) GetNext() (time.Time, error) {
}

func (j *Job) isRunnable(logger *logrus.Entry) bool {
if j.Disabled {
if j.Disabled || (j.ExpiresAt.HasValue() && time.Now().After(j.ExpiresAt.Get())) {
logger.WithField("job", j.Name).
Debug("job: Skipping execution because job is disabled or expired")
return false
}

Expand Down
22 changes: 22 additions & 0 deletions dkron/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/distribworks/dkron/v3/ntime"
"github.com/distribworks/dkron/v3/plugin"
proto "github.com/distribworks/dkron/v3/plugin/types"
"github.com/hashicorp/serf/testutil"
Expand Down Expand Up @@ -127,6 +128,9 @@ func Test_isRunnable(t *testing.T) {
a.Start()
time.Sleep(2 * time.Second)

var exp ntime.NullableTime
exp.Set(time.Now().AddDate(0, 0, -1))

testCases := []struct {
name string
job *Job
Expand Down Expand Up @@ -159,6 +163,24 @@ func Test_isRunnable(t *testing.T) {
},
want: true,
},
{
name: "disabled",
job: &Job{
Name: "test_job",
Disabled: true,
Agent: a,
},
want: false,
},
{
name: "expired",
job: &Job{
Name: "test_job",
ExpiresAt: exp,
Agent: a,
},
want: false,
},
}

log := getTestLogger()
Expand Down
24 changes: 12 additions & 12 deletions dkron/ui-dist/asset-manifest.json
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
{
"files": {
"main.css": "./static/css/main.356b93bb.chunk.css",
"main.js": "./static/js/main.080a47b0.chunk.js",
"main.js.map": "./static/js/main.080a47b0.chunk.js.map",
"runtime-main.js": "./static/js/runtime-main.7a7f3ef5.js",
"runtime-main.js.map": "./static/js/runtime-main.7a7f3ef5.js.map",
"static/js/2.f94c37f3.chunk.js": "./static/js/2.f94c37f3.chunk.js",
"static/js/2.f94c37f3.chunk.js.map": "./static/js/2.f94c37f3.chunk.js.map",
"static/js/3.7b2d85fb.chunk.js": "./static/js/3.7b2d85fb.chunk.js",
"static/js/3.7b2d85fb.chunk.js.map": "./static/js/3.7b2d85fb.chunk.js.map",
"main.js": "./static/js/main.b44783f2.chunk.js",
"main.js.map": "./static/js/main.b44783f2.chunk.js.map",
"runtime-main.js": "./static/js/runtime-main.47b35914.js",
"runtime-main.js.map": "./static/js/runtime-main.47b35914.js.map",
"static/js/2.0df4b2d4.chunk.js": "./static/js/2.0df4b2d4.chunk.js",
"static/js/2.0df4b2d4.chunk.js.map": "./static/js/2.0df4b2d4.chunk.js.map",
"static/js/3.3693b650.chunk.js": "./static/js/3.3693b650.chunk.js",
"static/js/3.3693b650.chunk.js.map": "./static/js/3.3693b650.chunk.js.map",
"index.html": "./index.html",
"static/css/main.356b93bb.chunk.css.map": "./static/css/main.356b93bb.chunk.css.map",
"static/js/2.f94c37f3.chunk.js.LICENSE.txt": "./static/js/2.f94c37f3.chunk.js.LICENSE.txt",
"static/js/2.0df4b2d4.chunk.js.LICENSE.txt": "./static/js/2.0df4b2d4.chunk.js.LICENSE.txt",
"static/media/dkron-logo.cd9d7840.png": "./static/media/dkron-logo.cd9d7840.png"
},
"entrypoints": [
"static/js/runtime-main.7a7f3ef5.js",
"static/js/2.f94c37f3.chunk.js",
"static/js/runtime-main.47b35914.js",
"static/js/2.0df4b2d4.chunk.js",
"static/css/main.356b93bb.chunk.css",
"static/js/main.080a47b0.chunk.js"
"static/js/main.b44783f2.chunk.js"
]
}
2 changes: 1 addition & 1 deletion dkron/ui-dist/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
window.DKRON_TOTAL_JOBS={{.DKRON_TOTAL_JOBS}};
window.DKRON_SUCCESSFUL_JOBS={{.DKRON_SUCCESSFUL_JOBS}};
window.DKRON_FAILED_JOBS={{.DKRON_FAILED_JOBS}};
window.DKRON_UNTRIGGERED_JOBS={{.DKRON_UNTRIGGERED_JOBS}};</script><link href="./static/css/main.356b93bb.chunk.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div><script>!function(e){function r(r){for(var n,i,a=r[0],c=r[1],l=r[2],s=0,p=[];s<a.length;s++)i=a[s],Object.prototype.hasOwnProperty.call(o,i)&&o[i]&&p.push(o[i][0]),o[i]=0;for(n in c)Object.prototype.hasOwnProperty.call(c,n)&&(e[n]=c[n]);for(f&&f(r);p.length;)p.shift()();return u.push.apply(u,l||[]),t()}function t(){for(var e,r=0;r<u.length;r++){for(var t=u[r],n=!0,a=1;a<t.length;a++){var c=t[a];0!==o[c]&&(n=!1)}n&&(u.splice(r--,1),e=i(i.s=t[0]))}return e}var n={},o={1:0},u=[];function i(r){if(n[r])return n[r].exports;var t=n[r]={i:r,l:!1,exports:{}};return e[r].call(t.exports,t,t.exports,i),t.l=!0,t.exports}i.e=function(e){var r=[],t=o[e];if(0!==t)if(t)r.push(t[2]);else{var n=new Promise((function(r,n){t=o[e]=[r,n]}));r.push(t[2]=n);var u,a=document.createElement("script");a.charset="utf-8",a.timeout=120,i.nc&&a.setAttribute("nonce",i.nc),a.src=function(e){return i.p+"static/js/"+({}[e]||e)+"."+{3:"7b2d85fb"}[e]+".chunk.js"}(e);var c=new Error;u=function(r){a.onerror=a.onload=null,clearTimeout(l);var t=o[e];if(0!==t){if(t){var n=r&&("load"===r.type?"missing":r.type),u=r&&r.target&&r.target.src;c.message="Loading chunk "+e+" failed.\n("+n+": "+u+")",c.name="ChunkLoadError",c.type=n,c.request=u,t[1](c)}o[e]=void 0}};var l=setTimeout((function(){u({type:"timeout",target:a})}),12e4);a.onerror=a.onload=u,document.head.appendChild(a)}return Promise.all(r)},i.m=e,i.c=n,i.d=function(e,r,t){i.o(e,r)||Object.defineProperty(e,r,{enumerable:!0,get:t})},i.r=function(e){"undefined"!=typeof Symbol&&Symbol.toStringTag&&Object.defineProperty(e,Symbol.toStringTag,{value:"Module"}),Object.defineProperty(e,"__esModule",{value:!0})},i.t=function(e,r){if(1&r&&(e=i(e)),8&r)return e;if(4&r&&"object"==typeof e&&e&&e.__esModule)return e;var t=Object.create(null);if(i.r(t),Object.defineProperty(t,"default",{enumerable:!0,value:e}),2&r&&"string"!=typeof e)for(var n in e)i.d(t,n,function(r){return e[r]}.bind(null,n));return t},i.n=function(e){var r=e&&e.__esModule?function(){return e.default}:function(){return e};return i.d(r,"a",r),r},i.o=function(e,r){return Object.prototype.hasOwnProperty.call(e,r)},i.p="./",i.oe=function(e){throw console.error(e),e};var a=this.webpackJsonpwebui=this.webpackJsonpwebui||[],c=a.push.bind(a);a.push=r,a=a.slice();for(var l=0;l<a.length;l++)r(a[l]);var f=c;t()}([])</script><script src="./static/js/2.f94c37f3.chunk.js"></script><script src="./static/js/main.080a47b0.chunk.js"></script></body></html>
window.DKRON_UNTRIGGERED_JOBS={{.DKRON_UNTRIGGERED_JOBS}};</script><link href="./static/css/main.356b93bb.chunk.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div><script>!function(e){function r(r){for(var n,i,a=r[0],c=r[1],l=r[2],s=0,p=[];s<a.length;s++)i=a[s],Object.prototype.hasOwnProperty.call(o,i)&&o[i]&&p.push(o[i][0]),o[i]=0;for(n in c)Object.prototype.hasOwnProperty.call(c,n)&&(e[n]=c[n]);for(f&&f(r);p.length;)p.shift()();return u.push.apply(u,l||[]),t()}function t(){for(var e,r=0;r<u.length;r++){for(var t=u[r],n=!0,a=1;a<t.length;a++){var c=t[a];0!==o[c]&&(n=!1)}n&&(u.splice(r--,1),e=i(i.s=t[0]))}return e}var n={},o={1:0},u=[];function i(r){if(n[r])return n[r].exports;var t=n[r]={i:r,l:!1,exports:{}};return e[r].call(t.exports,t,t.exports,i),t.l=!0,t.exports}i.e=function(e){var r=[],t=o[e];if(0!==t)if(t)r.push(t[2]);else{var n=new Promise((function(r,n){t=o[e]=[r,n]}));r.push(t[2]=n);var u,a=document.createElement("script");a.charset="utf-8",a.timeout=120,i.nc&&a.setAttribute("nonce",i.nc),a.src=function(e){return i.p+"static/js/"+({}[e]||e)+"."+{3:"3693b650"}[e]+".chunk.js"}(e);var c=new Error;u=function(r){a.onerror=a.onload=null,clearTimeout(l);var t=o[e];if(0!==t){if(t){var n=r&&("load"===r.type?"missing":r.type),u=r&&r.target&&r.target.src;c.message="Loading chunk "+e+" failed.\n("+n+": "+u+")",c.name="ChunkLoadError",c.type=n,c.request=u,t[1](c)}o[e]=void 0}};var l=setTimeout((function(){u({type:"timeout",target:a})}),12e4);a.onerror=a.onload=u,document.head.appendChild(a)}return Promise.all(r)},i.m=e,i.c=n,i.d=function(e,r,t){i.o(e,r)||Object.defineProperty(e,r,{enumerable:!0,get:t})},i.r=function(e){"undefined"!=typeof Symbol&&Symbol.toStringTag&&Object.defineProperty(e,Symbol.toStringTag,{value:"Module"}),Object.defineProperty(e,"__esModule",{value:!0})},i.t=function(e,r){if(1&r&&(e=i(e)),8&r)return e;if(4&r&&"object"==typeof e&&e&&e.__esModule)return e;var t=Object.create(null);if(i.r(t),Object.defineProperty(t,"default",{enumerable:!0,value:e}),2&r&&"string"!=typeof e)for(var n in e)i.d(t,n,function(r){return e[r]}.bind(null,n));return t},i.n=function(e){var r=e&&e.__esModule?function(){return e.default}:function(){return e};return i.d(r,"a",r),r},i.o=function(e,r){return Object.prototype.hasOwnProperty.call(e,r)},i.p="./",i.oe=function(e){throw console.error(e),e};var a=this.webpackJsonpwebui=this.webpackJsonpwebui||[],c=a.push.bind(a);a.push=r,a=a.slice();for(var l=0;l<a.length;l++)r(a[l]);var f=c;t()}([])</script><script src="./static/js/2.0df4b2d4.chunk.js"></script><script src="./static/js/main.b44783f2.chunk.js"></script></body></html>
3 changes: 3 additions & 0 deletions dkron/ui-dist/static/js/2.0df4b2d4.chunk.js

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions dkron/ui-dist/static/js/2.0df4b2d4.chunk.js.map

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions dkron/ui-dist/static/js/2.f94c37f3.chunk.js

This file was deleted.

1 change: 0 additions & 1 deletion dkron/ui-dist/static/js/2.f94c37f3.chunk.js.map

This file was deleted.

Loading

0 comments on commit 2538be5

Please sign in to comment.