diff --git a/batch.go b/batch.go index e29c6d917e..a54d1d388f 100644 --- a/batch.go +++ b/batch.go @@ -267,11 +267,6 @@ type batchInternal struct { // memtable. flushable *flushableBatch - // ingestedSSTBatch indicates that the batch contains one or more key kinds - // of InternalKeyKindIngestSST. If the batch contains key kinds of IngestSST - // then it will only contain key kinds of IngestSST. - ingestedSSTBatch bool - // minimumFormatMajorVersion indicates the format major version required in // order to commit this batch. If an operation requires a particular format // major version, it ratchets the batch's minimumFormatMajorVersion. When @@ -299,6 +294,22 @@ type batchInternal struct { commitStats BatchCommitStats commitErr error + + // Position bools together to reduce the sizeof the struct. + + // ingestedSSTBatch indicates that the batch contains one or more key kinds + // of InternalKeyKindIngestSST. If the batch contains key kinds of IngestSST + // then it will only contain key kinds of IngestSST. + ingestedSSTBatch bool + + // committing is set to true when a batch begins to commit. It's used to + // ensure the batch is not mutated concurrently. It is not an atomic + // deliberately, so as to avoid the overhead on batch mutations. This is + // okay, because under correct usage this field will never be accessed + // concurrently. It's only under incorrect usage the memory accesses of this + // variable may violate memory safety. Since we don't use atomics here, + // false negatives are possible. + committing bool } // BatchCommitStats exposes stats related to committing a batch. @@ -560,6 +571,9 @@ func (b *Batch) Get(key []byte) ([]byte, io.Closer, error) { } func (b *Batch) prepareDeferredKeyValueRecord(keyLen, valueLen int, kind InternalKeyKind) { + if b.committing { + panic("pebble: batch already committing") + } if len(b.data) == 0 { b.init(keyLen + valueLen + 2*binary.MaxVarintLen64 + batchHeaderLen) } @@ -609,6 +623,9 @@ func (b *Batch) prepareDeferredKeyValueRecord(keyLen, valueLen int, kind Interna } func (b *Batch) prepareDeferredKeyRecord(keyLen int, kind InternalKeyKind) { + if b.committing { + panic("pebble: batch already committing") + } if len(b.data) == 0 { b.init(keyLen + binary.MaxVarintLen64 + batchHeaderLen) } diff --git a/db.go b/db.go index c44f565b44..5a7e28b76d 100644 --- a/db.go +++ b/db.go @@ -808,6 +808,9 @@ func (d *DB) applyInternal(batch *Batch, opts *WriteOptions, noSyncWait bool) er if err := d.closed.Load(); err != nil { panic(err) } + if batch.committing { + panic("pebble: batch already committing") + } if batch.applied.Load() { panic("pebble: batch already applied") } @@ -838,6 +841,7 @@ func (d *DB) applyInternal(batch *Batch, opts *WriteOptions, noSyncWait bool) er } // TODO(jackson): Assert that all range key operands are suffixless. } + batch.committing = true if batch.db == nil { batch.refreshMemTableSize()