Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: pipeline sstable block compression #1383

Closed
wants to merge 9 commits into from
Closed

db: pipeline sstable block compression #1383

wants to merge 9 commits into from

Conversation

bananabrick
Copy link
Contributor

@bananabrick bananabrick commented Nov 23, 2021

Compression of blocks and writing the blocks to a file consume enough cpu that we can extract performance wins by parallelizing compression of blocks, and writing blocks to disk in a separate goroutine. This pr introduces mechanisms which allow us to parallelize compression, and achieve performance gains.

perf difference for no parallel compression vs parallel compression with 5 compression workers on a machine with no IO/CPU bottleneck. Benchmark was run for 30 minutes.

name                old ops/sec  new ops/sec  delta
ycsb/E/values=1024   61.2k ± 3%   62.6k ± 4%     ~     (p=0.222 n=5+5)
ycsb/A/values=1024    192k ± 3%    194k ± 5%     ~     (p=0.310 n=5+5)
ycsb/D/values=1024    195k ± 5%    198k ± 3%     ~     (p=0.222 n=5+5)
ycsb/C/values=1024    878k ±24%    943k ± 5%     ~     (p=0.151 n=5+5)
ycsb/B/values=1024    345k ± 5%    351k ± 3%     ~     (p=0.222 n=5+5)
ycsb/F/values=1024   68.0k ±11%   85.2k ± 4%  +25.37%  (p=0.008 n=5+5)

name                old read     new read     delta
ycsb/E/values=1024    109G ± 3%    105G ± 3%   -4.36%  (p=0.008 n=5+5)
ycsb/A/values=1024   1.01T ± 3%   1.02T ± 5%     ~     (p=0.548 n=5+5)
ycsb/D/values=1024    307G ± 5%    305G ± 2%     ~     (p=0.841 n=5+5)
ycsb/C/values=1024   49.1G ± 4%   41.1G ± 5%  -16.41%  (p=0.008 n=5+5)
ycsb/B/values=1024    274G ± 4%    282G ± 3%     ~     (p=0.056 n=5+5)
ycsb/F/values=1024    908G ±22%   1693G ±21%  +86.47%  (p=0.008 n=5+5)

name                old write    new write    delta
ycsb/E/values=1024    121G ± 3%    117G ± 3%   -3.72%  (p=0.016 n=5+5)
ycsb/A/values=1024   1.20T ± 3%   1.21T ± 5%     ~     (p=0.421 n=5+5)
ycsb/D/values=1024    345G ± 5%    343G ± 2%     ~     (p=1.000 n=5+5)
ycsb/C/values=1024   49.1G ± 4%   41.0G ± 5%  -16.41%  (p=0.008 n=5+5)
ycsb/B/values=1024    310G ± 4%    319G ± 3%     ~     (p=0.056 n=5+5)
ycsb/F/values=1024   1.17T ±20%   2.02T ±17%  +72.79%  (p=0.008 n=5+5)

name                old r-amp    new r-amp    delta
ycsb/E/values=1024    7.78 ± 3%    7.06 ± 2%   -9.30%  (p=0.008 n=5+5)
ycsb/A/values=1024    7.05 ± 1%    6.72 ± 1%   -4.60%  (p=0.008 n=5+5)
ycsb/D/values=1024    6.89 ± 1%    6.61 ± 1%   -4.03%  (p=0.008 n=5+5)
ycsb/C/values=1024    2.63 ±48%    2.21 ± 3%     ~     (p=0.151 n=5+5)
ycsb/B/values=1024    5.76 ± 1%    5.57 ± 0%   -3.16%  (p=0.008 n=5+5)
ycsb/F/values=1024    0.00         0.00          ~     (all equal)

name                old w-amp    new w-amp    delta
ycsb/E/values=1024    20.3 ± 2%    19.1 ± 2%   -5.86%  (p=0.008 n=5+5)
ycsb/A/values=1024    6.42 ± 0%    6.39 ± 0%   -0.47%  (p=0.000 n=5+4)
ycsb/D/values=1024    18.1 ± 0%    17.7 ± 0%   -2.18%  (p=0.008 n=5+5)
ycsb/C/values=1024    0.00         0.00          ~     (all equal)
ycsb/B/values=1024    9.21 ± 1%    9.32 ± 1%     ~     (p=0.063 n=5+5)
ycsb/F/values=1024    8.76 ± 9%   12.14 ±20%  +38.53%  (p=0.008 n=5+5)

ycsb/F perf difference for no parallel compression vs parallel compression with 5 compression workers on a machine with an IO bottleneck, but no cpu bottleneck. Benchmark was run for 30 minutes.

arjunnair@Arjuns-MacBook-Pro dec23_1 % benchstat stat_0 stat_5
name                old ops/sec  new ops/sec  delta
ycsb/F/values=1024   43.7k ± 2%   46.5k ± 3%   +6.25%  (p=0.008 n=5+5)

name                old read     new read     delta
ycsb/F/values=1024    387G ± 4%    480G ± 1%  +24.09%  (p=0.008 n=5+5)

name                old write    new write    delta
ycsb/F/values=1024    556G ± 3%    660G ± 2%  +18.74%  (p=0.008 n=5+5)

name                old r-amp    new r-amp    delta
ycsb/F/values=1024    0.00         0.00          ~     (all equal)

name                old w-amp    new w-amp    delta
ycsb/F/values=1024    6.51 ± 1%    7.25 ± 1%  +11.37%  (p=0.008 n=5+5)

@cockroach-teamcity
Copy link
Member

This change is Reviewable

@bananabrick bananabrick requested a review from jbowens November 30, 2021 17:09
@bananabrick bananabrick marked this pull request as ready for review November 30, 2021 17:10
@bananabrick bananabrick changed the title pebble: pipeline sstable block compression. [WIP] pebble: pipeline sstable block compression. Nov 30, 2021
Copy link
Contributor Author

@bananabrick bananabrick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use of mutex in the writer is sketchy and I used it to get rid of a bunch of races. It will have to be cleaned up.

Reviewable status: 0 of 25 files reviewed, all discussions resolved (waiting on @jbowens)

Copy link
Collaborator

@jbowens jbowens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 6 files at r1, 11 of 22 files at r3.
Reviewable status: 12 of 25 files reviewed, 1 unresolved discussion (waiting on @bananabrick and @jbowens)


sstable/parallel_compression.go, line 139 at r16 (raw file):

	// queue is sequenced by the order of the writes to
	// the file.
	queue  chan *BlockWriteCoordinator

Did you try any designs/code organizations that avoided channels? I think channels are often tempting but turn out to be cumbersome. I'm wondering if we model the queue explicitly, as a ring buffer, can we consolidate the compression and write queues?

type compressionQueue struct {
    queue       [queueSize]queueItem
    headWrite   int32
    headRequest int32
}

type queueItem struct {
    request     compressionRequest
    compressed  chan compressedData // 1-size buffer
}

We could start with synchronizing access to the queue with a mutex, and later if necessary, explore more complicated schemes like using lock-free compare-and-swaps like the commitQueue does.

Under the above scheme, compressionRequest should own all the buffers related to the original raw block, and the compressedData should own its own buffers. Both could be pooled under a sync.Pool, and released to the pool when finished.

Copy link
Contributor Author

@bananabrick bananabrick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 12 of 25 files reviewed, 1 unresolved discussion (waiting on @jbowens)


sstable/parallel_compression.go, line 139 at r16 (raw file):

Previously, jbowens (Jackson Owens) wrote…

Did you try any designs/code organizations that avoided channels? I think channels are often tempting but turn out to be cumbersome. I'm wondering if we model the queue explicitly, as a ring buffer, can we consolidate the compression and write queues?

type compressionQueue struct {
    queue       [queueSize]queueItem
    headWrite   int32
    headRequest int32
}

type queueItem struct {
    request     compressionRequest
    compressed  chan compressedData // 1-size buffer
}

We could start with synchronizing access to the queue with a mutex, and later if necessary, explore more complicated schemes like using lock-free compare-and-swaps like the commitQueue does.

Under the above scheme, compressionRequest should own all the buffers related to the original raw block, and the compressedData should own its own buffers. Both could be pooled under a sync.Pool, and released to the pool when finished.

Yea, I had my own queue initially, but my queues api looked a lot like a channel, with a little bit more control over the state. I could go back to that, since it was a little less confusing than my use of channels here.

In your code snippet, there would still need to be a separate write queue to maintain order of writes. The compression queue can have multiple threads reading from it, and there's no guarantee that those threads will finish compression work in the same order as the order of the items in the compression queue.

That's why each writer has a write queue, to indicate the order of writes to the file.

@bananabrick
Copy link
Contributor Author

benchstat comparison results for no parallel threads vs 5 parallel compression threads on a 10 minute ycsb/A workload.

arjunnair@Arjuns-MacBook-Pro par2 % benchstat stat_0 stat_5                     
name                old ops/sec  new ops/sec  delta
ycsb/A/values=1024   97.0k ± 2%   99.9k ± 1%   +3.05%  (p=0.008 n=5+5)

name                old read     new read     delta
ycsb/A/values=1024    196G ± 2%    198G ± 1%     ~     (p=0.310 n=5+5)

name                old write    new write    delta
ycsb/A/values=1024    231G ± 1%    233G ± 1%     ~     (p=0.095 n=5+5)

name                old r-amp    new r-amp    delta
ycsb/A/values=1024    12.5 ± 9%     9.1 ± 9%  -27.42%  (p=0.008 n=5+5)

name                old w-amp    new w-amp    delta
ycsb/A/values=1024    7.26 ± 2%    7.16 ± 1%     ~     (p=0.524 n=5+5)
arjunnair@Arjuns-MacBook-Pro par2 % 

@bananabrick bananabrick requested review from a team and itsbilal December 1, 2021 20:58
Copy link
Contributor Author

@bananabrick bananabrick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 12 of 25 files reviewed, 2 unresolved discussions (waiting on @itsbilal and @jbowens)


