Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

membuffer: implement ART with basic get/set #1451

Merged
merged 7 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
339 changes: 329 additions & 10 deletions internal/unionstore/art/art.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,21 @@ package art
import (
"math"

tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/internal/unionstore/arena"
"github.com/tikv/client-go/v2/kv"
)

// testMode is a package-level switch that is off by default.
// NOTE(review): nothing in this section reads it; presumably tests turn it on — confirm.
var testMode bool

// ART is a rollbackable Adaptive Radix Tree optimized for TiDB's transaction states buffer use scenario.
// You can think of ART as a combination of two separate tree maps, one for key => value and another for key => keyFlags.
//
// The value map is rollbackable, which means you can use the `Staging`, `Release` and `Cleanup` APIs to safely modify KVs.
//
// The flags map is not rollbackable. There are two types of flags, persistent and non-persistent.
// When discarding a newly added KV in `Cleanup`, the non-persistent flags will be cleared.
// If there are persistent flags associated with the key, we will keep this key in the node without a value.
type ART struct {
allocator artAllocator
root artNode
Expand All @@ -47,20 +58,318 @@ func New() *ART {
}

// Get returns the value stored for key.
// It returns tikverr.ErrNotExist when no leaf matches key or when the matching
// leaf has a null value address.
func (t *ART) Get(key []byte) ([]byte, error) {
	// 1. search the leaf node.
	_, leaf := t.search(key)
	if leaf == nil || leaf.vAddr.IsNull() {
		return nil, tikverr.ErrNotExist
	}
	// 2. get the value from the vlog.
	return t.allocator.vlogAllocator.GetValue(leaf.vAddr), nil
}

// GetFlags returns the latest flags associated with key.
// It returns tikverr.ErrNotExist when no leaf matches key, or when the matching
// leaf has a null value address and is marked as deleted.
func (t *ART) GetFlags(key []byte) (kv.KeyFlags, error) {
	_, leaf := t.search(key)
	if leaf == nil {
		return 0, tikverr.ErrNotExist
	}
	// A null value address alone does not mean "not found" here: a flag-only key
	// has no value but still carries flags that must be readable. Only a leaf that
	// is also marked deleted is reported as absent.
	if leaf.vAddr.IsNull() && leaf.isDeleted() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the isDeleted used here but not in the above Get function? Or when should the isDeleted be used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isDelete marks the leaf is removed from the tree, which is used for cleanup after staging.

The difference between Get and GetFlags is the flag-only key (created by UpdateFlags), whose value address is null, so Get will return not exist error, meanwhile GetFlags should read the updated flags.

The RBT will remove the cleanup nodes from the tree, but ART will not (by now). Removing the node can reduces the height of the tree but also introduces the memory fragmentation (#1375). ART's performance isn't affected by the number of nodes, so it's ok to just mark it's deleted.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to add comments about it here.

		return 0, tikverr.ErrNotExist
	}
	return leaf.getKeyFlags(), nil
}

func (t *ART) Set(key artKey, value []byte, ops []kv.FlagsOp) error {
panic("unimplemented")
func (t *ART) Set(key artKey, value []byte, ops ...kv.FlagsOp) error {
if value != nil {
if size := uint64(len(key) + len(value)); size > t.entrySizeLimit {
return &tikverr.ErrEntryTooLarge{
Limit: t.entrySizeLimit,
Size: size,
}
}
}
if len(t.stages) == 0 {
t.dirty = true
}
// 1. create or search the existing leaf in the tree.
addr, leaf := t.recursiveInsert(key)
// 2. set the value and flags.
t.setValue(addr, leaf, value, ops)
if uint64(t.Size()) > t.bufferSizeLimit {
return &tikverr.ErrTxnTooLarge{Size: t.Size()}
}
return nil
}

// search looks up the leaf with the given key.
// It returns the memory arena address and the leaf itself if there is a matching leaf,
// and returns arena.NullAddr and nil if the key is not found.
func (t *ART) search(key artKey) (arena.MemdbArenaAddr, *artLeaf) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can documents its return values. Specifically, what value does it return when the key is not found?

	current := t.root
	if current == nullArtNode {
		// the tree has not been initialized yet, nothing can match.
		return arena.NullAddr, nil
	}
	// depth is the number of key bytes consumed so far.
	depth := uint32(0)
	var node *nodeBase
	for {
		if current.isLeaf() {
			lf := current.asLeaf(&t.allocator)
			// the inner nodes may have matched optimistically, so the leaf key is
			// compared against the whole key before reporting a hit.
			if lf.match(0, key) {
ekexium marked this conversation as resolved.
Show resolved Hide resolved
				return current.addr, lf
			}
			return arena.NullAddr, nil
		}

		// inline: performance critical path
		// get the basic node information.
		switch current.kind {
		case typeNode4:
			node = &current.asNode4(&t.allocator).nodeBase
		case typeNode16:
			node = &current.asNode16(&t.allocator).nodeBase
		case typeNode48:
			node = &current.asNode48(&t.allocator).nodeBase
		case typeNode256:
			node = &current.asNode256(&t.allocator).nodeBase
		default:
			panic("invalid nodeBase kind")
		}

		if node.prefixLen > 0 {
			prefixLen := node.match(key, depth)
			if prefixLen < min(node.prefixLen, maxPrefixLen) {
				return arena.NullAddr, nil
			}
			// If node.prefixLen > maxPrefixLen, we optimistically match the prefix here.
			// False positive is possible, but it's fine since we will check the full artLeaf key at last.
			depth += node.prefixLen
		}

		// descend by the key byte at depth; the last argument tells findChild that
		// depth is already past the end of key.
		_, current = current.findChild(&t.allocator, key.charAt(int(depth)), !key.valid(int(depth)))
		if current.addr.IsNull() {
			return arena.NullAddr, nil
		}
		depth++
	}
}

// recursiveInsert walks the tree along key and returns the arena address and the leaf for key.
// If the key does not exist yet, a new leaf is inserted (expanding a leaf or splitting a node
// on the way if needed) and that new leaf is returned; otherwise the existing leaf is returned.
func (t *ART) recursiveInsert(key artKey) (arena.MemdbArenaAddr, *artLeaf) {
// lazy init root node and allocator.
// this saves memory for read only txns.
if t.root.addr.IsNull() {
t.root, _ = t.newNode4()
}

// depth is the number of key bytes consumed so far.
// prevDepth is the key offset whose byte indexes current inside prev.
depth := uint32(0)
prevDepth := 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason that prevDepth must be int? Can it be unit32 as well to avoid type conversions?

prev := nullArtNode
current := t.root
var node *nodeBase
for {
if current.isLeaf() {
// reached a leaf: either it is the key itself or it has to be expanded into a node4.
return t.expandLeaf(key, depth, prev, current)
}

// inline: performance critical path
// get the basic node information.
switch current.kind {
case typeNode4:
node = &current.asNode4(&t.allocator).nodeBase
case typeNode16:
node = &current.asNode16(&t.allocator).nodeBase
case typeNode48:
node = &current.asNode48(&t.allocator).nodeBase
case typeNode256:
node = &current.asNode256(&t.allocator).nodeBase
default:
panic("invalid nodeBase kind")
}

if node.prefixLen > 0 {
mismatchIdx := node.matchDeep(&t.allocator, &current, key, depth)
if mismatchIdx < node.prefixLen {
// if the prefix doesn't match, we split the node into different prefixes.
return t.expandNode(key, depth, mismatchIdx, prev, current, node)
}
depth += node.prefixLen
}

// search next node
// valid tells whether depth is still within the length of key; when it is false
// the key is drained and the child is looked up and added with the last argument set.
valid := key.valid(int(depth))
_, next := current.findChild(&t.allocator, key.charAt(int(depth)), !valid)
if next == nullArtNode {
// insert as leaf if there is no child.
newAn, newLeaf := t.newLeaf(key)
// when addChild returns true, current is re-linked into its parent (or the root).
// NOTE(review): presumably true means the node was replaced by a larger one — confirm against addChild.
if current.addChild(&t.allocator, key.charAt(int(depth)), !valid, newAn) {
if prev == nullArtNode {
t.root = current
} else {
prev.replaceChild(&t.allocator, key.charAt(prevDepth), current)
}
}
return newAn.addr, newLeaf
}
if !valid && next.kind == typeLeaf {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the meaning of valid here? Does it mean there is a leaf node but it is empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

valid means if the current depth within the length of key. If valid is false, the in-place leaf is what we look for. If the in-place leaf is empty, we create it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to add commments about it at the valid function define location.

// key is drained, return the leaf.
return next.addr, next.asLeaf(&t.allocator)
}
// descend one level: remember the parent and the offset used to index into it.
prev = current
current = next
prevDepth = int(depth)
depth++
continue
}
}

// expandLeaf handles an insert that ends at the existing leaf current.
// If the leaf already holds key, it is returned unchanged. Otherwise the leaf is replaced by a
// new node4 that has the old leaf and a new leaf for key as its children.
// It returns the arena address and the leaf for the given key.
func (t *ART) expandLeaf(key artKey, depth uint32, prev, current artNode) (arena.MemdbArenaAddr, *artLeaf) {
// Expand the artLeaf to a node4.
//
//                           ┌────────────┐
//                           │    new     │
//                           │   node4    │
// ┌─────────┐               └──────┬─────┘
// │   old   │  --->                │
// │  leaf1  │             ┌────────┴────────┐
// └─────────┘             │                 │
//                    ┌────▼────┐       ┌────▼────┐
//                    │   old   │       │   new   │
//                    │  leaf1  │       │  leaf2  │
//                    └─────────┘       └─────────┘
leaf1 := current.asLeaf(&t.allocator)
if leaf1.match(depth-1, key) {
// same key, return the artLeaf and overwrite the value.
return current.addr, leaf1
}
// offset of the key byte under which current is stored in prev.
prevDepth := int(depth - 1)

leaf2Addr, leaf2 := t.newLeaf(key)
l1Key, l2Key := artKey(leaf1.GetKey()), artKey(leaf2.GetKey())
lcp := longestCommonPrefix(l1Key, l2Key, depth)

// calculate the common prefix length of new node.
newAn, newN4 := t.newNode4()
newN4.setPrefix(key[depth:], lcp)
// skip the shared prefix; the byte at the new depth is used to index each leaf in the new node4.
depth += lcp
newAn.addChild(&t.allocator, l1Key.charAt(int(depth)), !l1Key.valid(int(depth)), current)
newAn.addChild(&t.allocator, l2Key.charAt(int(depth)), !l2Key.valid(int(depth)), leaf2Addr)

// swap the old leaf with the new node4.
if prev == nullArtNode {
t.root = newAn
} else {
prev.replaceChild(&t.allocator, key.charAt(prevDepth), newAn)
}
return leaf2Addr.addr, leaf2
}

// expandNode splits current, whose prefix diverges from key at mismatchIdx, by
// putting a new node4 above it.
//
// The new node4 takes the first mismatchIdx bytes of the prefix. current keeps the
// prefix bytes after the diverging byte and becomes one child; a new leaf for key
// becomes the other child. The reference to current held by prev (or by the root
// when prev is nullArtNode) is replaced with the new node4.
// depth is the key offset at which the prefix of current starts.
// It returns the arena address and the leaf of the newly inserted key.
func (t *ART) expandNode(key artKey, depth, mismatchIdx uint32, prev, current artNode, currNode *nodeBase) (arena.MemdbArenaAddr, *artLeaf) {
	// prefix mismatch, create a new parent node which has a shorter prefix.
	// example of insert "acc" into node with "abc" prefix:
	//                                 ┌────────────┐
	//                                 │ new node4  │
	//                                 │ prefix: a  │
	//                                 └──────┬─────┘
	// ┌─────────────┐               ┌── b ───┴── c ───┐
	// │    node4    │ --->          │                 │
	// │ prefix: abc │        ┌──────▼─────┐    ┌──────▼─────┐
	// └─────────────┘        │ old node4  │    │  new leaf  │
	//                        │ prefix: c  │    │  key: acc  │
	//                        └────────────┘    └────────────┘

	// splitDepth is the key offset of the byte that distinguishes the two children.
	splitDepth := int(depth + mismatchIdx)

	// set prefix for new node.
	newAn, newN4 := t.newNode4()
	newN4.setPrefix(key[depth:], mismatchIdx)

	// update prefix for old node and move it as a child of the new node.
	if currNode.prefixLen <= maxPrefixLen {
		// the whole prefix is available in the node itself.
		nodeKey := currNode.prefix[mismatchIdx]
		currNode.prefixLen -= mismatchIdx + 1
		copy(currNode.prefix[:], currNode.prefix[mismatchIdx+1:])
		newAn.addChild(&t.allocator, nodeKey, false, current)
	} else {
		// the prefix is longer than what the node holds, so the remaining prefix
		// bytes are read back from the key of a leaf below current.
		currNode.prefixLen -= mismatchIdx + 1
		leafArtNode := minimum(&t.allocator, current)
		leaf := leafArtNode.asLeaf(&t.allocator)
		leafKey := artKey(leaf.GetKey())
		kMin := depth + mismatchIdx + 1
		kMax := kMin + min(currNode.prefixLen, maxPrefixLen)
		copy(currNode.prefix[:], leafKey[kMin:kMax])
		// check validity at the same offset the byte is read from.
		newAn.addChild(&t.allocator, leafKey.charAt(splitDepth), !leafKey.valid(splitDepth), current)
	}

	// insert the artLeaf into new node
	newLeafAddr, newLeaf := t.newLeaf(key)
	newAn.addChild(&t.allocator, key.charAt(splitDepth), !key.valid(splitDepth), newLeafAddr)
	if prev == nullArtNode {
		t.root = newAn
	} else {
		// current is stored in prev under the key byte right before depth; this is
		// only computed here so that depth == 0 at the root never wraps around.
		prev.replaceChild(&t.allocator, key.charAt(int(depth-1)), newAn)
	}
	return newLeafAddr.addr, newLeaf
}

// newNode4 allocates a node4 from the tree's allocator and returns it together
// with the artNode handle (kind typeNode4) that refers to it.
func (t *ART) newNode4() (artNode, *node4) {
	nodeAddr, n := t.allocator.allocNode4()
	handle := artNode{addr: nodeAddr, kind: typeNode4}
	return handle, n
}

// newLeaf allocates a leaf for key from the tree's allocator and returns it
// together with the artNode handle (kind typeLeaf) that refers to it.
func (t *ART) newLeaf(key artKey) (artNode, *artLeaf) {
	leafAddr, leaf := t.allocator.allocLeaf(key)
	handle := artNode{addr: leafAddr, kind: typeLeaf}
	return handle, leaf
}

// setValue applies ops to the flags of leaf l and, when value is non-nil, stores
// value for it. addr is the arena address of l and is handed to the vlog when a
// new value record is appended. It also maintains t.len, t.size and t.dirty.
func (t *ART) setValue(addr arena.MemdbArenaAddr, l *artLeaf, value []byte, ops []kv.FlagsOp) {
	flagsOnly := value == nil
	keyFlags := l.getKeyFlags()
	// a leaf with neither flags nor a value has not been counted yet.
	if keyFlags == 0 && l.vAddr.IsNull() {
		t.len++
		t.size += int(l.klen)
	}

	applied := ops
	if !flagsOnly {
		// writing a value removes the NeedConstraintCheckInPrewrite flag before the
		// given ops run; an UpdateFlag operation (nil value) leaves it alone.
		applied = append([]kv.FlagsOp{kv.DelNeedConstraintCheckInPrewrite}, ops...)
	}
	keyFlags = kv.ApplyFlagsOps(keyFlags, applied...)
	if keyFlags.AndPersistent() != 0 {
		t.dirty = true
	}
	l.setKeyFlags(keyFlags)
	if flagsOnly {
		// value == nil means it updates flags only.
		return
	}

	prevSize, swapped := t.trySwapValue(l.vAddr, value)
	if swapped {
		// the old value was overwritten in place, sizes are unchanged.
		return
	}
	t.size += len(value) - prevSize
	l.vAddr = t.allocator.vlogAllocator.AppendValue(addr, l.vAddr, value)
}

// trySwapValue overwrites the value stored at addr with value when that can be done in place.
// It returns (0, true) after an in-place update. Otherwise it returns the length of the old
// value (0 when addr is null) and false, and the stored value is left untouched.
func (t *ART) trySwapValue(addr arena.MemdbArenaAddr, value []byte) (int, bool) {
	if addr.IsNull() {
		return 0, false
	}
	existing := t.allocator.vlogAllocator.GetValue(addr)
	oldLen := len(existing)
	if n := len(t.stages); n > 0 {
		// inside a staging buffer the record may only be touched if the latest
		// checkpoint allows modifying it.
		latest := t.stages[n-1]
		if !t.allocator.vlogAllocator.CanModify(&latest, addr) {
			return oldLen, false
		}
	}
	// only a non-empty value of exactly the same length is replaced in place.
	if oldLen == 0 || oldLen != len(value) {
		return oldLen, false
	}
	copy(existing, value)
	return 0, true
}

func (t *ART) Dirty() bool {
Expand All @@ -74,12 +383,12 @@ func (t *ART) Mem() uint64 {

// Len returns the count of entries in the MemBuffer.
func (t *ART) Len() int {
	return t.len
}

// Size returns the size of the MemBuffer.
func (t *ART) Size() int {
	return t.size
}

func (t *ART) checkpoint() arena.MemDBCheckpoint {
Expand Down Expand Up @@ -109,15 +418,13 @@ func (t *ART) Stages() []arena.MemDBCheckpoint {
}

// Staging is not implemented yet: it panics unconditionally, so the
// placeholder `return 0` below is never reached.
func (t *ART) Staging() int {
panic("unimplemented")
return 0
}

// Release is not implemented yet and panics when called; h is not used.
func (t *ART) Release(h int) {
panic("unimplemented")
}

// Cleanup is not implemented yet and panics when called; h is not used.
func (t *ART) Cleanup(h int) {
panic("unimplemented")
}

func (t *ART) revertToCheckpoint(cp *arena.MemDBCheckpoint) {
Expand All @@ -132,6 +439,18 @@ func (t *ART) truncate(snap *arena.MemDBCheckpoint) {
panic("unimplemented")
}

// Reset puts the MemBuffer back into its initial state: the root is cleared, the
// staging stack is emptied, the dirty/vlogInvalid marks and the counters are
// zeroed, and both the node allocator and the vlog allocator are reset.
func (t *ART) Reset() {
	// tree structure and staging stack
	t.root = nullArtNode
	t.stages = t.stages[:0]

	// status marks and counters
	t.dirty, t.vlogInvalid = false, false
	t.size, t.len = 0, 0

	// backing memory
	t.allocator.nodeAllocator.Reset()
	t.allocator.vlogAllocator.Reset()
}

// DiscardValues releases the memory used by all values.
// NOTE: any operation need value will panic after this function.
func (t *ART) DiscardValues() {
Expand Down
Loading
Loading