Skip to content

Commit

Permalink
Update concurrent test to support multiple operations in each transac…
Browse files Browse the repository at this point in the history
…tion

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Jun 5, 2024
1 parent b342624 commit afa2ac7
Showing 1 changed file with 95 additions and 85 deletions.
180 changes: 95 additions & 85 deletions concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,10 @@ func (w *worker) name() string {

func (w *worker) run() (historyRecords, error) {
var rs historyRecords

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for {
select {
case <-w.stopCh:
Expand All @@ -355,19 +359,36 @@ func (w *worker) run() (historyRecords, error) {
default:
}

op := w.pickOperation()
bucket, key := w.pickBucket(), w.pickKey()
rec, err := executeOperation(op, w.db, bucket, key, w.conf)
if err != nil {
readErr := fmt.Errorf("[%s: %s]: %w", w.name(), op, err)
w.t.Error(readErr)
w.errCh <- readErr
return rs, readErr
}
err := w.db.Update(func(tx *bolt.Tx) error {
inner:
for {
op := w.pickOperation()
bucket, key := w.pickBucket(), w.pickKey()
rec, eerr := executeOperation(op, tx, bucket, key, w.conf)
if eerr != nil {
opErr := fmt.Errorf("[%s: %s]: %w", w.name(), op, err)

Check failure on line 369 in concurrent_test.go

View workflow job for this annotation

GitHub Actions / test-windows (windows-amd64-unit-test-4-cpu)

undefined: err

Check failure on line 369 in concurrent_test.go

View workflow job for this annotation

GitHub Actions / test-linux-amd64 / test-linux (linux-unit-test-1-cpu)

undefined: err

Check failure on line 369 in concurrent_test.go

View workflow job for this annotation

GitHub Actions / test-linux-arm64 / test-linux (linux-unit-test-1-cpu)

undefined: err

Check failure on line 369 in concurrent_test.go

View workflow job for this annotation

GitHub Actions / test-linux-amd64 / test-linux (linux-unit-test-2-cpu)

undefined: err

Check failure on line 369 in concurrent_test.go

View workflow job for this annotation

GitHub Actions / test-linux-arm64-race / test-linux (linux-unit-test-4-cpu-race)

undefined: err

Check failure on line 369 in concurrent_test.go

View workflow job for this annotation

GitHub Actions / test-linux-arm64 / test-linux (linux-unit-test-2-cpu)

undefined: err

Check failure on line 369 in concurrent_test.go

View workflow job for this annotation

GitHub Actions / test-linux-amd64-race / test-linux (linux-unit-test-4-cpu-race)

undefined: err

Check failure on line 369 in concurrent_test.go

View workflow job for this annotation

GitHub Actions / test-linux-arm64 / test-linux (linux-unit-test-4-cpu)

undefined: err

Check failure on line 369 in concurrent_test.go

View workflow job for this annotation

GitHub Actions / test-linux-amd64 / test-linux (linux-unit-test-4-cpu)

undefined: err
w.t.Error(opErr)
w.errCh <- opErr
return opErr
}

rs = append(rs, rec)
if w.conf.workInterval != (duration{}) {
time.Sleep(randomDurationInRange(w.conf.workInterval.min, w.conf.workInterval.max))
}

rs = append(rs, rec)
if w.conf.workInterval != (duration{}) {
time.Sleep(randomDurationInRange(w.conf.workInterval.min, w.conf.workInterval.max))
select {
case <-ticker.C:
break inner
case <-w.stopCh:
break inner
default:
}
}
return nil
})
if err != nil {
return rs, err
}
}
}
Expand Down Expand Up @@ -401,111 +422,100 @@ func (w *worker) pickOperation() OperationType {
panic("unexpected")
}

func executeOperation(op OperationType, db *bolt.DB, bucket []byte, key []byte, conf concurrentConfig) (historyRecord, error) {
func executeOperation(op OperationType, tx *bolt.Tx, bucket []byte, key []byte, conf concurrentConfig) (historyRecord, error) {
switch op {
case Read:
return executeRead(db, bucket, key, conf.readInterval)
return executeRead(tx, bucket, key, conf.readInterval)
case Write:
return executeWrite(db, bucket, key, conf.writeBytes, conf.noopWriteRatio)
return executeWrite(tx, bucket, key, conf.writeBytes, conf.noopWriteRatio)
case Delete:
return executeDelete(db, bucket, key)
return executeDelete(tx, bucket, key)
default:
panic(fmt.Sprintf("unexpected operation type: %s", op))
}
}

func executeRead(db *bolt.DB, bucket []byte, key []byte, readInterval duration) (historyRecord, error) {
func executeRead(tx *bolt.Tx, bucket []byte, key []byte, readInterval duration) (historyRecord, error) {
var rec historyRecord
err := db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)

initialVal := b.Get(key)
time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
val := b.Get(key)
b := tx.Bucket(bucket)

if !bytes.Equal(initialVal, val) {
return fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
string(key), formatBytes(initialVal), formatBytes(val))
}
initialVal := b.Get(key)
time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
val := b.Get(key)

clonedVal := make([]byte, len(val))
copy(clonedVal, val)
if !bytes.Equal(initialVal, val) {
return rec, fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
string(key), formatBytes(initialVal), formatBytes(val))
}

rec = historyRecord{
OperationType: Read,
Bucket: string(bucket),
Key: string(key),
Value: clonedVal,
Txid: tx.ID(),
}
clonedVal := make([]byte, len(val))
copy(clonedVal, val)

return nil
})
rec = historyRecord{
OperationType: Read,
Bucket: string(bucket),
Key: string(key),
Value: clonedVal,
Txid: tx.ID(),
}

return rec, err
return rec, nil
}

