From 1c8ab92be4206c5c2f71d096f7b43c9cf9c0ef93 Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 31 Oct 2024 17:08:18 +0900 Subject: [PATCH 1/9] membuffer: fix memory leak in red-black tree Signed-off-by: you06 --- internal/unionstore/memdb_test.go | 2 +- internal/unionstore/rbt/rbt.go | 36 ++++++++++++++++++++----- internal/unionstore/rbt/rbt_iterator.go | 6 ++++- 3 files changed, 35 insertions(+), 9 deletions(-) diff --git a/internal/unionstore/memdb_test.go b/internal/unionstore/memdb_test.go index 6003059ae..4f1def3e8 100644 --- a/internal/unionstore/memdb_test.go +++ b/internal/unionstore/memdb_test.go @@ -1231,7 +1231,7 @@ func testMemBufferCache(t *testing.T, buffer MemBuffer) { } func TestMemDBLeafFragmentation(t *testing.T) { - // RBT cannot pass the leaf fragmentation test. + testMemDBLeafFragmentation(t, newRbtDBWithContext()) testMemDBLeafFragmentation(t, newArtDBWithContext()) } diff --git a/internal/unionstore/rbt/rbt.go b/internal/unionstore/rbt/rbt.go index 06884fb42..40e3b76f7 100644 --- a/internal/unionstore/rbt/rbt.go +++ b/internal/unionstore/rbt/rbt.go @@ -120,7 +120,9 @@ func (db *RBT) RevertVAddr(hdr *arena.MemdbVlogHdr) { // If there are no flags associated with this key, we need to delete this node. keptFlags := node.getKeyFlags().AndPersistent() if keptFlags == 0 { - db.deleteNode(node) + node.markDelete() + db.count-- + db.size -= int(node.klen) } else { node.setKeyFlags(keptFlags) db.dirty = true @@ -279,7 +281,7 @@ func (db *RBT) SelectValueHistory(key []byte, predicate func(value []byte) bool) // GetFlags returns the latest flags associated with key. func (db *RBT) GetFlags(key []byte) (kv.KeyFlags, error) { x := db.traverse(key, false) - if x.isNull() { + if x.isNull() || x.isDeleted() { return 0, tikverr.ErrNotExist } return x.getKeyFlags(), nil @@ -348,16 +350,24 @@ func (db *RBT) Set(key []byte, value []byte, ops ...kv.FlagsOp) error { // every write to the node removes the flag unless it's explicitly set. // This set must be in the latest stage so no special processing is needed. var flags kv.KeyFlags + flags = x.GetKeyFlags() + if flags == 0 && x.vptr.IsNull() && x.isDeleted() { + db.count++ + db.size += int(x.klen) + } if value != nil { - flags = kv.ApplyFlagsOps(x.getKeyFlags(), append([]kv.FlagsOp{kv.DelNeedConstraintCheckInPrewrite}, ops...)...) + flags = kv.ApplyFlagsOps(flags, append([]kv.FlagsOp{kv.DelNeedConstraintCheckInPrewrite}, ops...)...) } else { // an UpdateFlag operation, do not delete the NeedConstraintCheckInPrewrite flag. - flags = kv.ApplyFlagsOps(x.getKeyFlags(), ops...) + flags = kv.ApplyFlagsOps(flags, ops...) } if flags.AndPersistent() != 0 { db.dirty = true } x.setKeyFlags(flags) + if x.isDeleted() { + panic("") + } if value == nil { return nil @@ -881,8 +891,11 @@ func (n *memdbNode) getKey() []byte { const ( // bit 1 => red, bit 0 => black - nodeColorBit uint16 = 0x8000 - nodeFlagsMask = ^nodeColorBit + nodeColorBit uint16 = 0x8000 + // bit 1 => node is deleted, bit 0 => node is not deleted + // This flag is used to mark a node as deleted, so that we can reuse the node to avoid memory leak. + deleteFlag uint16 = 1 << 14 + nodeFlagsMask = ^(nodeColorBit | deleteFlag) ) func (n *memdbNode) GetKeyFlags() kv.KeyFlags { @@ -894,7 +907,16 @@ func (n *memdbNode) getKeyFlags() kv.KeyFlags { } func (n *memdbNode) setKeyFlags(f kv.KeyFlags) { - n.flags = (^nodeFlagsMask & n.flags) | uint16(f) + // only keep the color bit. + n.flags = (nodeColorBit & n.flags) | uint16(f) +} + +func (n *memdbNode) markDelete() { + n.flags |= deleteFlag +} + +func (n *memdbNode) isDeleted() bool { + return n.flags&deleteFlag != 0 } // RemoveFromBuffer removes a record from the mem buffer. It should be only used for test. diff --git a/internal/unionstore/rbt/rbt_iterator.go b/internal/unionstore/rbt/rbt_iterator.go index dea270b4f..623d895cb 100644 --- a/internal/unionstore/rbt/rbt_iterator.go +++ b/internal/unionstore/rbt/rbt_iterator.go @@ -119,7 +119,7 @@ func (i *RBTIterator) init() { } } - if i.isFlagsOnly() && !i.includeFlags { + if (i.isFlagsOnly() && !i.includeFlags) || (!i.curr.isNull() && i.curr.isDeleted()) { err := i.Next() _ = err // memdbIterator will never fail } @@ -174,6 +174,10 @@ func (i *RBTIterator) Next() error { i.curr = i.db.successor(i.curr) } + if i.curr.isDeleted() { + continue + } + // We need to skip persistent flags only nodes. if i.includeFlags || !i.isFlagsOnly() { break From 8348dc2aca1d734b32c47a3145a94a72e746b00b Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 31 Oct 2024 17:22:20 +0900 Subject: [PATCH 2/9] remove debug code Signed-off-by: you06 --- internal/unionstore/rbt/rbt.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/internal/unionstore/rbt/rbt.go b/internal/unionstore/rbt/rbt.go index 40e3b76f7..180dd2132 100644 --- a/internal/unionstore/rbt/rbt.go +++ b/internal/unionstore/rbt/rbt.go @@ -349,8 +349,7 @@ func (db *RBT) Set(key []byte, value []byte, ops ...kv.FlagsOp) error { // the NeedConstraintCheckInPrewrite flag is temporary, // every write to the node removes the flag unless it's explicitly set. // This set must be in the latest stage so no special processing is needed. - var flags kv.KeyFlags - flags = x.GetKeyFlags() + flags := x.GetKeyFlags() if flags == 0 && x.vptr.IsNull() && x.isDeleted() { db.count++ db.size += int(x.klen) @@ -365,9 +364,6 @@ func (db *RBT) Set(key []byte, value []byte, ops ...kv.FlagsOp) error { db.dirty = true } x.setKeyFlags(flags) - if x.isDeleted() { - panic("") - } if value == nil { return nil From 28b2b7a4433830273c622665cb23dd4ebfc6357b Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 31 Oct 2024 18:41:57 +0900 Subject: [PATCH 3/9] fix test Signed-off-by: you06 --- internal/unionstore/rbt/rbt_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/internal/unionstore/rbt/rbt_test.go b/internal/unionstore/rbt/rbt_test.go index 8142adaf0..9ad0b66a0 100644 --- a/internal/unionstore/rbt/rbt_test.go +++ b/internal/unionstore/rbt/rbt_test.go @@ -84,8 +84,16 @@ func TestDiscard(t *testing.T) { } it, _ := db.Iter(nil, nil) it.seekToFirst() + // find a non-deleted key after seek. + for !it.curr.isNull() && it.curr.isDeleted() { + _ = it.Next() + } assert.False(it.Valid()) it.seekToLast() + // find a non-deleted key after seek. + for !it.curr.isNull() && it.curr.isDeleted() { + _ = it.Next() + } assert.False(it.Valid()) it.seek([]byte{0xff}) assert.False(it.Valid()) From 616f35a5cd25dc5b9e5e55956805d4966d1e7d7d Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 12 Nov 2024 10:21:13 +0900 Subject: [PATCH 4/9] remove flags when marking node is deleted Signed-off-by: you06 --- internal/unionstore/rbt/rbt.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/unionstore/rbt/rbt.go b/internal/unionstore/rbt/rbt.go index 180dd2132..bdae92415 100644 --- a/internal/unionstore/rbt/rbt.go +++ b/internal/unionstore/rbt/rbt.go @@ -124,8 +124,8 @@ func (db *RBT) RevertVAddr(hdr *arena.MemdbVlogHdr) { db.count-- db.size -= int(node.klen) } else { - node.setKeyFlags(keptFlags) db.dirty = true + node.setKeyFlags(keptFlags) } } else { db.size += len(db.vlog.GetValue(hdr.OldValue)) @@ -908,7 +908,7 @@ func (n *memdbNode) setKeyFlags(f kv.KeyFlags) { } func (n *memdbNode) markDelete() { - n.flags |= deleteFlag + n.flags = (nodeColorBit & n.flags) | deleteFlag } func (n *memdbNode) isDeleted() bool { From 60c86ad8288b0ffd15a45e8cbb3f19ad49badf48 Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 13 Nov 2024 16:04:07 +0900 Subject: [PATCH 5/9] address comment Signed-off-by: you06 --- internal/unionstore/rbt/rbt_iterator.go | 6 ++++++ internal/unionstore/rbt/rbt_test.go | 8 -------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/internal/unionstore/rbt/rbt_iterator.go b/internal/unionstore/rbt/rbt_iterator.go index 623d895cb..2cd900be2 100644 --- a/internal/unionstore/rbt/rbt_iterator.go +++ b/internal/unionstore/rbt/rbt_iterator.go @@ -199,6 +199,9 @@ func (i *RBTIterator) seekToFirst() { } i.curr = y + for !i.curr.isNull() && i.curr.isDeleted() { + i.curr = i.db.successor(i.curr) + } } func (i *RBTIterator) seekToLast() { @@ -211,6 +214,9 @@ func (i *RBTIterator) seekToLast() { } i.curr = y + for !i.curr.isNull() && i.curr.isDeleted() { + i.curr = i.db.predecessor(i.curr) + } } func (i *RBTIterator) seek(key []byte) { diff --git a/internal/unionstore/rbt/rbt_test.go b/internal/unionstore/rbt/rbt_test.go index 9ad0b66a0..8142adaf0 100644 --- a/internal/unionstore/rbt/rbt_test.go +++ b/internal/unionstore/rbt/rbt_test.go @@ -84,16 +84,8 @@ func TestDiscard(t *testing.T) { } it, _ := db.Iter(nil, nil) it.seekToFirst() - // find a non-deleted key after seek. - for !it.curr.isNull() && it.curr.isDeleted() { - _ = it.Next() - } assert.False(it.Valid()) it.seekToLast() - // find a non-deleted key after seek. - for !it.curr.isNull() && it.curr.isDeleted() { - _ = it.Next() - } assert.False(it.Valid()) it.seek([]byte{0xff}) assert.False(it.Valid()) From ceaabbbc044d525175b2fb0d567261162dc37289 Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 13 Nov 2024 17:17:32 +0900 Subject: [PATCH 6/9] call markUndelete explicitly Signed-off-by: you06 --- internal/unionstore/rbt/rbt.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/unionstore/rbt/rbt.go b/internal/unionstore/rbt/rbt.go index bdae92415..dff31b350 100644 --- a/internal/unionstore/rbt/rbt.go +++ b/internal/unionstore/rbt/rbt.go @@ -351,6 +351,7 @@ func (db *RBT) Set(key []byte, value []byte, ops ...kv.FlagsOp) error { // This set must be in the latest stage so no special processing is needed. flags := x.GetKeyFlags() if flags == 0 && x.vptr.IsNull() && x.isDeleted() { + x.markUndelete() db.count++ db.size += int(x.klen) } @@ -903,14 +904,17 @@ func (n *memdbNode) getKeyFlags() kv.KeyFlags { } func (n *memdbNode) setKeyFlags(f kv.KeyFlags) { - // only keep the color bit. - n.flags = (nodeColorBit & n.flags) | uint16(f) + n.flags = (^nodeFlagsMask & n.flags) | uint16(f) } func (n *memdbNode) markDelete() { n.flags = (nodeColorBit & n.flags) | deleteFlag } +func (n *memdbNode) markUndelete() { + n.flags &= ^deleteFlag +} + func (n *memdbNode) isDeleted() bool { return n.flags&deleteFlag != 0 } From b3c9625634d59826adba228fddad55f400ff3583 Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 13 Nov 2024 17:33:11 +0900 Subject: [PATCH 7/9] rename to Signed-off-by: you06 --- internal/unionstore/rbt/rbt.go | 6 +++--- internal/unionstore/rbt/rbt_iterator.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/unionstore/rbt/rbt.go b/internal/unionstore/rbt/rbt.go index dff31b350..f318816d9 100644 --- a/internal/unionstore/rbt/rbt.go +++ b/internal/unionstore/rbt/rbt.go @@ -125,7 +125,7 @@ func (db *RBT) RevertVAddr(hdr *arena.MemdbVlogHdr) { db.size -= int(node.klen) } else { db.dirty = true - node.setKeyFlags(keptFlags) + node.resetKeyFlags(keptFlags) } } else { db.size += len(db.vlog.GetValue(hdr.OldValue)) @@ -364,7 +364,7 @@ func (db *RBT) Set(key []byte, value []byte, ops ...kv.FlagsOp) error { if flags.AndPersistent() != 0 { db.dirty = true } - x.setKeyFlags(flags) + x.resetKeyFlags(flags) if value == nil { return nil @@ -903,7 +903,7 @@ func (n *memdbNode) getKeyFlags() kv.KeyFlags { return kv.KeyFlags(n.flags & nodeFlagsMask) } -func (n *memdbNode) setKeyFlags(f kv.KeyFlags) { +func (n *memdbNode) resetKeyFlags(f kv.KeyFlags) { n.flags = (^nodeFlagsMask & n.flags) | uint16(f) } diff --git a/internal/unionstore/rbt/rbt_iterator.go b/internal/unionstore/rbt/rbt_iterator.go index 2cd900be2..95beeba2c 100644 --- a/internal/unionstore/rbt/rbt_iterator.go +++ b/internal/unionstore/rbt/rbt_iterator.go @@ -142,7 +142,7 @@ func (i *RBTIterator) Flags() kv.KeyFlags { func (i *RBTIterator) UpdateFlags(ops ...kv.FlagsOp) { origin := i.curr.getKeyFlags() n := kv.ApplyFlagsOps(origin, ops...) - i.curr.setKeyFlags(n) + i.curr.resetKeyFlags(n) } // HasValue returns false if it is flags only. From 5472d7681a514593fc9ee15a3b6fd01490f6d767 Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 13 Nov 2024 23:01:34 +0900 Subject: [PATCH 8/9] Update internal/unionstore/rbt/rbt.go Co-authored-by: cfzjywxk --- internal/unionstore/rbt/rbt.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/unionstore/rbt/rbt.go b/internal/unionstore/rbt/rbt.go index f318816d9..c3f3edc5d 100644 --- a/internal/unionstore/rbt/rbt.go +++ b/internal/unionstore/rbt/rbt.go @@ -911,7 +911,7 @@ func (n *memdbNode) markDelete() { n.flags = (nodeColorBit & n.flags) | deleteFlag } -func (n *memdbNode) markUndelete() { +func (n *memdbNode) unmarkDelete() { n.flags &= ^deleteFlag } From fed393d9e9d91db1651638c3d8eab9c915bee3f0 Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 14 Nov 2024 11:01:29 +0900 Subject: [PATCH 9/9] fix func name Signed-off-by: you06 --- internal/unionstore/rbt/rbt.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/unionstore/rbt/rbt.go b/internal/unionstore/rbt/rbt.go index c3f3edc5d..a0bce51db 100644 --- a/internal/unionstore/rbt/rbt.go +++ b/internal/unionstore/rbt/rbt.go @@ -351,7 +351,7 @@ func (db *RBT) Set(key []byte, value []byte, ops ...kv.FlagsOp) error { // This set must be in the latest stage so no special processing is needed. flags := x.GetKeyFlags() if flags == 0 && x.vptr.IsNull() && x.isDeleted() { - x.markUndelete() + x.unmarkDelete() db.count++ db.size += int(x.klen) }