Skip to content

Commit

Permalink
[bugfix] append timing of vqueue existing map
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <[email protected]>
  • Loading branch information
kpango committed May 31, 2021
1 parent e61079c commit 2041d7c
Show file tree
Hide file tree
Showing 5 changed files with 828 additions and 42 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ require (
golang.org/x/net v0.0.0-20210525063256-abc453219eb5
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210525143221-35b2ab0089ea
golang.org/x/sys v0.0.0-20210531080801-fdfd190a6549
gonum.org/v1/hdf5 v0.0.0-20200504100616-496fefe91614
gonum.org/v1/plot v0.9.0
google.golang.org/api v0.47.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -861,8 +861,8 @@ golang.org/x/sys v0.0.0-20210503080704-8803ae5d1324/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210525143221-35b2ab0089ea h1:+WiDlPBBaO+h9vPNZi8uJ3k4BkKQB7Iow3aqwHVA5hI=
golang.org/x/sys v0.0.0-20210525143221-35b2ab0089ea/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210531080801-fdfd190a6549 h1:OL5GcZ2XPkte3dpfuFQ9o884vrE3BZQhajdntNMruv4=
golang.org/x/sys v0.0.0-20210531080801-fdfd190a6549/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
62 changes: 23 additions & 39 deletions pkg/agent/core/ngt/service/vqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ type Queue interface {
}

type vqueue struct {
ich chan index // ich is insert channel
uii []index // uii is un inserted index
imu sync.Mutex // insert mutex
uiim map[string]index // uiim is un inserted index map (this value is used for GetVector operation to return queued vector cache data)
dch chan key // dch is delete channel
udk []key // udk is un deleted key
dmu sync.Mutex // delete mutex
udim map[string]int64 // udim is un deleted index map (this value is used for Exists operation to return cache data existence)
ich chan index // ich is insert channel
uii []index // uii is un inserted index
imu sync.Mutex // insert mutex
uiim uiim // uiim is un inserted index map (this value is used for GetVector operation to return queued vector cache data)
dch chan key // dch is delete channel
udk []key // udk is un deleted key
dmu sync.Mutex // delete mutex
udim udim // udim is un deleted index map (this value is used for Exists operation to return cache data existence)
eg errgroup.Group
finalizingInsert atomic.Value
finalizingDelete atomic.Value
Expand Down Expand Up @@ -96,10 +96,8 @@ func New(opts ...Option) (Queue, error) {
}
vq.ich = make(chan index, vq.ichSize)
vq.uii = make([]index, 0, vq.iBufSize)
vq.uiim = make(map[string]index, vq.iBufSize)
vq.dch = make(chan key, vq.dchSize)
vq.udk = make([]key, 0, vq.dBufSize)
vq.udim = make(map[string]int64, vq.dBufSize)
vq.finalizingInsert.Store(false)
vq.finalizingDelete.Store(false)
vq.closed.Store(true)
Expand Down Expand Up @@ -164,11 +162,13 @@ func (v *vqueue) PushInsert(uuid string, vector []float32, date int64) error {
if date == 0 {
date = time.Now().UnixNano()
}
v.ich <- index{
idx := index{
uuid: uuid,
vector: vector,
date: date,
}
v.uiim.Store(uuid, idx)
v.ich <- idx
return nil
}

Expand All @@ -180,6 +180,7 @@ func (v *vqueue) PushDelete(uuid string, date int64) error {
if date == 0 {
date = time.Now().UnixNano()
}
v.udim.Store(uuid, date)
v.dch <- key{
uuid: uuid,
date: date,
Expand Down Expand Up @@ -230,16 +231,12 @@ func (v *vqueue) RangePopDelete(ctx context.Context, f func(uuid string) bool) {
}

func (v *vqueue) GetVector(uuid string) ([]float32, bool) {
v.imu.Lock()
vec, ok := v.uiim[uuid]
v.imu.Unlock()
vec, ok := v.uiim.Load(uuid)
if !ok {
// data not in the insert queue then return not exists(false)
return nil, false
}
v.dmu.Lock()
di, ok := v.udim[uuid]
v.dmu.Unlock()
di, ok := v.udim.Load(uuid)
if !ok {
// data not in the delete queue but exists in insert queue then return exists(true)
return vec.vector, true
Expand All @@ -252,16 +249,12 @@ func (v *vqueue) GetVector(uuid string) ([]float32, bool) {
}

func (v *vqueue) IVExists(uuid string) bool {
v.imu.Lock()
vec, ok := v.uiim[uuid]
v.imu.Unlock()
vec, ok := v.uiim.Load(uuid)
if !ok {
// data not in the insert queue then return not exists(false)
return false
}
v.dmu.Lock()
di, ok := v.udim[uuid]
v.dmu.Unlock()
di, ok := v.udim.Load(uuid)
if !ok {
// data not in the delete queue but exists in insert queue then return exists(true)
return true
Expand All @@ -271,15 +264,11 @@ func (v *vqueue) IVExists(uuid string) bool {
}

func (v *vqueue) DVExists(uuid string) bool {
v.dmu.Lock()
di, ok := v.udim[uuid]
v.dmu.Unlock()
di, ok := v.udim.Load(uuid)
if !ok {
return false
}
v.imu.Lock()
vec, ok := v.uiim[uuid]
v.imu.Unlock()
vec, ok := v.uiim.Load(uuid)
if !ok {
// data not in the insert queue then return not exists(false)
return true
Expand All @@ -292,14 +281,12 @@ func (v *vqueue) DVExists(uuid string) bool {
func (v *vqueue) addInsert(i index) {
v.imu.Lock()
v.uii = append(v.uii, i)
v.uiim[i.uuid] = i
v.imu.Unlock()
}

func (v *vqueue) addDelete(d key) {
v.dmu.Lock()
v.udk = append(v.udk, d)
v.udim[d.uuid] = d.date
v.dmu.Unlock()
}

Expand All @@ -316,9 +303,7 @@ func (v *vqueue) flushAndLoadInsert() (uii []index) {
dup := make(map[string]bool, len(uii)/2)
dl := make([]int, 0, len(uii)/2)
for i, idx := range uii {
v.imu.Lock()
delete(v.uiim, idx.uuid)
v.imu.Unlock()
v.uiim.Delete(idx.uuid)
// if duplicated data exists current loop's data is old due to the uii's sort order
if dup[idx.uuid] {
// if duplicated add id to delete wait list
Expand Down Expand Up @@ -351,9 +336,7 @@ func (v *vqueue) flushAndLoadDelete() (udk []key) {
dup := make(map[string]bool, len(udk)/2)
dl := make([]int, 0, len(udk)/2)
for i, idx := range udk {
v.dmu.Lock()
delete(v.udim, idx.uuid)
v.dmu.Unlock()
v.udim.Delete(idx.uuid)
if dup[idx.uuid] {
dl = append(dl, i)
} else {
Expand Down Expand Up @@ -391,12 +374,13 @@ func (v *vqueue) flushAndLoadDelete() (udk []key) {
v.imu.Unlock()
sort.Sort(sort.Reverse(sort.IntSlice(dl)))
for _, i := range dl {
v.imu.Lock()
// remove from existing map
delete(v.uiim, v.uii[i].uuid)
v.imu.Lock()
uuid := v.uii[i].uuid
// remove unnecessary insert vector queue data
v.uii = append(v.uii[:i], v.uii[i+1:]...)
v.imu.Unlock()
v.uiim.Load(uuid)
}
return udk
}
Expand Down
Loading

0 comments on commit 2041d7c

Please sign in to comment.