diff --git a/internal/mvcc/backend/backend.go b/internal/mvcc/backend/backend.go
index cde61cd1aa4f..74caa6b5a0b7 100644
--- a/internal/mvcc/backend/backend.go
+++ b/internal/mvcc/backend/backend.go
@@ -310,11 +310,7 @@ func (b *backend) defrag() error {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 
-	// block concurrent read requests while resetting tx
-	b.readTx.mu.Lock()
-	defer b.readTx.mu.Unlock()
-
-	b.batchTx.unsafeCommit(true)
+	b.batchTx.commit(true)
 	b.batchTx.tx = nil
 
 	tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)
diff --git a/internal/mvcc/backend/batch_tx.go b/internal/mvcc/backend/batch_tx.go
index e7307bdca131..6fdb7ab0ddd3 100644
--- a/internal/mvcc/backend/batch_tx.go
+++ b/internal/mvcc/backend/batch_tx.go
@@ -98,6 +98,7 @@ func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte
 		isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
 		limit = 1
 	}
+
 	for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
 		vs = append(vs, cv)
 		keys = append(keys, ck)
@@ -231,6 +232,13 @@ func (t *batchTxBuffered) CommitAndStop() {
 }
 
 func (t *batchTxBuffered) commit(stop bool) {
+	// no need to reset read tx if there is no write.
+	// call commit to update stats.
+	if t.batchTx.pending == 0 && !stop {
+		t.batchTx.commit(stop)
+		return
+	}
+
 	// all read txs must be closed to acquire boltdb commit rwlock
 	t.backend.readTx.mu.Lock()
 	defer t.backend.readTx.mu.Unlock()
diff --git a/internal/mvcc/kvstore_test.go b/internal/mvcc/kvstore_test.go
index a526b603af75..1d5a61bcfaf2 100644
--- a/internal/mvcc/kvstore_test.go
+++ b/internal/mvcc/kvstore_test.go
@@ -638,6 +638,8 @@ func TestTxnBlockBackendForceCommit(t *testing.T) {
 	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer os.Remove(tmpPath)
 
+	// Put a key into the store so that force commit can take effect.
+	s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
 	txn := s.Read()
 
 	done := make(chan struct{})
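
For reviewers skimming the batch_tx.go change, here is a minimal, self-contained sketch of the pattern it introduces: a periodic commit that finds no pending writes refreshes the underlying transaction without taking the read-transaction lock, so concurrent readers are never blocked by a no-op commit. All type and field names below (batchTx, batchTxBuffered, readTxMu, pending) are simplified stand-ins for illustration, not the actual internals of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// batchTx models only the bookkeeping needed to show the short-circuit:
// a count of writes buffered since the last commit.
type batchTx struct {
	pending int
}

// commit stands in for committing the underlying boltdb transaction and
// reopening a fresh one.
func (t *batchTx) commit(stop bool) {
	fmt.Printf("commit underlying tx (stop=%v, pending=%d)\n", stop, t.pending)
	t.pending = 0
}

// batchTxBuffered adds coordination with a shared read transaction.
type batchTxBuffered struct {
	batchTx
	readTxMu sync.Mutex // guards the shared read transaction
}

func (t *batchTxBuffered) commit(stop bool) {
	// Nothing was written since the last commit: commit only to refresh
	// stats and skip blocking concurrent readers on readTxMu.
	if t.batchTx.pending == 0 && !stop {
		t.batchTx.commit(stop)
		return
	}

	// Writes are pending (or we are stopping): readers must be paused
	// while the read transaction is reset against the committed state.
	t.readTxMu.Lock()
	defer t.readTxMu.Unlock()
	t.batchTx.commit(stop)
	fmt.Println("reset read tx")
}

func main() {
	var t batchTxBuffered
	t.commit(false) // no pending writes: readers stay unblocked
	t.pending = 3
	t.commit(false) // pending writes: readers are paused briefly
}
```

The same reasoning explains the kvstore_test.go hunk: once empty commits stop resetting the read transaction, TestTxnBlockBackendForceCommit needs at least one pending write (the added Put) for ForceCommit to actually contend with the open read transaction.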