storage/diskmap: remove SortedDiskMapIterator.{Key,Value} #43145

Merged · 1 commit · Dec 17, 2019
22 changes: 20 additions & 2 deletions pkg/sql/rowcontainer/disk_row_container.go
@@ -275,6 +275,7 @@ func (d *DiskRowContainer) keyValToRow(k []byte, v []byte) (sqlbase.EncDatumRow,
// diskRowIterator iterates over the rows in a DiskRowContainer.
type diskRowIterator struct {
rowContainer *DiskRowContainer
rowBuf []byte
diskmap.SortedDiskMapIterator
}

@@ -305,7 +306,23 @@ func (r *diskRowIterator) Row() (sqlbase.EncDatumRow, error) {
return nil, errors.AssertionFailedf("invalid row")
}

return r.rowContainer.keyValToRow(r.Key(), r.Value())
k := r.UnsafeKey()
v := r.UnsafeValue()
// TODO(asubiotto): the "true ||" should not be necessary. We should be able
// to reuse rowBuf, yet doing so causes
// TestDiskBackedIndexedRowContainer/ReorderingOnDisk, TestHashJoiner, and
// TestSorter to fail. Some caller of Row() is presumably not making a copy
// of the return value.
if true || cap(r.rowBuf) < len(k)+len(v) {
r.rowBuf = make([]byte, 0, len(k)+len(v))
}
r.rowBuf = r.rowBuf[:len(k)+len(v)]
copy(r.rowBuf, k)
copy(r.rowBuf[len(k):], v)
k = r.rowBuf[:len(k)]
v = r.rowBuf[len(k):]

return r.rowContainer.keyValToRow(k, v)
}

func (r *diskRowIterator) Close() {
@@ -347,7 +364,8 @@ func (r *diskRowFinalIterator) Row() (sqlbase.EncDatumRow, error) {
if err != nil {
return nil, err
}
r.diskRowIterator.rowContainer.lastReadKey = r.Key()
r.diskRowIterator.rowContainer.lastReadKey =
append(r.diskRowIterator.rowContainer.lastReadKey[:0], r.UnsafeKey()...)
return row, nil
}

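The new Row() copies the unsafe key/value pair into a single reusable buffer so the returned row survives the iterator advancing. A minimal, self-contained sketch of that buffer-reuse pattern (the type and function names here are illustrative, not the container's own):

```go
package main

import "fmt"

// kvBuffer holds one reusable byte slice that backs the most recently copied
// key/value pair.
type kvBuffer struct {
	buf []byte
}

// copyKV stores k and v contiguously in the reusable buffer and returns stable
// sub-slices of it. The buffer only grows when it is too small, mirroring the
// cap() check in the Row() change above.
func (b *kvBuffer) copyKV(k, v []byte) (key, val []byte) {
	n := len(k) + len(v)
	if cap(b.buf) < n {
		b.buf = make([]byte, 0, n)
	}
	b.buf = b.buf[:n]
	copy(b.buf, k)
	copy(b.buf[len(k):], v)
	return b.buf[:len(k)], b.buf[len(k):]
}

func main() {
	var b kvBuffer
	k, v := b.copyKV([]byte("key"), []byte("value"))
	fmt.Printf("%s=%s\n", k, v) // key=value
}
```

Once the TODO above is resolved, the allocation only happens when the buffer is too small, so steady-state iteration should allocate nothing per row.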
16 changes: 8 additions & 8 deletions pkg/sql/rowcontainer/hash_row_container.go
@@ -553,6 +553,8 @@ type hashDiskRowBucketIterator struct {
// encodedEqCols is the encoding of the equality columns of the rows in the
// bucket that this iterator iterates over.
encodedEqCols []byte
// Temporary buffer used for constructing marked values.
tmpBuf []byte
}

var _ RowMarkerIterator = &hashDiskRowBucketIterator{}
@@ -586,10 +588,7 @@ func (i *hashDiskRowBucketIterator) Valid() (bool, error) {
}
// Since the underlying map is sorted, once the key prefix does not equal
// the encoded equality columns, we have gone past the end of the bucket.
// TODO(asubiotto): Make UnsafeKey() and UnsafeValue() part of the
// SortedDiskMapIterator interface to avoid allocation here, in Mark(), and
// isRowMarked().
return bytes.HasPrefix(i.Key(), i.encodedEqCols), nil
return bytes.HasPrefix(i.UnsafeKey(), i.encodedEqCols), nil
}

// Row implements the RowIterator interface.
@@ -632,7 +631,7 @@ func (i *hashDiskRowBucketIterator) IsMarked(ctx context.Context) bool {
return false
}

rowVal := i.Value()
rowVal := i.UnsafeValue()
return bytes.Equal(rowVal[len(rowVal)-len(encodedTrue):], encodedTrue)
}

@@ -648,19 +647,20 @@ func (i *hashDiskRowBucketIterator) Mark(ctx context.Context, mark bool) error {
}
// rowVal are the non-equality encoded columns, the last of which is the
// column we use to mark a row.
rowVal := i.Value()
rowVal := append(i.tmpBuf[:0], i.UnsafeValue()...)
originalLen := len(rowVal)
rowVal = append(rowVal, markBytes...)

// Write the new encoding of mark over the old encoding of mark and truncate
// the extra bytes.
copy(rowVal[originalLen-len(markBytes):], rowVal[originalLen:])
rowVal = rowVal[:originalLen]
i.tmpBuf = rowVal

// These marks only matter when using a hashDiskRowIterator to iterate over
// unmarked rows. The writes are flushed when creating a NewIterator() in
// NewUnmarkedIterator().
return i.HashDiskRowContainer.bufferedRows.Put(i.Key(), rowVal)
return i.HashDiskRowContainer.bufferedRows.Put(i.UnsafeKey(), rowVal)
}

// hashDiskRowIterator iterates over all unmarked rows in a
@@ -720,7 +720,7 @@ func (i *hashDiskRowIterator) isRowMarked() bool {
return false
}

rowVal := i.Value()
rowVal := i.UnsafeValue()
return bytes.Equal(rowVal[len(rowVal)-len(encodedTrue):], encodedTrue)
}

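Mark() above now snapshots the unsafe value into a scratch buffer before rewriting the trailing mark encoding and writing the row back. Roughly the same move in isolation (the names are made up, and a one-byte mark stands in for the real encoded-datum mark):

```go
package main

import "fmt"

// setTrailingMark copies unsafeVal into scratch (reusing its capacity),
// overwrites the last len(mark) bytes with the new mark encoding, and returns
// the marked value along with the scratch buffer for reuse.
func setTrailingMark(scratch, unsafeVal, mark []byte) (marked, newScratch []byte) {
	marked = append(scratch[:0], unsafeVal...)
	copy(marked[len(marked)-len(mark):], mark)
	return marked, marked
}

func main() {
	unsafeVal := []byte{0x01, 0x02, 0x00} // last byte is the mark slot
	marked, _ := setTrailingMark(nil, unsafeVal, []byte{0x0a})
	fmt.Println(marked, unsafeVal) // [1 2 10] [1 2 0]: the borrowed bytes stay untouched
}
```

Copying first matters because UnsafeValue() borrows memory owned by the underlying engine, so it must not be modified in place.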
8 changes: 1 addition & 7 deletions pkg/storage/diskmap/disk_map.go
@@ -33,7 +33,7 @@ type Factory interface {
// } else if !ok {
// break
// }
// key := i.Key()
// key := i.UnsafeKey()
// // Do something.
// }
type SortedDiskMapIterator interface {
@@ -49,12 +49,6 @@ type SortedDiskMapIterator interface {
Valid() (bool, error)
// Next advances the iterator to the next key in the iteration.
Next()
// Key returns the current key. The resulting byte slice is still valid
// after the next call to Seek(), Rewind(), or Next().
Key() []byte
// Value returns the current value. The resulting byte slice is still valid
// after the next call to Seek(), Rewind(), or Next().
Value() []byte

// UnsafeKey returns the same value as Key, but the memory is invalidated on
// the next call to {Next,Rewind,Seek,Close}.
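With Key() and Value() gone from the interface, any caller that needs bytes to outlive the next call to Next(), Rewind(), Seek(), or Close() must copy them itself. A small sketch of that obligation, using only methods that appear in this diff (the collectKeys helper is illustrative, not part of the package):

```go
package example

import "github.com/cockroachdb/cockroach/pkg/storage/diskmap"

// collectKeys copies every key out of a SortedDiskMapIterator. The memory
// backing UnsafeKey is invalidated by the next call to Next/Rewind/Seek/Close,
// so each key is duplicated before the iterator advances.
func collectKeys(i diskmap.SortedDiskMapIterator) ([][]byte, error) {
	var keys [][]byte
	for i.Rewind(); ; i.Next() {
		if ok, err := i.Valid(); err != nil {
			return nil, err
		} else if !ok {
			break
		}
		keys = append(keys, append([]byte(nil), i.UnsafeKey()...))
	}
	return keys, nil
}
```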
28 changes: 0 additions & 28 deletions pkg/storage/engine/disk_map.go
@@ -194,16 +194,6 @@ func (i *rocksDBMapIterator) Next() {
i.iter.Next()
}

// Key implements the SortedDiskMapIterator interface.
func (i *rocksDBMapIterator) Key() []byte {
return i.iter.Key().Key[len(i.prefix):]
}

// Value implements the SortedDiskMapIterator interface.
func (i *rocksDBMapIterator) Value() []byte {
return i.iter.Value()
}

// UnsafeKey implements the SortedDiskMapIterator interface.
func (i *rocksDBMapIterator) UnsafeKey() []byte {
return i.iter.UnsafeKey().Key[len(i.prefix):]
@@ -396,24 +386,6 @@ func (i *pebbleMapIterator) Next() {
i.iter.Next()
}

// Key implements the SortedDiskMapIterator interface.
func (i *pebbleMapIterator) Key() []byte {
unsafeKey := i.UnsafeKey()
safeKey := make([]byte, len(unsafeKey))
copy(safeKey, unsafeKey)

return safeKey
}

// Value implements the SortedDiskMapIterator interface.
func (i *pebbleMapIterator) Value() []byte {
unsafeValue := i.iter.Value()
safeValue := make([]byte, len(unsafeValue))
copy(safeValue, unsafeValue)

return safeValue
}

// UnsafeKey implements the SortedDiskMapIterator interface.
func (i *pebbleMapIterator) UnsafeKey() []byte {
unsafeKey := i.iter.Key()
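The deleted Key()/Value() implementations ultimately returned stable copies of the unsafe bytes (pebble's did so explicitly, as shown above). A call site that still wants those semantics can do the copy itself; a hedged sketch with an invented helper name:

```go
package example

// stableCopy reproduces what the removed Key()/Value() wrappers provided: a
// freshly allocated slice that survives subsequent iterator movement.
func stableCopy(unsafe []byte) []byte {
	safe := make([]byte, len(unsafe))
	copy(safe, unsafe)
	return safe
}

// At a call site that needs retention across Next():
//   key := stableCopy(it.UnsafeKey())
//   val := stableCopy(it.UnsafeValue())
```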
10 changes: 5 additions & 5 deletions pkg/storage/engine/disk_map_test.go
@@ -171,7 +171,7 @@ func runTestForEngine(ctx context.Context, t *testing.T, filename string, engine
}
valid, err := iter.Valid()
if valid && err == nil {
fmt.Fprintf(&b, "%s:%s\n", iter.Key(), iter.Value())
fmt.Fprintf(&b, "%s:%s\n", iter.UnsafeKey(), iter.UnsafeValue())
} else if err != nil {
fmt.Fprintf(&b, "err=%v\n", err)
} else {
@@ -381,8 +381,8 @@ func BenchmarkRocksDBMapIteration(b *testing.B) {
} else if !ok {
break
}
i.Key()
i.Value()
i.UnsafeKey()
i.UnsafeValue()
}
i.Close()
}
@@ -521,8 +521,8 @@ func BenchmarkPebbleMapIteration(b *testing.B) {
} else if !ok {
break
}
i.Key()
i.Value()
i.UnsafeKey()
i.UnsafeValue()
}
i.Close()
}
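The benchmark loops now hit the zero-copy accessors directly, so the per-element allocations that the pebble Key()/Value() wrappers performed disappear from the measurement. A standalone toy benchmark (no real disk map engine involved; everything here is illustrative) showing where those allocations come from:

```go
package example

import "testing"

// fakeIter is a toy in-memory iterator; it stands in for a disk map iterator
// only to show the cost difference between borrowing and copying keys.
type fakeIter struct {
	keys [][]byte
	idx  int
}

func (f *fakeIter) Valid() bool       { return f.idx < len(f.keys) }
func (f *fakeIter) Next()             { f.idx++ }
func (f *fakeIter) UnsafeKey() []byte { return f.keys[f.idx] }

func BenchmarkBorrowKey(b *testing.B) {
	it := &fakeIter{keys: [][]byte{[]byte("k1"), []byte("k2")}}
	for n := 0; n < b.N; n++ {
		for it.idx = 0; it.Valid(); it.Next() {
			_ = it.UnsafeKey() // what the updated benchmarks do: no allocation
		}
	}
}

func BenchmarkCopyKey(b *testing.B) {
	it := &fakeIter{keys: [][]byte{[]byte("k1"), []byte("k2")}}
	for n := 0; n < b.N; n++ {
		for it.idx = 0; it.Valid(); it.Next() {
			_ = append([]byte(nil), it.UnsafeKey()...) // what a copying Key() used to cost
		}
	}
}
```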