Skip to content

Commit

Permalink
Merge #97808
Browse files Browse the repository at this point in the history
97808: sql: make COPY TO/FROM respect timeouts and cancellation, and appear in SHOW QUERIES r=otan a=rafiss

informs #94194

### sql: use conn executor state machine for COPY FROM 

Using the conn executor state machine allows a lot of bespoke logic to
be removed. (If we were to remove the copy_from_atomic_enabled setting
and make it always true, we could remove even more custom logic.)

This then makes it easy to hook up to cancellation and introspection.

Release note (bug fix): The COPY FROM command now respects the
statement_timeout and transaction_timeout settings.

Release note (bug fix): The COPY FROM command now appears in the results
of SHOW QUERIES.

---

### sql: make stmt/txn timeout apply to COPY TO

This commit simplifies the conn executor state transitions a bit, and
adds in timers for stmt/txn timeouts analogously to how they were for
COPY FROM.

No release note since COPY TO is new.

Co-authored-by: Rafi Shamim <[email protected]>
  • Loading branch information
craig[bot] and rafiss committed Mar 1, 2023
2 parents c82f0b9 + 78e6280 commit a683fc7
Show file tree
Hide file tree
Showing 8 changed files with 638 additions and 186 deletions.
503 changes: 366 additions & 137 deletions pkg/sql/conn_executor.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func (ex *connExecutor) execStmtInOpenState(
}()
}

if ex.sessionData().TransactionTimeout > 0 && !ex.implicitTxn() {
if ex.sessionData().TransactionTimeout > 0 && !ex.implicitTxn() && ex.executorType != executorTypeInternal {
timerDuration :=
ex.sessionData().TransactionTimeout - timeutil.Since(ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionTransactionStarted))

Expand Down
1 change: 0 additions & 1 deletion pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,6 @@ type CopyInResult interface {
// produces no output for the client.
type CopyOutResult interface {
ResultBase
RestrictedCommandResult
}

// ClientLock is an interface returned by ClientComm.lockCommunication(). It
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/copy/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_test(
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/ctxgroup",
"//pkg/util/encoding/csv",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand All @@ -40,6 +41,7 @@ go_test(
"//pkg/util/timetz",
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgconn//:pgconn",
"@com_github_jackc_pgtype//:pgtype",
"@com_github_jackc_pgx_v4//:pgx",
Expand Down
182 changes: 181 additions & 1 deletion pkg/sql/copy/copy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,33 @@ import (
"bytes"
"context"
"database/sql/driver"
"errors"
"fmt"
"io"
"net/url"
"regexp"
"runtime/pprof"
"strings"
"testing"
"time"

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cli/clisqlclient"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"github.com/jackc/pgconn"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -288,6 +292,182 @@ func TestCopyFromTransaction(t *testing.T) {
}
}

// slowCopySource is a pgx.CopyFromSource that copies a fixed number of rows
// and sleeps for 500 ms in between each one.
type slowCopySource struct {
count int
total int
}

func (s *slowCopySource) Next() bool {
s.count++
return s.count < s.total
}

func (s *slowCopySource) Values() ([]interface{}, error) {
time.Sleep(500 * time.Millisecond)
return []interface{}{s.count}, nil
}

func (s *slowCopySource) Err() error {
return nil
}

var _ pgx.CopyFromSource = &slowCopySource{}

// TestCopyFromTimeout checks that COPY FROM respects the statement_timeout
// and transaction_timeout settings.
func TestCopyFromTimeout(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

s, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

pgURL, cleanup := sqlutils.PGUrl(
t,
s.ServingSQLAddr(),
"TestCopyFromTimeout",
url.User(username.RootUser),
)
defer cleanup()

t.Run("copy from", func(t *testing.T) {
conn, err := pgx.Connect(ctx, pgURL.String())
require.NoError(t, err)

_, err = conn.Exec(ctx, "CREATE TABLE t (a INT PRIMARY KEY)")
require.NoError(t, err)

_, err = conn.Exec(ctx, "SET transaction_timeout = '100ms'")
require.NoError(t, err)

tx, err := conn.Begin(ctx)
require.NoError(t, err)

_, err = tx.CopyFrom(ctx, pgx.Identifier{"t"}, []string{"a"}, &slowCopySource{total: 2})
require.ErrorContains(t, err, "query execution canceled due to transaction timeout")

err = tx.Rollback(ctx)
require.NoError(t, err)

_, err = conn.Exec(ctx, "SET statement_timeout = '200ms'")
require.NoError(t, err)

_, err = conn.CopyFrom(ctx, pgx.Identifier{"t"}, []string{"a"}, &slowCopySource{total: 2})
require.ErrorContains(t, err, "query execution canceled due to statement timeout")
})

t.Run("copy to", func(t *testing.T) {
conn, err := pgx.Connect(ctx, pgURL.String())
require.NoError(t, err)

_, err = conn.Exec(ctx, "SET transaction_timeout = '100ms'")
require.NoError(t, err)

tx, err := conn.Begin(ctx)
require.NoError(t, err)

_, err = tx.Exec(ctx, "COPY (SELECT pg_sleep(1) FROM ROWS FROM (generate_series(1, 60)) AS i) TO STDOUT")
require.ErrorContains(t, err, "query execution canceled due to transaction timeout")

err = tx.Rollback(ctx)
require.NoError(t, err)

_, err = conn.Exec(ctx, "SET statement_timeout = '200ms'")
require.NoError(t, err)

_, err = conn.Exec(ctx, "COPY (SELECT pg_sleep(1) FROM ROWS FROM (generate_series(1, 60)) AS i) TO STDOUT")
require.ErrorContains(t, err, "query execution canceled due to statement timeout")
})
}

func TestShowQueriesIncludesCopy(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

s, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

pgURL, cleanup := sqlutils.PGUrl(
t,
s.ServingSQLAddr(),
"TestShowQueriesIncludesCopy",
url.User(username.RootUser),
)
defer cleanup()

showConn, err := pgx.Connect(ctx, pgURL.String())
require.NoError(t, err)
q := pgURL.Query()
q.Add("application_name", "app_name")
pgURL.RawQuery = q.Encode()
copyConn, err := pgx.Connect(ctx, pgURL.String())
require.NoError(t, err)
_, err = copyConn.Exec(ctx, "CREATE TABLE t (a INT PRIMARY KEY)")
require.NoError(t, err)

t.Run("copy to", func(t *testing.T) {
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
_, err = copyConn.Exec(ctx, "COPY (SELECT pg_sleep(1) FROM ROWS FROM (generate_series(1, 60)) AS i) TO STDOUT")
return err
})

// The COPY query should use the specified app name. SucceedsSoon is used
// since COPY is being executed concurrently.
var appName string
testutils.SucceedsSoon(t, func() error {
err = showConn.QueryRow(ctx, "SELECT application_name FROM [SHOW QUERIES] WHERE query LIKE 'COPY (SELECT pg_sleep(1) %'").Scan(&appName)
if err != nil {
return err
}
if appName != "app_name" {
return errors.New("expected COPY to appear in SHOW QUERIES")
}
return nil
})

err = copyConn.PgConn().CancelRequest(ctx)
require.NoError(t, err)

// An error is expected, since the query was canceled.
err = g.Wait()
require.ErrorContains(t, err, "query execution canceled")
})

t.Run("copy from", func(t *testing.T) {
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
_, err := copyConn.CopyFrom(ctx, pgx.Identifier{"t"}, []string{"a"}, &slowCopySource{total: 5})
return err
})

// The COPY query should use the specified app name. SucceedsSoon is used
// since COPY is being executed concurrently.
var appName string
testutils.SucceedsSoon(t, func() error {
err = showConn.QueryRow(ctx, "SELECT application_name FROM [SHOW QUERIES] WHERE query ILIKE 'COPY%t%a%FROM%'").Scan(&appName)
if err != nil {
return err
}
if appName != "app_name" {
return errors.New("expected COPY to appear in SHOW QUERIES")
}
return nil
})

err = copyConn.PgConn().CancelRequest(ctx)
require.NoError(t, err)

// An error is expected, since the query was canceled.
err = g.Wait()
require.ErrorContains(t, err, "query execution canceled")
})
}

// BenchmarkCopyFrom measures copy performance against a TestServer.
func BenchmarkCopyFrom(b *testing.B) {
defer leaktest.AfterTest(b)()
Expand Down
9 changes: 5 additions & 4 deletions pkg/sql/copy_file_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,25 @@ func newFileUploadMachine(
conn pgwirebase.Conn,
n *tree.CopyFrom,
txnOpt copyTxnOpt,
execCfg *ExecutorConfig,
p *planner,
parentMon *mon.BytesMonitor,
) (f *fileUploadMachine, retErr error) {
if len(n.Columns) != 0 {
return nil, errors.New("expected 0 columns specified for file uploads")
}
c := &copyMachine{
conn: conn,
conn: conn,
txnOpt: txnOpt,
// The planner will be prepared before use.
p: &planner{execCfg: execCfg},
p: p,
}
f = &fileUploadMachine{
c: c,
}

// We need a planner to do the initial planning, even if a planner
// is not required after that.
cleanup := c.p.preparePlannerForCopy(ctx, &txnOpt, false /* finalBatch */, c.implicitTxn)
cleanup := c.p.preparePlannerForCopy(ctx, &c.txnOpt, false /* finalBatch */, c.implicitTxn)
defer func() {
retErr = cleanup(ctx, retErr)
}()
Expand Down
Loading

0 comments on commit a683fc7

Please sign in to comment.