Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update concurrent test to support multiple operations in each transaction #764

Merged
merged 1 commit into from
Jun 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 105 additions & 101 deletions concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,27 +347,45 @@ func (w *worker) name() string {

func (w *worker) run() (historyRecords, error) {
var rs historyRecords

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for {
select {
case <-w.stopCh:
w.t.Logf("%q finished.", w.name())
return rs, nil
default:
}

op := w.pickOperation()
bucket, key := w.pickBucket(), w.pickKey()
rec, err := executeOperation(op, w.db, bucket, key, w.conf)
if err != nil {
readErr := fmt.Errorf("[%s: %s]: %w", w.name(), op, err)
w.t.Error(readErr)
w.errCh <- readErr
return rs, readErr
}
err := w.db.Update(func(tx *bolt.Tx) error {
for {
ahrtr marked this conversation as resolved.
Show resolved Hide resolved
op := w.pickOperation()
bucket, key := w.pickBucket(), w.pickKey()
rec, eerr := executeOperation(op, tx, bucket, key, w.conf)
if eerr != nil {
opErr := fmt.Errorf("[%s: %s]: %w", w.name(), op, eerr)
w.t.Error(opErr)
w.errCh <- opErr
return opErr
}

rs = append(rs, rec)
if w.conf.workInterval != (duration{}) {
time.Sleep(randomDurationInRange(w.conf.workInterval.min, w.conf.workInterval.max))
}

rs = append(rs, rec)
if w.conf.workInterval != (duration{}) {
time.Sleep(randomDurationInRange(w.conf.workInterval.min, w.conf.workInterval.max))
select {
case <-ticker.C:
return nil
case <-w.stopCh:
return nil
default:
}
}
})
if err != nil {
return rs, err
}
}
}
Expand Down Expand Up @@ -401,111 +419,100 @@ func (w *worker) pickOperation() OperationType {
panic("unexpected")
}

func executeOperation(op OperationType, db *bolt.DB, bucket []byte, key []byte, conf concurrentConfig) (historyRecord, error) {
func executeOperation(op OperationType, tx *bolt.Tx, bucket []byte, key []byte, conf concurrentConfig) (historyRecord, error) {
switch op {
case Read:
return executeRead(db, bucket, key, conf.readInterval)
return executeRead(tx, bucket, key, conf.readInterval)
case Write:
return executeWrite(db, bucket, key, conf.writeBytes, conf.noopWriteRatio)
return executeWrite(tx, bucket, key, conf.writeBytes, conf.noopWriteRatio)
case Delete:
return executeDelete(db, bucket, key)
return executeDelete(tx, bucket, key)
default:
panic(fmt.Sprintf("unexpected operation type: %s", op))
}
}

func executeRead(db *bolt.DB, bucket []byte, key []byte, readInterval duration) (historyRecord, error) {
func executeRead(tx *bolt.Tx, bucket []byte, key []byte, readInterval duration) (historyRecord, error) {
var rec historyRecord
err := db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)

initialVal := b.Get(key)
time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
val := b.Get(key)
b := tx.Bucket(bucket)

if !bytes.Equal(initialVal, val) {
return fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
string(key), formatBytes(initialVal), formatBytes(val))
}
initialVal := b.Get(key)
time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
val := b.Get(key)

clonedVal := make([]byte, len(val))
copy(clonedVal, val)
if !bytes.Equal(initialVal, val) {
return rec, fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
string(key), formatBytes(initialVal), formatBytes(val))
}

rec = historyRecord{
OperationType: Read,
Bucket: string(bucket),
Key: string(key),
Value: clonedVal,
Txid: tx.ID(),
}
clonedVal := make([]byte, len(val))
copy(clonedVal, val)

return nil
})
rec = historyRecord{
OperationType: Read,
Bucket: string(bucket),
Key: string(key),
Value: clonedVal,
Txid: tx.ID(),
}

return rec, err
return rec, nil
}