sstable/writer.go, line 433 at r19 (raw file):

	var bhp BlockHandleWithProperties
	var err error
	if bhp, err = w.maybeAddBlockPropertiesToBlockHandle(bh); err != nil {

This is now called asynchronously, and not when we determine that the block CAN be flushed. Is this fine?

@bananabrick bananabrick requested a review from jbowens December 1, 2021 21:04
Copy link
Contributor Author

@bananabrick bananabrick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll be adding tests after an initial review.

Reviewable status: 12 of 25 files reviewed, 2 unresolved discussions (waiting on @itsbilal and @jbowens)

@bananabrick bananabrick changed the title [WIP] pebble: pipeline sstable block compression. pebble: pipeline sstable block compression. Dec 1, 2021
@bananabrick bananabrick changed the title pebble: pipeline sstable block compression. db: pipeline sstable block compression. Dec 1, 2021
Copy link
Contributor Author

@bananabrick bananabrick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 12 of 25 files reviewed, 3 unresolved discussions (waiting on @itsbilal and @jbowens)


sstable/writer.go, line 472 at r19 (raw file):

			doneCh:  doneCh,
			key:     copyInternalKey(key),
			value:   copySlice(value),

copying slices over here consumes some cpu. I'm not sure if this is unavoidable?

@bananabrick
Copy link
Contributor Author

bananabrick commented Dec 2, 2021

I ran all of the benchmarks. Seems like there's significant benefits for each workload except for ycsb/F which for some reason dropped.

name                old ops/sec  new ops/sec  delta
ycsb/E/values=1024   49.0k ± 7%   53.0k ± 8%     ~     (p=0.056 n=5+5)
ycsb/A/values=1024    100k ± 1%    100k ± 2%     ~     (p=0.222 n=5+5)
ycsb/D/values=1024    152k ± 8%    167k ± 7%   +9.91%  (p=0.016 n=5+5)
ycsb/F/values=1024   61.6k ± 2%   47.6k ± 1%  -22.72%  (p=0.008 n=5+5)
ycsb/C/values=1024    663k ± 6%    654k ±13%     ~     (p=0.841 n=5+5)
ycsb/B/values=1024    295k ± 6%    310k ± 6%     ~     (p=0.095 n=5+5)

name                old read     new read     delta
ycsb/E/values=1024   56.4G ± 6%   52.8G ± 1%   -6.39%  (p=0.032 n=5+5)
ycsb/A/values=1024    196G ± 1%    198G ± 1%   +1.24%  (p=0.032 n=5+5)
ycsb/D/values=1024    100G ± 7%    102G ± 5%     ~     (p=0.421 n=5+5)
ycsb/F/values=1024    150G ± 2%    176G ± 0%  +17.43%  (p=0.008 n=5+5)
ycsb/C/values=1024   58.1G ± 3%   52.6G ± 0%   -9.47%  (p=0.016 n=5+4)
ycsb/B/values=1024    103G ± 5%    104G ± 4%     ~     (p=1.000 n=5+5)

name                old write    new write    delta
ycsb/E/values=1024   59.5G ± 6%   56.1G ± 2%     ~     (p=0.095 n=5+5)
ycsb/A/values=1024    231G ± 1%    233G ± 1%     ~     (p=0.095 n=5+5)
ycsb/D/values=1024    110G ± 7%    113G ± 5%     ~     (p=0.310 n=5+5)
ycsb/F/values=1024    229G ± 2%    237G ± 0%   +3.59%  (p=0.008 n=5+5)
ycsb/C/values=1024   58.1G ± 3%   52.6G ± 0%   -9.47%  (p=0.016 n=5+4)
ycsb/B/values=1024    115G ± 5%    116G ± 4%     ~     (p=0.690 n=5+5)

name                old r-amp    new r-amp    delta
ycsb/E/values=1024    14.8 ± 8%    12.0 ± 8%  -19.06%  (p=0.008 n=5+5)
ycsb/A/values=1024    13.0 ± 6%     9.6 ± 5%  -25.98%  (p=0.008 n=5+5)
ycsb/D/values=1024    9.58 ± 4%    8.38 ± 3%  -12.55%  (p=0.008 n=5+5)
ycsb/F/values=1024    0.00         0.00          ~     (all equal)
ycsb/C/values=1024    3.35 ±21%    3.75 ± 2%     ~     (p=0.548 n=5+5)
ycsb/B/values=1024    7.11 ± 2%    6.67 ± 2%   -6.13%  (p=0.008 n=5+5)

name                old w-amp    new w-amp    delta
ycsb/E/values=1024    37.3 ± 6%    32.6 ± 7%  -12.66%  (p=0.008 n=5+5)
ycsb/A/values=1024    7.09 ± 1%    7.15 ± 1%     ~     (p=0.087 n=5+5)
ycsb/D/values=1024    22.3 ± 2%    20.9 ± 2%   -6.43%  (p=0.008 n=5+5)
ycsb/F/values=1024    5.71 ± 1%    7.62 ± 1%  +33.58%  (p=0.008 n=5+5)
ycsb/C/values=1024    0.00         0.00          ~     (all equal)
ycsb/B/values=1024    11.9 ± 3%    11.5 ± 2%   -3.80%  (p=0.016 n=5+5)

Looking at the lsms for no parallel compression for ycsb/F:

__level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___write(sz_cnt)____read___r-amp___w-amp
    WAL         3   127 M       -    37 G       -       -       -       -    37 G       -       -       -     1.0
      0     20877    24 G    5.72    37 G     0 B       0     0 B       0    36 G    31 K     0 B     267     1.0
      1         0     0 B    0.00     0 B     0 B       0     0 B       0     0 B       0     0 B       0     0.0
      2         0     0 B    0.00     0 B     0 B       0     0 B       0     0 B       0     0 B       0     0.0
      3       617   2.4 G    5.89    16 G     0 B       0     0 B       0    64 G    16 K    64 G       1     4.1
      4       665   4.3 G    5.90    12 G     0 B       0   1.6 G     425    34 G   5.3 K    34 G       1     2.7
      5       572   6.3 G    5.89   9.6 G     0 B       0   1.3 G     259    24 G   2.1 K    24 G       1     2.5
      6       408   9.3 G       -   6.4 G     0 B       0     0 B       0    17 G     751    17 G       1     2.7
  total     23139    46 G       -    37 G     0 B       0   2.9 G     684   213 G    55 K   139 G     271     5.7
  flush       407
compact      5606   133 G   2.7 G       3          (size == estimated-debt, score = in-progress-bytes, in = num-in-progress)
  ctype      4922       0       0     684       0  (default, delete, elision, move, read)
 memtbl         3   192 M
zmemtbl         0     0 B
   ztbl         0     0 B
 bcache      88 K   2.5 G    0.5%  (score == hit-rate)
 tcache     1.1 K   680 K   42.2%  (score == hit-rate)
 titers       541
 filter         -       -    0.0%  (score == utility)

vs parallel compression with 5 workers for ycsb/F:

Benchmarkycsb/F/values=1024 28674721  47791.1 ops/sec  175382337945 read  236834384844 write  0.00 r-amp  7.61 w-amp

__level_____count____size___score______in__ingest(sz_cnt)____move(sz_cnt)___write(sz_cnt)____read___r-amp___w-amp
    WAL         2   114 M       -    29 G       -       -       -       -    29 G       -       -       -     1.0
      0      9908    12 G    4.70    29 G     0 B       0     0 B       0    28 G    24 K     0 B     200     1.0
      1         0     0 B    0.00     0 B     0 B       0     0 B       0     0 B       0     0 B       0     0.0
      2         0     0 B    0.00     0 B     0 B       0     0 B       0     0 B       0     0 B       0     0.0
      3       538   2.1 G    4.86    19 G     0 B       0     0 B       0    65 G    16 K    65 G       1     3.4
      4       849   4.3 G    4.79    15 G     0 B       0   2.5 G     652    41 G   7.1 K    41 G       1     2.8
      5       893   7.3 G    4.79    13 G     0 B       0   1.1 G     453    32 G   3.0 K    32 G       1     2.5
      6       625    13 G       -   9.0 G     0 B       0     0 B       0    25 G   1.1 K    25 G       1     2.8
  total     12813    38 G       -    29 G     0 B       0   3.7 G   1.1 K   221 G    52 K   163 G     204     7.6
  flush       463
compact      7178    88 G   1.4 G       3          (size == estimated-debt, score = in-progress-bytes, in = num-in-progress)
  ctype      6073       0       0    1105       0  (default, delete, elision, move, read)
 memtbl         2   128 M
zmemtbl         0     0 B
   ztbl         0     0 B
 bcache      44 K   1.2 G    0.5%  (score == hit-rate)
 tcache       778   468 K   40.1%  (score == hit-rate)
 titers       362
 filter         -       -    0.0%  (score == utility)

It seems like we're just doing many more compactions because each compaction is faster, which somehow led to a hit in foreground performance. We did hit an ssd level throughput bottleneck for both tests, so maybe the parallel compression test had compactions occupying a larger portion of the throughput to the ssd. Note that the lsm is completely inverted in the case of no parallel compression. My guess is that running the write throughput benchmark on this or running ycsb/F with a larger ssd will give us more correct results.

@bananabrick
Copy link
Contributor Author

bananabrick commented Dec 4, 2021

I reran ycsb/F after removing the ssd level bottlenecks.

arjunnair@Arjuns-MacBook-Pro par4 % benchstat stat_0 stat_5               
name                old ops/sec  new ops/sec  delta
ycsb/F/values=1024   90.8k ± 8%   82.1k ± 7%     ~     (p=0.222 n=5+5)

name                old read     new read     delta
ycsb/F/values=1024    495G ± 7%    656G ± 3%  +32.65%  (p=0.008 n=5+5)

name                old write    new write    delta
ycsb/F/values=1024    612G ± 5%    762G ± 2%  +24.55%  (p=0.008 n=5+5)

name                old r-amp    new r-amp    delta
ycsb/F/values=1024    0.00         0.00          ~     (all equal)

name                old w-amp    new w-amp    delta
ycsb/F/values=1024    10.4 ±12%    14.3 ± 8%  +37.78%  (p=0.008 n=5+5)

Turns out that the w-amp is still higher for parallel compression, because we're doing many more compactions. The lsm is in better shape though.

What's interesting is that, more/faster compactions can still effect foreground performance even if there is no ssd level iops/throughput bottlenecks.

Copy link
Collaborator

@jbowens jbowens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's interesting is that, more/faster compactions can still effect foreground performance even if there is no ssd level iops/throughput bottlenecks.

How's the CPU utilization? Even if there's no IO bottleneck, background CPU utilization can introduce latency in scheduling foreground traffic.

Reviewable status: 12 of 25 files reviewed, 4 unresolved discussions (waiting on @bananabrick, @itsbilal, and @jbowens)


sstable/writer.go, line 433 at r19 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

This is now called asynchronously, and not when we determine that the block CAN be flushed. Is this fine?

If I understand what you mean, it is a problem. The block property collector accumulates state about the current block each time its Add method is called. When all the keys have been added to a block, the block property collector's FinishDataBlock must be called (by w.maybeAddBlockPropertiesToBlockHandle) before any additional Add calls are made to the block property collector. Calling FinishDataBlock later will cause the block's properties to be computed over keys that are not contained within the block.


sstable/writer.go, line 176 at r20 (raw file):

	// writerMu only has an effect if parallel compression is
	// enabled.
	writerMu sync.Mutex

I don't really understand why a general Writer mutex is necessary—In principle the user of the Writer should be able to write keys into buffers owned by the Writer. When a block is ready to be finished, it should be able to pass those buffers off to a compression goroutine, and obtain fresh buffers for the next block. The compression goroutine should never need to modify the Writer's memory, other than some mechanism for signaling blocks as flushed.

Did you add it because of the index block(s)? When a block is finished we:

  1. Write the restart intervals and compute the checksum. (ok to do in parallel)
  2. Compress the data. (ok to do in parallel)
  3. Call BlockPropertyCollector.FinishDataBlock to ask the block property collector what the block's properties are. With our current BlockPropertyCollector interface, this must be done sequentially within the Writer user's thread. (sequential)
  4. Add the block to the index block. This must be done in the order of the blocks, but not necessarily sequentially with the Writer user's thread. (parallel)
  5. Adding the above entry to the index block might trigger a flush of the current index block. This can mostly happen in parallel, except the call to BlockPropertyCollector.FinishIndexBlock. That must happen sequentially with all the BlockPropertyCollectors calls, and before any AddPrevDataBlockToIndexBlock calls for future finished data blocks.

I suspect some changes to the BlockPropertyCollector interface could help untangle these dependencies and simplify the memory sharing here. @sumeerbhola — do you have thoughts?

One initial idea is that only the BlockPropertyCollector is called by the goroutine driving the Writer. When a block is finished, an opaque BlockPropertyValue is returned that then may be Merged with other BlockPropertyValues to compute the property value for an index block. That Mergeing may happen by other goroutines, and ownership of the BlockPropertyValue may be passed from goroutine to goroutine.

type BlockPropertyCollector interface {
	// Name returns the name of the block property collector.
	Name() string
	// Add is called with each new entry added to a data block in the sstable.
	// The callee can assume that these are in sorted order.
	Add(key InternalKey, value []byte) error
	// FinishDataBlock is called when all the entries have been added to a
	// data block. Subsequent Add calls will be for the next data block. It
	// returns the property value for the finished block.
	FinishDataBlock(buf []byte) (BlockPropertyValue, error)
}

type BlockPropertyValue interface {
       // Encode serializes the value.
       Encode() ([]byte, error)

        // Merge merges a block property value with another subsequent block
        // property value. Merge may mutate the callee state and return itself for
        // efficiency.
	Merge(BlockPropertyValue) BlockPropertyValue
}

Copy link
Contributor

@nicktrav nicktrav left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any sense as to he

Reviewed 4 of 22 files at r3, 1 of 1 files at r14, 1 of 1 files at r15, all commit messages.
Reviewable status: 18 of 25 files reviewed, 8 unresolved discussions (waiting on @bananabrick, @itsbilal, and @jbowens)


-- commits, line 1 at r20:
Given the scope of this change, it would be nice to have more substantial commentary in the commit description.


options.go, line 389 at r20 (raw file):

MaxCompressionThreads

nit: I looked in the options and I couldn't find any other reference to "thread". Would "goroutine" be better? Or even just "concurrency".


sstable/parallel_compression.go, line 1 at r20 (raw file):

package sstable

I'm having a little trouble unpicking how all of this works, and what the data structures represent. Are you add some more commentary in this file as to the overall approach (i.e. these are the core structs and what they represent, this is how we parallelize the work; this is how we coordinate, etc).


sstable/writer.go, line 472 at r19 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

copying slices over here consumes some cpu. I'm not sure if this is unavoidable?

Seems like we're copying because we're moving into an async execution model and we don't want the bytes to change underneath us once this function returns. What are the guarantees on the lifetimes of the keys and values passed in elsewhere?

If we have to copy, could we pool the byte slices somehow?


sstable/writer.go, line 469 at r20 (raw file):

		w.writerMu.Unlock()

		w.writeQueue.queue <- &BlockWriteCoordinator{

Rather than referencing the struct members directly, it might be nice to have a function that handles this all of this. Might make testing a little simpler too.

@bananabrick
Copy link
Contributor Author

bananabrick commented Dec 6, 2021

TFTRs! I'll respond to them shortly.

I was able to get rid of the ycsb/F regression. I was only running the benchmarks for 10 minutes, which wasn't long enough to see the benefits of the better LSM shape.

I ran the experiments after removing ssd level bottlenecks, and I ran ycsb/F for 30 minutes, on gce n1-standard-16 machines.

Here's an example of the difference between 0 parallel compression workers, and 4 parallel compression workers:

arjunnair@Arjuns-MacBook-Pro par5 % benchstat stat_par0 stat_par4
name                old ops/sec  new ops/sec  delta
ycsb/F/values=1024   71.8k ±10%   80.5k ±10%  +12.08%  (p=0.032 n=5+5)

name                old read     new read     delta
ycsb/F/values=1024   1.04T ±36%   1.61T ±21%  +55.57%  (p=0.032 n=5+5)

name                old write    new write    delta
ycsb/F/values=1024   1.31T ±31%   1.92T ±18%  +46.33%  (p=0.032 n=5+5)

name                old r-amp    new r-amp    delta
ycsb/F/values=1024    0.00         0.00          ~     (all equal)

name                old w-amp    new w-amp    delta
ycsb/F/values=1024    9.24 ±18%   12.26 ±25%  +32.72%  (p=0.032 n=5+5)

Adding a fifth compression worker didn't seem to improve throughput for my configuration(improvement doesn't show up as statistically significant). I suspect it would still improve throughput if I ran the benchmarks for longer than 30 minutes.

Here's 0 parallel compression workers vs 5:

name                old ops/sec  new ops/sec  delta
ycsb/F/values=1024   71.8k ±10%   79.8k ±10%     ~     (p=0.056 n=5+5)

name                old read     new read     delta
ycsb/F/values=1024   1.04T ±36%   1.75T ±19%  +68.73%  (p=0.008 n=5+5)

name                old write    new write    delta
ycsb/F/values=1024   1.31T ±31%   2.06T ±15%  +56.48%  (p=0.008 n=5+5)

name                old r-amp    new r-amp    delta
ycsb/F/values=1024    0.00         0.00          ~     (all equal)

name                old w-amp    new w-amp    delta
ycsb/F/values=1024    9.24 ±18%   13.27 ±20%  +43.63%  (p=0.016 n=5+5)

The lsm shape incrementally improves from 0 parallel compression workers to 5.

Here's the cpu utilization for experiments with 0 to 5 parallel compression workers:
Screen Shot 2021-12-06 at 4 36 25 PM
Cpu utilization increases from ~30% to ~45% after adding one compression worker, but the increase is minimal for every compression worker added.

Here's the disk stats for 0 through 5 parallel compression workers:
Screen Shot 2021-12-06 at 4 38 09 PM

The cost of adding an additional parallel compression goroutine after the first one was minimal in terms of cpu utilization. Parallel compression also doesn't seem to have a significant impact on disk utilization, at least for the workload I was running.

Parallel compression does seem to have a decent impact on the lsm shape, and pebble performance. I'll be running ycsb/A-E with longer runtimes to confirm.

Copy link
Collaborator

@petermattis petermattis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 18 of 25 files reviewed, 9 unresolved discussions (waiting on @bananabrick, @itsbilal, and @jbowens)


sstable/writer.go, line 472 at r19 (raw file):

Previously, nicktrav (Nick Travers) wrote…

Seems like we're copying because we're moving into an async execution model and we don't want the bytes to change underneath us once this function returns. What are the guarantees on the lifetimes of the keys and values passed in elsewhere?

If we have to copy, could we pool the byte slices somehow?

The key and value are invalidated after the call to Writer.Add, but I do think we can lessen the memory allocations here significantly.

PS I don't think you need to copy the value, only the key. BlockWriteCoordinator.value looks unused.


sstable/writer.go, line 462 at r20 (raw file):

	if w.compressionQueueRef != nil {
		finished := copySlice(w.block.finish())

Making a copy of the block contents every time the block is flushed will cause a very significant increase in GC pressure. I think we need to structure the code to avoid this. I think you want to have a fixed number of blockWriters that is equal to the compression concurrency. When a block is flushed, you copy the key we're flushing. I don't think you need to copy the previous key as that is always w.block.curKey and w.block is part of what is being queued for compression. As you're doing now, in addition to queueing for compression you queue for writing and have a single writer goroutine that waits for the next block to write to be compressed. When it is compressed, it writes the block and sets a per-block error value. Then here in this flushing code, after doing this queueing you have to pick up the next block to write into (perhaps waiting for it to be written). If the block contains a write or compression error you set w.err. I think this would allow you to get rid of using w.writerMu to synchronize access to w.err. I think that should be another goal.

Copy link
Contributor Author

@bananabrick bananabrick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going through some of the comments. Will do the rest after I finish these.

Reviewable status: 18 of 25 files reviewed, 8 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, and @sumeerbhola)


-- commits, line 1 at r20:

Previously, nicktrav (Nick Travers) wrote…

Given the scope of this change, it would be nice to have more substantial commentary in the commit description.

I plan to add that when I squash the commits.


sstable/writer.go, line 433 at r19 (raw file):

Previously, jbowens (Jackson Owens) wrote…

If I understand what you mean, it is a problem. The block property collector accumulates state about the current block each time its Add method is called. When all the keys have been added to a block, the block property collector's FinishDataBlock must be called (by w.maybeAddBlockPropertiesToBlockHandle) before any additional Add calls are made to the block property collector. Calling FinishDataBlock later will cause the block's properties to be computed over keys that are not contained within the block.

Yep, trying to fix this now.


sstable/writer.go, line 176 at r20 (raw file):

Previously, jbowens (Jackson Owens) wrote…

I don't really understand why a general Writer mutex is necessary—In principle the user of the Writer should be able to write keys into buffers owned by the Writer. When a block is ready to be finished, it should be able to pass those buffers off to a compression goroutine, and obtain fresh buffers for the next block. The compression goroutine should never need to modify the Writer's memory, other than some mechanism for signaling blocks as flushed.

Did you add it because of the index block(s)? When a block is finished we:

  1. Write the restart intervals and compute the checksum. (ok to do in parallel)
  2. Compress the data. (ok to do in parallel)
  3. Call BlockPropertyCollector.FinishDataBlock to ask the block property collector what the block's properties are. With our current BlockPropertyCollector interface, this must be done sequentially within the Writer user's thread. (sequential)
  4. Add the block to the index block. This must be done in the order of the blocks, but not necessarily sequentially with the Writer user's thread. (parallel)
  5. Adding the above entry to the index block might trigger a flush of the current index block. This can mostly happen in parallel, except the call to BlockPropertyCollector.FinishIndexBlock. That must happen sequentially with all the BlockPropertyCollectors calls, and before any AddPrevDataBlockToIndexBlock calls for future finished data blocks.

I suspect some changes to the BlockPropertyCollector interface could help untangle these dependencies and simplify the memory sharing here. @sumeerbhola — do you have thoughts?

One initial idea is that only the BlockPropertyCollector is called by the goroutine driving the Writer. When a block is finished, an opaque BlockPropertyValue is returned that then may be Merged with other BlockPropertyValues to compute the property value for an index block. That Mergeing may happen by other goroutines, and ownership of the BlockPropertyValue may be passed from goroutine to goroutine.

type BlockPropertyCollector interface {
	// Name returns the name of the block property collector.
	Name() string
	// Add is called with each new entry added to a data block in the sstable.
	// The callee can assume that these are in sorted order.
	Add(key InternalKey, value []byte) error
	// FinishDataBlock is called when all the entries have been added to a
	// data block. Subsequent Add calls will be for the next data block. It
	// returns the property value for the finished block.
	FinishDataBlock(buf []byte) (BlockPropertyValue, error)
}

type BlockPropertyValue interface {
       // Encode serializes the value.
       Encode() ([]byte, error)

        // Merge merges a block property value with another subsequent block
        // property value. Merge may mutate the callee state and return itself for
        // efficiency.
	Merge(BlockPropertyValue) BlockPropertyValue
}

I added the writer mutex because the main compaction thread calls writer.Add -> writer.addPoint, the thread which is writing the blocks to disk will call writer.writeBlockPostCompression, writer.addBlockPropertiesandIndexEntry. The reason the call to writer.addBlockPropertiesandIndexEntry happens in the thread which writes the blocks to disk is because it requires a block handle which is only returned from writer.writeBlockPostCompression. This thread also sets writer.err, and I was using the mutex for that too.

I'm not trying to synchronize to maintain any logical invariant, but to just avoid data races.

I do want to get rid of the mutex, though.

I'll explore some ways to do that today.

Copy link
Contributor Author

@bananabrick bananabrick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 18 of 25 files reviewed, 8 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, @petermattis, and @sumeerbhola)


