From 1e6d4a65f0159ad25a55be8de0415f842bc47510 Mon Sep 17 00:00:00 2001 From: Tommy Reilly Date: Mon, 3 Oct 2022 19:13:28 -0400 Subject: [PATCH 1/2] sql/copy: update jackc/pgconn to fix copy bug Fixes: #88801 Release note: None --- DEPS.bzl | 12 ++++++------ build/bazelutil/distdir_files.bzl | 4 ++-- go.mod | 4 ++-- go.sum | 7 +++++-- vendor | 2 +- 5 files changed, 16 insertions(+), 13 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 3477098e102a..1ab0a0dea06d 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -4489,10 +4489,10 @@ def go_deps(): name = "com_github_jackc_pgconn", build_file_proto_mode = "disable_global", importpath = "github.com/jackc/pgconn", - sha256 = "48d34064a1facff7766713d9224502e7376a5d90c1506f99a37c57bfceaf9636", - strip_prefix = "github.com/jackc/pgconn@v1.12.1", + sha256 = "4d9bf1309f5cdbd589d60a485fb5d5d7333edf9652c2dd47b7dd31b12dda887e", + strip_prefix = "github.com/jackc/pgconn@v1.13.1-0.20221001150415-49cbf4659151", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgconn/com_github_jackc_pgconn-v1.12.1.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgconn/com_github_jackc_pgconn-v1.13.1-0.20221001150415-49cbf4659151.zip", ], ) go_repository( @@ -4539,10 +4539,10 @@ def go_deps(): name = "com_github_jackc_pgproto3_v2", build_file_proto_mode = "disable_global", importpath = "github.com/jackc/pgproto3/v2", - sha256 = "6b702c372e13520636243d3be58922968f0630b67e23ba77326ef6ee4cada463", - strip_prefix = "github.com/jackc/pgproto3/v2@v2.3.0", + sha256 = "57884e299825af31fd01268659f1e671883b73b708a51230da14d6f8ee0e4e36", + strip_prefix = "github.com/jackc/pgproto3/v2@v2.3.1", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgproto3/v2/com_github_jackc_pgproto3_v2-v2.3.0.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgproto3/v2/com_github_jackc_pgproto3_v2-v2.3.1.zip", ], ) go_repository( diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index 72d821a1b95a..dcafff56ca89 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -487,12 +487,12 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/j-keck/arping/com_github_j_keck_arping-v0.0.0-20160618110441-2cf9dc699c56.zip": "6001c94a8c4eed55718f627346cb685cce67369ca5c29ae059f58f7abd8bd8a7", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/chunkreader/com_github_jackc_chunkreader-v1.0.0.zip": "e204c917e2652ffe047f5c8b031192757321f568654e3df8408bf04178df1408", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/chunkreader/v2/com_github_jackc_chunkreader_v2-v2.0.1.zip": "6e3f4b7d9647f31061f6446ae10de71fc1407e64f84cd0949afac0cd231e8dd2", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgconn/com_github_jackc_pgconn-v1.12.1.zip": "48d34064a1facff7766713d9224502e7376a5d90c1506f99a37c57bfceaf9636", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgconn/com_github_jackc_pgconn-v1.13.1-0.20221001150415-49cbf4659151.zip": "4d9bf1309f5cdbd589d60a485fb5d5d7333edf9652c2dd47b7dd31b12dda887e", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgio/com_github_jackc_pgio-v1.0.0.zip": "1a83c03d53f6a40339364cafcbbabb44238203c79ca0c9b98bf582d0df0e0468", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgmock/com_github_jackc_pgmock-v0.0.0-20210724152146-4ad1a8207f65.zip": "0fffd0a7a67dbdfafa04297e51028c6d2d08cd6691f3b6d78d7ae6502d3d4cf2", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgpassfile/com_github_jackc_pgpassfile-v1.0.0.zip": "1cc79fb0b80f54b568afd3f4648dd1c349f746ad7c379df8d7f9e0eb1cac938b", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgproto3/com_github_jackc_pgproto3-v1.1.0.zip": "e3766bee50ed74e49a067b2c4797a2c69015cf104bf3f3624cd483a9e940b4ee", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgproto3/v2/com_github_jackc_pgproto3_v2-v2.3.0.zip": "6b702c372e13520636243d3be58922968f0630b67e23ba77326ef6ee4cada463", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgproto3/v2/com_github_jackc_pgproto3_v2-v2.3.1.zip": "57884e299825af31fd01268659f1e671883b73b708a51230da14d6f8ee0e4e36", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgservicefile/com_github_jackc_pgservicefile-v0.0.0-20200714003250-2b9c44734f2b.zip": "8422a25b9d2b0be05c66ee1ccfdbaab144ce98f1ac678bc647064c560d4cd6e2", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgtype/com_github_jackc_pgtype-v1.11.0.zip": "6a257b81c0bd386d6241219a14ebd41d574a02aeaeb3942670c06441b864dcad", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v4/com_github_jackc_pgx_v4-v4.16.1.zip": "c3a169a68ff0e56f9f81eee4de4d2fd2a5ec7f4d6be159159325f4863c80bd10", diff --git a/go.mod b/go.mod index 02c6586a930e..a25240294077 100644 --- a/go.mod +++ b/go.mod @@ -94,8 +94,8 @@ require ( github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/guptarohit/asciigraph v0.5.5 github.com/irfansharif/recorder v0.0.0-20211218081646-a21b46510fd6 - github.com/jackc/pgconn v1.12.1 - github.com/jackc/pgproto3/v2 v2.3.0 + github.com/jackc/pgconn v1.13.1-0.20221001150415-49cbf4659151 + github.com/jackc/pgproto3/v2 v2.3.1 github.com/jackc/pgtype v1.11.0 github.com/jackc/pgx/v4 v4.16.1 github.com/jaegertracing/jaeger v1.18.1 diff --git a/go.sum b/go.sum index 74a19268d36b..901d39119752 100644 --- a/go.sum +++ b/go.sum @@ -1316,8 +1316,9 @@ github.com/jackc/pgconn v1.5.1-0.20200601181101-fa742c524853/go.mod h1:QeD3lBfpT github.com/jackc/pgconn v1.8.0/go.mod h1:1C2Pb36bGIP9QHGBYCjnyhqu7Rv3sGshaQUvmfGIB/o= github.com/jackc/pgconn v1.9.0/go.mod h1:YctiPyvzfU11JFxoXokUOOKQXQmDMoJL9vJzHH8/2JY= github.com/jackc/pgconn v1.9.1-0.20210724152538-d89c8390a530/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI= -github.com/jackc/pgconn v1.12.1 h1:rsDFzIpRk7xT4B8FufgpCCeyjdNpKyghZeSefViE5W8= github.com/jackc/pgconn v1.12.1/go.mod h1:ZkhRC59Llhrq3oSfrikvwQ5NaxYExr6twkdkMLaKono= +github.com/jackc/pgconn v1.13.1-0.20221001150415-49cbf4659151 h1:bPKk32KlGLQ/SWtOc3onTzZ/km0BIiwwYBqt+8YlKwU= +github.com/jackc/pgconn v1.13.1-0.20221001150415-49cbf4659151/go.mod h1:AnowpAqO4CMIIJNZl2VJp+KrkAZciAkhEl0W0JIobpI= github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE= @@ -1334,8 +1335,9 @@ github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1: github.com/jackc/pgproto3/v2 v2.0.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgproto3/v2 v2.3.0 h1:brH0pCGBDkBW07HWlN/oSBXrmo3WB0UvZd1pIuDcL8Y= github.com/jackc/pgproto3/v2 v2.3.0/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgproto3/v2 v2.3.1 h1:nwj7qwf0S+Q7ISFfBndqeLwSwxs+4DPsbRFjECT1Y4Y= +github.com/jackc/pgproto3/v2 v2.3.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= @@ -2361,6 +2363,7 @@ golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220210151621-f4118a5b28e2/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8 h1:GIAS/yBem/gq2MUqgNIzUHW7cJMmx3TGZOrnyYaNQ6c= golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= diff --git a/vendor b/vendor index e05302b43589..1e5720b5eac2 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit e05302b435896a94a4fde003c37d07f96f8018c7 +Subproject commit 1e5720b5eac24eaace1795f29cf745a20cc1db0c From 2113dbca38ae6cabcb14e28cecb5687644661965 Mon Sep 17 00:00:00 2001 From: Tommy Reilly Date: Mon, 19 Sep 2022 12:21:41 -0400 Subject: [PATCH 2/2] sql/copy: fix test infrastructure Previously we hacked around lack of copy protocol support in logictest by doing an in memory hack that bypassed the client connection entirely. Now that the client supports use pgx to support copy use that instead. Also move copy test stuff to its own package. Fixes: #84619 Release note: None --- pkg/BUILD.bazel | 3 + pkg/cli/clisqlclient/api.go | 13 +- pkg/cli/nodelocal.go | 2 +- pkg/cli/userfile.go | 2 +- pkg/sql/BUILD.bazel | 3 - pkg/sql/copy.go | 14 +- pkg/sql/copy/BUILD.bazel | 35 ++ pkg/sql/copy/copy_test.go | 307 +++++++++++++++++ pkg/sql/copy/main_test.go | 31 ++ .../logic_test => copy/testdata}/copyfrom | 109 +++--- pkg/sql/copy_from_test.go | 324 ------------------ pkg/sql/copyshim.go | 161 --------- pkg/sql/logictest/logic.go | 57 --- .../tests/fakedist-disk/generated_test.go | 7 - .../tests/fakedist-vec-off/generated_test.go | 7 - .../tests/fakedist/generated_test.go | 7 - .../generated_test.go | 7 - .../tests/local-vec-off/generated_test.go | 7 - .../logictest/tests/local/generated_test.go | 7 - 19 files changed, 443 insertions(+), 660 deletions(-) create mode 100644 pkg/sql/copy/BUILD.bazel create mode 100644 pkg/sql/copy/copy_test.go create mode 100644 pkg/sql/copy/main_test.go rename pkg/sql/{logictest/testdata/logic_test => copy/testdata}/copyfrom (78%) delete mode 100644 pkg/sql/copy_from_test.go delete mode 100644 pkg/sql/copyshim.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index e41e8d6971d2..4f4a6cdc8d44 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -344,6 +344,7 @@ ALL_TESTS = [ "//pkg/sql/contention/txnidcache:txnidcache_test", "//pkg/sql/contention:contention_test", "//pkg/sql/contentionpb:contentionpb_test", + "//pkg/sql/copy:copy_test", "//pkg/sql/covering:covering_test", "//pkg/sql/decodeusername:decodeusername_test", "//pkg/sql/delegate:delegate_test", @@ -1466,6 +1467,7 @@ GO_TARGETS = [ "//pkg/sql/contention:contention_test", "//pkg/sql/contentionpb:contentionpb", "//pkg/sql/contentionpb:contentionpb_test", + "//pkg/sql/copy:copy_test", "//pkg/sql/covering:covering", "//pkg/sql/covering:covering_test", "//pkg/sql/decodeusername:decodeusername", @@ -2643,6 +2645,7 @@ GET_X_DATA_TARGETS = [ "//pkg/sql/contention/contentionutils:get_x_data", "//pkg/sql/contention/txnidcache:get_x_data", "//pkg/sql/contentionpb:get_x_data", + "//pkg/sql/copy:get_x_data", "//pkg/sql/covering:get_x_data", "//pkg/sql/decodeusername:get_x_data", "//pkg/sql/delegate:get_x_data", diff --git a/pkg/cli/clisqlclient/api.go b/pkg/cli/clisqlclient/api.go index fa87333231e4..5840eca3257a 100644 --- a/pkg/cli/clisqlclient/api.go +++ b/pkg/cli/clisqlclient/api.go @@ -180,7 +180,7 @@ type TxBoundConn interface { type DriverConn interface { Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) Exec(ctx context.Context, query string, args ...interface{}) error - CopyFrom(ctx context.Context, reader io.Reader, query string) error + CopyFrom(ctx context.Context, reader io.Reader, query string) (int64, error) } type driverConnAdapter struct { @@ -200,7 +200,12 @@ func (d *driverConnAdapter) Exec(ctx context.Context, query string, args ...inte return d.c.Exec(ctx, query, args...) } -func (d *driverConnAdapter) CopyFrom(ctx context.Context, reader io.Reader, query string) error { - _, err := d.c.conn.PgConn().CopyFrom(ctx, reader, query) - return err +func (d *driverConnAdapter) CopyFrom( + ctx context.Context, reader io.Reader, query string, +) (int64, error) { + cmdTag, err := d.c.conn.PgConn().CopyFrom(ctx, reader, query) + if err != nil { + return -1, err + } + return cmdTag.RowsAffected(), nil } diff --git a/pkg/cli/nodelocal.go b/pkg/cli/nodelocal.go index fa4f9ed14e42..612f838dd805 100644 --- a/pkg/cli/nodelocal.go +++ b/pkg/cli/nodelocal.go @@ -144,7 +144,7 @@ func uploadFile( } } - if err := ex.CopyFrom(ctx, bytes.NewReader(send), stmt); err != nil { + if _, err := ex.CopyFrom(ctx, bytes.NewReader(send), stmt); err != nil { return err } diff --git a/pkg/cli/userfile.go b/pkg/cli/userfile.go index 63821b75c3b2..10d379c73a96 100644 --- a/pkg/cli/userfile.go +++ b/pkg/cli/userfile.go @@ -622,7 +622,7 @@ func uploadUserFile( } } - if err := ex.CopyFrom(ctx, bytes.NewReader(send), stmt); err != nil { + if _, err := ex.CopyFrom(ctx, bytes.NewReader(send), stmt); err != nil { return "", err } diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index d36f58033c4e..4ede49d16aad 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -51,7 +51,6 @@ go_library( "control_schedules.go", "copy.go", "copy_file_upload.go", - "copyshim.go", "crdb_internal.go", "create_database.go", "create_extension.go", @@ -508,7 +507,6 @@ go_library( "@com_github_cockroachdb_redact//:redact", "@com_github_gogo_protobuf//proto", "@com_github_gogo_protobuf//types", - "@com_github_jackc_pgproto3_v2//:pgproto3", "@com_github_lib_pq//:pq", "@com_github_lib_pq//oid", "@com_github_prometheus_client_model//go", @@ -545,7 +543,6 @@ go_test( "conn_executor_test.go", "conn_io_test.go", "constraint_test.go", - "copy_from_test.go", "copy_in_test.go", "copy_test.go", "crdb_internal_test.go", diff --git a/pkg/sql/copy.go b/pkg/sql/copy.go index 0a54e817ba07..b2ccafcf3292 100644 --- a/pkg/sql/copy.go +++ b/pkg/sql/copy.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/encoding/csv" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" @@ -46,6 +47,18 @@ const CopyBatchRowSizeDefault = 100 // When this many rows are in the copy buffer, they are inserted. var copyBatchRowSize = util.ConstantWithMetamorphicTestRange("copy-batch-size", CopyBatchRowSizeDefault, 1, 10000) +// SetCopyFromBatchSize exports overriding copy batch size for test code. +func SetCopyFromBatchSize(i int) int { + old := copyBatchRowSize + if buildutil.CrdbTestBuild { + copyBatchRowSize = i + } else { + // We don't want non-test code mutating globals. + panic("SetCopyFromBatchSize is a test utility that requires crdb_test tag") + } + return old +} + type copyMachineInterface interface { run(ctx context.Context) error numInsertedRows() int @@ -767,7 +780,6 @@ func (p *planner) preparePlannerForCopy( txn = kv.NewTxnWithSteppingEnabled(ctx, p.execCfg.DB, nodeID, sessiondatapb.Normal) txnTs = p.execCfg.Clock.PhysicalTime() stmtTs = txnTs - } txnOpt.resetPlanner(ctx, p, txn, txnTs, stmtTs) if implicitTxn { diff --git a/pkg/sql/copy/BUILD.bazel b/pkg/sql/copy/BUILD.bazel new file mode 100644 index 000000000000..7b326532b8fb --- /dev/null +++ b/pkg/sql/copy/BUILD.bazel @@ -0,0 +1,35 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "copy_test", + srcs = [ + "copy_test.go", + "main_test.go", + ], + args = ["-test.timeout=295s"], + data = glob(["testdata/**"]), + deps = [ + "//pkg/base", + "//pkg/cli/clisqlclient", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/security/username", + "//pkg/server", + "//pkg/settings/cluster", + "//pkg/sql", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/randutil", + "@com_github_cockroachdb_apd_v3//:apd", + "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_jackc_pgtype//:pgtype", + "@com_github_stretchr_testify//require", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/sql/copy/copy_test.go b/pkg/sql/copy/copy_test.go new file mode 100644 index 000000000000..a3f3cb7e2044 --- /dev/null +++ b/pkg/sql/copy/copy_test.go @@ -0,0 +1,307 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package copy_test + +import ( + "context" + "database/sql/driver" + "fmt" + "io" + "net/url" + "runtime/pprof" + "strings" + "testing" + + "github.com/cockroachdb/apd/v3" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/cli/clisqlclient" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/datadriven" + "github.com/jackc/pgtype" + "github.com/stretchr/testify/require" +) + +const lineitemSchema string = `CREATE TABLE lineitem ( + l_orderkey INT8 NOT NULL, + l_partkey INT8 NOT NULL, + l_suppkey INT8 NOT NULL, + l_linenumber INT8 NOT NULL, + l_quantity DECIMAL(15,2) NOT NULL, + l_extendedprice DECIMAL(15,2) NOT NULL, + l_discount DECIMAL(15,2) NOT NULL, + l_tax DECIMAL(15,2) NOT NULL, + l_returnflag CHAR(1) NOT NULL, + l_linestatus CHAR(1) NOT NULL, + l_shipdate DATE NOT NULL, + l_commitdate DATE NOT NULL, + l_receiptdate DATE NOT NULL, + l_shipinstruct CHAR(25) NOT NULL, + l_shipmode CHAR(10) NOT NULL, + l_comment VARCHAR(44) NOT NULL, + PRIMARY KEY (l_orderkey, l_linenumber), + INDEX l_ok (l_orderkey ASC), + INDEX l_pk (l_partkey ASC), + INDEX l_sk (l_suppkey ASC), + INDEX l_sd (l_shipdate ASC), + INDEX l_cd (l_commitdate ASC), + INDEX l_rd (l_receiptdate ASC), + INDEX l_pk_sk (l_partkey ASC, l_suppkey ASC), + INDEX l_sk_pk (l_suppkey ASC, l_partkey ASC) +)` + +const csvData = `%d|155190|7706|1|17|21168.23|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the` + +func TestCopy(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Settings: cluster.MakeTestingClusterSettings(), + }) + defer s.Stopper().Stop(ctx) + + url, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), t.Name(), url.User(username.RootUser)) + defer cleanup() + var sqlConnCtx clisqlclient.Context + conn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, url.String()) + + testCopy := func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "exec-ddl": + err := conn.Exec(ctx, d.Input) + if err != nil { + require.NoError(t, err, "%s: %s", d.Pos, d.Cmd) + } + return "" + case "copy", "copy-error": + lines := strings.Split(d.Input, "\n") + stmt := lines[0] + data := strings.Join(lines[1:], "\n") + rows, err := conn.GetDriverConn().CopyFrom(ctx, strings.NewReader(data), stmt) + if d.Cmd == "copy" { + require.NoError(t, err, "%s\n%s\n", d.Cmd, d.Input) + require.Equal(t, int(rows), len(lines)-1, "Not all rows were inserted") + } else { + return err.Error() + } + return fmt.Sprintf("%d", rows) + case "query": + rows, err := conn.Query(ctx, d.Input) + require.NoError(t, err) + + vals := make([]driver.Value, len(rows.Columns())) + var results string + for { + if err := rows.Next(vals); err == io.EOF { + break + } else if err != nil { + require.NoError(t, err) + } + for i, v := range vals { + if i > 0 { + results += "|" + } + results += fmt.Sprintf("%v", v) + } + results += "\n" + } + err = rows.Close() + require.NoError(t, err) + return results + default: + return fmt.Sprintf("unknown command: %s\n", d.Cmd) + } + + } + datadriven.RunTest(t, testutils.TestDataPath(t, "copyfrom"), testCopy) +} + +// TestCopyFromTransaction tests that copy from rows are written with +// same transaction timestamp when done under an explicit transaction, +// copy rows are same transaction when done with default settings and +// batches are in separate transactions when non atomic mode is enabled. +func TestCopyFromTransaction(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Settings: cluster.MakeTestingClusterSettings(), + }) + defer s.Stopper().Stop(ctx) + + url, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), "copytest", url.User(username.RootUser)) + defer cleanup() + var sqlConnCtx clisqlclient.Context + + decEq := func(v1, v2 driver.Value) bool { + valToDecimal := func(v driver.Value) *apd.Decimal { + mt, ok := v.(pgtype.Numeric) + require.True(t, ok) + buf, err := mt.EncodeText(nil, nil) + require.NoError(t, err) + decimal, _, err := apd.NewFromString(string(buf)) + require.NoError(t, err) + return decimal + } + return valToDecimal(v1).Cmp(valToDecimal(v2)) == 0 + } + + testCases := []struct { + name string + query string + data []string + testf func(clisqlclient.Conn, func(clisqlclient.Conn)) + result func(f1, f2 driver.Value) bool + }{ + { + "explicit_copy", + "COPY lineitem FROM STDIN WITH CSV DELIMITER '|';", + []string{fmt.Sprintf(csvData, 1), fmt.Sprintf(csvData, 2)}, + func(tconn clisqlclient.Conn, f func(tconn clisqlclient.Conn)) { + err := tconn.Exec(ctx, "BEGIN") + require.NoError(t, err) + f(tconn) + err = tconn.Exec(ctx, "COMMIT") + require.NoError(t, err) + }, + decEq, + }, + { + "implicit_atomic", + "COPY lineitem FROM STDIN WITH CSV DELIMITER '|';", + []string{fmt.Sprintf(csvData, 1), fmt.Sprintf(csvData, 2)}, + func(tconn clisqlclient.Conn, f func(tconn clisqlclient.Conn)) { + err := tconn.Exec(ctx, "SET copy_from_atomic_enabled = true") + require.NoError(t, err) + orig := sql.SetCopyFromBatchSize(1) + defer sql.SetCopyFromBatchSize(orig) + f(tconn) + }, + decEq, + }, + { + "implicit_non_atomic", + "COPY lineitem FROM STDIN WITH CSV DELIMITER '|';", + []string{fmt.Sprintf(csvData, 1), fmt.Sprintf(csvData, 2)}, + func(tconn clisqlclient.Conn, f func(tconn clisqlclient.Conn)) { + err := tconn.Exec(ctx, "SET copy_from_atomic_enabled = false") + require.NoError(t, err) + orig := sql.SetCopyFromBatchSize(1) + defer sql.SetCopyFromBatchSize(orig) + f(tconn) + }, + func(f1, f2 driver.Value) bool { return !decEq(f1, f2) }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tconn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, url.String()) + tc.testf(tconn, func(tconn clisqlclient.Conn) { + // Without this everything comes back as strings + tconn.SetAlwaysInferResultTypes(true) + // Put each test in its own db so they can be parallelized. + err := tconn.Exec(ctx, fmt.Sprintf("CREATE DATABASE %s; USE %s", tc.name, tc.name)) + require.NoError(t, err) + err = tconn.Exec(ctx, lineitemSchema) + require.NoError(t, err) + numrows, err := tconn.GetDriverConn().CopyFrom(ctx, strings.NewReader(strings.Join(tc.data, "\n")), tc.query) + require.NoError(t, err) + require.Equal(t, len(tc.data), int(numrows)) + + result, err := tconn.QueryRow(ctx, "SELECT l_partkey FROM lineitem WHERE l_orderkey = 1") + require.NoError(t, err) + partKey, ok := result[0].(int64) + require.True(t, ok) + require.Equal(t, int64(155190), partKey) + + results, err := tconn.Query(ctx, "SELECT crdb_internal_mvcc_timestamp FROM lineitem") + require.NoError(t, err) + var lastts driver.Value + firstTime := true + vals := make([]driver.Value, 1) + for { + err = results.Next(vals) + if err == io.EOF { + break + } + require.NoError(t, err) + if !firstTime { + require.True(t, tc.result(lastts, vals[0])) + } else { + firstTime = false + } + lastts = vals[0] + } + }) + err := tconn.Exec(ctx, "TRUNCATE TABLE lineitem") + require.NoError(t, err) + }) + } +} + +// BenchmarkCopyFrom measures copy performance against a TestServer. +func BenchmarkCopyFrom(b *testing.B) { + defer leaktest.AfterTest(b)() + defer log.Scope(b).Close(b) + ctx := context.Background() + + s, _, _ := serverutils.StartServer(b, base.TestServerArgs{ + Settings: cluster.MakeTestingClusterSettings(), + }) + defer s.Stopper().Stop(ctx) + + url, cleanup := sqlutils.PGUrl(b, s.ServingSQLAddr(), "copytest", url.User(username.RootUser)) + defer cleanup() + var sqlConnCtx clisqlclient.Context + conn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, url.String()) + + err := conn.Exec(ctx, lineitemSchema) + require.NoError(b, err) + + // send data in 5 batches of 10k rows + const ROWS = sql.CopyBatchRowSizeDefault * 4 + datalen := 0 + var rows []string + for i := 0; i < ROWS; i++ { + row := fmt.Sprintf(csvData, i) + rows = append(rows, row) + datalen += len(row) + } + rowsize := datalen / ROWS + for _, batchSizeFactor := range []float64{.5, 1, 2, 4} { + batchSize := int(batchSizeFactor * sql.CopyBatchRowSizeDefault) + b.Run(fmt.Sprintf("%d", batchSize), func(b *testing.B) { + actualRows := rows[:batchSize] + for i := 0; i < b.N; i++ { + pprof.Do(ctx, pprof.Labels("run", "copy"), func(ctx context.Context) { + numrows, err := conn.GetDriverConn().CopyFrom(ctx, strings.NewReader(strings.Join(actualRows, "\n")), "COPY lineitem FROM STDIN WITH CSV DELIMITER '|';") + require.NoError(b, err) + require.Equal(b, int(numrows), len(actualRows)) + }) + b.StopTimer() + err = conn.Exec(ctx, "TRUNCATE TABLE lineitem") + require.NoError(b, err) + b.StartTimer() + } + b.SetBytes(int64(len(actualRows) * rowsize)) + }) + } +} diff --git a/pkg/sql/copy/main_test.go b/pkg/sql/copy/main_test.go new file mode 100644 index 000000000000..3b6f31d600b4 --- /dev/null +++ b/pkg/sql/copy/main_test.go @@ -0,0 +1,31 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package copy_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func TestMain(m *testing.M) { + securityassets.SetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} diff --git a/pkg/sql/logictest/testdata/logic_test/copyfrom b/pkg/sql/copy/testdata/copyfrom similarity index 78% rename from pkg/sql/logictest/testdata/logic_test/copyfrom rename to pkg/sql/copy/testdata/copyfrom index c7c1ebe4e3b4..7ecfb437070e 100644 --- a/pkg/sql/logictest/testdata/logic_test/copyfrom +++ b/pkg/sql/copy/testdata/copyfrom @@ -1,108 +1,93 @@ -# LogicTest: !3node-tenant -subtest basic - -statement ok +exec-ddl CREATE TABLE t (a int) +---- # builtins not allowed copy-error COPY t FROM STDIN - random() ---- -could not parse "random()" as type int: strconv.ParseInt: parsing "random()": invalid syntax +ERROR: could not parse "random()" as type int: strconv.ParseInt: parsing "random()": invalid syntax (SQLSTATE 22P02) copy COPY t FROM STDIN - ---- 0 copy COPY t FROM STDIN - 1 ---- 1 copy COPY t FROM STDIN - 1 2 ---- 2 -query I rowsort +query SELECT * FROM t ---- 1 1 2 -subtest types - -statement ok +exec-ddl CREATE TABLE t2 (i INT, d DATE, dec DECIMAL, t TIMESTAMP) +---- copy COPY t2 FROM STDIN - 1 1996-03-13 12.123 2016-01-25 10:10:10.555555 ---- 1 -query IT rowsort +query SELECT i,dec FROM t2 ---- -1 12.123 +1|12.123 copy-error COPY t2 FROM STDIN - a 1996-03-13 12.123 2016-01-25 10:10:10.555555 ---- -could not parse "a" as type int: strconv.ParseInt: parsing "a": invalid syntax +ERROR: could not parse "a" as type int: strconv.ParseInt: parsing "a": invalid syntax (SQLSTATE 22P02) copy-error COPY t2 FROM STDIN - 1 2 12.123 2016-01-25 10:10:10.555555 ---- -parsing as type date: missing required date fields +ERROR: parsing as type date: missing required date fields (SQLSTATE 22007) copy-error COPY t2 FROM STDIN - 1 1996-03-13 not a decimal 2016-01-25 10:10:10.555555 ---- -could not parse "not a decimal" as type decimal: parse exponent: cimal: strconv.ParseInt: parsing "cimal": invalid syntax +ERROR: could not parse "not a decimal" as type decimal: parse exponent: cimal: strconv.ParseInt: parsing "cimal": invalid syntax (SQLSTATE 22P02) copy-error COPY t2 FROM STDIN - 1 1996-03-13 12.123 not a timestamp ---- -parsing as type timestamp: could not parse "not a timestamp" +ERROR: parsing as type timestamp: could not parse "not a timestamp" (SQLSTATE 22007) copy-error COPY t2 FROM STDIN - 1 1996-03-13 12.123 ---- -expected 4 values, got 3 +ERROR: expected 4 values, got 3 (SQLSTATE 22P04) copy-error COPY t2 FROM STDIN - 1 1996-03-13 12.123 2016-01-25 10:10:10.555555 extra col ---- -expected 4 values, got 5 +ERROR: expected 4 values, got 5 (SQLSTATE 22P04) # now is allowed copy COPY t2 FROM STDIN - 2 1996-03-13 12.123 now ---- 1 @@ -110,7 +95,6 @@ COPY t2 FROM STDIN # now is allowed copy COPY t2 FROM STDIN - 3 1996-03-13 12.123 now() ---- 1 @@ -118,110 +102,105 @@ COPY t2 FROM STDIN # expressions are not allowed copy-error COPY t2 FROM STDIN - 2 1996-03-13 12.123 now()-1 ---- -parsing as type timestamp: could not parse "now()-1" +ERROR: parsing as type timestamp: could not parse "now()-1" (SQLSTATE 22007) -query I +query SELECT count(t) FROM t2 WHERE t > now() ---- 0 copy COPY t2 FROM STDIN - \N \N \N \N ---- 1 copy-error COPY t2 FROM STDIN WITH DESTINATION = 'foo.csv' - \N \N \N \N ---- -DESTINATION can only be specified when table is external storage table +ERROR: DESTINATION can only be specified when table is external storage table (SQLSTATE 0A000) -subtest constraints +#subtest constraints -statement ok +exec-ddl CREATE TABLE t3 (i INT CHECK (i > 0)) +---- copy-error COPY t3 FROM STDIN - 0 ---- -failed to satisfy CHECK constraint (i > 0:::INT8) +ERROR: failed to satisfy CHECK constraint (i > 0:::INT8) (SQLSTATE 23514) # Foreign key checks happen -statement ok +exec-ddl CREATE TABLE parent (k INT PRIMARY KEY); CREATE TABLE child (k INT PRIMARY KEY REFERENCES parent) +---- copy-error COPY child FROM STDIN - 1 ---- -insert on table "child" violates foreign key constraint "child_k_fkey" +ERROR: insert on table "child" violates foreign key constraint "child_k_fkey" (SQLSTATE 23503) -statement ok +exec-ddl CREATE TABLE t4 (i INT UNIQUE) +---- copy-error COPY t4 FROM STDIN - 1 1 ---- -duplicate key value violates unique constraint "t4_i_key" +ERROR: duplicate key value violates unique constraint "t4_i_key" (SQLSTATE 23505) -subtest defaults +#subtest defaults # Default column values tests -statement ok -CREATE table tdefaults (i INT PRIMARY KEY DEFAULT unique_rowid(), d INT NOT NULL DEFAULT -1, x INT) +exec-ddl +CREATE table tdefaults (i INT PRIMARY KEY DEFAULT unique_rowid(), d INT NOT NULL +DEFAULT -1, x INT) +---- copy COPY tdefaults(x) FROM STDIN - 1 ---- 1 copy COPY tdefaults(x,d) FROM STDIN - 1 2 ---- 1 copy COPY tdefaults FROM STDIN - 1 1 1 ---- 1 -query II rowsort +query SELECT d,x FROM tdefaults ---- -1 1 --1 1 -2 1 +1|1 +-1|1 +2|1 -subtest end -subtest array_decoding +#subtest array_decoding -statement ok +exec-ddl CREATE TABLE test_copy_array (id INT PRIMARY KEY, data TEXT[]) +---- copy COPY test_copy_array(id,data) FROM STDIN - 1 {} 2 {} 3 {} @@ -480,14 +459,12 @@ COPY test_copy_array(id,data) FROM STDIN ---- 255 -query ITT +query SELECT id, data AS got, array[chr(id)] AS want FROM test_copy_array WHERE data != ARRAY[chr(id)] ---- -subtest end - # Regression test for #87011 -statement ok +exec-ddl CREATE TABLE tab ( col1 STRING, col2 STRING, @@ -508,10 +485,10 @@ CREATE TABLE tab_child ( PRIMARY KEY (col1, col2, col3, col4, col5) using hash, FOREIGN KEY (col5, col6) REFERENCES tab (col5, col6) ) +---- copy-error COPY tab_child FROM STDIN - 'high' 'straight' 'writer' 'develop' 'shells' 'bean' 'basic' 'tent' 'compound' 'it' 'future' 'held' 'bite' 'bring' 'taught' 'world' 'themselves' 'airplane' @@ -521,4 +498,4 @@ COPY tab_child FROM STDIN 'grabbed' 'reader' 'number' 'serve' 'fill' 'wonderful' 'tower' 'former' 'mainly' 'point' 'class' 'idea' ---- -insert on table "tab_child" violates foreign key constraint "tab_child_col5_col6_fkey" +ERROR: insert on table "tab_child" violates foreign key constraint "tab_child_col5_col6_fkey" (SQLSTATE 23503) diff --git a/pkg/sql/copy_from_test.go b/pkg/sql/copy_from_test.go deleted file mode 100644 index 87f5b774c6fe..000000000000 --- a/pkg/sql/copy_from_test.go +++ /dev/null @@ -1,324 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package sql_test - -import ( - "context" - "fmt" - "runtime/pprof" - "sync" - "testing" - - "github.com/cockroachdb/apd/v3" - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/stretchr/testify/require" -) - -var lineitemSchema string = `CREATE DATABASE c; CREATE TABLE c.lineitem ( - l_orderkey INT8 NOT NULL, - l_partkey INT8 NOT NULL, - l_suppkey INT8 NOT NULL, - l_linenumber INT8 NOT NULL, - l_quantity DECIMAL(15,2) NOT NULL, - l_extendedprice DECIMAL(15,2) NOT NULL, - l_discount DECIMAL(15,2) NOT NULL, - l_tax DECIMAL(15,2) NOT NULL, - l_returnflag CHAR(1) NOT NULL, - l_linestatus CHAR(1) NOT NULL, - l_shipdate DATE NOT NULL, - l_commitdate DATE NOT NULL, - l_receiptdate DATE NOT NULL, - l_shipinstruct CHAR(25) NOT NULL, - l_shipmode CHAR(10) NOT NULL, - l_comment VARCHAR(44) NOT NULL, - PRIMARY KEY (l_orderkey, l_linenumber), - INDEX l_ok (l_orderkey ASC), - INDEX l_pk (l_partkey ASC), - INDEX l_sk (l_suppkey ASC), - INDEX l_sd (l_shipdate ASC), - INDEX l_cd (l_commitdate ASC), - INDEX l_rd (l_receiptdate ASC), - INDEX l_pk_sk (l_partkey ASC, l_suppkey ASC), - INDEX l_sk_pk (l_suppkey ASC, l_partkey ASC) -)` - -const csvData = `%d|155190|7706|1|17|21168.23|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the -` - -func doCopyEx( - ctx context.Context, - t require.TestingT, - s serverutils.TestServerInterface, - txn *kv.Txn, - rows []string, - batchSizeOverride int, - atomic bool, -) { - numrows, err := sql.RunCopyFrom(ctx, s, "c", nil /* txn */, "COPY lineitem FROM STDIN WITH CSV DELIMITER '|';", rows, batchSizeOverride, atomic) - require.NoError(t, err) - require.Equal(t, len(rows), numrows) -} - -func doCopyImplicit( - ctx context.Context, t require.TestingT, s serverutils.TestServerInterface, rows []string, -) { - doCopyEx(ctx, t, s, nil, rows, 0, true) -} - -func doCopyWithTxn( - ctx context.Context, - t require.TestingT, - s serverutils.TestServerInterface, - txn *kv.Txn, - rows []string, -) { - doCopyEx(ctx, t, s, txn, rows, 0, true) -} - -func doCopyOneRowBatches( - ctx context.Context, - t require.TestingT, - s serverutils.TestServerInterface, - rows []string, - atomic bool, -) { - doCopyEx(ctx, t, s, nil, rows, 1, atomic) -} - -// TestCopyFrom is a simple test to verify RunCopyFrom works for benchmarking -// purposes. -func TestCopyFrom(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - ctx := context.Background() - - s, conn, _ := serverutils.StartServer(t, base.TestServerArgs{ - Settings: cluster.MakeTestingClusterSettings(), - }) - defer s.Stopper().Stop(ctx) - - r := sqlutils.MakeSQLRunner(conn) - r.Exec(t, lineitemSchema) - rows := []string{fmt.Sprintf(csvData, 1), fmt.Sprintf(csvData, 2)} - doCopyImplicit(ctx, t, s, rows) - - partKey := 0 - r.QueryRow(t, "SELECT l_partkey FROM c.lineitem WHERE l_orderkey = 1").Scan(&partKey) - require.Equal(t, 155190, partKey) -} - -// TestCopyFromExplicitTransaction tests that copy from rows are written with -// same transaction timestamp. -func TestCopyFromExplicitTransaction(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - ctx := context.Background() - - s, conn, db := serverutils.StartServer(t, base.TestServerArgs{ - Settings: cluster.MakeTestingClusterSettings(), - }) - defer s.Stopper().Stop(ctx) - - r := sqlutils.MakeSQLRunner(conn) - r.Exec(t, lineitemSchema) - rows := []string{fmt.Sprintf(csvData, 1), fmt.Sprintf(csvData, 2)} - txn := db.NewTxn(ctx, "test") - doCopyWithTxn(ctx, t, s, txn, rows) - if err := txn.Commit(ctx); err != nil { - require.NoError(t, err) - } - partKey := 0 - r.QueryRow(t, "SELECT l_partkey FROM c.lineitem WHERE l_orderkey = 1").Scan(&partKey) - require.Equal(t, 155190, partKey) - - sqlRows := r.Query(t, "SELECT crdb_internal_mvcc_timestamp FROM c.lineitem") - var lastts float64 - firstTime := true - for sqlRows.Next() { - var ts float64 - err := sqlRows.Scan(&ts) - require.NoError(t, err) - if !firstTime { - require.EqualValues(t, lastts, ts) - } else { - firstTime = false - } - lastts = ts - } -} - -// TestCopyFromImplicitAtomicTransaction tests that copy from rows are -// not committed in batches (22.2 default behavior). -func TestCopyFromImplicitAtomicTransaction(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - ctx := context.Background() - - s, conn, _ := serverutils.StartServer(t, base.TestServerArgs{ - Settings: cluster.MakeTestingClusterSettings(), - }) - defer s.Stopper().Stop(ctx) - - r := sqlutils.MakeSQLRunner(conn) - r.Exec(t, lineitemSchema) - rows := []string{fmt.Sprintf(csvData, 1), fmt.Sprintf(csvData, 2)} - doCopyOneRowBatches(ctx, t, s, rows, true /* atomic */) - - partKey := 0 - r.QueryRow(t, "SELECT l_partkey FROM c.lineitem WHERE l_orderkey = 1").Scan(&partKey) - require.Equal(t, 155190, partKey) - - sqlRows := r.Query(t, "SELECT crdb_internal_mvcc_timestamp FROM c.lineitem") - var lastts apd.Decimal - firstTime := true - for sqlRows.Next() { - var ts apd.Decimal - err := sqlRows.Scan(&ts) - require.NoError(t, err) - if !firstTime { - require.EqualValues(t, lastts, ts) - } else { - firstTime = false - } - lastts = ts - } -} - -// TestCopyFromImplicitNonAtomicTransaction tests that copy from rows are -// committed in batches (pre-22.2 default behavior). -func TestCopyFromImplicitNonAtomicTransaction(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - ctx := context.Background() - - s, conn, _ := serverutils.StartServer(t, base.TestServerArgs{ - Settings: cluster.MakeTestingClusterSettings(), - }) - defer s.Stopper().Stop(ctx) - - r := sqlutils.MakeSQLRunner(conn) - r.Exec(t, lineitemSchema) - rows := []string{fmt.Sprintf(csvData, 1), fmt.Sprintf(csvData, 2)} - doCopyOneRowBatches(ctx, t, s, rows, false /* atomic */) - - partKey := 0 - r.QueryRow(t, "SELECT l_partkey FROM c.lineitem WHERE l_orderkey = 1").Scan(&partKey) - require.Equal(t, 155190, partKey) - - sqlRows := r.Query(t, "SELECT crdb_internal_mvcc_timestamp FROM c.lineitem") - var lastts apd.Decimal - firstTime := true - for sqlRows.Next() { - var ts apd.Decimal - err := sqlRows.Scan(&ts) - require.NoError(t, err) - if !firstTime { - require.NotEqualValues(t, lastts, ts) - } else { - firstTime = false - } - lastts = ts - } -} - -// BenchmarkCopyFrom measures copy performance against a TestServer. -func BenchmarkCopyFrom(b *testing.B) { - defer leaktest.AfterTest(b)() - defer log.Scope(b).Close(b) - ctx := context.Background() - - s, conn, _ := serverutils.StartServer(b, base.TestServerArgs{ - Settings: cluster.MakeTestingClusterSettings(), - }) - defer s.Stopper().Stop(ctx) - - r := sqlutils.MakeSQLRunner(conn) - r.Exec(b, lineitemSchema) - - // send data in 5 batches of 10k rows - const ROWS = sql.CopyBatchRowSizeDefault * 4 - datalen := 0 - var rows []string - for i := 0; i < ROWS; i++ { - row := fmt.Sprintf(csvData, i) - rows = append(rows, row) - datalen += len(row) - } - rowsize := datalen / ROWS - for _, batchSizeFactor := range []float64{.5, 1, 2, 4} { - batchSize := int(batchSizeFactor * sql.CopyBatchRowSizeDefault) - b.Run(fmt.Sprintf("%d", batchSize), func(b *testing.B) { - actualRows := rows[:batchSize] - for i := 0; i < b.N; i++ { - pprof.Do(ctx, pprof.Labels("run", "copy"), func(ctx context.Context) { - doCopyImplicit(ctx, b, s, actualRows) - }) - b.StopTimer() - r.Exec(b, "TRUNCATE TABLE c.lineitem") - b.StartTimer() - } - b.SetBytes(int64(len(actualRows) * rowsize)) - }) - } -} - -// BenchmarkParallelCopyFrom benchmarks break copy up into separate chunks in separate goroutines. -func BenchmarkParallelCopyFrom(b *testing.B) { - defer leaktest.AfterTest(b)() - defer log.Scope(b).Close(b) - ctx := context.Background() - - s, conn, _ := serverutils.StartServer(b, base.TestServerArgs{ - Settings: cluster.MakeTestingClusterSettings(), - }) - defer s.Stopper().Stop(ctx) - - r := sqlutils.MakeSQLRunner(conn) - r.Exec(b, lineitemSchema) - const ROWS = 50000 - datalen := 0 - const THREADS = 10 - var allrows [][]string - - chunk := ROWS / THREADS - for j := 0; j < THREADS; j++ { - var rows []string - for i := 0; i < chunk; i++ { - row := fmt.Sprintf(csvData, i+j*chunk) - rows = append(rows, row) - datalen += len(row) - } - allrows = append(allrows, rows) - } - - start := timeutil.Now() - var wg sync.WaitGroup - - for j := 0; j < THREADS; j++ { - wg.Add(1) - go func(j int) { - defer wg.Done() - doCopyImplicit(ctx, b, s, allrows[j]) - }(j) - } - wg.Wait() - duration := timeutil.Since(start) - b.ReportMetric(float64(datalen)/(1024*1024)/duration.Seconds(), "mb/s") - b.ReportMetric(float64(ROWS)/duration.Seconds(), "rows/s") -} diff --git a/pkg/sql/copyshim.go b/pkg/sql/copyshim.go deleted file mode 100644 index 51d0b19b2bfb..000000000000 --- a/pkg/sql/copyshim.go +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package sql - -import ( - "bufio" - "bytes" - "context" - "io" - "time" - - "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/security/username" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" - "github.com/cockroachdb/cockroach/pkg/sql/execinfra" - "github.com/cockroachdb/cockroach/pkg/sql/parser" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" - "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/jackc/pgproto3/v2" -) - -type fakeConn struct { - pgwirebase.BufferedReader - rd *bufio.Reader -} - -var _ io.Reader = &fakeConn{} -var _ pgwirebase.BufferedReader = &fakeConn{} - -// Rd is part of the pgwirebase.Conn interface and returns a reader to be used -// to consume bytes from the connection. -func (c *fakeConn) Rd() pgwirebase.BufferedReader { - return c -} - -// Read is part of io.Reader interface. -func (c *fakeConn) Read(p []byte) (n int, err error) { - return c.rd.Read(p) -} - -// ReadString is part of pgwirebase.BufferedReader interface. -func (c *fakeConn) ReadString(delim byte) (string, error) { - return c.rd.ReadString(delim) -} - -// ReadByte is part of pgwirebase.BufferedReader interface. -func (c *fakeConn) ReadByte() (byte, error) { - return c.rd.ReadByte() -} - -// BeginCopyIn is part of the pgwirebase.Conn interface. Not needed for shim -// purposes. -func (c *fakeConn) BeginCopyIn( - ctx context.Context, columns []colinfo.ResultColumn, format pgwirebase.FormatCode, -) error { - return nil -} - -// SendCommandComplete is part of the pgwirebase.Conn interface. Not needed for shim -// purposes. -func (c *fakeConn) SendCommandComplete(tag []byte) error { - return nil -} - -// RunCopyFrom exposes copy functionality for the logictest "copy" command, it's -// test-only code but not in test package because logictest isn't in a test package. -func RunCopyFrom( - ctx context.Context, - s serverutils.TestServerInterface, - db string, - txn *kv.Txn, - copySQL string, - data []string, - copyBatchRowSizeOverride int, - atomic bool, -) (int, error) { - execCfg := s.ExecutorConfig().(ExecutorConfig) - dsp := execCfg.DistSQLPlanner - stmt, err := parser.ParseOne(copySQL) - if err != nil { - return -1, err - } - - mon := execinfra.NewTestMemMonitor(ctx, execCfg.Settings) - - // TODO(cucaroach): test open transaction and implicit txn, this will require - // a real client side/over the wire copy implementation logictest can use. - txnOpt := copyTxnOpt{txn: txn} - txnOpt.resetPlanner = func(ctx context.Context, p *planner, txn *kv.Txn, txnTS time.Time, stmtTS time.Time) { - p.cancelChecker.Reset(ctx) - p.optPlanningCtx.init(p) - p.resetPlanner(ctx, txn, stmtTS, p.sessionDataMutatorIterator.sds.Top(), mon) - p.extendedEvalCtx.Context.Txn = txn - } - p, cleanup := newInternalPlanner("copytest", - txn, - username.RootUserName(), - &MemoryMetrics{}, - &execCfg, - sessiondatapb.SessionData{ - Database: db, - }, - ) - // TODO(cucaroach): I believe newInternalPlanner should do this but doing it - // there causes lots of session diffs and test failures and is risky. - if err := p.resetAllSessionVars(ctx); err != nil { - return -1, err - } - defer cleanup() - - p.SessionData().CopyFromAtomicEnabled = atomic - - // Write what the client side would write into a buffer and then make it the conn's data. - var buf []byte - for _, d := range data { - b := make([]byte, 0, len(d)+10) - cd := pgproto3.CopyData{Data: []byte(d)} - b = cd.Encode(b) - buf = append(buf, b...) - } - - done := pgproto3.CopyDone{} - buf = done.Encode(buf) - - conn := &fakeConn{ - rd: bufio.NewReader(bytes.NewReader(buf)), - } - rows := 0 - c, err := newCopyMachine(ctx, conn, stmt.AST.(*tree.CopyFrom), p, txnOpt, mon, - func(ctx context.Context, p *planner, res RestrictedCommandResult) error { - err := dsp.ExecLocalAll(ctx, execCfg, p, res) - if err != nil { - return err - } - rows += res.RowsAffected() - return nil - }, - ) - if err != nil { - return -1, err - } - if copyBatchRowSizeOverride != 0 { - c.copyBatchRowSize = copyBatchRowSizeOverride - } - - if err := c.run(ctx); err != nil { - return -1, err - } - - return rows, nil -} diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go index 23d04afec8b1..8d3571a4130f 100644 --- a/pkg/sql/logictest/logic.go +++ b/pkg/sql/logictest/logic.go @@ -2427,63 +2427,6 @@ func (t *logicTest) processSubtest( delete(t.pendingQueries, name) t.success(path) - case "copy", "copy-error": - expectError := cmd == "copy-error" - var query logicQuery - query.pos = fmt.Sprintf("\n%s:%d", path, s.Line+subtest.lineLineIndexIntoFile) - gotsep, err := query.readSQL(t, s, true /* allowSeparator */) - if err != nil { - return err - } - if gotsep { - return errors.Errorf("%s: unexpected ---- separator, copy statement and data have to separated by empty line", query.pos) - } - var data bytes.Buffer - sep := false - for s.Scan() { - line := s.Text() - t.emit(line) - if line == "----" { - sep = true - break - } - fmt.Fprintln(&data, line) - } - if !sep { - return errors.Errorf("%s: expected ---- separator at end of copy data", query.pos) - } - // TODO(cucaroach): This is broken, t.cluster.Server(0) may not be - // the same tenant as t.db points to, for now the copy tests disable - // the 3node-tenant config but we should fix this. Also RunCopyFrom - // does an end run around the t.db connection entirely and runs - // copies w/o using the client protocol at all, not ideal but the go - // sql.DB interface doesn't support COPY so fixing it the right way - // that would require major surgery (ie making logictest use libpq - // or something low level like that). - rows, err := sql.RunCopyFrom(context.Background(), t.cluster.Server(0), "test", nil, query.sql, []string{data.String()}, 0 /* rowsPerBatch */, true /* atomic */) - result := fmt.Sprintf("%d", rows) - if err != nil { - if !expectError { - return err - } - result = err.Error() - } else if expectError { - return errors.Errorf("%s: copy-error expected error did not occur", query.pos) - } - if *rewriteResultsInTestfiles { - t.emit(result) - } - if s.Scan() { - exp := s.Text() - if !*rewriteResultsInTestfiles && result != exp { - return errors.Errorf("%s: got %s, expected %s", query.pos, result, exp) - } - } else if !*rewriteResultsInTestfiles { - return errors.Errorf("EOF looking for expected copy results") - } - t.finishOne("OK") - t.success(path) - case "query": var query logicQuery query.pos = fmt.Sprintf("\n%s:%d", path, s.Line+subtest.lineLineIndexIntoFile) diff --git a/pkg/sql/logictest/tests/fakedist-disk/generated_test.go b/pkg/sql/logictest/tests/fakedist-disk/generated_test.go index 80631fb181b4..d9bc4414e6f4 100644 --- a/pkg/sql/logictest/tests/fakedist-disk/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist-disk/generated_test.go @@ -415,13 +415,6 @@ func TestLogic_connect_privilege( runLogicTest(t, "connect_privilege") } -func TestLogic_copyfrom( - t *testing.T, -) { - defer leaktest.AfterTest(t)() - runLogicTest(t, "copyfrom") -} - func TestLogic_crdb_internal( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go b/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go index 3dbaedb0b70c..1e9de72fa684 100644 --- a/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go @@ -415,13 +415,6 @@ func TestLogic_connect_privilege( runLogicTest(t, "connect_privilege") } -func TestLogic_copyfrom( - t *testing.T, -) { - defer leaktest.AfterTest(t)() - runLogicTest(t, "copyfrom") -} - func TestLogic_crdb_internal( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/fakedist/generated_test.go b/pkg/sql/logictest/tests/fakedist/generated_test.go index 988147cb08e8..9926d397f325 100644 --- a/pkg/sql/logictest/tests/fakedist/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist/generated_test.go @@ -415,13 +415,6 @@ func TestLogic_connect_privilege( runLogicTest(t, "connect_privilege") } -func TestLogic_copyfrom( - t *testing.T, -) { - defer leaktest.AfterTest(t)() - runLogicTest(t, "copyfrom") -} - func TestLogic_crdb_internal( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go b/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go index 810bdf10b07e..13f77315c21c 100644 --- a/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go +++ b/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go @@ -415,13 +415,6 @@ func TestLogic_connect_privilege( runLogicTest(t, "connect_privilege") } -func TestLogic_copyfrom( - t *testing.T, -) { - defer leaktest.AfterTest(t)() - runLogicTest(t, "copyfrom") -} - func TestLogic_crdb_internal( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local-vec-off/generated_test.go b/pkg/sql/logictest/tests/local-vec-off/generated_test.go index 46d9ae83c8e9..ca1cea0de3c4 100644 --- a/pkg/sql/logictest/tests/local-vec-off/generated_test.go +++ b/pkg/sql/logictest/tests/local-vec-off/generated_test.go @@ -415,13 +415,6 @@ func TestLogic_connect_privilege( runLogicTest(t, "connect_privilege") } -func TestLogic_copyfrom( - t *testing.T, -) { - defer leaktest.AfterTest(t)() - runLogicTest(t, "copyfrom") -} - func TestLogic_crdb_internal( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local/generated_test.go b/pkg/sql/logictest/tests/local/generated_test.go index 094c4d73c669..bb8cc12ccc6b 100644 --- a/pkg/sql/logictest/tests/local/generated_test.go +++ b/pkg/sql/logictest/tests/local/generated_test.go @@ -429,13 +429,6 @@ func TestLogic_connect_privilege( runLogicTest(t, "connect_privilege") } -func TestLogic_copyfrom( - t *testing.T, -) { - defer leaktest.AfterTest(t)() - runLogicTest(t, "copyfrom") -} - func TestLogic_crdb_internal( t *testing.T, ) {