diff --git a/pkg/index/job/correction/service/corrector.go b/pkg/index/job/correction/service/corrector.go index ab67520d03..44f8b834be 100644 --- a/pkg/index/job/correction/service/corrector.go +++ b/pkg/index/job/correction/service/corrector.go @@ -163,10 +163,14 @@ func (c *correct) correct(ctx context.Context) (err error) { curTargetAgent := 0 jobErrs := make([]error, 0, c.cfg.Corrector.StreamListConcurrency) if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs, - func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error { + func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) (err error) { // current address is the leftAgentAddrs[0] because this is OrderedRange and // leftAgentAddrs is copied from c.agentAddrs defer func() { + if err != nil { + // catch the err that happened in this scope using named return err + jobErrs = append(jobErrs, err) + } curTargetAgent++ }() @@ -187,7 +191,6 @@ func (c *correct) correct(ctx context.Context) (err error) { vc := vald.NewValdClient(conn) stream, err := vc.StreamListObject(ctx, &payload.Object_List_Request{}) if err != nil { - jobErrs = append(jobErrs, err) return err } @@ -215,11 +218,6 @@ func (c *correct) correct(ctx context.Context) (err error) { log.Info("bbolt all batch finished") } - // Aggregate errors for job status - if err != nil { - jobErrs = append(jobErrs, err) - } - log.Infof("correction finished for agent %s", addr) return err