func executeWrite(db *bolt.DB, bucket []byte, key []byte, writeBytes bytesRange, noopWriteRatio int) (historyRecord, error) {
func executeWrite(tx *bolt.Tx, bucket []byte, key []byte, writeBytes bytesRange, noopWriteRatio int) (historyRecord, error) {
var rec historyRecord

err := db.Update(func(tx *bolt.Tx) error {
if mrand.Intn(100) < noopWriteRatio {
// A no-op write transaction has two consequences:
// 1. The txid increases by 1;
// 2. Two meta pages point to the same root page.
rec = historyRecord{
OperationType: Write,
Bucket: string(bucket),
Key: noopTxKey,
Value: nil,
Txid: tx.ID(),
}
return nil
if mrand.Intn(100) < noopWriteRatio {
// A no-op write transaction has two consequences:
// 1. The txid increases by 1;
// 2. Two meta pages point to the same root page.
rec = historyRecord{
OperationType: Write,
Bucket: string(bucket),
Key: noopTxKey,
Value: nil,
Txid: tx.ID(),
}
return rec, nil
}

b := tx.Bucket(bucket)
b := tx.Bucket(bucket)

valueBytes := randomIntInRange(writeBytes.min, writeBytes.max)
v := make([]byte, valueBytes)
if _, cErr := crand.Read(v); cErr != nil {
return cErr
}
valueBytes := randomIntInRange(writeBytes.min, writeBytes.max)
v := make([]byte, valueBytes)
if _, cErr := crand.Read(v); cErr != nil {
return rec, cErr
}

putErr := b.Put(key, v)
if putErr == nil {
rec = historyRecord{
OperationType: Write,
Bucket: string(bucket),
Key: string(key),
Value: v,
Txid: tx.ID(),
}
putErr := b.Put(key, v)
if putErr == nil {
rec = historyRecord{
OperationType: Write,
Bucket: string(bucket),
Key: string(key),
Value: v,
Txid: tx.ID(),
}
}

return putErr
})

return rec, err
return rec, putErr
}

func executeDelete(db *bolt.DB, bucket []byte, key []byte) (historyRecord, error) {
func executeDelete(tx *bolt.Tx, bucket []byte, key []byte) (historyRecord, error) {
var rec historyRecord

err := db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
b := tx.Bucket(bucket)

deleteErr := b.Delete(key)
if deleteErr == nil {
rec = historyRecord{
OperationType: Delete,
Bucket: string(bucket),
Key: string(key),
Txid: tx.ID(),
}
err := b.Delete(key)
if err == nil {
rec = historyRecord{
OperationType: Delete,
Bucket: string(bucket),
Key: string(key),
Txid: tx.ID(),
}

return deleteErr
})
}

return rec, err
}
Expand Down

0 comments on commit afa2ac7

Please sign in to comment.