Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: make internal executor streaming #59330

Merged
merged 1 commit into from
Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,23 @@ func (ie *wrappedInternalExecutor) QueryRow(
panic("not implemented")
}

func (ie *wrappedInternalExecutor) QueryIterator(
ctx context.Context, opName string, txn *kv.Txn, stmt string, qargs ...interface{},
) (sqlutil.InternalRows, error) {
panic("not implemented")
}

func (ie *wrappedInternalExecutor) QueryIteratorEx(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
stmt string,
qargs ...interface{},
) (sqlutil.InternalRows, error) {
panic("not implemented")
}

func (ie *wrappedInternalExecutor) getErrFunc() func(statement string) error {
ie.mu.RLock()
defer ie.mu.RUnlock()
Expand Down
52 changes: 19 additions & 33 deletions pkg/sql/conn_executor_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestPortalsDestroyedOnTxnFinish(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
buf, syncResults, finished, stopper, err := startConnExecutor(ctx)
buf, syncResults, finished, stopper, _, err := startConnExecutor(ctx)
if err != nil {
t.Fatal(err)
}
Expand All @@ -69,11 +69,7 @@ func TestPortalsDestroyedOnTxnFinish(t *testing.T) {
// succeed and the 2nd one to fail (since the portal is destroyed after the
// Execute).
cmdPos := 0
stmt := mustParseOne("SELECT 1")
if err != nil {
t.Fatal(err)
}
if err = buf.Push(ctx, PrepareStmt{Name: "ps_nontxn", Statement: stmt}); err != nil {
if err = buf.Push(ctx, PrepareStmt{Name: "ps_nontxn", Statement: mustParseOne("SELECT 1")}); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -121,7 +117,7 @@ func TestPortalsDestroyedOnTxnFinish(t *testing.T) {
if numResults != cmdPos+1 {
t.Fatalf("expected %d results, got: %d", cmdPos+1, len(results))
}
if err := results[successfulDescribePos].err; err != nil {
if err = results[successfulDescribePos].err; err != nil {
t.Fatalf("expected first Describe to succeed, got err: %s", err)
}
if !testutils.IsError(results[failedDescribePos].err, "unknown portal") {
Expand All @@ -134,20 +130,12 @@ func TestPortalsDestroyedOnTxnFinish(t *testing.T) {
// after the COMMIT). The point of the SELECT is to show that the portal
// survives execution of a statement.
cmdPos++
stmt = mustParseOne("BEGIN")
if err != nil {
t.Fatal(err)
}
if err := buf.Push(ctx, ExecStmt{Statement: stmt}); err != nil {
if err = buf.Push(ctx, ExecStmt{Statement: mustParseOne("BEGIN")}); err != nil {
t.Fatal(err)
}

cmdPos++
stmt = mustParseOne("SELECT 1")
if err != nil {
t.Fatal(err)
}
if err = buf.Push(ctx, PrepareStmt{Name: "ps1", Statement: stmt}); err != nil {
if err = buf.Push(ctx, PrepareStmt{Name: "ps1", Statement: mustParseOne("SELECT 1")}); err != nil {
t.Fatal(err)
}

Expand All @@ -160,11 +148,7 @@ func TestPortalsDestroyedOnTxnFinish(t *testing.T) {
}

cmdPos++
stmt = mustParseOne("SELECT 2")
if err != nil {
t.Fatal(err)
}
if err := buf.Push(ctx, ExecStmt{Statement: stmt}); err != nil {
if err = buf.Push(ctx, ExecStmt{Statement: mustParseOne("SELECT 2")}); err != nil {
t.Fatal(err)
}

Expand All @@ -178,11 +162,7 @@ func TestPortalsDestroyedOnTxnFinish(t *testing.T) {
}

cmdPos++
stmt = mustParseOne("COMMIT")
if err != nil {
t.Fatal(err)
}
if err := buf.Push(ctx, ExecStmt{Statement: stmt}); err != nil {
if err = buf.Push(ctx, ExecStmt{Statement: mustParseOne("COMMIT")}); err != nil {
t.Fatal(err)
}

Expand All @@ -207,7 +187,7 @@ func TestPortalsDestroyedOnTxnFinish(t *testing.T) {
t.Fatalf("expected %d results, got: %d", exp, len(results))
}
succDescIdx := successfulDescribePos - numResults
if err := results[succDescIdx].err; err != nil {
if err = results[succDescIdx].err; err != nil {
t.Fatalf("expected first Describe to succeed, got err: %s", err)
}
failDescIdx := failedDescribePos - numResults
Expand All @@ -216,7 +196,7 @@ func TestPortalsDestroyedOnTxnFinish(t *testing.T) {
}

buf.Close()
if err := <-finished; err != nil {
if err = <-finished; err != nil {
t.Fatal(err)
}
}
Expand All @@ -240,9 +220,13 @@ func mustParseOne(s string) parser.Statement {
// gets the error from closing down the executor once the StmtBuf is closed, a
// stopper that must be stopped when the test completes (this does not stop the
// executor but stops other background work).
//
// It also returns a channel that AddRow might block on which can buffer up to
// 16 items (including column types when applicable), so the caller might need
// to receive from it occasionally.
func startConnExecutor(
ctx context.Context,
) (*StmtBuf, <-chan []resWithPos, <-chan error, *stop.Stopper, error) {
) (*StmtBuf, <-chan []resWithPos, <-chan error, *stop.Stopper, <-chan ieIteratorResult, error) {
// A lot of boilerplate for creating a connExecutor.
stopper := stop.NewStopper()
clock := hlc.NewClock(hlc.UnixNano, 0 /* maxOffset */)
Expand All @@ -258,7 +242,7 @@ func startConnExecutor(
gw := gossip.MakeOptionalGossip(nil)
tempEngine, tempFS, err := storage.NewTempEngine(ctx, base.DefaultTestTempStorageConfig(st), base.DefaultTestStoreSpec)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
defer tempEngine.Close()
cfg := &ExecutorConfig{
Expand Down Expand Up @@ -305,16 +289,18 @@ func startConnExecutor(
s := NewServer(cfg, pool)
buf := NewStmtBuf()
syncResults := make(chan []resWithPos, 1)
iteratorCh := make(chan ieIteratorResult, 16)
var cc ClientComm = &internalClientComm{
sync: func(res []resWithPos) {
syncResults <- res
},
ch: iteratorCh,
}
sqlMetrics := MakeMemMetrics("test" /* endpoint */, time.Second /* histogramWindow */)

conn, err := s.SetupConn(ctx, SessionArgs{}, buf, cc, sqlMetrics)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
finished := make(chan error)

Expand All @@ -324,7 +310,7 @@ func startConnExecutor(
go func() {
finished <- s.ServeConn(ctx, conn, mon.BoundAccount{}, nil /* cancel */)
}()
return buf, syncResults, finished, stopper, nil
return buf, syncResults, finished, stopper, iteratorCh, nil
}

// Test that a client session can close without deadlocking when the closing
Expand Down
78 changes: 40 additions & 38 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,108 +858,110 @@ type resCloseType bool
const closed resCloseType = true
const discarded resCloseType = false

// bufferedCommandResult is a CommandResult that buffers rows and can call a
// provided callback when closed.
type bufferedCommandResult struct {
// streamingCommandResult is a CommandResult that streams rows on the channel
// and can call a provided callback when closed.
type streamingCommandResult struct {
ch chan ieIteratorResult
err error
rows []tree.Datums
rowsAffected int
cols colinfo.ResultColumns

// errOnly, if set, makes AddRow() panic. This can be used when the execution
// of the query is not expected to produce any results.
errOnly bool

// closeCallback, if set, is called when Close()/Discard() is called.
closeCallback func(*bufferedCommandResult, resCloseType, error)
closeCallback func(*streamingCommandResult, resCloseType)
}

var _ RestrictedCommandResult = &bufferedCommandResult{}
var _ CommandResultClose = &bufferedCommandResult{}
var _ RestrictedCommandResult = &streamingCommandResult{}
var _ CommandResultClose = &streamingCommandResult{}

// SetColumns is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) SetColumns(_ context.Context, cols colinfo.ResultColumns) {
if r.errOnly {
panic("SetColumns() called when errOnly is set")
}
r.cols = cols
func (r *streamingCommandResult) SetColumns(ctx context.Context, cols colinfo.ResultColumns) {
r.ch <- ieIteratorResult{cols: cols}
}

// BufferParamStatusUpdate is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) BufferParamStatusUpdate(key string, val string) {
func (r *streamingCommandResult) BufferParamStatusUpdate(key string, val string) {
panic("unimplemented")
}

// BufferNotice is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) BufferNotice(notice pgnotice.Notice) {
func (r *streamingCommandResult) BufferNotice(notice pgnotice.Notice) {
panic("unimplemented")
}

// ResetStmtType is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) ResetStmtType(stmt tree.Statement) {
func (r *streamingCommandResult) ResetStmtType(stmt tree.Statement) {
panic("unimplemented")
}

// AddRow is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) AddRow(ctx context.Context, row tree.Datums) error {
if r.errOnly {
panic("AddRow() called when errOnly is set")
}
func (r *streamingCommandResult) AddRow(ctx context.Context, row tree.Datums) error {
// AddRow() and IncrementRowsAffected() are never called on the same command
// result, so we will not double count the affected rows by an increment
// here.
r.rowsAffected++
rowCopy := make(tree.Datums, len(row))
copy(rowCopy, row)
r.rows = append(r.rows, rowCopy)
r.ch <- ieIteratorResult{row: rowCopy}
return nil
}

func (r *bufferedCommandResult) DisableBuffering() {
func (r *streamingCommandResult) DisableBuffering() {
panic("cannot disable buffering here")
}

// SetError is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) SetError(err error) {
func (r *streamingCommandResult) SetError(err error) {
r.err = err
// Note that we intentionally do not send the error on the channel (when it
// is present) since we might replace the error with another one later which
// is allowed by the interface. An example of this is queryDone() closure
// in execStmtInOpenState().
}

// Err is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) Err() error {
func (r *streamingCommandResult) Err() error {
return r.err
}

// IncrementRowsAffected is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) IncrementRowsAffected(n int) {
func (r *streamingCommandResult) IncrementRowsAffected(n int) {
r.rowsAffected += n
if r.ch != nil {
// streamingCommandResult might be used outside of the internal executor
// (i.e. not by rowsIterator) in which case the channel is not set.
r.ch <- ieIteratorResult{rowsAffectedIncrement: &n}
}
}

// RowsAffected is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) RowsAffected() int {
func (r *streamingCommandResult) RowsAffected() int {
return r.rowsAffected
}

// Close is part of the CommandResultClose interface.
func (r *bufferedCommandResult) Close(context.Context, TransactionStatusIndicator) {
func (r *streamingCommandResult) Close(context.Context, TransactionStatusIndicator) {
if r.closeCallback != nil {
r.closeCallback(r, closed, nil /* err */)
r.closeCallback(r, closed)
}
}

// Discard is part of the CommandResult interface.
func (r *bufferedCommandResult) Discard() {
func (r *streamingCommandResult) Discard() {
if r.closeCallback != nil {
r.closeCallback(r, discarded, nil /* err */)
r.closeCallback(r, discarded)
}
}

// SetInferredTypes is part of the DescribeResult interface.
func (r *bufferedCommandResult) SetInferredTypes([]oid.Oid) {}
func (r *streamingCommandResult) SetInferredTypes([]oid.Oid) {}

// SetNoDataRowDescription is part of the DescribeResult interface.
func (r *bufferedCommandResult) SetNoDataRowDescription() {}
func (r *streamingCommandResult) SetNoDataRowDescription() {}

// SetPrepStmtOutput is part of the DescribeResult interface.
func (r *bufferedCommandResult) SetPrepStmtOutput(context.Context, colinfo.ResultColumns) {}
func (r *streamingCommandResult) SetPrepStmtOutput(context.Context, colinfo.ResultColumns) {}

// SetPortalOutput is part of the DescribeResult interface.
func (r *bufferedCommandResult) SetPortalOutput(
func (r *streamingCommandResult) SetPortalOutput(
context.Context, colinfo.ResultColumns, []pgwirebase.FormatCode,
) {
}
2 changes: 1 addition & 1 deletion pkg/sql/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func (c *copyMachine) insertRows(ctx context.Context) (retErr error) {
return err
}

var res bufferedCommandResult
var res streamingCommandResult
err := c.execInsertPlan(ctx, &c.p, &res)
if err != nil {
return err
Expand Down
Loading