Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <[email protected]>
  • Loading branch information
kpango committed Dec 13, 2022
1 parent 1cc3781 commit 27a1d42
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 19 deletions.
14 changes: 11 additions & 3 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type ngt struct {
// counters
nocie uint64 // number of create index execution
nogce uint64 // number of proactive GC execution
wfci uint64 // wait for create indexing

// configurations
inMem bool // in-memory mode
Expand Down Expand Up @@ -827,6 +828,12 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
if ic == 0 {
return errors.ErrUncommittedIndexNotFound
}
wf := atomic.AddUint64(&n.wfci, 1)
if wf > 1 {
atomic.AddUint64(&n.wfci, ^uint64(0))
log.Warnf("concurrent create index waiting detected this request will be ignored, concurrent: %d", wf)
return nil
}
err = func() error {
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()
Expand All @@ -835,10 +842,12 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
runtime.Gosched()
select {
case <-ctx.Done():
atomic.AddUint64(&n.wfci, ^uint64(0))
return ctx.Err()
case <-ticker.C:
}
}
atomic.AddUint64(&n.wfci, ^uint64(0))
return nil
}()
if err != nil {
Expand Down Expand Up @@ -1183,12 +1192,11 @@ func (n *ngt) Exists(uuid string) (oid uint32, ok bool) {
if !ok {
oid, ok = n.kvs.Get(uuid)
if !ok {
log.Debugf("Exists\tuuid: %s's data not found in kvsdb and insert vqueue\terror: %v", uuid, errors.ErrObjectIDNotFound(uuid))
// log.Debugf("Exists\tuuid: %s's data not found in kvsdb and insert vqueue\terror: %v", uuid, errors.ErrObjectIDNotFound(uuid))
return 0, false
}
if n.vq.DVExists(uuid) {
log.Debugf("Exists\tuuid: %s's data found in kvsdb and not found in insert vqueue, but delete vqueue data exists. the object will be delete soon\terror: %v",
uuid, errors.ErrObjectIDNotFound(uuid))
// log.Debugf("Exists\tuuid: %s's data found in kvsdb and not found in insert vqueue, but delete vqueue data exists. the object will be delete soon\terror: %v", uuid, errors.ErrObjectIDNotFound(uuid))
return 0, false
}
}
Expand Down
53 changes: 37 additions & 16 deletions pkg/agent/core/ngt/service/ngt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11349,7 +11349,9 @@ type index struct {

func Test_ngt_InsertUpsert(t *testing.T) {
type args struct {
idxes []index
idxes []index
poolSize uint32
bulkSize int
}
type fields struct {
svcCfg *config.NGT
Expand Down Expand Up @@ -11437,7 +11439,9 @@ func Test_ngt_InsertUpsert(t *testing.T) {
{
name: "insert & upsert 100 random",
args: args{
idxes: createRandomData(10000000),
idxes: createRandomData(10000000),
poolSize: 100000,
bulkSize: 100000,
},
fields: fields{
svcCfg: &config.NGT{
Expand Down Expand Up @@ -11478,7 +11482,7 @@ func Test_ngt_InsertUpsert(t *testing.T) {
if err != nil {
tt.Errorf("failed to init ngt service, error = %v", err)
}

var wg sync.WaitGroup
count := 0
for _, idx := range test.args.idxes {
count++
Expand All @@ -11487,36 +11491,53 @@ func Test_ngt_InsertUpsert(t *testing.T) {
tt.Errorf("error = %v", err)
}

if count%1000 == 0 {
err = n.CreateAndSaveIndex(ctx, 100)
if err != nil {
tt.Errorf("error creating index: %v", err)
}
if count >= test.args.bulkSize {
wg.Add(1)
go func() {
defer wg.Done()
err = n.CreateAndSaveIndex(ctx, test.args.poolSize)
if err != nil {
tt.Errorf("error creating index: %v", err)
}
}()
count = 0
}
}
wg.Wait()

err = n.CreateAndSaveIndex(ctx, uint32(count%100))
err = n.CreateAndSaveIndex(ctx, test.args.poolSize)
if err != nil {
tt.Errorf("error creating index: %v", err)
}

var wgu sync.WaitGroup
count = 0
for _, idx := range test.args.idxes {
count++
err = n.Update(idx.uuid, idx.vec)
err = n.Delete(idx.uuid)
if err != nil {
tt.Errorf("delete error = %v", err)
}
err = n.Insert(idx.uuid, idx.vec)
if err := checkFunc(test.want, err); err != nil {
tt.Errorf("error = %v", err)
}

if count%1000 == 0 {
err = n.CreateAndSaveIndex(ctx, 100)
if err != nil {
tt.Errorf("error creating index: %v", err)
}
if count >= test.args.bulkSize {
wgu.Add(1)
go func() {
defer wgu.Done()
err = n.CreateAndSaveIndex(ctx, test.args.poolSize)
if err != nil {
tt.Errorf("error creating index: %v", err)
}
}()
count = 0
}
}
wgu.Wait()

err = n.CreateAndSaveIndex(ctx, uint32(count%100))
err = n.CreateAndSaveIndex(ctx, test.args.poolSize)
if err != nil {
tt.Errorf("error creating index: %v", err)
}
Expand Down

0 comments on commit 27a1d42

Please sign in to comment.