Skip to content

Commit

Permalink
Merge pull request #7160 from RaduBerinde/distsql-node
Browse files Browse the repository at this point in the history
sql: distsql plan node
  • Loading branch information
RaduBerinde authored Jun 15, 2016
2 parents dfe2d0c + 3f3fd8f commit 3acc834
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 20 deletions.
14 changes: 8 additions & 6 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,20 @@ func NewServer(ctx Context, stopper *stop.Stopper) (*Server, error) {
s.leaseMgr = sql.NewLeaseManager(0, *s.db, s.clock, lmKnobs)
s.leaseMgr.RefreshLeases(s.stopper, s.db, s.gossip)

// Set up the DistSQL server
distSQLCtx := distsql.ServerContext{
DB: s.db,
}
s.distSQLServer = distsql.NewServer(distSQLCtx)
distsql.RegisterDistSQLServer(s.grpc, s.distSQLServer)

// Set up Executor
eCtx := sql.ExecutorContext{
DB: s.db,
Gossip: s.gossip,
LeaseManager: s.leaseMgr,
Clock: s.clock,
DistSQLSrv: s.distSQLServer,
}
if ctx.TestingKnobs.SQLExecutor != nil {
eCtx.TestingKnobs = ctx.TestingKnobs.SQLExecutor.(*sql.ExecutorTestingKnobs)
Expand All @@ -193,12 +201,6 @@ func NewServer(ctx Context, stopper *stop.Stopper) (*Server, error) {

s.pgServer = pgwire.MakeServer(s.ctx.Context, s.sqlExecutor, sqlRegistry)

distSQLCtx := distsql.ServerContext{
DB: s.db,
}
s.distSQLServer = distsql.NewServer(distSQLCtx)
distsql.RegisterDistSQLServer(s.grpc, s.distSQLServer)

// TODO(bdarnell): make StoreConfig configurable.
nCtx := storage.StoreContext{
Clock: s.clock,
Expand Down
222 changes: 222 additions & 0 deletions sql/dist_sql_node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Radu Berinde ([email protected])

package sql

import (
"fmt"
"math"

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/sql/distsql"
"github.com/cockroachdb/cockroach/sql/parser"
"github.com/cockroachdb/cockroach/sql/sqlbase"
"github.com/cockroachdb/cockroach/util"
)

// distSQLNode is a planNode that receives results from a distsql flow (through
// a RowChannel).
type distSQLNode struct {
columns []ResultColumn
ordering orderingInfo

// colMapping maps columns in the RowChannel stream to result columns.
colMapping []uint32

flow *distsql.Flow
c distsql.RowChannel

values parser.DTuple
alloc sqlbase.DatumAlloc

flowStarted bool
}

var _ planNode = &distSQLNode{}

func (n *distSQLNode) ExplainTypes(func(elem string, desc string)) {}
func (n *distSQLNode) SetLimitHint(int64, bool) {}
func (n *distSQLNode) expandPlan() error { return nil }
func (n *distSQLNode) MarkDebug(explainMode) {}
func (n *distSQLNode) DebugValues() debugValues { return debugValues{} }
func (n *distSQLNode) Start() error { return nil }

func (n *distSQLNode) ExplainPlan(verbose bool) (name, description string, children []planNode) {
return "distsql", "", nil
}

func (n *distSQLNode) Columns() []ResultColumn {
return n.columns
}

func (n *distSQLNode) Ordering() orderingInfo {
return n.ordering
}

func newDistSQLNode(
columns []ResultColumn,
colMapping []uint32,
ordering orderingInfo,
) *distSQLNode {
n := &distSQLNode{
columns: columns,
ordering: ordering,
colMapping: colMapping,
values: make(parser.DTuple, len(columns)),
}
n.c.Init()
return n
}

func (n *distSQLNode) Next() (bool, error) {
if !n.flowStarted {
n.flow.Start()
n.flowStarted = true
}
d, ok := <-n.c.C
if !ok {
// No more data
return false, nil
}
if d.Err != nil {
return false, d.Err
}
if len(d.Row) != len(n.colMapping) {
return false, util.Errorf("row length %d, expected %d", len(d.Row), len(n.colMapping))
}
for i := range d.Row {
col := n.colMapping[i]
err := d.Row[i].Decode(&n.alloc)
if err != nil {
return false, err
}
n.values[col] = d.Row[i].Datum
}
return true, nil
}

func (n *distSQLNode) Values() parser.DTuple {
return n.values
}

// scanNodeToTableReaderSpec generates a TableReaderSpec that corresponds to a
// scanNode.
func scanNodeToTableReaderSpec(n *scanNode) *distsql.TableReaderSpec {
s := &distsql.TableReaderSpec{
Table: n.desc,
Reverse: n.reverse,
}
if n.index != &n.desc.PrimaryIndex {
for i := range n.desc.Indexes {
if n.index == &n.desc.Indexes[i] {
s.IndexIdx = uint32(i + 1)
break
}
}
if s.IndexIdx == 0 {
panic("invalid scanNode index")
}
}
s.Spans = make([]distsql.TableReaderSpan, len(n.spans))
for i, span := range n.spans {
s.Spans[i].Span.Key = span.Start
s.Spans[i].Span.EndKey = span.End
}
s.OutputColumns = make([]uint32, 0, len(n.resultColumns))
for i := range n.resultColumns {
if n.valNeededForCol[i] {
s.OutputColumns = append(s.OutputColumns, uint32(i))
}
}
if n.limitSoft {
s.SoftLimit = n.limitHint
} else {
s.HardLimit = n.limitHint
}

if n.filter != nil {
// Ugly hack to get the expression to print the way we want it.
//
// The distsql Expression uses the placeholder syntax ($0, $1, $2..) to
// refer to columns. We temporarily rename the scanNode columns to
// (literally) "$0", "$1", ... and convert to a string.
tmp := n.resultColumns
n.resultColumns = make([]ResultColumn, len(tmp))
for i, orig := range tmp {
n.resultColumns[i].Name = fmt.Sprintf("$%d", i)
n.resultColumns[i].Typ = orig.Typ
n.resultColumns[i].hidden = orig.hidden
}
expr := n.filter.String()
n.resultColumns = tmp
s.Filter.Expr = expr
}
return s
}

// scanNodeToDistSQL creates a flow and distSQLNode that correspond to a
// scanNode.
func scanNodeToDistSQL(n *scanNode) (*distSQLNode, error) {
req := &distsql.SetupFlowsRequest{Txn: n.p.txn.Proto}
tr := scanNodeToTableReaderSpec(n)
req.Flows = []distsql.FlowSpec{{
Processors: []distsql.ProcessorSpec{{
Core: distsql.ProcessorCoreUnion{TableReader: tr},
Output: []distsql.OutputRouterSpec{{
Type: distsql.OutputRouterSpec_MIRROR,
Streams: []distsql.StreamEndpointSpec{{
Mailbox: &distsql.MailboxSpec{SimpleResponse: true},
}},
}},
}},
}}

dn := newDistSQLNode(n.resultColumns, tr.OutputColumns, n.ordering)

srv := n.p.execCtx.DistSQLSrv
flow, err := srv.SetupSimpleFlow(context.Background(), req, &dn.c)
if err != nil {
return nil, err
}
dn.flow = flow
return dn, nil
}

// hackPlanToUseDistSQL goes through a planNode tree and replaces each scanNode with
// a distSQLNode and a corresponding flow.
func hackPlanToUseDistSQL(plan planNode) error {
// Trigger limit propagation.
plan.SetLimitHint(math.MaxInt64, true)

if sel, ok := plan.(*selectNode); ok {
if scan, ok := sel.source.plan.(*scanNode); ok {
distNode, err := scanNodeToDistSQL(scan)
if err != nil {
return err
}
sel.source.plan = distNode
}
}

_, _, children := plan.ExplainPlan(true)
for _, c := range children {
if err := hackPlanToUseDistSQL(c); err != nil {
return err
}
}
return nil
}
16 changes: 9 additions & 7 deletions sql/distsql/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ type processor interface {
// local physical streams.
type StreamMsg struct {
// Only one of these fields will be set.
row sqlbase.EncDatumRow
err error
Row sqlbase.EncDatumRow
Err error
}

// RowChannel is a thin layer over a StreamMsg channel, which can be used to
Expand All @@ -71,14 +71,16 @@ type RowChannel struct {

var _ rowReceiver = &RowChannel{}

func (rc *RowChannel) initWithBufSize(chanBufSize int) {
// InitWithBufSize initializes the RowChannel with a given buffer size.
func (rc *RowChannel) InitWithBufSize(chanBufSize int) {
rc.dataChan = make(chan StreamMsg, chanBufSize)
rc.C = rc.dataChan
atomic.StoreUint32(&rc.noMoreRows, 0)
}

func (rc *RowChannel) init() {
rc.initWithBufSize(rowChannelBufSize)
// Init initializes the RowChannel with the default buffer size.
func (rc *RowChannel) Init() {
rc.InitWithBufSize(rowChannelBufSize)
}

// PushRow is part of the rowReceiver interface.
Expand All @@ -87,14 +89,14 @@ func (rc *RowChannel) PushRow(row sqlbase.EncDatumRow) bool {
return false
}

rc.dataChan <- StreamMsg{row: row, err: nil}
rc.dataChan <- StreamMsg{Row: row, Err: nil}
return true
}

// Close is part of the rowReceiver interface.
func (rc *RowChannel) Close(err error) {
if err != nil {
rc.dataChan <- StreamMsg{row: nil, err: err}
rc.dataChan <- StreamMsg{Row: nil, Err: err}
}
close(rc.dataChan)
}
Expand Down
8 changes: 4 additions & 4 deletions sql/distsql/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func newJoinReader(
}

// Allow the input channel to buffer an entire batch.
jr.RowChannel.initWithBufSize(joinReaderBatchSize)
jr.RowChannel.InitWithBufSize(joinReaderBatchSize)

return jr, nil
}
Expand Down Expand Up @@ -111,11 +111,11 @@ func (jr *joinReader) mainLoop() error {
}
break
}
if d.err != nil {
return d.err
if d.Err != nil {
return d.Err
}

key, err := jr.generateKey(d.row, &alloc, primaryKeyPrefix)
key, err := jr.generateKey(d.Row, &alloc, primaryKeyPrefix)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions sql/distsql/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ loop:
err = m.flush(true, nil)
break loop
}
err = d.err
err = d.Err
if err == nil {
err = m.addRow(d.row)
err = m.addRow(d.Row)
}
if err != nil {
// Try to flush to send out the error, but ignore any
Expand Down Expand Up @@ -139,7 +139,7 @@ loop:
func (m *outbox) start(wg *sync.WaitGroup) {
wg.Add(1)
m.wg = wg
m.RowChannel.init()
m.RowChannel.Init()
m.flushTicker = time.NewTicker(outboxFlushPeriod)
go m.mainLoop()
}
11 changes: 11 additions & 0 deletions sql/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/config"
"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/sql/distsql"
"github.com/cockroachdb/cockroach/sql/parser"
"github.com/cockroachdb/cockroach/sql/sqlbase"
"github.com/cockroachdb/cockroach/util"
Expand All @@ -50,6 +51,9 @@ var errNotRetriable = errors.New("the transaction is not in a retriable state")
const sqlTxnName string = "sql txn"
const sqlImplicitTxnName string = "sql txn implicit"

// TODO(radu): experimental code for testing distSQL flows.
const testDistSQL bool = false

type traceResult struct {
tag string
count int
Expand Down Expand Up @@ -152,6 +156,7 @@ type ExecutorContext struct {
Gossip *gossip.Gossip
LeaseManager *LeaseManager
Clock *hlc.Clock
DistSQLSrv *distsql.ServerImpl

TestingKnobs *ExecutorTestingKnobs
}
Expand Down Expand Up @@ -981,6 +986,12 @@ func (e *Executor) execStmt(
return result, err
}

if testDistSQL {
if err := hackPlanToUseDistSQL(plan); err != nil {
return result, err
}
}

if err := plan.Start(); err != nil {
return result, err
}
Expand Down

0 comments on commit 3acc834

Please sign in to comment.