Skip to content

Commit

Permalink
Add logger to FromProto instantiation (#971)
Browse files Browse the repository at this point in the history
* Add logger to FromProto instantiation

It was failing under certain conditions
  • Loading branch information
Victor Castell authored May 24, 2021
1 parent f386690 commit 620e9c7
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 11 deletions.
2 changes: 1 addition & 1 deletion dkron/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (d *dkronFSM) applySetJob(buf []byte) interface{} {
if err := proto.Unmarshal(buf, &pj); err != nil {
return err
}
job := NewJobFromProto(&pj)
job := NewJobFromProto(&pj, d.logger)
if err := d.store.SetJob(job, false); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion dkron/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (grpcs *GRPCServer) SetJob(ctx context.Context, setJobReq *proto.SetJobRequ
}

// If everything is ok, add the job to the scheduler
job := NewJobFromProto(setJobReq.Job)
job := NewJobFromProto(setJobReq.Job, grpcs.logger)
job.Agent = grpcs.agent
if err := grpcs.agent.sched.AddJob(job); err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions dkron/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (grpcc *GRPCClient) GetJob(addr, jobName string) (*Job, error) {
return nil, err
}

return NewJobFromProto(gjr.Job), nil
return NewJobFromProto(gjr.Job, grpcc.logger), nil
}

// Leave calls Leave method on the gRPC server
Expand Down Expand Up @@ -225,7 +225,7 @@ func (grpcc *GRPCClient) DeleteJob(jobName string) (*Job, error) {
return nil, err
}

job := NewJobFromProto(res.Job)
job := NewJobFromProto(res.Job, grpcc.logger)

return job, nil
}
Expand Down Expand Up @@ -260,7 +260,7 @@ func (grpcc *GRPCClient) RunJob(jobName string) (*Job, error) {
return nil, err
}

job := NewJobFromProto(res.Job)
job := NewJobFromProto(res.Job, grpcc.logger)

return job, nil
}
Expand Down
4 changes: 3 additions & 1 deletion dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,12 @@ type Job struct {
// Computed next execution
Next time.Time `json:"next"`

ExpiresAt ntime.NullableTime `json:"expires_at"`
logger *logrus.Entry
}

// NewJobFromProto create a new Job from a PB Job struct
func NewJobFromProto(in *proto.Job) *Job {
func NewJobFromProto(in *proto.Job, logger *logrus.Entry) *Job {
next, _ := ptypes.Timestamp(in.GetNext())

job := &Job{
Expand All @@ -154,6 +155,7 @@ func NewJobFromProto(in *proto.Job) *Job {
Status: in.Status,
Metadata: in.Metadata,
Next: next,
logger: logger,
}
if in.GetLastSuccess().GetHasValue() {
t, _ := ptypes.Timestamp(in.GetLastSuccess().GetTime())
Expand Down
2 changes: 1 addition & 1 deletion dkron/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestNewJobFromProto(t *testing.T) {
}
in.Processors = proc

j := NewJobFromProto(in)
j := NewJobFromProto(in, nil)
assert.Equal(t, testConfig, j.Processors)
}

Expand Down
8 changes: 4 additions & 4 deletions dkron/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (s *Store) SetJob(job *Job, copyDependentJobs bool) error {
return err
}

ej = NewJobFromProto(&pbej)
ej = NewJobFromProto(&pbej, s.logger)

if ej.Name != "" {
// When the job runs, these status vars are updated
Expand Down Expand Up @@ -328,7 +328,7 @@ func (s *Store) GetJobs(options *JobOptions) ([]*Job, error) {
return false
}
}
job := NewJobFromProto(&pbj)
job := NewJobFromProto(&pbj, s.logger)
job.logger = s.logger

if options == nil ||
Expand Down Expand Up @@ -364,7 +364,7 @@ func (s *Store) GetJob(name string, options *JobOptions) (*Job, error) {
return nil, err
}

job := NewJobFromProto(&pbj)
job := NewJobFromProto(&pbj, s.logger)
job.logger = s.logger

return job, nil
Expand Down Expand Up @@ -410,7 +410,7 @@ func (s *Store) DeleteJob(name string) (*Job, error) {
if len(pbj.DependentJobs) > 0 {
return ErrDependentJobs
}
job = NewJobFromProto(&pbj)
job = NewJobFromProto(&pbj, s.logger)

if err := s.deleteExecutionsTxFunc(name)(tx); err != nil {
return err
Expand Down

0 comments on commit 620e9c7

Please sign in to comment.