distsql: elide RowChannel when connecting local processors #20550

Closed
petermattis opened this issue Dec 7, 2017 · 4 comments

Comments

@petermattis
Collaborator

RowChannel is currently used for connecting local distsql Processors. A RowChannel is essentially a channel with a fixed size. #19288 identified RowChannel as a throughput bottleneck in distsql processing.

RowChannel implements both the RowSource and RowReceiver interfaces. Each Processor is run within a separate goroutine and the Processor.Run method loops, pulling rows from its inputs (RowSource), processing them and emitting them via ProcOutputHelper to a RowReceiver.
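For readers unfamiliar with the current structure, here is a minimal sketch of the goroutine-per-processor model described above. It is not the actual distsql code: `Row`, `stage` and `runPipeline` are hypothetical names, and a plain buffered Go channel stands in for RowChannel.

```go
package main

import (
	"fmt"
	"sync"
)

// Row is a simplified stand-in for a distsql row (assumption).
type Row []int

// stage plays the role of a processor's Run loop: it pulls rows from its
// input channel, processes them (here: increments column 0) and emits them
// on its output channel.
func stage(in <-chan Row, out chan<- Row, wg *sync.WaitGroup) {
	defer wg.Done()
	defer close(out)
	for row := range in {
		row[0]++
		out <- row
	}
}

// runPipeline wires nStages stages together with buffered channels, runs each
// stage in its own goroutine, and returns the sum of column 0 at the output.
func runPipeline(nStages, nRows int) int {
	var wg sync.WaitGroup
	head := make(chan Row, 16)
	in := head
	for i := 0; i < nStages; i++ {
		out := make(chan Row, 16)
		wg.Add(1)
		go stage(in, out, &wg)
		in = out
	}
	// Feed the first stage from a separate goroutine.
	go func() {
		for i := 0; i < nRows; i++ {
			head <- Row{0}
		}
		close(head)
	}()
	total := 0
	for row := range in {
		total += row[0]
	}
	wg.Wait()
	return total
}

func main() {
	fmt.Println(runPipeline(3, 10)) // prints 30
}
```

Every row crosses one channel per stage boundary, which is the hand-off cost this issue proposes to avoid for co-located processors.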

With a modest amount of refactoring, I think this could be restructured by having processor implementations also implement the RowSource interface. The internals of such a processor would be reorganized so that Next() would pull from its input(s), process, and return the next row. This would be akin to the planNode interface. A processor which implements RowSource would be driven either via a call to Run(), in which case Next() would never be called, or via zero or more calls to Next(). That is, a processor would either act as a RowSource or as a Processor within a given flow, never both.
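A rough sketch of what pull-based processors might look like. The `RowSource` interface here is deliberately simplified (the real one presumably carries more than a bare row), and `sliceSource`, `evenFilter`, `doubler` and `collect` are hypothetical names invented for illustration.

```go
package main

import "fmt"

// Row is a simplified stand-in for a distsql row (assumption).
type Row []int

// RowSource is a simplified pull interface (assumption): Next returns the
// next row, or nil once the source is exhausted.
type RowSource interface {
	Next() Row
}

// sliceSource is a hypothetical in-memory source.
type sliceSource struct {
	rows []Row
	pos  int
}

func (s *sliceSource) Next() Row {
	if s.pos >= len(s.rows) {
		return nil
	}
	row := s.rows[s.pos]
	s.pos++
	return row
}

// evenFilter is a hypothetical processor that keeps rows whose first column
// is even. Next pulls from its input until a row passes.
type evenFilter struct{ input RowSource }

func (f *evenFilter) Next() Row {
	for {
		row := f.input.Next()
		if row == nil || row[0]%2 == 0 {
			return row
		}
	}
}

// doubler is a hypothetical processor that doubles the first column.
type doubler struct{ input RowSource }

func (d *doubler) Next() Row {
	row := d.input.Next()
	if row == nil {
		return nil
	}
	return Row{row[0] * 2}
}

// collect drains a RowSource in the calling goroutine.
func collect(src RowSource) []Row {
	var out []Row
	for row := src.Next(); row != nil; row = src.Next() {
		out = append(out, row)
	}
	return out
}

func main() {
	src := &sliceSource{rows: []Row{{1}, {2}, {3}, {4}}}
	// Both processors run in this goroutine via direct calls to Next.
	fused := &doubler{input: &evenFilter{input: src}}
	fmt.Println(collect(fused)) // prints [[4] [8]]
}
```

No goroutine or channel sits between the two processors; each row moves downstream through nested Next() calls.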

We can incrementally move towards this new API. Currently, Flow.setup() always joins processors together with a RowChannel or MultiplexedRowChannel. If a processor implements RowSource, creation of the RowChannel can be elided. We would have to mark processors joined in this way and only call Processor.Run on processors that are not acting as RowSources.
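One way the incremental wiring could look, sketched with a type assertion. This is not how Flow.setup() is actually written; `connect`, `rowChannel`, `counter` and `pushOnly` are hypothetical, and the interfaces are simplified.

```go
package main

import "fmt"

// Row is a simplified stand-in for a distsql row (assumption).
type Row []int

// Simplified interfaces (assumptions, not the real distsql signatures).
type RowSource interface{ Next() Row }

type RowReceiver interface {
	Push(row Row)
	Close()
}

type Processor interface{ Run(out RowReceiver) }

// rowChannel is a hypothetical channel-backed connector that acts as both a
// RowReceiver and a RowSource.
type rowChannel struct{ c chan Row }

func (rc *rowChannel) Push(row Row) { rc.c <- row }
func (rc *rowChannel) Close()       { close(rc.c) }
func (rc *rowChannel) Next() Row    { return <-rc.c } // nil once closed and drained

// counter is a hypothetical processor that also implements RowSource.
type counter struct{ n, limit int }

func (c *counter) Next() Row {
	if c.n >= c.limit {
		return nil
	}
	c.n++
	return Row{c.n}
}

func (c *counter) Run(out RowReceiver) {
	for row := c.Next(); row != nil; row = c.Next() {
		out.Push(row)
	}
	out.Close()
}

// pushOnly is a hypothetical processor that has not been converted yet.
type pushOnly struct{ limit int }

func (p *pushOnly) Run(out RowReceiver) {
	for i := 1; i <= p.limit; i++ {
		out.Push(Row{i})
	}
	out.Close()
}

// connect returns the RowSource a downstream consumer should read from.
// If the upstream processor is itself a RowSource, the channel is elided and
// Run is never called; otherwise Run is started in its own goroutine.
func connect(p Processor) (src RowSource, fused bool) {
	if rs, ok := p.(RowSource); ok {
		return rs, true
	}
	rc := &rowChannel{c: make(chan Row, 16)}
	go p.Run(rc)
	return rc, false
}

// sum drains a RowSource and adds up column 0.
func sum(src RowSource) int {
	total := 0
	for row := src.Next(); row != nil; row = src.Next() {
		total += row[0]
	}
	return total
}

func main() {
	src, fused := connect(&counter{limit: 3})
	fmt.Println(fused, sum(src)) // prints true 6
	src, fused = connect(&pushOnly{limit: 3})
	fmt.Println(fused, sum(src)) // prints false 6
}
```

The `fused` flag plays the role of the "mark" mentioned above: it tells the flow which processors must not have Run called on them.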

See also #19134 which mentions "fusing" multiple processors into a single goroutine. The above proposal would achieve this.

This proposal needs to be validated by benchmarking before significant work is done. There are undoubtedly complexities with regard to cancellation and the particulars of the RowSource interface that will make this challenging.

@arjunravinarayan, @asubiotto You've likely already been thinking in this direction, but I couldn't find an issue discussing this.

@petermattis petermattis added the C-performance Perf of queries or internals. Solution not expected to change functional behavior. label Dec 7, 2017
petermattis added a commit to petermattis/cockroach that referenced this issue Dec 8, 2017
Add BenchmarkRowChannelPipeline which benchmarks throughput and latency
through a pipeline of RowChannels. The results are disappointing:

name                    time/op
RowChannelPipeline/1-8     110ns ± 2%
RowChannelPipeline/2-8     272ns ± 1%
RowChannelPipeline/3-8     377ns ± 4%
RowChannelPipeline/4-8     421ns ± 1%

name                    speed
RowChannelPipeline/1-8  72.8MB/s ± 3%
RowChannelPipeline/2-8  29.3MB/s ± 1%
RowChannelPipeline/3-8  21.2MB/s ± 4%
RowChannelPipeline/4-8  19.0MB/s ± 1%

Release note: None

See cockroachdb#20550
See cockroachdb#20553
See cockroachdb#20568
@rjnn
Contributor

rjnn commented Dec 11, 2017

@andreimatei brought up in #20584 the idea of "synchronous scheduling of co-located processors work". I'm not entirely sure what that means, but it seems to be an alternate design to this. I'd like to see it spelled out more explicitly.

My thoughts: there are three reasons I prefer the design outlined in this issue to maintaining/improving the current design, which uses different goroutines for different processors:

  1. Direct function calls from one processor to the next have great cache locality.
  2. When we know for sure that two processors are one after the other, we can use a single buffer to pass a pre-allocated batch of rows so that we don't create extra garbage when passing data between them.
  3. In the longer run, we want to fuse the processors by JITting them into a single pipeline, to avoid interface indirection. The performance benefit of this is not as important when we already have row batches, but in the limit case JITting relatively small batches has the best cache locality[1]. JITting is probably not something we will do in 2018, but it bears keeping in mind what the optimal design looks like according to the fastest dataflow designs out there.
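To make point 2 concrete, here is a small sketch of passing a pre-allocated batch between two directly connected stages. Everything in it is hypothetical (`batchSource`, `NextBatch`, the fixed-width `Row`); it only illustrates the buffer reuse, under the assumption that the consumer is done with a batch before asking for the next one.

```go
package main

import "fmt"

// Row is a fixed-width stand-in for a row (assumption), chosen so that rows
// live inside the batch buffer rather than being allocated individually.
type Row [2]int

// batchSource is a hypothetical producer that reuses one buffer for every
// batch it hands out.
type batchSource struct {
	next, limit int
	buf         []Row // allocated once, reused across calls
}

func newBatchSource(limit, batchSize int) *batchSource {
	return &batchSource{limit: limit, buf: make([]Row, 0, batchSize)}
}

// NextBatch refills the shared buffer and returns it. The returned slice is
// only valid until the next call; an empty slice means the source is done.
func (b *batchSource) NextBatch() []Row {
	b.buf = b.buf[:0]
	for len(b.buf) < cap(b.buf) && b.next < b.limit {
		b.buf = append(b.buf, Row{b.next, b.next * b.next})
		b.next++
	}
	return b.buf
}

// sumFirstColumn is a hypothetical consumer that finishes with each batch
// before requesting the next one, so sharing the buffer is safe.
func sumFirstColumn(src *batchSource) int {
	total := 0
	for batch := src.NextBatch(); len(batch) > 0; batch = src.NextBatch() {
		for _, row := range batch {
			total += row[0]
		}
	}
	return total
}

func main() {
	fmt.Println(sumFirstColumn(newBatchSource(10, 4))) // prints 45
}
```

This only works because producer and consumer take turns in the same goroutine. With a channel in between, the producer could overwrite the buffer while the consumer is still reading it.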

We definitely need benchmarks to convince anyone how direct function calls compare to using goroutines for separate processors. @danhhz has already done some preliminary work in this area; I'd also appreciate his thoughts here.

[1]: http://db.csail.mit.edu/pubs/abadi-column-stores.pdf section 4.1

@petermattis
Collaborator Author

@arjunravinarayan I'd like to reiterate that fusing distsql processors together via JIT'ing is definitely not going to happen in 2018. There are lots of bits of lower hanging performance fruit to pluck first. And we also need to get distsql to feature parity with local sql. Definitely worthwhile to keep such fusing in mind, but I want to make sure any other spectators don't get too far ahead of the work immediately ahead.

petermattis added a commit to petermattis/cockroach that referenced this issue Dec 13, 2017
Refactor tableReader to implement the RowSource interface. Refactor
tableReader.Run() to be implemented in terms of
tableReader.Next() (i.e. the RowSource interface).

See cockroachdb#20550

Release note: None
petermattis added a commit to petermattis/cockroach that referenced this issue Dec 14, 2017
Refactor tableReader to implement the RowSource interface. Refactor
tableReader.Run() to be implemented in terms of
tableReader.Next() (i.e. the RowSource interface).

Adjusted BenchmarkTableReader to avoid using a RowBuffer. This shows the
benefit that can be achieved by using TableReader as a RowSource ("old"
below is with the benchmark modified to use a RowChannel).

name           old time/op  new time/op  delta
TableReader-8  11.6ms ± 5%   9.4ms ± 3%  -18.81%  (p=0.000 n=10+10)

See cockroachdb#20550

Release note: None
@petermattis
Collaborator Author

After a day spent looking at profiles of distsql processors, it is clear that the lowest hanging fruit to pluck, other than the overhead of RowChannel, is eliminating allocations. As @arjunravinarayan mentioned above, one of the advantages of eliding RowChannel is that we can avoid allocations between processors.

Once processors implement RowSource, another smaller opportunity is to elide the "post-processing" steps if they are not present. The filter/limit/projection post-processing could be implemented via separate nodes that also model RowSource. Experimentation would be necessary to determine if this is beneficial or not.
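A sketch of how post-processing might be split into separate RowSource nodes that are only inserted when needed. The names (`postProcessSpec`, `filterNode`, `limitNode`, `wrap`) are hypothetical and the interface is simplified; projection is left out for brevity.

```go
package main

import "fmt"

// Row is a simplified stand-in for a distsql row (assumption).
type Row []int

// RowSource is a simplified pull interface (assumption).
type RowSource interface{ Next() Row }

// sliceSource is a hypothetical in-memory source.
type sliceSource struct {
	rows []Row
	pos  int
}

func (s *sliceSource) Next() Row {
	if s.pos >= len(s.rows) {
		return nil
	}
	row := s.rows[s.pos]
	s.pos++
	return row
}

// filterNode is a hypothetical node that drops rows failing pred.
type filterNode struct {
	input RowSource
	pred  func(Row) bool
}

func (f *filterNode) Next() Row {
	for {
		row := f.input.Next()
		if row == nil || f.pred(row) {
			return row
		}
	}
}

// limitNode is a hypothetical node that stops after a fixed number of rows.
type limitNode struct {
	input     RowSource
	remaining int
}

func (l *limitNode) Next() Row {
	if l.remaining <= 0 {
		return nil
	}
	l.remaining--
	return l.input.Next()
}

// postProcessSpec is a hypothetical description of the post-processing steps.
// A nil filter or a zero limit means the step is absent.
type postProcessSpec struct {
	filter func(Row) bool
	limit  int
}

// wrap adds a node only for the steps that are present, so a source with no
// post-processing is returned unchanged.
func wrap(src RowSource, spec postProcessSpec) RowSource {
	if spec.filter != nil {
		src = &filterNode{input: src, pred: spec.filter}
	}
	if spec.limit > 0 {
		src = &limitNode{input: src, remaining: spec.limit}
	}
	return src
}

// count drains a RowSource and returns the number of rows seen.
func count(src RowSource) int {
	n := 0
	for row := src.Next(); row != nil; row = src.Next() {
		n++
	}
	return n
}

func main() {
	rows := []Row{{1}, {2}, {3}, {4}, {5}, {6}}
	fmt.Println(count(wrap(&sliceSource{rows: rows}, postProcessSpec{}))) // prints 6

	isEven := func(r Row) bool { return r[0]%2 == 0 }
	spec := postProcessSpec{filter: isEven, limit: 2}
	fmt.Println(count(wrap(&sliceSource{rows: rows}, spec))) // prints 2
}
```

Whether the extra interface call per node costs more than the per-row checks it replaces is exactly the kind of thing the experimentation mentioned above would have to settle.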

petermattis added a commit to petermattis/cockroach that referenced this issue Jan 5, 2018
Elide RowChannel when connecting local processors in simple cases. A
simple `SELECT COUNT(*) FROM test.kv` query where `test.kv` contains 5m
rows is reduced from 5.4s to 4.3s (a 20% speedup). When using distsql,
this query utilizes a `tableReader` connected to an `aggregator`. For
comparison, this query takes 4.5s when running with `set
distsql=off`. These numbers are from a local single-node cluster.

See cockroachdb#20550

Release note (performance improvement): Speed up distsql query execution
by "fusing" processors executing on the same node together.
@jordanlewis
Member

Can this be closed, or are we waiting for a full solution?
