Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Aug 16, 2023
1 parent 567c79a commit 38336bf
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 16 deletions.
52 changes: 37 additions & 15 deletions pkg/index/job/correction/service/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ type Corrector interface {
}

type correct struct {
eg errgroup.Group
cfg *config.Data
discoverer discoverer.Client
indexInfos valdsync.Map[string, *payload.Info_Index_Count]
Expand Down Expand Up @@ -74,6 +73,12 @@ func (c *correct) Start(ctx context.Context) (<-chan error, error) {
return nil, err
}

// DEBUG:
c.indexInfos.Range(func(addr string, info *payload.Info_Index_Count) bool {
log.Debugf("index info: addr(%s), stored(%d), uncommitted(%d)", addr, info.GetStored(), info.GetUncommitted())
return true
})

// This blocks. Should we run with errorgroup?
log.Info("starting correction...")
if err := c.correct(ctx, addrs); err != nil {
Expand Down Expand Up @@ -104,12 +109,24 @@ func (c *correct) Start(ctx context.Context) (<-chan error, error) {
func (c *correct) correct(ctx context.Context, addrs []string) (err error) {
if err := c.discoverer.GetClient().OrderedRange(ctx, addrs,
func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error {
eg, ctx := errgroup.New(ctx)
eg.Limitation(c.cfg.Server.GetGRPCStreamConcurrency())

vc := vald.NewValdClient(conn)
stream, err := vc.StreamListObject(ctx, &payload.Object_List_Request{})
if err != nil {
return err
}

finalize := func() error {
err = eg.Wait()
if err != nil {
log.Errorf("err group returned error: %v", err)
return err
}
return nil
}

for {
select {
case <-ctx.Done():
Expand All @@ -118,9 +135,10 @@ func (c *correct) correct(ctx context.Context, addrs []string) (err error) {
res, err := stream.Recv()
if errors.Is(err, io.EOF) {
log.Debugf("StreamListObject stream finished for agent %s", addr)
return nil
return finalize()
}
if err != nil {
log.Errorf("StreamListObject stream finished unexpectedly: %v", err)
return err
}

Expand All @@ -132,19 +150,23 @@ func (c *correct) correct(ctx context.Context, addrs []string) (err error) {
}

log.Debugf("received object in StreamListObject: agent(%s), id(%s), timestamp(%v)", addr, res.GetVector().GetId(), res.GetVector().GetTimestamp())
if err := c.checkConsistency(
ctx,
&vectorReplica{
addr: addr,
vec: res.GetVector(),
},
addrs,
); err != nil {
// TODO: errors.Join?
// keep processing other vectors even if one vector failed
log.Error(err)
continue
}
eg.Go(func() error {
if err := c.checkConsistency(
ctx,
&vectorReplica{
addr: addr,
vec: res.GetVector(),
},
addrs,
); err != nil {
// TODO: errors.Join?
// keep processing other vectors even if one vector failed
log.Error(err)
// continue
return err
}
return nil
})
}
}
},
Expand Down
6 changes: 5 additions & 1 deletion pkg/index/job/correction/usecase/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,15 @@ func (c *run) Start(ctx context.Context) (<-chan error, error) {

start := time.Now()
dech, err := c.corrector.Start(ctx)
if err != nil {
log.Errorf("index correction process failed: %v", err)
return nil, err
}
end := time.Since(start)
log.Infof("correction finished in %v", end)

// FIXME: 以下をやめてシンプルにStartを抜けたらself SIGTERMで終了させる方がいいかも
// その場合echは無視する
// その場合echは無視することになる
ech := make(chan error, 100)
c.eg.Go(safety.RecoverFunc(func() error {
for {
Expand Down

0 comments on commit 38336bf

Please sign in to comment.