diff --git a/concurrent_test.go b/concurrent_test.go index 3995c0466..bcafe99d1 100644 --- a/concurrent_test.go +++ b/concurrent_test.go @@ -347,27 +347,47 @@ func (w *worker) name() string { func (w *worker) run() (historyRecords, error) { var rs historyRecords + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { select { case <-w.stopCh: - w.t.Logf("%q finished.", w.name()) return rs, nil default: } - op := w.pickOperation() - bucket, key := w.pickBucket(), w.pickKey() - rec, err := executeOperation(op, w.db, bucket, key, w.conf) - if err != nil { - readErr := fmt.Errorf("[%s: %s]: %w", w.name(), op, err) - w.t.Error(readErr) - w.errCh <- readErr - return rs, readErr - } + err := w.db.Update(func(tx *bolt.Tx) error { + inner: + for { + op := w.pickOperation() + bucket, key := w.pickBucket(), w.pickKey() + rec, eerr := executeOperation(op, tx, bucket, key, w.conf) + if eerr != nil { + opErr := fmt.Errorf("[%s: %s]: %w", w.name(), op, eerr) + w.t.Error(opErr) + w.errCh <- opErr + return opErr + } + + rs = append(rs, rec) + if w.conf.workInterval != (duration{}) { + time.Sleep(randomDurationInRange(w.conf.workInterval.min, w.conf.workInterval.max)) + } - rs = append(rs, rec) - if w.conf.workInterval != (duration{}) { - time.Sleep(randomDurationInRange(w.conf.workInterval.min, w.conf.workInterval.max)) + select { + case <-ticker.C: + break inner + case <-w.stopCh: + break inner + default: + } + } + return nil + }) + if err != nil { + return rs, err } } } @@ -401,111 +421,100 @@ func (w *worker) pickOperation() OperationType { panic("unexpected") } -func executeOperation(op OperationType, db *bolt.DB, bucket []byte, key []byte, conf concurrentConfig) (historyRecord, error) { +func executeOperation(op OperationType, tx *bolt.Tx, bucket []byte, key []byte, conf concurrentConfig) (historyRecord, error) { switch op { case Read: - return executeRead(db, bucket, key, conf.readInterval) + return executeRead(tx, bucket, key, conf.readInterval) case Write: - return executeWrite(db, bucket, key, conf.writeBytes, conf.noopWriteRatio) + return executeWrite(tx, bucket, key, conf.writeBytes, conf.noopWriteRatio) case Delete: - return executeDelete(db, bucket, key) + return executeDelete(tx, bucket, key) default: panic(fmt.Sprintf("unexpected operation type: %s", op)) } } -func executeRead(db *bolt.DB, bucket []byte, key []byte, readInterval duration) (historyRecord, error) { +func executeRead(tx *bolt.Tx, bucket []byte, key []byte, readInterval duration) (historyRecord, error) { var rec historyRecord - err := db.View(func(tx *bolt.Tx) error { - b := tx.Bucket(bucket) - initialVal := b.Get(key) - time.Sleep(randomDurationInRange(readInterval.min, readInterval.max)) - val := b.Get(key) + b := tx.Bucket(bucket) - if !bytes.Equal(initialVal, val) { - return fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q", - string(key), formatBytes(initialVal), formatBytes(val)) - } + initialVal := b.Get(key) + time.Sleep(randomDurationInRange(readInterval.min, readInterval.max)) + val := b.Get(key) - clonedVal := make([]byte, len(val)) - copy(clonedVal, val) + if !bytes.Equal(initialVal, val) { + return rec, fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q", + string(key), formatBytes(initialVal), formatBytes(val)) + } - rec = historyRecord{ - OperationType: Read, - Bucket: string(bucket), - Key: string(key), - Value: clonedVal, - Txid: tx.ID(), - } + clonedVal := make([]byte, len(val)) + copy(clonedVal, val) - return nil - }) + rec = historyRecord{ + OperationType: Read, + Bucket: string(bucket), + Key: string(key), + Value: clonedVal, + Txid: tx.ID(), + } - return rec, err + return rec, nil } -func executeWrite(db *bolt.DB, bucket []byte, key []byte, writeBytes bytesRange, noopWriteRatio int) (historyRecord, error) { +func executeWrite(tx *bolt.Tx, bucket []byte, key []byte, writeBytes bytesRange, noopWriteRatio int) (historyRecord, error) { var rec historyRecord - err := db.Update(func(tx *bolt.Tx) error { - if mrand.Intn(100) < noopWriteRatio { - // A no-op write transaction has two consequences: - // 1. The txid increases by 1; - // 2. Two meta pages point to the same root page. - rec = historyRecord{ - OperationType: Write, - Bucket: string(bucket), - Key: noopTxKey, - Value: nil, - Txid: tx.ID(), - } - return nil + if mrand.Intn(100) < noopWriteRatio { + // A no-op write transaction has two consequences: + // 1. The txid increases by 1; + // 2. Two meta pages point to the same root page. + rec = historyRecord{ + OperationType: Write, + Bucket: string(bucket), + Key: noopTxKey, + Value: nil, + Txid: tx.ID(), } + return rec, nil + } - b := tx.Bucket(bucket) + b := tx.Bucket(bucket) - valueBytes := randomIntInRange(writeBytes.min, writeBytes.max) - v := make([]byte, valueBytes) - if _, cErr := crand.Read(v); cErr != nil { - return cErr - } + valueBytes := randomIntInRange(writeBytes.min, writeBytes.max) + v := make([]byte, valueBytes) + if _, cErr := crand.Read(v); cErr != nil { + return rec, cErr + } - putErr := b.Put(key, v) - if putErr == nil { - rec = historyRecord{ - OperationType: Write, - Bucket: string(bucket), - Key: string(key), - Value: v, - Txid: tx.ID(), - } + putErr := b.Put(key, v) + if putErr == nil { + rec = historyRecord{ + OperationType: Write, + Bucket: string(bucket), + Key: string(key), + Value: v, + Txid: tx.ID(), } + } - return putErr - }) - - return rec, err + return rec, putErr } -func executeDelete(db *bolt.DB, bucket []byte, key []byte) (historyRecord, error) { +func executeDelete(tx *bolt.Tx, bucket []byte, key []byte) (historyRecord, error) { var rec historyRecord - err := db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(bucket) + b := tx.Bucket(bucket) - deleteErr := b.Delete(key) - if deleteErr == nil { - rec = historyRecord{ - OperationType: Delete, - Bucket: string(bucket), - Key: string(key), - Txid: tx.ID(), - } + err := b.Delete(key) + if err == nil { + rec = historyRecord{ + OperationType: Delete, + Bucket: string(bucket), + Key: string(key), + Txid: tx.ID(), } - - return deleteErr - }) + } return rec, err } @@ -886,7 +895,11 @@ func TestConcurrentRepeatableRead(t *testing.T) { t.Logf("Perform %d write operations after starting a long running read operation", writeOperationCountInBetween) for j := 0; j < writeOperationCountInBetween; j++ { - _, err := executeWrite(db, bucket, key, writeBytes, 0) + err := db.Update(func(tx *bolt.Tx) error { + _, eerr := executeWrite(tx, bucket, key, writeBytes, 0) + return eerr + }) + require.NoError(t, err) } } @@ -902,7 +915,10 @@ func TestConcurrentRepeatableRead(t *testing.T) { return default: } - _, err := executeWrite(db, bucket, key, writeBytes, 0) + err := db.Update(func(tx *bolt.Tx) error { + _, eerr := executeWrite(tx, bucket, key, writeBytes, 0) + return eerr + }) require.NoError(t, err) } }()