diff --git a/dkron/fsm.go b/dkron/fsm.go index 78c3677b0..fe7351800 100644 --- a/dkron/fsm.go +++ b/dkron/fsm.go @@ -81,7 +81,7 @@ func (d *dkronFSM) applySetJob(buf []byte) interface{} { if err := proto.Unmarshal(buf, &pj); err != nil { return err } - job := NewJobFromProto(&pj) + job := NewJobFromProto(&pj, d.logger) if err := d.store.SetJob(job, false); err != nil { return err } diff --git a/dkron/grpc.go b/dkron/grpc.go index 61fe8bf0f..c4df07f63 100644 --- a/dkron/grpc.go +++ b/dkron/grpc.go @@ -91,7 +91,7 @@ func (grpcs *GRPCServer) SetJob(ctx context.Context, setJobReq *proto.SetJobRequ } // If everything is ok, add the job to the scheduler - job := NewJobFromProto(setJobReq.Job) + job := NewJobFromProto(setJobReq.Job, grpcs.logger) job.Agent = grpcs.agent if err := grpcs.agent.sched.AddJob(job); err != nil { return nil, err diff --git a/dkron/grpc_client.go b/dkron/grpc_client.go index ff419924f..e0700dd19 100644 --- a/dkron/grpc_client.go +++ b/dkron/grpc_client.go @@ -131,7 +131,7 @@ func (grpcc *GRPCClient) GetJob(addr, jobName string) (*Job, error) { return nil, err } - return NewJobFromProto(gjr.Job), nil + return NewJobFromProto(gjr.Job, grpcc.logger), nil } // Leave calls Leave method on the gRPC server @@ -225,7 +225,7 @@ func (grpcc *GRPCClient) DeleteJob(jobName string) (*Job, error) { return nil, err } - job := NewJobFromProto(res.Job) + job := NewJobFromProto(res.Job, grpcc.logger) return job, nil } @@ -260,7 +260,7 @@ func (grpcc *GRPCClient) RunJob(jobName string) (*Job, error) { return nil, err } - job := NewJobFromProto(res.Job) + job := NewJobFromProto(res.Job, grpcc.logger) return job, nil } diff --git a/dkron/job.go b/dkron/job.go index 5c7fe41a5..efb4d60b6 100644 --- a/dkron/job.go +++ b/dkron/job.go @@ -126,11 +126,12 @@ type Job struct { // Computed next execution Next time.Time `json:"next"` + ExpiresAt ntime.NullableTime `json:"expires_at"` logger *logrus.Entry } // NewJobFromProto create a new Job from a PB Job struct -func NewJobFromProto(in *proto.Job) *Job { +func NewJobFromProto(in *proto.Job, logger *logrus.Entry) *Job { next, _ := ptypes.Timestamp(in.GetNext()) job := &Job{ @@ -154,6 +155,7 @@ func NewJobFromProto(in *proto.Job) *Job { Status: in.Status, Metadata: in.Metadata, Next: next, + logger: logger, } if in.GetLastSuccess().GetHasValue() { t, _ := ptypes.Timestamp(in.GetLastSuccess().GetTime()) diff --git a/dkron/job_test.go b/dkron/job_test.go index 4bc85c75e..5a44b68e4 100644 --- a/dkron/job_test.go +++ b/dkron/job_test.go @@ -89,7 +89,7 @@ func TestNewJobFromProto(t *testing.T) { } in.Processors = proc - j := NewJobFromProto(in) + j := NewJobFromProto(in, nil) assert.Equal(t, testConfig, j.Processors) } diff --git a/dkron/store.go b/dkron/store.go index 3e9ec7b22..204c31c49 100644 --- a/dkron/store.go +++ b/dkron/store.go @@ -136,7 +136,7 @@ func (s *Store) SetJob(job *Job, copyDependentJobs bool) error { return err } - ej = NewJobFromProto(&pbej) + ej = NewJobFromProto(&pbej, s.logger) if ej.Name != "" { // When the job runs, these status vars are updated @@ -328,7 +328,7 @@ func (s *Store) GetJobs(options *JobOptions) ([]*Job, error) { return false } } - job := NewJobFromProto(&pbj) + job := NewJobFromProto(&pbj, s.logger) job.logger = s.logger if options == nil || @@ -364,7 +364,7 @@ func (s *Store) GetJob(name string, options *JobOptions) (*Job, error) { return nil, err } - job := NewJobFromProto(&pbj) + job := NewJobFromProto(&pbj, s.logger) job.logger = s.logger return job, nil @@ -410,7 +410,7 @@ func (s *Store) DeleteJob(name string) (*Job, error) { if len(pbj.DependentJobs) > 0 { return ErrDependentJobs } - job = NewJobFromProto(&pbj) + job = NewJobFromProto(&pbj, s.logger) if err := s.deleteExecutionsTxFunc(name)(tx); err != nil { return err