Skip to content

Commit

Permalink
Update concurrent test to support multiple operations in each transac…
Browse files Browse the repository at this point in the history
…tion

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Jun 5, 2024
1 parent b342624 commit 6ab2fe1
Showing 1 changed file with 104 additions and 88 deletions.
192 changes: 104 additions & 88 deletions concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,27 +347,47 @@ func (w *worker) name() string {

func (w *worker) run() (historyRecords, error) {
var rs historyRecords

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for {
select {
case <-w.stopCh:
w.t.Logf("%q finished.", w.name())
return rs, nil
default:
}

op := w.pickOperation()
bucket, key := w.pickBucket(), w.pickKey()
rec, err := executeOperation(op, w.db, bucket, key, w.conf)
if err != nil {
readErr := fmt.Errorf("[%s: %s]: %w", w.name(), op, err)
w.t.Error(readErr)
w.errCh <- readErr
return rs, readErr
}
err := w.db.Update(func(tx *bolt.Tx) error {
inner:
for {
op := w.pickOperation()
bucket, key := w.pickBucket(), w.pickKey()
rec, eerr := executeOperation(op, tx, bucket, key, w.conf)
if eerr != nil {
opErr := fmt.Errorf("[%s: %s]: %w", w.name(), op, eerr)
w.t.Error(opErr)
w.errCh <- opErr
return opErr
}

rs = append(rs, rec)
if w.conf.workInterval != (duration{}) {
time.Sleep(randomDurationInRange(w.conf.workInterval.min, w.conf.workInterval.max))
}

rs = append(rs, rec)
if w.conf.workInterval != (duration{}) {
time.Sleep(randomDurationInRange(w.conf.workInterval.min, w.conf.workInterval.max))
select {
case <-ticker.C:
break inner
case <-w.stopCh:
break inner
default:
}
}
return nil
})
if err != nil {
return rs, err
}
}
}
Expand Down Expand Up @@ -401,111 +421,100 @@ func (w *worker) pickOperation() OperationType {
panic("unexpected")
}

func executeOperation(op OperationType, db *bolt.DB, bucket []byte, key []byte, conf concurrentConfig) (historyRecord, error) {
func executeOperation(op OperationType, tx *bolt.Tx, bucket []byte, key []byte, conf concurrentConfig) (historyRecord, error) {
switch op {
case Read:
return executeRead(db, bucket, key, conf.readInterval)
return executeRead(tx, bucket, key, conf.readInterval)
case Write:
return executeWrite(db, bucket, key, conf.writeBytes, conf.noopWriteRatio)
return executeWrite(tx, bucket, key, conf.writeBytes, conf.noopWriteRatio)
case Delete:
return executeDelete(db, bucket, key)
return executeDelete(tx, bucket, key)
default:
panic(fmt.Sprintf("unexpected operation type: %s", op))
}
}

func executeRead(db *bolt.DB, bucket []byte, key []byte, readInterval duration) (historyRecord, error) {
func executeRead(tx *bolt.Tx, bucket []byte, key []byte, readInterval duration) (historyRecord, error) {
var rec historyRecord
err := db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)

initialVal := b.Get(key)
time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
val := b.Get(key)
b := tx.Bucket(bucket)

if !bytes.Equal(initialVal, val) {
return fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
string(key), formatBytes(initialVal), formatBytes(val))
}
initialVal := b.Get(key)
time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
val := b.Get(key)

clonedVal := make([]byte, len(val))
copy(clonedVal, val)
if !bytes.Equal(initialVal, val) {
return rec, fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
string(key), formatBytes(initialVal), formatBytes(val))
}

rec = historyRecord{
OperationType: Read,
Bucket: string(bucket),
Key: string(key),
Value: clonedVal,
Txid: tx.ID(),
}
clonedVal := make([]byte, len(val))
copy(clonedVal, val)

return nil
})
rec = historyRecord{
OperationType: Read,
Bucket: string(bucket),
Key: string(key),
Value: clonedVal,
Txid: tx.ID(),
}