sstable/writer.go, line 462 at r20 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Making a copy of the block contents every time the block is flushed will cause a very significant increase in GC pressure. I think we need to structure the code to avoid this. I think you want to have a fixed number of blockWriters that is equal to the compression concurrency. When a block is flushed, you copy the key we're flushing. I don't think you need to copy the previous key as that is always w.block.curKey and w.block is part of what is being queued for compression. As you're doing now, in addition to queueing for compression you queue for writing and have a single writer goroutine that waits for the next block to write to be compressed. When it is compressed, it writes the block and sets a per-block error value. Then here in this flushing code, after doing this queueing you have to pick up the next block to write into (perhaps waiting for it to be written). If the block contains a write or compression error you set w.err. I think this would allow you to get rid of using w.writerMu to synchronize access to w.err. I think that should be another goal.

I'm exploring the design with multiple blockWriters. But will copying the result of w.block.finish(), into a preallocated buffer avoid the gc pressure too?


sstable/writer.go, line 469 at r20 (raw file):

Previously, nicktrav (Nick Travers) wrote…

Rather than referencing the struct members directly, it might be nice to have a function that handles this all of this. Might make testing a little simpler too.

Yep, I was planning to do this.

Copy link
Contributor Author

@bananabrick bananabrick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 18 of 25 files reviewed, 8 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, @petermattis, and @sumeerbhola)


