Skip to content

Commit

Permalink
fix: loss data bug for deprecated querynode DoubleBuffer (#35128)
Browse files Browse the repository at this point in the history
relate: #31548

Signed-off-by: aoiasd <[email protected]>
  • Loading branch information
aoiasd authored and congqixia committed Sep 5, 2024
1 parent 6158ad3 commit 8fcfa77
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 4 deletions.
12 changes: 8 additions & 4 deletions internal/querynodev2/delegator/deletebuffer/delete_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ func (c *doubleCacheBuffer[T]) Put(entry T) {

err := c.head.Put(entry)
if errors.Is(err, errBufferFull) {
c.evict(entry.Timestamp())
c.head.Put(entry)
c.evict(entry.Timestamp(), entry)
}
}

Expand All @@ -88,9 +87,14 @@ func (c *doubleCacheBuffer[T]) ListAfter(ts uint64) []T {
}

// evict sets head as tail and evicts tail.
func (c *doubleCacheBuffer[T]) evict(newTs uint64) {
func (c *doubleCacheBuffer[T]) evict(newTs uint64, entry T) {
c.tail = c.head
c.head = newCacheBlock[T](newTs, c.maxSize/2)
c.head = &cacheBlock[T]{
headTs: newTs,
maxSize: c.maxSize / 2,
size: entry.Size(),
data: []T{entry},
}
c.ts = c.tail.headTs
}

Expand Down
52 changes: 52 additions & 0 deletions internal/querynodev2/delegator/deletebuffer/delete_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,58 @@ func (s *DoubleCacheBufferSuite) TestCache() {
s.Equal(1, len(buffer.ListAfter(12)))
}

func (s *DoubleCacheBufferSuite) TestPut() {
buffer := NewDoubleCacheDeleteBuffer[*Item](10, 1)
buffer.Put(&Item{
Ts: 11,
Data: []BufferItem{
{
PartitionID: 200,
DeleteData: storage.DeleteData{
Pks: []storage.PrimaryKey{storage.NewVarCharPrimaryKey("test1")},
Tss: []uint64{11},
RowCount: 1,
},
},
},
})

buffer.Put(&Item{
Ts: 12,
Data: []BufferItem{
{
PartitionID: 200,
DeleteData: storage.DeleteData{
Pks: []storage.PrimaryKey{storage.NewVarCharPrimaryKey("test2")},
Tss: []uint64{12},
RowCount: 1,
},
},
},
})

s.Equal(2, len(buffer.ListAfter(11)))
s.Equal(1, len(buffer.ListAfter(12)))

buffer.Put(&Item{
Ts: 13,
Data: []BufferItem{
{
PartitionID: 200,
DeleteData: storage.DeleteData{
Pks: []storage.PrimaryKey{storage.NewVarCharPrimaryKey("test3")},
Tss: []uint64{13},
RowCount: 1,
},
},
},
})

s.Equal(2, len(buffer.ListAfter(11)))
s.Equal(2, len(buffer.ListAfter(12)))
s.Equal(1, len(buffer.ListAfter(13)))
}

func TestDoubleCacheDeleteBuffer(t *testing.T) {
suite.Run(t, new(DoubleCacheBufferSuite))
}

0 comments on commit 8fcfa77

Please sign in to comment.