Skip to content

Commit

Permalink
sstable: fix error path in writer.Close
Browse files Browse the repository at this point in the history
`Close()` records any error encountered and exits early if called
again. One of the return paths was not setting `w.err`, possibly
causing the panic in github.com/cockroachdb/cockroach/issues/97350.

This change fixes this and improves the structure of the function -
instead of each exit path having to set `w.err`, we set it in the
deferred function.
  • Loading branch information
RaduBerinde committed Mar 6, 2023
1 parent fdf055d commit e9a8c4a
Showing 1 changed file with 23 additions and 39 deletions.
62 changes: 23 additions & 39 deletions sstable/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1751,13 +1751,18 @@ func (w *Writer) Close() (err error) {
w.writable.Abort()
w.writable = nil
}
// Record any error in the writer (so we can exit early if Close is called
// again).
if err != nil {
w.err = err
}
}()

// finish must be called before we check for an error, because finish will
// block until every single task added to the writeQueue has been processed,
// and an error could be encountered while any of those tasks are processed.
if err = w.coordination.writeQueue.finish(); err != nil {
w.err = err
if err := w.coordination.writeQueue.finish(); err != nil {
return err
}

if w.err != nil {
Expand All @@ -1783,17 +1788,14 @@ func (w *Writer) Close() (err error) {
if w.dataBlockBuf.dataBlock.nEntries > 0 || w.indexBlock.block.nEntries == 0 {
bh, err := w.writeBlock(w.dataBlockBuf.dataBlock.finish(), w.compression, &w.dataBlockBuf.blockBuf)
if err != nil {
w.err = err
return w.err
return err
}
var bhp BlockHandleWithProperties
if bhp, err = w.maybeAddBlockPropertiesToBlockHandle(bh); err != nil {
w.err = err
bhp, err := w.maybeAddBlockPropertiesToBlockHandle(bh)
if err != nil {
return err
}
prevKey := base.DecodeInternalKey(w.dataBlockBuf.dataBlock.curKey)
if err = w.addIndexEntrySync(prevKey, InternalKey{}, bhp, w.dataBlockBuf.tmp[:]); err != nil {
w.err = err
if err := w.addIndexEntrySync(prevKey, InternalKey{}, bhp, w.dataBlockBuf.tmp[:]); err != nil {
return err
}
}
Expand All @@ -1805,13 +1807,11 @@ func (w *Writer) Close() (err error) {
if w.filter != nil {
b, err := w.filter.finish()
if err != nil {
w.err = err
return w.err
return err
}
bh, err := w.writeBlock(b, NoCompression, &w.blockBuf)
if err != nil {
w.err = err
return w.err
return err
}
n := encodeBlockHandle(w.blockBuf.tmp[:], bh)
metaindex.add(InternalKey{UserKey: []byte(w.filter.metaName())}, w.blockBuf.tmp[:n])
Expand All @@ -1825,8 +1825,7 @@ func (w *Writer) Close() (err error) {
// Write the two level index block.
indexBH, err = w.writeTwoLevelIndex()
if err != nil {
w.err = err
return w.err
return err
}
} else {
w.props.IndexType = binarySearchIndex
Expand All @@ -1839,8 +1838,7 @@ func (w *Writer) Close() (err error) {
// Write the single level index block.
indexBH, err = w.writeBlock(w.indexBlock.finish(), w.compression, &w.blockBuf)
if err != nil {
w.err = err
return w.err
return err
}
}

Expand All @@ -1865,8 +1863,7 @@ func (w *Writer) Close() (err error) {
}
rangeDelBH, err = w.writeBlock(w.rangeDelBlock.finish(), NoCompression, &w.blockBuf)
if err != nil {
w.err = err
return w.err
return err
}
}

Expand All @@ -1880,8 +1877,7 @@ func (w *Writer) Close() (err error) {
kind := key.Kind()
endKey, _, ok := rangekey.DecodeEndKey(kind, w.rangeKeyBlock.curValue)
if !ok {
w.err = errors.Newf("invalid end key: %s", w.rangeKeyBlock.curValue)
return w.err
return errors.Newf("invalid end key: %s", w.rangeKeyBlock.curValue)
}
k := base.MakeExclusiveSentinelKey(kind, endKey).Clone()
w.meta.SetLargestRangeKey(k)
Expand All @@ -1890,8 +1886,7 @@ func (w *Writer) Close() (err error) {
// enable compression on this block.
rangeKeyBH, err = w.writeBlock(w.rangeKeyBlock.finish(), NoCompression, &w.blockBuf)
if err != nil {
w.err = err
return w.err
return err
}
}

Expand Down Expand Up @@ -1922,18 +1917,15 @@ func (w *Writer) Close() (err error) {
userProps := make(map[string]string)
for i := range w.propCollectors {
if err := w.propCollectors[i].Finish(userProps); err != nil {
w.err = err
return err
}
}
for i := range w.blockPropCollectors {
scratch := w.blockPropsEncoder.getScratchForProp()
// Place the shortID in the first byte.
scratch = append(scratch, byte(i))
buf, err :=
w.blockPropCollectors[i].FinishTable(scratch)
buf, err := w.blockPropCollectors[i].FinishTable(scratch)
if err != nil {
w.err = err
return err
}
var prop string
Expand All @@ -1959,8 +1951,7 @@ func (w *Writer) Close() (err error) {
w.props.save(&raw)
bh, err := w.writeBlock(raw.finish(), NoCompression, &w.blockBuf)
if err != nil {
w.err = err
return w.err
return err
}
n := encodeBlockHandle(w.blockBuf.tmp[:], bh)
metaindex.add(InternalKey{UserKey: []byte(metaPropertiesName)}, w.blockBuf.tmp[:n])
Expand All @@ -1986,8 +1977,7 @@ func (w *Writer) Close() (err error) {
// expect the meta-index block to not be compressed.
metaindexBH, err := w.writeBlock(metaindex.blockWriter.finish(), NoCompression, &w.blockBuf)
if err != nil {
w.err = err
return w.err
return err
}

// Write the table footer.
Expand All @@ -1999,22 +1989,19 @@ func (w *Writer) Close() (err error) {
}
encoded := footer.encode(w.blockBuf.tmp[:])
if err := w.writable.Write(footer.encode(w.blockBuf.tmp[:])); err != nil {
w.err = err
return w.err
return err
}
w.meta.Size += uint64(len(encoded))
w.meta.Properties = w.props

// Check that the features present in the table are compatible with the format
// configured for the table.
if err = w.assertFormatCompatibility(); err != nil {
w.err = err
return w.err
return err
}

if err := w.writable.Finish(); err != nil {
w.writable = nil
w.err = err
return err
}
w.writable = nil
Expand All @@ -2027,9 +2014,6 @@ func (w *Writer) Close() (err error) {
w.indexBlock = nil

// Make any future calls to Set or Close return an error.
if w.err != nil {
return w.err
}
w.err = errWriterClosed
return nil
}
Expand Down

0 comments on commit e9a8c4a

Please sign in to comment.