From 2d4f937fd5273ea8921d4145daf6366eb3d81c71 Mon Sep 17 00:00:00 2001
From: Marylia Gutierrez
Date: Mon, 15 Aug 2022 15:28:36 -0400
Subject: [PATCH 1/4] api: update database parameter name

Previously, the database parameter in `SqlExecutionRequest` was named
`database_name` instead of the correct `database`. This commit updates
the parameter name to the correct one.

Release note (bug fix): Use the proper parameter name for database on the
SQL API

Release justification: bug fix
---
 pkg/ui/workspaces/cluster-ui/src/api/sqlApi.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/ui/workspaces/cluster-ui/src/api/sqlApi.ts b/pkg/ui/workspaces/cluster-ui/src/api/sqlApi.ts
index e763f07a32e3..3abfdef6bceb 100644
--- a/pkg/ui/workspaces/cluster-ui/src/api/sqlApi.ts
+++ b/pkg/ui/workspaces/cluster-ui/src/api/sqlApi.ts
@@ -15,7 +15,7 @@ export type SqlExecutionRequest = {
   execute?: boolean;
   timeout?: string; // Default 5s
   application_name?: string; // Defaults to '$ api-v2-sql'
-  database_name?: string; // Defaults to defaultDb
+  database?: string; // Defaults to defaultDb
   max_result_size?: number; // Default 10kib
 };

From 19fb54a7b49e6002bc8e19b588130b15f0a54b13 Mon Sep 17 00:00:00 2001
From: Xiang Gu
Date: Mon, 15 Aug 2022 20:27:08 -0400
Subject: [PATCH 2/4] sql/schemachanger: fixed an ordinal number bug in ALTER
 PRIMARY KEY

Previously, we had a bug in ALTER PRIMARY KEY such that when the new
primary key is a superset of the old primary key, we incorrectly gave
ordinal numbers to IndexColumn elements. This PR fixes it and adds a few
logic tests for cases when the new primary key intersects with the old
primary key.

Fixes: #85877

Release note: None

Release justification: fixed a bug that was revealed in a roachtest
failure (issue #85877)
---
 .../testdata/logic_test/alter_primary_key | 29 +++++++++++++++++++
 .../alter_table_alter_primary_key.go      |  6 ++--
 2 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/pkg/sql/logictest/testdata/logic_test/alter_primary_key b/pkg/sql/logictest/testdata/logic_test/alter_primary_key
index 96941e2f60ba..ddcfb42e0f77 100644
--- a/pkg/sql/logictest/testdata/logic_test/alter_primary_key
+++ b/pkg/sql/logictest/testdata/logic_test/alter_primary_key
@@ -1742,3 +1742,32 @@ SELECT column_name FROM [SHOW COLUMNS FROM t_rowid] ORDER BY column_name;
 ----
 k
 v
+
+# The following subtest tests the case when the new primary key
+# intersects with the old primary key.
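+#
+# A concrete sketch of the fixed bug (values are illustrative, not part of
+# the original patch): with old primary key (i) and new primary key (i, j),
+# the rewritten unique secondary index on (i) stores only j as a KEY_SUFFIX
+# column, so j's OrdinalInKind should be 0; before the fix it received j's
+# position within the new primary key (1), leaving a gap in the suffix
+# ordinals.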
+subtest regression_85877
+
+statement ok
+CREATE TABLE t_85877(i INT NOT NULL, j INT NOT NULL, PRIMARY KEY (i))
+
+statement ok
+ALTER TABLE t_85877 ALTER PRIMARY KEY USING COLUMNS (i, j)
+
+statement ok
+DROP TABLE t_85877
+
+statement ok
+CREATE TABLE t_85877 (i INT NOT NULL, j INT NOT NULL, PRIMARY KEY (i, j))
+
+statement ok
+ALTER TABLE t_85877 ALTER PRIMARY KEY USING COLUMNS (i)
+
+statement ok
+DROP TABLE t_85877
+
+statement ok
+CREATE TABLE t_85877 (i INT NOT NULL, j INT NOT NULL, k INT NOT NULL, PRIMARY KEY (i, j))
+
+statement ok
+ALTER TABLE t_85877 ALTER PRIMARY KEY USING COLUMNS (j, k)

diff --git a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_alter_primary_key.go b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_alter_primary_key.go
index ebf9d01b884b..8d6b468ee593 100644
--- a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_alter_primary_key.go
+++ b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_alter_primary_key.go
@@ -617,13 +617,14 @@ func addIndexColumnsForNewUniqueSecondaryIndexAndTempIndex(
 	}

 	// Add each column that is not in the old primary key as a SUFFIX_KEY column.
+	var ord uint32 = 0
 	for i, keyColIDInNewPrimaryIndex := range newPrimaryIndexKeyColumnIDs {
 		if !descpb.ColumnIDs(oldPrimaryIndexKeyColumnIDs).Contains(keyColIDInNewPrimaryIndex) {
 			b.Add(&scpb.IndexColumn{
 				TableID:       tbl.TableID,
 				IndexID:       newUniqueSecondaryIndexID,
 				ColumnID:      keyColIDInNewPrimaryIndex,
-				OrdinalInKind: uint32(i),
+				OrdinalInKind: ord,
 				Kind:          scpb.IndexColumn_KEY_SUFFIX,
 				Direction:     newPrimaryIndexKeyColumnDirs[i],
 			})
@@ -631,10 +632,11 @@ func addIndexColumnsForNewUniqueSecondaryIndexAndTempIndex(
 				TableID:       tbl.TableID,
 				IndexID:       temporaryIndexIDForNewUniqueSecondaryIndex,
 				ColumnID:      keyColIDInNewPrimaryIndex,
-				OrdinalInKind: uint32(i),
+				OrdinalInKind: ord,
 				Kind:          scpb.IndexColumn_KEY_SUFFIX,
 				Direction:     newPrimaryIndexKeyColumnDirs[i],
 			})
+			ord++
 		}
 	}
 }

From b2156f8daf83a37ab6622f99920a48065c330c53 Mon Sep 17 00:00:00 2001
From: Tommy Reilly
Date: Wed, 3 Aug 2022 22:24:40 +0000
Subject: [PATCH 3/4] sql: unit test, benchmark and logictests for COPY

COPY FROM STDIN is a new, only partially supported feature and is
under-tested. Add tests and a simple benchmark to pave the way for
enhancements and optimizations.
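For example, the new benchmark can be exercised with the standard Go
tooling (a sketch; the exact invocation depends on the local setup):

    go test ./pkg/sql -run '^$' -bench BenchmarkCopyFrom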
Assists: #81731

Release note: None

Release justification: non-production code change
---
 pkg/cmd/roachtest/tests/copyfrom.go           |  13 +-
 pkg/sql/BUILD.bazel                           |   3 +
 pkg/sql/conn_executor.go                      |   5 +-
 pkg/sql/conn_executor_exec.go                 |  40 +-
 pkg/sql/copy.go                               |  10 +-
 pkg/sql/copy_file_upload.go                   |   2 +-
 pkg/sql/copy_from_test.go                     | 164 ++++++++
 pkg/sql/copyshim.go                           | 157 ++++++++
 pkg/sql/distsql_running.go                    |  46 +++
 pkg/sql/logictest/logic.go                    |  66 +++
 .../logictest/testdata/logic_test/copyfrom    | 208 ++++++++++
 .../tests/fakedist-disk/generated_test.go     |   7 +
 .../tests/fakedist-vec-off/generated_test.go  |   7 +
 .../tests/fakedist/generated_test.go          |   7 +
 .../tests/local-vec-off/generated_test.go     |   7 +
 .../logictest/tests/local/generated_test.go   |   7 +
 pkg/sql/opt/bench/bench_test.go               | 375 ++++++++++++++----
 pkg/sql/planner.go                            |   2 +-
 pkg/sql/testutils.go                          |  40 ++
 19 files changed, 1037 insertions(+), 129 deletions(-)
 create mode 100644 pkg/sql/copy_from_test.go
 create mode 100644 pkg/sql/copyshim.go
 create mode 100644 pkg/sql/logictest/testdata/logic_test/copyfrom

diff --git a/pkg/cmd/roachtest/tests/copyfrom.go b/pkg/cmd/roachtest/tests/copyfrom.go
index 407774f538c5..5b2a478dc318 100644
--- a/pkg/cmd/roachtest/tests/copyfrom.go
+++ b/pkg/cmd/roachtest/tests/copyfrom.go
@@ -97,12 +97,19 @@ func runTest(ctx context.Context, t test.Test, c cluster.Cluster, pg string) {
 	_, err = fmt.Sscan(det.Stdout, &copy, &rows)
 	require.NoError(t, err)
 	rate := int(float64(rows) / dur.Seconds())
-	t.L().Printf("results: %d rows/s", rate)
+
+	det, err = c.RunWithDetailsSingleNode(ctx, t.L(), c.Node(1), "wc -c /tmp/lineitem-table.csv")
+	require.NoError(t, err)
+	var bytes float64
+	_, err = fmt.Sscan(det.Stdout, &bytes)
+	require.NoError(t, err)
+	dataRate := bytes / 1024 / 1024 / dur.Seconds()
+	t.L().Printf("results: %d rows/s, %f mb/s", rate, dataRate)
 	// Write the copy rate into the stats.json file to be used by roachperf.
 	c.Run(ctx, c.Node(1), "mkdir", t.PerfArtifactsDir())
 	cmd := fmt.Sprintf(
-		`echo '{ "copy_rate": %d }' > %s/stats.json`,
-		rate, t.PerfArtifactsDir(),
+		`echo '{ "copy_row_rate": %d, "copy_data_rate": %f}' > %s/stats.json`,
+		rate, dataRate, t.PerfArtifactsDir(),
 	)
 	c.Run(ctx, c.Node(1), cmd)
 }
diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index 1556e670b689..c73b696d0784 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -51,6 +51,7 @@ go_library(
         "control_schedules.go",
         "copy.go",
         "copy_file_upload.go",
+        "copyshim.go",
         "crdb_internal.go",
         "create_database.go",
         "create_extension.go",
@@ -497,6 +498,7 @@ go_library(
         "@com_github_cockroachdb_redact//:redact",
         "@com_github_gogo_protobuf//proto",
         "@com_github_gogo_protobuf//types",
+        "@com_github_jackc_pgproto3_v2//:pgproto3",
         "@com_github_lib_pq//:pq",
         "@com_github_lib_pq//oid",
         "@com_github_prometheus_client_model//go",
@@ -532,6 +534,7 @@ go_test(
         "conn_executor_test.go",
         "conn_io_test.go",
         "constraint_test.go",
+        "copy_from_test.go",
         "copy_in_test.go",
         "copy_test.go",
         "crdb_internal_test.go",
diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go
index f092a922c0ce..6d9cf6fd8de3 100644
--- a/pkg/sql/conn_executor.go
+++ b/pkg/sql/conn_executor.go
@@ -2335,7 +2335,7 @@ func isCopyToExternalStorage(cmd CopyIn) bool {
 // We handle the CopyFrom statement by creating a copyMachine and handing it
 // control over the connection until the copying is done. The contract is that,
 // when this is called, the pgwire.conn is not reading from the network
-// connection any more until this returns. The copyMachine will to the reading
+// connection any more until this returns. The copyMachine will do the reading
 // and writing up to the CommandComplete message.
 func (ex *connExecutor) execCopyIn(
 	ctx context.Context, cmd CopyIn,
@@ -2415,8 +2415,9 @@ func (ex *connExecutor) execCopyIn(
 	if isCopyToExternalStorage(cmd) {
 		cm, err = newFileUploadMachine(ctx, cmd.Conn, cmd.Stmt, txnOpt, ex.server.cfg)
 	} else {
+		p := planner{execCfg: ex.server.cfg, alloc: &tree.DatumAlloc{}}
 		cm, err = newCopyMachine(
-			ctx, cmd.Conn, cmd.Stmt, txnOpt, ex.server.cfg,
+			ctx, cmd.Conn, cmd.Stmt, &p, txnOpt,
 			// execInsertPlan
 			func(ctx context.Context, p *planner, res RestrictedCommandResult) error {
 				_, err := ex.execWithDistSQLEngine(ctx, p, tree.RowsAffected, res, DistributionTypeNone, nil /* progressAtomic */)
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index e0ae538236f5..8495040ede35 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -1024,8 +1024,8 @@
 // dispatchToExecutionEngine executes the statement, writes the result to res
 // and returns an event for the connection's state machine.
 //
-// If an error is returned, the connection needs to stop processing queries.`
-// Query execution errors are written to res; they are not returned; it is`
+// If an error is returned, the connection needs to stop processing queries.
+// Query execution errors are written to res; they are not returned; it is
 // expected that the caller will inspect res and react to query errors by
 // producing an appropriate state machine event.
 func (ex *connExecutor) dispatchToExecutionEngine(
@@ -1525,40 +1525,8 @@ func (ex *connExecutor) execWithDistSQLEngine(
 			return &factoryEvalCtx
 		}
 	}
-
-	if len(planner.curPlan.subqueryPlans) != 0 {
-		// Create a separate memory account for the results of the subqueries.
-		// Note that we intentionally defer the closure of the account until we
-		// return from this method (after the main query is executed).
-		subqueryResultMemAcc := planner.EvalContext().Mon.MakeBoundAccount()
-		defer subqueryResultMemAcc.Close(ctx)
-		if !ex.server.cfg.DistSQLPlanner.PlanAndRunSubqueries(
-			ctx, planner, evalCtxFactory, planner.curPlan.subqueryPlans, recv, &subqueryResultMemAcc,
-			// Skip the diagram generation since on this "main" query path we
-			// can get it via the statement bundle.
-			true, /* skipDistSQLDiagramGeneration */
-		) {
-			return *recv.stats, recv.commErr
-		}
-	}
-	recv.discardRows = planner.instrumentation.ShouldDiscardRows()
-	// We pass in whether or not we wanted to distribute this plan, which tells
-	// the planner whether or not to plan remote table readers.
-	cleanup := ex.server.cfg.DistSQLPlanner.PlanAndRun(
-		ctx, evalCtx, planCtx, planner.txn, planner.curPlan.main, recv,
-	)
-	// Note that we're not cleaning up right away because postqueries might
-	// need to have access to the main query tree.
-	defer cleanup()
-	if recv.commErr != nil || res.Err() != nil {
-		return *recv.stats, recv.commErr
-	}
-
-	ex.server.cfg.DistSQLPlanner.PlanAndRunCascadesAndChecks(
-		ctx, planner, evalCtxFactory, &planner.curPlan.planComponents, recv,
-	)
-
-	return *recv.stats, recv.commErr
+	err := ex.server.cfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, planner, recv, evalCtxFactory)
+	return *recv.stats, err
 }

 // beginTransactionTimestampsAndReadMode computes the timestamps and
diff --git a/pkg/sql/copy.go b/pkg/sql/copy.go
index 366e63d78947..86961cb3de6c 100644
--- a/pkg/sql/copy.go
+++ b/pkg/sql/copy.go
@@ -100,7 +100,7 @@ type copyMachine struct {

 	// p is the planner used to plan inserts. preparePlanner() needs to be called
 	// before preparing each new statement.
-	p planner
+	p *planner

 	// parsingEvalCtx is an EvalContext used for the very limited needs to strings
 	// parsing. Is it not correctly initialized with timestamps, transactions and
@@ -115,8 +115,8 @@ func newCopyMachine(
 	ctx context.Context,
 	conn pgwirebase.Conn,
 	n *tree.CopyFrom,
+	p *planner,
 	txnOpt copyTxnOpt,
-	execCfg *ExecutorConfig,
 	execInsertPlan func(ctx context.Context, p *planner, res RestrictedCommandResult) error,
 ) (_ *copyMachine, retErr error) {
 	c := &copyMachine{
@@ -129,7 +129,7 @@
 		txnOpt:          txnOpt,
 		csvExpectHeader: n.Options.Header,
 		// The planner will be prepared before use.
-		p:              planner{execCfg: execCfg, alloc: &tree.DatumAlloc{}},
+		p:              p,
 		execInsertPlan: execInsertPlan,
 	}
@@ -213,7 +213,7 @@
 	}

 	flags := tree.ObjectLookupFlagsWithRequiredTableKind(tree.ResolveRequireTableDesc)
-	_, tableDesc, err := resolver.ResolveExistingTableObject(ctx, &c.p, &n.Table, flags)
+	_, tableDesc, err := resolver.ResolveExistingTableObject(ctx, c.p, &n.Table, flags)
 	if err != nil {
 		return nil, err
 	}
@@ -755,7 +755,7 @@ func (c *copyMachine) insertRows(ctx context.Context) (retErr error) {
 	}

 	var res streamingCommandResult
-	err := c.execInsertPlan(ctx, &c.p, &res)
+	err := c.execInsertPlan(ctx, c.p, &res)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/sql/copy_file_upload.go b/pkg/sql/copy_file_upload.go
index bc6b17e540ec..9bc13885ab70 100644
--- a/pkg/sql/copy_file_upload.go
+++ b/pkg/sql/copy_file_upload.go
@@ -88,7 +88,7 @@ func newFileUploadMachine(
 	c := &copyMachine{
 		conn: conn,
 		// The planner will be prepared before use.
-		p: planner{execCfg: execCfg, alloc: &tree.DatumAlloc{}},
+		p: &planner{execCfg: execCfg, alloc: &tree.DatumAlloc{}},
 	}
 	f = &fileUploadMachine{
 		c: c,
diff --git a/pkg/sql/copy_from_test.go b/pkg/sql/copy_from_test.go
new file mode 100644
index 000000000000..23fde1c6d9ef
--- /dev/null
+++ b/pkg/sql/copy_from_test.go
@@ -0,0 +1,164 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package sql_test
+
+import (
+	"context"
+	"fmt"
+	"runtime/pprof"
+	"sync"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/stretchr/testify/require"
+)
+
+var lineitemSchema string = `CREATE DATABASE c; CREATE TABLE c.lineitem (
+	l_orderkey      INT8 NOT NULL,
+	l_partkey       INT8 NOT NULL,
+	l_suppkey       INT8 NOT NULL,
+	l_linenumber    INT8 NOT NULL,
+	l_quantity      DECIMAL(15,2) NOT NULL,
+	l_extendedprice DECIMAL(15,2) NOT NULL,
+	l_discount      DECIMAL(15,2) NOT NULL,
+	l_tax           DECIMAL(15,2) NOT NULL,
+	l_returnflag    CHAR(1) NOT NULL,
+	l_linestatus    CHAR(1) NOT NULL,
+	l_shipdate      DATE NOT NULL,
+	l_commitdate    DATE NOT NULL,
+	l_receiptdate   DATE NOT NULL,
+	l_shipinstruct  CHAR(25) NOT NULL,
+	l_shipmode      CHAR(10) NOT NULL,
+	l_comment       VARCHAR(44) NOT NULL,
+	PRIMARY KEY     (l_orderkey, l_linenumber),
+	INDEX l_ok      (l_orderkey ASC),
+	INDEX l_pk      (l_partkey ASC),
+	INDEX l_sk      (l_suppkey ASC),
+	INDEX l_sd      (l_shipdate ASC),
+	INDEX l_cd      (l_commitdate ASC),
+	INDEX l_rd      (l_receiptdate ASC),
+	INDEX l_pk_sk   (l_partkey ASC, l_suppkey ASC),
+	INDEX l_sk_pk   (l_suppkey ASC, l_partkey ASC)
+)`
+
+const csvData = `%d|155190|7706|1|17|21168.23|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the
+`
+
+// TestCopyFrom is a simple test to verify RunCopyFrom works for benchmarking
+// purposes.
+func TestCopyFrom(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+
+	s, conn, _ := serverutils.StartServer(t, base.TestServerArgs{
+		Settings: cluster.MakeTestingClusterSettings(),
+	})
+	defer s.Stopper().Stop(ctx)
+
+	r := sqlutils.MakeSQLRunner(conn)
+	r.Exec(t, lineitemSchema)
+	rows := []string{fmt.Sprintf(csvData, 1), fmt.Sprintf(csvData, 2)}
+	numrows, err := sql.RunCopyFrom(ctx, s, "c", nil, "COPY lineitem FROM STDIN WITH CSV DELIMITER '|';", rows)
+	require.Equal(t, 2, numrows)
+	require.NoError(t, err)
+
+	partKey := 0
+	r.QueryRow(t, "SELECT l_partkey FROM c.lineitem WHERE l_orderkey = 1").Scan(&partKey)
+	require.Equal(t, 155190, partKey)
+}
+
+// BenchmarkCopyFrom measures copy performance against a TestServer.
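+// It pre-encodes all rows up front and reports custom mb/s and rows/s
+// metrics from a single timed COPY rather than relying on b.N iterations.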
+func BenchmarkCopyFrom(b *testing.B) {
+	defer leaktest.AfterTest(b)()
+	defer log.Scope(b).Close(b)
+	ctx := context.Background()
+
+	s, conn, _ := serverutils.StartServer(b, base.TestServerArgs{
+		Settings: cluster.MakeTestingClusterSettings(),
+	})
+	defer s.Stopper().Stop(ctx)
+
+	r := sqlutils.MakeSQLRunner(conn)
+	r.Exec(b, lineitemSchema)
+
+	// Send 50k rows of data, one CopyData message per row.
+	ROWS := 50000
+	datalen := 0
+	var rows []string
+	for i := 0; i < ROWS; i++ {
+		row := fmt.Sprintf(csvData, i)
+		rows = append(rows, row)
+		datalen += len(row)
+	}
+	start := timeutil.Now()
+	pprof.Do(ctx, pprof.Labels("run", "copy"), func(ctx context.Context) {
+		rows, err := sql.RunCopyFrom(ctx, s, "c", nil, "COPY lineitem FROM STDIN WITH CSV DELIMITER '|';", rows)
+		require.NoError(b, err)
+		require.Equal(b, ROWS, rows)
+	})
+	duration := timeutil.Since(start)
+	b.ReportMetric(float64(datalen)/(1024*1024)/duration.Seconds(), "mb/s")
+	b.ReportMetric(float64(ROWS)/duration.Seconds(), "rows/s")
+}
+
+func BenchmarkParallelCopyFrom(b *testing.B) {
+	defer leaktest.AfterTest(b)()
+	defer log.Scope(b).Close(b)
+	ctx := context.Background()
+
+	s, conn, _ := serverutils.StartServer(b, base.TestServerArgs{
+		Settings: cluster.MakeTestingClusterSettings(),
+	})
+	defer s.Stopper().Stop(ctx)
+
+	r := sqlutils.MakeSQLRunner(conn)
+	r.Exec(b, lineitemSchema)
+	ROWS := 50000
+	datalen := 0
+	THREADS := 10
+	var allrows [][]string
+
+	chunk := ROWS / THREADS
+	for j := 0; j < THREADS; j++ {
+		var rows []string
+		for i := 0; i < chunk; i++ {
+			row := fmt.Sprintf(csvData, i+j*chunk)
+			rows = append(rows, row)
+			datalen += len(row)
+		}
+		allrows = append(allrows, rows)
+	}
+
+	start := timeutil.Now()
+	var wg sync.WaitGroup
+
+	for j := 0; j < THREADS; j++ {
+		wg.Add(1)
+		go func(j int) {
+			defer wg.Done()
+			count, err := sql.RunCopyFrom(ctx, s, "c", nil, "COPY lineitem FROM STDIN WITH CSV DELIMITER '|';", allrows[j])
+			require.NoError(b, err)
+			require.Equal(b, chunk, count)
+		}(j)
+	}
+	wg.Wait()
+	duration := timeutil.Since(start)
+	b.ReportMetric(float64(datalen)/(1024*1024)/duration.Seconds(), "mb/s")
+	b.ReportMetric(float64(ROWS)/duration.Seconds(), "rows/s")
+}
diff --git a/pkg/sql/copyshim.go b/pkg/sql/copyshim.go
new file mode 100644
index 000000000000..7dcd41f69aff
--- /dev/null
+++ b/pkg/sql/copyshim.go
@@ -0,0 +1,157 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package sql
+
+import (
+	"bufio"
+	"bytes"
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/security/username"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
+	"github.com/cockroachdb/cockroach/pkg/sql/parser"
+	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/jackc/pgproto3/v2"
+)
+
+type fakeConn struct {
+	pgwirebase.BufferedReader
+	rd *bufio.Reader
+}
+
+// Rd returns a reader to be used to consume bytes from the connection.
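+// The fakeConn never touches a network socket: RunCopyFrom pre-encodes the
+// pgwire CopyData/CopyDone messages into an in-memory buffer, and the
+// copyMachine consumes them back through this BufferedReader.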
+func (c *fakeConn) Rd() pgwirebase.BufferedReader {
+	return c
+}
+
+// Read is part of the io.Reader interface.
+func (c *fakeConn) Read(p []byte) (n int, err error) {
+	return c.rd.Read(p)
+}
+
+// ReadString is part of the pgwirebase.BufferedReader interface.
+func (c *fakeConn) ReadString(delim byte) (string, error) {
+	return c.rd.ReadString(delim)
+}
+
+// ReadByte is part of the pgwirebase.BufferedReader interface.
+func (c *fakeConn) ReadByte() (byte, error) {
+	return c.rd.ReadByte()
+}
+
+// BeginCopyIn sends a message with column info to the client. It is not
+// currently used in these tests, but it is called by copyMachine.run.
+func (c *fakeConn) BeginCopyIn(
+	ctx context.Context, columns []colinfo.ResultColumn, format pgwirebase.FormatCode,
+) error {
+	return nil
+}
+
+// SendCommandComplete sends a serverMsgCommandComplete with the given
+// payload.
+func (c *fakeConn) SendCommandComplete(tag []byte) error {
+	return nil
+}
+
+// RunCopyFrom exposes copy functionality for the logictest "copy" command;
+// it's test-only code, but it doesn't live in a test package because
+// logictest isn't in a test package.
+func RunCopyFrom(
+	ctx context.Context,
+	s serverutils.TestServerInterface,
+	db string,
+	txn *kv.Txn,
+	copySQL string,
+	data []string,
+) (int, error) {
+	execCfg := s.ExecutorConfig().(ExecutorConfig)
+	dsp := execCfg.DistSQLPlanner
+	stmt, err := parser.ParseOne(copySQL)
+	if err != nil {
+		return -1, err
+	}
+	if txn == nil {
+		txn = s.DB().NewTxn(ctx, "test")
+	}
+
+	// TODO(cucaroach): test open transaction and implicit txn, this will require
+	// a real client side/over the wire copy implementation logictest can use.
+	txnOpt := copyTxnOpt{txn: txn}
+	txnOpt.resetPlanner = func(ctx context.Context, p *planner, txn *kv.Txn, txnTS time.Time, stmtTS time.Time) {
+		p.cancelChecker.Reset(ctx)
+		p.optPlanningCtx.init(p)
+	}
+	p, cleanup := newInternalPlanner("copytest",
+		txn,
+		username.RootUserName(),
+		&MemoryMetrics{},
+		&execCfg,
+		sessiondatapb.SessionData{
+			Database: db,
+		},
+	)
+	// TODO(cucaroach): I believe newInternalPlanner should do this, but doing it
+	// there causes lots of session diffs and test failures and is risky.
+	if err := p.sessionDataMutatorIterator.applyOnEachMutatorError(func(m sessionDataMutator) error {
+		return resetSessionVars(ctx, m)
+	}); err != nil {
+		panic(fmt.Sprintf("error setting up newInternalPlanner session: %s", err.Error()))
+	}
+	defer cleanup()
+
+	var buf []byte
+	for _, d := range data {
+		b := make([]byte, 0, len(d)+10)
+		cd := pgproto3.CopyData{Data: []byte(d)}
+		b = cd.Encode(b)
+		buf = append(buf, b...)
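+		// Note: CopyData.Encode frames each row as a pgwire CopyData message:
+		// the byte 'd', a 4-byte big-endian length (payload length + 4), and
+		// then the raw row bytes, just as a real client would send them.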
+	}
+
+	done := pgproto3.CopyDone{}
+	buf = done.Encode(buf)
+
+	conn := &fakeConn{
+		rd: bufio.NewReader(bytes.NewReader(buf)),
+	}
+	rows := 0
+	c, err := newCopyMachine(ctx, conn, stmt.AST.(*tree.CopyFrom), p, txnOpt,
+		func(ctx context.Context, p *planner, res RestrictedCommandResult) error {
+			err := dsp.ExecLocalAll(ctx, execCfg, p, res)
+			if err != nil {
+				return err
+			}
+			rows += res.RowsAffected()
+			return nil
+		},
+	)
+	if err != nil {
+		return -1, err
+	}
+
+	err = c.run(ctx)
+	if err != nil {
+		return -1, err
+	}
+
+	if txn != nil {
+		err = txn.Commit(ctx)
+		if err != nil {
+			return -1, err
+		}
+	}
+
+	return rows, nil
+}
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index c7dd5246c6b1..46ad0694aa5f 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -1305,6 +1305,52 @@ func (r *DistSQLReceiver) ProducerDone() {
 	r.closed = true
 }

+// PlanAndRunAll combines running the main query, subqueries and
+// cascades/checks. If an error is returned, the connection needs to stop
+// processing queries. Query execution errors are stored in recv; they are
+// not returned.
+func (dsp *DistSQLPlanner) PlanAndRunAll(
+	ctx context.Context,
+	evalCtx *extendedEvalContext,
+	planCtx *PlanningCtx,
+	planner *planner,
+	recv *DistSQLReceiver,
+	evalCtxFactory func() *extendedEvalContext,
+) error {
+	if len(planner.curPlan.subqueryPlans) != 0 {
+		// Create a separate memory account for the results of the subqueries.
+		// Note that we intentionally defer the closure of the account until we
+		// return from this method (after the main query is executed).
+		subqueryResultMemAcc := planner.EvalContext().Mon.MakeBoundAccount()
+		defer subqueryResultMemAcc.Close(ctx)
+		if !dsp.PlanAndRunSubqueries(
+			ctx, planner, evalCtxFactory, planner.curPlan.subqueryPlans, recv, &subqueryResultMemAcc,
+			// Skip the diagram generation since on this "main" query path we
+			// can get it via the statement bundle.
+			true, /* skipDistSQLDiagramGeneration */
+		) {
+			return recv.commErr
+		}
+	}
+	recv.discardRows = planner.instrumentation.ShouldDiscardRows()
+	// We pass in whether or not we wanted to distribute this plan, which tells
+	// the planner whether or not to plan remote table readers.
+	cleanup := dsp.PlanAndRun(
+		ctx, evalCtx, planCtx, planner.txn, planner.curPlan.main, recv,
+	)
+	// Note that we're not cleaning up right away because postqueries might
+	// need to have access to the main query tree.
+	defer cleanup()
+	if recv.commErr != nil || recv.resultWriter.Err() != nil {
+		return recv.commErr
+	}
+
+	dsp.PlanAndRunCascadesAndChecks(
+		ctx, planner, evalCtxFactory, &planner.curPlan.planComponents, recv,
+	)
+
+	return recv.commErr
+}
+
 // PlanAndRunSubqueries returns false if an error was encountered and sets that
 // error in the provided receiver. Note that if false is returned, then this
 // function will have closed all the subquery plans because it assumes that the
diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go
index 8a294fe7dc03..84270c15fd54 100644
--- a/pkg/sql/logictest/logic.go
+++ b/pkg/sql/logictest/logic.go
@@ -234,6 +234,18 @@ import (
 //    Completes a pending statement with the provided name, validating its
 //    results as expected per the given options to "statement async ...".
 //
+// - copy,copy-error
+//    Runs a COPY FROM STDIN statement; because of the separate data chunk, it
+//    requires special logictest support. Format is:
+//    copy
+//    COPY FROM STDIN;
+//
+//    COPY DATA
+//    ----
+//
+//    copy-error is just like copy, but an error is expected and the results
+//    should be the error string.
+//
 // - query