Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix NGT flush logic #2598

Merged
merged 17 commits into from
Sep 11, 2024
6 changes: 6 additions & 0 deletions pkg/agent/core/ngt/handler/grpc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@
},
}, info.Get())...)
code = codes.FailedPrecondition
case errors.Is(err, errors.ErrFlushingIsInProgress):
err = status.WrapWithAborted("CreateIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...)
code = codes.Aborted

Check warning on line 69 in pkg/agent/core/ngt/handler/grpc/index.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/handler/grpc/index.go#L67-L69

Added lines #L67 - L69 were not covered by tests
case errors.Is(err, context.Canceled):
err = status.WrapWithCanceled(fmt.Sprintf("CreateIndex API canceled to create indexes pool_size = %d, error: %v", c.GetPoolSize(), err), err, details...)
code = codes.Canceled
Expand Down Expand Up @@ -149,6 +152,9 @@
},
}, info.Get())...)
code = codes.FailedPrecondition
case errors.Is(err, errors.ErrFlushingIsInProgress):
err = status.WrapWithAborted("CreateAndSaveIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...)
code = codes.Aborted

Check warning on line 157 in pkg/agent/core/ngt/handler/grpc/index.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/handler/grpc/index.go#L155-L157

Added lines #L155 - L157 were not covered by tests
case errors.Is(err, context.Canceled):
err = status.WrapWithCanceled(fmt.Sprintf("CreateAndSaveIndex API canceled to create indexes pool_size = %d, error: %v", c.GetPoolSize(), err), err, details...)
code = codes.Canceled
Expand Down
40 changes: 26 additions & 14 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,7 @@
}
return ctx.Err()
case <-tick.C:
if n.vq.IVQLen() >= n.alen {
if n != nil && n.vq != nil && !n.IsFlushing() && n.vq.IVQLen() >= n.alen {

Check warning on line 911 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L911

Added line #L911 was not covered by tests
err = n.CreateIndex(ctx, n.poolSize)
}
case <-limit.C:
Expand Down Expand Up @@ -1249,7 +1249,7 @@
runtime.GC()
atomic.AddUint64(&n.nogce, 1)

if n.inMem {
if !n.inMem {

Check warning on line 1252 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1252

Added line #L1252 was not covered by tests
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
// delete file
err = file.DeleteDir(ctx, n.path)
if err != nil {
Expand All @@ -1265,20 +1265,20 @@
}
}

nkvs := kvs.New(kvs.WithConcurrency(n.kvsdbConcurrency))

nvq, err := vqueue.New()
if err != nil {
log.Errorf("failed to create new vector vector queue. error: %v", err)
}
// nkvs := kvs.New(kvs.WithConcurrency(n.kvsdbConcurrency))
//
// nvq, err := vqueue.New()
// if err != nil {
// log.Errorf("failed to create new vector vector queue. error: %v", err)
// }

// renew instance
nn, err := newNGT(n.cfg, n.opts...)
if err != nil {
return err
}
nn.kvs = nkvs
nn.vq = nvq
// nn.kvs = nkvs
// nn.vq = nvq

// Regenerate with flags set
nn.flushing.Store(true)
Expand All @@ -1299,8 +1299,11 @@
}
}()

if n.isReadReplica {
switch {
case n.isReadReplica:
return errors.ErrWriteOperationToReadReplica
case n.IsFlushing():
return errors.ErrFlushingIsInProgress

Check warning on line 1306 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1305-L1306

Added lines #L1305 - L1306 were not covered by tests
}

ic := n.vq.IVQLen() + n.vq.DVQLen()
Comment on lines +1307 to 1311
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note

Added logic to check if a vqueue reference could be nil if a flush is running.

Expand Down Expand Up @@ -1941,15 +1944,24 @@
}

func (n *ngt) Len() uint64 {
return n.kvs.Len()
if n != nil && n.kvs != nil && !n.IsFlushing() {
return n.kvs.Len()
}
return 0

Check warning on line 1950 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1950

Added line #L1950 was not covered by tests
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
}

func (n *ngt) InsertVQueueBufferLen() uint64 {
return uint64(n.vq.IVQLen())
if n != nil && n.vq != nil && !n.IsFlushing() {
return uint64(n.vq.IVQLen())
}
return 0

Check warning on line 1957 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1957

Added line #L1957 was not covered by tests
}

func (n *ngt) DeleteVQueueBufferLen() uint64 {
return uint64(n.vq.DVQLen())
if n != nil && n.vq != nil && !n.IsFlushing() {
return uint64(n.vq.DVQLen())
}
return 0

Check warning on line 1964 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1964

Added line #L1964 was not covered by tests
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
}

func (n *ngt) GetDimensionSize() int {
Expand Down
Loading