Skip to content

Commit

Permalink
Introduce cmap for distributing mutexes per documents
Browse files Browse the repository at this point in the history
  • Loading branch information
hackerwins committed Oct 26, 2024
1 parent fac4779 commit 0e820e7
Show file tree
Hide file tree
Showing 9 changed files with 398 additions and 82 deletions.
122 changes: 122 additions & 0 deletions pkg/cmap/cmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2024 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cmap

Check failure on line 17 in pkg/cmap/cmap.go

View workflow job for this annotation

GitHub Actions / build

package-comments: should have a package comment (revive)

import (
"sync"
)

// Map is a mutex-protected map.
type Map[K comparable, V any] struct {
sync.RWMutex
items map[K]V
}

// New creates a new Map.
func New[K comparable, V any]() *Map[K, V] {
return &Map[K, V]{
items: make(map[K]V),
}
}

// Set sets a key-value pair.
func (m *Map[K, V]) Set(key K, value V) {
m.Lock()
defer m.Unlock()

m.items[key] = value
}

// UpsertFunc is a function to insert or update a key-value pair.
type UpsertFunc[K comparable, V any] func(exists bool, valueInMap V) V

// Upsert inserts or updates a key-value pair.
func (m *Map[K, V]) Upsert(key K, upsertFunc UpsertFunc[K, V]) V {
m.Lock()
defer m.Unlock()

v, exists := m.items[key]
res := upsertFunc(exists, v)
m.items[key] = res
return res
}

// Get retrieves a value from the map.
func (m *Map[K, V]) Get(key K) (V, bool) {
m.RLock()
defer m.RUnlock()

value, exists := m.items[key]
return value, exists
}

// DeleteFunc is a function to delete a value from the map.
type DeleteFunc[K comparable, V any] func(value V, exsits bool) bool

// Delete removes a value from the map.
func (m *Map[K, V]) Delete(key K, deleteFunc DeleteFunc[K, V]) bool {
m.Lock()
defer m.Unlock()

value, exists := m.items[key]
del := deleteFunc(value, exists)
if del && exists {
delete(m.items, key)
}
return del
}

// Has checks if a key exists in the map
func (m *Map[K, V]) Has(key K) bool {
m.RLock()
defer m.RUnlock()

_, exists := m.items[key]
return exists
}

// Len returns the number of items in the map
func (m *Map[K, V]) Len() int {
m.RLock()
defer m.RUnlock()

return len(m.items)
}

// Keys returns a slice of all keys in the map
func (m *Map[K, V]) Keys() []K {
m.RLock()
defer m.RUnlock()

keys := make([]K, 0, len(m.items))
for k := range m.items {
keys = append(keys, k)
}
return keys
}

// Values returns a slice of all values in the map
func (m *Map[K, V]) Values() []V {
m.RLock()
defer m.RUnlock()

values := make([]V, 0, len(m.items))
for _, v := range m.items {
values = append(values, v)
}
return values
}
202 changes: 202 additions & 0 deletions pkg/cmap/cmap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Copyright 2024 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cmap_test

import (
"fmt"
"github.com/stretchr/testify/assert"
"github.com/yorkie-team/yorkie/pkg/cmap"
"math/rand"
"sync"
"testing"
"time"
)

func TestMap(t *testing.T) {
t.Run("set and get", func(t *testing.T) {
m := cmap.New[string, int]()

m.Set("a", 1)
v, exists := m.Get("a")
assert.True(t, exists)
assert.Equal(t, 1, v)

v, exists = m.Get("b")
assert.False(t, exists)
assert.Equal(t, 0, v)
})

t.Run("upsert", func(t *testing.T) {
m := cmap.New[string, int]()

v := m.Upsert("a", func(exists bool, valueInMap int) int {
if exists {
return valueInMap + 1
}
return 1
})
assert.Equal(t, 1, v)

v = m.Upsert("a", func(exists bool, valueInMap int) int {
if exists {
return valueInMap + 1
}
return 1
})
assert.Equal(t, 2, v)
})

t.Run("delete", func(t *testing.T) {
m := cmap.New[string, int]()

m.Set("a", 1)
exists := m.Delete("a", func(value int, exists bool) bool {
assert.Equal(t, 1, value)
return exists
})
assert.True(t, exists)

_, exists = m.Get("a")
assert.False(t, exists)
})
}

