-
Notifications
You must be signed in to change notification settings - Fork 472
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add memtable flush pacing mechanism #166
Add memtable flush pacing mechanism #166
Conversation
31afd90
to
b7ec484
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks directionally good. I have a suggestion below for how to replace flushable.bytesFlushed
with a cleaner approach.
Reviewable status: 0 of 14 files reviewed, 8 unresolved discussions (waiting on @ajkr, @petermattis, and @Ryanfsdf)
batch.go, line 879 at r1 (raw file):
b := &flushableBatch{ data: batch.storage.data, memTableSize: batch.memTableSize,
batch.memTableSize
is a pessimistic estimate of the size of the batch data when it gets inserted into the memtable. batch.storage.data
is the actual size of all of the data in the batch. But I'm not sure you want to use that either. I need to think about this more.
compaction.go, line 998 at r1 (raw file):
if c.flushing != nil { var dirtyBytes uint32 for _, m := range d.mu.mem.queue {
I think we need to expose a way for iter
to return the number of bytes flushed or similar. Note that d.mu.mem.queue
can contain more memtables that the ones being flushed. It would be more correct to use c.flushing
, but I think even that is too hacky.
db.go, line 45 at r1 (raw file):
newFlushIter(o *IterOptions) internalIterator newRangeDelIter(o *IterOptions) internalIterator bytesFlushed() uint32
I don't think this is a good way to structure access to the number of bytes flushed. I see why you didn't add a flushInternalIterator
interface with a bytesFlushed
method, as then we'd need a mergingIter
specialization and that seems onerous.
What if newFlushIter
took a bytesFlushed *uint32
parameter? That pointer would be stored in the various implementations such as arenaskl.FlushIterator
which would update the value during iteration. Then you could add compaction.bytesFlushed uint32
as a field and easily access it from runCompaction
.
It might be possible to make this general as well, so that it works for compactions in addition to flushes. Sstables can easily expose how far they are through iteration.
Lastly, I think this should be a uint64
so we never accidentally trip over problems if we hit a 4 GB flush (or compaction).
internal/arenaskl/flush_iterator.go, line 30 at r1 (raw file):
// to construct an iterator. The current state of the iterator can be cloned by // simply value copying the struct. type FlushIterator struct {
Does FlushIterator
need to be exported?
internal/arenaskl/flush_iterator.go, line 35 at r1 (raw file):
key base.InternalKey lower []byte upper []byte
Rather than making a copy of Iterator
, I think you can embed Iterator
and override the methods.
internal/arenaskl/flush_iterator.go, line 42 at r1 (raw file):
return &FlushIterator{} }, }
Flushes are rare enough that I think it is better to avoid a flushIterPool
to allocate them.
internal/arenaskl/flush_iterator.go, line 90 at r1 (raw file):
// TODO(ryan): This doesn't make much sense in the context of a flush iterator, // but it seems necessary in one function.
Where is it called from?
internal/arenaskl/skl.go, line 89 at r1 (raw file):
// bytesIterated is used by FlushIterator to keep track of the number of bytes // flushed. bytesIterated uint32
My other suggestions should allow this to be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TFTR!
Reviewable status: 0 of 14 files reviewed, 8 unresolved discussions (waiting on @ajkr, @petermattis, and @Ryanfsdf)
batch.go, line 879 at r1 (raw file):
Previously, petermattis (Peter Mattis) wrote…
batch.memTableSize
is a pessimistic estimate of the size of the batch data when it gets inserted into the memtable.batch.storage.data
is the actual size of all of the data in the batch. But I'm not sure you want to use that either. I need to think about this more.
Ack, I'll think about this more too.
compaction.go, line 998 at r1 (raw file):
Previously, petermattis (Peter Mattis) wrote…
I think we need to expose a way for
iter
to return the number of bytes flushed or similar. Note thatd.mu.mem.queue
can contain more memtables that the ones being flushed. It would be more correct to usec.flushing
, but I think even that is too hacky.
I think we would still want to include all the memtables (even the ones not being flushed, such as the mutable memtable) in the dirty byte count since we want the dirty byte count to represent the total number of unflushed bytes in all the memtables.
This will be reworked, however, now that the iterator itself can expose the number of bytes iterated through.
db.go, line 45 at r1 (raw file):
Previously, petermattis (Peter Mattis) wrote…
I don't think this is a good way to structure access to the number of bytes flushed. I see why you didn't add a
flushInternalIterator
interface with abytesFlushed
method, as then we'd need amergingIter
specialization and that seems onerous.What if
newFlushIter
took abytesFlushed *uint32
parameter? That pointer would be stored in the various implementations such asarenaskl.FlushIterator
which would update the value during iteration. Then you could addcompaction.bytesFlushed uint32
as a field and easily access it fromrunCompaction
.It might be possible to make this general as well, so that it works for compactions in addition to flushes. Sstables can easily expose how far they are through iteration.
Lastly, I think this should be a
uint64
so we never accidentally trip over problems if we hit a 4 GB flush (or compaction).
Got it, that sounds good. Thank you for the suggestion!
internal/arenaskl/flush_iterator.go, line 30 at r1 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Does
FlushIterator
need to be exported?
No, I'll make it unexported.
internal/arenaskl/flush_iterator.go, line 90 at r1 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Where is it called from?
It's called from compaction.allowZeroSeqNum
. It uses Last
to determine the end key for elideRangeTombstone
.
internal/arenaskl/skl.go, line 89 at r1 (raw file):
Previously, petermattis (Peter Mattis) wrote…
My other suggestions should allow this to be removed.
Ack.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 14 files reviewed, 8 unresolved discussions (waiting on @ajkr, @petermattis, and @Ryanfsdf)
batch.go, line 879 at r1 (raw file):
Previously, Ryanfsdf (Ryan Kim) wrote…
Ack, I'll think about this more too.
When iterating through a flushableBatch
, we can compute the size of each entry using the info in flushableBatchEntry
and the size of the value (after it has been decoded). I just noticed that flushableBatchIter.Value
is somewhat less efficient than it could be. We already have the offset for the end of the key (flushableBatchEntry.keyEnd
). We could decode the varint stored at that offset to find the length of the value.
compaction.go, line 998 at r1 (raw file):
Previously, Ryanfsdf (Ryan Kim) wrote…
I think we would still want to include all the memtables (even the ones not being flushed, such as the mutable memtable) in the dirty byte count since we want the dirty byte count to represent the total number of unflushed bytes in all the memtables.
This will be reworked, however, now that the iterator itself can expose the number of bytes iterated through.
Good point about wanting the dirty bytes from all of the memtables. When this gets reworked, be sure to include that in a comment. It wasn't obvious when I first read this.
b7ec484
to
e220cdf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On top of addressing the comments, I've implemented a bytesIterated
indicator for sstable iteration as well. This isn't used yet and I'll include the usage in a separate PR, since I suspect that it will require a bit of work.
Reviewable status: 0 of 21 files reviewed, 8 unresolved discussions (waiting on @ajkr and @petermattis)
batch.go, line 879 at r1 (raw file):
Previously, petermattis (Peter Mattis) wrote…
When iterating through a
flushableBatch
, we can compute the size of each entry using the info influshableBatchEntry
and the size of the value (after it has been decoded). I just noticed thatflushableBatchIter.Value
is somewhat less efficient than it could be. We already have the offset for the end of the key (flushableBatchEntry.keyEnd
). We could decode the varint stored at that offset to find the length of the value.
Done. I added the optimization to flushableBatchIter.Value
so that the key is no longer decoded. I think I found a bug with keyStart
and keyEnd
while optimizing it too. I left on a comment on where I fixed it.
compaction.go, line 998 at r1 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Good point about wanting the dirty bytes from all of the memtables. When this gets reworked, be sure to include that in a comment. It wasn't obvious when I first read this.
Done.
internal/arenaskl/flush_iterator.go, line 35 at r1 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Rather than making a copy of
Iterator
, I think you can embedIterator
and override the methods.
Done.
internal/arenaskl/flush_iterator.go, line 42 at r1 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Flushes are rare enough that I think it is better to avoid a
flushIterPool
to allocate them.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 21 files reviewed, 9 unresolved discussions (waiting on @ajkr, @petermattis, and @Ryanfsdf)
batch.go, line 898 at r2 (raw file):
// byte encodes `0`, which is the length of the key. entry.keyStart = uint32(offset)+2 entry.keyEnd = uint32(offset)+2
Is this the bug you were referring to above? I think you are technically correct, that keyStart
and keyEnd
were not actually the offset of the key, but the way these fields are used to extract the key later I don't think that bug had any effect as we'd get a zero-length slice regardless.
batch.go, line 1159 at r2 (raw file):
case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete: keyEnd := i.offsets[i.index].keyEnd _, value, ok = batchDecodeStr(i.batch.data[keyEnd:])
s/i.batch.data/i.data/g
?
batch.go, line 1237 at r2 (raw file):
entryBytes := i.offsets[i.index].keyEnd - i.offsets[i.index].offset *i.bytesIterated += uint64(entryBytes) + i.valueSize() return &i.key, i.Value()
I think there is code that could be shared with flushableBatchIter
here with a little restructuring. I don't see precisely how to do it, but the overlap is significant.
compaction.go, line 998 at r1 (raw file):
Previously, Ryanfsdf (Ryan Kim) wrote…
Done.
I think you should pull out this loop into a method for clarity. Something like DB.memtableDirtyBytesLocked
(the Locked
suffix indicates that DB.mu
needs to be held by the caller).
compaction.go, line 1011 at r2 (raw file):
if dirtyBytes <= uint64(d.opts.MemTableSize * 105/100) { err := d.mu.flushLimiter.WaitN(context.Background(), int(flushAmount))
I recall rate limiters do weird things when the value passed in is larger than the "burst" amount that the rate limiter was configured with. I don't recall exactly what those weird things are, only that special handling is required if that is possible (which I think it is here).
internal/arenaskl/flush_iterator.go, line 90 at r1 (raw file):
Previously, Ryanfsdf (Ryan Kim) wrote…
It's called from
compaction.allowZeroSeqNum
. It usesLast
to determine the end key forelideRangeTombstone
.
Ack. Thanks for tracking that down.
internal/arenaskl/flush_iterator.go, line 57 at r2 (raw file):
it.nd = it.list.tail return nil, nil }
I believe you can call it.Iterator.First()
here. Similar comment applies in Next()
.
sstable/reader.go, line 507 at r2 (raw file):
i.data.invalidateUpper() // force i.data.Valid() to return false return nil, nil }
I believe the above can be replaced with i.Iterator.First()
. A similar comment applies to Next()
.
sstable/reader.go, line 525 at r2 (raw file):
return nil, nil } *i.bytesIterated += uint64(key.Size() + len(val))
The key size plus value size are not the full data size for a block. Per in-person discussion, I think you want to increment bytesIterated
by looking at the offset in the current record in the file (block offset + record offset within block) and the offset of the previous record in the file. This will automatically account for per-record overhead and overhead such as the block restart points.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 21 files reviewed, 9 unresolved discussions (waiting on @ajkr, @petermattis, and @Ryanfsdf)
batch.go, line 898 at r2 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Is this the bug you were referring to above? I think you are technically correct, that
keyStart
andkeyEnd
were not actually the offset of the key, but the way these fields are used to extract the key later I don't think that bug had any effect as we'd get a zero-length slice regardless.
Yes this is the bug. Since we use keyEnd
to decode the value now, a test fails without the +2
.
compaction.go, line 1011 at r2 (raw file):
Previously, petermattis (Peter Mattis) wrote…
I recall rate limiters do weird things when the value passed in is larger than the "burst" amount that the rate limiter was configured with. I don't recall exactly what those weird things are, only that special handling is required if that is possible (which I think it is here).
WaitN
returns an error if the value is larger than the burst amount, so the error from that would be properly propagated.
I can possibly refactor this to rate limit in "chunks" less than the burst amount or set the burst amount to something really high, do you think that's needed?
sstable/reader.go, line 507 at r2 (raw file):
Previously, petermattis (Peter Mattis) wrote…
I believe the above can be replaced with
i.Iterator.First()
. A similar comment applies toNext()
.
I intentionally didn't do this (along with flushableBatchIter
) since Go doesn't inline functions. I should have left a comment about this. With that in mind, do you think it's worth keeping like this (with a comment) or should I refactor it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 21 files reviewed, 8 unresolved discussions (waiting on @ajkr, @petermattis, and @Ryanfsdf)
batch.go, line 898 at r2 (raw file):
Previously, Ryanfsdf (Ryan Kim) wrote…
Yes this is the bug. Since we use
keyEnd
to decode the value now, a test fails without the+2
.
Now that the computation is slightly more complex, this should probably be:
entry.keyStart = ...
entry.keyEnd = entry.keyStart
compaction.go, line 1011 at r2 (raw file):
Previously, Ryanfsdf (Ryan Kim) wrote…
WaitN
returns an error if the value is larger than the burst amount, so the error from that would be properly propagated.I can possibly refactor this to rate limit in "chunks" less than the burst amount or set the burst amount to something really high, do you think that's needed?
Something is needed. We don't want to error here if the computed flush rate is higher than the burst rate.
sstable/reader.go, line 507 at r2 (raw file):
Previously, Ryanfsdf (Ryan Kim) wrote…
I intentionally didn't do this (along with
flushableBatchIter
) since Go doesn't inline functions. I should have left a comment about this. With that in mind, do you think it's worth keeping like this (with a comment) or should I refactor it?
Go does inline functions, but the inlining isn't very aggressive. In this case though, I'd err on the side of code reuse. My original point about having separate Iterator
and compactionIterator
objects is so that Iterator
remains unchanged. Having compactionIterator
call into Iterator
seems ok on the surface (until profiles show us otherwise).
00b48f4
to
6cc8e36
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed review comments and added tests. I'll benchmark the changes for calling it.Iterator.First()
, and it.Iterator.Next()
from each of the wrapper iterators and post the results here.
Reviewable status: 0 of 24 files reviewed, 8 unresolved discussions (waiting on @ajkr and @petermattis)
batch.go, line 898 at r2 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Now that the computation is slightly more complex, this should probably be:
entry.keyStart = ... entry.keyEnd = entry.keyStart
Done.
batch.go, line 1159 at r2 (raw file):
Previously, petermattis (Peter Mattis) wrote…
s/i.batch.data/i.data/g
?
Done.
batch.go, line 1237 at r2 (raw file):
Previously, petermattis (Peter Mattis) wrote…
I think there is code that could be shared with
flushableBatchIter
here with a little restructuring. I don't see precisely how to do it, but the overlap is significant.
Done.
compaction.go, line 998 at r1 (raw file):
Previously, petermattis (Peter Mattis) wrote…
I think you should pull out this loop into a method for clarity. Something like
DB.memtableDirtyBytesLocked
(theLocked
suffix indicates thatDB.mu
needs to be held by the caller).
Done.
compaction.go, line 1011 at r2 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Something is needed. We don't want to error here if the computed flush rate is higher than the burst rate.
Done. With the new implementation, I don't think it's possible to ever error out since we use context.Background()
and the amount is never less than the "burst".
internal/arenaskl/flush_iterator.go, line 57 at r2 (raw file):
Previously, petermattis (Peter Mattis) wrote…
I believe you can call
it.Iterator.First()
here. Similar comment applies inNext()
.
Done.
sstable/reader.go, line 507 at r2 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Go does inline functions, but the inlining isn't very aggressive. In this case though, I'd err on the side of code reuse. My original point about having separate
Iterator
andcompactionIterator
objects is so thatIterator
remains unchanged. HavingcompactionIterator
call intoIterator
seems ok on the surface (until profiles show us otherwise).
Ack. I'll try benchmarking each of these.
sstable/reader.go, line 525 at r2 (raw file):
Previously, petermattis (Peter Mattis) wrote…
The key size plus value size are not the full data size for a block. Per in-person discussion, I think you want to increment
bytesIterated
by looking at the offset in the current record in the file (block offset + record offset within block) and the offset of the previous record in the file. This will automatically account for per-record overhead and overhead such as the block restart points.
Due to compression, I've made some adjustments. I used the formula compressed_block_offset + uncompressed_record_offset_in_block * compressed_block_size / uncompressed_block_size
to determine the offset of the current record in the file. This still automatically accounts for block level overhead such as block restart points.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good.
Reviewable status: 0 of 24 files reviewed, 12 unresolved discussions (waiting on @ajkr and @Ryanfsdf)
batch_test.go, line 422 at r3 (raw file):
for j := 0; j < 1000; j++ { binary.BigEndian.PutUint64(key, uint64(j)) batch.Set(key, value, nil)
Let's make some different key and value sizes here, rather than having them all be the same.
batch_test.go, line 435 at r3 (raw file):
t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected) }
Nit: spurious blank line.
compaction.go, line 998 at r1 (raw file):
Previously, Ryanfsdf (Ryan Kim) wrote…
Done.
Oh, I didn't notice earlier that you're only locking d.mu
for this purpose. I think the locking and unlocking of d.mu
on every iteration is going to be problematic. One thought is to only call memTableTotalBytesLocked
every 1000 iterations.
Related, I think you can push the lock acquisition into memTableTotalBytesLocked
and removed the Locked
suffix.
compaction.go, line 1011 at r2 (raw file):
Previously, Ryanfsdf (Ryan Kim) wrote…
Done. With the new implementation, I don't think it's possible to ever error out since we use
context.Background()
and the amount is never less than the "burst".
What if flushAmount > 2*burst
? I believe what I've done in the past is something like:
if flushAmount >= burst {
flushAmount = burst
}
My thinking is that this should be a rare occurrence.
compaction.go, line 1010 at r3 (raw file):
prevBytesIterated = c.bytesIterated if dirtyBytes <= uint64(d.opts.MemTableSize * 105/100) {
Let's add some comments here. I confused myself for a bit about what this logic is doing. You're only limiting the flush rate when flushing is keeping up with the pace of user writes. If user writes are coming in so fast that dirty data is accumulating, flushing is allowed to run at full speed.
Also, I think we may need to update flushLimiter
in either case, so that the rate limiter sees an accurate picture of the rate at which data is being flushed. If rate limiting is not needed (because dirty data is accumulating too fast), you would call flushLimiter.AllowN
.
table_cache.go, line 93 at r3 (raw file):
} tableCompactionIter.Iterator.SetCloseHook(func() error {
This is going to conflict with my PR. One of us will have to handle the merge.
internal/arenaskl/flush_iterator.go, line 27 at r3 (raw file):
// to construct an iterator. The current state of the iterator can be cloned by // simply value copying the struct. type flushIterator struct {
The bytesIterated
support here deserves a test in the arenaskl
package, rather than relying on the memtable test you added.
internal/base/options.go, line 252 at r3 (raw file):
// Soft limit on the number of L0 files. Writes are slowed down when this // threshold is reached. L0SlowdownWritesThreshold int
Is this worth keeping now?
sstable/reader.go, line 525 at r2 (raw file):
Previously, Ryanfsdf (Ryan Kim) wrote…
Due to compression, I've made some adjustments. I used the formula
compressed_block_offset + uncompressed_record_offset_in_block * compressed_block_size / uncompressed_block_size
to determine the offset of the current record in the file. This still automatically accounts for block level overhead such as block restart points.
Good point about compression. I misunderstood what you were saying when we chatted in passing yesterday.
sstable/reader.go, line 513 at r3 (raw file):
return nil, nil } // i.dataBH.length/len(i.data.data) is the compression ratio. If uncompressed, this is 1.
I think there might be an extra 5 bytes to add to the length of i.data.data
. There is the byte indicating whether the block is compressed, and the block checksum. See Reader.readBlock
. Interesting that your test isn't catching this. That is something to investigate.
sstable/reader.go, line 516 at r3 (raw file):
// i.data.nextOffset is the uncompressed position of the current record in the block. // i.dataBH.offset is the offset of the block in the sstable before decompression. recordOffset := uint64(i.data.nextOffset) * i.dataBH.length / uint64(len(i.data.data))
When mixing multiplication and division, I prefer to state the grouping explicitly instead of relying on precedence rules: (uint64(i.data.nextOffset) * i.dataBH.length) / uint64(len(i.data.data))
sstable/reader_test.go, line 272 at r3 (raw file):
// Subtract emptySize because the iterator does not increment the last block's // restart points and block trailer.
Can that be fixed?
6cc8e36
to
c260312
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TFTR!
I've benchmarked the previous implementation where I copied the code into the wrapper iterators versus the new implementation where I call it.Iterator.First()
and it.Iterator.Next()
.
Skiplist:
name old time/op new time/op delta
IterNext-8 11.9ns ± 1% 15.5ns ± 1% +30.11% (p=0.000 n=9+8)
SSTable:
name old time/op new time/op delta
TableIterNext/restart=16-8 28.2ns ± 1% 30.2ns ± 1% +7.03% (p=0.000 n=10+10)
FlushableBatch:
name old time/op new time/op delta
FlushableBatchBytesIterated-8 35.3ns ± 3% 38.6ns ± 7% +9.11% (p=0.000 n=10+10)
The deltas seem fairly significant, especially for the Skiplist. Part of the delta is attributed to the fact that we had to add an extra nil
check on the return value of it.Iterator.First()
and it.Iterator.Next()
. Do you think these results are worth the code duplication?
Reviewable status: 0 of 25 files reviewed, 11 unresolved discussions (waiting on @ajkr, @petermattis, and @Ryanfsdf)
batch_test.go, line 422 at r3 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Let's make some different key and value sizes here, rather than having them all be the same.
Done.
compaction.go, line 998 at r1 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Oh, I didn't notice earlier that you're only locking
d.mu
for this purpose. I think the locking and unlocking ofd.mu
on every iteration is going to be problematic. One thought is to only callmemTableTotalBytesLocked
every 1000 iterations.Related, I think you can push the lock acquisition into
memTableTotalBytesLocked
and removed theLocked
suffix.
Only calling it once every x
iterations seems good, it should still give us a smooth enough signal in most cases.
However, I'm concerned about the case when we have large entries. With very large keys and values, it might be the case that 1000 entries fills an entire memtable. It might be worthwhile to call memtableTotalBytes
either every x
iterations or when 5% of memtable size has been flushed, whichever is hit first.
I've added the implementation.
compaction.go, line 1011 at r2 (raw file):
Previously, petermattis (Peter Mattis) wrote…
What if
flushAmount > 2*burst
? I believe what I've done in the past is something like:if flushAmount >= burst { flushAmount = burst }
My thinking is that this should be a rare occurrence.
Oops, I meant to have that as a for loop. for flushAmount > uint64(burst)
.
compaction.go, line 1010 at r3 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Let's add some comments here. I confused myself for a bit about what this logic is doing. You're only limiting the flush rate when flushing is keeping up with the pace of user writes. If user writes are coming in so fast that dirty data is accumulating, flushing is allowed to run at full speed.
Also, I think we may need to update
flushLimiter
in either case, so that the rate limiter sees an accurate picture of the rate at which data is being flushed. If rate limiting is not needed (because dirty data is accumulating too fast), you would callflushLimiter.AllowN
.
Done.
table_cache.go, line 93 at r3 (raw file):
Previously, petermattis (Peter Mattis) wrote…
This is going to conflict with my PR. One of us will have to handle the merge.
I can handle the merge.
internal/arenaskl/flush_iterator.go, line 27 at r3 (raw file):
Previously, petermattis (Peter Mattis) wrote…
The
bytesIterated
support here deserves a test in thearenaskl
package, rather than relying on the memtable test you added.
Done.
internal/base/options.go, line 252 at r3 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Is this worth keeping now?
No, I've removed it now.
sstable/reader.go, line 513 at r3 (raw file):
Previously, petermattis (Peter Mattis) wrote…
I think there might be an extra 5 bytes to add to the length of
i.data.data
. There is the byte indicating whether the block is compressed, and the block checksum. SeeReader.readBlock
. Interesting that your test isn't catching this. That is something to investigate.
I investigated this and found that the extra 5 bytes are removed from i.data.data
after checking the checksum and compression type. I've also verified that i.dataBH.length == len(i.data.data)
if uncompressed.
sstable/reader.go, line 516 at r3 (raw file):
Previously, petermattis (Peter Mattis) wrote…
When mixing multiplication and division, I prefer to state the grouping explicitly instead of relying on precedence rules:
(uint64(i.data.nextOffset) * i.dataBH.length) / uint64(len(i.data.data))
Done.
sstable/reader_test.go, line 272 at r3 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Can that be fixed?
Yes. Now we also increment bytesIterated
when the iterator returns nil
, since that would indicate that we are done with the block.
c260312
to
4a0bd81
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, the benchmarks show the code duplication is worth it, though I'd just manually inline Next
, and not worry about First
. You should also drop a comment in the "source" implementations to note that they are copied (i.e. arenaskl.Iterator.Next
) so that a future engineering editing that code can be aware.
Reviewable status: 0 of 25 files reviewed, 6 unresolved discussions (waiting on @ajkr and @Ryanfsdf)
compaction.go, line 998 at r1 (raw file):
Previously, Ryanfsdf (Ryan Kim) wrote…
Only calling it once every
x
iterations seems good, it should still give us a smooth enough signal in most cases.However, I'm concerned about the case when we have large entries. With very large keys and values, it might be the case that 1000 entries fills an entire memtable. It might be worthwhile to call
memtableTotalBytes
either everyx
iterations or when 5% of memtable size has been flushed, whichever is hit first.I've added the implementation.
Ack. I'm not sure if the 5% threshold is necessary, but I don't see a harm to it.
compaction.go, line 999 at r5 (raw file):
var prevBytesIterated uint64 var iterCount int dirtyBytes := d.memTableTotalBytes()
Perhaps this should be totalBytes
. Then dirtyBytes
becomes totalBytes - c.bytesIterated
.
compaction.go, line 1007 at r5 (raw file):
// Recalculate total memtable bytes only once every 1000 iterations or // when the refresh threshold is hit since getting the total memtable // byte count is expensive.
s/byte count/byte count requires grabbing DB.mu which/g
compaction.go, line 1042 at r5 (raw file):
burst := d.flushLimiter.Burst() for flushAmount > uint64(burst) { d.flushLimiter.AllowN(time.Now(), int(flushAmount))
s/flushAmount/burst/g
sstable/reader.go, line 516 at r5 (raw file):
// We are at the end of the last data block, so we must increment bytes iterated by // the size of the block trailer and restart points. *i.bytesIterated += blockTrailerLen + uint64(4*(i.data.numRestarts+1))
I think this will keep incrementing i.bytesIterated
if Next
is called repeatedly after we fall of the end of the sstable. That seems somewhat surprising.
sstable/reader_test.go, line 306 at r5 (raw file):
for i := uint64(0); i < numEntries; i++ { key := make([]byte, 8) value := make([]byte, 8)
Some varying sizes in the keys and values would be useful to create slightly different blocks size.
4a0bd81
to
e1b2798
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 26 files reviewed, 6 unresolved discussions (waiting on @ajkr and @petermattis)
compaction.go, line 999 at r5 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Perhaps this should be
totalBytes
. ThendirtyBytes
becomestotalBytes - c.bytesIterated
.
d.memTableTotalBytes()
will return different values throughout the iteration so it can't be assigned to a variable and reused. Also, we need an initial value for dirtyBytes
which is d.memTableTotalBytes() - bytesIterated
. At this point, bytesIterated
is zero so we don't subtract it. I've added a comment which explains this.
compaction.go, line 1007 at r5 (raw file):
Previously, petermattis (Peter Mattis) wrote…
s/byte count/byte count requires grabbing DB.mu which/g
Done.
compaction.go, line 1042 at r5 (raw file):
Previously, petermattis (Peter Mattis) wrote…
s/flushAmount/burst/g
Done.
sstable/reader.go, line 516 at r5 (raw file):
Previously, petermattis (Peter Mattis) wrote…
I think this will keep incrementing
i.bytesIterated
ifNext
is called repeatedly after we fall of the end of the sstable. That seems somewhat surprising.
I've updated this to only happen once. We now check on every iteration if the current entry is the last entry in the block. If it is, we include the trailing bytes as well.
sstable/reader_test.go, line 306 at r5 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Some varying sizes in the keys and values would be useful to create slightly different blocks size.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 26 files reviewed, 3 unresolved discussions (waiting on @ajkr and @Ryanfsdf)
compaction.go, line 999 at r5 (raw file):
Previously, Ryanfsdf (Ryan Kim) wrote…
d.memTableTotalBytes()
will return different values throughout the iteration so it can't be assigned to a variable and reused. Also, we need an initial value fordirtyBytes
which isd.memTableTotalBytes() - bytesIterated
. At this point,bytesIterated
is zero so we don't subtract it. I've added a comment which explains this.
Yes, totalBytes
would have to be updated every 1000 iterations, just as you're doing with dirtyBytes
. I'm imagining something like:
var prevBytesIterated uint64
var iterCount int
totalBytes := d.memTableTotalBytes()
for ... {
if c.flushing != nil {
if iterCount >= 1000 || ... {
totalBytes = d.memTableTotalBytes()
}
dirtyBytes := totalBytes - c.bytesIterated
flushBytes := c.bytesIterated - prevBytesIterated
prevBytesIterated = c.bytesIterated
...
}
}
Perhaps I'm misunderstanding something here.
compaction.go, line 1042 at r5 (raw file):
Previously, Ryanfsdf (Ryan Kim) wrote…
Done.
I believe you can do s/int(burst)/burst/g
.
sstable/reader.go, line 531 at r7 (raw file):
key, val := i.data.Next() if key != nil { if i.blockUpper != nil && i.cmp(key.UserKey, i.blockUpper) >= 0 {
Do we need to the upper bounds handling here? i.blockUpper
should always be nil
. Ditto below.
sstable/reader.go, line 674 at r7 (raw file):
// NewCompactionIter returns an internal iterator similar to NewIter but it also increments // the number of bytes iterated. func (r *Reader) NewCompactionIter(lower, upper []byte, bytesIterated *uint64) *compactionIterator {
Do we need to pass lower
and upper
here? I think they will always be nil
for compaction iterators.
e1b2798
to
2516ab5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TFTR, I've addressed the review comments and rebased on the caching changes.
Reviewable status: 0 of 26 files reviewed, 3 unresolved discussions (waiting on @ajkr and @petermattis)
compaction.go, line 999 at r5 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Yes,
totalBytes
would have to be updated every 1000 iterations, just as you're doing withdirtyBytes
. I'm imagining something like:var prevBytesIterated uint64 var iterCount int totalBytes := d.memTableTotalBytes() for ... { if c.flushing != nil { if iterCount >= 1000 || ... { totalBytes = d.memTableTotalBytes() } dirtyBytes := totalBytes - c.bytesIterated flushBytes := c.bytesIterated - prevBytesIterated prevBytesIterated = c.bytesIterated ... } }
Perhaps I'm misunderstanding something here.
Ahh, that looks much cleaner.
sstable/reader.go, line 531 at r7 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Do we need to the upper bounds handling here?
i.blockUpper
should always benil
. Ditto below.
No we don't. I've removed the boundary checks. I also removed them from batch
and arenaskl
since those don't use bounds either.
sstable/reader.go, line 674 at r7 (raw file):
Previously, petermattis (Peter Mattis) wrote…
Do we need to pass
lower
andupper
here? I think they will always benil
for compaction iterators.
That's true. I've removed them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 26 files reviewed, 1 unresolved discussion (waiting on @ajkr and @Ryanfsdf)
sstable/reader.go, line 667 at r8 (raw file):
func (r *Reader) NewCompactionIter(bytesIterated *uint64) *compactionIterator { i := iterPool.Get().(*Iterator) _ = i.Init(r, nil, nil)
Nit: add inline comments for the nil
parameters. Something like i.Init(r, nil /* lower */, nil /* upper */)
.
2516ab5
to
7ee9345
Compare
Added a flush iterator which tracks the number of bytes iterated through. This allows us to see how much of the memtable has been flushed so that we can slow down flushing to match the speed of incoming writes.
This implements a portion of #7
Initially, I tried to expose an API on the iterator itself to determine the node size of the current node, something like
FlushIterator.NodeSize()
. However, theinternalIterator
interface doesn't expose this API so that wasn't an option. The compaction iterator wraps aninternalIterator
so there is no way to access theFlushIterator.NodeSize()
API during the actual flushing. To get past this issue, I've extended theflushable
interface. FlushIterator updates abytesIterated
field in the underlying skiplist, which then exposes the field through theflushable
interface. This means that only one flush iterator can run concurrently for a skiplist. I'm not sure if this is the best way to do it, do you have any suggestions?The same thing needs to be done with
flushableBatchIter
. The iterator needs to update a field in the underlyingflushableBatch
and expose the iteration progress through theflushable
interface.An alternative I can think of includes extending
internalIterator
but that would result in "panic: unimplemented" in a lot of iterators. Another alternative would be to create a new "MergeIter" which would be a copy of the current "MergeIter" except it iterates over just a flush iterator instead of aninternalIterator
.