diff --git a/x/sync/syncworkheap.go b/x/sync/syncworkheap.go index e44afb9ad94..5ed6d52ebd9 100644 --- a/x/sync/syncworkheap.go +++ b/x/sync/syncworkheap.go @@ -12,13 +12,15 @@ import ( "github.com/google/btree" ) -var _ heap.Interface = (*syncWorkHeap)(nil) +var _ heap.Interface = (*innerHeap)(nil) type heapItem struct { workItem *syncWorkItem heapIndex int } +type innerHeap []*heapItem + // A priority queue of syncWorkItems. // Note that work item ranges never overlap. // Supports range merging and priority updating. @@ -26,7 +28,7 @@ type heapItem struct { type syncWorkHeap struct { // Max heap of items by priority. // i.e. heap.Pop returns highest priority item. - priorityHeap []*heapItem + innerHeap innerHeap // The heap items sorted by range start. // A nil start is considered to be the smallest. sortedItems *btree.BTreeG[*heapItem] @@ -35,7 +37,6 @@ type syncWorkHeap struct { func newSyncWorkHeap() *syncWorkHeap { return &syncWorkHeap{ - priorityHeap: make([]*heapItem, 0), sortedItems: btree.NewG( 2, func(a, b *heapItem) bool { @@ -56,7 +57,10 @@ func (wh *syncWorkHeap) Insert(item *syncWorkItem) { return } - heap.Push(wh, &heapItem{workItem: item}) + wrappedItem := &heapItem{workItem: item} + + heap.Push(&wh.innerHeap, wrappedItem) + wh.sortedItems.ReplaceOrInsert(wrappedItem) } // Pops and returns a work item from the heap. @@ -65,7 +69,9 @@ func (wh *syncWorkHeap) GetWork() *syncWorkItem { if wh.closed || wh.Len() == 0 { return nil } - return heap.Pop(wh).(*heapItem).workItem + item := heap.Pop(&wh.innerHeap).(*heapItem) + wh.sortedItems.Delete(item) + return item.workItem } // Insert the item into the heap, merging it with existing items @@ -97,7 +103,7 @@ func (wh *syncWorkHeap) MergeInsert(item *syncWorkItem) { // merged into [beforeItem.start, item.end] beforeItem.workItem.end = item.end beforeItem.workItem.priority = math.Max(item.priority, beforeItem.workItem.priority) - heap.Fix(wh, beforeItem.heapIndex) + heap.Fix(&wh.innerHeap, beforeItem.heapIndex) mergedBefore = beforeItem } return false @@ -113,7 +119,7 @@ func (wh *syncWorkHeap) MergeInsert(item *syncWorkItem) { // [item.start, afterItem.end]. afterItem.workItem.start = item.start afterItem.workItem.priority = math.Max(item.priority, afterItem.workItem.priority) - heap.Fix(wh, afterItem.heapIndex) + heap.Fix(&wh.innerHeap, afterItem.heapIndex) mergedAfter = afterItem } return false @@ -128,61 +134,54 @@ func (wh *syncWorkHeap) MergeInsert(item *syncWorkItem) { wh.remove(mergedAfter) // update the priority mergedBefore.workItem.priority = math.Max(mergedBefore.workItem.priority, mergedAfter.workItem.priority) - heap.Fix(wh, mergedBefore.heapIndex) + heap.Fix(&wh.innerHeap, mergedBefore.heapIndex) } // nothing was merged, so add new item to the heap if mergedBefore == nil && mergedAfter == nil { // We didn't merge [item] with an existing one; put it in the heap. - heap.Push(wh, &heapItem{workItem: item}) + wh.Insert(item) } } // Deletes [item] from the heap. func (wh *syncWorkHeap) remove(item *heapItem) { - oldIndex := item.heapIndex - newLength := len(wh.priorityHeap) - 1 + heap.Remove(&wh.innerHeap, item.heapIndex) - // swap with last item, delete item, then fix heap if required - wh.Swap(newLength, item.heapIndex) - wh.priorityHeap[newLength] = nil - wh.priorityHeap = wh.priorityHeap[:newLength] - - // the item was already the last item, so nothing needs to be fixed - if oldIndex != newLength { - heap.Fix(wh, oldIndex) - } wh.sortedItems.Delete(item) } +func (wh *syncWorkHeap) Len() int { + return wh.innerHeap.Len() +} + // below this line are the implementations required for heap.Interface -func (wh *syncWorkHeap) Len() int { - return len(wh.priorityHeap) +func (h innerHeap) Len() int { + return len(h) } -func (wh *syncWorkHeap) Less(i int, j int) bool { - return wh.priorityHeap[i].workItem.priority > wh.priorityHeap[j].workItem.priority +func (h innerHeap) Less(i int, j int) bool { + return h[i].workItem.priority > h[j].workItem.priority } -func (wh *syncWorkHeap) Swap(i int, j int) { - wh.priorityHeap[i], wh.priorityHeap[j] = wh.priorityHeap[j], wh.priorityHeap[i] - wh.priorityHeap[i].heapIndex = i - wh.priorityHeap[j].heapIndex = j +func (h innerHeap) Swap(i int, j int) { + h[i], h[j] = h[j], h[i] + h[i].heapIndex = i + h[j].heapIndex = j } -func (wh *syncWorkHeap) Pop() interface{} { - newLength := len(wh.priorityHeap) - 1 - value := wh.priorityHeap[newLength] - wh.priorityHeap[newLength] = nil - wh.priorityHeap = wh.priorityHeap[:newLength] - wh.sortedItems.Delete(value) - return value +func (h *innerHeap) Pop() interface{} { + old := *h + n := len(old) + item := old[n-1] + old[n-1] = nil + *h = old[0 : n-1] + return item } -func (wh *syncWorkHeap) Push(x interface{}) { +func (h *innerHeap) Push(x interface{}) { item := x.(*heapItem) - item.heapIndex = len(wh.priorityHeap) - wh.priorityHeap = append(wh.priorityHeap, item) - wh.sortedItems.ReplaceOrInsert(item) + item.heapIndex = len(*h) + *h = append(*h, item) } diff --git a/x/sync/syncworkheap_test.go b/x/sync/syncworkheap_test.go index 03c0fbb9e30..bf4387c75b3 100644 --- a/x/sync/syncworkheap_test.go +++ b/x/sync/syncworkheap_test.go @@ -12,81 +12,99 @@ import ( ) // Tests heap.Interface methods Push, Pop, Swap, Len, Less. -func Test_SyncWorkHeap_Heap_Methods(t *testing.T) { +func Test_SyncWorkHeap_InnerHeap(t *testing.T) { require := require.New(t) - h := newSyncWorkHeap() - require.Zero(h.Len()) - - item1 := &heapItem{ + lowPriorityItem := &heapItem{ workItem: &syncWorkItem{ - start: nil, - end: nil, - priority: highPriority, + start: []byte{1}, + end: []byte{2}, + priority: lowPriority, LocalRootID: ids.GenerateTestID(), }, } - h.Push(item1) - require.Equal(1, h.Len()) - require.Len(h.priorityHeap, 1) - require.Equal(item1, h.priorityHeap[0]) - require.Zero(h.priorityHeap[0].heapIndex) - require.Equal(1, h.sortedItems.Len()) - gotItem, ok := h.sortedItems.Get(item1) - require.True(ok) - require.Equal(item1, gotItem) - h.Pop() - require.Zero(h.Len()) - require.Empty(h.priorityHeap) - require.Zero(h.sortedItems.Len()) + mediumPriorityItem := &heapItem{ + workItem: &syncWorkItem{ + start: []byte{3}, + end: []byte{4}, + priority: medPriority, + LocalRootID: ids.GenerateTestID(), + }, + } - item2 := &heapItem{ + highPriorityItem := &heapItem{ workItem: &syncWorkItem{ - start: []byte{0}, - end: []byte{1}, + start: []byte{5}, + end: []byte{6}, priority: highPriority, LocalRootID: ids.GenerateTestID(), }, } - h.Push(item1) - h.Push(item2) - require.Equal(2, h.Len()) - require.Len(h.priorityHeap, 2) - require.Equal(item1, h.priorityHeap[0]) - require.Equal(item2, h.priorityHeap[1]) - require.Zero(item1.heapIndex) - require.Equal(1, item2.heapIndex) - require.Equal(2, h.sortedItems.Len()) - gotItem, ok = h.sortedItems.Get(item1) - require.True(ok) - require.Equal(item1, gotItem) - gotItem, ok = h.sortedItems.Get(item2) - require.True(ok) - require.Equal(item2, gotItem) - - require.False(h.Less(0, 1)) - h.Swap(0, 1) - require.Equal(item2, h.priorityHeap[0]) - require.Equal(item1, h.priorityHeap[1]) - require.Equal(1, item1.heapIndex) - require.Zero(item2.heapIndex) + h := innerHeap{} + require.Zero(h.Len()) - require.False(h.Less(0, 1)) + // Note we're calling Push and Pop on the heap directly, + // not using heap.Push and heap.Pop. + h.Push(lowPriorityItem) + // Heap has [lowPriorityItem] + require.Equal(1, h.Len()) + require.Equal(lowPriorityItem, h[0]) - item1.workItem.priority = lowPriority - require.True(h.Less(0, 1)) + got := h.Pop() + // Heap has [] + require.Equal(lowPriorityItem, got) + require.Zero(h.Len()) - gotItem = h.Pop().(*heapItem) - require.Equal(item1, gotItem) + h.Push(lowPriorityItem) + h.Push(mediumPriorityItem) + // Heap has [lowPriorityItem, mediumPriorityItem] + require.Equal(2, h.Len()) + require.Equal(lowPriorityItem, h[0]) + require.Equal(mediumPriorityItem, h[1]) - gotItem = h.Pop().(*heapItem) - require.Equal(item2, gotItem) + got = h.Pop() + // Heap has [lowPriorityItem] + require.Equal(mediumPriorityItem, got) + require.Equal(1, h.Len()) + got = h.Pop() + // Heap has [] + require.Equal(lowPriorityItem, got) require.Zero(h.Len()) - require.Empty(h.priorityHeap) - require.Zero(h.sortedItems.Len()) + + h.Push(mediumPriorityItem) + h.Push(lowPriorityItem) + h.Push(highPriorityItem) + // Heap has [mediumPriorityItem, lowPriorityItem, highPriorityItem] + require.Equal(mediumPriorityItem, h[0]) + require.Equal(lowPriorityItem, h[1]) + require.Equal(highPriorityItem, h[2]) + + h.Swap(0, 1) + // Heap has [lowPriorityItem, mediumPriorityItem, highPriorityItem] + require.Equal(lowPriorityItem, h[0]) + require.Equal(mediumPriorityItem, h[1]) + require.Equal(highPriorityItem, h[2]) + + h.Swap(1, 2) + // Heap has [lowPriorityItem, highPriorityItem, mediumPriorityItem] + require.Equal(lowPriorityItem, h[0]) + require.Equal(highPriorityItem, h[1]) + require.Equal(mediumPriorityItem, h[2]) + + h.Swap(0, 2) + // Heap has [mediumPriorityItem, highPriorityItem, lowPriorityItem] + require.Equal(mediumPriorityItem, h[0]) + require.Equal(highPriorityItem, h[1]) + require.Equal(lowPriorityItem, h[2]) + require.False(h.Less(0, 1)) + require.True(h.Less(1, 0)) + require.True(h.Less(1, 2)) + require.False(h.Less(2, 1)) + require.True(h.Less(0, 2)) + require.False(h.Less(2, 0)) } // Tests Insert and GetWork @@ -94,27 +112,27 @@ func Test_SyncWorkHeap_Insert_GetWork(t *testing.T) { require := require.New(t) h := newSyncWorkHeap() - item1 := &syncWorkItem{ - start: []byte{0}, - end: []byte{1}, + lowPriorityItem := &syncWorkItem{ + start: []byte{4}, + end: []byte{5}, priority: lowPriority, LocalRootID: ids.GenerateTestID(), } - item2 := &syncWorkItem{ - start: []byte{2}, - end: []byte{3}, + mediumPriorityItem := &syncWorkItem{ + start: []byte{0}, + end: []byte{1}, priority: medPriority, LocalRootID: ids.GenerateTestID(), } - item3 := &syncWorkItem{ - start: []byte{4}, - end: []byte{5}, + highPriorityItem := &syncWorkItem{ + start: []byte{2}, + end: []byte{3}, priority: highPriority, LocalRootID: ids.GenerateTestID(), } - h.Insert(item3) - h.Insert(item2) - h.Insert(item1) + h.Insert(highPriorityItem) + h.Insert(mediumPriorityItem) + h.Insert(lowPriorityItem) require.Equal(3, h.Len()) // Ensure [sortedItems] is in right order. @@ -125,15 +143,18 @@ func Test_SyncWorkHeap_Insert_GetWork(t *testing.T) { return true }, ) - require.Equal([]*syncWorkItem{item1, item2, item3}, got) + require.Equal( + []*syncWorkItem{mediumPriorityItem, highPriorityItem, lowPriorityItem}, + got, + ) // Ensure priorities are in right order. gotItem := h.GetWork() - require.Equal(item3, gotItem) + require.Equal(highPriorityItem, gotItem) gotItem = h.GetWork() - require.Equal(item2, gotItem) + require.Equal(mediumPriorityItem, gotItem) gotItem = h.GetWork() - require.Equal(item1, gotItem) + require.Equal(lowPriorityItem, gotItem) gotItem = h.GetWork() require.Nil(gotItem) @@ -145,46 +166,63 @@ func Test_SyncWorkHeap_remove(t *testing.T) { h := newSyncWorkHeap() - item1 := &syncWorkItem{ + lowPriorityItem := &syncWorkItem{ start: []byte{0}, end: []byte{1}, priority: lowPriority, LocalRootID: ids.GenerateTestID(), } - h.Insert(item1) - - heapItem1 := h.priorityHeap[0] - h.remove(heapItem1) - - require.Zero(h.Len()) - require.Empty(h.priorityHeap) - require.Zero(h.sortedItems.Len()) - - item2 := &syncWorkItem{ + mediumPriorityItem := &syncWorkItem{ start: []byte{2}, end: []byte{3}, priority: medPriority, LocalRootID: ids.GenerateTestID(), } - h.Insert(item1) - h.Insert(item2) + highPriorityItem := &syncWorkItem{ + start: []byte{4}, + end: []byte{5}, + priority: highPriority, + LocalRootID: ids.GenerateTestID(), + } + + h.Insert(lowPriorityItem) + + wrappedLowPriorityItem := h.innerHeap[0] + h.remove(wrappedLowPriorityItem) + + require.Zero(h.Len()) + require.Empty(h.innerHeap) + require.Zero(h.sortedItems.Len()) + + h.Insert(lowPriorityItem) + h.Insert(mediumPriorityItem) + h.Insert(highPriorityItem) + + wrappedhighPriorityItem := h.innerHeap[0] + require.Equal(highPriorityItem, wrappedhighPriorityItem.workItem) + h.remove(wrappedhighPriorityItem) + require.Equal(2, h.Len()) + require.Len(h.innerHeap, 2) + require.Equal(2, h.sortedItems.Len()) + require.Zero(h.innerHeap[0].heapIndex) + require.Equal(mediumPriorityItem, h.innerHeap[0].workItem) - heapItem2 := h.priorityHeap[0] - require.Equal(item2, heapItem2.workItem) - h.remove(heapItem2) + wrappedMediumPriorityItem := h.innerHeap[0] + require.Equal(mediumPriorityItem, wrappedMediumPriorityItem.workItem) + h.remove(wrappedMediumPriorityItem) require.Equal(1, h.Len()) - require.Len(h.priorityHeap, 1) + require.Len(h.innerHeap, 1) require.Equal(1, h.sortedItems.Len()) - require.Zero(h.priorityHeap[0].heapIndex) - require.Equal(item1, h.priorityHeap[0].workItem) + require.Zero(h.innerHeap[0].heapIndex) + require.Equal(lowPriorityItem, h.innerHeap[0].workItem) - heapItem1 = h.priorityHeap[0] - require.Equal(item1, heapItem1.workItem) - h.remove(heapItem1) + wrappedLowPriorityItem = h.innerHeap[0] + require.Equal(lowPriorityItem, wrappedLowPriorityItem.workItem) + h.remove(wrappedLowPriorityItem) require.Zero(h.Len()) - require.Empty(h.priorityHeap) + require.Empty(h.innerHeap) require.Zero(h.sortedItems.Len()) }