From 2c0a458d3b7fcc7cae5bb4b3a2f5684e7aaa6779 Mon Sep 17 00:00:00 2001 From: kpango Date: Fri, 21 Jul 2023 15:55:44 +0900 Subject: [PATCH] refactor index manager service Signed-off-by: kpango --- internal/timeutil/time_test.go | 6 ++++++ pkg/manager/index/service/indexer.go | 26 ++++++++++++++------------ 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/internal/timeutil/time_test.go b/internal/timeutil/time_test.go index 577ba2a6a95..3f343cd46d0 100644 --- a/internal/timeutil/time_test.go +++ b/internal/timeutil/time_test.go @@ -80,6 +80,12 @@ func TestParse(t *testing.T) { want: 0, wantErr: true, }, + { + name: "returns 0 and incorrect string error when t is minus value", + t: "-1", + want: 0, + wantErr: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/manager/index/service/indexer.go b/pkg/manager/index/service/indexer.go index 365db870c6f..7eb71427e2c 100644 --- a/pkg/manager/index/service/indexer.go +++ b/pkg/manager/index/service/indexer.go @@ -121,7 +121,9 @@ func (idx *index) Start(ctx context.Context) (<-chan error, error) { ech <- err case err = <-sech: ech <- err - case <-it.C: + case <-it.C: // index duration ticker + // execute CreateIndex. This execution ignores low index agent, + // and does not immediately execute SaveIndex. err = idx.execute(grpc.WithGRPCMethod(ctx, "core.v1.Agent/CreateIndex"), true, false) if err != nil { ech <- err @@ -129,7 +131,9 @@ func (idx *index) Start(ctx context.Context) (<-chan error, error) { err = nil } it.Reset(idx.indexDuration) - case <-itl.C: + case <-itl.C: // index duration limit ticker + // execute CreateIndex. This execution always executes CreateIndex regardless of the state of the uncommitted index, + // but does not immediately execute SaveIndex. err = idx.execute(grpc.WithGRPCMethod(ctx, "core.v1.Agent/CreateIndex"), false, false) if err != nil { ech <- err @@ -137,7 +141,9 @@ func (idx *index) Start(ctx context.Context) (<-chan error, error) { err = nil } itl.Reset(idx.indexDurationLimit) - case <-stl.C: + case <-stl.C: // save index duration limit ticker + // execute CreateIndex. This execution always executes CreateIndex regardless of the state of the uncommitted index, + // and immediately execute SaveIndex using CreateAndSaveIndex operation. err = idx.execute(grpc.WithGRPCMethod(ctx, "core.v1.Agent/CreateAndSaveIndex"), false, true) if err != nil { ech <- err @@ -162,12 +168,12 @@ func (idx *index) Start(ctx context.Context) (<-chan error, error) { select { case <-ctx.Done(): return - case addr := <-idx.saveIndexTargetAddrCh: - idx.schMap.Delete(addr) + case addr := <-idx.saveIndexTargetAddrCh: // this channel value send from execute func after thier CreateIndex operation when argument immediateSaving=false. _, err := idx.client.GetClient(). Do(grpc.WithGRPCMethod(ctx, "core.v1.Agent/SaveIndex"), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { return agent.NewAgentClient(conn).SaveIndex(ctx, &payload.Empty{}, copts...) }) + idx.schMap.Delete(addr) // unlock duplicate signal sending. if err != nil { log.Warnf("an error occurred while calling SaveIndex of %s: %s", addr, err) select { @@ -198,7 +204,7 @@ func (idx *index) execute(ctx context.Context, enableLowIndexSkip, immediateSavi idx.indexing.Store(true) defer idx.indexing.Store(false) addrs := idx.client.GetAddrs(ctx) - err = idx.client.GetClient().OrderedRangeConcurrent(ctx, addrs, + return errors.Join(idx.client.GetClient().OrderedRangeConcurrent(ctx, addrs, idx.concurrency, func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption, @@ -227,7 +233,7 @@ func (idx *index) execute(ctx context.Context, enableLowIndexSkip, immediateSavi log.Warnf("an error occurred while calling CreateIndex of %s: %s", addr, err) return err } - _, ok := idx.schMap.Load(addr) + _, ok := idx.schMap.Load(addr) // prevent duplicate save index signal. if !ok { select { case <-ctx.Done(): @@ -249,11 +255,7 @@ func (idx *index) execute(ctx context.Context, enableLowIndexSkip, immediateSavi } idx.waitForNextSaving(ctx) return nil - }) - if err != nil { - return err - } - return idx.loadInfos(ctx) + }), idx.loadInfos(ctx)) } func (idx *index) waitForNextSaving(ctx context.Context) {