sstable/parallel_compression.go, line 1 at r20 (raw file):

Previously, nicktrav (Nick Travers) wrote…

I'm having a little trouble unpicking how all of this works, and what the data structures represent. Are you add some more commentary in this file as to the overall approach (i.e. these are the core structs and what they represent, this is how we parallelize the work; this is how we coordinate, etc).

Yep. Didn't add it here, because I expected the design to change during code review.

Copy link
Collaborator

@petermattis petermattis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 18 of 25 files reviewed, 8 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, and @sumeerbhola)


sstable/writer.go, line 462 at r20 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

I'm exploring the design with multiple blockWriters. But will copying the result of w.block.finish(), into a preallocated buffer avoid the gc pressure too?

blockWriter internally maintains a buffer. Notice how blockWriter.finish() returns []byte slice that aliases with blockWriter.buf. There is also the blockWriter.restarts slice which you also want to avoid reallocating. In addition to the slice of blockWriters, you also need a slice of compression bufs (e.g. Writer.compressedBuf). Each of the compression goroutines would reuse the same compression buf over and over while compressing blocks for the sstable.

Copy link
Contributor

@itsbilal itsbilal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly adding some minor comments on top of what others have already pointed out. I'll wait for you to respond to them before doing a more detailed review.

Reviewed 3 of 22 files at r3, 3 of 3 files at r20, all commit messages.
Reviewable status: 24 of 25 files reviewed, 17 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, @petermattis, and @sumeerbhola)


sstable/parallel_compression.go, line 12 at r20 (raw file):

// todo(bananabrick) : We should have a fixed number of these
// pulled from a buffer.
type CompressedData struct {

A sync.Pool will probably help if you don't wanna manually set up and manage a buffer.


sstable/parallel_compression.go, line 70 at r20 (raw file):

// NewCompressionQueueContainer will start numWorkers goroutines to process
// compression of blocks, and return a usable *CompressionQueueContainer.
func NewCompressionQueueContainer(

Maybe CompressionWorkersQueue is a better name for this? Since that's effectively what this is doing, and worker queues are a well known concept/term.


sstable/parallel_compression.go, line 83 at r20 (raw file):

}

func (qu *CompressionQueueContainer) startWorker() {

Nit: this should be runWorker, like below


sstable/parallel_compression.go, line 117 at r20 (raw file):

		toCompress.doneCh <- compressedData
	}
	qu.closeWG.Done()

Move to defer, like below


sstable/parallel_compression.go, line 156 at r20 (raw file):

}

func (qu *writeQueueContainer) startWorker() {

Nit: I think this should be runWorker as it runs the worker code itself. start implies something that calls go runWorker() and returns rightaway.


sstable/parallel_compression.go, line 178 at r20 (raw file):

			)
			if err != nil {
				qu.writer.writerMu.Lock()

This error handling code is repeated a lot in here, and you could probably benefit from just writing to an errChan that just sends it to the writer (which also reduces the need to grab the mutex here). And the else's are less necessary because you cold just continue in case of an error.


sstable/parallel_compression.go, line 193 at r20 (raw file):

		}
	}
	qu.closeWG.Done()

This can be in a defer at the top. It avoids a race that way that could show this goroutine as leaked even though this Done has already run. And you can replace the break with a return.


sstable/parallel_compression.go, line 201 at r20 (raw file):

// The queue is no longer usable after finish is called.
func (qu *writeQueueContainer) finish() {
	qu.queue <- &BlockWriteCoordinator{

You could close the channel instead? That would reduce the need to have the finished bool. Where you read from it, you could just do:

val, ok := <-channel
if !ok {
   break
}

sstable/writer.go, line 985 at r20 (raw file):

// NewWriter returns a new table writer for the file. Closing the writer will
// close the file.
func NewWriter(f writeCloseSyncer, compressionQueue *CompressionQueueContainer, o WriterOptions, extraOpts ...WriterOption) *Writer {

You can add a CompressionQueueOpt that implements WriterOption and takes in a compressionQueue. No need to make it a required argument when nil is valid input.

Copy link
Contributor Author

@bananabrick bananabrick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 24 of 25 files reviewed, 17 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, @petermattis, and @sumeerbhola)


sstable/writer.go, line 176 at r20 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

I added the writer mutex because the main compaction thread calls writer.Add -> writer.addPoint, the thread which is writing the blocks to disk will call writer.writeBlockPostCompression, writer.addBlockPropertiesandIndexEntry. The reason the call to writer.addBlockPropertiesandIndexEntry happens in the thread which writes the blocks to disk is because it requires a block handle which is only returned from writer.writeBlockPostCompression. This thread also sets writer.err, and I was using the mutex for that too.

I'm not trying to synchronize to maintain any logical invariant, but to just avoid data races.

I do want to get rid of the mutex, though.

I'll explore some ways to do that today.

@jbowens

I'm able to solve,

5. Adding the above entry to the index block might trigger a flush of the current index block. This can mostly happen in parallel, except the call to BlockPropertyCollector.FinishIndexBlock. That must happen sequentially with all the BlockPropertyCollectors calls, and before any AddPrevDataBlockToIndexBlock calls for future finished data blocks.

if I can determine whether the index block should be flushed before we perform the compression/write data block to disk. I'm exploring whether it is easier to do that, or if it's easier to change the block property collector interface.

Copy link
Contributor Author

@bananabrick bananabrick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 24 of 25 files reviewed, 17 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, @petermattis, and @sumeerbhola)


sstable/writer.go, line 176 at r20 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

@jbowens

I'm able to solve,

5. Adding the above entry to the index block might trigger a flush of the current index block. This can mostly happen in parallel, except the call to BlockPropertyCollector.FinishIndexBlock. That must happen sequentially with all the BlockPropertyCollectors calls, and before any AddPrevDataBlockToIndexBlock calls for future finished data blocks.

if I can determine whether the index block should be flushed before we perform the compression/write data block to disk. I'm exploring whether it is easier to do that, or if it's easier to change the block property collector interface.

Another way, I can think of to get rid of this dependency, would be to pre-compute the property value of index block in the Writer thread, regardless of whether we actually flush or not flush. Then we can use this precomputed value, if we actually decide to flush. What do you think?

Copy link
Collaborator

@jbowens jbowens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 24 of 25 files reviewed, 17 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, @petermattis, and @sumeerbhola)


sstable/writer.go, line 176 at r20 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

Another way, I can think of to get rid of this dependency, would be to pre-compute the property value of index block in the Writer thread, regardless of whether we actually flush or not flush. Then we can use this precomputed value, if we actually decide to flush. What do you think?

I suspect it would be too much additional overhead to pre-compute the property value unconditionally. Determining whether the index block will need to be flushed ahead of time seems tricky, but maybe feasible.

The decision to flush is dependent on the size of the compressed data block, because the length of the compressed data block is written to the index block as a part of the BlockHandle. I don't know of any reason the flush decision needs to be precise (and shouldFlush today calculates an estimate of the index block's current size through (*blockWriter).estimatedSize). It might be worth trying to make the determination whether to flush just assuming the maximum compressed length of math.MaxUint64. This only adds a few bytes of error to the calculation.

Copy link
Contributor Author

@bananabrick bananabrick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 2 of 26 files reviewed, 14 unresolved discussions (waiting on @itsbilal, @jbowens, @nicktrav, @petermattis, and @sumeerbhola)


sstable/parallel_compression.go, line 201 at r20 (raw file):

Previously, jbowens (Jackson Owens) wrote…

