diff --git a/pkg/discoverer/k8s/service/discover.go b/pkg/discoverer/k8s/service/discover.go index a4e315cfda..c98931ee6f 100644 --- a/pkg/discoverer/k8s/service/discover.go +++ b/pkg/discoverer/k8s/service/discover.go @@ -49,8 +49,8 @@ type Discoverer interface { type discoverer struct { maxPods int nodes valdsync.Map[string, *node.Node] - nodeMetrics nodeMetricsMap - pods podsMap + nodeMetrics valdsync.Map[string, mnode.Node] + pods valdsync.Map[string, []pod.Pod] podMetrics valdsync.Map[string, mpod.Pod] podsByNode atomic.Value podsByNamespace atomic.Value diff --git a/pkg/discoverer/k8s/service/nodemetricsmap.go b/pkg/discoverer/k8s/service/nodemetricsmap.go deleted file mode 100644 index 4199e31ee8..0000000000 --- a/pkg/discoverer/k8s/service/nodemetricsmap.go +++ /dev/null @@ -1,392 +0,0 @@ -// -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -package service - -import ( - "sync" - "sync/atomic" - "unsafe" - - mnode "github.com/vdaas/vald/internal/k8s/metrics/node" -) - -// Map is like a Go map[interface{}]interface{} but is safe for concurrent use -// by multiple goroutines without additional locking or coordination. -// Loads, stores, and deletes run in amortized constant time. -// -// The Map type is specialized. Most code should use a plain Go map instead, -// with separate locking or coordination, for better type safety and to make it -// easier to maintain other invariants along with the map content. -// -// The Map type is optimized for two common use cases: (1) when the entry for a given -// key is only ever written once but read many times, as in caches that only grow, -// or (2) when multiple goroutines read, write, and overwrite entries for disjoint -// sets of keys. In these two cases, use of a Map may significantly reduce lock -// contention compared to a Go map paired with a separate Mutex or RWMutex. -// -// The zero Map is empty and ready for use. A Map must not be copied after first use. -type nodeMetricsMap struct { - mu sync.Mutex - - // read contains the portion of the map's contents that are safe for - // concurrent access (with or without mu held). - // - // The read field itself is always safe to load, but must only be stored with - // mu held. - // - // Entries stored in read may be updated concurrently without mu, but updating - // a previously-expunged entry requires that the entry be copied to the dirty - // map and unexpunged with mu held. - read atomic.Value // readOnly - - // dirty contains the portion of the map's contents that require mu to be - // held. To ensure that the dirty map can be promoted to the read map quickly, - // it also includes all of the non-expunged entries in the read map. - // - // Expunged entries are not stored in the dirty map. An expunged entry in the - // clean map must be unexpunged and added to the dirty map before a new value - // can be stored to it. - // - // If the dirty map is nil, the next write to the map will initialize it by - // making a shallow copy of the clean map, omitting stale entries. - dirty map[string]*entryNodeMetricsMap - - // misses counts the number of loads since the read map was last updated that - // needed to lock mu to determine whether the key was present. - // - // Once enough misses have occurred to cover the cost of copying the dirty - // map, the dirty map will be promoted to the read map (in the unamended - // state) and the next store to the map will make a new dirty copy. - misses int -} - -// readOnly is an immutable struct stored atomically in the Map.read field. -type readOnlyNodeMetricsMap struct { - m map[string]*entryNodeMetricsMap - amended bool // true if the dirty map contains some key not in m. -} - -// expunged is an arbitrary pointer that marks entries which have been deleted -// from the dirty map. -// skipcq: GSC-G103 -var expungedNodeMetricsMap = unsafe.Pointer(new(mnode.Node)) - -// An entry is a slot in the map corresponding to a particular key. -type entryNodeMetricsMap struct { - // p points to the interface{} value stored for the entry. - // - // If p == nil, the entry has been deleted and m.dirty == nil. - // - // If p == expunged, the entry has been deleted, m.dirty != nil, and the entry - // is missing from m.dirty. - // - // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty - // != nil, in m.dirty[key]. - // - // An entry can be deleted by atomic replacement with nil: when m.dirty is - // next created, it will atomically replace nil with expunged and leave - // m.dirty[key] unset. - // - // An entry's associated value can be updated by atomic replacement, provided - // p != expunged. If p == expunged, an entry's associated value can be updated - // only after first setting m.dirty[key] = e so that lookups using the dirty - // map find the entry. - p unsafe.Pointer // *interface{} -} - -func newEntryNodeMetricsMap(i mnode.Node) *entryNodeMetricsMap { - // skipcq: GSC-G103 - return &entryNodeMetricsMap{p: unsafe.Pointer(&i)} -} - -// Load returns the value stored in the map for a key, or nil if no -// value is present. -// The ok result indicates whether value was found in the map. -func (m *nodeMetricsMap) Load(key string) (value mnode.Node, ok bool) { - read, _ := m.read.Load().(readOnlyNodeMetricsMap) - e, ok := read.m[key] - if !ok && read.amended { - m.mu.Lock() - // Avoid reporting a spurious miss if m.dirty got promoted while we were - // blocked on m.mu. (If further loads of the same key will not miss, it's - // not worth copying the dirty map for this key.) - read, _ = m.read.Load().(readOnlyNodeMetricsMap) - e, ok = read.m[key] - if !ok && read.amended { - e, ok = m.dirty[key] - // Regardless of whether the entry was present, record a miss: this key - // will take the slow path until the dirty map is promoted to the read - // map. - m.missLocked() - } - m.mu.Unlock() - } - if !ok { - return value, false - } - return e.load() -} - -func (e *entryNodeMetricsMap) load() (value mnode.Node, ok bool) { - p := atomic.LoadPointer(&e.p) - if p == nil || p == expungedNodeMetricsMap { - return value, false - } - return *(*mnode.Node)(p), true -} - -// Store sets the value for a key. -func (m *nodeMetricsMap) Store(key string, value mnode.Node) { - read, _ := m.read.Load().(readOnlyNodeMetricsMap) - if e, ok := read.m[key]; ok && e.tryStore(&value) { - return - } - - m.mu.Lock() - read, _ = m.read.Load().(readOnlyNodeMetricsMap) - if e, ok := read.m[key]; ok { - if e.unexpungeLocked() { - // The entry was previously expunged, which implies that there is a - // non-nil dirty map and this entry is not in it. - m.dirty[key] = e - } - e.storeLocked(&value) - } else if e, ok := m.dirty[key]; ok { - e.storeLocked(&value) - } else { - if !read.amended { - // We're adding the first new key to the dirty map. - // Make sure it is allocated and mark the read-only map as incomplete. - m.dirtyLocked() - m.read.Store(readOnlyNodeMetricsMap{m: read.m, amended: true}) - } - m.dirty[key] = newEntryNodeMetricsMap(value) - } - m.mu.Unlock() -} - -// tryStore stores a value if the entry has not been expunged. -// -// If the entry is expunged, tryStore returns false and leaves the entry -// unchanged. -func (e *entryNodeMetricsMap) tryStore(i *mnode.Node) bool { - for { - p := atomic.LoadPointer(&e.p) - if p == expungedNodeMetricsMap { - return false - } - // skipcq: GSC-G103 - if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { - return true - } - } -} - -// unexpungeLocked ensures that the entry is not marked as expunged. -// -// If the entry was previously expunged, it must be added to the dirty map -// before m.mu is unlocked. -func (e *entryNodeMetricsMap) unexpungeLocked() (wasExpunged bool) { - return atomic.CompareAndSwapPointer(&e.p, expungedNodeMetricsMap, nil) -} - -// storeLocked unconditionally stores a value to the entry. -// -// The entry must be known not to be expunged. -func (e *entryNodeMetricsMap) storeLocked(i *mnode.Node) { - // skipcq: GSC-G103 - atomic.StorePointer(&e.p, unsafe.Pointer(i)) -} - -// LoadOrStore returns the existing value for the key if present. -// Otherwise, it stores and returns the given value. -// The loaded result is true if the value was loaded, false if stored. -func (m *nodeMetricsMap) LoadOrStore(key string, value mnode.Node) (actual mnode.Node, loaded bool) { - // Avoid locking if it's a clean hit. - read, _ := m.read.Load().(readOnlyNodeMetricsMap) - if e, ok := read.m[key]; ok { - actual, loaded, ok := e.tryLoadOrStore(value) - if ok { - return actual, loaded - } - } - - m.mu.Lock() - read, _ = m.read.Load().(readOnlyNodeMetricsMap) - if e, ok := read.m[key]; ok { - if e.unexpungeLocked() { - m.dirty[key] = e - } - actual, loaded, _ = e.tryLoadOrStore(value) - } else if e, ok := m.dirty[key]; ok { - actual, loaded, _ = e.tryLoadOrStore(value) - m.missLocked() - } else { - if !read.amended { - // We're adding the first new key to the dirty map. - // Make sure it is allocated and mark the read-only map as incomplete. - m.dirtyLocked() - m.read.Store(readOnlyNodeMetricsMap{m: read.m, amended: true}) - } - m.dirty[key] = newEntryNodeMetricsMap(value) - actual, loaded = value, false - } - m.mu.Unlock() - - return actual, loaded -} - -// tryLoadOrStore atomically loads or stores a value if the entry is not -// expunged. -// -// If the entry is expunged, tryLoadOrStore leaves the entry unchanged and -// returns with ok==false. -func (e *entryNodeMetricsMap) tryLoadOrStore(i mnode.Node) (actual mnode.Node, loaded, ok bool) { - p := atomic.LoadPointer(&e.p) - if p == expungedNodeMetricsMap { - return actual, false, false - } - if p != nil { - return *(*mnode.Node)(p), true, true - } - - // Copy the interface after the first load to make this method more amenable - // to escape analysis: if we hit the "load" path or the entry is expunged, we - // shouldn't bother heap-allocating. - ic := i - for { - // skipcq: GSC-G103 - if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) { - return i, false, true - } - p = atomic.LoadPointer(&e.p) - if p == expungedNodeMetricsMap { - return actual, false, false - } - if p != nil { - return *(*mnode.Node)(p), true, true - } - } -} - -// Delete deletes the value for a key. -func (m *nodeMetricsMap) Delete(key string) { - read, _ := m.read.Load().(readOnlyNodeMetricsMap) - e, ok := read.m[key] - if !ok && read.amended { - m.mu.Lock() - read, _ = m.read.Load().(readOnlyNodeMetricsMap) - e, ok = read.m[key] - if !ok && read.amended { - delete(m.dirty, key) - } - m.mu.Unlock() - } - if ok { - e.delete() - } -} - -func (e *entryNodeMetricsMap) delete() (hadValue bool) { - for { - p := atomic.LoadPointer(&e.p) - if p == nil || p == expungedNodeMetricsMap { - return false - } - if atomic.CompareAndSwapPointer(&e.p, p, nil) { - return true - } - } -} - -// Range calls f sequentially for each key and value present in the map. -// If f returns false, range stops the iteration. -// -// Range does not necessarily correspond to any consistent snapshot of the Map's -// contents: no key will be visited more than once, but if the value for any key -// is stored or deleted concurrently, Range may reflect any mapping for that key -// from any point during the Range call. -// -// Range may be O(N) with the number of elements in the map even if f returns -// false after a constant number of calls. -func (m *nodeMetricsMap) Range(f func(key string, value mnode.Node) bool) { - // We need to be able to iterate over all of the keys that were already - // present at the start of the call to Range. - // If read.amended is false, then read.m satisfies that property without - // requiring us to hold m.mu for a long time. - read, _ := m.read.Load().(readOnlyNodeMetricsMap) - if read.amended { - // m.dirty contains keys not in read.m. Fortunately, Range is already O(N) - // (assuming the caller does not break out early), so a call to Range - // amortizes an entire copy of the map: we can promote the dirty copy - // immediately! - m.mu.Lock() - read, _ = m.read.Load().(readOnlyNodeMetricsMap) - if read.amended { - read = readOnlyNodeMetricsMap{m: m.dirty} - m.read.Store(read) - m.dirty = nil - m.misses = 0 - } - m.mu.Unlock() - } - - for k, e := range read.m { - v, ok := e.load() - if !ok { - continue - } - if !f(k, v) { - break - } - } -} - -func (m *nodeMetricsMap) missLocked() { - m.misses++ - if m.misses < len(m.dirty) { - return - } - m.read.Store(readOnlyNodeMetricsMap{m: m.dirty}) - m.dirty = nil - m.misses = 0 -} - -func (m *nodeMetricsMap) dirtyLocked() { - if m.dirty != nil { - return - } - - read, _ := m.read.Load().(readOnlyNodeMetricsMap) - m.dirty = make(map[string]*entryNodeMetricsMap, len(read.m)) - for k, e := range read.m { - if !e.tryExpungeLocked() { - m.dirty[k] = e - } - } -} - -func (e *entryNodeMetricsMap) tryExpungeLocked() (isExpunged bool) { - p := atomic.LoadPointer(&e.p) - for p == nil { - if atomic.CompareAndSwapPointer(&e.p, nil, expungedNodeMetricsMap) { - return true - } - p = atomic.LoadPointer(&e.p) - } - return p == expungedNodeMetricsMap -} diff --git a/pkg/discoverer/k8s/service/nodemetricsmap_test.go b/pkg/discoverer/k8s/service/nodemetricsmap_test.go deleted file mode 100644 index e9ca5570a1..0000000000 --- a/pkg/discoverer/k8s/service/nodemetricsmap_test.go +++ /dev/null @@ -1,553 +0,0 @@ -// -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -package service - -// NOT IMPLEMENTED BELOW -// -// func Test_nodeMetricsMap_Load(t *testing.T) { -// type args struct { -// key string -// } -// type fields struct { -// read atomic.Value -// dirty map[string]*entryNodeMetricsMap -// misses int -// } -// type want struct { -// wantValue mnode.Node -// wantOk bool -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want, mnode.Node, bool) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, gotValue mnode.Node, gotOk bool) error { -// if !reflect.DeepEqual(gotValue, w.wantValue) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotValue, w.wantValue) -// } -// if !reflect.DeepEqual(gotOk, w.wantOk) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotOk, w.wantOk) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// key:"", -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// key:"", -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// m := &nodeMetricsMap{ -// read: test.fields.read, -// dirty: test.fields.dirty, -// misses: test.fields.misses, -// } -// -// gotValue, gotOk := m.Load(test.args.key) -// if err := checkFunc(test.want, gotValue, gotOk); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func Test_nodeMetricsMap_Store(t *testing.T) { -// type args struct { -// key string -// value mnode.Node -// } -// type fields struct { -// read atomic.Value -// dirty map[string]*entryNodeMetricsMap -// misses int -// } -// type want struct { -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want) error { -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// key:"", -// value:nil, -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// key:"", -// value:nil, -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// m := &nodeMetricsMap{ -// read: test.fields.read, -// dirty: test.fields.dirty, -// misses: test.fields.misses, -// } -// -// m.Store(test.args.key, test.args.value) -// if err := checkFunc(test.want); err != nil { -// tt.Errorf("error = %v", err) -// } -// }) -// } -// } -// -// func Test_nodeMetricsMap_LoadOrStore(t *testing.T) { -// type args struct { -// key string -// value mnode.Node -// } -// type fields struct { -// read atomic.Value -// dirty map[string]*entryNodeMetricsMap -// misses int -// } -// type want struct { -// wantActual mnode.Node -// wantLoaded bool -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want, mnode.Node, bool) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, gotActual mnode.Node, gotLoaded bool) error { -// if !reflect.DeepEqual(gotActual, w.wantActual) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotActual, w.wantActual) -// } -// if !reflect.DeepEqual(gotLoaded, w.wantLoaded) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotLoaded, w.wantLoaded) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// key:"", -// value:nil, -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// key:"", -// value:nil, -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// m := &nodeMetricsMap{ -// read: test.fields.read, -// dirty: test.fields.dirty, -// misses: test.fields.misses, -// } -// -// gotActual, gotLoaded := m.LoadOrStore(test.args.key, test.args.value) -// if err := checkFunc(test.want, gotActual, gotLoaded); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func Test_nodeMetricsMap_Delete(t *testing.T) { -// type args struct { -// key string -// } -// type fields struct { -// read atomic.Value -// dirty map[string]*entryNodeMetricsMap -// misses int -// } -// type want struct { -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want) error { -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// key:"", -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// key:"", -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// m := &nodeMetricsMap{ -// read: test.fields.read, -// dirty: test.fields.dirty, -// misses: test.fields.misses, -// } -// -// m.Delete(test.args.key) -// if err := checkFunc(test.want); err != nil { -// tt.Errorf("error = %v", err) -// } -// }) -// } -// } -// -// func Test_nodeMetricsMap_Range(t *testing.T) { -// type args struct { -// f func(key string, value mnode.Node) bool -// } -// type fields struct { -// read atomic.Value -// dirty map[string]*entryNodeMetricsMap -// misses int -// } -// type want struct { -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want) error { -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// f:nil, -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// f:nil, -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// m := &nodeMetricsMap{ -// read: test.fields.read, -// dirty: test.fields.dirty, -// misses: test.fields.misses, -// } -// -// m.Range(test.args.f) -// if err := checkFunc(test.want); err != nil { -// tt.Errorf("error = %v", err) -// } -// }) -// } -// } diff --git a/pkg/discoverer/k8s/service/podsmap.go b/pkg/discoverer/k8s/service/podsmap.go deleted file mode 100644 index aac514a26f..0000000000 --- a/pkg/discoverer/k8s/service/podsmap.go +++ /dev/null @@ -1,392 +0,0 @@ -// -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -package service - -import ( - "sync" - "sync/atomic" - "unsafe" - - "github.com/vdaas/vald/internal/k8s/pod" -) - -// Map is like a Go map[interface{}]interface{} but is safe for concurrent use -// by multiple goroutines without additional locking or coordination. -// Loads, stores, and deletes run in amortized constant time. -// -// The Map type is specialized. Most code should use a plain Go map instead, -// with separate locking or coordination, for better type safety and to make it -// easier to maintain other invariants along with the map content. -// -// The Map type is optimized for two common use cases: (1) when the entry for a given -// key is only ever written once but read many times, as in caches that only grow, -// or (2) when multiple goroutines read, write, and overwrite entries for disjoint -// sets of keys. In these two cases, use of a Map may significantly reduce lock -// contention compared to a Go map paired with a separate Mutex or RWMutex. -// -// The zero Map is empty and ready for use. A Map must not be copied after first use. -type podsMap struct { - mu sync.Mutex - - // read contains the portion of the map's contents that are safe for - // concurrent access (with or without mu held). - // - // The read field itself is always safe to load, but must only be stored with - // mu held. - // - // Entries stored in read may be updated concurrently without mu, but updating - // a previously-expunged entry requires that the entry be copied to the dirty - // map and unexpunged with mu held. - read atomic.Value // readOnly - - // dirty contains the portion of the map's contents that require mu to be - // held. To ensure that the dirty map can be promoted to the read map quickly, - // it also includes all of the non-expunged entries in the read map. - // - // Expunged entries are not stored in the dirty map. An expunged entry in the - // clean map must be unexpunged and added to the dirty map before a new value - // can be stored to it. - // - // If the dirty map is nil, the next write to the map will initialize it by - // making a shallow copy of the clean map, omitting stale entries. - dirty map[string]*entryPodsMap - - // misses counts the number of loads since the read map was last updated that - // needed to lock mu to determine whether the key was present. - // - // Once enough misses have occurred to cover the cost of copying the dirty - // map, the dirty map will be promoted to the read map (in the unamended - // state) and the next store to the map will make a new dirty copy. - misses int -} - -// readOnly is an immutable struct stored atomically in the Map.read field. -type readOnlyPodsMap struct { - m map[string]*entryPodsMap - amended bool // true if the dirty map contains some key not in m. -} - -// expunged is an arbitrary pointer that marks entries which have been deleted -// from the dirty map. -// skipcq: GSC-G103 -var expungedPodsMap = unsafe.Pointer(new([]pod.Pod)) - -// An entry is a slot in the map corresponding to a particular key. -type entryPodsMap struct { - // p points to the interface{} value stored for the entry. - // - // If p == nil, the entry has been deleted and m.dirty == nil. - // - // If p == expunged, the entry has been deleted, m.dirty != nil, and the entry - // is missing from m.dirty. - // - // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty - // != nil, in m.dirty[key]. - // - // An entry can be deleted by atomic replacement with nil: when m.dirty is - // next created, it will atomically replace nil with expunged and leave - // m.dirty[key] unset. - // - // An entry's associated value can be updated by atomic replacement, provided - // p != expunged. If p == expunged, an entry's associated value can be updated - // only after first setting m.dirty[key] = e so that lookups using the dirty - // map find the entry. - p unsafe.Pointer // *interface{} -} - -func newEntryPodsMap(i []pod.Pod) *entryPodsMap { - // skipcq: GSC-G103 - return &entryPodsMap{p: unsafe.Pointer(&i)} -} - -// Load returns the value stored in the map for a key, or nil if no -// value is present. -// The ok result indicates whether value was found in the map. -func (m *podsMap) Load(key string) (value []pod.Pod, ok bool) { - read, _ := m.read.Load().(readOnlyPodsMap) - e, ok := read.m[key] - if !ok && read.amended { - m.mu.Lock() - // Avoid reporting a spurious miss if m.dirty got promoted while we were - // blocked on m.mu. (If further loads of the same key will not miss, it's - // not worth copying the dirty map for this key.) - read, _ = m.read.Load().(readOnlyPodsMap) - e, ok = read.m[key] - if !ok && read.amended { - e, ok = m.dirty[key] - // Regardless of whether the entry was present, record a miss: this key - // will take the slow path until the dirty map is promoted to the read - // map. - m.missLocked() - } - m.mu.Unlock() - } - if !ok { - return value, false - } - return e.load() -} - -func (e *entryPodsMap) load() (value []pod.Pod, ok bool) { - p := atomic.LoadPointer(&e.p) - if p == nil || p == expungedPodsMap { - return value, false - } - return *(*[]pod.Pod)(p), true -} - -// Store sets the value for a key. -func (m *podsMap) Store(key string, value []pod.Pod) { - read, _ := m.read.Load().(readOnlyPodsMap) - if e, ok := read.m[key]; ok && e.tryStore(&value) { - return - } - - m.mu.Lock() - read, _ = m.read.Load().(readOnlyPodsMap) - if e, ok := read.m[key]; ok { - if e.unexpungeLocked() { - // The entry was previously expunged, which implies that there is a - // non-nil dirty map and this entry is not in it. - m.dirty[key] = e - } - e.storeLocked(&value) - } else if e, ok := m.dirty[key]; ok { - e.storeLocked(&value) - } else { - if !read.amended { - // We're adding the first new key to the dirty map. - // Make sure it is allocated and mark the read-only map as incomplete. - m.dirtyLocked() - m.read.Store(readOnlyPodsMap{m: read.m, amended: true}) - } - m.dirty[key] = newEntryPodsMap(value) - } - m.mu.Unlock() -} - -// tryStore stores a value if the entry has not been expunged. -// -// If the entry is expunged, tryStore returns false and leaves the entry -// unchanged. -func (e *entryPodsMap) tryStore(i *[]pod.Pod) bool { - for { - p := atomic.LoadPointer(&e.p) - if p == expungedPodsMap { - return false - } - // skipcq: GSC-G103 - if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { - return true - } - } -} - -// unexpungeLocked ensures that the entry is not marked as expunged. -// -// If the entry was previously expunged, it must be added to the dirty map -// before m.mu is unlocked. -func (e *entryPodsMap) unexpungeLocked() (wasExpunged bool) { - return atomic.CompareAndSwapPointer(&e.p, expungedPodsMap, nil) -} - -// storeLocked unconditionally stores a value to the entry. -// -// The entry must be known not to be expunged. -func (e *entryPodsMap) storeLocked(i *[]pod.Pod) { - // skipcq: GSC-G103 - atomic.StorePointer(&e.p, unsafe.Pointer(i)) -} - -// LoadOrStore returns the existing value for the key if present. -// Otherwise, it stores and returns the given value. -// The loaded result is true if the value was loaded, false if stored. -func (m *podsMap) LoadOrStore(key string, value []pod.Pod) (actual []pod.Pod, loaded bool) { - // Avoid locking if it's a clean hit. - read, _ := m.read.Load().(readOnlyPodsMap) - if e, ok := read.m[key]; ok { - actual, loaded, ok := e.tryLoadOrStore(value) - if ok { - return actual, loaded - } - } - - m.mu.Lock() - read, _ = m.read.Load().(readOnlyPodsMap) - if e, ok := read.m[key]; ok { - if e.unexpungeLocked() { - m.dirty[key] = e - } - actual, loaded, _ = e.tryLoadOrStore(value) - } else if e, ok := m.dirty[key]; ok { - actual, loaded, _ = e.tryLoadOrStore(value) - m.missLocked() - } else { - if !read.amended { - // We're adding the first new key to the dirty map. - // Make sure it is allocated and mark the read-only map as incomplete. - m.dirtyLocked() - m.read.Store(readOnlyPodsMap{m: read.m, amended: true}) - } - m.dirty[key] = newEntryPodsMap(value) - actual, loaded = value, false - } - m.mu.Unlock() - - return actual, loaded -} - -// tryLoadOrStore atomically loads or stores a value if the entry is not -// expunged. -// -// If the entry is expunged, tryLoadOrStore leaves the entry unchanged and -// returns with ok==false. -func (e *entryPodsMap) tryLoadOrStore(i []pod.Pod) (actual []pod.Pod, loaded, ok bool) { - p := atomic.LoadPointer(&e.p) - if p == expungedPodsMap { - return actual, false, false - } - if p != nil { - return *(*[]pod.Pod)(p), true, true - } - - // Copy the interface after the first load to make this method more amenable - // to escape analysis: if we hit the "load" path or the entry is expunged, we - // shouldn't bother heap-allocating. - ic := i - for { - // skipcq: GSC-G103 - if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) { - return i, false, true - } - p = atomic.LoadPointer(&e.p) - if p == expungedPodsMap { - return actual, false, false - } - if p != nil { - return *(*[]pod.Pod)(p), true, true - } - } -} - -// Delete deletes the value for a key. -func (m *podsMap) Delete(key string) { - read, _ := m.read.Load().(readOnlyPodsMap) - e, ok := read.m[key] - if !ok && read.amended { - m.mu.Lock() - read, _ = m.read.Load().(readOnlyPodsMap) - e, ok = read.m[key] - if !ok && read.amended { - delete(m.dirty, key) - } - m.mu.Unlock() - } - if ok { - e.delete() - } -} - -func (e *entryPodsMap) delete() (hadValue bool) { - for { - p := atomic.LoadPointer(&e.p) - if p == nil || p == expungedPodsMap { - return false - } - if atomic.CompareAndSwapPointer(&e.p, p, nil) { - return true - } - } -} - -// Range calls f sequentially for each key and value present in the map. -// If f returns false, range stops the iteration. -// -// Range does not necessarily correspond to any consistent snapshot of the Map's -// contents: no key will be visited more than once, but if the value for any key -// is stored or deleted concurrently, Range may reflect any mapping for that key -// from any point during the Range call. -// -// Range may be O(N) with the number of elements in the map even if f returns -// false after a constant number of calls. -func (m *podsMap) Range(f func(key string, value []pod.Pod) bool) { - // We need to be able to iterate over all of the keys that were already - // present at the start of the call to Range. - // If read.amended is false, then read.m satisfies that property without - // requiring us to hold m.mu for a long time. - read, _ := m.read.Load().(readOnlyPodsMap) - if read.amended { - // m.dirty contains keys not in read.m. Fortunately, Range is already O(N) - // (assuming the caller does not break out early), so a call to Range - // amortizes an entire copy of the map: we can promote the dirty copy - // immediately! - m.mu.Lock() - read, _ = m.read.Load().(readOnlyPodsMap) - if read.amended { - read = readOnlyPodsMap{m: m.dirty} - m.read.Store(read) - m.dirty = nil - m.misses = 0 - } - m.mu.Unlock() - } - - for k, e := range read.m { - v, ok := e.load() - if !ok { - continue - } - if !f(k, v) { - break - } - } -} - -func (m *podsMap) missLocked() { - m.misses++ - if m.misses < len(m.dirty) { - return - } - m.read.Store(readOnlyPodsMap{m: m.dirty}) - m.dirty = nil - m.misses = 0 -} - -func (m *podsMap) dirtyLocked() { - if m.dirty != nil { - return - } - - read, _ := m.read.Load().(readOnlyPodsMap) - m.dirty = make(map[string]*entryPodsMap, len(read.m)) - for k, e := range read.m { - if !e.tryExpungeLocked() { - m.dirty[k] = e - } - } -} - -func (e *entryPodsMap) tryExpungeLocked() (isExpunged bool) { - p := atomic.LoadPointer(&e.p) - for p == nil { - if atomic.CompareAndSwapPointer(&e.p, nil, expungedPodsMap) { - return true - } - p = atomic.LoadPointer(&e.p) - } - return p == expungedPodsMap -} diff --git a/pkg/discoverer/k8s/service/podsmap_test.go b/pkg/discoverer/k8s/service/podsmap_test.go deleted file mode 100644 index 6a2e8ce5f8..0000000000 --- a/pkg/discoverer/k8s/service/podsmap_test.go +++ /dev/null @@ -1,553 +0,0 @@ -// -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -package service - -// NOT IMPLEMENTED BELOW -// -// func Test_podsMap_Load(t *testing.T) { -// type args struct { -// key string -// } -// type fields struct { -// read atomic.Value -// dirty map[string]*entryPodsMap -// misses int -// } -// type want struct { -// wantValue []pod.Pod -// wantOk bool -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want, []pod.Pod, bool) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, gotValue []pod.Pod, gotOk bool) error { -// if !reflect.DeepEqual(gotValue, w.wantValue) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotValue, w.wantValue) -// } -// if !reflect.DeepEqual(gotOk, w.wantOk) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotOk, w.wantOk) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// key:"", -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// key:"", -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// m := &podsMap{ -// read: test.fields.read, -// dirty: test.fields.dirty, -// misses: test.fields.misses, -// } -// -// gotValue, gotOk := m.Load(test.args.key) -// if err := checkFunc(test.want, gotValue, gotOk); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func Test_podsMap_Store(t *testing.T) { -// type args struct { -// key string -// value []pod.Pod -// } -// type fields struct { -// read atomic.Value -// dirty map[string]*entryPodsMap -// misses int -// } -// type want struct { -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want) error { -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// key:"", -// value:nil, -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// key:"", -// value:nil, -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// m := &podsMap{ -// read: test.fields.read, -// dirty: test.fields.dirty, -// misses: test.fields.misses, -// } -// -// m.Store(test.args.key, test.args.value) -// if err := checkFunc(test.want); err != nil { -// tt.Errorf("error = %v", err) -// } -// }) -// } -// } -// -// func Test_podsMap_LoadOrStore(t *testing.T) { -// type args struct { -// key string -// value []pod.Pod -// } -// type fields struct { -// read atomic.Value -// dirty map[string]*entryPodsMap -// misses int -// } -// type want struct { -// wantActual []pod.Pod -// wantLoaded bool -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want, []pod.Pod, bool) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, gotActual []pod.Pod, gotLoaded bool) error { -// if !reflect.DeepEqual(gotActual, w.wantActual) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotActual, w.wantActual) -// } -// if !reflect.DeepEqual(gotLoaded, w.wantLoaded) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotLoaded, w.wantLoaded) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// key:"", -// value:nil, -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// key:"", -// value:nil, -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// m := &podsMap{ -// read: test.fields.read, -// dirty: test.fields.dirty, -// misses: test.fields.misses, -// } -// -// gotActual, gotLoaded := m.LoadOrStore(test.args.key, test.args.value) -// if err := checkFunc(test.want, gotActual, gotLoaded); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func Test_podsMap_Delete(t *testing.T) { -// type args struct { -// key string -// } -// type fields struct { -// read atomic.Value -// dirty map[string]*entryPodsMap -// misses int -// } -// type want struct { -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want) error { -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// key:"", -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// key:"", -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// m := &podsMap{ -// read: test.fields.read, -// dirty: test.fields.dirty, -// misses: test.fields.misses, -// } -// -// m.Delete(test.args.key) -// if err := checkFunc(test.want); err != nil { -// tt.Errorf("error = %v", err) -// } -// }) -// } -// } -// -// func Test_podsMap_Range(t *testing.T) { -// type args struct { -// f func(key string, value []pod.Pod) bool -// } -// type fields struct { -// read atomic.Value -// dirty map[string]*entryPodsMap -// misses int -// } -// type want struct { -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want) error { -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// f:nil, -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// f:nil, -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// m := &podsMap{ -// read: test.fields.read, -// dirty: test.fields.dirty, -// misses: test.fields.misses, -// } -// -// m.Range(test.args.f) -// if err := checkFunc(test.want); err != nil { -// tt.Errorf("error = %v", err) -// } -// }) -// } -// } diff --git a/pkg/manager/index/service/indexer.go b/pkg/manager/index/service/indexer.go index dcc55397cf..365db870c6 100644 --- a/pkg/manager/index/service/indexer.go +++ b/pkg/manager/index/service/indexer.go @@ -276,7 +276,7 @@ func (idx *index) loadInfos(ctx context.Context) (err error) { }() var u, ucu uint32 - var infoMap indexInfos + var infoMap valdsync.Map[string, *payload.Info_Index_Count] err = idx.client.GetClient().RangeConcurrent(ctx, len(idx.client.GetAddrs(ctx)), func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption, diff --git a/pkg/manager/index/service/indexinfos.go b/pkg/manager/index/service/indexinfos.go deleted file mode 100644 index 7e08b9c59b..0000000000 --- a/pkg/manager/index/service/indexinfos.go +++ /dev/null @@ -1,212 +0,0 @@ -// -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -package service - -import ( - "sync" - "sync/atomic" - "unsafe" - - "github.com/vdaas/vald/apis/grpc/v1/payload" -) - -type indexInfos struct { - mu sync.Mutex - read atomic.Value - dirty map[string]*entryIndexInfos - misses int -} - -type readOnlyIndexInfos struct { - m map[string]*entryIndexInfos - amended bool -} - -// skipcq: GSC-G103 -var expungedIndexInfos = unsafe.Pointer(new(*payload.Info_Index_Count)) - -type entryIndexInfos struct { - p unsafe.Pointer -} - -func newEntryIndexInfos(i *payload.Info_Index_Count) *entryIndexInfos { - // skipcq: GSC-G103 - return &entryIndexInfos{p: unsafe.Pointer(&i)} -} - -func (m *indexInfos) Load(key string) (value *payload.Info_Index_Count, ok bool) { - read, _ := m.read.Load().(readOnlyIndexInfos) - e, ok := read.m[key] - if !ok && read.amended { - m.mu.Lock() - read, _ = m.read.Load().(readOnlyIndexInfos) - e, ok = read.m[key] - if !ok && read.amended { - e, ok = m.dirty[key] - m.missLocked() - } - m.mu.Unlock() - } - if !ok { - return value, false - } - return e.load() -} - -func (e *entryIndexInfos) load() (value *payload.Info_Index_Count, ok bool) { - p := atomic.LoadPointer(&e.p) - if p == nil || p == expungedIndexInfos { - return value, false - } - return *(**payload.Info_Index_Count)(p), true -} - -func (m *indexInfos) Store(key string, value *payload.Info_Index_Count) { - read, _ := m.read.Load().(readOnlyIndexInfos) - if e, ok := read.m[key]; ok && e.tryStore(&value) { - return - } - - m.mu.Lock() - read, _ = m.read.Load().(readOnlyIndexInfos) - if e, ok := read.m[key]; ok { - if e.unexpungeLocked() { - m.dirty[key] = e - } - e.storeLocked(&value) - } else if e, ok := m.dirty[key]; ok { - e.storeLocked(&value) - } else { - if !read.amended { - m.dirtyLocked() - m.read.Store(readOnlyIndexInfos{m: read.m, amended: true}) - } - m.dirty[key] = newEntryIndexInfos(value) - } - m.mu.Unlock() -} - -func (e *entryIndexInfos) tryStore(i **payload.Info_Index_Count) bool { - for { - p := atomic.LoadPointer(&e.p) - if p == expungedIndexInfos { - return false - } - // skipcq: GSC-G103 - if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { - return true - } - } -} - -func (e *entryIndexInfos) unexpungeLocked() (wasExpunged bool) { - return atomic.CompareAndSwapPointer(&e.p, expungedIndexInfos, nil) -} - -func (e *entryIndexInfos) storeLocked(i **payload.Info_Index_Count) { - // skipcq: GSC-G103 - atomic.StorePointer(&e.p, unsafe.Pointer(i)) -} - -func (m *indexInfos) Delete(key string) { - read, _ := m.read.Load().(readOnlyIndexInfos) - e, ok := read.m[key] - if !ok && read.amended { - m.mu.Lock() - read, _ = m.read.Load().(readOnlyIndexInfos) - e, ok = read.m[key] - if !ok && read.amended { - delete(m.dirty, key) - } - m.mu.Unlock() - } - if ok { - e.delete() - } -} - -func (e *entryIndexInfos) delete() (hadValue bool) { - for { - p := atomic.LoadPointer(&e.p) - if p == nil || p == expungedIndexInfos { - return false - } - if atomic.CompareAndSwapPointer(&e.p, p, nil) { - return true - } - } -} - -func (m *indexInfos) Range(f func(key string, value *payload.Info_Index_Count) bool) { - read, _ := m.read.Load().(readOnlyIndexInfos) - if read.amended { - m.mu.Lock() - read, _ = m.read.Load().(readOnlyIndexInfos) - if read.amended { - read = readOnlyIndexInfos{m: m.dirty} - m.read.Store(read) - m.dirty = nil - m.misses = 0 - } - m.mu.Unlock() - } - - for k, e := range read.m { - v, ok := e.load() - if !ok { - continue - } - if !f(k, v) { - break - } - } -} - -func (m *indexInfos) missLocked() { - m.misses++ - if m.misses < len(m.dirty) { - return - } - m.read.Store(readOnlyIndexInfos{m: m.dirty}) - m.dirty = nil - m.misses = 0 -} - -func (m *indexInfos) dirtyLocked() { - if m.dirty != nil { - return - } - - read, _ := m.read.Load().(readOnlyIndexInfos) - m.dirty = make(map[string]*entryIndexInfos, len(read.m)) - for k, e := range read.m { - if !e.tryExpungeLocked() { - m.dirty[k] = e - } - } -} - -func (e *entryIndexInfos) tryExpungeLocked() (isExpunged bool) { - p := atomic.LoadPointer(&e.p) - for p == nil { - if atomic.CompareAndSwapPointer(&e.p, nil, expungedIndexInfos) { - return true - } - p = atomic.LoadPointer(&e.p) - } - return p == expungedIndexInfos -} diff --git a/pkg/manager/index/service/indexinfos_test.go b/pkg/manager/index/service/indexinfos_test.go deleted file mode 100644 index 0922d8fbce..0000000000 --- a/pkg/manager/index/service/indexinfos_test.go +++ /dev/null @@ -1,439 +0,0 @@ -// -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -package service - -// NOT IMPLEMENTED BELOW -// -// func Test_indexInfos_Load(t *testing.T) { -// type args struct { -// key string -// } -// type fields struct { -// read atomic.Value -// dirty map[string]*entryIndexInfos -// misses int -// } -// type want struct { -// wantValue *payload.Info_Index_Count -// wantOk bool -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want, *payload.Info_Index_Count, bool) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, gotValue *payload.Info_Index_Count, gotOk bool) error { -// if !reflect.DeepEqual(gotValue, w.wantValue) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotValue, w.wantValue) -// } -// if !reflect.DeepEqual(gotOk, w.wantOk) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotOk, w.wantOk) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// key:"", -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// key:"", -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// m := &indexInfos{ -// read: test.fields.read, -// dirty: test.fields.dirty, -// misses: test.fields.misses, -// } -// -// gotValue, gotOk := m.Load(test.args.key) -// if err := checkFunc(test.want, gotValue, gotOk); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func Test_indexInfos_Store(t *testing.T) { -// type args struct { -// key string -// value *payload.Info_Index_Count -// } -// type fields struct { -// read atomic.Value -// dirty map[string]*entryIndexInfos -// misses int -// } -// type want struct { -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want) error { -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// key:"", -// value:nil, -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// key:"", -// value:nil, -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// m := &indexInfos{ -// read: test.fields.read, -// dirty: test.fields.dirty, -// misses: test.fields.misses, -// } -// -// m.Store(test.args.key, test.args.value) -// if err := checkFunc(test.want); err != nil { -// tt.Errorf("error = %v", err) -// } -// }) -// } -// } -// -// func Test_indexInfos_Delete(t *testing.T) { -// type args struct { -// key string -// } -// type fields struct { -// read atomic.Value -// dirty map[string]*entryIndexInfos -// misses int -// } -// type want struct { -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want) error { -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// key:"", -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// key:"", -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// m := &indexInfos{ -// read: test.fields.read, -// dirty: test.fields.dirty, -// misses: test.fields.misses, -// } -// -// m.Delete(test.args.key) -// if err := checkFunc(test.want); err != nil { -// tt.Errorf("error = %v", err) -// } -// }) -// } -// } -// -// func Test_indexInfos_Range(t *testing.T) { -// type args struct { -// f func(key string, value *payload.Info_Index_Count) bool -// } -// type fields struct { -// read atomic.Value -// dirty map[string]*entryIndexInfos -// misses int -// } -// type want struct { -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want) error { -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// f:nil, -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// f:nil, -// }, -// fields: fields { -// read:nil, -// dirty:nil, -// misses:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// m := &indexInfos{ -// read: test.fields.read, -// dirty: test.fields.dirty, -// misses: test.fields.misses, -// } -// -// m.Range(test.args.f) -// if err := checkFunc(test.want); err != nil { -// tt.Errorf("error = %v", err) -// } -// }) -// } -// }