Skip to content

Commit

Permalink
Merge #35529
Browse files Browse the repository at this point in the history
35529: sql: automatically disable result buffering for sinkless changefeeds r=knz a=knz

Discussed on #34423.

This is limited in scope, but it will do in 19.1 where the feature is experimental anyway. In 19.2, bulk i/o will come into the CBO, and we can build a more robust solution there.


Co-authored-by: Raphael 'kena' Poss <[email protected]>
  • Loading branch information
craig[bot] and knz committed Mar 8, 2019
2 parents ef9f048 + 9a2b19f commit 99ff530
Show file tree
Hide file tree
Showing 13 changed files with 81 additions and 44 deletions.
12 changes: 6 additions & 6 deletions pkg/ccl/backupccl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,23 +792,23 @@ func VerifyUsableExportTarget(
// backupPlanHook implements PlanHookFn.
func backupPlanHook(
_ context.Context, stmt tree.Statement, p sql.PlanHookState,
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, error) {
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, bool, error) {
backupStmt, ok := stmt.(*tree.Backup)
if !ok {
return nil, nil, nil, nil
return nil, nil, nil, false, nil
}

toFn, err := p.TypeAsString(backupStmt.To, "BACKUP")
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, err
}
incrementalFromFn, err := p.TypeAsStringArray(backupStmt.IncrementalFrom, "BACKUP")
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, err
}
optsFn, err := p.TypeAsStringOpts(backupStmt.Options, backupOptionExpectValues)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, err
}

header := sqlbase.ResultColumns{
Expand Down Expand Up @@ -1082,7 +1082,7 @@ func backupPlanHook(
}
return <-errCh
}
return fn, header, nil, nil
return fn, header, nil, false, nil
}

type backupResumer struct {
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/backupccl/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1236,20 +1236,20 @@ var RestoreHeader = sqlbase.ResultColumns{
// restorePlanHook implements sql.PlanHookFn.
func restorePlanHook(
_ context.Context, stmt tree.Statement, p sql.PlanHookState,
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, error) {
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, bool, error) {
restoreStmt, ok := stmt.(*tree.Restore)
if !ok {
return nil, nil, nil, nil
return nil, nil, nil, false, nil
}

fromFn, err := p.TypeAsStringArray(restoreStmt.From, "RESTORE")
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, err
}

optsFn, err := p.TypeAsStringOpts(restoreStmt.Options, restoreOptionExpectValues)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, err
}

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
Expand Down Expand Up @@ -1306,7 +1306,7 @@ func restorePlanHook(
}
return doRestorePlan(ctx, restoreStmt, p, from, endTime, opts, resultsCh)
}
return fn, RestoreHeader, nil, nil
return fn, RestoreHeader, nil, false, nil
}

func doRestorePlan(
Expand Down
12 changes: 6 additions & 6 deletions pkg/ccl/backupccl/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,25 @@ import (
// showBackupPlanHook implements PlanHookFn.
func showBackupPlanHook(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, error) {
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, bool, error) {
backup, ok := stmt.(*tree.ShowBackup)
if !ok {
return nil, nil, nil, nil
return nil, nil, nil, false, nil
}

if err := utilccl.CheckEnterpriseEnabled(
p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "SHOW BACKUP",
); err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, err
}

if err := p.RequireSuperUser(ctx, "SHOW BACKUP"); err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, err
}

toFn, err := p.TypeAsString(backup.Path, "SHOW BACKUP")
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, err
}

var shower backupShower
Expand Down Expand Up @@ -81,7 +81,7 @@ func showBackupPlanHook(
return nil
}

return fn, shower.header, nil, nil
return fn, shower.header, nil, false, nil
}