I don't think that's true. If a buffered channel holds elements and is closed, I believe reads on the channel will still pull out buffered elements until the buffer is empty.

For example: https://go.dev/play/p/f8p9Px4oOoH

Ohh. Thanks. I had tested this in the past, but I guess I misremembered.

@bananabrick bananabrick requested a review from jbowens December 17, 2021 17:14
Compression of blocks and writing the blocks to a file consume enough cpu that we
can extract performance wins by parallelizing compression of blocks, and writing
blocks to disk in a separate goroutine. This pr introduces mechanisms which allow
us to parallelize compression, and achieve performance gains.
Copy link
Collaborator

@jbowens jbowens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still working my way though, but looking good!

Reviewed 1 of 22 files at r3, 1 of 4 files at r31, 1 of 19 files at r32, 1 of 2 files at r34, 1 of 3 files at r37, all commit messages.
Reviewable status: 6 of 26 files reviewed, 30 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, and @petermattis)


open.go, line 370 at r37 (raw file):

		if opts.MaxConcurrentCompactions > int(queueSize) {
			queueSize = uint64(opts.MaxConcurrentCompactions)
		}

Rather than bumping the compaction queue size to at least the concurrent compactions limit, would it be feasible for compactions to perform their own compression if the queue is full? Then the compaction queue is more like supplemental compression bandwidth.

I think it's a little more intuitive of an interface if they're additive.

Also, a compaction goroutine falling back to performing its own compression avoids starvation, ensuring each compaction can always continue to make progress.

Note also that in addition to MaxConcurrentCompactions there may be up to 1 concurrent flush, which also will use parallelized compression when enabled.


options.go, line 1022 at r37 (raw file):

				o.WALBytesPerSync, err = strconv.Atoi(value)
			case "max_compression_concurrency":
				compressionConcurrency, e := strconv.Atoi(value)

how about

o.Experimental.MaxCompressionConcurrency, err = strconv.ParseUint(value, 10, 64)

options.go, line 1213 at r37 (raw file):

	// todo(bananabrick) : This function is called by the compaction thread,
	// and some tests call Options.EnsureDefaults concurrently, so we have
	// a data race here.

would this not also be a problem for o.Level? Is this an existing race too, between (*Options).EnsureDefaults -> (*LevelOptions).EnsureDefaults() and (*Options).MakeWriterOptions?


sstable/parallel_compression.go, line 12 at r20 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

I wanted fixed size buffers, so I went with buffered channels. They seem okay.

Also, similar comment as below — can we unexported this? eg, compressedData.


sstable/parallel_compression.go, line 59 at r37 (raw file):

// BlockCompressionCoordinator is added to the compression
// queue to indicate a pending compression.
type BlockCompressionCoordinator struct {

BlockCompressionCoordinator is a bit of a mouthful and a little opaque — could this be compressionTask?

Also, does the type need to be exported from the package?


sstable/parallel_compression.go, line 66 at r37 (raw file):

	// doneCh is used to signal to the writer
	// thread that the compression has been
	// completed. doneCh should be a

nit: s/should be/is/


sstable/parallel_compression.go, line 73 at r37 (raw file):

// BlockWriteCoordinator is added to the write queue
// to maintain the order of block writes to disk.
type BlockWriteCoordinator struct {

nit: similar comment on this type's name and the fact that it's exported: writeTask or flushTask?


sstable/parallel_compression.go, line 75 at r37 (raw file):

type BlockWriteCoordinator struct {
	// doneCh is used to maintain write order
	// in the write queue. doneCh should be a

nit: s/should be/is/


sstable/parallel_compression.go, line 85 at r37 (raw file):

}

// CompressionWorkersQueue is used to queue up blocks

nit: CompressionQueue instead?

nit: ... queues blocks to be compressed in parallel.


sstable/parallel_compression.go, line 92 at r37 (raw file):

	writeCoordinators       chan *BlockWriteCoordinator
	compressionCoordinators chan *BlockCompressionCoordinator
	compressedDataState     chan *CompressedData

I'm getting lost in these various channels and their interactions. I think it'll be a little better once the type renames I suggested are applied, but I think we also need comments describing each of these channels, including their producers and consumers.


sstable/parallel_compression.go, line 99 at r37 (raw file):

}

func (qu *CompressionWorkersQueue) allocateBuffers(bufferSize int) {

is this called anywhere else other than when constructing a queue? if not, I think it might read clearer to inline it there. allocateBuffers is a little ambiguous, and at first glance I expected this to be talking about the []byte buffers, not the structures representing tasks.


sstable/parallel_compression.go, line 103 at r37 (raw file):

		w := &BlockWriteCoordinator{}
		c := &BlockCompressionCoordinator{}
		data := &CompressedData{}

If we're allocating these upfront, we mind as well allocate them as a slab for better cache locality and less heap fragmentation. For example:

writeTasks := make([]writeTask, bufferSize)
// ...
for i := 0; i < bufferSize; i++ {
    qu.writeTasks <- &writeTasks[i]
    // ...
}

sstable/parallel_compression.go, line 118 at r37 (raw file):

	qu.queue = make(chan *BlockCompressionCoordinator, maxSize)

	qu.writeCoordinators = make(chan *BlockWriteCoordinator, bufferSize)

I'm confused by this — does this mean that the compression queue can only support bufferSize concurrent sstable.Writers? If we attempt to use this parallelized compression implementation in RewriteKeySuffixes, we'll need to support concurrent sstable writers from outside of Pebble.

Is this just for reusing the BlockWriteCoordinator structures? Can we use a sync.Pool and support an unbounded number of writers (still with a fixed compression concurrency)?


sstable/parallel_compression.go, line 141 at r37 (raw file):

			// The channel was closed. There might still be more
			// blocks queued for compression, but we're going to ignore
			// those.

You can use the toCompress, ok := <-qu.queue syntax to check whether the channel is closed (!ok). Then, it can rightly be a panic if toCompress == nil, an invariant violation.


sstable/parallel_compression.go, line 144 at r37 (raw file):

			// todo(bananabrick) : Might want to process every item
			// added to the compression queue, so that the writer worker
			// doesn't block when the the db is closed.

👍 — We need to ensure all goroutines exit when the DB is closed. CockroachDB unit tests will enforce this with the leaktest checker.


sstable/parallel_compression.go, line 168 at r37 (raw file):

		// Release the compression coordinator.
		qu.releaseCompressionCoordinator(toCompress)

nit: small helpers like this that are called once make it a little more difficult to read, imo. Inline?


sstable/parallel_compression.go, line 174 at r37 (raw file):

// todo(bananabrick) : We need to make sure that there are no references
// to any of the fields of the values we're releasing. Add a reference
// counted byte slice to make sure of this.

can we zero out the structure?

*c = BlockCompressionCoordinator{}

sstable/parallel_compression.go, line 211 at r37 (raw file):

}

// newwriteQueueContainer will start a single goroutine to process

nit: s/newwriteQueueContainer/newWriteQueueContainer/


sstable/parallel_compression.go, line 224 at r37 (raw file):

}