return rec, err
return rec, nil
}

func executeWrite(db *bolt.DB, bucket []byte, key []byte, writeBytes bytesRange, noopWriteRatio int) (historyRecord, error) {
func executeWrite(tx *bolt.Tx, bucket []byte, key []byte, writeBytes bytesRange, noopWriteRatio int) (historyRecord, error) {
var rec historyRecord

err := db.Update(func(tx *bolt.Tx) error {
if mrand.Intn(100) < noopWriteRatio {
// A no-op write transaction has two consequences:
// 1. The txid increases by 1;
// 2. Two meta pages point to the same root page.
rec = historyRecord{
OperationType: Write,
Bucket: string(bucket),
Key: noopTxKey,
Value: nil,
Txid: tx.ID(),
}
return nil
if mrand.Intn(100) < noopWriteRatio {
// A no-op write transaction has two consequences:
// 1. The txid increases by 1;
// 2. Two meta pages point to the same root page.
rec = historyRecord{
OperationType: Write,
Bucket: string(bucket),
Key: noopTxKey,
Value: nil,
Txid: tx.ID(),
}
return rec, nil
}

b := tx.Bucket(bucket)
b := tx.Bucket(bucket)

valueBytes := randomIntInRange(writeBytes.min, writeBytes.max)
v := make([]byte, valueBytes)
if _, cErr := crand.Read(v); cErr != nil {
return cErr
}
valueBytes := randomIntInRange(writeBytes.min, writeBytes.max)
v := make([]byte, valueBytes)
if _, cErr := crand.Read(v); cErr != nil {
return rec, cErr
}

putErr := b.Put(key, v)
if putErr == nil {
rec = historyRecord{
OperationType: Write,
Bucket: string(bucket),
Key: string(key),
Value: v,
Txid: tx.ID(),
}
putErr := b.Put(key, v)
if putErr == nil {
rec = historyRecord{
OperationType: Write,
Bucket: string(bucket),
Key: string(key),
Value: v,
Txid: tx.ID(),
}
}

return putErr
})

return rec, err
return rec, putErr
}

func executeDelete(db *bolt.DB, bucket []byte, key []byte) (historyRecord, error) {
func executeDelete(tx *bolt.Tx, bucket []byte, key []byte) (historyRecord, error) {
var rec historyRecord

err := db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
b := tx.Bucket(bucket)

deleteErr := b.Delete(key)
if deleteErr == nil {
rec = historyRecord{
OperationType: Delete,
Bucket: string(bucket),
Key: string(key),
Txid: tx.ID(),
}
err := b.Delete(key)
if err == nil {
rec = historyRecord{
OperationType: Delete,
Bucket: string(bucket),
Key: string(key),
Txid: tx.ID(),
}

return deleteErr
})
}

return rec, err
}
Expand Down Expand Up @@ -886,7 +895,11 @@ func TestConcurrentRepeatableRead(t *testing.T) {

t.Logf("Perform %d write operations after starting a long running read operation", writeOperationCountInBetween)
for j := 0; j < writeOperationCountInBetween; j++ {
_, err := executeWrite(db, bucket, key, writeBytes, 0)
err := db.Update(func(tx *bolt.Tx) error {
_, eerr := executeWrite(tx, bucket, key, writeBytes, 0)
return eerr
})

require.NoError(t, err)
}
}
Expand All @@ -902,7 +915,10 @@ func TestConcurrentRepeatableRead(t *testing.T) {
return
default:
}
_, err := executeWrite(db, bucket, key, writeBytes, 0)
err := db.Update(func(tx *bolt.Tx) error {
_, eerr := executeWrite(tx, bucket, key, writeBytes, 0)
return eerr
})
require.NoError(t, err)
}
}()
Expand Down

0 comments on commit 6ab2fe1

Please sign in to comment.