type backupShower struct {
Expand Down
4 changes: 1 addition & 3 deletions pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ func MakeSinklessFeedFactory(s serverutils.TestServerInterface, sink url.URL) Te
// Feed implements the TestFeedFactory interface
func (f *sinklessFeedFactory) Feed(create string, args ...interface{}) (TestFeed, error) {
sink := f.sink
q := sink.Query()
q.Add(`results_buffer_size`, `1`)
sink.RawQuery = q.Encode()
sink.RawQuery = sink.Query().Encode()
sink.Path = `d`
// Use pgx directly instead of database/sql so we can close the conn
// (instead of returning it to the pool).
Expand Down
12 changes: 7 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,16 @@ var changefeedOptionExpectValues = map[string]sql.KVStringOptValidate{
// changefeedPlanHook implements sql.PlanHookFn.
func changefeedPlanHook(
_ context.Context, stmt tree.Statement, p sql.PlanHookState,
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, error) {
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, bool, error) {
changefeedStmt, ok := stmt.(*tree.CreateChangefeed)
if !ok {
return nil, nil, nil, nil
return nil, nil, nil, false, nil
}

var sinkURIFn func() (string, error)
var header sqlbase.ResultColumns
unspecifiedSink := changefeedStmt.SinkURI == nil
avoidBuffering := false
if unspecifiedSink {
// An unspecified sink triggers a fairly radical change in behavior.
// Instead of setting up a system.job to emit to a sink in the
Expand All @@ -102,11 +103,12 @@ func changefeedPlanHook(
{Name: "key", Typ: types.Bytes},
{Name: "value", Typ: types.Bytes},
}
avoidBuffering = true
} else {
var err error
sinkURIFn, err = p.TypeAsString(changefeedStmt.SinkURI, `CREATE CHANGEFEED`)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, err
}
header = sqlbase.ResultColumns{
{Name: "job_id", Typ: types.Int},
Expand All @@ -115,7 +117,7 @@ func changefeedPlanHook(

optsFn, err := p.TypeAsStringOpts(changefeedStmt.Options, changefeedOptionExpectValues)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, err
}

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
Expand Down Expand Up @@ -295,7 +297,7 @@ func changefeedPlanHook(
}
return nil
}
return fn, header, nil, nil
return fn, header, nil, avoidBuffering, nil
}

func changefeedJobDescription(
Expand Down
14 changes: 7 additions & 7 deletions pkg/ccl/importccl/exportcsv.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,29 +60,29 @@ const exportFilePatternDefault = exportFilePatternPart + ".csv"
// exportPlanHook implements sql.PlanHook.
func exportPlanHook(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, error) {
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, bool, error) {
exportStmt, ok := stmt.(*tree.Export)
if !ok {
return nil, nil, nil, nil
return nil, nil, nil, false, nil
}

fileFn, err := p.TypeAsString(exportStmt.File, "EXPORT")
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, err
}

if exportStmt.FileFormat != "CSV" {
return nil, nil, nil, errors.Errorf("unsupported export format: %q", exportStmt.FileFormat)
return nil, nil, nil, false, errors.Errorf("unsupported export format: %q", exportStmt.FileFormat)
}

optsFn, err := p.TypeAsStringOpts(exportStmt.Options, exportOptionExpectValues)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, err
}

sel, err := p.Select(ctx, exportStmt.Query, nil)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, err
}

fn := func(ctx context.Context, plans []sql.PlanNode, resultsCh chan<- tree.Datums) error {
Expand Down Expand Up @@ -161,7 +161,7 @@ func exportPlanHook(
return rw.Err()
}

return fn, exportHeader, []sql.PlanNode{sel}, nil
return fn, exportHeader, []sql.PlanNode{sel}, false, nil
}

func newCSVWriterProcessor(
Expand Down
12 changes: 6 additions & 6 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,28 +438,28 @@ func importJobDescription(
// importPlanHook implements sql.PlanHookFn.
func importPlanHook(
_ context.Context, stmt tree.Statement, p sql.PlanHookState,
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, error) {
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, bool, error) {
importStmt, ok := stmt.(*tree.Import)
if !ok {
return nil, nil, nil, nil
return nil, nil, nil, false, nil
}

filesFn, err := p.TypeAsStringArray(importStmt.Files, "IMPORT")
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, err
}

var createFileFn func() (string, error)
if !importStmt.Bundle && importStmt.CreateDefs == nil {
createFileFn, err = p.TypeAsString(importStmt.CreateFile, "IMPORT")
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, err
}
}

optsFn, err := p.TypeAsStringOpts(importStmt.Options, importOptionExpectValues)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, err
}

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
Expand Down Expand Up @@ -869,7 +869,7 @@ func importPlanHook(
}
return <-errCh
}
return fn, backupccl.RestoreHeader, nil, nil
return fn, backupccl.RestoreHeader, nil, false, nil
}

