diff --git a/pkg/ccl/backupccl/backup.go b/pkg/ccl/backupccl/backup.go index f7acb24dc6c6..f2e3396187c2 100644 --- a/pkg/ccl/backupccl/backup.go +++ b/pkg/ccl/backupccl/backup.go @@ -792,23 +792,23 @@ func VerifyUsableExportTarget( // backupPlanHook implements PlanHookFn. func backupPlanHook( _ context.Context, stmt tree.Statement, p sql.PlanHookState, -) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, error) { +) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, bool, error) { backupStmt, ok := stmt.(*tree.Backup) if !ok { - return nil, nil, nil, nil + return nil, nil, nil, false, nil } toFn, err := p.TypeAsString(backupStmt.To, "BACKUP") if err != nil { - return nil, nil, nil, err + return nil, nil, nil, false, err } incrementalFromFn, err := p.TypeAsStringArray(backupStmt.IncrementalFrom, "BACKUP") if err != nil { - return nil, nil, nil, err + return nil, nil, nil, false, err } optsFn, err := p.TypeAsStringOpts(backupStmt.Options, backupOptionExpectValues) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, false, err } header := sqlbase.ResultColumns{ @@ -1082,7 +1082,7 @@ func backupPlanHook( } return <-errCh } - return fn, header, nil, nil + return fn, header, nil, false, nil } type backupResumer struct { diff --git a/pkg/ccl/backupccl/restore.go b/pkg/ccl/backupccl/restore.go index 6bf7ccdf0bc5..17af55227e7f 100644 --- a/pkg/ccl/backupccl/restore.go +++ b/pkg/ccl/backupccl/restore.go @@ -1236,20 +1236,20 @@ var RestoreHeader = sqlbase.ResultColumns{ // restorePlanHook implements sql.PlanHookFn. func restorePlanHook( _ context.Context, stmt tree.Statement, p sql.PlanHookState, -) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, error) { +) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, bool, error) { restoreStmt, ok := stmt.(*tree.Restore) if !ok { - return nil, nil, nil, nil + return nil, nil, nil, false, nil } fromFn, err := p.TypeAsStringArray(restoreStmt.From, "RESTORE") if err != nil { - return nil, nil, nil, err + return nil, nil, nil, false, err } optsFn, err := p.TypeAsStringOpts(restoreStmt.Options, restoreOptionExpectValues) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, false, err } fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { @@ -1306,7 +1306,7 @@ func restorePlanHook( } return doRestorePlan(ctx, restoreStmt, p, from, endTime, opts, resultsCh) } - return fn, RestoreHeader, nil, nil + return fn, RestoreHeader, nil, false, nil } func doRestorePlan( diff --git a/pkg/ccl/backupccl/show.go b/pkg/ccl/backupccl/show.go index d89dbee79720..a6a4dfd2f5f0 100644 --- a/pkg/ccl/backupccl/show.go +++ b/pkg/ccl/backupccl/show.go @@ -26,25 +26,25 @@ import ( // showBackupPlanHook implements PlanHookFn. func showBackupPlanHook( ctx context.Context, stmt tree.Statement, p sql.PlanHookState, -) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, error) { +) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, bool, error) { backup, ok := stmt.(*tree.ShowBackup) if !ok { - return nil, nil, nil, nil + return nil, nil, nil, false, nil } if err := utilccl.CheckEnterpriseEnabled( p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "SHOW BACKUP", ); err != nil { - return nil, nil, nil, err + return nil, nil, nil, false, err } if err := p.RequireSuperUser(ctx, "SHOW BACKUP"); err != nil { - return nil, nil, nil, err + return nil, nil, nil, false, err } toFn, err := p.TypeAsString(backup.Path, "SHOW BACKUP") if err != nil { - return nil, nil, nil, err + return nil, nil, nil, false, err } var shower backupShower @@ -81,7 +81,7 @@ func showBackupPlanHook( return nil } - return fn, shower.header, nil, nil + return fn, shower.header, nil, false, nil } type backupShower struct { diff --git a/pkg/ccl/changefeedccl/cdctest/testfeed.go b/pkg/ccl/changefeedccl/cdctest/testfeed.go index 548f7748df8e..08a4fd19485f 100644 --- a/pkg/ccl/changefeedccl/cdctest/testfeed.go +++ b/pkg/ccl/changefeedccl/cdctest/testfeed.go @@ -84,9 +84,7 @@ func MakeSinklessFeedFactory(s serverutils.TestServerInterface, sink url.URL) Te // Feed implements the TestFeedFactory interface func (f *sinklessFeedFactory) Feed(create string, args ...interface{}) (TestFeed, error) { sink := f.sink - q := sink.Query() - q.Add(`results_buffer_size`, `1`) - sink.RawQuery = q.Encode() + sink.RawQuery = sink.Query().Encode() sink.Path = `d` // Use pgx directly instead of database/sql so we can close the conn // (instead of returning it to the pool). diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 868201b0975a..a700386fa92c 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -79,15 +79,16 @@ var changefeedOptionExpectValues = map[string]sql.KVStringOptValidate{ // changefeedPlanHook implements sql.PlanHookFn. func changefeedPlanHook( _ context.Context, stmt tree.Statement, p sql.PlanHookState, -) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, error) { +) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, bool, error) { changefeedStmt, ok := stmt.(*tree.CreateChangefeed) if !ok { - return nil, nil, nil, nil + return nil, nil, nil, false, nil } var sinkURIFn func() (string, error) var header sqlbase.ResultColumns unspecifiedSink := changefeedStmt.SinkURI == nil + avoidBuffering := false if unspecifiedSink { // An unspecified sink triggers a fairly radical change in behavior. // Instead of setting up a system.job to emit to a sink in the @@ -102,11 +103,12 @@ func changefeedPlanHook( {Name: "key", Typ: types.Bytes}, {Name: "value", Typ: types.Bytes}, } + avoidBuffering = true } else { var err error sinkURIFn, err = p.TypeAsString(changefeedStmt.SinkURI, `CREATE CHANGEFEED`) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, false, err } header = sqlbase.ResultColumns{ {Name: "job_id", Typ: types.Int}, @@ -115,7 +117,7 @@ func changefeedPlanHook( optsFn, err := p.TypeAsStringOpts(changefeedStmt.Options, changefeedOptionExpectValues) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, false, err } fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { @@ -295,7 +297,7 @@ func changefeedPlanHook( } return nil } - return fn, header, nil, nil + return fn, header, nil, avoidBuffering, nil } func changefeedJobDescription( diff --git a/pkg/ccl/importccl/exportcsv.go b/pkg/ccl/importccl/exportcsv.go index 6cec74631d90..346d6f519d04 100644 --- a/pkg/ccl/importccl/exportcsv.go +++ b/pkg/ccl/importccl/exportcsv.go @@ -60,29 +60,29 @@ const exportFilePatternDefault = exportFilePatternPart + ".csv" // exportPlanHook implements sql.PlanHook. func exportPlanHook( ctx context.Context, stmt tree.Statement, p sql.PlanHookState, -) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, error) { +) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, bool, error) { exportStmt, ok := stmt.(*tree.Export) if !ok { - return nil, nil, nil, nil + return nil, nil, nil, false, nil } fileFn, err := p.TypeAsString(exportStmt.File, "EXPORT") if err != nil { - return nil, nil, nil, err + return nil, nil, nil, false, err } if exportStmt.FileFormat != "CSV" { - return nil, nil, nil, errors.Errorf("unsupported export format: %q", exportStmt.FileFormat) + return nil, nil, nil, false, errors.Errorf("unsupported export format: %q", exportStmt.FileFormat) } optsFn, err := p.TypeAsStringOpts(exportStmt.Options, exportOptionExpectValues) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, false, err } sel, err := p.Select(ctx, exportStmt.Query, nil) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, false, err } fn := func(ctx context.Context, plans []sql.PlanNode, resultsCh chan<- tree.Datums) error { @@ -161,7 +161,7 @@ func exportPlanHook( return rw.Err() } - return fn, exportHeader, []sql.PlanNode{sel}, nil + return fn, exportHeader, []sql.PlanNode{sel}, false, nil } func newCSVWriterProcessor( diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index ef1d4d940953..2eade0312cc4 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -438,28 +438,28 @@ func importJobDescription( // importPlanHook implements sql.PlanHookFn. func importPlanHook( _ context.Context, stmt tree.Statement, p sql.PlanHookState, -) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, error) { +) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, bool, error) { importStmt, ok := stmt.(*tree.Import) if !ok { - return nil, nil, nil, nil + return nil, nil, nil, false, nil } filesFn, err := p.TypeAsStringArray(importStmt.Files, "IMPORT") if err != nil { - return nil, nil, nil, err + return nil, nil, nil, false, err } var createFileFn func() (string, error) if !importStmt.Bundle && importStmt.CreateDefs == nil { createFileFn, err = p.TypeAsString(importStmt.CreateFile, "IMPORT") if err != nil { - return nil, nil, nil, err + return nil, nil, nil, false, err } } optsFn, err := p.TypeAsStringOpts(importStmt.Options, importOptionExpectValues) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, false, err } fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { @@ -869,7 +869,7 @@ func importPlanHook( } return <-errCh } - return fn, backupccl.RestoreHeader, nil, nil + return fn, backupccl.RestoreHeader, nil, false, nil } func doDistributedCSVTransform( diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index fc4393ad4b16..578d5848af0c 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -866,6 +866,12 @@ func (ex *connExecutor) dispatchToExecutionEngine( // defer is a catch-all in case some other return path is taken. defer planner.curPlan.close(ctx) + // Certain statements want their results to go to the client + // directly. Configure this here. + if planner.curPlan.avoidBuffering { + res.DisableBuffering() + } + // Ensure that the plan is collected just before closing. if sampleLogicalPlans.Get(&ex.appStats.st.SV) { planner.curPlan.maybeSavePlan = func(ctx context.Context) *roachpb.ExplainTreePlanNode { diff --git a/pkg/sql/conn_io.go b/pkg/sql/conn_io.go index 413c6d1d5d4b..960dd2537110 100644 --- a/pkg/sql/conn_io.go +++ b/pkg/sql/conn_io.go @@ -706,6 +706,12 @@ type RestrictedCommandResult interface { // RowsAffected returns either the number of times AddRow was called, or the // sum of all n passed into IncrementRowsAffected. RowsAffected() int + + // DisableBuffering can be called during execution to ensure that + // the results accumulated so far, and all subsequent rows added + // to this CommandResult, will be flushed immediately to the client. + // This is currently used for sinkless changefeeds. + DisableBuffering() } // DescribeResult represents the result of a Describe command (for either @@ -874,6 +880,10 @@ func (r *bufferedCommandResult) AddRow(ctx context.Context, row tree.Datums) err return nil } +func (r *bufferedCommandResult) DisableBuffering() { + panic("cannot disable buffering here") +} + // SetError is part of the RestrictedCommandResult interface. func (r *bufferedCommandResult) SetError(err error) { r.err = err diff --git a/pkg/sql/pgwire/command_result.go b/pkg/sql/pgwire/command_result.go index c57475c187c6..4cf341ae9621 100644 --- a/pkg/sql/pgwire/command_result.go +++ b/pkg/sql/pgwire/command_result.go @@ -80,6 +80,10 @@ type commandResult struct { // oids is a map from result column index to its Oid, similar to formatCodes // (except oids must always be set). oids []oid.Oid + + // bufferingDisabled is conditionally set during planning of certain + // statements. + bufferingDisabled bool } func (c *conn) makeCommandResult( @@ -208,10 +212,20 @@ func (r *commandResult) AddRow(ctx context.Context, row tree.Datums) error { r.rowsAffected++ r.conn.bufferRow(ctx, row, r.formatCodes, r.conv, r.oids) - _ /* flushed */, err := r.conn.maybeFlush(r.pos) + var err error + if r.bufferingDisabled { + err = r.conn.Flush(r.pos) + } else { + _ /* flushed */, err = r.conn.maybeFlush(r.pos) + } return err } +// DisableBuffering is part of the CommandResult interface. +func (r *commandResult) DisableBuffering() { + r.bufferingDisabled = true +} + // SetColumns is part of the CommandResult interface. func (r *commandResult) SetColumns(ctx context.Context, cols sqlbase.ResultColumns) { r.conn.writerState.fi.registerCmd(r.pos) diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 673e3b4b3e97..01fffb934b98 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -306,6 +306,10 @@ type planTop struct { // savedPlanForStats is conditionally populated at the end of // statement execution, for registration in statement statistics. savedPlanForStats *roachpb.ExplainTreePlanNode + + // avoidBuffering, when set, causes the execution to avoid buffering + // results. + avoidBuffering bool } // makePlan implements the Planner interface. It populates the @@ -502,9 +506,12 @@ func (p *planner) maybePlanHook(ctx context.Context, stmt tree.Statement) (planN // upcoming IR work will provide unique numeric type tags, which will // elegantly solve this. for _, planHook := range planHooks { - if fn, header, subplans, err := planHook(ctx, stmt, p); err != nil { + if fn, header, subplans, avoidBuffering, err := planHook(ctx, stmt, p); err != nil { return nil, err } else if fn != nil { + if avoidBuffering { + p.curPlan.avoidBuffering = true + } return &hookFnNode{f: fn, header: header, subplans: subplans}, nil } } diff --git a/pkg/sql/planhook.go b/pkg/sql/planhook.go index 6a6adb37618c..e155b35e8b98 100644 --- a/pkg/sql/planhook.go +++ b/pkg/sql/planhook.go @@ -36,7 +36,7 @@ import ( // plan execution. type planHookFn func( context.Context, tree.Statement, PlanHookState, -) (fn PlanHookRowFn, header sqlbase.ResultColumns, subplans []planNode, err error) +) (fn PlanHookRowFn, header sqlbase.ResultColumns, subplans []planNode, avoidBuffering bool, err error) // PlanHookRowFn describes the row-production for hook-created plans. The // channel argument is used to return results to the plan's runner. It's diff --git a/pkg/sql/tests/planhook.go b/pkg/sql/tests/planhook.go index 182f8b78d327..d0268b116da2 100644 --- a/pkg/sql/tests/planhook.go +++ b/pkg/sql/tests/planhook.go @@ -29,10 +29,10 @@ import ( func init() { testingPlanHook := func( ctx context.Context, stmt tree.Statement, state sql.PlanHookState, - ) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, error) { + ) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, bool, error) { show, ok := stmt.(*tree.ShowVar) if !ok || show.Name != "planhook" { - return nil, nil, nil, nil + return nil, nil, nil, false, nil } header := sqlbase.ResultColumns{ {Name: "value", Typ: types.String}, @@ -41,7 +41,7 @@ func init() { return func(_ context.Context, subPlans []sql.PlanNode, resultsCh chan<- tree.Datums) error { resultsCh <- tree.Datums{tree.NewDString(show.Name)} return nil - }, header, []sql.PlanNode{}, nil + }, header, []sql.PlanNode{}, false, nil } sql.AddPlanHook(testingPlanHook) }