From d389ce2f211725af60a8dcfe884842ff473b44f1 Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 15 Nov 2021 10:42:45 +0800 Subject: [PATCH] feat: replace sortedList with sortedUniqueList (#793) * feat: replace sortedList with sortedUniqueList Signed-off-by: Gaius --- manager/types/cdn_cluster.go | 2 +- manager/types/scheduler_cluster.go | 2 +- pkg/container/list/mocks/list_mock.go | 48 ++ pkg/container/list/sorted_list.go | 137 +++ pkg/container/list/sorted_list_test.go | 766 +++++++++++++++++ pkg/container/list/sorted_unique_list.go | 106 +++ pkg/container/list/sorted_unique_list_test.go | 786 ++++++++++++++++++ pkg/container/set/safe_set.go | 101 +++ pkg/container/set/safe_set_test.go | 382 +++++++++ pkg/container/set/set.go | 78 ++ pkg/container/set/set_test.go | 234 ++++++ pkg/structure/sortedlist/bucket.go | 49 -- pkg/structure/sortedlist/sorted_list.go | 237 ------ pkg/structure/sortedlist/sorted_list_test.go | 190 ----- scheduler/core/events.go | 19 +- .../core/scheduler/basic/basic_scheduler.go | 4 +- scheduler/supervisor/host.go | 5 + scheduler/supervisor/mocks/host_mock.go | 4 +- scheduler/supervisor/peer.go | 17 +- scheduler/supervisor/peer_test.go | 8 +- scheduler/supervisor/task.go | 87 +- scheduler/supervisor/task_test.go | 8 - 22 files changed, 2715 insertions(+), 555 deletions(-) create mode 100644 pkg/container/list/mocks/list_mock.go create mode 100644 pkg/container/list/sorted_list.go create mode 100644 pkg/container/list/sorted_list_test.go create mode 100644 pkg/container/list/sorted_unique_list.go create mode 100644 pkg/container/list/sorted_unique_list_test.go create mode 100644 pkg/container/set/safe_set.go create mode 100644 pkg/container/set/safe_set_test.go create mode 100644 pkg/container/set/set.go create mode 100644 pkg/container/set/set_test.go delete mode 100644 pkg/structure/sortedlist/bucket.go delete mode 100644 pkg/structure/sortedlist/sorted_list.go delete mode 100644 pkg/structure/sortedlist/sorted_list_test.go diff --git a/manager/types/cdn_cluster.go b/manager/types/cdn_cluster.go index eaf308f49..33c9379b0 100644 --- a/manager/types/cdn_cluster.go +++ b/manager/types/cdn_cluster.go @@ -51,6 +51,6 @@ type GetCDNClustersQuery struct { } type CDNClusterConfig struct { - LoadLimit uint `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1"` + LoadLimit uint `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1,lte=5000"` NetTopology string `yaml:"netTopology" mapstructure:"netTopology" json:"net_topology"` } diff --git a/manager/types/scheduler_cluster.go b/manager/types/scheduler_cluster.go index 9f0efb715..58da0b352 100644 --- a/manager/types/scheduler_cluster.go +++ b/manager/types/scheduler_cluster.go @@ -57,7 +57,7 @@ type SchedulerClusterConfig struct { } type SchedulerClusterClientConfig struct { - LoadLimit uint `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1"` + LoadLimit uint `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1,lte=5000"` } type SchedulerClusterScopes struct { diff --git a/pkg/container/list/mocks/list_mock.go b/pkg/container/list/mocks/list_mock.go new file mode 100644 index 000000000..5453e83df --- /dev/null +++ b/pkg/container/list/mocks/list_mock.go @@ -0,0 +1,48 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: d7y.io/dragonfly/v2/pkg/container/list (interfaces: Item) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockItem is a mock of Item interface. +type MockItem struct { + ctrl *gomock.Controller + recorder *MockItemMockRecorder +} + +// MockItemMockRecorder is the mock recorder for MockItem. +type MockItemMockRecorder struct { + mock *MockItem +} + +// NewMockItem creates a new mock instance. +func NewMockItem(ctrl *gomock.Controller) *MockItem { + mock := &MockItem{ctrl: ctrl} + mock.recorder = &MockItemMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockItem) EXPECT() *MockItemMockRecorder { + return m.recorder +} + +// SortedValue mocks base method. +func (m *MockItem) SortedValue() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SortedValue") + ret0, _ := ret[0].(int) + return ret0 +} + +// SortedValue indicates an expected call of SortedValue. +func (mr *MockItemMockRecorder) SortedValue() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SortedValue", reflect.TypeOf((*MockItem)(nil).SortedValue)) +} diff --git a/pkg/container/list/sorted_list.go b/pkg/container/list/sorted_list.go new file mode 100644 index 000000000..cb108c8f4 --- /dev/null +++ b/pkg/container/list/sorted_list.go @@ -0,0 +1,137 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//go:generate mockgen -destination ./mocks/list_mock.go -package mocks d7y.io/dragonfly/v2/pkg/container/list Item + +package list + +import ( + "container/list" + "sync" +) + +type Item interface { + SortedValue() int +} + +type SortedList interface { + Len() int + Insert(Item) + Remove(Item) + Contains(Item) bool + Range(func(Item) bool) + ReverseRange(fn func(Item) bool) +} + +type sortedList struct { + mu *sync.RWMutex + container *list.List +} + +func NewSortedList() SortedList { + return &sortedList{ + mu: &sync.RWMutex{}, + container: list.New(), + } +} + +func (l *sortedList) Len() int { + l.mu.RLock() + defer l.mu.RUnlock() + + return l.container.Len() +} + +func (l *sortedList) Insert(item Item) { + l.mu.Lock() + defer l.mu.Unlock() + + for e := l.container.Front(); e != nil; e = e.Next() { + v, ok := e.Value.(Item) + if !ok { + continue + } + + if v.SortedValue() >= item.SortedValue() { + l.container.InsertBefore(item, e) + return + } + } + + l.container.PushBack(item) +} + +func (l *sortedList) Remove(item Item) { + l.mu.Lock() + defer l.mu.Unlock() + + for e := l.container.Front(); e != nil; e = e.Next() { + v, ok := e.Value.(Item) + if !ok { + continue + } + + if v == item { + l.container.Remove(e) + return + } + } +} + +func (l *sortedList) Contains(item Item) bool { + l.mu.RLock() + defer l.mu.RUnlock() + + for e := l.container.Front(); e != nil; e = e.Next() { + if v, ok := e.Value.(Item); ok && v == item { + return true + } + } + + return false +} + +func (l *sortedList) Range(fn func(Item) bool) { + l.mu.RLock() + defer l.mu.RUnlock() + + for e := l.container.Front(); e != nil; e = e.Next() { + v, ok := e.Value.(Item) + if !ok { + continue + } + + if !fn(v) { + return + } + } +} + +func (l *sortedList) ReverseRange(fn func(Item) bool) { + l.mu.RLock() + defer l.mu.RUnlock() + + for e := l.container.Back(); e != nil; e = e.Prev() { + v, ok := e.Value.(Item) + if !ok { + continue + } + + if !fn(v) { + return + } + } +} diff --git a/pkg/container/list/sorted_list_test.go b/pkg/container/list/sorted_list_test.go new file mode 100644 index 000000000..2201cecc5 --- /dev/null +++ b/pkg/container/list/sorted_list_test.go @@ -0,0 +1,766 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package list + +import ( + "math/rand" + "runtime" + "sync" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + + "d7y.io/dragonfly/v2/pkg/container/list/mocks" +) + +const N = 1000 + +func TestSortedListInsert(t *testing.T) { + tests := []struct { + name string + mock func(m ...*mocks.MockItemMockRecorder) + expect func(t *testing.T, l SortedList, items ...Item) + }{ + { + name: "insert values succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) {}, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + l.Insert(items[0]) + assert.Equal(l.Contains(items[0]), true) + assert.Equal(l.Len(), 1) + }, + }, + { + name: "insert multi value succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).Times(1), + m[1].SortedValue().Return(1).Times(1), + ) + }, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + l.Insert(items[0]) + l.Insert(items[1]) + assert.Equal(l.Contains(items[0]), true) + assert.Equal(l.Contains(items[1]), true) + assert.Equal(l.Len(), 2) + }, + }, + { + name: "insert same values", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).Times(2), + ) + }, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + l.Insert(items[0]) + l.Insert(items[0]) + assert.Equal(l.Contains(items[0]), true) + assert.Equal(l.Len(), 2) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockItems := []*mocks.MockItem{mocks.NewMockItem(ctl), mocks.NewMockItem(ctl)} + tc.mock(mockItems[0].EXPECT(), mockItems[1].EXPECT()) + tc.expect(t, NewSortedList(), mockItems[0], mockItems[1]) + }) + } +} + +func TestSortedListInsert_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + ctl := gomock.NewController(t) + defer ctl.Finish() + mockItem := mocks.NewMockItem(ctl) + mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return rand.Intn(N) }).AnyTimes() + + l := NewSortedList() + nums := rand.Perm(N) + + var wg sync.WaitGroup + wg.Add(len(nums)) + for i := 0; i < len(nums); i++ { + go func(i int) { + l.Insert(mockItem) + wg.Done() + }(i) + } + + wg.Wait() + count := 0 + l.Range(func(item Item) bool { + count++ + return true + }) + if count != len(nums) { + t.Errorf("SortedList is missing element") + } +} + +func TestSortedListRemove(t *testing.T) { + tests := []struct { + name string + mock func(m ...*mocks.MockItemMockRecorder) + expect func(t *testing.T, l SortedList, items ...Item) + }{ + { + name: "remove values succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).Times(1), + m[1].SortedValue().Return(1).Times(1), + ) + }, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + l.Insert(items[0]) + l.Insert(items[1]) + assert.Equal(l.Contains(items[0]), true) + assert.Equal(l.Contains(items[1]), true) + assert.Equal(l.Len(), 2) + l.Remove(items[0]) + assert.Equal(l.Contains(items[0]), false) + assert.Equal(l.Len(), 1) + l.Remove(items[1]) + assert.Equal(l.Contains(items[1]), false) + assert.Equal(l.Len(), 0) + }, + }, + { + name: "remove value dost not exits", + mock: func(m ...*mocks.MockItemMockRecorder) {}, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + l.Insert(items[0]) + assert.Equal(l.Contains(items[0]), true) + assert.Equal(l.Len(), 1) + l.Remove(items[1]) + assert.Equal(l.Contains(items[0]), true) + assert.Equal(l.Len(), 1) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockItems := []*mocks.MockItem{mocks.NewMockItem(ctl), mocks.NewMockItem(ctl)} + tc.mock(mockItems[0].EXPECT(), mockItems[1].EXPECT()) + tc.expect(t, NewSortedList(), mockItems[0], mockItems[1]) + }) + } +} + +func TestSortedListRemove_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + ctl := gomock.NewController(t) + defer ctl.Finish() + mockItem := mocks.NewMockItem(ctl) + mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return rand.Intn(N) }).AnyTimes() + + l := NewSortedList() + nums := rand.Perm(N) + + for i := 0; i < len(nums); i++ { + l.Insert(mockItem) + } + + var wg sync.WaitGroup + wg.Add(len(nums)) + for i := 0; i < len(nums); i++ { + go func(i int) { + l.Remove(mockItem) + wg.Done() + }(i) + } + + wg.Wait() + if l.Len() != 0 { + t.Errorf("SortedList is redundant elements") + } +} + +func TestSortedListContains(t *testing.T) { + tests := []struct { + name string + mock func(m ...*mocks.MockItemMockRecorder) + expect func(t *testing.T, l SortedList, items ...Item) + }{ + { + name: "contains values succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).Times(1), + m[1].SortedValue().Return(1).Times(1), + ) + }, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + l.Insert(items[0]) + l.Insert(items[1]) + assert.Equal(l.Contains(items[0]), true) + assert.Equal(l.Contains(items[1]), true) + }, + }, + { + name: "contains value dost not exits", + mock: func(m ...*mocks.MockItemMockRecorder) {}, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + l.Insert(items[0]) + assert.Equal(l.Contains(items[1]), false) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockItems := []*mocks.MockItem{mocks.NewMockItem(ctl), mocks.NewMockItem(ctl)} + tc.mock(mockItems[0].EXPECT(), mockItems[1].EXPECT()) + tc.expect(t, NewSortedList(), mockItems[0], mockItems[1]) + }) + } +} + +func TestSortedListContains_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + ctl := gomock.NewController(t) + defer ctl.Finish() + mockItem := mocks.NewMockItem(ctl) + mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return rand.Intn(N) }).AnyTimes() + + l := NewSortedList() + nums := rand.Perm(N) + for range nums { + l.Insert(mockItem) + } + + var wg sync.WaitGroup + for range nums { + wg.Add(1) + go func() { + if ok := l.Contains(mockItem); !ok { + t.Error("SortedList contains error") + } + wg.Done() + }() + } + wg.Wait() +} + +func TestSortedListLen(t *testing.T) { + tests := []struct { + name string + mock func(m ...*mocks.MockItemMockRecorder) + expect func(t *testing.T, l SortedList, items ...Item) + }{ + { + name: "get length succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).Times(1), + m[1].SortedValue().Return(1).Times(1), + ) + }, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + l.Insert(items[0]) + l.Insert(items[1]) + assert.Equal(l.Len(), 2) + }, + }, + { + name: "get empty list length", + mock: func(m ...*mocks.MockItemMockRecorder) {}, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + assert.Equal(l.Len(), 0) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockItems := []*mocks.MockItem{mocks.NewMockItem(ctl), mocks.NewMockItem(ctl)} + tc.mock(mockItems[0].EXPECT(), mockItems[1].EXPECT()) + tc.expect(t, NewSortedList(), mockItems[0], mockItems[1]) + }) + } +} + +func TestSortedListLen_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + ctl := gomock.NewController(t) + defer ctl.Finish() + mockItem := mocks.NewMockItem(ctl) + mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return rand.Intn(N) }).AnyTimes() + + l := NewSortedList() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + elems := l.Len() + for i := 0; i < N; i++ { + newElems := l.Len() + if newElems < elems { + t.Errorf("Len shrunk from %v to %v", elems, newElems) + } + } + wg.Done() + }() + + for i := 0; i < N; i++ { + l.Insert(mockItem) + } + wg.Wait() +} + +func TestSortedListRange(t *testing.T) { + tests := []struct { + name string + mock func(m ...*mocks.MockItemMockRecorder) + expect func(t *testing.T, l SortedList, items ...Item) + }{ + { + name: "range succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).Times(1), + m[1].SortedValue().Return(1).Times(1), + ) + }, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + l.Insert(items[0]) + l.Insert(items[1]) + assert.Equal(l.Len(), 2) + + i := 0 + l.Range(func(item Item) bool { + assert.Equal(item, items[i]) + i++ + return true + }) + }, + }, + { + name: "range multi values succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) { + for i := range m { + m[i].SortedValue().Return(i).AnyTimes() + } + }, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + for _, item := range items { + l.Insert(item) + } + assert.Equal(l.Len(), 10) + + i := 0 + l.Range(func(item Item) bool { + assert.Equal(item, items[i]) + i++ + return true + }) + }, + }, + { + name: "range stoped", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).Times(1), + m[1].SortedValue().Return(1).Times(1), + ) + }, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + l.Insert(items[0]) + l.Insert(items[1]) + assert.Equal(l.Len(), 2) + + l.Range(func(item Item) bool { + assert.Equal(item, items[0]) + return false + }) + }, + }, + { + name: "range same values", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).AnyTimes(), + ) + }, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + l.Insert(items[0]) + l.Insert(items[0]) + l.Insert(items[0]) + assert.Equal(l.Len(), 3) + + count := 0 + l.Range(func(item Item) bool { + assert.Equal(item, items[0]) + count++ + return true + }) + assert.Equal(count, 3) + }, + }, + { + name: "range empty list", + mock: func(m ...*mocks.MockItemMockRecorder) { + }, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + count := 0 + l.Range(func(item Item) bool { + count++ + return true + }) + assert.Equal(count, 0) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + var mockItems []Item + var mockItemRecorders []*mocks.MockItemMockRecorder + for i := 0; i < 10; i++ { + mockItem := mocks.NewMockItem(ctl) + mockItemRecorders = append(mockItemRecorders, mockItem.EXPECT()) + mockItems = append(mockItems, mockItem) + } + + tc.mock(mockItemRecorders...) + tc.expect(t, NewSortedList(), mockItems...) + }) + } +} + +func TestSortedListRange_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + ctl := gomock.NewController(t) + defer ctl.Finish() + mockItem := mocks.NewMockItem(ctl) + mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return rand.Intn(N) }).AnyTimes() + + l := NewSortedList() + var wg sync.WaitGroup + wg.Add(1) + go func() { + i := 0 + l.Range(func(_ Item) bool { + i++ + return true + }) + + j := 0 + l.Range(func(_ Item) bool { + j++ + return true + }) + if j < i { + t.Errorf("Values shrunk from %v to %v", i, j) + } + wg.Done() + }() + + for i := 0; i < N; i++ { + l.Insert(mockItem) + } + wg.Wait() +} + +func TestSortedListReverseRange(t *testing.T) { + tests := []struct { + name string + mock func(m ...*mocks.MockItemMockRecorder) + expect func(t *testing.T, l SortedList, items ...Item) + }{ + { + name: "reverse range succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).Times(1), + m[1].SortedValue().Return(1).Times(1), + ) + }, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + l.Insert(items[0]) + l.Insert(items[1]) + assert.Equal(l.Len(), 2) + + i := 0 + l.ReverseRange(func(item Item) bool { + assert.Equal(item, items[i]) + i++ + return true + }) + }, + }, + { + name: "reverse range multi values succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) { + for i := range m { + m[i].SortedValue().Return(i).AnyTimes() + } + }, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + for _, item := range items { + l.Insert(item) + } + assert.Equal(l.Len(), 10) + + i := 9 + l.ReverseRange(func(item Item) bool { + assert.Equal(item, items[i]) + i-- + return true + }) + }, + }, + { + name: "reverse range stoped", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).Times(1), + m[1].SortedValue().Return(1).Times(1), + ) + }, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + l.Insert(items[0]) + l.Insert(items[1]) + assert.Equal(l.Len(), 2) + + l.ReverseRange(func(item Item) bool { + assert.Equal(item, items[1]) + return false + }) + }, + }, + { + name: "reverse range same values", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).AnyTimes(), + ) + }, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + l.Insert(items[0]) + l.Insert(items[0]) + l.Insert(items[0]) + assert.Equal(l.Len(), 3) + + count := 0 + l.ReverseRange(func(item Item) bool { + assert.Equal(item, items[0]) + count++ + return true + }) + assert.Equal(count, 3) + }, + }, + { + name: "reverse range empty list", + mock: func(m ...*mocks.MockItemMockRecorder) { + }, + expect: func(t *testing.T, l SortedList, items ...Item) { + assert := assert.New(t) + count := 0 + l.ReverseRange(func(item Item) bool { + count++ + return true + }) + assert.Equal(count, 0) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + var mockItems []Item + var mockItemRecorders []*mocks.MockItemMockRecorder + for i := 0; i < 10; i++ { + mockItem := mocks.NewMockItem(ctl) + mockItemRecorders = append(mockItemRecorders, mockItem.EXPECT()) + mockItems = append(mockItems, mockItem) + } + + tc.mock(mockItemRecorders...) + tc.expect(t, NewSortedList(), mockItems...) + }) + } +} + +func TestSortedListReverseRange_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + ctl := gomock.NewController(t) + defer ctl.Finish() + mockItem := mocks.NewMockItem(ctl) + mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return rand.Intn(N) }).AnyTimes() + + l := NewSortedList() + var wg sync.WaitGroup + wg.Add(1) + go func() { + i := 0 + l.ReverseRange(func(_ Item) bool { + i++ + return true + }) + + j := 0 + l.ReverseRange(func(_ Item) bool { + j++ + return true + }) + if j < i { + t.Errorf("Values shrunk from %v to %v", i, j) + } + wg.Done() + }() + + for i := 0; i < N; i++ { + l.Insert(mockItem) + } + wg.Wait() +} + +type item struct{ id int } + +func (i *item) SortedValue() int { return rand.Intn(1000) } + +func BenchmarkSortedListInsert(b *testing.B) { + l := NewSortedList() + + var mockItems []*item + for i := 0; i < b.N; i++ { + mockItems = append(mockItems, &item{id: i}) + } + + b.ResetTimer() + for _, mockItem := range mockItems { + l.Insert(mockItem) + } +} + +func BenchmarkSortedListRemove(b *testing.B) { + l := NewSortedList() + + var mockItems []*item + for i := 0; i < b.N; i++ { + mockItems = append(mockItems, &item{id: i}) + } + + for _, mockItem := range mockItems { + l.Insert(mockItem) + } + + b.ResetTimer() + for _, mockItem := range mockItems { + l.Remove(mockItem) + } +} + +func BenchmarkSortedListContains(b *testing.B) { + l := NewSortedList() + + var mockItems []*item + for i := 0; i < b.N; i++ { + mockItems = append(mockItems, &item{id: i}) + } + + for _, mockItem := range mockItems { + l.Insert(mockItem) + } + + b.ResetTimer() + for _, mockItem := range mockItems { + l.Contains(mockItem) + } +} + +func BenchmarkSortedListRange(b *testing.B) { + l := NewSortedList() + + var mockItems []*item + for i := 0; i < b.N; i++ { + mockItems = append(mockItems, &item{id: i}) + } + + for _, mockItem := range mockItems { + l.Insert(mockItem) + } + + b.ResetTimer() + l.Range(func(_ Item) bool { return true }) +} + +func BenchmarkSortedListReverseRange(b *testing.B) { + l := NewSortedList() + + var mockItems []*item + for i := 0; i < b.N; i++ { + mockItems = append(mockItems, &item{id: i}) + } + + for _, mockItem := range mockItems { + l.Insert(mockItem) + } + + b.ResetTimer() + l.ReverseRange(func(_ Item) bool { return true }) +} diff --git a/pkg/container/list/sorted_unique_list.go b/pkg/container/list/sorted_unique_list.go new file mode 100644 index 000000000..5e0232423 --- /dev/null +++ b/pkg/container/list/sorted_unique_list.go @@ -0,0 +1,106 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package list + +import ( + "sync" + + "d7y.io/dragonfly/v2/pkg/container/set" +) + +type SortedUniqueList interface { + Len() int + Insert(Item) + Remove(Item) + Contains(Item) bool + Range(func(Item) bool) + ReverseRange(fn func(Item) bool) +} + +type sortedUniqueList struct { + mu *sync.RWMutex + container SortedList + data set.Set +} + +func NewSortedUniqueList() SortedUniqueList { + return &sortedUniqueList{ + mu: &sync.RWMutex{}, + container: NewSortedList(), + data: set.New(), + } +} + +func (ul *sortedUniqueList) Len() int { + ul.mu.RLock() + defer ul.mu.RUnlock() + + return ul.container.Len() +} + +func (ul *sortedUniqueList) Insert(item Item) { + ul.mu.Lock() + defer ul.mu.Unlock() + + if ok := ul.data.Contains(item); ok { + ul.container.Remove(item) + ul.container.Insert(item) + return + } + + ul.data.Add(item) + ul.container.Insert(item) +} + +func (ul *sortedUniqueList) Remove(item Item) { + ul.mu.Lock() + defer ul.mu.Unlock() + + ul.data.Delete(item) + ul.container.Remove(item) +} + +func (ul *sortedUniqueList) Contains(item Item) bool { + ul.mu.RLock() + defer ul.mu.RUnlock() + + return ul.data.Contains(item) +} + +func (ul *sortedUniqueList) Range(fn func(item Item) bool) { + ul.mu.RLock() + defer ul.mu.RUnlock() + + ul.container.Range(func(item Item) bool { + if !fn(item) { + return false + } + return true + }) +} + +func (ul *sortedUniqueList) ReverseRange(fn func(item Item) bool) { + ul.mu.RLock() + defer ul.mu.RUnlock() + + ul.container.ReverseRange(func(item Item) bool { + if !fn(item) { + return false + } + return true + }) +} diff --git a/pkg/container/list/sorted_unique_list_test.go b/pkg/container/list/sorted_unique_list_test.go new file mode 100644 index 000000000..221908a5c --- /dev/null +++ b/pkg/container/list/sorted_unique_list_test.go @@ -0,0 +1,786 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package list + +import ( + "math/rand" + "runtime" + "sync" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + + "d7y.io/dragonfly/v2/pkg/container/list/mocks" +) + +func TestSortedUniqueListInsert(t *testing.T) { + tests := []struct { + name string + mock func(m ...*mocks.MockItemMockRecorder) + expect func(t *testing.T, ul SortedUniqueList, items ...Item) + }{ + { + name: "insert values succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) {}, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + ul.Insert(items[0]) + assert.Equal(ul.Contains(items[0]), true) + assert.Equal(ul.Len(), 1) + }, + }, + { + name: "insert multi value succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).Times(1), + m[1].SortedValue().Return(1).Times(1), + ) + }, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + ul.Insert(items[0]) + ul.Insert(items[1]) + assert.Equal(ul.Contains(items[0]), true) + assert.Equal(ul.Contains(items[1]), true) + assert.Equal(ul.Len(), 2) + }, + }, + { + name: "insert same values", + mock: func(m ...*mocks.MockItemMockRecorder) {}, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + ul.Insert(items[0]) + ul.Insert(items[0]) + assert.Equal(ul.Contains(items[0]), true) + assert.Equal(ul.Len(), 1) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockItems := []*mocks.MockItem{mocks.NewMockItem(ctl), mocks.NewMockItem(ctl)} + tc.mock(mockItems[0].EXPECT(), mockItems[1].EXPECT()) + tc.expect(t, NewSortedUniqueList(), mockItems[0], mockItems[1]) + }) + } +} + +func TestSortedUniqueListInsert_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + ctl := gomock.NewController(t) + defer ctl.Finish() + + ul := NewSortedUniqueList() + nums := rand.Perm(N) + + var mockItems []Item + for _, v := range nums { + mockItem := mocks.NewMockItem(ctl) + mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return v }).AnyTimes() + mockItems = append(mockItems, mockItem) + } + + var wg sync.WaitGroup + wg.Add(len(mockItems)) + for _, mockItem := range mockItems { + go func(item Item) { + ul.Insert(item) + wg.Done() + }(mockItem) + } + + wg.Wait() + count := 0 + ul.Range(func(item Item) bool { + count++ + return true + }) + if count != len(nums) { + t.Errorf("SortedUniqueList is missing element") + } +} + +func TestSortedUniqueListRemove(t *testing.T) { + tests := []struct { + name string + mock func(m ...*mocks.MockItemMockRecorder) + expect func(t *testing.T, ul SortedUniqueList, items ...Item) + }{ + { + name: "remove values succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).Times(1), + m[1].SortedValue().Return(1).Times(1), + ) + }, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + ul.Insert(items[0]) + ul.Insert(items[1]) + assert.Equal(ul.Contains(items[0]), true) + assert.Equal(ul.Contains(items[1]), true) + assert.Equal(ul.Len(), 2) + ul.Remove(items[0]) + assert.Equal(ul.Contains(items[0]), false) + assert.Equal(ul.Len(), 1) + ul.Remove(items[1]) + assert.Equal(ul.Contains(items[1]), false) + assert.Equal(ul.Len(), 0) + }, + }, + { + name: "remove value dost not exits", + mock: func(m ...*mocks.MockItemMockRecorder) {}, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + ul.Insert(items[0]) + assert.Equal(ul.Contains(items[0]), true) + assert.Equal(ul.Len(), 1) + ul.Remove(items[1]) + assert.Equal(ul.Contains(items[0]), true) + assert.Equal(ul.Len(), 1) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockItems := []*mocks.MockItem{mocks.NewMockItem(ctl), mocks.NewMockItem(ctl)} + tc.mock(mockItems[0].EXPECT(), mockItems[1].EXPECT()) + tc.expect(t, NewSortedUniqueList(), mockItems[0], mockItems[1]) + }) + } +} + +func TestSortedUniqueListRemove_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + ctl := gomock.NewController(t) + defer ctl.Finish() + + ul := NewSortedUniqueList() + nums := rand.Perm(N) + + var mockItems []Item + for _, v := range nums { + mockItem := mocks.NewMockItem(ctl) + mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return v }).AnyTimes() + mockItems = append(mockItems, mockItem) + ul.Insert(mockItem) + } + + var wg sync.WaitGroup + wg.Add(len(mockItems)) + for _, mockItem := range mockItems { + go func(item Item) { + ul.Remove(item) + wg.Done() + }(mockItem) + } + + wg.Wait() + if ul.Len() != 0 { + t.Errorf("SortedUniqueList is redundant elements") + } +} + +func TestSortedUniqueListContains(t *testing.T) { + tests := []struct { + name string + mock func(m ...*mocks.MockItemMockRecorder) + expect func(t *testing.T, ul SortedUniqueList, items ...Item) + }{ + { + name: "contains values succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).Times(1), + m[1].SortedValue().Return(1).Times(1), + ) + }, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + ul.Insert(items[0]) + ul.Insert(items[1]) + assert.Equal(ul.Contains(items[0]), true) + assert.Equal(ul.Contains(items[1]), true) + }, + }, + { + name: "contains value dost not exits", + mock: func(m ...*mocks.MockItemMockRecorder) {}, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + ul.Insert(items[0]) + assert.Equal(ul.Contains(items[1]), false) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockItems := []*mocks.MockItem{mocks.NewMockItem(ctl), mocks.NewMockItem(ctl)} + tc.mock(mockItems[0].EXPECT(), mockItems[1].EXPECT()) + tc.expect(t, NewSortedUniqueList(), mockItems[0], mockItems[1]) + }) + } +} + +func TestSortedUniqueListContains_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + ctl := gomock.NewController(t) + defer ctl.Finish() + + ul := NewSortedUniqueList() + nums := rand.Perm(N) + + var mockItems []Item + for _, v := range nums { + mockItem := mocks.NewMockItem(ctl) + mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return v }).AnyTimes() + mockItems = append(mockItems, mockItem) + ul.Insert(mockItem) + } + + var wg sync.WaitGroup + wg.Add(len(mockItems)) + for _, mockItem := range mockItems { + go func(item Item) { + if ok := ul.Contains(item); !ok { + t.Error("SortedUniqueList contains error") + } + wg.Done() + }(mockItem) + } + wg.Wait() +} + +func TestSortedUniqueListLen(t *testing.T) { + tests := []struct { + name string + mock func(m ...*mocks.MockItemMockRecorder) + expect func(t *testing.T, ul SortedUniqueList, items ...Item) + }{ + { + name: "get length succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).Times(1), + m[1].SortedValue().Return(1).Times(1), + ) + }, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + ul.Insert(items[0]) + ul.Insert(items[1]) + assert.Equal(ul.Len(), 2) + }, + }, + { + name: "get empty list length", + mock: func(m ...*mocks.MockItemMockRecorder) {}, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + assert.Equal(ul.Len(), 0) + }, + }, + { + name: "get same values length", + mock: func(m ...*mocks.MockItemMockRecorder) {}, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + ul.Insert(items[0]) + ul.Insert(items[0]) + assert.Equal(ul.Len(), 1) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + mockItems := []*mocks.MockItem{mocks.NewMockItem(ctl), mocks.NewMockItem(ctl)} + tc.mock(mockItems[0].EXPECT(), mockItems[1].EXPECT()) + tc.expect(t, NewSortedUniqueList(), mockItems[0], mockItems[1]) + }) + } +} + +func TestSortedUniqueListLen_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + ctl := gomock.NewController(t) + defer ctl.Finish() + + ul := NewSortedUniqueList() + nums := rand.Perm(N) + + var mockItems []Item + for _, v := range nums { + mockItem := mocks.NewMockItem(ctl) + mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return v }).AnyTimes() + mockItems = append(mockItems, mockItem) + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + elems := ul.Len() + for i := 0; i < N; i++ { + newElems := ul.Len() + if newElems < elems { + t.Errorf("Len shrunk from %v to %v", elems, newElems) + } + } + wg.Done() + }() + + for _, mockItem := range mockItems { + ul.Insert(mockItem) + } + wg.Wait() +} + +func TestSortedUniqueListRange(t *testing.T) { + tests := []struct { + name string + mock func(m ...*mocks.MockItemMockRecorder) + expect func(t *testing.T, ul SortedUniqueList, items ...Item) + }{ + { + name: "range succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).Times(1), + m[1].SortedValue().Return(1).Times(1), + ) + }, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + ul.Insert(items[0]) + ul.Insert(items[1]) + assert.Equal(ul.Len(), 2) + + i := 0 + ul.Range(func(item Item) bool { + assert.Equal(item, items[i]) + i++ + return true + }) + }, + }, + { + name: "range multi values succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) { + for i := range m { + m[i].SortedValue().Return(i).AnyTimes() + } + }, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + for _, item := range items { + ul.Insert(item) + } + ul.Insert(items[1]) + assert.Equal(ul.Len(), 10) + + i := 0 + ul.Range(func(item Item) bool { + assert.Equal(item, items[i]) + i++ + return true + }) + }, + }, + { + name: "range stoped", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).Times(1), + m[1].SortedValue().Return(1).Times(1), + ) + }, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + ul.Insert(items[0]) + ul.Insert(items[1]) + assert.Equal(ul.Len(), 2) + + ul.Range(func(item Item) bool { + assert.Equal(item, items[0]) + return false + }) + }, + }, + { + name: "range same values", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).AnyTimes(), + ) + }, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + ul.Insert(items[0]) + ul.Insert(items[0]) + ul.Insert(items[0]) + assert.Equal(ul.Len(), 1) + + count := 0 + ul.Range(func(item Item) bool { + assert.Equal(item, items[0]) + count++ + return true + }) + assert.Equal(count, 1) + }, + }, + { + name: "range empty list", + mock: func(m ...*mocks.MockItemMockRecorder) { + }, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + count := 0 + ul.Range(func(item Item) bool { + count++ + return true + }) + assert.Equal(count, 0) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + var mockItems []Item + var mockItemRecorders []*mocks.MockItemMockRecorder + for i := 0; i < 10; i++ { + mockItem := mocks.NewMockItem(ctl) + mockItemRecorders = append(mockItemRecorders, mockItem.EXPECT()) + mockItems = append(mockItems, mockItem) + } + + tc.mock(mockItemRecorders...) + tc.expect(t, NewSortedUniqueList(), mockItems...) + }) + } +} + +func TestSortedUniqueListRange_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + ctl := gomock.NewController(t) + defer ctl.Finish() + + ul := NewSortedUniqueList() + nums := rand.Perm(N) + + var mockItems []Item + for _, v := range nums { + mockItem := mocks.NewMockItem(ctl) + mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return v }).AnyTimes() + mockItems = append(mockItems, mockItem) + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + i := 0 + ul.Range(func(_ Item) bool { + i++ + return true + }) + + j := 0 + ul.Range(func(_ Item) bool { + j++ + return true + }) + if j < i { + t.Errorf("Values shrunk from %v to %v", i, j) + } + wg.Done() + }() + + for _, mockItem := range mockItems { + ul.Insert(mockItem) + } + wg.Wait() +} + +func TestSortedUniqueListReverseRange(t *testing.T) { + tests := []struct { + name string + mock func(m ...*mocks.MockItemMockRecorder) + expect func(t *testing.T, ul SortedUniqueList, items ...Item) + }{ + { + name: "reverse range succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).Times(1), + m[1].SortedValue().Return(1).Times(1), + ) + }, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + ul.Insert(items[0]) + ul.Insert(items[1]) + assert.Equal(ul.Len(), 2) + + i := 1 + ul.ReverseRange(func(item Item) bool { + assert.Equal(item, items[i]) + i-- + return true + }) + }, + }, + { + name: "reverse range multi values succeeded", + mock: func(m ...*mocks.MockItemMockRecorder) { + for i := range m { + m[i].SortedValue().Return(i).AnyTimes() + } + }, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + for _, item := range items { + ul.Insert(item) + } + ul.Insert(items[1]) + assert.Equal(ul.Len(), 10) + + i := 9 + ul.Range(func(item Item) bool { + assert.Equal(item, items[i]) + i-- + return true + }) + }, + }, + { + name: "reverse range stoped", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).Times(1), + m[1].SortedValue().Return(1).Times(1), + ) + }, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + ul.Insert(items[0]) + ul.Insert(items[1]) + assert.Equal(ul.Len(), 2) + + ul.ReverseRange(func(item Item) bool { + assert.Equal(item, items[0]) + return false + }) + }, + }, + { + name: "reverse range same values", + mock: func(m ...*mocks.MockItemMockRecorder) { + gomock.InOrder( + m[0].SortedValue().Return(0).AnyTimes(), + ) + }, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + ul.Insert(items[0]) + ul.Insert(items[0]) + ul.Insert(items[0]) + assert.Equal(ul.Len(), 1) + + count := 0 + ul.ReverseRange(func(item Item) bool { + assert.Equal(item, items[0]) + count++ + return true + }) + assert.Equal(count, 1) + }, + }, + { + name: "reverse range empty list", + mock: func(m ...*mocks.MockItemMockRecorder) { + }, + expect: func(t *testing.T, ul SortedUniqueList, items ...Item) { + assert := assert.New(t) + count := 0 + ul.ReverseRange(func(item Item) bool { + count++ + return true + }) + assert.Equal(count, 0) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + var mockItems []Item + var mockItemRecorders []*mocks.MockItemMockRecorder + for i := 0; i < 10; i++ { + mockItem := mocks.NewMockItem(ctl) + mockItemRecorders = append(mockItemRecorders, mockItem.EXPECT()) + mockItems = append(mockItems, mockItem) + } + + tc.mock(mockItemRecorders...) + tc.expect(t, NewSortedUniqueList(), mockItems...) + }) + } +} + +func TestSortedUniqueListReverseRange_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + ctl := gomock.NewController(t) + defer ctl.Finish() + + ul := NewSortedUniqueList() + nums := rand.Perm(N) + + var mockItems []Item + for _, v := range nums { + mockItem := mocks.NewMockItem(ctl) + mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return v }).AnyTimes() + mockItems = append(mockItems, mockItem) + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + i := 0 + ul.ReverseRange(func(_ Item) bool { + i++ + return true + }) + + j := 0 + ul.ReverseRange(func(_ Item) bool { + j++ + return true + }) + if j < i { + t.Errorf("Values shrunk from %v to %v", i, j) + } + wg.Done() + }() + + for _, mockItem := range mockItems { + ul.Insert(mockItem) + } + wg.Wait() +} + +func BenchmarkSortedUniqueListInsert(b *testing.B) { + ul := NewSortedUniqueList() + + var mockItems []*item + for i := 0; i < b.N; i++ { + mockItems = append(mockItems, &item{id: i}) + } + + b.ResetTimer() + for _, mockItem := range mockItems { + ul.Insert(mockItem) + } +} + +func BenchmarkSortedUniqueListRemove(b *testing.B) { + ul := NewSortedUniqueList() + + var mockItems []*item + for i := 0; i < b.N; i++ { + mockItem := &item{id: i} + ul.Insert(mockItem) + mockItems = append(mockItems, mockItem) + } + + b.ResetTimer() + for _, mockItem := range mockItems { + ul.Remove(mockItem) + } +} + +func BenchmarkSortedUniqueListContains(b *testing.B) { + ul := NewSortedUniqueList() + + var mockItems []*item + for i := 0; i < b.N; i++ { + mockItem := &item{id: i} + ul.Insert(mockItem) + mockItems = append(mockItems, mockItem) + } + + b.ResetTimer() + for _, mockItem := range mockItems { + ul.Contains(mockItem) + } +} + +func BenchmarkSortedUniqueListRange(b *testing.B) { + ul := NewSortedUniqueList() + + for i := 0; i < b.N; i++ { + mockItem := item{id: i} + ul.Insert(&mockItem) + } + + b.ResetTimer() + ul.Range(func(_ Item) bool { return true }) +} + +func BenchmarkSortedUniqueListReverseRange(b *testing.B) { + ul := NewSortedUniqueList() + + for i := 0; i < b.N; i++ { + mockItem := item{id: i} + ul.Insert(&mockItem) + } + + b.ResetTimer() + ul.ReverseRange(func(item Item) bool { return true }) +} diff --git a/pkg/container/set/safe_set.go b/pkg/container/set/safe_set.go new file mode 100644 index 000000000..5055ff41f --- /dev/null +++ b/pkg/container/set/safe_set.go @@ -0,0 +1,101 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package set + +import ( + "sync" +) + +type SafeSet interface { + Values() []interface{} + Add(interface{}) bool + Delete(interface{}) + Contains(...interface{}) bool + Len() uint + Range(func(interface{}) bool) +} + +type safeSet struct { + mu *sync.RWMutex + data map[interface{}]struct{} +} + +func NewSafeSet() SafeSet { + return &safeSet{ + mu: &sync.RWMutex{}, + data: make(map[interface{}]struct{}), + } +} + +func (s *safeSet) Values() []interface{} { + var result []interface{} + s.Range(func(v interface{}) bool { + result = append(result, v) + return true + }) + + return result +} + +func (s *safeSet) Add(v interface{}) bool { + s.mu.RLock() + _, found := s.data[v] + if found { + s.mu.RUnlock() + return false + } + s.mu.RUnlock() + + s.mu.Lock() + defer s.mu.Unlock() + s.data[v] = struct{}{} + return true +} + +func (s *safeSet) Delete(v interface{}) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.data, v) +} + +func (s *safeSet) Contains(vals ...interface{}) bool { + s.mu.RLock() + defer s.mu.RUnlock() + for _, v := range vals { + if _, ok := s.data[v]; !ok { + return false + } + } + + return true +} + +func (s *safeSet) Len() uint { + s.mu.RLock() + defer s.mu.RUnlock() + return uint(len(s.data)) +} + +func (s *safeSet) Range(fn func(interface{}) bool) { + s.mu.RLock() + defer s.mu.RUnlock() + for v := range s.data { + if !fn(v) { + break + } + } +} diff --git a/pkg/container/set/safe_set_test.go b/pkg/container/set/safe_set_test.go new file mode 100644 index 000000000..101d9ec5a --- /dev/null +++ b/pkg/container/set/safe_set_test.go @@ -0,0 +1,382 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package set + +import ( + "math/rand" + "runtime" + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +const N = 1000 + +func TestSafeSetAdd(t *testing.T) { + tests := []struct { + name string + value interface{} + expect func(t *testing.T, ok bool, s Set, value interface{}) + }{ + { + name: "add value succeeded", + value: "foo", + expect: func(t *testing.T, ok bool, s Set, value interface{}) { + assert := assert.New(t) + assert.Equal(ok, true) + assert.Equal(s.Values(), []interface{}{value}) + }, + }, + { + name: "add value failed", + value: "foo", + expect: func(t *testing.T, _ bool, s Set, value interface{}) { + assert := assert.New(t) + ok := s.Add("foo") + assert.Equal(ok, false) + assert.Equal(s.Values(), []interface{}{value}) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + s := NewSafeSet() + tc.expect(t, s.Add(tc.value), s, tc.value) + }) + } +} + +func TestSafeSetAdd_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + s := NewSafeSet() + nums := rand.Perm(N) + + var wg sync.WaitGroup + wg.Add(len(nums)) + for i := 0; i < len(nums); i++ { + go func(i int) { + s.Add(i) + wg.Done() + }(i) + } + + wg.Wait() + for _, n := range nums { + if !s.Contains(n) { + t.Errorf("Set is missing element: %v", n) + } + } +} + +func TestSafeSetDelete(t *testing.T) { + tests := []struct { + name string + value interface{} + expect func(t *testing.T, s Set, value interface{}) + }{ + { + name: "delete value succeeded", + value: "foo", + expect: func(t *testing.T, s Set, value interface{}) { + assert := assert.New(t) + s.Delete(value) + assert.Equal(s.Len(), uint(0)) + }, + }, + { + name: "delete value does not exist", + value: "foo", + expect: func(t *testing.T, s Set, _ interface{}) { + assert := assert.New(t) + s.Delete("bar") + assert.Equal(s.Len(), uint(1)) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + s := NewSafeSet() + s.Add(tc.value) + tc.expect(t, s, tc.value) + }) + } +} + +func TestSafeSetDelete_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + s := NewSafeSet() + nums := rand.Perm(N) + for _, v := range nums { + s.Add(v) + } + + var wg sync.WaitGroup + wg.Add(len(nums)) + for _, v := range nums { + go func(i int) { + s.Delete(i) + wg.Done() + }(v) + } + wg.Wait() + + if s.Len() != 0 { + t.Errorf("Expected len 0; got %v", s.Len()) + } +} + +func TestSafeSetContains(t *testing.T) { + tests := []struct { + name string + value interface{} + expect func(t *testing.T, s Set, value interface{}) + }{ + { + name: "contains value succeeded", + value: "foo", + expect: func(t *testing.T, s Set, value interface{}) { + assert := assert.New(t) + assert.Equal(s.Contains(value), true) + }, + }, + { + name: "contains value does not exist", + value: "foo", + expect: func(t *testing.T, s Set, _ interface{}) { + assert := assert.New(t) + assert.Equal(s.Contains("bar"), false) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + s := NewSafeSet() + s.Add(tc.value) + tc.expect(t, s, tc.value) + }) + } +} + +func TestSafeSetContains_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + s := NewSafeSet() + nums := rand.Perm(N) + interfaces := make([]interface{}, 0) + for _, v := range nums { + s.Add(v) + interfaces = append(interfaces, v) + } + + var wg sync.WaitGroup + for range nums { + wg.Add(1) + go func() { + s.Contains(interfaces...) + wg.Done() + }() + } + wg.Wait() +} + +func TestSetSafeLen(t *testing.T) { + tests := []struct { + name string + expect func(t *testing.T, s Set) + }{ + { + name: "get length succeeded", + expect: func(t *testing.T, s Set) { + assert := assert.New(t) + s.Add("foo") + assert.Equal(s.Len(), uint(1)) + }, + }, + { + name: "get empty set length", + expect: func(t *testing.T, s Set) { + assert := assert.New(t) + assert.Equal(s.Len(), uint(0)) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + s := NewSafeSet() + tc.expect(t, s) + }) + } +} + +func TestSafeSetLen_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + s := NewSafeSet() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + elems := s.Len() + for i := 0; i < N; i++ { + newElems := s.Len() + if newElems < elems { + t.Errorf("Len shrunk from %v to %v", elems, newElems) + } + } + wg.Done() + }() + + for i := 0; i < N; i++ { + s.Add(rand.Int()) + } + wg.Wait() +} + +func TestSafeSetValues(t *testing.T) { + tests := []struct { + name string + expect func(t *testing.T, s Set) + }{ + { + name: "get values succeeded", + expect: func(t *testing.T, s Set) { + assert := assert.New(t) + s.Add("foo") + assert.Equal(s.Values(), []interface{}{"foo"}) + }, + }, + { + name: "get empty values", + expect: func(t *testing.T, s Set) { + assert := assert.New(t) + assert.Equal(s.Values(), []interface{}(nil)) + }, + }, + { + name: "get multi values succeeded", + expect: func(t *testing.T, s Set) { + assert := assert.New(t) + s.Add("foo") + s.Add("bar") + assert.Contains(s.Values(), "bar") + assert.Contains(s.Values(), "foo") + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + s := NewSafeSet() + tc.expect(t, s) + }) + } +} + +func TestSafeSetValues_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + s := NewSafeSet() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + elems := s.Values() + for i := 0; i < N; i++ { + newElems := s.Values() + if len(newElems) < len(elems) { + t.Errorf("Values shrunk from %v to %v", elems, newElems) + } + } + wg.Done() + }() + + for i := 0; i < N; i++ { + s.Add(rand.Int()) + } + wg.Wait() +} + +func TestSafeSetRange(t *testing.T) { + tests := []struct { + name string + expect func(t *testing.T, s Set) + }{ + { + name: "range succeeded", + expect: func(t *testing.T, s Set) { + assert := assert.New(t) + s.Add("foo") + s.Range(func(v interface{}) bool { + assert.Equal(v, "foo") + return true + }) + }, + }, + { + name: "range failed", + expect: func(t *testing.T, s Set) { + assert := assert.New(t) + s.Add("foo") + s.Add("bar") + s.Range(func(v interface{}) bool { + assert.Equal(s.Contains(v), true) + return false + }) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + s := NewSafeSet() + tc.expect(t, s) + }) + } +} + +func TestSafeSetRange_Concurrent(t *testing.T) { + runtime.GOMAXPROCS(2) + + s := NewSafeSet() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + elems := s.Values() + i := 0 + s.Range(func(v interface{}) bool { + i++ + return true + }) + if i < len(elems) { + t.Errorf("Values shrunk from %v to %v", elems, i) + } + wg.Done() + }() + + for i := 0; i < N; i++ { + s.Add(rand.Int()) + } + wg.Wait() +} diff --git a/pkg/container/set/set.go b/pkg/container/set/set.go new file mode 100644 index 000000000..89a5c20eb --- /dev/null +++ b/pkg/container/set/set.go @@ -0,0 +1,78 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package set + +type Set interface { + Values() []interface{} + Add(interface{}) bool + Delete(interface{}) + Contains(...interface{}) bool + Len() uint + Range(func(interface{}) bool) +} + +type set map[interface{}]struct{} + +func New() Set { + return &set{} +} + +func (s *set) Values() []interface{} { + var result []interface{} + s.Range(func(v interface{}) bool { + result = append(result, v) + return true + }) + + return result +} + +func (s *set) Add(v interface{}) bool { + _, found := (*s)[v] + if found { + return false + } + + (*s)[v] = struct{}{} + return true +} + +func (s *set) Delete(v interface{}) { + delete(*s, v) +} + +func (s *set) Contains(vals ...interface{}) bool { + for _, v := range vals { + if _, ok := (*s)[v]; !ok { + return false + } + } + + return true +} + +func (s *set) Len() uint { + return uint(len(*s)) +} + +func (s *set) Range(fn func(interface{}) bool) { + for v := range *s { + if !fn(v) { + break + } + } +} diff --git a/pkg/container/set/set_test.go b/pkg/container/set/set_test.go new file mode 100644 index 000000000..db2d5b818 --- /dev/null +++ b/pkg/container/set/set_test.go @@ -0,0 +1,234 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package set + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSetAdd(t *testing.T) { + tests := []struct { + name string + value interface{} + expect func(t *testing.T, ok bool, s Set, value interface{}) + }{ + { + name: "add value succeeded", + value: "foo", + expect: func(t *testing.T, ok bool, s Set, value interface{}) { + assert := assert.New(t) + assert.Equal(ok, true) + assert.Equal(s.Values(), []interface{}{value}) + }, + }, + { + name: "add value failed", + value: "foo", + expect: func(t *testing.T, _ bool, s Set, value interface{}) { + assert := assert.New(t) + ok := s.Add("foo") + assert.Equal(ok, false) + assert.Equal(s.Values(), []interface{}{value}) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + s := New() + tc.expect(t, s.Add(tc.value), s, tc.value) + }) + } +} + +func TestSetDelete(t *testing.T) { + tests := []struct { + name string + value interface{} + expect func(t *testing.T, s Set, value interface{}) + }{ + { + name: "delete value succeeded", + value: "foo", + expect: func(t *testing.T, s Set, value interface{}) { + assert := assert.New(t) + s.Delete(value) + assert.Equal(s.Len(), uint(0)) + }, + }, + { + name: "delete value does not exist", + value: "foo", + expect: func(t *testing.T, s Set, _ interface{}) { + assert := assert.New(t) + s.Delete("bar") + assert.Equal(s.Len(), uint(1)) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + s := New() + s.Add(tc.value) + tc.expect(t, s, tc.value) + }) + } +} + +func TestSetContains(t *testing.T) { + tests := []struct { + name string + value interface{} + expect func(t *testing.T, s Set, value interface{}) + }{ + { + name: "contains value succeeded", + value: "foo", + expect: func(t *testing.T, s Set, value interface{}) { + assert := assert.New(t) + assert.Equal(s.Contains(value), true) + }, + }, + { + name: "contains value does not exist", + value: "foo", + expect: func(t *testing.T, s Set, _ interface{}) { + assert := assert.New(t) + assert.Equal(s.Contains("bar"), false) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + s := New() + s.Add(tc.value) + tc.expect(t, s, tc.value) + }) + } +} + +func TestSetLen(t *testing.T) { + tests := []struct { + name string + expect func(t *testing.T, s Set) + }{ + { + name: "get length succeeded", + expect: func(t *testing.T, s Set) { + assert := assert.New(t) + s.Add("foo") + assert.Equal(s.Len(), uint(1)) + }, + }, + { + name: "get empty set length", + expect: func(t *testing.T, s Set) { + assert := assert.New(t) + assert.Equal(s.Len(), uint(0)) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + s := New() + tc.expect(t, s) + }) + } +} + +func TestSetValues(t *testing.T) { + tests := []struct { + name string + expect func(t *testing.T, s Set) + }{ + { + name: "get values succeeded", + expect: func(t *testing.T, s Set) { + assert := assert.New(t) + s.Add("foo") + assert.Equal(s.Values(), []interface{}{"foo"}) + }, + }, + { + name: "get empty values", + expect: func(t *testing.T, s Set) { + assert := assert.New(t) + assert.Equal(s.Values(), []interface{}(nil)) + }, + }, + { + name: "get multi values succeeded", + expect: func(t *testing.T, s Set) { + assert := assert.New(t) + s.Add("foo") + s.Add("bar") + assert.Contains(s.Values(), "bar") + assert.Contains(s.Values(), "foo") + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + s := New() + tc.expect(t, s) + }) + } +} + +func TestSetRange(t *testing.T) { + tests := []struct { + name string + expect func(t *testing.T, s Set) + }{ + { + name: "range succeeded", + expect: func(t *testing.T, s Set) { + assert := assert.New(t) + s.Add("foo") + s.Range(func(v interface{}) bool { + assert.Equal(v, "foo") + return true + }) + }, + }, + { + name: "range failed", + expect: func(t *testing.T, s Set) { + assert := assert.New(t) + s.Add("foo") + s.Add("bar") + s.Range(func(v interface{}) bool { + assert.Equal(s.Contains(v), true) + return false + }) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + s := New() + tc.expect(t, s) + }) + } +} diff --git a/pkg/structure/sortedlist/bucket.go b/pkg/structure/sortedlist/bucket.go deleted file mode 100644 index ef17112b1..000000000 --- a/pkg/structure/sortedlist/bucket.go +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2020 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package sortedlist - -type empty struct{} - -type bucket struct { - count int - buckets []map[Item]empty -} - -func (b *bucket) Size() int { - return b.count -} - -func (b *bucket) Add(key int, data Item) { - for key >= len(b.buckets) { - b.buckets = append(b.buckets, make(map[Item]empty)) - } - b.buckets[key][data] = empty{} - b.count++ - return -} - -func (b *bucket) Delete(key int, data Item) { - for key >= len(b.buckets) { - b.buckets = append(b.buckets, make(map[Item]empty)) - } - _, ok := b.buckets[key][data] - if ok { - delete(b.buckets[key], data) - b.count-- - } - return -} diff --git a/pkg/structure/sortedlist/sorted_list.go b/pkg/structure/sortedlist/sorted_list.go deleted file mode 100644 index 575cc1a51..000000000 --- a/pkg/structure/sortedlist/sorted_list.go +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Copyright 2020 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package sortedlist - -import ( - "fmt" - "sync" -) - -const BucketMaxLength = 100000 -const InnerBucketMaxLength = 10000 - -type Item interface { - GetSortKeys() (key1 int, key2 int) -} - -type SortedList struct { - l sync.RWMutex - buckets []bucket - keyMap map[Item]int - left int - right int -} - -func NewSortedList() *SortedList { - l := &SortedList{ - left: 0, - right: 0, - keyMap: make(map[Item]int), - } - return l -} - -func (l *SortedList) Add(data Item) (err error) { - key1, key2 := data.GetSortKeys() - if key1 > BucketMaxLength || key1 < 0 { - return fmt.Errorf("sorted list key1 out of range") - } - if key2 > InnerBucketMaxLength || key2 < 0 { - return fmt.Errorf("sorted list key2 out of range") - } - l.l.Lock() - defer l.l.Unlock() - l.addItem(key1, key2, data) - return -} -func (l *SortedList) Update(data Item) (err error) { - key1, key2 := data.GetSortKeys() - if key1 > BucketMaxLength || key1 < 0 { - return fmt.Errorf("sorted list key1 out of range") - } - if key2 > InnerBucketMaxLength || key2 < 0 { - return fmt.Errorf("sorted list key2 out of range") - } - - l.l.Lock() - defer l.l.Unlock() - oldKey1, oldKey2, ok := l.getKeyMapKey(data) - if !ok { - return - } - - if key1 == oldKey1 && key2 == oldKey2 { - return - } - - l.deleteItem(oldKey1, oldKey2, data) - l.addItem(key1, key2, data) - return -} - -func (l *SortedList) UpdateOrAdd(data Item) (err error) { - key1, key2 := data.GetSortKeys() - if key1 > BucketMaxLength || key1 < 0 { - return fmt.Errorf("sorted list key1 out of range") - } - if key2 > InnerBucketMaxLength || key2 < 0 { - return fmt.Errorf("sorted list key2 out of range") - } - - l.l.Lock() - defer l.l.Unlock() - oldKey1, oldKey2, ok := l.getKeyMapKey(data) - if !ok { - l.addItem(key1, key2, data) - return - } - - if key1 == oldKey1 && key2 == oldKey2 { - return - } - - l.deleteItem(oldKey1, oldKey2, data) - l.addItem(key1, key2, data) - - return -} - -func (l *SortedList) Delete(data Item) (err error) { - l.l.Lock() - defer l.l.Unlock() - oldKey1, oldKey2, ok := l.getKeyMapKey(data) - if !ok { - return - } - l.deleteItem(oldKey1, oldKey2, data) - return -} - -func (l *SortedList) Range(fn func(data Item) bool) { - l.RangeLimit(-1, fn) -} - -func (l *SortedList) RangeLimit(limit int, fn func(Item) bool) { - if limit == 0 { - return - } - l.l.RLock() - defer l.l.RUnlock() - if len(l.buckets) == 0 { - return - } - count := 0 - for i := l.left; i <= l.right; i++ { - buc := l.buckets[i] - for _, b := range buc.buckets { - for it := range b { - if !fn(it) { - return - } - count++ - if limit > 0 && count >= limit { - return - } - } - } - } -} - -func (l *SortedList) RangeReverse(fn func(data Item) bool) { - l.RangeReverseLimit(-1, fn) -} - -func (l *SortedList) RangeReverseLimit(limit int, fn func(Item) bool) { - if limit == 0 { - return - } - l.l.RLock() - defer l.l.RUnlock() - if len(l.buckets) == 0 { - return - } - count := 0 - for i := l.right; i >= l.left; i-- { - for j := len(l.buckets[i].buckets) - 1; j >= 0; j-- { - for it := range l.buckets[i].buckets[j] { - if !fn(it) { - return - } - count++ - if limit > 0 && count >= limit { - return - } - } - } - } -} - -func (l *SortedList) Size() int { - l.l.RLock() - defer l.l.RUnlock() - return len(l.keyMap) -} - -func (l *SortedList) addItem(key1, key2 int, data Item) { - l.addKey(key1) - l.buckets[key1].Add(key2, data) - l.setKeyMapKey(key1, key2, data) - l.shrink() -} - -func (l *SortedList) deleteItem(key1, key2 int, data Item) { - l.addKey(key1) - l.buckets[key1].Delete(key2, data) - l.deleteKeyMapKey(data) - l.shrink() -} - -func (l *SortedList) addKey(key int) { - for key >= len(l.buckets) { - l.buckets = append(l.buckets, bucket{}) - } - if l.right < key { - l.right = key - } - if l.left > key { - l.left = key - } -} - -func (l *SortedList) shrink() { - for l.left < l.right && l.buckets[l.left].Size() == 0 { - l.left++ - } - for l.left < l.right && l.buckets[l.right].Size() == 0 { - l.right-- - } -} - -func (l *SortedList) setKeyMapKey(key1, key2 int, data Item) { - l.keyMap[data] = key1*1000 + key2 -} - -func (l *SortedList) getKeyMapKey(data Item) (key1, key2 int, ok bool) { - key, ok := l.keyMap[data] - key1 = key / 1000 - key2 = key % 1000 - return -} - -func (l *SortedList) deleteKeyMapKey(data Item) { - delete(l.keyMap, data) -} diff --git a/pkg/structure/sortedlist/sorted_list_test.go b/pkg/structure/sortedlist/sorted_list_test.go deleted file mode 100644 index d04d904cf..000000000 --- a/pkg/structure/sortedlist/sorted_list_test.go +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Copyright 2020 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package sortedlist - -import ( - "fmt" - "math/rand" - "testing" -) - -type item struct { - key1 int - key2 int -} - -func newItem(key1, key2 int) *item { - return &item{key1, key2} -} - -func (i *item) GetSortKeys() (int, int) { - return i.key1, i.key2 -} - -func TestAdd(t *testing.T) { - l := NewSortedList() - l.Add(newItem(1, 2)) - l.Add(newItem(2, 2)) - l.Add(newItem(2, 3)) - if l.Size() != 3 { - t.Errorf("TestAdd failed count required[3] but get [%d]", l.Size()) - } -} - -func TestDelete(t *testing.T) { - l := NewSortedList() - it := newItem(1, 3) - l.Add(newItem(1, 2)) - l.Add(newItem(2, 2)) - l.Add(it) - l.Add(newItem(2, 3)) - l.Delete(it) - if l.Size() != 3 { - t.Errorf("TestDelete failed count required[3] but get [%d]", l.Size()) - } -} - -func TestUpdate(t *testing.T) { - l := NewSortedList() - it := newItem(1, 3) - l.Add(newItem(1, 2)) - l.Add(newItem(2, 2)) - l.Add(it) - it.key1 = 2 - l.Update(it) - l.Add(newItem(2, 3)) - key1, key2, ok := l.getKeyMapKey(it) - if l.Size() != 4 || key1 != 2 || key2 != 3 || !ok { - t.Errorf("TestUpdate failed count required[3] but get [%d]", l.Size()) - } -} - -func TestRange(t *testing.T) { - l := NewSortedList() - it := newItem(1, 3) - l.Add(newItem(1, 2)) - l.Add(newItem(2, 2)) - l.Add(it) - it.key1 = 2 - l.Update(it) - l.Add(newItem(2, 3)) - key1, key2, ok := l.getKeyMapKey(it) - if l.Size() != 4 || key1 != 2 || key2 != 3 || !ok { - t.Errorf("TestUpdate failed count required[4] but get [%d]", l.Size()) - } - - count := 0 - l.Range(func(data Item) bool { - it := data.(*item) - fmt.Println(it.key1, it.key2) - count++ - return true - }) - if l.Size() != count { - t.Errorf("TestRange failed count required[4] but get [%d]", l.Size()) - } -} - -func TestRangeLimit(t *testing.T) { - l := NewSortedList() - it := newItem(1, 3) - l.Add(newItem(1, 2)) - l.Add(newItem(2, 2)) - l.Add(it) - it.key1 = 2 - l.Update(it) - l.Add(newItem(2, 3)) - key1, key2, ok := l.getKeyMapKey(it) - if l.Size() != 4 || key1 != 2 || key2 != 3 || !ok { - t.Errorf("TestUpdate failed count required[4] but get [%d]", l.Size()) - } - - count := 0 - l.RangeLimit(2, func(data Item) bool { - it := data.(*item) - fmt.Println(it.key1, it.key2) - count++ - return true - }) - if 2 != count { - t.Errorf("TestRange failed count required[2] but get [%d]", count) - } -} - -func TestRangeReverse(t *testing.T) { - l := NewSortedList() - it := newItem(1, 3) - l.Add(newItem(1, 2)) - l.Add(newItem(2, 2)) - l.Add(it) - it.key1 = 2 - l.Update(it) - l.Add(newItem(2, 3)) - key1, key2, ok := l.getKeyMapKey(it) - if l.Size() != 4 || key1 != 2 || key2 != 3 || !ok { - t.Errorf("TestRangeReverse failed count required[4] but get [%d]", l.Size()) - } - - count := 0 - l.RangeReverse(func(data Item) bool { - it := data.(*item) - fmt.Println(it.key1, it.key2) - count++ - return true - }) - if l.Size() != count { - t.Errorf("TestRangeReverse failed count required[4] but get [%d]", l.Size()) - } -} - -func TestRangeReverseLimit(t *testing.T) { - l := NewSortedList() - it := newItem(1, 3) - l.Add(newItem(1, 2)) - l.Add(newItem(2, 2)) - l.Add(it) - it.key1 = 2 - l.Update(it) - l.Add(newItem(2, 3)) - key1, key2, ok := l.getKeyMapKey(it) - if l.Size() != 4 || key1 != 2 || key2 != 3 || !ok { - t.Errorf("TestRangeReverseLimit failed count required[4] but get [%d]", l.Size()) - } - - count := 0 - l.RangeReverseLimit(2, func(data Item) bool { - it := data.(*item) - fmt.Println(it.key1, it.key2) - count++ - return true - }) - if 2 != count { - t.Errorf("TestRangeReverseLimit failed count required[4] but get [%d]", l.Size()) - } -} - -func BenchmarkAdd(b *testing.B) { - b.ResetTimer() - l := NewSortedList() - for i := 0; i < b.N; i++ { - l.Add(newItem(rand.Intn(BucketMaxLength), rand.Intn(InnerBucketMaxLength))) - } - if b.N != l.Size() { - b.Errorf("BenchmarkAdd failed count required[%d] but get [%d]", b.N, l.Size()) - } - fmt.Println(l.Size(), b.N) -} diff --git a/scheduler/core/events.go b/scheduler/core/events.go index 1fb5a8c9e..5a7925ad3 100644 --- a/scheduler/core/events.go +++ b/scheduler/core/events.go @@ -27,8 +27,8 @@ import ( "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/container/list" schedulerRPC "d7y.io/dragonfly/v2/pkg/rpc/scheduler" - "d7y.io/dragonfly/v2/pkg/structure/sortedlist" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/core/scheduler" "d7y.io/dragonfly/v2/scheduler/supervisor" @@ -371,8 +371,12 @@ func constructSuccessPeerPacket(peer *supervisor.Peer, parent *supervisor.Peer, func handleCDNSeedTaskFail(task *supervisor.Task) { if task.CanBackToSource() { - task.GetPeers().Range(func(data sortedlist.Item) bool { - peer := data.(*supervisor.Peer) + task.GetPeers().Range(func(item list.Item) bool { + peer, ok := item.(*supervisor.Peer) + if !ok { + return true + } + if task.CanBackToSource() { if !task.ContainsBackToSourcePeer(peer.ID) { if peer.CloseChannelWithError(dferrors.Newf(dfcodes.SchedNeedBackSource, "peer %s need back source because cdn seed task failed", peer.ID)) == nil { @@ -381,12 +385,17 @@ func handleCDNSeedTaskFail(task *supervisor.Task) { } return true } + return false }) } else { task.SetStatus(supervisor.TaskStatusFail) - task.GetPeers().Range(func(data sortedlist.Item) bool { - peer := data.(*supervisor.Peer) + task.GetPeers().Range(func(item list.Item) bool { + peer, ok := item.(*supervisor.Peer) + if !ok { + return true + } + if err := peer.CloseChannelWithError(dferrors.New(dfcodes.SchedTaskStatusError, "schedule task status failed")); err != nil { peer.Log().Warnf("close peer conn channel failed: %v", err) } diff --git a/scheduler/core/scheduler/basic/basic_scheduler.go b/scheduler/core/scheduler/basic/basic_scheduler.go index f6bb64829..05b8bc49b 100644 --- a/scheduler/core/scheduler/basic/basic_scheduler.go +++ b/scheduler/core/scheduler/basic/basic_scheduler.go @@ -142,7 +142,7 @@ func (s *Scheduler) ScheduleParent(peer *supervisor.Peer, blankParents sets.Stri func (s *Scheduler) selectCandidateChildren(peer *supervisor.Peer, limit int, blankChildren sets.String) (candidateChildren []*supervisor.Peer) { peer.Log().Debug("start schedule children flow") defer peer.Log().Debugf("finish schedule children flow, select num %d candidate children, "+ - "current task tree node count %d, back source peers: %v", len(candidateChildren), peer.Task.GetPeers().Size(), peer.Task.GetBackToSourcePeers()) + "current task tree node count %d, back source peers: %v", len(candidateChildren), peer.Task.GetPeers().Len(), peer.Task.GetBackToSourcePeers()) candidateChildren = peer.Task.Pick(limit, func(candidateNode *supervisor.Peer) bool { if candidateNode == nil { peer.Log().Debugf("******candidate child peer is not selected because it is nil******") @@ -215,7 +215,7 @@ func (s *Scheduler) selectCandidateChildren(peer *supervisor.Peer, limit int, bl func (s *Scheduler) selectCandidateParents(peer *supervisor.Peer, limit int, blankParents sets.String) (candidateParents []*supervisor.Peer) { peer.Log().Debug("start schedule parent flow") defer peer.Log().Debugf("finish schedule parent flow, select num %d candidates parents, "+ - "current task tree node count %d, back source peers: %v", len(candidateParents), peer.Task.GetPeers().Size(), peer.Task.GetBackToSourcePeers()) + "current task tree node count %d, back source peers: %v", len(candidateParents), peer.Task.GetPeers().Len(), peer.Task.GetBackToSourcePeers()) if !peer.Task.CanSchedule() { peer.Log().Debugf("++++++peer can not be scheduled because task cannot be scheduled at this time,waiting task status become seeding. "+ "it current status is %s++++++", peer.Task.GetStatus()) diff --git a/scheduler/supervisor/host.go b/scheduler/supervisor/host.go index f881f64fe..9d88044af 100644 --- a/scheduler/supervisor/host.go +++ b/scheduler/supervisor/host.go @@ -26,6 +26,11 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" ) +const ( + // When using the manager configuration parameter, limit the maximum load number to 5000 + HostMaxLoad = 5 * 1000 +) + type HostManager interface { // Add host Add(*Host) diff --git a/scheduler/supervisor/mocks/host_mock.go b/scheduler/supervisor/mocks/host_mock.go index 833ac9dbf..1175d8649 100644 --- a/scheduler/supervisor/mocks/host_mock.go +++ b/scheduler/supervisor/mocks/host_mock.go @@ -1,4 +1,3 @@ -// Code generated by MockGen. DO NOT EDIT. // Source: d7y.io/dragonfly/v2/scheduler/supervisor (interfaces: HostManager) // Package mocks is a generated GoMock package. @@ -7,8 +6,9 @@ package mocks import ( reflect "reflect" - supervisor "d7y.io/dragonfly/v2/scheduler/supervisor" gomock "github.com/golang/mock/gomock" + + supervisor "d7y.io/dragonfly/v2/scheduler/supervisor" ) // MockHostManager is a mock of HostManager interface. diff --git a/scheduler/supervisor/peer.go b/scheduler/supervisor/peer.go index 5adf047d2..45dd6dec5 100644 --- a/scheduler/supervisor/peer.go +++ b/scheduler/supervisor/peer.go @@ -116,7 +116,7 @@ func (m *peerManager) Delete(id string) { func (m *peerManager) GetPeersByTask(taskID string) []*Peer { var peers []*Peer - m.peers.Range(func(key, value interface{}) bool { + m.peers.Range(func(_, value interface{}) bool { peer := value.(*Peer) if peer.Task.ID == taskID { peers = append(peers, peer) @@ -152,7 +152,7 @@ func (m *peerManager) RunGC() error { if peer.Host.GetPeersLen() == 0 { m.hostManager.Delete(peer.Host.UUID) } - if peer.Task.GetPeers().Size() == 0 { + if peer.Task.GetPeers().Len() == 0 { peer.Task.Log().Info("peers is empty, task status become waiting") peer.Task.SetStatus(TaskStatusWaiting) } @@ -396,22 +396,23 @@ func (peer *Peer) UpdateProgress(finishedCount int32, cost int) { peer.Task.UpdatePeer(peer) return } + } -func (peer *Peer) GetSortKeys() (key1, key2 int) { +func (peer *Peer) SortedValue() int { peer.lock.RLock() defer peer.lock.RUnlock() - key1 = int(peer.TotalPieceCount.Load()) - key2 = peer.getFreeLoad() - return + pieceCount := peer.TotalPieceCount.Load() + hostLoad := peer.getFreeLoad() + return int(pieceCount*HostMaxLoad + hostLoad) } -func (peer *Peer) getFreeLoad() int { +func (peer *Peer) getFreeLoad() int32 { if peer.Host == nil { return 0 } - return int(peer.Host.GetFreeUploadLoad()) + return peer.Host.GetFreeUploadLoad() } func (peer *Peer) SetStatus(status PeerStatus) { diff --git a/scheduler/supervisor/peer_test.go b/scheduler/supervisor/peer_test.go index 77e8daa6f..89ad48be8 100644 --- a/scheduler/supervisor/peer_test.go +++ b/scheduler/supervisor/peer_test.go @@ -29,6 +29,10 @@ import ( "d7y.io/dragonfly/v2/scheduler/supervisor/mocks" ) +const ( + HostMaxLoad = 5 * 1000 +) + func TestPeer_New(t *testing.T) { tests := []struct { name string @@ -209,9 +213,7 @@ func TestPeer_Cost(t *testing.T) { average, success := peer.GetPieceAverageCost() assert.True(success) assert.Equal(4, average) - finishedCountFetch, loadFetch := peer.GetSortKeys() - assert.Equal(4, finishedCountFetch) - assert.Equal(100, loadFetch) + assert.Equal(peer.SortedValue(), 4*HostMaxLoad+100) }, }, { diff --git a/scheduler/supervisor/task.go b/scheduler/supervisor/task.go index f6c31c2e6..596eaf673 100644 --- a/scheduler/supervisor/task.go +++ b/scheduler/supervisor/task.go @@ -24,9 +24,9 @@ import ( "go.uber.org/atomic" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/container/list" gc "d7y.io/dragonfly/v2/pkg/gc" "d7y.io/dragonfly/v2/pkg/rpc/base" - "d7y.io/dragonfly/v2/pkg/structure/sortedlist" "d7y.io/dragonfly/v2/scheduler/config" ) @@ -109,7 +109,7 @@ func (m *taskManager) RunGC() error { task.SetStatus(TaskStatusZombie) } - if task.GetPeers().Size() == 0 { + if task.GetPeers().Len() == 0 { task.Log().Info("peers is empty, task status become waiting") task.SetStatus(TaskStatusWaiting) } @@ -178,8 +178,8 @@ type Task struct { lastAccessAt *atomic.Time // status is task status and type is TaskStatus status atomic.Value - // peers is peer list - peers *sortedlist.SortedList + // peers is peer sorted unique list + peers list.SortedUniqueList // BackToSourceWeight is back-to-source peer weight BackToSourceWeight atomic.Int32 // backToSourcePeers is back-to-source peers list @@ -205,7 +205,7 @@ func NewTask(id, url string, meta *base.UrlMeta) *Task { lastAccessAt: atomic.NewTime(now), backToSourcePeers: []string{}, pieces: &sync.Map{}, - peers: sortedlist.NewSortedList(), + peers: list.NewSortedUniqueList(), logger: logger.WithTaskID(id), } @@ -263,30 +263,18 @@ func (task *Task) UpdateSuccess(pieceCount int32, contentLength int64) { } func (task *Task) AddPeer(peer *Peer) { - err := task.peers.UpdateOrAdd(peer) - if err != nil { - task.logger.Errorf("add peer %s failed: %v", peer.ID, err) - } - task.logger.Debugf("peer %s has been added, current total peer count is %d", peer.ID, task.peers.Size()) + task.peers.Insert(peer) } func (task *Task) UpdatePeer(peer *Peer) { - err := task.peers.Update(peer) - if err != nil { - task.logger.Errorf("update peer %s failed: %v", peer.ID, err) - } - task.logger.Debugf("peer %s has been updated, current total peer count is %d", peer.ID, task.peers.Size()) + task.peers.Insert(peer) } func (task *Task) DeletePeer(peer *Peer) { - err := task.peers.Delete(peer) - if err != nil { - task.logger.Errorf("delete peer %s failed: %v", peer.ID, err) - } - task.logger.Debugf("peer %s has been deleted, current total peer count is %d", peer.ID, task.peers.Size()) + task.peers.Remove(peer) } -func (task *Task) GetPeers() *sortedlist.SortedList { +func (task *Task) GetPeers() list.SortedUniqueList { return task.peers } @@ -343,45 +331,46 @@ func (task *Task) GetBackToSourcePeers() []string { return task.backToSourcePeers } -func (task *Task) Pick(limit int, pickFn func(peer *Peer) bool) (pickedPeers []*Peer) { - return task.pick(limit, false, pickFn) -} +func (task *Task) Pick(limit int, pickFn func(peer *Peer) bool) []*Peer { + var peers []*Peer -func (task *Task) PickReverse(limit int, pickFn func(peer *Peer) bool) (pickedPeers []*Peer) { - return task.pick(limit, true, pickFn) -} + task.GetPeers().Range(func(item list.Item) bool { + if len(peers) >= limit { + return false + } + peer, ok := item.(*Peer) + if !ok { + return true + } -func (task *Task) pick(limit int, reverse bool, pickFn func(peer *Peer) bool) (pickedPeers []*Peer) { - if pickFn == nil { - return - } + if pickFn(peer) { + peers = append(peers, peer) + } + return true + }) - if !reverse { - task.GetPeers().Range(func(data sortedlist.Item) bool { - if len(pickedPeers) >= limit { - return false - } - peer := data.(*Peer) - if pickFn(peer) { - pickedPeers = append(pickedPeers, peer) - } - return true - }) - return - } + return peers +} + +func (task *Task) PickReverse(limit int, pickFn func(peer *Peer) bool) []*Peer { + var peers []*Peer - task.GetPeers().RangeReverse(func(data sortedlist.Item) bool { - if len(pickedPeers) >= limit { + task.GetPeers().ReverseRange(func(item list.Item) bool { + if len(peers) >= limit { return false } - peer := data.(*Peer) + peer, ok := item.(*Peer) + if !ok { + return true + } + if pickFn(peer) { - pickedPeers = append(pickedPeers, peer) + peers = append(peers, peer) } return true }) - return + return peers } func (task *Task) Log() *logger.SugaredLoggerOnWith { diff --git a/scheduler/supervisor/task_test.go b/scheduler/supervisor/task_test.go index 22919ae74..297cb7d0d 100644 --- a/scheduler/supervisor/task_test.go +++ b/scheduler/supervisor/task_test.go @@ -322,14 +322,6 @@ func TestTask_Pick(t *testing.T) { limit: 100, answer: []string{}, }, - { - name: "invalid pickFn", - number: 10, - pick: (func(peer *supervisor.Peer) bool)(nil), - reverse: false, - limit: 100, - answer: []string{}, - }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) {