func (qu *writeQueueContainer) releaseBuffers(w *BlockWriteCoordinator, c *CompressedData) {

One thing I'm not clear on — are we reusing any of the []byte buffers from task to task? If so, we need to be careful to not retain excessively large buffers indefinitely. There can be occasional very large blocks (eg, a single large k/v pair). If we don't throw away those large buffers, these buffers will all eventually be large and reserve excessive memory.


sstable/parallel_compression.go, line 235 at r37 (raw file):

	for {
		blockWriteCoordinator := <-qu.queue
		if blockWriteCoordinator == nil {

nit: use the channel close instead?

Copy link
Contributor Author

@bananabrick bananabrick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 6 of 26 files reviewed, 24 unresolved discussions (waiting on @itsbilal, @jbowens, @nicktrav, and @petermattis)


open.go, line 370 at r37 (raw file):

Rather than bumping the compaction queue size to at least the concurrent compactions limit, would it be feasible for compactions to perform their own compression if the queue is full? Then the compaction queue is more like supplemental compression bandwidth.

Right now, it's tricky to switch back and forth between parallel compression and no parallel compression for a given writer. This is something I could explore in a future pr, if necessary.

The compression queue is FIFO and quite small(I've been testing with size 5), so each compaction is guaranteed to make progress, and compactions won't get starved. I can't find a good source on whether blocked sends to unbuffered channels maintain the FIFO property. If they don't, I could implement my own worker queue which does maintain that property. I think I could also write a queue which maintains fairness in other ways.


options.go, line 1022 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

how about

o.Experimental.MaxCompressionConcurrency, err = strconv.ParseUint(value, 10, 64)

Nice, thanks.


options.go, line 1213 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

would this not also be a problem for o.Level? Is this an existing race too, between (*Options).EnsureDefaults -> (*LevelOptions).EnsureDefaults() and (*Options).MakeWriterOptions?

Yes, I believe it's an existing race, which didn't show up before.


sstable/parallel_compression.go, line 12 at r20 (raw file):

Previously, jbowens (Jackson Owens) wrote…

Also, similar comment as below — can we unexported this? eg, compressedData.

Done.


sstable/parallel_compression.go, line 59 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

BlockCompressionCoordinator is a bit of a mouthful and a little opaque — could this be compressionTask?

Also, does the type need to be exported from the package?

Done.


sstable/parallel_compression.go, line 92 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

I'm getting lost in these various channels and their interactions. I think it'll be a little better once the type renames I suggested are applied, but I think we also need comments describing each of these channels, including their producers and consumers.

These are just buffers. I was keeping around these structs to be re-used, because they contain slices, and I didn't want those to be allocated on each block flush. Added a comment stating that, and also added a Buffer suffix to these.


sstable/parallel_compression.go, line 99 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

is this called anywhere else other than when constructing a queue? if not, I think it might read clearer to inline it there. allocateBuffers is a little ambiguous, and at first glance I expected this to be talking about the []byte buffers, not the structures representing tasks.

They are for the structs which contain the []byte buffers. Since each struct contains multiple buffers, it's easier to just avoid allocations on the entire struct. Moved this inline.


sstable/parallel_compression.go, line 103 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

If we're allocating these upfront, we mind as well allocate them as a slab for better cache locality and less heap fragmentation. For example:

writeTasks := make([]writeTask, bufferSize)
// ...
for i := 0; i < bufferSize; i++ {
    qu.writeTasks <- &writeTasks[i]
    // ...
}

Reading and writing from those would require some concurrent access protection. This is another reason to write my own queue instead of using channels, though. What do you think?

Writing my own queue will also allow me to ensure fairness during compression.


sstable/parallel_compression.go, line 118 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

I'm confused by this — does this mean that the compression queue can only support bufferSize concurrent sstable.Writers? If we attempt to use this parallelized compression implementation in RewriteKeySuffixes, we'll need to support concurrent sstable writers from outside of Pebble.

Is this just for reusing the BlockWriteCoordinator structures? Can we use a sync.Pool and support an unbounded number of writers (still with a fixed compression concurrency)?

Sorry, I think my naming scheme here was bad. I fixed that.

Each writer spins up its own goroutine to write blocks to disk. That's what the writeQueue is for. The writeQueue only blocks while it waits for the compression task to finish.

The bufferSize exists cause the pre-allocated structs are re-used, and I didn't want it to be unbounded. The writer.queueBlockForParallelCompression function may block for one of these structs to become available.


func (w *Writer) queueBlockForParallelCompression(key InternalKey) error {
	compressionTask := <-w.parallelWriterState.compressionQueueRef.compressionTaskBuffer
	writeTask := <-w.parallelWriterState.compressionQueueRef.writeTaskBuffer

We can write something to ensure fairness here, just in case buffered channels aren't fair when we read from them.


sstable/parallel_compression.go, line 141 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

You can use the toCompress, ok := <-qu.queue syntax to check whether the channel is closed (!ok). Then, it can rightly be a panic if toCompress == nil, an invariant violation.

Done.


sstable/parallel_compression.go, line 144 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

👍 — We need to ensure all goroutines exit when the DB is closed. CockroachDB unit tests will enforce this with the leaktest checker.

I believe this isn't a problem. I wrote that comment before realizing that channel close would maintain the FIFO property of the channel.

Edit:
https://go.dev/play/p/S3a-knXGe6k Seems like closing a channel doesn't obey the fifo property if there are sends to the channel which have blocked.


sstable/parallel_compression.go, line 168 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

nit: small helpers like this that are called once make it a little more difficult to read, imo. Inline?

I left a comment to make its purpose clear.


sstable/parallel_compression.go, line 174 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

can we zero out the structure?

*c = BlockCompressionCoordinator{}

No, the purpose of these is to avoid the allocations of any of the byte slices in these structs.


sstable/parallel_compression.go, line 224 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

One thing I'm not clear on — are we reusing any of the []byte buffers from task to task? If so, we need to be careful to not retain excessively large buffers indefinitely. There can be occasional very large blocks (eg, a single large k/v pair). If we don't throw away those large buffers, these buffers will all eventually be large and reserve excessive memory.

Yea, we're reusing compressedData, compressionTask, writeTask structs, which contain the byte buffers. In writer.queueBlockForParallelCompression, the copySliceToDst calls, copy over the byte slices to the byte slices in these structs without doing any allocations.

copySliceToDst does an allocation if the capacity of the destination slice isn't large enough. I can make it halve its size if the capacity is too large.

Copy link
Contributor Author

@bananabrick bananabrick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 6 of 26 files reviewed, 25 unresolved discussions (waiting on @itsbilal, @jbowens, @nicktrav, and @petermattis)


sstable/writer.go, line 1040 at r37 (raw file):

// EstimatedSize returns the estimated size of the sstable being written if a
// called to Finish() was made without adding additional keys.
func (w *Writer) EstimatedSize() uint64 {

This pr maintains almost all of our current properties and behaviours in the writer. However, the EstimatedSize function is reporting incorrect values now. This is because w.meta.Size can only be updated asynchronously after the block is compressed.

Is this okay? I can fix this problem by keeping track of the historic compression ratio in the writer, and use that to increment w.meta.Size even before we compress the block.

@bananabrick bananabrick requested a review from jbowens December 20, 2021 21:22
Copy link
Collaborator

@jbowens jbowens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 5 files at r38, all commit messages.
Reviewable status: 5 of 26 files reviewed, 19 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, and @petermattis)


sstable/parallel_compression.go, line 103 at r37 (raw file):

Reading and writing from those would require some concurrent access protection.

I'm suggesting still using your buffered channel to handle concurrent access and portioning out buffers to goroutines. You don't need any additional synchronization just because they're allocated at once as a part of a single slice.


sstable/parallel_compression.go, line 118 at r37 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

Sorry, I think my naming scheme here was bad. I fixed that.

Each writer spins up its own goroutine to write blocks to disk. That's what the writeQueue is for. The writeQueue only blocks while it waits for the compression task to finish.

The bufferSize exists cause the pre-allocated structs are re-used, and I didn't want it to be unbounded. The writer.queueBlockForParallelCompression function may block for one of these structs to become available.


func (w *Writer) queueBlockForParallelCompression(key InternalKey) error {
	compressionTask := <-w.parallelWriterState.compressionQueueRef.compressionTaskBuffer
	writeTask := <-w.parallelWriterState.compressionQueueRef.writeTaskBuffer

We can write something to ensure fairness here, just in case buffered channels aren't fair when we read from them.

So you can't have more than bufferSize concurrent writes? I don't think we want a fixed limit to the number of concurrent writes. Doesn't the concern about having an unbounded number of sstable writers (because of CockroachDB sstable.Writers) still apply? I may still be misunderstanding?


sstable/parallel_compression.go, line 144 at r37 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

I believe this isn't a problem. I wrote that comment before realizing that channel close would maintain the FIFO property of the channel.

Edit:
https://go.dev/play/p/S3a-knXGe6k Seems like closing a channel doesn't obey the fifo property if there are sends to the channel which have blocked.

It's actually not legal to send on a closed channel at all. If you adjust that playground program to wait for the launched goroutine to finish, that program will panic because of the send on a closed channel: https://go.dev/play/p/uNP4eWgPmXW

Buffered channels are tricky. They can hide deadlock/goroutine leak issues that would be immediately obvious with an unbuffered channel.


sstable/parallel_compression.go, line 174 at r37 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

No, the purpose of these is to avoid the allocations of any of the byte slices in these structs.

The compressionTask type has four fields: toCompress, compression, checksumanddoneCh`. I don't think any of these are byte slices we want to reuse, right?

Oh, I see, the block gets copied to the compressionTask's toCompress buffer? This avoids the GC pressure that Peter mentioned, but at the expense of needing to do a synchronous memcpy on the compaction goroutine. Pooling blockWriters and passing the filled blockWriter off to the compression goroutine would avoid this copy.

Separately, I think it's good hygiene to explicitly copy over fields, zeroing out anything not explicitly copied. Fields added in the future then are by default zeroed, just like a newly allocated struct. (This is the pattern we follow with iterAlloc):

*c = compressionTask{toCompress: c.toCompress} 

sstable/parallel_compression.go, line 224 at r37 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

Yea, we're reusing compressedData, compressionTask, writeTask structs, which contain the byte buffers. In writer.queueBlockForParallelCompression, the copySliceToDst calls, copy over the byte slices to the byte slices in these structs without doing any allocations.

copySliceToDst does an allocation if the capacity of the destination slice isn't large enough. I can make it halve its size if the capacity is too large.

I think a fixed maximum that's a small multiplier of the blockSize would also work. We use a fixed constant for an analagous heuristic when choosing whether to reuse our iterator key buffers: https://github.com/cockroachdb/pebble/blob/master/iterator.go#L1142-L1165

Separately, one nice attribute of sync.Pools over home-rolled buffers is that Go will discard all of a sync.Pools objects during (some?) GC runs. That periodic discard helps ensure you're not unnecessarily retaining too many or too large of buffers.

Copy link
Contributor Author

@bananabrick bananabrick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 5 of 26 files reviewed, 19 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, and @petermattis)


sstable/parallel_compression.go, line 103 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

Reading and writing from those would require some concurrent access protection.

I'm suggesting still using your buffered channel to handle concurrent access and portioning out buffers to goroutines. You don't need any additional synchronization just because they're allocated at once as a part of a single slice.

Done.


sstable/parallel_compression.go, line 118 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

So you can't have more than bufferSize concurrent writes? I don't think we want a fixed limit to the number of concurrent writes. Doesn't the concern about having an unbounded number of sstable writers (because of CockroachDB sstable.Writers) still apply? I may still be misunderstanding?

bufferSize is larger than the compression concurrency/compaction concurrency, and the queue sizes. I don't expect us to block when retrieving a struct from the buffer, unless there are external writers.

The current design needs other modifications to accommodate external writers. For example, if we have a compression concurrency == 4, and we're running (3 compactions + 1 flush), and we add an external writer with parallel compression, we will have 5 writers, and only 4 compression workers.

I'm hesitant to use a sync.Pool because the compaction goroutine(which calls writer.Add), could run much faster than the compression/block write go routines, and cause sync.Pool to repeatedly allocate structs for every block flush.


sstable/parallel_compression.go, line 144 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

It's actually not legal to send on a closed channel at all. If you adjust that playground program to wait for the launched goroutine to finish, that program will panic because of the send on a closed channel: https://go.dev/play/p/uNP4eWgPmXW

Buffered channels are tricky. They can hide deadlock/goroutine leak issues that would be immediately obvious with an unbuffered channel.

I'm more concerned about:

  1. Create buffered channel of size 1.
  2. Do a send on buffered channel.
  3. Do another send on buffered channel(this blocks).
  4. close channel from a different goroutine.
  5. Read from channel.
  6. Read from channel.

Will the send on line number 3, show up when I read on line number 6?


sstable/parallel_compression.go, line 174 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

The compressionTask type has four fields: toCompress, compression, checksumanddoneCh`. I don't think any of these are byte slices we want to reuse, right?

Oh, I see, the block gets copied to the compressionTask's toCompress buffer? This avoids the GC pressure that Peter mentioned, but at the expense of needing to do a synchronous memcpy on the compaction goroutine. Pooling blockWriters and passing the filled blockWriter off to the compression goroutine would avoid this copy.

Separately, I think it's good hygiene to explicitly copy over fields, zeroing out anything not explicitly copied. Fields added in the future then are by default zeroed, just like a newly allocated struct. (This is the pattern we follow with iterAlloc):

*c = compressionTask{toCompress: c.toCompress} 

I think pooling of blockWriters is excessive, since the goroutines the blockWriters will be passed to don't really need the block writer for anything other than one of its buffers. I think I will pool the buffers in the blockWriter, and then pass over the buffer instead.

@bananabrick bananabrick requested a review from jbowens December 21, 2021 18:50
Copy link
Collaborator

@jbowens jbowens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 5 of 26 files reviewed, 22 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, and @petermattis)


sstable/parallel_compression.go, line 250 at r38 (raw file):

		compressedData := <-writeTask.doneCh
		// If we've already encountered an error, then we don't
		// want to write the current sstable using this worker.

Can you add additional commentary on the error termination condition here? Do we have this continue logic because there may be additional in-flight compression tasks, and we need to drain qu.queue? and will the producer(s) recognize the error condition, stop enqueuing compression tasks and close the qu.queue, or do we always step through the entirety of the sstable's blocks?


sstable/parallel_compression.go, line 261 at r38 (raw file):

			qu.releaseBuffers(writeTask, compressedData)
			continue
		}

this would probably read a little easier if you wrap the body of the logic in a closure, so you can reduce the duplication of the error handling logic:

err = func() error {
    if compressedData.err != nil {
        return err
    }

    bh, err := qu.writer.writeBlockPostCompression(
        compressedData.compressed, compressedData.tmpBuf,
    )
    if err != nil {
        return err
    }
    // ...
}()

qu.releaseBuffers(writeTask, compressedData)
if err != nil {
    qu.writer.parallelWriterState.errCh <- err
}

sstable/writer.go, line 213 at r38 (raw file):

	if len(w.parallelWriterState.errCh) > 0 {
		w.err = <-w.parallelWriterState.errCh
	}

I'm having a hard time reasoning about these channels and convincing myself that we're not leaving goroutines blocked.

Copy link
Contributor Author

@bananabrick bananabrick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 5 of 26 files reviewed, 22 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, and @petermattis)


sstable/parallel_compression.go, line 250 at r38 (raw file):

Previously, jbowens (Jackson Owens) wrote…

Can you add additional commentary on the error termination condition here? Do we have this continue logic because there may be additional in-flight compression tasks, and we need to drain qu.queue? and will the producer(s) recognize the error condition, stop enqueuing compression tasks and close the qu.queue, or do we always step through the entirety of the sstable's blocks?

Yea, I can add some commentary. We have the continue logic to drain the queue. It's possible that the compaction goroutine(which calls writer.Add) hasn't checked for the error encountered by this goroutine yet, and may add additional compressionTasks/writeTasks to the queue. We don't want the compaction goroutine to block forever when it tries to add to the writeQueue so we have to keep reading from the writeQueue until the compaction goroutine encounters the error.

And yes, the queue will be closed once the error is encountered. The writer.Add call in the runCompaction function will return the error. The runCompaction function will call writer.Close, which will call writeQueue.finish.


sstable/parallel_compression.go, line 261 at r38 (raw file):

Previously, jbowens (Jackson Owens) wrote…

this would probably read a little easier if you wrap the body of the logic in a closure, so you can reduce the duplication of the error handling logic:

err = func() error {
    if compressedData.err != nil {
        return err
    }

    bh, err := qu.writer.writeBlockPostCompression(
        compressedData.compressed, compressedData.tmpBuf,
    )
    if err != nil {
        return err
    }
    // ...
}()

qu.releaseBuffers(writeTask, compressedData)
if err != nil {
    qu.writer.parallelWriterState.errCh <- err
}

Done.


sstable/writer.go, line 213 at r38 (raw file):

Previously, jbowens (Jackson Owens) wrote…

I'm having a hard time reasoning about these channels and convincing myself that we're not leaving goroutines blocked.

This channel in particular is a buffered channel of size 1. We won't block on the read here, because we check if the channel is non-empty before doing the read, and no other goroutine is reading from this channel. We won't block on a send to this channel, because we write to this channel at most once.

@bananabrick bananabrick requested a review from jbowens December 21, 2021 19:45
Copy link
Collaborator

@jbowens jbowens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 5 of 26 files reviewed, 21 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, and @petermattis)


sstable/parallel_compression.go, line 118 at r37 (raw file):

I'm hesitant to use a sync.Pool because the compaction goroutine(which calls writer.Add), could run much faster than the compression/block write go routines, and cause sync.Pool to repeatedly allocate structs for every block flush.

These seem like orthogonal issues. We could use sync.Pool for object reuse, and a separate semaphore channel to avoid excessive pipeline buffering. If the semaphore is per-Writer, this could also improve fairness.

The current design needs other modifications to accommodate external writers.

For what it's worth, external writers are the immediate potential use case for pipelining compression, to support RewriteKeySuffixes.


sstable/parallel_compression.go, line 144 at r37 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

I'm more concerned about:

  1. Create buffered channel of size 1.
  2. Do a send on buffered channel.
  3. Do another send on buffered channel(this blocks).
  4. close channel from a different goroutine.
  5. Read from channel.
  6. Read from channel.

Will the send on line number 3, show up when I read on line number 6?

I believe the send on #3 will panic, because the channel is closed before the send completes.


sstable/parallel_compression.go, line 174 at r37 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

I think pooling of blockWriters is excessive, since the goroutines the blockWriters will be passed to don't really need the block writer for anything other than one of its buffers. I think I will pool the buffers in the blockWriter, and then pass over the buffer instead.

That's fine, although passing over the blockWriter allows the asynchronous goroutine to also finish the block (specifically encode the restart intervals), reducing the amount of work that must be sequential.


sstable/parallel_compression.go, line 250 at r38 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

Yea, I can add some commentary. We have the continue logic to drain the queue. It's possible that the compaction goroutine(which calls writer.Add) hasn't checked for the error encountered by this goroutine yet, and may add additional compressionTasks/writeTasks to the queue. We don't want the compaction goroutine to block forever when it tries to add to the writeQueue so we have to keep reading from the writeQueue until the compaction goroutine encounters the error.

And yes, the queue will be closed once the error is encountered. The writer.Add call in the runCompaction function will return the error. The runCompaction function will call writer.Close, which will call writeQueue.finish.

At the time the runCompaction goroutine notices the error, it's also possible that there are in-flight compression tasks but no in-flight write tasks, right? Does the writer need to wait for the in-flight compression tasks to complete before closing the write queue channel?


sstable/writer.go, line 164 at r37 (raw file):

	indexPartitions    []indexBlockWriterAndBlockProperties

	checksumData checksumData

should this be called checksummer or something similar since it contains the digest and the enum.


sstable/writer.go, line 1040 at r37 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

This pr maintains almost all of our current properties and behaviours in the writer. However, the EstimatedSize function is reporting incorrect values now. This is because w.meta.Size can only be updated asynchronously after the block is compressed.

Is this okay? I can fix this problem by keeping track of the historic compression ratio in the writer, and use that to increment w.meta.Size even before we compress the block.

EstimatedSize is used to determine when to split sstables during a compaction, so while it doesn't need to be exact, it needs to be somewhat accurate.

I don't think we want to update w.meta.Size asynchronously based on an estimate, because when the writer is finished w.meta.Size needs to be the exact size of the file. We can add another field, perhaps a uint64 that we update through atomic.AddUint64 calls, that represents the estimated size of inflight-blocks.


sstable/writer.go, line 174 at r39 (raw file):

	// accessed by both the parallel writer goroutine,
	// and writer.EstimatedSize.
	writerMu sync.Mutex

I think we should strive to remove this mutex. Why does the main compaction goroutine need to access the indexBlock?

Copy link
Contributor Author

@bananabrick bananabrick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got through most of the comments, and added some documentation on why the channels won't block forever/deadlock. PTAL.

Reviewable status: 4 of 26 files reviewed, 21 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, and @petermattis)


sstable/parallel_compression.go, line 118 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

I'm hesitant to use a sync.Pool because the compaction goroutine(which calls writer.Add), could run much faster than the compression/block write go routines, and cause sync.Pool to repeatedly allocate structs for every block flush.

These seem like orthogonal issues. We could use sync.Pool for object reuse, and a separate semaphore channel to avoid excessive pipeline buffering. If the semaphore is per-Writer, this could also improve fairness.

The current design needs other modifications to accommodate external writers.

For what it's worth, external writers are the immediate potential use case for pipelining compression, to support RewriteKeySuffixes.

I think the version as it is provides some excellent benefits for lsm stability/perf. Supporting external writers to use parallel compression won't be impossible. We would probably need a way to increase/decrease the number of compression workers dynamically. Should be ok to sneak that into a pr on top of this one, rather than add it in here.


sstable/parallel_compression.go, line 144 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

I believe the send on #3 will panic, because the channel is closed before the send completes.

Done.


sstable/parallel_compression.go, line 250 at r38 (raw file):

Previously, jbowens (Jackson Owens) wrote…

At the time the runCompaction goroutine notices the error, it's also possible that there are in-flight compression tasks but no in-flight write tasks, right? Does the writer need to wait for the in-flight compression tasks to complete before closing the write queue channel?

No, that isn't possible. A compressionTask/writeTask share a doneCh, and the writeTask is added to the writeQueue before the compressionTask is added to the compression queue. So, the writeQueue will just block while it waits for the compressionTask to finish.

I added a commit which contains some reasoning on why none of the channels will block forever: e333f1d. Let me know if I need to add some clarification there, or if I'm missing some logic.


sstable/writer.go, line 164 at r37 (raw file):

Previously, jbowens (Jackson Owens) wrote…

should this be called checksummer or something similar since it contains the digest and the enum.

Done.


sstable/writer.go, line 174 at r39 (raw file):

Previously, jbowens (Jackson Owens) wrote…

I think we should strive to remove this mutex. Why does the main compaction goroutine need to access the indexBlock?

writer.EstimatedSize also uses the indexBlock size.

@bananabrick bananabrick requested a review from jbowens December 22, 2021 00:23
Copy link
Collaborator

@jbowens jbowens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 5 files at r38, 1 of 2 files at r40, 2 of 3 files at r43, all commit messages.
Reviewable status: 8 of 26 files reviewed, 24 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, and @petermattis)