func TestConcurrentMap(t *testing.T) {
t.Run("concurrent access", func(t *testing.T) {
m := cmap.New[int, int]()
const numRoutines = 100
const numOperations = 10000

var wg sync.WaitGroup
wg.Add(numRoutines)

for i := 0; i < numRoutines; i++ {
go func(routineID int) {
defer wg.Done()
for j := 0; j < numOperations; j++ {
key := rand.Intn(1000)

Check failure on line 91 in pkg/cmap/cmap_test.go

View workflow job for this annotation

GitHub Actions / build

G404: Use of weak random number generator (math/rand instead of crypto/rand) (gosec)
value := routineID*numOperations + j

switch rand.Intn(3) {

Check failure on line 94 in pkg/cmap/cmap_test.go

View workflow job for this annotation

GitHub Actions / build

G404: Use of weak random number generator (math/rand instead of crypto/rand) (gosec)
case 0: // Set
m.Set(key, value)
case 1: // Get
_, _ = m.Get(key)
case 2: // Delete
m.Delete(key, func(value int, exists bool) bool {
return exists
})
}
}
}(i)
}

wg.Wait()

// Verify the final state
if m.Len() > 1000 {
t.Errorf("Map length (%d) is greater than maximum possible unique keys (1000)", m.Len())
}
})

t.Run("concurrent set and get", func(t *testing.T) {
m := cmap.New[string, int]()
const numRoutines = 50
const numOperations = 100

var wg sync.WaitGroup
wg.Add(numRoutines * 2)

// Start setter routines
for i := 0; i < numRoutines; i++ {
go func(routineID int) {
defer wg.Done()
for j := 0; j < numOperations; j++ {
key := fmt.Sprintf("key-%d-%d", routineID, j)
m.Set(key, j)
}
}(i)
}

// Start getter routines
for i := 0; i < numRoutines; i++ {
go func(routineID int) {
defer wg.Done()
for j := 0; j < numOperations; j++ {
key := fmt.Sprintf("key-%d-%d", routineID, j)
for {
if value, ok := m.Get(key); ok && value == j {
break
}
time.Sleep(time.Microsecond) // Small delay before retry
}
}
}(i)
}

wg.Wait()

expectedLen := numRoutines * numOperations
if m.Len() != expectedLen {
t.Errorf("Expected map length %d, but got %d", expectedLen, m.Len())
}
})

t.Run("concurrent iteration", func(t *testing.T) {
m := cmap.New[int, int]()
const numItems = 10000

// Populate the map
for i := 0; i < numItems; i++ {
m.Set(i, i)
}

var wg sync.WaitGroup
wg.Add(3) // For Keys, Values, and modifier goroutines

// Start a goroutine to continuously modify the map.
go func() {
defer wg.Done()
for i := 0; i < numItems; i++ {
m.Set(rand.Intn(numItems), rand.Int())

Check failure on line 175 in pkg/cmap/cmap_test.go

View workflow job for this annotation

GitHub Actions / build

G404: Use of weak random number generator (math/rand instead of crypto/rand) (gosec)
m.Delete(rand.Intn(numItems), func(value int, exists bool) bool {
return exists
})
}
}()

// Start a goroutine to iterate over keys.
go func() {
defer wg.Done()
keys := m.Keys()
if len(keys) > numItems {
t.Errorf("Number of keys (%d) is greater than inserted items (%d)", len(keys), numItems)
}
}()

// Start a goroutine to iterate over values.
go func() {
defer wg.Done()
values := m.Values()
if len(values) > numItems {
t.Errorf("Number of values (%d) is greater than inserted items (%d)", len(values), numItems)
}
}()

wg.Wait()
})
}
Loading

0 comments on commit 0e820e7

Please sign in to comment.