From 7f1a70dc4fb56a501489a453f72f37333e9bccd5 Mon Sep 17 00:00:00 2001 From: Arjun Nair Date: Thu, 6 Jan 2022 17:14:23 -0500 Subject: [PATCH] sstable: reorganize sstable buffers and structs This pr aims to serve as a first step towards unifying the parallel compression vs non-parallel compression code paths, as suggested in this pr: https://github.com/cockroachdb/pebble/pull/1383#pullrequestreview-843757696. This pr organizes the buffers in the Writer in a way which makes it easier to reason about the ownership of buffers/slices. Future prs will add a writeQueue, a compressionQueue, and a mechanism to schedule parallel compression based on cpu utilization. --- sstable/table_test.go | 46 +++++--- sstable/writer.go | 254 ++++++++++++++++++++++++++--------------- sstable/writer_test.go | 14 +-- 3 files changed, 201 insertions(+), 113 deletions(-) diff --git a/sstable/table_test.go b/sstable/table_test.go index 0ea2dbc59f..79171bcca4 100644 --- a/sstable/table_test.go +++ b/sstable/table_test.go @@ -824,20 +824,38 @@ func (errorPropCollector) Name() string { } func TestTablePropertyCollectorErrors(t *testing.T) { - mem := vfs.NewMem() - f, err := mem.Create("foo") - require.NoError(t, err) - var opts WriterOptions - opts.TablePropertyCollectors = append(opts.TablePropertyCollectors, - func() TablePropertyCollector { - return errorPropCollector{} - }) + var testcases map[string]func(w *Writer) error = map[string]func(w *Writer) error{ + "add a#0,1 failed": func(w *Writer) error { + return w.Set([]byte("a"), []byte("b")) + }, + "add c#0,0 failed": func(w *Writer) error { + return w.Delete([]byte("c")) + }, + "add d#0,15 failed": func(w *Writer) error { + return w.DeleteRange([]byte("d"), []byte("e")) + }, + "add f#0,2 failed": func(w *Writer) error { + return w.Merge([]byte("f"), []byte("g")) + }, + "finish failed": func(w *Writer) error { + return w.Close() + }, + } + + for e, fun := range testcases { + mem := vfs.NewMem() + f, err := mem.Create("foo") + require.NoError(t, 
err) - w := NewWriter(f, opts) - require.Regexp(t, `add a#0,1 failed`, w.Set([]byte("a"), []byte("b"))) - require.Regexp(t, `add c#0,0 failed`, w.Delete([]byte("c"))) - require.Regexp(t, `add d#0,15 failed`, w.DeleteRange([]byte("d"), []byte("e"))) - require.Regexp(t, `add f#0,2 failed`, w.Merge([]byte("f"), []byte("g"))) - require.Regexp(t, `finish failed`, w.Close()) + var opts WriterOptions + opts.TablePropertyCollectors = append(opts.TablePropertyCollectors, + func() TablePropertyCollector { + return errorPropCollector{} + }) + + w := NewWriter(f, opts) + + require.Regexp(t, e, fun(w)) + } } diff --git a/sstable/writer.go b/sstable/writer.go index 0c1627cec9..47503fdc3f 100644 --- a/sstable/writer.go +++ b/sstable/writer.go @@ -108,7 +108,6 @@ type Writer struct { separator Separator successor Successor tableFormat TableFormat - checksumType ChecksumType cache *cache.Cache // disableKeyOrderChecks disables the checks that keys are added to an // sstable in order. It is intended for internal use only in the construction @@ -134,32 +133,19 @@ type Writer struct { // for testing. Note that v2 format blocks are backwards compatible with v1 // format blocks. rangeDelV1Format bool - block blockWriter indexBlock blockWriter rangeDelBlock blockWriter rangeKeyBlock blockWriter + topLevelIndexBlock blockWriter props Properties propCollectors []TablePropertyCollector blockPropCollectors []BlockPropertyCollector blockPropsEncoder blockPropertiesEncoder - // compressedBuf is the destination buffer for compression. It is - // re-used over the lifetime of the writer, avoiding the allocation of a - // temporary buffer for each block. - compressedBuf []byte // filter accumulates the filter block. If populated, the filter ingests // either the output of w.split (i.e. a prefix extractor) if w.split is not // nil, or the full keys otherwise. 
- filter filterWriter - // tmp is a scratch buffer, large enough to hold either footerLen bytes, - // blockTrailerLen bytes, (5 * binary.MaxVarintLen64) bytes, and most - // likely large enough for a block handle with properties. - tmp [blockHandleLikelyMaxLen]byte - - xxHasher *xxhash.Digest - - topLevelIndexBlock blockWriter - indexPartitions []indexBlockWriterAndBlockProperties - + filter filterWriter + indexPartitions []indexBlockWriterAndBlockProperties // To allow potentially overlapping (i.e. un-fragmented) range keys spans to // be added to the Writer, a keyspan.Fragmenter and rangekey.Coalescer are // used to retain the keys and values, emitting fragmented, coalesced spans as @@ -167,6 +153,42 @@ type Writer struct { fragmenter keyspan.Fragmenter coalescer rangekey.Coalescer rkBuf []byte + // dataBlockBuf consists of the state which is currently owned by and used by + // the Writer client goroutine. This state can be handed off to other goroutines. + dataBlockBuf *dataBlockBuf + // blockBuf consists of the state which is owned by and used by the Writer client + // goroutine. + blockBuf blockBuf +} + +type checksummer struct { + checksumType ChecksumType + xxHasher *xxhash.Digest +} + +type blockBuf struct { + // tmp is a scratch buffer, large enough to hold either footerLen bytes, + // blockTrailerLen bytes, (5 * binary.MaxVarintLen64) bytes, and most + // likely large enough for a block handle with properties. + tmp [blockHandleLikelyMaxLen]byte + // compressedBuf is the destination buffer for compression. It is re-used over the + // lifetime of the blockBuf, avoiding the allocation of a temporary buffer for each block. + compressedBuf []byte + checksummer checksummer +} + +// A dataBlockBuf holds all the state required to compress and write a data block to disk. +// A dataBlockBuf begins its lifecycle owned by the Writer client goroutine. 
The Writer +// client goroutine adds keys to the sstable, writing directly into a dataBlockBuf's blockWriter +// until the block is full. Once a dataBlockBuf's block is full, the dataBlockBuf may be passed +// to other goroutines for compression and file I/O. +type dataBlockBuf struct { + blockBuf + dataBlock blockWriter +} + +func (w *Writer) Error() error { + return w.err } type indexBlockWriterAndBlockProperties struct { @@ -180,8 +202,8 @@ type indexBlockWriterAndBlockProperties struct { // // TODO(peter): untested func (w *Writer) Set(key, value []byte) error { - if w.err != nil { - return w.err + if err := w.Error(); err != nil { + return err } return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindSet), value) } @@ -192,8 +214,8 @@ func (w *Writer) Set(key, value []byte) error { // // TODO(peter): untested func (w *Writer) Delete(key []byte) error { - if w.err != nil { - return w.err + if err := w.Error(); err != nil { + return err } return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindDelete), nil) } @@ -205,8 +227,8 @@ func (w *Writer) Delete(key []byte) error { // // TODO(peter): untested func (w *Writer) DeleteRange(start, end []byte) error { - if w.err != nil { - return w.err + if err := w.Error(); err != nil { + return err } return w.addTombstone(base.MakeInternalKey(start, 0, InternalKeyKindRangeDelete), end) } @@ -218,8 +240,8 @@ func (w *Writer) DeleteRange(start, end []byte) error { // // TODO(peter): untested func (w *Writer) Merge(key, value []byte) error { - if w.err != nil { - return w.err + if err := w.Error(); err != nil { + return err } return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindMerge), value) } @@ -231,8 +253,8 @@ func (w *Writer) Merge(key, value []byte) error { // point entries. Additionally, range deletion tombstones must be fragmented // (i.e. by keyspan.Fragmenter). 
func (w *Writer) Add(key InternalKey, value []byte) error { - if w.err != nil { - return w.err + if err := w.Error(); err != nil { + return err } switch key.Kind() { @@ -267,21 +289,23 @@ func (w *Writer) addPoint(key InternalKey, value []byte) error { for i := range w.propCollectors { if err := w.propCollectors[i].Add(key, value); err != nil { + w.err = err return err } } for i := range w.blockPropCollectors { if err := w.blockPropCollectors[i].Add(key, value); err != nil { + w.err = err return err } } w.maybeAddToFilter(key.UserKey) - w.block.add(key, value) + w.dataBlockBuf.dataBlock.add(key, value) w.meta.updateSeqNum(key.SeqNum()) // block.curKey contains the most recently added key to the block. - w.meta.LargestPoint.UserKey = w.block.curKey[:len(w.block.curKey)-8] + w.meta.LargestPoint.UserKey = w.dataBlockBuf.dataBlock.curKey[:len(w.dataBlockBuf.dataBlock.curKey)-8] w.meta.LargestPoint.Trailer = key.Trailer if w.meta.SmallestPoint.UserKey == nil { // NB: we clone w.meta.LargestPoint rather than "key", even though they are @@ -344,6 +368,7 @@ func (w *Writer) addTombstone(key InternalKey, value []byte) error { for i := range w.propCollectors { if err := w.propCollectors[i].Add(key, value); err != nil { + w.err = err return err } } @@ -428,8 +453,8 @@ func (w *Writer) RangeKeyDelete(start, end []byte) error { // overlap. Range keys may be added out of order relative to point keys and // range deletions. 
func (w *Writer) AddRangeKey(key InternalKey, value []byte) error { - if w.err != nil { - return w.err + if err := w.Error(); err != nil { + return err } return w.addRangeKey(key, value) } @@ -653,23 +678,40 @@ func (w *Writer) maybeAddToFilter(key []byte) { } func (w *Writer) maybeFlush(key InternalKey, value []byte) error { - if !shouldFlush(key, value, &w.block, w.blockSize, w.blockSizeThreshold) { + if !shouldFlush(key, value, &w.dataBlockBuf.dataBlock, w.blockSize, w.blockSizeThreshold) { return nil } - bh, err := w.writeBlock(w.block.finish(), w.compression) + err := func() error { + finishedBlock := w.dataBlockBuf.dataBlock.finish() + b, err := compressAndChecksum( + finishedBlock, w.compression, &w.dataBlockBuf.blockBuf, + ) + if err != nil { + return err + } + var bh BlockHandle + if bh, err = w.writeCompressedBlock(b, w.dataBlockBuf.tmp[:]); err != nil { + return err + } + + var bhp BlockHandleWithProperties + if bhp, err = w.maybeAddBlockPropertiesToBlockHandle(bh); err != nil { + return err + } + + prevKey := base.DecodeInternalKey(w.dataBlockBuf.dataBlock.curKey) + if err = w.addIndexEntry(prevKey, key, bhp, w.dataBlockBuf.tmp[:]); err != nil { + return err + } + return nil + }() + if err != nil { w.err = err - return w.err - } - var bhp BlockHandleWithProperties - if bhp, err = w.maybeAddBlockPropertiesToBlockHandle(bh); err != nil { - return err - } - prevKey := base.DecodeInternalKey(w.block.curKey) - if err = w.addIndexEntry(prevKey, key, bhp); err != nil { return err } + return nil } @@ -697,7 +739,7 @@ func (w *Writer) maybeAddBlockPropertiesToBlockHandle( } // addIndexEntry adds an index entry for the specified key and block handle. -func (w *Writer) addIndexEntry(prevKey, key InternalKey, bhp BlockHandleWithProperties) error { +func (w *Writer) addIndexEntry(prevKey, key InternalKey, bhp BlockHandleWithProperties, tmp []byte) error { if bhp.Length == 0 { // A valid blockHandle must be non-zero. // In particular, it must have a non-zero length. 
@@ -709,7 +751,7 @@ func (w *Writer) addIndexEntry(prevKey, key InternalKey, bhp BlockHandleWithProp } else { sep = prevKey.Separator(w.compare, w.separator, nil, key) } - encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp) + encoded := encodeBlockHandleWithProperties(tmp, bhp) if supportsTwoLevelIndex(w.tableFormat) && shouldFlush(sep, encoded, &w.indexBlock, w.indexBlockSize, w.indexBlockSizeThreshold) { @@ -794,7 +836,7 @@ func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) { sep := base.DecodeInternalKey(b.writer.curKey) data := b.writer.finish() w.props.IndexSize += uint64(len(data)) - bh, err := w.writeBlock(data, w.compression) + bh, err := w.writeBlock(data, w.compression, &w.blockBuf) if err != nil { return BlockHandle{}, err } @@ -802,7 +844,7 @@ func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) { BlockHandle: bh, Props: b.properties, } - encoded := encodeBlockHandleWithProperties(w.tmp[:], bhp) + encoded := encodeBlockHandleWithProperties(w.blockBuf.tmp[:], bhp) w.topLevelIndexBlock.add(sep, encoded) } @@ -813,15 +855,15 @@ func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) { w.props.TopLevelIndexSize = uint64(w.topLevelIndexBlock.estimatedSize()) w.props.IndexSize += w.props.TopLevelIndexSize + blockTrailerLen - return w.writeBlock(w.topLevelIndexBlock.finish(), w.compression) + return w.writeBlock(w.topLevelIndexBlock.finish(), w.compression, &w.blockBuf) } -func (w *Writer) writeBlock(b []byte, compression Compression) (BlockHandle, error) { +func compressAndChecksum(b []byte, compression Compression, blockBuf *blockBuf) ([]byte, error) { // Compress the buffer, discarding the result if the improvement isn't at // least 12.5%. 
- blockType, compressed := compressBlock(compression, b, w.compressedBuf) - if blockType != noCompressionBlockType && cap(compressed) > cap(w.compressedBuf) { - w.compressedBuf = compressed[:cap(compressed)] + blockType, compressed := compressBlock(compression, b, blockBuf.compressedBuf) + if blockType != noCompressionBlockType && cap(compressed) > cap(blockBuf.compressedBuf) { + blockBuf.compressedBuf = compressed[:cap(compressed)] } if len(compressed) < len(b)-len(b)/8 { b = compressed @@ -829,27 +871,33 @@ func (w *Writer) writeBlock(b []byte, compression Compression) (BlockHandle, err blockType = noCompressionBlockType } - w.tmp[0] = byte(blockType) + blockBuf.tmp[0] = byte(blockType) // Calculate the checksum. var checksum uint32 - switch w.checksumType { + switch blockBuf.checksummer.checksumType { case ChecksumTypeCRC32c: - checksum = crc.New(b).Update(w.tmp[:1]).Value() + checksum = crc.New(b).Update(blockBuf.tmp[:1]).Value() case ChecksumTypeXXHash64: - if w.xxHasher == nil { - w.xxHasher = xxhash.New() + if blockBuf.checksummer.xxHasher == nil { + blockBuf.checksummer.xxHasher = xxhash.New() } else { - w.xxHasher.Reset() + blockBuf.checksummer.xxHasher.Reset() } - w.xxHasher.Write(b) - w.xxHasher.Write(w.tmp[:1]) - checksum = uint32(w.xxHasher.Sum64()) + blockBuf.checksummer.xxHasher.Write(b) + blockBuf.checksummer.xxHasher.Write(blockBuf.tmp[:1]) + checksum = uint32(blockBuf.checksummer.xxHasher.Sum64()) default: - return BlockHandle{}, errors.Newf("unsupported checksum type: %d", w.checksumType) + return nil, errors.Newf("unsupported checksum type: %d", blockBuf.checksummer.checksumType) } - binary.LittleEndian.PutUint32(w.tmp[1:5], checksum) - bh := BlockHandle{Offset: w.meta.Size, Length: uint64(len(b))} + binary.LittleEndian.PutUint32(blockBuf.tmp[1:5], checksum) + return b, nil +} + +func (w *Writer) writeCompressedBlock( + block []byte, blockTrailerBuf []byte, +) (BlockHandle, error) { + bh := BlockHandle{Offset: w.meta.Size, Length: 
uint64(len(block))} if w.cacheID != 0 && w.fileNum != 0 { // Remove the block being written from the cache. This provides defense in @@ -861,12 +909,12 @@ func (w *Writer) writeBlock(b []byte, compression Compression) (BlockHandle, err } // Write the bytes to the file. - n, err := w.writer.Write(b) + n, err := w.writer.Write(block) if err != nil { return BlockHandle{}, err } w.meta.Size += uint64(n) - n, err = w.writer.Write(w.tmp[:blockTrailerLen]) + n, err = w.writer.Write(blockTrailerBuf[:blockTrailerLen]) if err != nil { return BlockHandle{}, err } @@ -875,6 +923,16 @@ func (w *Writer) writeBlock(b []byte, compression Compression) (BlockHandle, err return bh, nil } +func (w *Writer) writeBlock( + b []byte, compression Compression, blockBuf *blockBuf, +) (BlockHandle, error) { + b, err := compressAndChecksum(b, compression, blockBuf) + if err != nil { + return BlockHandle{}, err + } + return w.writeCompressedBlock(b, blockBuf.tmp[:]) +} + // Close finishes writing the table and closes the underlying file that the // table was written to. func (w *Writer) Close() (err error) { @@ -888,24 +946,27 @@ func (w *Writer) Close() (err error) { } w.syncer = nil }() - if w.err != nil { - return w.err + + if err := w.Error(); err != nil { + return err } // Finish the last data block, or force an empty data block if there // aren't any data blocks at all. 
- if w.block.nEntries > 0 || w.indexBlock.nEntries == 0 { - bh, err := w.writeBlock(w.block.finish(), w.compression) + if w.dataBlockBuf.dataBlock.nEntries > 0 || w.indexBlock.nEntries == 0 { + bh, err := w.writeBlock(w.dataBlockBuf.dataBlock.finish(), w.compression, &w.dataBlockBuf.blockBuf) if err != nil { w.err = err return w.err } var bhp BlockHandleWithProperties if bhp, err = w.maybeAddBlockPropertiesToBlockHandle(bh); err != nil { + w.err = err return err } - prevKey := base.DecodeInternalKey(w.block.curKey) - if err = w.addIndexEntry(prevKey, InternalKey{}, bhp); err != nil { + prevKey := base.DecodeInternalKey(w.dataBlockBuf.dataBlock.curKey) + if err = w.addIndexEntry(prevKey, InternalKey{}, bhp, w.dataBlockBuf.tmp[:]); err != nil { + w.err = err return err } } @@ -920,13 +981,13 @@ func (w *Writer) Close() (err error) { w.err = err return w.err } - bh, err := w.writeBlock(b, NoCompression) + bh, err := w.writeBlock(b, NoCompression, &w.blockBuf) if err != nil { w.err = err return w.err } - n := encodeBlockHandle(w.tmp[:], bh) - metaindex.add(InternalKey{UserKey: []byte(w.filter.metaName())}, w.tmp[:n]) + n := encodeBlockHandle(w.blockBuf.tmp[:], bh) + metaindex.add(InternalKey{UserKey: []byte(w.filter.metaName())}, w.blockBuf.tmp[:n]) w.props.FilterPolicyName = w.filter.policyName() w.props.FilterSize = bh.Length } @@ -949,7 +1010,7 @@ func (w *Writer) Close() (err error) { w.props.NumDataBlocks = uint64(w.indexBlock.nEntries) // Write the single level index block. - indexBH, err = w.writeBlock(w.indexBlock.finish(), w.compression) + indexBH, err = w.writeBlock(w.indexBlock.finish(), w.compression, &w.blockBuf) if err != nil { w.err = err return w.err @@ -974,7 +1035,7 @@ func (w *Writer) Close() (err error) { // internal buffer to get gc'd. 
w.meta.LargestRangeDel = base.MakeRangeDeleteSentinelKey(w.rangeDelBlock.curValue).Clone() } - rangeDelBH, err = w.writeBlock(w.rangeDelBlock.finish(), NoCompression) + rangeDelBH, err = w.writeBlock(w.rangeDelBlock.finish(), NoCompression, &w.blockBuf) if err != nil { w.err = err return w.err @@ -998,7 +1059,7 @@ func (w *Writer) Close() (err error) { // TODO(travers): The lack of compression on the range key block matches the // lack of compression on the range-del block. Revisit whether we want to // enable compression on this block. - rangeKeyBH, err = w.writeBlock(w.rangeKeyBlock.finish(), NoCompression) + rangeKeyBH, err = w.writeBlock(w.rangeKeyBlock.finish(), NoCompression, &w.blockBuf) if err != nil { w.err = err return w.err @@ -1010,14 +1071,15 @@ func (w *Writer) Close() (err error) { // metaindex block entries must be sorted, and the range key block name sorts // before the other block names. if w.props.NumRangeKeys() > 0 { - n := encodeBlockHandle(w.tmp[:], rangeKeyBH) - metaindex.add(InternalKey{UserKey: []byte(metaRangeKeyName)}, w.tmp[:n]) + n := encodeBlockHandle(w.blockBuf.tmp[:], rangeKeyBH) + metaindex.add(InternalKey{UserKey: []byte(metaRangeKeyName)}, w.blockBuf.tmp[:n]) } { userProps := make(map[string]string) for i := range w.propCollectors { if err := w.propCollectors[i].Finish(userProps); err != nil { + w.err = err return err } } @@ -1028,6 +1090,7 @@ func (w *Writer) Close() (err error) { buf, err := w.blockPropCollectors[i].FinishTable(scratch) if err != nil { + w.err = err return err } var prop string @@ -1051,26 +1114,26 @@ func (w *Writer) Close() (err error) { raw.restartInterval = propertiesBlockRestartInterval w.props.CompressionOptions = rocksDBCompressionOptions w.props.save(&raw) - bh, err := w.writeBlock(raw.finish(), NoCompression) + bh, err := w.writeBlock(raw.finish(), NoCompression, &w.blockBuf) if err != nil { w.err = err return w.err } - n := encodeBlockHandle(w.tmp[:], bh) - metaindex.add(InternalKey{UserKey: 
[]byte(metaPropertiesName)}, w.tmp[:n]) + n := encodeBlockHandle(w.blockBuf.tmp[:], bh) + metaindex.add(InternalKey{UserKey: []byte(metaPropertiesName)}, w.blockBuf.tmp[:n]) } // Add the range deletion block handle to the metaindex block. if w.props.NumRangeDeletions > 0 { - n := encodeBlockHandle(w.tmp[:], rangeDelBH) + n := encodeBlockHandle(w.blockBuf.tmp[:], rangeDelBH) // The v2 range-del block encoding is backwards compatible with the v1 // encoding. We add meta-index entries for both the old name and the new // name so that old code can continue to find the range-del block and new // code knows that the range tombstones in the block are fragmented and // sorted. - metaindex.add(InternalKey{UserKey: []byte(metaRangeDelName)}, w.tmp[:n]) + metaindex.add(InternalKey{UserKey: []byte(metaRangeDelName)}, w.blockBuf.tmp[:n]) if !w.rangeDelV1Format { - metaindex.add(InternalKey{UserKey: []byte(metaRangeDelV2Name)}, w.tmp[:n]) + metaindex.add(InternalKey{UserKey: []byte(metaRangeDelV2Name)}, w.blockBuf.tmp[:n]) } } @@ -1078,7 +1141,7 @@ func (w *Writer) Close() (err error) { // policy is nil. NoCompression is specified because a) RocksDB never // compresses the meta-index block and b) RocksDB has some code paths which // expect the meta-index block to not be compressed. - metaindexBH, err := w.writeBlock(metaindex.blockWriter.finish(), NoCompression) + metaindexBH, err := w.writeBlock(metaindex.blockWriter.finish(), NoCompression, &w.blockBuf) if err != nil { w.err = err return w.err @@ -1087,12 +1150,12 @@ func (w *Writer) Close() (err error) { // Write the table footer. 
footer := footer{ format: w.tableFormat, - checksum: w.checksumType, + checksum: w.blockBuf.checksummer.checksumType, metaindexBH: metaindexBH, indexBH: indexBH, } var n int - if n, err = w.writer.Write(footer.encode(w.tmp[:])); err != nil { + if n, err = w.writer.Write(footer.encode(w.blockBuf.tmp[:])); err != nil { w.err = err return w.err } @@ -1120,7 +1183,7 @@ func (w *Writer) Close() (err error) { // EstimatedSize returns the estimated size of the sstable being written if a // called to Finish() was made without adding additional keys. func (w *Writer) EstimatedSize() uint64 { - return w.meta.Size + uint64(w.block.estimatedSize()+w.indexBlock.estimatedSize()) + return w.meta.Size + uint64(w.dataBlockBuf.dataBlock.estimatedSize()+w.indexBlock.estimatedSize()) } // Metadata returns the metadata for the finished sstable. Only valid to call @@ -1192,11 +1255,7 @@ func NewWriter(f writeCloseSyncer, o WriterOptions, extraOpts ...WriterOption) * separator: o.Comparer.Separator, successor: o.Comparer.Successor, tableFormat: o.TableFormat, - checksumType: o.Checksum, cache: o.Cache, - block: blockWriter{ - restartInterval: o.BlockRestartInterval, - }, indexBlock: blockWriter{ restartInterval: 1, }, @@ -1215,6 +1274,17 @@ func NewWriter(f writeCloseSyncer, o WriterOptions, extraOpts ...WriterOption) * }, coalescer: rangekey.Coalescer{}, } + + w.dataBlockBuf = &dataBlockBuf{ + dataBlock: blockWriter{ + restartInterval: o.BlockRestartInterval, + }, + } + w.dataBlockBuf.checksummer = checksummer{checksumType: o.Checksum} + w.blockBuf = blockBuf{ + checksummer: checksummer{checksumType: o.Checksum}, + } + if f == nil { w.err = errors.New("pebble: nil file") return w diff --git a/sstable/writer_test.go b/sstable/writer_test.go index 038cee62e2..c53443e49d 100644 --- a/sstable/writer_test.go +++ b/sstable/writer_test.go @@ -348,9 +348,9 @@ func TestWriter_BlockProperties_Errors(t *testing.T) { case errSiteFinishBlock: require.NoError(t, err) // Addition of a second key 
completes the first block. - err = w.Add(k2, v2) - require.Error(t, err) - require.Equal(t, blockPropErr, err) + w.Add(k2, v2) + require.Error(t, w.Error()) + require.Equal(t, blockPropErr, w.Error()) case errSiteFinishIndex: require.NoError(t, err) // Addition of a second key completes the first block. @@ -358,15 +358,15 @@ func TestWriter_BlockProperties_Errors(t *testing.T) { require.NoError(t, err) // The index entry for the first block is added after the completion of // the second block, which is triggered by adding a third key. - err = w.Add(k3, v3) - require.Error(t, err) - require.Equal(t, blockPropErr, err) + w.Add(k3, v3) + require.Error(t, w.Error()) + require.Equal(t, blockPropErr, w.Error()) } err = w.Close() if tc == errSiteFinishTable { require.Error(t, err) - require.Equal(t, blockPropErr, err) + require.Equal(t, blockPropErr, w.Error()) } }) }