func executeWrite(db *bolt.DB, bucket []byte, key []byte, writeBytes bytesRange, noopWriteRatio int) (historyRecord, error) {
func executeWrite(tx *bolt.Tx, bucket []byte, key []byte, writeBytes bytesRange, noopWriteRatio int) (historyRecord, error) {
var rec historyRecord

err := db.Update(func(tx *bolt.Tx) error {
if mrand.Intn(100) < noopWriteRatio {
// A no-op write transaction has two consequences:
// 1. The txid increases by 1;
// 2. Two meta pages point to the same root page.
rec = historyRecord{
OperationType: Write,
Bucket: string(bucket),
Key: noopTxKey,
Value: nil,
Txid: tx.ID(),
}
return nil
if mrand.Intn(100) < noopWriteRatio {
// A no-op write transaction has two consequences:
// 1. The txid increases by 1;
// 2. Two meta pages point to the same root page.
rec = historyRecord{
OperationType: Write,
Bucket: string(bucket),
Key: noopTxKey,
Value: nil,
Txid: tx.ID(),
}
return rec, nil
}

b := tx.Bucket(bucket)
b := tx.Bucket(bucket)

valueBytes := randomIntInRange(writeBytes.min, writeBytes.max)
v := make([]byte, valueBytes)
if _, cErr := crand.Read(v); cErr != nil {
return cErr
}
valueBytes := randomIntInRange(writeBytes.min, writeBytes.max)
v := make([]byte, valueBytes)
if _, cErr := crand.Read(v); cErr != nil {
return rec, cErr
}

putErr := b.Put(key, v)
if putErr == nil {
rec = historyRecord{
OperationType: Write,
Bucket: string(bucket),
Key: string(key),
Value: v,
Txid: tx.ID(),
}
putErr := b.Put(key, v)
if putErr == nil {
rec = historyRecord{
OperationType: Write,
Bucket: string(bucket),
Key: string(key),
Value: v,
Txid: tx.ID(),
}
}

return putErr
})

return rec, err
return rec, putErr
}

func executeDelete(db *bolt.DB, bucket []byte, key []byte) (historyRecord, error) {
func executeDelete(tx *bolt.Tx, bucket []byte, key []byte) (historyRecord, error) {
var rec historyRecord

err := db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
b := tx.Bucket(bucket)

deleteErr := b.Delete(key)
if deleteErr == nil {
rec = historyRecord{
OperationType: Delete,
Bucket: string(bucket),
Key: string(key),
Txid: tx.ID(),
}
err := b.Delete(key)
if err == nil {
rec = historyRecord{
OperationType: Delete,
Bucket: string(bucket),
Key: string(key),
Txid: tx.ID(),
}

return deleteErr
})
}

return rec, err
}
Expand Down Expand Up @@ -674,17 +681,7 @@ func (rs historyRecords) Less(i, j int) bool {
}

// Sorted by txid
if rs[i].Txid != rs[j].Txid {
return rs[i].Txid < rs[j].Txid
}

// Sorted by operation type: put `Read` after other operation types
// if they operate on the same (bucket, key) and have the same txid.
if rs[i].OperationType == Read {
return false
}

return true
return rs[i].Txid < rs[j].Txid
}

func (rs historyRecords) Swap(i, j int) {
Expand All @@ -695,7 +692,7 @@ func validateIncrementalTxid(rs historyRecords) error {
lastTxid := rs[0].Txid

for i := 1; i < len(rs); i++ {
if (rs[i].OperationType == Read && rs[i].Txid < lastTxid) || (rs[i].OperationType != Read && rs[i].Txid <= lastTxid) {
if rs[i].Txid < lastTxid {
return fmt.Errorf("detected non-incremental txid(%d, %d) in %s mode", lastTxid, rs[i].Txid, rs[i].OperationType)
}
lastTxid = rs[i].Txid
Expand All @@ -705,7 +702,7 @@ func validateIncrementalTxid(rs historyRecords) error {
}

func validateSequential(rs historyRecords) error {
sort.Sort(rs)
sort.Stable(rs)

type bucketAndKey struct {
bucket string
Expand Down Expand Up @@ -886,7 +883,11 @@ func TestConcurrentRepeatableRead(t *testing.T) {

t.Logf("Perform %d write operations after starting a long running read operation", writeOperationCountInBetween)
for j := 0; j < writeOperationCountInBetween; j++ {
_, err := executeWrite(db, bucket, key, writeBytes, 0)
err := db.Update(func(tx *bolt.Tx) error {
_, eerr := executeWrite(tx, bucket, key, writeBytes, 0)
return eerr
})

require.NoError(t, err)
}
}
Expand All @@ -902,7 +903,10 @@ func TestConcurrentRepeatableRead(t *testing.T) {
return
default:
}
_, err := executeWrite(db, bucket, key, writeBytes, 0)
err := db.Update(func(tx *bolt.Tx) error {
_, eerr := executeWrite(tx, bucket, key, writeBytes, 0)
return eerr
})
require.NoError(t, err)
}
}()
Expand Down