open.go, line 370 at r37 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

Rather than bumping the compaction queue size to at least the concurrent compactions limit, would it be feasible for compactions to perform their own compression if the queue is full? Then the compaction queue is more like supplemental compression bandwidth.

Right now, it's tricky to switch back and forth between parallel compression and no parallel compression for a given writer. This is something I could explore in a future pr, if necessary.

The compression queue is FIFO and quite small(I've been testing with size 5), so each compaction is guaranteed to make progress, and compactions won't get starved. I can't find a good source on whether blocked sends to unbuffered channels maintain the FIFO property. If they don't, I could implement my own worker queue which does maintain that property. I think I could also write a queue which maintains fairness in other ways.

Can the compaction goroutine not construct a compressionTask and writerTask, complete its own compression task synchronously and leave the writing to the asynchronous writer goroutine? Then the queue bounds the number of compression workers, but not the number of parallel sstable writer goroutines.


options.go, line 1213 at r37 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

Yes, I believe it's an existing race, which didn't show up before.

Got it — seems like a good thing to fix first in a separate PR.


sstable/parallel_compression.go, line 118 at r37 (raw file):

We would probably need a way to increase/decrease the number of compression workers dynamically.

I would much prefer a way for the compaction goroutine to perform its own compression if there are not enough available compression goroutines. It's simpler and provides better guarantees of progress for each individual sstable-constructing goroutine.


sstable/parallel_compression.go, line 90 at r43 (raw file):

	// The compressionTask is guaranteed to eventually
	// write to the doneCh.
	doneCh <-chan *compressedData

is there another name that would be better here? doneCh seems to imply it's used to signal when the writeTask is done, but it's actually when the writeTask may begin.


sstable/parallel_compression.go, line 185 at r43 (raw file):

			// INVARIANT: Every single item added to qu.queue
			// is processed before the workers are shut down.
			break

Thanks for this comment 👍


sstable/parallel_compression.go, line 207 at r43 (raw file):

b could be just returning toCompress.toCompress

If it's not (the common case of a compressible data block), we can avoid the copy, which seems worthwhile. We could alter compressAndChecksum's signature to expose whether or not the resulting block is compressed, indicating whether the backing buffer is compressedData.compressBuf or toCompress.toCompress.

IMO, even better would to have the buffers associated with a block follow it throughout the entire process so we avoid these unnecessary copies here, and at the point where we enqueue the block for compression. (See the previous comments about passing around *blockWriters, or at least their buffers).


sstable/writer.go, line 174 at r39 (raw file):

Previously, bananabrick (Arjun Nair) wrote…

writer.EstimatedSize also uses the indexBlock size.

I suspect we can eliminate the mutex with atomics (eg, atomic.AddUint64) since we're just tracking a few sizes.


sstable/writer.go, line 27 at r43 (raw file):

// WriterMetadata holds info about a finished sstable.
type WriterMetadata struct {
	EstimatedSize  uint64

I think the Writer is a better place for this, since it only applies to unfinished sstables and this struct is intended to represent the metadata of the final sstable.


sstable/writer.go, line 95 at r43 (raw file):

	// avg represents the average value for
	// n samples.
	avg float64

why not store the sum and recompute the average through division when queried? Recomputing the old sum seems prone to floating point imprecision.


sstable/writer.go, line 513 at r43 (raw file):

	w.meta.EstimatedSize += uint64(
		w.parallelWriterState.getAverageCompressionRatio() *
			float64(len(finishedBlock)))

I think we should only use an estimate to measure the in-flight sizes that we can't know. Once a block has been compressed and written, we know its size and may update w.meta.Size and reduce the running total of in-flight estimated sizes. If we want to ensure EstimatedSize is monotonic, we can also store the last computed total estimated size and avoid returning a smaller estimate if the current estimated size has decreased.

Copy link
Contributor

@itsbilal itsbilal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushing some relatively surface-level comments while I do a deeper dive. Looking mostly good!

Reviewed 1 of 4 files at r31, 1 of 19 files at r32, 1 of 3 files at r33, 1 of 3 files at r36, 2 of 3 files at r45.
Reviewable status: 8 of 26 files reviewed, 29 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, and @petermattis)


sstable/parallel_compression.go, line 27 at r36 (raw file):

// to avoid writerMu.

func copySliceToDst(dst *[]byte, src []byte) {

Nit: it's a little more idiomatic to return the new slice instead of taking in a slice pointer. That way you can do dst = copySliceToDst(dst, src). append works the same way.

Also you can replace uses of this method with:
dst = append(dst[:0], src...).


sstable/parallel_compression.go, line 46 at r36 (raw file):

}

func copyInternalKey(key InternalKey) InternalKey {
  1. this doesn't copy the trailer?

  2. you could use InternalKey.Clone.


sstable/parallel_compression.go, line 99 at r45 (raw file):

// CompressionQueue queues up blocks
// which will get compressed in parallel.

Nit: you can go up to 80 chars in each line for comments.


sstable/parallel_compression.go, line 126 at r45 (raw file):

	// writeQueue.runWorker will always add items back to
	// these channels.
	writeTaskBuffer       sync.Pool

These pools can be global variables just like other sync.Pools in the codebase, no need to tie them to a CompressionQueue. That way, we're able to better manage memory across the pebble.DB instances in a cockroach process.

Also, s/Buffer/Pool/ in the names.


sstable/parallel_compression.go, line 227 at r45 (raw file):

func (qu *CompressionQueue) releaseCompressionTask(c *compressionTask) {
	c.compressionDone = nil
	qu.compressionTaskBuffer.Put(c)

Might be good hygiene to zero out most member variables in c, and make buffers zero-len but retain capacity like this:

c.toCompress = c.toCompress[:0]

That way, .Get won't return stuff populated with values from a past run, which it currently does, and reduces chances of bugs creeping in.

You can do the same for the other stuff down below, though it looks like compressedData's bufs need to maintain their len for the logic in compressAndChecksum to work correctly.


sstable/parallel_compression.go, line 284 at r45 (raw file):

func (qu *writeQueue) releaseBuffers(w *writeTask, c *compressedData) {
	qu.writer.parallelWriterState.compressionQueueRef.releaseWriteTask(w)

This is a very deep function call. Once the pools are global variables, you can make these plain functions that don't take any receivers (eg. just releaseWriteTask(w) instead of foo.releaseWriteTask(w)).

Copy link
Collaborator

@petermattis petermattis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is getting very large which makes reviewing it unwieldy (i.e. you'll get a lower quality review, which is really problematic for this performance and correctness sensitive code). I think you should try to separate out some smaller prerequisite PRs. For example, the changes to Writer.maybeFlush and related routines for when parallel compression is disabled. This is definitely more work for you (the author), but it makes the reviewers job easier and that is what we need to optimize for.

I'm not thrilled with how different the parallel compression on vs off code paths are. Would it be possible to reduce some of the differences? For example, perhaps there should always be a writer queue, even if parallel compression is off. The addition of the write queue could be done as an initial PR. Similarly, perhaps if there isn't a DB-level compression queue, we could create a single element compression queue tied to the lifetime of the sstable.Writer and adjust the code so that we don't continue writing new blocks while a block is being compressed. This is all a bit hand-wavy, but the high-level idea is to unify the code paths so that we have less cognitive overhead in understanding what is going on. Prior to this PR we had an implicit pipeline of 1) fill block, 2) compress block, 3) write compressed block. I think we can get that same behavior today with an explicit pipeline. When a block is ready for flushing, we put it on the compress queue (perhaps a private queue for the sstable if no DB-level queue has been configured). Writer.maybeFlush as a last step then tries to get a new blockWriter to insert into, which potentially blocks until the current block has finished writing. Hmm, I think this comment may be confusing. Happy to jump on a call to explain what I'm thinking if that would be useful.

Reviewable status: 8 of 26 files reviewed, 39 unresolved discussions (waiting on @bananabrick, @itsbilal, @jbowens, @nicktrav, and @petermattis)


sstable/parallel_compression.go, line 54 at r45 (raw file):

type compressedData struct {
	compressed       []byte
	tmpBuf           [blockHandleLikelyMaxLen]byte

Should this be blockTrailerLen? And perhaps named trailerBuf?


sstable/parallel_compression.go, line 71 at r45 (raw file):

	// completed. compressionDone is a
	// buffered channel of size 1.
	// The write to compressionDone won't block because

I found this somewhat confusing on first read. A buffered channel of size 1 doesn't prevent blocking. What prevents the blocking is that we only ever write a single item to this channel.


sstable/parallel_compression.go, line 72 at r45 (raw file):

	// buffered channel of size 1.
	// The write to compressionDone won't block because
	// the compressionDone is a buffered channel of size 1.

Nit: you just mentioned in the previous sentence that the channel has a size of 1.


sstable/parallel_compression.go, line 77 at r45 (raw file):

// writeTask is added to the write queue
// to maintain the order of block writes to disk.

Nit: you seem to be manually wrapping the lines in your comment paragraphs. Most editors have a "fill paragraph" functionality that will do this line wrapping automatically. In Goland it is under Edit -> Fill Paragraph. This will save you manual effort, and also make your comment line wrapping more consistent with other code.


sstable/parallel_compression.go, line 84 at r45 (raw file):

	// to complete. This ensures write order of the blocks
	// to disk since writeTasks are added to the writeQueue
	// in order.

Nit: we typically put blank lines between paragraphs in comments in the Pebble code base.


sstable/parallel_compression.go, line 113 at r45 (raw file):

	queue chan *compressionTask

	// The following buffered channels contain preallocated

I'm not seeing any buffered channels below. Is this comment out of date?


sstable/parallel_compression.go, line 172 at r45 (raw file):

	for {
		toCompress, ok := <-qu.queue
		if !ok {

I believe the code structure you have is equivalent to for toCompress := range qu.queue. The latter (ranging over a queue) is the idiomatic way to write this.


sstable/parallel_compression.go, line 213 at r45 (raw file):

		// This call will never block because compressionDone
		// must be a buffered channel of size 1.

See my other comment. Some additional words could be added here to clarify that we only write a single element to this channel.


sstable/writer.go, line 513 at r43 (raw file):

Previously, jbowens (Jackson Owens) wrote…

I think we should only use an estimate to measure the in-flight sizes that we can't know. Once a block has been compressed and written, we know its size and may update w.meta.Size and reduce the running total of in-flight estimated sizes. If we want to ensure EstimatedSize is monotonic, we can also store the last computed total estimated size and avoid returning a smaller estimate if the current estimated size has decreased.

I was going to say something similar here. The value returned by Writer.EstimatedSize() should be the current size written to disk (w.meta.Size) + an estimate of the size of the in-flight data after compression. Rather than computing an average compression ratio, I'd keep track of a Writer.uncompressedSize field that is incremented every time Writer.meta.Size is incremented. The compression ratio can then be computed as Writer.meta.Size / Writer.uncompressedSize and we can apply that compression ratio to the total of the inflight uncompressed bytes. If there are no inflight uncompressed bytes, the computation in Writer.EstimatedSize() becomes exactly what it is before this PR (or when parallel compression is disabled).


sstable/writer.go, line 516 at r45 (raw file):

	// Populate compression coordinator.
	copySliceToDst(&compressionTask.toCompress, finishedBlock)

Memory copies are fast, but not copying is faster. My suggestion for handling the parallel compression was to have a queue of blockWriters. When a block is finished, we'd take the current block writer and queue it for compression. No need to copy the data at all. When the block has been compressed and written to disk, we "free" the block writer for reuse.


sstable/writer.go, line 550 at r45 (raw file):

	// Queue compression + write tasks. It is important that
	// the tasks are queued in the order here.

Can you elaborate on why the ordering is important? If we added to the compression queue first we could compress the data before we added to the write queue, but that doesn't seem problematic.

bananabrick added a commit that referenced this pull request Jan 19, 2022
This pr aims to serve as a first step towards unifying the parallel compression vs
on parallel compression code paths, as suggested in this pr:
#1383 (review).

This pr organizes the buffers in the Writer in a way which makes it easier to reason
about the ownership of buffers/slices.

Future prs will add, a writeQueue, a compressionQueue, a mechanism to schedule parallel
compression based on cpu utilization.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants