diff --git a/go.mod b/go.mod index f7809b3..2c41a45 100644 --- a/go.mod +++ b/go.mod @@ -5,12 +5,14 @@ go 1.22 require ( github.com/cespare/xxhash v1.1.0 github.com/outofforest/photon v0.5.0 + github.com/pkg/errors v0.9.1 + github.com/samber/lo v1.47.0 github.com/stretchr/testify v1.9.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/text v0.18.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 6f3cd81..df40b26 100644 --- a/go.sum +++ b/go.sum @@ -10,10 +10,14 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc= +github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/quantum.go b/quantum.go index 3d5ee4d..78043fe 100644 --- a/quantum.go +++ b/quantum.go @@ -1,18 +1,16 @@ package quantum import ( + "unsafe" + "github.com/cespare/xxhash" + "github.com/pkg/errors" + "github.com/samber/lo" "github.com/outofforest/photon" ) -const ( - bitsPerHop = 4 - arraySize = 1 << bitsPerHop - mask = arraySize - 1 - pageSize = 512 - uint64Length = 8 -) +const uint64Length = 8 // State enumerates possible slot states. type State byte @@ -26,6 +24,7 @@ const ( // Config stores configuration. type Config struct { TotalSize uint64 + NodeSize uint64 } // NodeHeader is the header common to all node types. @@ -34,11 +33,11 @@ type NodeHeader struct { HashMod uint64 } -// PointerNode is the node containing pointers to other nodes. -type PointerNode struct { - Header NodeHeader - States [arraySize]State - Pointers [arraySize]uint64 +// Node represents data stored inside node. +type Node[T comparable] struct { + Header *NodeHeader + States []State + Items []T } // DataItem stores single key-value pair. @@ -48,29 +47,40 @@ type DataItem[K, V comparable] struct { Value V } -// DataNode stores the data items. -type DataNode[K, V comparable] struct { - Header NodeHeader - States [arraySize]State - Items [arraySize]DataItem[K, V] -} - // New creates new quantum store. -func New[K, V comparable](config Config) Snapshot[K, V] { +func New[K, V comparable](config Config) (Snapshot[K, V], error) { + pointerNodeDescriptor, err := NewNodeDescriptor[uint64](config.NodeSize) + if err != nil { + return Snapshot[K, V]{}, err + } + + dataNodeDescriptor, err := NewNodeDescriptor[DataItem[K, V]](config.NodeSize) + if err != nil { + return Snapshot[K, V]{}, err + } + s := Snapshot[K, V]{ - version: 0, - data: make([]byte, config.TotalSize), - rootNodeType: stateData, + config: config, + pointerNodeDescriptor: pointerNodeDescriptor, + dataNodeDescriptor: dataNodeDescriptor, + data: make([]byte, config.TotalSize), + rootInfo: parentInfo{ + State: lo.ToPtr(stateFree), + Item: lo.ToPtr[uint64](0), + }, defaultValue: *new(V), } - return s + return s, nil } // Snapshot represents the state at particular point in time. type Snapshot[K, V comparable] struct { + config Config + pointerNodeDescriptor NodeDescriptor[uint64] + dataNodeDescriptor NodeDescriptor[DataItem[K, V]] + version uint64 - rootNode uint64 - rootNodeType State + rootInfo parentInfo hashMod uint64 defaultValue V @@ -81,174 +91,172 @@ type Snapshot[K, V comparable] struct { // Next transitions to the next snapshot of the state. func (s Snapshot[K, V]) Next() Snapshot[K, V] { s.version++ + s.rootInfo = parentInfo{ + State: lo.ToPtr(*s.rootInfo.State), + Item: lo.ToPtr(*s.rootInfo.Item), + } return s } // Get gets the value of the key. func (s *Snapshot[K, V]) Get(key K) (value V, exists bool) { h := hashKey(key, 0) - nType := s.rootNodeType - n := s.node(s.rootNode) - for { - header := photon.NewFromBytes[NodeHeader](n) + pInfo := s.rootInfo - if header.V.HashMod > 0 { - h = hashKey(key, header.V.HashMod) - } + for { + switch *pInfo.State { + case stateFree: + return s.defaultValue, false + case stateData: + dataNode := s.dataNodeDescriptor.ToNode(s.node(*pInfo.Item)) + if dataNode.Header.HashMod > 0 { + h = hashKey(key, dataNode.Header.HashMod) + } - index := h & mask - h >>= bitsPerHop + index := h & s.dataNodeDescriptor.addressMask - switch nType { - case statePointer: - node := photon.NewFromBytes[PointerNode](n) - if node.V.States[index] == stateFree { + if dataNode.States[index] == stateFree { return s.defaultValue, false } - nType = node.V.States[index] - n = s.node(node.V.Pointers[index]) - default: - node := photon.NewFromBytes[DataNode[K, V]](n) - if node.V.States[index] == stateFree { - return s.defaultValue, false - } - item := node.V.Items[index] + item := dataNode.Items[index] if item.Hash == h && item.Key == key { return item.Value, true } return s.defaultValue, false + default: + pointerNode := s.pointerNodeDescriptor.ToNode(s.node(*pInfo.Item)) + if pointerNode.Header.HashMod > 0 { + h = hashKey(key, pointerNode.Header.HashMod) + } + + index := h & s.pointerNodeDescriptor.addressMask + h >>= s.pointerNodeDescriptor.bitsPerHop + + pInfo = parentInfo{ + State: &pointerNode.States[index], + Item: &pointerNode.Items[index], + } } } } // Set sets the value for the key. func (s *Snapshot[K, V]) Set(key K, value V) { - h := hashKey(key, 0) - nType := s.rootNodeType - n := s.node(s.rootNode) + s.set(s.rootInfo, DataItem[K, V]{ + Hash: hashKey(key, 0), + Key: key, + Value: value, + }) +} - var parentNode photon.Union[*PointerNode] - var parentIndex uint64 +type parentInfo struct { + State *State + Item *uint64 +} +func (s *Snapshot[K, V]) set(pInfo parentInfo, item DataItem[K, V]) { for { - header := photon.NewFromBytes[NodeHeader](n) - if header.V.Version < s.version { - newNodeIndex, newNodeData := s.allocateNode() - copy(newNodeData, n) - header = photon.NewFromBytes[NodeHeader](newNodeData) - header.V.Version = s.version - n = newNodeData - - switch { - case parentNode.V == nil: - s.rootNode = newNodeIndex - default: - parentNode.V.Pointers[parentIndex] = newNodeIndex + switch *pInfo.State { + case stateFree: + dataNodeIndex, dataNodeData := s.allocateNode() + dataNode := s.dataNodeDescriptor.ToNode(dataNodeData) + dataNode.Header.Version = s.version + + *pInfo.State = stateData + *pInfo.Item = dataNodeIndex + + index := item.Hash & s.dataNodeDescriptor.addressMask + dataNode.States[index] = stateData + dataNode.Items[index] = item + + return + case stateData: + node := s.node(*pInfo.Item) + dataNode := s.dataNodeDescriptor.ToNode(node) + if dataNode.Header.Version < s.version { + newNodeIndex, newNodeData := s.allocateNode() + copy(newNodeData, node) + dataNode = s.dataNodeDescriptor.ToNode(newNodeData) + dataNode.Header.Version = s.version + *pInfo.Item = newNodeIndex + } + if dataNode.Header.HashMod > 0 { + item.Hash = hashKey(item.Key, dataNode.Header.HashMod) } - } - - if header.V.HashMod > 0 { - h = hashKey(key, header.V.HashMod) - } - - index := h & mask - h >>= bitsPerHop - switch nType { - case statePointer: - node := photon.NewFromBytes[PointerNode](n) - if node.V.States[index] == stateFree { - node.V.States[index] = stateData - nodeIndex, nodeData := s.allocateNode() - node.V.Pointers[index] = nodeIndex + index := item.Hash & s.dataNodeDescriptor.addressMask + if dataNode.States[index] == stateFree { + dataNode.States[index] = stateData + dataNode.Items[index] = item - dataNode := photon.NewFromBytes[DataNode[K, V]](nodeData) - dataNode.V.Header.Version = s.version - } - parentIndex = index - parentNode = node - nType = node.V.States[index] - n = s.node(node.V.Pointers[index]) - default: - node := photon.NewFromBytes[DataNode[K, V]](n) - if node.V.States[index] == stateFree { - node.V.States[index] = stateData - node.V.Items[index] = DataItem[K, V]{ - Hash: h, - Key: key, - Value: value, - } return } - item := node.V.Items[index] - var conflict bool - if item.Hash == h { - if item.Key == key { - node.V.Items[index].Value = value + if item.Hash == dataNode.Items[index].Hash { + if item.Key == dataNode.Items[index].Key { + dataNode.Items[index] = item + return } // hash conflict - conflict = true + s.hashMod++ + dataNode.Header.HashMod = s.hashMod } - // conflict or split needed + s.redistributeNode(pInfo) + s.set(pInfo, item) - pointerNodeIndex, pointerNodeData := s.allocateNode() - pointerNode := photon.NewFromBytes[PointerNode](pointerNodeData) - pointerNode.V.Header = NodeHeader{ - Version: s.version, - HashMod: node.V.Header.HashMod, + return + default: + node := s.node(*pInfo.Item) + pointerNode := s.pointerNodeDescriptor.ToNode(node) + if pointerNode.Header.Version < s.version { + newNodeIndex, newNodeData := s.allocateNode() + copy(newNodeData, node) + pointerNode = s.pointerNodeDescriptor.ToNode(newNodeData) + pointerNode.Header.Version = s.version + *pInfo.Item = newNodeIndex + } + if pointerNode.Header.HashMod > 0 { + item.Hash = hashKey(item.Key, pointerNode.Header.HashMod) } - for i := range uint64(arraySize) { - if node.V.States[i] == stateFree { - continue - } - - pointerNode.V.States[i] = stateData - dataNodeIndex, dataNodeData := s.allocateNode() - pointerNode.V.Pointers[i] = dataNodeIndex - dataNode := photon.NewFromBytes[DataNode[K, V]](dataNodeData) - dataNode.V.Header.Version = s.version - - item := node.V.Items[i] - var hash uint64 - if conflict && i == index { - s.hashMod++ - dataNode.V.Header.HashMod = s.hashMod - hash = hashKey(item.Key, s.hashMod) - } else { - hash = item.Hash - } + index := item.Hash & s.pointerNodeDescriptor.addressMask + item.Hash >>= s.pointerNodeDescriptor.bitsPerHop - index := hash & mask - dataNode.V.States[index] = stateData - dataNode.V.Items[index] = DataItem[K, V]{ - Hash: hash >> bitsPerHop, - Key: item.Key, - Value: item.Value, - } + pInfo = parentInfo{ + State: &pointerNode.States[index], + Item: &pointerNode.Items[index], } + } + } +} - if parentNode.V == nil { - s.rootNodeType = statePointer - s.rootNode = pointerNodeIndex - } else { - parentNode.V.States[parentIndex] = statePointer - parentNode.V.Pointers[parentIndex] = pointerNodeIndex - } +func (s *Snapshot[K, V]) redistributeNode(pInfo parentInfo) { + dataNode := s.dataNodeDescriptor.ToNode(s.node(*pInfo.Item)) - parentNode = pointerNode - parentIndex = index - n = s.node(pointerNode.V.Pointers[index]) + pointerNodeIndex, pointerNodeData := s.allocateNode() + pointerNode := s.pointerNodeDescriptor.ToNode(pointerNodeData) + *pointerNode.Header = NodeHeader{ + Version: s.version, + HashMod: dataNode.Header.HashMod, + } + + *pInfo.State = statePointer + *pInfo.Item = pointerNodeIndex + + for i := range s.dataNodeDescriptor.numOfItems { + if dataNode.States[i] == stateFree { + continue } + + s.set(pInfo, dataNode.Items[i]) } } func (s *Snapshot[K, V]) node(n uint64) []byte { - return s.data[n*pageSize : (n+1)*pageSize] + return s.data[n*s.config.NodeSize : (n+1)*s.config.NodeSize] } func (s *Snapshot[K, V]) allocateNode() (uint64, []byte) { @@ -258,6 +266,63 @@ func (s *Snapshot[K, V]) allocateNode() (uint64, []byte) { return s.allocatedNodeIndex, s.node(s.allocatedNodeIndex) } +// NewNodeDescriptor creates new descriptor converting byte slices of `nodeSize` size to node objects. +func NewNodeDescriptor[T comparable](nodeSize uint64) (NodeDescriptor[T], error) { + headerSize := uint64(unsafe.Sizeof(NodeHeader{})) + if headerSize >= nodeSize { + return NodeDescriptor[T]{}, errors.New("node size is too small") + } + + stateOffset := headerSize + headerSize%uint64Length + spaceLeft := nodeSize - stateOffset + + var t T + itemSize := uint64(unsafe.Sizeof(t)) + itemSize += itemSize % uint64Length + + numOfItems := spaceLeft / (itemSize + 1) // 1 is for slot state + if numOfItems == 0 { + return NodeDescriptor[T]{}, errors.New("node size is too small") + } + numOfItems, _ = highestPowerOfTwo(numOfItems) + spaceLeft -= numOfItems + spaceLeft -= numOfItems % uint64Length + + numOfItems = spaceLeft / itemSize + numOfItems, bitsPerHop := highestPowerOfTwo(numOfItems) + if numOfItems == 0 { + return NodeDescriptor[T]{}, errors.New("node size is too small") + } + + return NodeDescriptor[T]{ + numOfItems: numOfItems, + itemSize: itemSize, + stateOffset: stateOffset, + itemOffset: nodeSize - spaceLeft, + addressMask: numOfItems - 1, + bitsPerHop: bitsPerHop, + }, nil +} + +// NodeDescriptor describes the data structure of node. +type NodeDescriptor[T comparable] struct { + numOfItems uint64 + itemSize uint64 + stateOffset uint64 + itemOffset uint64 + addressMask uint64 + bitsPerHop uint64 +} + +// ToNode converts byte representation of the node to object. +func (nd NodeDescriptor[T]) ToNode(data []byte) Node[T] { + return Node[T]{ + Header: (*NodeHeader)(unsafe.Pointer(&data[0])), + States: unsafe.Slice((*State)(unsafe.Pointer(&data[nd.stateOffset])), nd.numOfItems), + Items: unsafe.Slice((*T)(unsafe.Pointer(&data[nd.itemOffset])), nd.numOfItems), + } +} + func hashKey[K comparable](key K, hashMod uint64) uint64 { var hash uint64 p := photon.NewFromValue[K](&key) @@ -281,3 +346,13 @@ func hashKey[K comparable](key K, hashMod uint64) uint64 { func testHash(hash uint64) uint64 { return hash & 0x7fffffff } + +func highestPowerOfTwo(n uint64) (uint64, uint64) { + var m uint64 = 1 + var p uint64 + for m <= n { + m <<= 1 // Multiply m by 2 (left shift) + p++ + } + return m >> 1, p - 1 // Divide by 2 (right shift) to get the highest power of 2 <= n +} diff --git a/quantum_test.go b/quantum_test.go index dce089a..db23bc7 100644 --- a/quantum_test.go +++ b/quantum_test.go @@ -8,8 +8,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "github.com/outofforest/photon" ) var collisions = [][]int{ @@ -25,7 +23,10 @@ var collisions = [][]int{ {32134280, 33645087, 37005304, 83416269}, } -var config = Config{TotalSize: 10 * 1024 * 1024} +var config = Config{ + TotalSize: 10 * 1024 * 1024, + NodeSize: 512, +} func TestCollisions(t *testing.T) { for _, set := range collisions { @@ -38,7 +39,8 @@ func TestCollisions(t *testing.T) { } func TestSet(t *testing.T) { - s := New[int, int](config) + s, err := New[int, int](config) + require.NoError(t, err) for i := range 10 { s.Set(i, i) @@ -48,7 +50,8 @@ func TestSet(t *testing.T) { } func TestSetCollisions(t *testing.T) { - s := New[int, int](config) + s, err := New[int, int](config) + require.NoError(t, err) allValues := make([]int, 0, len(collisions)*len(collisions[0])) @@ -65,7 +68,8 @@ func TestSetCollisions(t *testing.T) { } func TestGetCollisions(t *testing.T) { - s := New[int, int](config) + s, err := New[int, int](config) + require.NoError(t, err) inserted := make([]int, 0, len(collisions)*len(collisions[0])) read := make([]int, 0, len(collisions)*len(collisions[0])) @@ -92,7 +96,8 @@ func TestGetCollisions(t *testing.T) { } func TestSetOnNext(t *testing.T) { - s := New[int, int](config) + s, err := New[int, int](config) + require.NoError(t, err) for i := range 10 { s.Set(i, i) @@ -108,7 +113,8 @@ func TestSetOnNext(t *testing.T) { } func TestGet(t *testing.T) { - s := New[int, int](config) + s, err := New[int, int](config) + require.NoError(t, err) for i := range 10 { s.Set(i, i) @@ -121,7 +127,8 @@ func TestGet(t *testing.T) { } func TestReplace(t *testing.T) { - s1 := New[int, int](config) + s1, err := New[int, int](config) + require.NoError(t, err) for i := range 10 { s1.Set(i, i) @@ -176,8 +183,8 @@ func TestFindCollisions(t *testing.T) { func collect(s Snapshot[int, int]) []int { values := []int{} - typeStack := []State{s.rootNodeType} - nodeStack := []uint64{s.rootNode} + typeStack := []State{*s.rootInfo.State} + nodeStack := []uint64{*s.rootInfo.Item} for { if len(nodeStack) == 0 { @@ -192,18 +199,18 @@ func collect(s Snapshot[int, int]) []int { switch t { case stateData: - node := photon.NewFromBytes[DataNode[int, int]](s.node(n)) - for i := range arraySize { - if node.V.States[i] == stateData { - values = append(values, node.V.Items[i].Value) + node := s.dataNodeDescriptor.ToNode(s.node(n)) + for i := range len(node.Items) { + if node.States[i] == stateData { + values = append(values, node.Items[i].Value) } } default: - node := photon.NewFromBytes[PointerNode](s.node(n)) - for i := range arraySize { - if node.V.States[i] != stateFree { - typeStack = append(typeStack, node.V.States[i]) - nodeStack = append(nodeStack, node.V.Pointers[i]) + node := s.pointerNodeDescriptor.ToNode(s.node(n)) + for i := range len(node.Items) { + if node.States[i] != stateFree { + typeStack = append(typeStack, node.States[i]) + nodeStack = append(nodeStack, node.Items[i]) } } }