func doDistributedCSVTransform(
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,12 @@ func (ex *connExecutor) dispatchToExecutionEngine(
// defer is a catch-all in case some other return path is taken.
defer planner.curPlan.close(ctx)

// Certain statements want their results to go to the client
// directly. Configure this here.
if planner.curPlan.avoidBuffering {
res.DisableBuffering()
}

// Ensure that the plan is collected just before closing.
if sampleLogicalPlans.Get(&ex.appStats.st.SV) {
planner.curPlan.maybeSavePlan = func(ctx context.Context) *roachpb.ExplainTreePlanNode {
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,12 @@ type RestrictedCommandResult interface {
// RowsAffected returns either the number of times AddRow was called, or the
// sum of all n passed into IncrementRowsAffected.
RowsAffected() int

// DisableBuffering can be called during execution to ensure that
// the results accumulated so far, and all subsequent rows added
// to this CommandResult, will be flushed immediately to the client.
// This is currently used for sinkless changefeeds.
DisableBuffering()
}

// DescribeResult represents the result of a Describe command (for either
Expand Down Expand Up @@ -874,6 +880,10 @@ func (r *bufferedCommandResult) AddRow(ctx context.Context, row tree.Datums) err
return nil
}

func (r *bufferedCommandResult) DisableBuffering() {
panic("cannot disable buffering here")
}

// SetError is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) SetError(err error) {
r.err = err
Expand Down
16 changes: 15 additions & 1 deletion pkg/sql/pgwire/command_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ type commandResult struct {
// oids is a map from result column index to its Oid, similar to formatCodes
// (except oids must always be set).
oids []oid.Oid

// bufferingDisabled is conditionally set during planning of certain
// statements.
bufferingDisabled bool
}

func (c *conn) makeCommandResult(
Expand Down Expand Up @@ -208,10 +212,20 @@ func (r *commandResult) AddRow(ctx context.Context, row tree.Datums) error {
r.rowsAffected++

r.conn.bufferRow(ctx, row, r.formatCodes, r.conv, r.oids)
_ /* flushed */, err := r.conn.maybeFlush(r.pos)
var err error
if r.bufferingDisabled {
err = r.conn.Flush(r.pos)
} else {
_ /* flushed */, err = r.conn.maybeFlush(r.pos)
}
return err
}

// DisableBuffering is part of the CommandResult interface.
func (r *commandResult) DisableBuffering() {
r.bufferingDisabled = true
}

// SetColumns is part of the CommandResult interface.
func (r *commandResult) SetColumns(ctx context.Context, cols sqlbase.ResultColumns) {
r.conn.writerState.fi.registerCmd(r.pos)
Expand Down
9 changes: 8 additions & 1 deletion pkg/sql/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ type planTop struct {
// savedPlanForStats is conditionally populated at the end of
// statement execution, for registration in statement statistics.
savedPlanForStats *roachpb.ExplainTreePlanNode

// avoidBuffering, when set, causes the execution to avoid buffering
// results.
avoidBuffering bool
}

// makePlan implements the Planner interface. It populates the
Expand Down Expand Up @@ -502,9 +506,12 @@ func (p *planner) maybePlanHook(ctx context.Context, stmt tree.Statement) (planN
// upcoming IR work will provide unique numeric type tags, which will
// elegantly solve this.
for _, planHook := range planHooks {
if fn, header, subplans, err := planHook(ctx, stmt, p); err != nil {
if fn, header, subplans, avoidBuffering, err := planHook(ctx, stmt, p); err != nil {
return nil, err
} else if fn != nil {
if avoidBuffering {
p.curPlan.avoidBuffering = true
}
return &hookFnNode{f: fn, header: header, subplans: subplans}, nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/planhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
// plan execution.
type planHookFn func(
context.Context, tree.Statement, PlanHookState,
) (fn PlanHookRowFn, header sqlbase.ResultColumns, subplans []planNode, err error)
) (fn PlanHookRowFn, header sqlbase.ResultColumns, subplans []planNode, avoidBuffering bool, err error)

// PlanHookRowFn describes the row-production for hook-created plans. The
// channel argument is used to return results to the plan's runner. It's
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/tests/planhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
func init() {
testingPlanHook := func(
ctx context.Context, stmt tree.Statement, state sql.PlanHookState,
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, error) {
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, bool, error) {
show, ok := stmt.(*tree.ShowVar)
if !ok || show.Name != "planhook" {
return nil, nil, nil, nil
return nil, nil, nil, false, nil
}
header := sqlbase.ResultColumns{
{Name: "value", Typ: types.String},
Expand All @@ -41,7 +41,7 @@ func init() {
return func(_ context.Context, subPlans []sql.PlanNode, resultsCh chan<- tree.Datums) error {
resultsCh <- tree.Datums{tree.NewDString(show.Name)}
return nil
}, header, []sql.PlanNode{}, nil
}, header, []sql.PlanNode{}, false, nil
}
sql.AddPlanHook(testingPlanHook)
}

0 comments on commit 99ff530

Please sign in to comment.