From 51ace6504e2b122274d993e06b32ce94e109281c Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 10 Apr 2023 10:38:22 -0400 Subject: [PATCH 1/6] roachtest: move copyfrom test suite to SQL Queries See https://cockroachlabs.slack.com/archives/C0168LW5THS/p1679508254391039. Epic: None Release note: None --- pkg/cmd/roachtest/tests/copyfrom.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/cmd/roachtest/tests/copyfrom.go b/pkg/cmd/roachtest/tests/copyfrom.go index 093f15b6ee71..e709106499ca 100644 --- a/pkg/cmd/roachtest/tests/copyfrom.go +++ b/pkg/cmd/roachtest/tests/copyfrom.go @@ -145,7 +145,7 @@ func registerCopyFrom(r registry.Registry) { tc := tc r.Add(registry.TestSpec{ Name: fmt.Sprintf("copyfrom/crdb-atomic/sf=%d/nodes=%d", tc.sf, tc.nodes), - Owner: registry.OwnerKV, + Owner: registry.OwnerSQLQueries, Cluster: r.MakeClusterSpec(tc.nodes), Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { runCopyFromCRDB(ctx, t, c, tc.sf, true /*atomic*/) @@ -153,7 +153,7 @@ func registerCopyFrom(r registry.Registry) { }) r.Add(registry.TestSpec{ Name: fmt.Sprintf("copyfrom/crdb-nonatomic/sf=%d/nodes=%d", tc.sf, tc.nodes), - Owner: registry.OwnerKV, + Owner: registry.OwnerSQLQueries, Cluster: r.MakeClusterSpec(tc.nodes), Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { runCopyFromCRDB(ctx, t, c, tc.sf, false /*atomic*/) @@ -161,7 +161,7 @@ func registerCopyFrom(r registry.Registry) { }) r.Add(registry.TestSpec{ Name: fmt.Sprintf("copyfrom/pg/sf=%d/nodes=%d", tc.sf, tc.nodes), - Owner: registry.OwnerKV, + Owner: registry.OwnerSQLQueries, Cluster: r.MakeClusterSpec(tc.nodes), Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { runCopyFromPG(ctx, t, c, tc.sf) From 7e9b1ea6eb6f0a84393977314dd2a45dcf16539b Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Mon, 10 Apr 2023 16:29:44 +0200 Subject: [PATCH 2/6] autoconfig: prevent a data race in TestAutoConfig The calls to Peek and Pop can run concurrently. 
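For illustration, a minimal sketch of the locking pattern this fix applies, using the standard library's sync.Mutex in place of CockroachDB's syncutil.Mutex; the queue and task types are simplified stand-ins for the test provider's fields, not the actual implementation:

package main

import "sync"

// taskQueue is a simplified stand-in for the test provider's shared task list.
type taskQueue struct {
	mu    sync.Mutex
	tasks []uint64 // pending task IDs, in order
}

// Peek returns the next pending task ID, if any. It can run concurrently with
// Pop, so it takes the lock before reading the shared slice.
func (q *taskQueue) Peek() (uint64, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.tasks) == 0 {
		return 0, false
	}
	return q.tasks[0], true
}

// Pop removes every task up to and including the completed task ID, under the
// same lock so concurrent Peek calls observe a consistent slice.
func (q *taskQueue) Pop(completed uint64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.tasks) > 0 && q.tasks[0] <= completed {
		q.tasks = q.tasks[1:]
	}
}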
Release note: None --- pkg/server/autoconfig/BUILD.bazel | 1 + pkg/server/autoconfig/auto_config_test.go | 28 +++++++++++++++++++---- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/pkg/server/autoconfig/BUILD.bazel b/pkg/server/autoconfig/BUILD.bazel index 52b16892af01..7a88617d7128 100644 --- a/pkg/server/autoconfig/BUILD.bazel +++ b/pkg/server/autoconfig/BUILD.bazel @@ -54,6 +54,7 @@ go_test( "//pkg/testutils/testcluster", "//pkg/util/leaktest", "//pkg/util/randutil", + "//pkg/util/syncutil", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/server/autoconfig/auto_config_test.go b/pkg/server/autoconfig/auto_config_test.go index f927584c7387..bf04b43f2c17 100644 --- a/pkg/server/autoconfig/auto_config_test.go +++ b/pkg/server/autoconfig/auto_config_test.go @@ -24,12 +24,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/stretchr/testify/require" ) const testEnvID autoconfigpb.EnvironmentID = "my test env" type testProvider struct { + syncutil.Mutex + t *testing.T notifyCh chan struct{} peekWaitCh chan struct{} @@ -93,6 +96,9 @@ func (p *testProvider) ActiveEnvironments() []autoconfigpb.EnvironmentID { func (p *testProvider) Pop( _ context.Context, envID autoconfigpb.EnvironmentID, taskID autoconfigpb.TaskID, ) error { + p.Lock() + defer p.Unlock() + p.t.Logf("runner reports completed task %d (env %q)", taskID, envID) for len(p.tasks) > 0 { if taskID >= p.tasks[0].task.TaskID { @@ -105,14 +111,25 @@ func (p *testProvider) Pop( return nil } +func (p *testProvider) head() (bool, *testTask) { + p.Lock() + defer p.Unlock() + + if len(p.tasks) == 0 { + return false, nil + } + return true, &p.tasks[0] +} + func (p *testProvider) Peek( ctx context.Context, envID autoconfigpb.EnvironmentID, ) (autoconfigpb.Task, error) { p.t.Logf("runner peeking (env %q)", envID) - if len(p.tasks) == 0 { + hasTask, tt := p.head() + if !hasTask { return autoconfigpb.Task{}, acprovider.ErrNoMoreTasks } - if !p.tasks[0].seen { + if !tt.seen { // seen ensures that the runner job won't have to wait a second // time when peeking the task. select { @@ -121,8 +138,11 @@ func (p *testProvider) Peek( case <-p.peekWaitCh: } } - p.tasks[0].seen = true - return p.tasks[0].task, nil + + p.Lock() + defer p.Unlock() + tt.seen = true + return tt.task, nil } func TestAutoConfig(t *testing.T) { From 0ba80e6d19ac789c99b51eb115ab75d92945e85c Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Mon, 10 Apr 2023 20:39:50 +0200 Subject: [PATCH 3/6] autoconfig: properly order versions in TestAutoConfig Before this change, the test was kicking off tasks concurrently with version upgrades. To reduce noise in logs, this patch fixes it to make it wait until upgrades have completed. 
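As an illustration only (the patch itself achieves the ordering by setting each task's MinVersion to clusterversion.TestingBinaryVersion, as shown in the diff below), one way for a test to wait until upgrades have completed before kicking off version-gated work is to poll the cluster version over SQL; the connection handle and target version string are assumed to be supplied by the caller:

package main

import (
	"context"
	"database/sql"
	"time"
)

// waitForClusterVersion polls SHOW CLUSTER SETTING version until it reports
// the wanted version, returning early if the context is canceled.
func waitForClusterVersion(ctx context.Context, db *sql.DB, want string) error {
	for {
		var got string
		if err := db.QueryRowContext(ctx, "SHOW CLUSTER SETTING version").Scan(&got); err != nil {
			return err
		}
		if got == want {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond):
		}
	}
}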
Release note: None --- pkg/server/autoconfig/auto_config_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/server/autoconfig/auto_config_test.go b/pkg/server/autoconfig/auto_config_test.go index bf04b43f2c17..a3a68d1d1d43 100644 --- a/pkg/server/autoconfig/auto_config_test.go +++ b/pkg/server/autoconfig/auto_config_test.go @@ -48,7 +48,7 @@ var testTasks = []testTask{ {task: autoconfigpb.Task{ TaskID: 123, Description: "test task that creates a system table", - MinVersion: clusterversion.ByKey(clusterversion.V23_1Start), + MinVersion: clusterversion.TestingBinaryVersion, Payload: &autoconfigpb.Task_SimpleSQL{ SimpleSQL: &autoconfigpb.SimpleSQL{ UsernameProto: username.NodeUserName().EncodeProto(), @@ -64,7 +64,7 @@ var testTasks = []testTask{ {task: autoconfigpb.Task{ TaskID: 345, Description: "test task that fails with an error", - MinVersion: clusterversion.ByKey(clusterversion.V23_1Start), + MinVersion: clusterversion.TestingBinaryVersion, Payload: &autoconfigpb.Task_SimpleSQL{ SimpleSQL: &autoconfigpb.SimpleSQL{ TransactionalStatements: []string{"SELECT invalid"}, @@ -74,7 +74,7 @@ var testTasks = []testTask{ {task: autoconfigpb.Task{ TaskID: 456, Description: "test task that creates another system table", - MinVersion: clusterversion.ByKey(clusterversion.V23_1Start), + MinVersion: clusterversion.TestingBinaryVersion, Payload: &autoconfigpb.Task_SimpleSQL{ SimpleSQL: &autoconfigpb.SimpleSQL{ UsernameProto: username.NodeUserName().EncodeProto(), From b9b8da67d55451c2e322223aa3b83d5a9be20d15 Mon Sep 17 00:00:00 2001 From: Drew Kimball Date: Wed, 5 Apr 2023 17:49:12 -0700 Subject: [PATCH 4/6] opt: fix ordering-related optimizer panics It is possible for some functional-dependency information to be visible to a child operator but invisible to its parent. This could previously cause panics when a child provided an ordering that could be proven to satisfy the required ordering with the child FDs, but not with the parent's FDs. This patch adds a step to the logic that builds provided orderings that ensures a provided ordering can be proven to respect the required ordering without needing additional FD information. This ensures that a parent never needs to know its child's FDs in order to prove that the provided ordering is correct. The extra step is a no-op in the common case when the provided ordering can already be proven to respect the required ordering. Informs #85393 Informs #87806 Fixes #96288 Release note (bug fix): Fixed a rare internal error in the optimizer that has existed since before version 22.1, which could occur while enforcing orderings between SQL operators. 
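A simplified model of the remapping step described above, with plain slices and maps standing in for opt.Ordering, props.OrderingChoice, and opt.ColSet; the real finalizeProvided (shown in the diff below) additionally short-circuits when the provided ordering already satisfies the requirement and panics if no equivalent output column exists:

package main

type colSet map[int]struct{}

// orderingChoice is a stand-in for one entry of props.OrderingChoice.Columns:
// a group of interchangeable columns plus a direction.
type orderingChoice struct {
	group      colSet
	descending bool
}

// orderingCol is a stand-in for opt.OrderingColumn.
type orderingCol struct {
	id         int
	descending bool
}

// remapProvided builds a provided ordering that matches the required ordering
// column-for-column using only output columns, so a parent operator never
// needs the child's functional dependencies to prove the ordering is correct.
func remapProvided(provided []orderingCol, required []orderingChoice, outCols colSet) []orderingCol {
	providedCols := colSet{}
	for _, c := range provided {
		providedCols[c.id] = struct{}{}
	}
	result := make([]orderingCol, len(required))
	for i, choice := range required {
		pick := -1
		for id := range choice.group {
			if _, out := outCols[id]; !out {
				continue
			}
			if _, ok := providedCols[id]; ok {
				pick = id // prefer a column already in the provided ordering
				break
			}
			if pick < 0 {
				pick = id
			}
		}
		result[i] = orderingCol{id: pick, descending: choice.descending}
	}
	return result
}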
--- pkg/sql/opt/ordering/ordering.go | 56 ++++++++++++++++--- pkg/sql/opt/xform/testdata/physprops/ordering | 41 +++++++++++++- pkg/sql/opt/xform/testdata/rules/join | 2 +- 3 files changed, 87 insertions(+), 12 deletions(-) diff --git a/pkg/sql/opt/ordering/ordering.go b/pkg/sql/opt/ordering/ordering.go index 560c39cc10fe..bb1ffd7830c9 100644 --- a/pkg/sql/opt/ordering/ordering.go +++ b/pkg/sql/opt/ordering/ordering.go @@ -88,6 +88,7 @@ func BuildProvided(expr memo.RelExpr, required *props.OrderingChoice) opt.Orderi return nil } provided := funcMap[expr.Op()].buildProvidedOrdering(expr, required) + provided = finalizeProvided(provided, required, expr.Relational().OutputCols) if buildutil.CrdbTestBuild { checkProvided(expr, required, provided) @@ -423,6 +424,53 @@ func trimProvided( return provided[:provIdx] } +// finalizeProvided ensures that the provided ordering satisfies the following +// properties: +// 1. The provided ordering can be proven to satisfy the required ordering +// without the use of additional (e.g. functional dependency) information. +// 2. The provided ordering is simplified, such that it does not contain any +// columns from the required ordering optional set. +// 3. The provided ordering only refers to output columns for the operator. +// +// This step is necessary because it is possible for child operators to have +// different functional dependency information than their parents as well as +// different output columns. We have to protect against the case where a parent +// operator cannot prove that its child's provided ordering satisfies its +// required ordering. +func finalizeProvided( + provided opt.Ordering, required *props.OrderingChoice, outCols opt.ColSet, +) (newProvided opt.Ordering) { + // First check if the given provided is already suitable. + providedCols := provided.ColSet() + if len(provided) == len(required.Columns) && providedCols.SubsetOf(outCols) { + needsRemap := false + for i := range provided { + choice, ordCol := required.Columns[i], provided[i] + if !choice.Group.Contains(ordCol.ID()) || choice.Descending != ordCol.Descending() { + needsRemap = true + break + } + } + if !needsRemap { + return provided + } + } + newProvided = make(opt.Ordering, len(required.Columns)) + for i, choice := range required.Columns { + group := choice.Group.Intersection(outCols) + if group.Intersects(providedCols) { + // Prefer using columns from the provided ordering if possible. + group.IntersectionWith(providedCols) + } + col, ok := group.Next(0) + if !ok { + panic(errors.AssertionFailedf("no output column equivalent to %d", redact.Safe(col))) + } + newProvided[i] = opt.MakeOrderingColumn(col, choice.Descending) + } + return newProvided +} + // checkRequired runs sanity checks on the ordering required of an operator. func checkRequired(expr memo.RelExpr, required *props.OrderingChoice) { rel := expr.Relational() @@ -472,12 +520,4 @@ func checkProvided(expr memo.RelExpr, required *props.OrderingChoice, provided o )) } } - - // The provided ordering should not have unnecessary columns. 
- fds := &expr.Relational().FuncDeps - if trimmed := trimProvided(provided, required, fds); len(trimmed) != len(provided) { - panic(errors.AssertionFailedf( - "provided %s can be trimmed to %s (FDs: %s)", redact.Safe(provided), redact.Safe(trimmed), redact.Safe(fds), - )) - } } diff --git a/pkg/sql/opt/xform/testdata/physprops/ordering b/pkg/sql/opt/xform/testdata/physprops/ordering index a4f5ff71e8ae..a258d072ce7e 100644 --- a/pkg/sql/opt/xform/testdata/physprops/ordering +++ b/pkg/sql/opt/xform/testdata/physprops/ordering @@ -3072,7 +3072,7 @@ sort ├── cardinality: [0 - 0] ├── key: () ├── fd: ()-->(6,12,23) - ├── ordering: +(12|23),-6 [actual: ] + ├── ordering: +(12|23),-6 [actual: +12,-6] └── values ├── columns: tab_922.crdb_internal_mvcc_timestamp:6!null col1_4:12!null col_2150:23!null ├── cardinality: [0 - 0] @@ -3097,16 +3097,51 @@ distinct-on │ ├── columns: c1:1!null c2:2 │ ├── key: (1) │ ├── fd: (1)-->(2) - │ ├── ordering: +1 [actual: +2,+1] + │ ├── ordering: +1 │ ├── scan v0@i3 │ │ ├── columns: c1:5!null c2:6 │ │ ├── constraint: /6/5: [/'2023-01-31 00:00:00' - /'2023-01-31 00:00:00'] │ │ ├── key: (5) │ │ ├── fd: (5)-->(6) - │ │ └── ordering: +5 [actual: +6,+5] + │ │ └── ordering: +5 │ └── projections │ ├── c1:5 [as=c1:1, outer=(5)] │ └── c2:6 [as=c2:2, outer=(6)] └── aggregations └── const-agg [as=c2:2, outer=(2)] └── c2:2 + +opt +SELECT c1 FROM v0 +WHERE (c1 = c1 AND c2 = '01-31-2023 00:00:00'::TIMESTAMP) + OR (c1 = b'00' AND c1 = b'0') + OR (c1 IS NULL AND c2 IS NULL) +ORDER BY c1; +---- +project + ├── columns: c1:1!null + ├── key: (1) + ├── ordering: +1 + └── distinct-on + ├── columns: c1:1!null c2:2 + ├── grouping columns: c1:1!null + ├── key: (1) + ├── fd: (1)-->(2) + ├── ordering: +1 + ├── project + │ ├── columns: c1:1!null c2:2 + │ ├── key: (1) + │ ├── fd: (1)-->(2) + │ ├── ordering: +1 + │ ├── scan v0@i3 + │ │ ├── columns: c1:5!null c2:6 + │ │ ├── constraint: /6/5: [/'2023-01-31 00:00:00' - /'2023-01-31 00:00:00'] + │ │ ├── key: (5) + │ │ ├── fd: (5)-->(6) + │ │ └── ordering: +5 + │ └── projections + │ ├── c1:5 [as=c1:1, outer=(5)] + │ └── c2:6 [as=c2:2, outer=(6)] + └── aggregations + └── const-agg [as=c2:2, outer=(2)] + └── c2:2 diff --git a/pkg/sql/opt/xform/testdata/rules/join b/pkg/sql/opt/xform/testdata/rules/join index 6596d646ec94..105fdc74bedd 100644 --- a/pkg/sql/opt/xform/testdata/rules/join +++ b/pkg/sql/opt/xform/testdata/rules/join @@ -9823,7 +9823,7 @@ project │ ├── cardinality: [0 - 1] │ ├── key: (25) │ ├── fd: (25)-->(26,27), (27)~~>(25,26) - │ ├── ordering: +25 [actual: ] + │ ├── ordering: +25 │ └── project │ ├── columns: a:25!null b:26!null c:27!null q:31!null r:32!null │ ├── cardinality: [0 - 1] From 1654c7d2504b2ead7b03bf5ba7bc9708762ba06f Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Mon, 10 Apr 2023 21:28:16 +0200 Subject: [PATCH 5/6] autoconfig: transactional SQL should wait on leases Release note: None --- pkg/server/autoconfig/BUILD.bazel | 1 + pkg/server/autoconfig/auto_config_task.go | 7 ++++++- pkg/server/autoconfig/auto_config_test.go | 8 +++++--- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/pkg/server/autoconfig/BUILD.bazel b/pkg/server/autoconfig/BUILD.bazel index 7a88617d7128..7b964371cc61 100644 --- a/pkg/server/autoconfig/BUILD.bazel +++ b/pkg/server/autoconfig/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//pkg/server/autoconfig/autoconfigpb", "//pkg/settings/cluster", "//pkg/sql", + "//pkg/sql/catalog/descs", "//pkg/sql/isql", "//pkg/sql/sessiondata", "//pkg/util/encoding", diff --git 
a/pkg/server/autoconfig/auto_config_task.go b/pkg/server/autoconfig/auto_config_task.go index ee2c6affc09d..9b313f488f5f 100644 --- a/pkg/server/autoconfig/auto_config_task.go +++ b/pkg/server/autoconfig/auto_config_task.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -156,7 +157,11 @@ func execSimpleSQL( // Now execute all the transactional, potentially not idempotent // statements. - return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + // + // We use DescsTxn here because the tasks may contain DDL statements + // and we want to ensure that each txn will wait for the leases from + // previous txns. + return execCfg.InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error { for _, stmt := range sqlPayload.TransactionalStatements { log.Infof(ctx, "attempting execution of task statement:\n%s", stmt) _, err := txn.ExecEx(ctx, "exec-task-statement", diff --git a/pkg/server/autoconfig/auto_config_test.go b/pkg/server/autoconfig/auto_config_test.go index a3a68d1d1d43..c7ffb6980e88 100644 --- a/pkg/server/autoconfig/auto_config_test.go +++ b/pkg/server/autoconfig/auto_config_test.go @@ -53,11 +53,13 @@ var testTasks = []testTask{ SimpleSQL: &autoconfigpb.SimpleSQL{ UsernameProto: username.NodeUserName().EncodeProto(), NonTransactionalStatements: []string{ - "CREATE TABLE IF NOT EXISTS system.foo(x INT)", // This checks that the non-txn part works properly: SET // CLUSTER SETTING can only be run outside of explicit txns. "SET CLUSTER SETTING cluster.organization = 'woo'", }, + TransactionalStatements: []string{ + "CREATE TABLE IF NOT EXISTS system.foo(x INT)", + }, }, }, }}, @@ -77,8 +79,8 @@ var testTasks = []testTask{ MinVersion: clusterversion.TestingBinaryVersion, Payload: &autoconfigpb.Task_SimpleSQL{ SimpleSQL: &autoconfigpb.SimpleSQL{ - UsernameProto: username.NodeUserName().EncodeProto(), - NonTransactionalStatements: []string{"CREATE TABLE IF NOT EXISTS system.bar(y INT)"}, + UsernameProto: username.NodeUserName().EncodeProto(), + TransactionalStatements: []string{"CREATE TABLE IF NOT EXISTS system.bar(y INT)"}, }, }, }}, From bdf3f6262e380b867aa8884016eae53d28d074a4 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Wed, 29 Mar 2023 10:31:02 -0700 Subject: [PATCH 6/6] workload: jitter the teardown of connections to prevent thundering herd This change upgrades workload's use of pgx from v4 to v5 in order to allow jittering the teardown of connections. This change sets a max connection age of 5min and jitters the teardown by 30s. Upgrading to pgx v5 also adds non-blocking pgxpool connection acquisition. workload: add flags to manage the age and lifecycle of connection pool Add flags to all workload types to specify: * the max connection age: `--max-conn-lifetime duration` * the max connection age jitter: `--max-conn-lifetime-jitter duration` * the max connection idle time: `--max-conn-idle-time duration` * the connection health check interval: `--conn-healthcheck-period duration` * the min number of connections in the pool: `--min-conns int` workload: add support for remaining pgx query modes Add support for pgx.QueryExecModeCacheDescribe and pgx.QueryExecModeDescribeExec. 
Previously, only three of the five query modes were available. workload: fix race condition when recording histogram data Release note (cli change): workload jitters teardown of connections to prevent thundering herd impacting P99 latency results. Release note (cli change): workload utility now has flags to tune the connection pool used for testing. See `--conn-healthcheck-period`, `--min-conns`, and the `--max-conn-*` flags for details. Release note (cli change): workload now supports every [PostgreSQL query mode](https://github.com/jackc/pgx/blob/fa5fbed497bc75acee05c1667a8760ce0d634cba/conn.go#L167-L182) available via the underlying pgx driver. --- DEPS.bzl | 18 +- build/bazelutil/distdir_files.bzl | 6 +- go.mod | 4 +- go.sum | 7 +- pkg/workload/BUILD.bazel | 7 +- pkg/workload/connection.go | 40 ++- pkg/workload/connectionlatency/BUILD.bazel | 2 +- .../connectionlatency/connectionlatency.go | 2 +- pkg/workload/histogram/histogram.go | 2 +- pkg/workload/indexes/indexes.go | 7 +- pkg/workload/kv/BUILD.bazel | 4 +- pkg/workload/kv/kv.go | 15 +- pkg/workload/pgx_helpers.go | 282 ++++++++++++++---- pkg/workload/querylog/BUILD.bazel | 2 +- pkg/workload/querylog/querylog.go | 2 +- pkg/workload/schemachange/BUILD.bazel | 6 +- pkg/workload/schemachange/error_screening.go | 4 +- .../schemachange/operation_generator.go | 4 +- pkg/workload/schemachange/schemachange.go | 16 +- pkg/workload/schemachange/type_resolver.go | 2 +- pkg/workload/schemachange/watch_dog.go | 4 +- pkg/workload/sql_runner.go | 133 +++------ pkg/workload/tpcc/BUILD.bazel | 4 +- pkg/workload/tpcc/delivery.go | 6 +- pkg/workload/tpcc/new_order.go | 6 +- pkg/workload/tpcc/order_status.go | 6 +- pkg/workload/tpcc/payment.go | 6 +- pkg/workload/tpcc/stock_level.go | 6 +- pkg/workload/tpcc/tpcc.go | 15 +- pkg/workload/ycsb/BUILD.bazel | 6 +- pkg/workload/ycsb/ycsb.go | 12 +- 31 files changed, 390 insertions(+), 246 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index c3b323070410..66033e13aebc 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -4809,10 +4809,10 @@ def go_deps(): name = "com_github_jackc_pgservicefile", build_file_proto_mode = "disable_global", importpath = "github.com/jackc/pgservicefile", - sha256 = "8422a25b9d2b0be05c66ee1ccfdbaab144ce98f1ac678bc647064c560d4cd6e2", - strip_prefix = "github.com/jackc/pgservicefile@v0.0.0-20200714003250-2b9c44734f2b", + sha256 = "1f8bdf75b2a0d750e56c2a94b1d1b0b5be4b29d6df056aebd997162c29bfd8ab", + strip_prefix = "github.com/jackc/pgservicefile@v0.0.0-20221227161230-091c0ba34f0a", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgservicefile/com_github_jackc_pgservicefile-v0.0.0-20200714003250-2b9c44734f2b.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgservicefile/com_github_jackc_pgservicefile-v0.0.0-20221227161230-091c0ba34f0a.zip", ], ) go_repository( @@ -4839,10 +4839,10 @@ def go_deps(): name = "com_github_jackc_pgx_v5", build_file_proto_mode = "disable_global", importpath = "github.com/jackc/pgx/v5", - sha256 = "e05b4284fb33e5c0c648b269070dedac6759711c411283177261228ab684f45f", - strip_prefix = "github.com/jackc/pgx/v5@v5.2.0", + sha256 = "e2f4a98f6b8716a6854d0a910c12c3527d35ff78ec5f2d16bf49f85601071bf0", + strip_prefix = "github.com/jackc/pgx/v5@v5.3.1", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v5/com_github_jackc_pgx_v5-v5.2.0.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v5/com_github_jackc_pgx_v5-v5.3.1.zip", ], ) go_repository( @@ 
-4859,10 +4859,10 @@ def go_deps(): name = "com_github_jackc_puddle_v2", build_file_proto_mode = "disable_global", importpath = "github.com/jackc/puddle/v2", - sha256 = "73ea72b52d0a680442d535cf5d9a9713cb0803929c0b4a8e553eda47ee217c44", - strip_prefix = "github.com/jackc/puddle/v2@v2.1.2", + sha256 = "b99ea95df0c0298caf2be786c9eba511bfde2046eccfaa06e89b3e460ab406b0", + strip_prefix = "github.com/jackc/puddle/v2@v2.2.0", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/puddle/v2/com_github_jackc_puddle_v2-v2.1.2.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/puddle/v2/com_github_jackc_puddle_v2-v2.2.0.zip", ], ) go_repository( diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index 2f42131aae12..60fe83d7f748 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -631,12 +631,12 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgpassfile/com_github_jackc_pgpassfile-v1.0.0.zip": "1cc79fb0b80f54b568afd3f4648dd1c349f746ad7c379df8d7f9e0eb1cac938b", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgproto3/com_github_jackc_pgproto3-v1.1.0.zip": "e3766bee50ed74e49a067b2c4797a2c69015cf104bf3f3624cd483a9e940b4ee", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgproto3/v2/com_github_jackc_pgproto3_v2-v2.3.1.zip": "57884e299825af31fd01268659f1e671883b73b708a51230da14d6f8ee0e4e36", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgservicefile/com_github_jackc_pgservicefile-v0.0.0-20200714003250-2b9c44734f2b.zip": "8422a25b9d2b0be05c66ee1ccfdbaab144ce98f1ac678bc647064c560d4cd6e2", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgservicefile/com_github_jackc_pgservicefile-v0.0.0-20221227161230-091c0ba34f0a.zip": "1f8bdf75b2a0d750e56c2a94b1d1b0b5be4b29d6df056aebd997162c29bfd8ab", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgtype/com_github_jackc_pgtype-v1.11.0.zip": "6a257b81c0bd386d6241219a14ebd41d574a02aeaeb3942670c06441b864dcad", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v4/com_github_jackc_pgx_v4-v4.16.1.zip": "c3a169a68ff0e56f9f81eee4de4d2fd2a5ec7f4d6be159159325f4863c80bd10", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v5/com_github_jackc_pgx_v5-v5.2.0.zip": "e05b4284fb33e5c0c648b269070dedac6759711c411283177261228ab684f45f", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v5/com_github_jackc_pgx_v5-v5.3.1.zip": "e2f4a98f6b8716a6854d0a910c12c3527d35ff78ec5f2d16bf49f85601071bf0", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/puddle/com_github_jackc_puddle-v1.2.1.zip": "40d73550686666eb1f6df02b65008b2a4c98cfed1254dc4866e6ebe95fbc5c95", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/puddle/v2/com_github_jackc_puddle_v2-v2.1.2.zip": "73ea72b52d0a680442d535cf5d9a9713cb0803929c0b4a8e553eda47ee217c44", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/puddle/v2/com_github_jackc_puddle_v2-v2.2.0.zip": "b99ea95df0c0298caf2be786c9eba511bfde2046eccfaa06e89b3e460ab406b0", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jaegertracing/jaeger/com_github_jaegertracing_jaeger-v1.18.1.zip": "256a95b2a52a66494aca6d354224bb450ff38ce3ea1890af46a7c8dc39203891", 
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jcmturner/aescts/v2/com_github_jcmturner_aescts_v2-v2.0.0.zip": "717a211ad4aac248cf33cadde73059c13f8e9462123a0ab2fed5c5e61f7739d7", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jcmturner/dnsutils/v2/com_github_jcmturner_dnsutils_v2-v2.0.0.zip": "f9188186b672e547cfaef66107aa62d65054c5d4f10d4dcd1ff157d6bf8c275d", diff --git a/go.mod b/go.mod index 289141404fa0..376236d7822b 100644 --- a/go.mod +++ b/go.mod @@ -71,7 +71,7 @@ require ( github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgproto3/v2 v2.3.1 - github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgtype v1.11.0 github.com/jackc/pgx/v4 v4.16.1 github.com/jackc/puddle v1.2.1 // indirect @@ -156,6 +156,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/guptarohit/asciigraph v0.5.5 github.com/irfansharif/recorder v0.0.0-20211218081646-a21b46510fd6 + github.com/jackc/pgx/v5 v5.3.1 github.com/jaegertracing/jaeger v1.18.1 github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible github.com/jordanlewis/gcassert v0.0.0-20221027203946-81f097ad35a0 @@ -314,6 +315,7 @@ require ( github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639 // indirect github.com/imdario/mergo v0.3.13 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/jackc/puddle/v2 v2.2.0 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/gofork v1.7.6 // indirect diff --git a/go.sum b/go.sum index 830802ebe01c..981a5371c37a 100644 --- a/go.sum +++ b/go.sum @@ -1377,8 +1377,9 @@ github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwX github.com/jackc/pgproto3/v2 v2.3.0/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgproto3/v2 v2.3.1 h1:nwj7qwf0S+Q7ISFfBndqeLwSwxs+4DPsbRFjECT1Y4Y= github.com/jackc/pgproto3/v2 v2.3.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= @@ -1391,11 +1392,15 @@ github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQ github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= github.com/jackc/pgx/v4 v4.16.1 h1:JzTglcal01DrghUqt+PmzWsZx/Yh7SC/CTQmSBMTd0Y= github.com/jackc/pgx/v4 v4.16.1/go.mod h1:SIhx0D5hoADaiXZVyv+3gSm3LCIIINTVO0PficsvWGQ= +github.com/jackc/pgx/v5 v5.3.1 h1:Fcr8QJ1ZeLi5zsPZqQeUZhNhxfkkKBOgJuYkJHoBOtU= +github.com/jackc/pgx/v5 v5.3.1/go.mod 
h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.2.1 h1:gI8os0wpRXFd4FiAY2dWiqRK037tjj3t7rKFeO4X5iw= github.com/jackc/puddle v1.2.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle/v2 v2.2.0 h1:RdcDk92EJBuBS55nQMMYFXTxwstHug4jkhT5pq8VxPk= +github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jaegertracing/jaeger v1.18.1 h1:eFqjEpTKq2FfiZ/YX53oxeCePdIZyWvDfXaTAGj0r5E= github.com/jaegertracing/jaeger v1.18.1/go.mod h1:WRzMFH62rje1VgbShlgk6UbWUNoo08uFFvs/x50aZKk= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= diff --git a/pkg/workload/BUILD.bazel b/pkg/workload/BUILD.bazel index 080484f07f8a..9f498cbf37b0 100644 --- a/pkg/workload/BUILD.bazel +++ b/pkg/workload/BUILD.bazel @@ -28,9 +28,10 @@ go_library( "//pkg/util/timeutil", "//pkg/workload/histogram", "@com_github_cockroachdb_errors//:errors", - "@com_github_jackc_pgconn//:pgconn", - "@com_github_jackc_pgx_v4//:pgx", - "@com_github_jackc_pgx_v4//pgxpool", + "@com_github_jackc_pgx_v5//:pgx", + "@com_github_jackc_pgx_v5//pgconn", + "@com_github_jackc_pgx_v5//pgxpool", + "@com_github_jackc_pgx_v5//tracelog", "@com_github_lib_pq//:pq", "@com_github_spf13_pflag//:pflag", "@org_golang_x_sync//errgroup", diff --git a/pkg/workload/connection.go b/pkg/workload/connection.go index d97e96be1122..ca3dc4e701a5 100644 --- a/pkg/workload/connection.go +++ b/pkg/workload/connection.go @@ -15,6 +15,7 @@ import ( "net/url" "runtime" "strings" + "time" "github.com/spf13/pflag" ) @@ -24,8 +25,14 @@ type ConnFlags struct { *pflag.FlagSet DBOverride string Concurrency int - // Method for issuing queries; see SQLRunner. - Method string + Method string // Method for issuing queries; see SQLRunner. + + ConnHealthCheckPeriod time.Duration + MaxConnIdleTime time.Duration + MaxConnLifetime time.Duration + MaxConnLifetimeJitter time.Duration + MinConns int + WarmupConns int } // NewConnFlags returns an initialized ConnFlags. @@ -36,14 +43,35 @@ func NewConnFlags(genFlags *Flags) *ConnFlags { `Override for the SQL database to use. 
If empty, defaults to the generator name`) c.IntVar(&c.Concurrency, `concurrency`, 2*runtime.GOMAXPROCS(0), `Number of concurrent workers`) - c.StringVar(&c.Method, `method`, `prepare`, `SQL issue method (prepare, noprepare, simple)`) + c.StringVar(&c.Method, `method`, `cache_statement`, `SQL issue method (cache_statement, cache_describe, describe_exec, exec, simple_protocol)`) + c.DurationVar(&c.ConnHealthCheckPeriod, `conn-healthcheck-period`, 30*time.Second, `Interval that health checks are run on connections`) + c.IntVar(&c.MinConns, `min-conns`, 0, `Minimum number of connections to attempt to keep in the pool`) + c.DurationVar(&c.MaxConnIdleTime, `max-conn-idle-time`, 150*time.Second, `Max time an idle connection will be kept around`) + c.DurationVar(&c.MaxConnLifetime, `max-conn-lifetime`, 300*time.Second, `Max connection lifetime`) + c.DurationVar(&c.MaxConnLifetimeJitter, `max-conn-lifetime-jitter`, 150*time.Second, `Jitter max connection lifetime by this amount`) + c.IntVar(&c.WarmupConns, `warmup-conns`, 0, `Number of connections to warmup in each connection pool`) genFlags.AddFlagSet(c.FlagSet) if genFlags.Meta == nil { genFlags.Meta = make(map[string]FlagMeta) } - genFlags.Meta[`db`] = FlagMeta{RuntimeOnly: true} - genFlags.Meta[`concurrency`] = FlagMeta{RuntimeOnly: true} - genFlags.Meta[`method`] = FlagMeta{RuntimeOnly: true} + for _, k := range []string{ + `concurrency`, + `conn-healthcheck-period`, + `db`, + `max-conn-idle-time`, + `max-conn-lifetime-jitter`, + `max-conn-lifetime`, + `method`, + `min-conns`, + `warmup-conns`, + } { + v, ok := genFlags.Meta[k] + if !ok { + v = FlagMeta{} + } + v.RuntimeOnly = true + genFlags.Meta[k] = v + } return c } diff --git a/pkg/workload/connectionlatency/BUILD.bazel b/pkg/workload/connectionlatency/BUILD.bazel index 0ee0d706b864..87b652616098 100644 --- a/pkg/workload/connectionlatency/BUILD.bazel +++ b/pkg/workload/connectionlatency/BUILD.bazel @@ -11,7 +11,7 @@ go_library( "//pkg/util/timeutil", "//pkg/workload", "//pkg/workload/histogram", - "@com_github_jackc_pgx_v4//:pgx", + "@com_github_jackc_pgx_v5//:pgx", "@com_github_spf13_pflag//:pflag", ], ) diff --git a/pkg/workload/connectionlatency/connectionlatency.go b/pkg/workload/connectionlatency/connectionlatency.go index b1358cf6c3a3..1975557a1ba9 100644 --- a/pkg/workload/connectionlatency/connectionlatency.go +++ b/pkg/workload/connectionlatency/connectionlatency.go @@ -19,7 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/cockroach/pkg/workload/histogram" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" "github.com/spf13/pflag" ) diff --git a/pkg/workload/histogram/histogram.go b/pkg/workload/histogram/histogram.go index 22e9ed6043a7..ce32ff414021 100644 --- a/pkg/workload/histogram/histogram.go +++ b/pkg/workload/histogram/histogram.go @@ -73,6 +73,7 @@ func (w *Registry) newNamedHistogramLocked(name string) *NamedHistogram { // Record saves a new datapoint and should be called once per logical operation. 
func (w *NamedHistogram) Record(elapsed time.Duration) { w.prometheusHistogram.Observe(float64(elapsed.Nanoseconds()) / float64(time.Second)) + w.mu.Lock() maxLatency := time.Duration(w.mu.current.HighestTrackableValue()) if elapsed < minLatency { elapsed = minLatency @@ -80,7 +81,6 @@ func (w *NamedHistogram) Record(elapsed time.Duration) { elapsed = maxLatency } - w.mu.Lock() err := w.mu.current.RecordValue(elapsed.Nanoseconds()) w.mu.Unlock() diff --git a/pkg/workload/indexes/indexes.go b/pkg/workload/indexes/indexes.go index 12f80920877d..8df0849e5cc9 100644 --- a/pkg/workload/indexes/indexes.go +++ b/pkg/workload/indexes/indexes.go @@ -158,9 +158,8 @@ func (w *indexes) Ops( if err != nil { return workload.QueryLoad{}, err } - cfg := workload.MultiConnPoolCfg{ - MaxTotalConnections: w.connFlags.Concurrency + 1, - } + cfg := workload.NewMultiConnPoolCfgFromFlags(w.connFlags) + cfg.MaxTotalConnections = w.connFlags.Concurrency + 1 mcp, err := workload.NewMultiConnPool(ctx, cfg, urls...) if err != nil { return workload.QueryLoad{}, err @@ -176,7 +175,7 @@ func (w *indexes) Ops( buf: make([]byte, w.payload), } op.stmt = op.sr.Define(stmt) - if err := op.sr.Init(ctx, "indexes", mcp, w.connFlags); err != nil { + if err := op.sr.Init(ctx, "indexes", mcp); err != nil { return workload.QueryLoad{}, err } ql.WorkerFns = append(ql.WorkerFns, op.run) diff --git a/pkg/workload/kv/BUILD.bazel b/pkg/workload/kv/BUILD.bazel index dd0308b1030e..b6730c84cca1 100644 --- a/pkg/workload/kv/BUILD.bazel +++ b/pkg/workload/kv/BUILD.bazel @@ -18,8 +18,8 @@ go_library( "//pkg/workload", "//pkg/workload/histogram", "@com_github_cockroachdb_errors//:errors", - "@com_github_jackc_pgconn//:pgconn", - "@com_github_jackc_pgx_v4//:pgx", + "@com_github_jackc_pgx_v5//:pgx", + "@com_github_jackc_pgx_v5//pgconn", "@com_github_spf13_pflag//:pflag", ], ) diff --git a/pkg/workload/kv/kv.go b/pkg/workload/kv/kv.go index 6483d8b1bdc2..8fd5b92cec66 100644 --- a/pkg/workload/kv/kv.go +++ b/pkg/workload/kv/kv.go @@ -32,8 +32,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/cockroach/pkg/workload/histogram" "github.com/cockroachdb/errors" - "github.com/jackc/pgconn" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" "github.com/spf13/pflag" ) @@ -347,9 +347,8 @@ func (w *kv) Ops( if err != nil { return workload.QueryLoad{}, err } - cfg := workload.MultiConnPoolCfg{ - MaxTotalConnections: w.connFlags.Concurrency + 1, - } + cfg := workload.NewMultiConnPoolCfgFromFlags(w.connFlags) + cfg.MaxTotalConnections = w.connFlags.Concurrency + 1 mcp, err := workload.NewMultiConnPool(ctx, cfg, urls...) if err != nil { return workload.QueryLoad{}, err @@ -445,7 +444,7 @@ func (w *kv) Ops( } op.spanStmt = op.sr.Define(spanStmtStr) op.delStmt = op.sr.Define(delStmtStr) - if err := op.sr.Init(ctx, "kv", mcp, w.connFlags); err != nil { + if err := op.sr.Init(ctx, "kv", mcp); err != nil { return workload.QueryLoad{}, err } op.mcp = mcp @@ -557,9 +556,11 @@ func (o *kvOp) run(ctx context.Context) (retErr error) { // that each run call makes 1 attempt, so that rate limiting in workerRun // behaves as expected. 
var tx pgx.Tx - if tx, err = o.mcp.Get().Begin(ctx); err != nil { + tx, err := o.mcp.Get().BeginTx(ctx, pgx.TxOptions{}) + if err != nil { return err } + defer func() { rollbackErr := tx.Rollback(ctx) if !errors.Is(rollbackErr, pgx.ErrTxClosed) { diff --git a/pkg/workload/pgx_helpers.go b/pkg/workload/pgx_helpers.go index e19709707d25..cd99512f97f4 100644 --- a/pkg/workload/pgx_helpers.go +++ b/pkg/workload/pgx_helpers.go @@ -14,11 +14,14 @@ import ( "context" "strings" "sync/atomic" + "time" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/pgxpool" + "github.com/cockroachdb/errors" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/jackc/pgx/v5/tracelog" "golang.org/x/sync/errgroup" ) @@ -33,6 +36,7 @@ type MultiConnPool struct { // preparedStatements is a map from name to SQL. The statements in the map // are prepared whenever a new connection is acquired from the pool. preparedStatements map[string]string + method pgx.QueryExecMode } } @@ -48,16 +52,85 @@ type MultiConnPoolCfg struct { // If 0, there is no per-pool maximum (other than the total maximum number of // connections which still applies). MaxConnsPerPool int + + // ConnHealthCheckPeriod specifies the amount of time between connection + // health checks. Defaults to 10% of MaxConnLifetime. + ConnHealthCheckPeriod time.Duration + + // MaxConnIdleTime specifies the amount of time a connection will be idle + // before being closed by the health checker. Defaults to 50% of the + // MaxConnLifetime. + MaxConnIdleTime time.Duration + + // MaxConnLifetime specifies the max age of individual connections in + // connection pools. If 0, a default value of 5 minutes is used. + MaxConnLifetime time.Duration + + // MaxConnLifetimeJitter shortens the max age of a connection by a random + // duration less than the specified jitter. If 0, default to 50% of + // MaxConnLifetime. + MaxConnLifetimeJitter time.Duration + + // Method specifies the query type to use for the PG wire protocol. + Method string + + // MinConns is the minimum number of connections the connection pool will + // attempt to keep. Connection count may dip below this value periodically, + // see pgxpool documentation for details. + MinConns int + + // WarmupConns specifies the number of connections to prewarm when + // initializing a MultiConnPool. A value of 0 automatically initialize the + // max number of connections per pool. A value less than 0 skips the + // connection warmup phase. + WarmupConns int + + // LogLevel specifies the log level (default: warn) + LogLevel tracelog.LogLevel +} + +// NewMultiConnPoolCfgFromFlags constructs a new MultiConnPoolCfg object based +// on the connection flags. +func NewMultiConnPoolCfgFromFlags(cf *ConnFlags) MultiConnPoolCfg { + return MultiConnPoolCfg{ + ConnHealthCheckPeriod: cf.ConnHealthCheckPeriod, + MaxConnIdleTime: cf.MaxConnIdleTime, + MaxConnLifetime: cf.MaxConnLifetime, + MaxConnLifetimeJitter: cf.MaxConnLifetimeJitter, + MaxConnsPerPool: cf.Concurrency, + MaxTotalConnections: cf.Concurrency, + Method: cf.Method, + MinConns: cf.MinConns, + WarmupConns: cf.WarmupConns, + } +} + +// String values taken from pgx.ParseConfigWithOptions() to maintain +// compatibility with pgx. See [1] and [2] for additional details. 
+// +// [1] https://github.com/jackc/pgx/blob/fa5fbed497bc75acee05c1667a8760ce0d634cba/conn.go#L167-L182 +// [2] https://github.com/jackc/pgx/blob/fa5fbed497bc75acee05c1667a8760ce0d634cba/conn.go#L578-L612 +var stringToMethod = map[string]pgx.QueryExecMode{ + "cache_statement": pgx.QueryExecModeCacheStatement, + "cache_describe": pgx.QueryExecModeCacheDescribe, + "describe_exec": pgx.QueryExecModeDescribeExec, + "exec": pgx.QueryExecModeExec, + "simple_protocol": pgx.QueryExecModeSimpleProtocol, + + // Preserve backward compatibility with original workload --method's + "prepare": pgx.QueryExecModeCacheStatement, + "noprepare": pgx.QueryExecModeExec, + "simple": pgx.QueryExecModeSimpleProtocol, } // pgxLogger implements the pgx.Logger interface. type pgxLogger struct{} -var _ pgx.Logger = pgxLogger{} +var _ tracelog.Logger = pgxLogger{} // Log implements the pgx.Logger interface. func (p pgxLogger) Log( - ctx context.Context, level pgx.LogLevel, msg string, data map[string]interface{}, + ctx context.Context, level tracelog.LogLevel, msg string, data map[string]interface{}, ) { if ctx.Err() != nil { // Don't log anything from pgx if the context was canceled by the workload @@ -76,7 +149,7 @@ func (p pgxLogger) Log( return } } - log.Infof(ctx, "pgx logger [%s]: %s logParams=%v", level.String(), msg, data) + log.VInfof(ctx, log.Level(level), "pgx logger [%s]: %s logParams=%v", level.String(), msg, data) } // NewMultiConnPool creates a new MultiConnPool. @@ -92,27 +165,63 @@ func NewMultiConnPool( m := &MultiConnPool{} m.mu.preparedStatements = map[string]string{} + logLevel := tracelog.LogLevelWarn + if cfg.LogLevel != 0 { + logLevel = cfg.LogLevel + } + maxConnLifetime := 300 * time.Second + if cfg.MaxConnLifetime > 0 { + maxConnLifetime = cfg.MaxConnLifetime + } + maxConnLifetimeJitter := time.Duration(0.5 * float64(maxConnLifetime)) + if cfg.MaxConnLifetimeJitter > 0 { + maxConnLifetimeJitter = cfg.MaxConnLifetimeJitter + } + connHealthCheckPeriod := time.Duration(0.1 * float64(maxConnLifetime)) + if cfg.ConnHealthCheckPeriod > 0 { + connHealthCheckPeriod = cfg.ConnHealthCheckPeriod + } + maxConnIdleTime := time.Duration(0.5 * float64(maxConnLifetime)) + if cfg.MaxConnIdleTime > 0 { + maxConnIdleTime = cfg.MaxConnIdleTime + } + minConns := 0 + if cfg.MinConns > 0 { + minConns = cfg.MinConns + } + connsPerURL := distribute(cfg.MaxTotalConnections, len(urls)) maxConnsPerPool := cfg.MaxConnsPerPool if maxConnsPerPool == 0 { maxConnsPerPool = cfg.MaxTotalConnections } - var warmupConns [][]*pgxpool.Conn + // See + // https://github.com/jackc/pgx/blob/fa5fbed497bc75acee05c1667a8760ce0d634cba/conn.go#L578-L612 + // for details on the specifics of each query mode. + queryMode, ok := stringToMethod[strings.ToLower(cfg.Method)] + if !ok { + return nil, errors.Errorf("unknown method %s", cfg.Method) + } + m.mu.method = queryMode + for i := range urls { connsPerPool := distributeMax(connsPerURL[i], maxConnsPerPool) for _, numConns := range connsPerPool { - connCfg, err := pgxpool.ParseConfig(urls[i]) + poolCfg, err := pgxpool.ParseConfig(urls[i]) if err != nil { return nil, err } - // Disable the automatic prepared statement cache. We've seen a lot of - // churn in this cache since workloads create many of different queries. 
- connCfg.ConnConfig.BuildStatementCache = nil - connCfg.ConnConfig.LogLevel = pgx.LogLevelWarn - connCfg.ConnConfig.Logger = pgxLogger{} - connCfg.MaxConns = int32(numConns) - connCfg.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool { + poolCfg.HealthCheckPeriod = connHealthCheckPeriod + poolCfg.MaxConnLifetime = maxConnLifetime + poolCfg.MaxConnLifetimeJitter = maxConnLifetimeJitter + poolCfg.MaxConnIdleTime = maxConnIdleTime + poolCfg.MaxConns = int32(numConns) + if minConns > numConns { + minConns = numConns + } + poolCfg.MinConns = int32(minConns) + poolCfg.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool { m.mu.RLock() defer m.mu.RUnlock() for name, sql := range m.mu.preparedStatements { @@ -126,51 +235,25 @@ func NewMultiConnPool( } return true } - p, err := pgxpool.ConnectConfig(ctx, connCfg) + + connCfg := poolCfg.ConnConfig + connCfg.DefaultQueryExecMode = queryMode + connCfg.Tracer = &tracelog.TraceLog{ + Logger: &pgxLogger{}, + LogLevel: logLevel, + } + p, err := pgxpool.NewWithConfig(ctx, poolCfg) if err != nil { return nil, err } - warmupConns = append(warmupConns, make([]*pgxpool.Conn, numConns)) m.Pools = append(m.Pools, p) } } - // "Warm up" the pools so we don't have to establish connections later (which - // would affect the observed latencies of the first requests, especially when - // prepared statements are used). We do this by - // acquiring connections (in parallel), then releasing them back to the - // pool. - var g errgroup.Group - // Limit concurrent connection establishment. Allowing this to run - // at maximum parallelism would trigger syn flood protection on the - // host, which combined with any packet loss could cause Acquire to - // return an error and fail the whole function. The value 100 is - // chosen because it is less than the default value for SOMAXCONN - // (128). - sem := make(chan struct{}, 100) - for i, p := range m.Pools { - p := p - conns := warmupConns[i] - for j := range conns { - j := j - sem <- struct{}{} - g.Go(func() error { - var err error - conns[j], err = p.Acquire(ctx) - <-sem - return err - }) - } - } - if err := g.Wait(); err != nil { + if err := m.WarmupConns(ctx, cfg.WarmupConns); err != nil { return nil, err } - for i := range m.Pools { - for _, c := range warmupConns[i] { - c.Release() - } - } return m, nil } @@ -186,11 +269,13 @@ func (m *MultiConnPool) AddPreparedStatement(name string, statement string) { // Get returns one of the pools, in round-robin manner. func (m *MultiConnPool) Get() *pgxpool.Pool { - if len(m.Pools) == 1 { + numPools := uint32(len(m.Pools)) + if numPools == 1 { return m.Pools[0] } i := atomic.AddUint32(&m.counter, 1) - 1 - return m.Pools[i%uint32(len(m.Pools))] + + return m.Pools[i%numPools] } // Close closes all the pools. @@ -200,6 +285,101 @@ func (m *MultiConnPool) Close() { } } +// Method returns the query execution mode of the connection pool. +func (m *MultiConnPool) Method() pgx.QueryExecMode { + m.mu.Lock() + defer m.mu.Unlock() + return m.mu.method +} + +// WarmupConns warms up numConns connections across all pools contained within +// MultiConnPool. The max number of connections are warmed up if numConns is +// set to 0. +func (m *MultiConnPool) WarmupConns(ctx context.Context, numConns int) error { + if numConns < 0 { + return nil + } + + // NOTE(seanc@): see context cancellation note below. 
+ warmupCtx, cancel := context.WithCancel(ctx) + defer cancel() + + // "Warm up" the pools so we don't have to establish connections later (which + // would affect the observed latencies of the first requests, especially when + // prepared statements are used). We do this by + // acquiring connections (in parallel), then releasing them back to the + // pool. + var g errgroup.Group + + // Limit concurrent connection establishment. Allowing this to run + // at maximum parallelism would trigger syn flood protection on the + // host, which combined with any packet loss could cause Acquire to + // return an error and fail the whole function. The value 100 is + // chosen because it is less than the default value for SOMAXCONN + // (128). + g.SetLimit(100) + + var warmupConnsPerPool []int + if numConns == 0 { + warmupConnsPerPool = make([]int, len(m.Pools)) + for i, p := range m.Pools { + warmupConnsPerPool[i] = int(p.Config().MaxConns) + } + } else { + warmupConnsPerPool = distribute(numConns, len(m.Pools)) + for i, p := range m.Pools { + poolMaxConns := int(p.Config().MaxConns) + if warmupConnsPerPool[i] > poolMaxConns { + warmupConnsPerPool[i] = poolMaxConns + } + } + } + + var numWarmupConns int + for _, n := range warmupConnsPerPool { + numWarmupConns += n + } + warmupConns := make(chan *pgxpool.Conn, numWarmupConns) + for i, p := range m.Pools { + p := p + for k := 0; k < warmupConnsPerPool[i]; k++ { + g.Go(func() error { + conn, err := p.Acquire(warmupCtx) + if err != nil { + return err + } + warmupConns <- conn + return nil + }) + } + } + + estConns := make([]*pgxpool.Conn, 0, numWarmupConns) + defer func() { + for _, conn := range estConns { + // NOTE(seanc@): Release() connections before canceling the warmupCtx to + // prevent partially established connections from being Acquire()'ed. + conn.Release() + } + }() + +WARMUP: + for i := 0; i < numWarmupConns; i++ { + select { + case conn := <-warmupConns: + estConns = append(estConns, conn) + case <-warmupCtx.Done(): + if err := warmupCtx.Err(); err != nil { + return err + } + + break WARMUP + } + } + + return nil +} + // distribute returns a slice of integers that add up to and are // within +/-1 of each other. 
func distribute(total, num int) []int { diff --git a/pkg/workload/querylog/BUILD.bazel b/pkg/workload/querylog/BUILD.bazel index 66f32b8adb92..cb2fbe0530af 100644 --- a/pkg/workload/querylog/BUILD.bazel +++ b/pkg/workload/querylog/BUILD.bazel @@ -17,7 +17,7 @@ go_library( "//pkg/workload/histogram", "//pkg/workload/rand", "@com_github_cockroachdb_errors//:errors", - "@com_github_jackc_pgx_v4//:pgx", + "@com_github_jackc_pgx_v5//:pgx", "@com_github_lib_pq//oid", "@com_github_spf13_pflag//:pflag", ], diff --git a/pkg/workload/querylog/querylog.go b/pkg/workload/querylog/querylog.go index 2ead08dfd7e2..aef7aa4c3a60 100644 --- a/pkg/workload/querylog/querylog.go +++ b/pkg/workload/querylog/querylog.go @@ -34,7 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/workload/histogram" workloadrand "github.com/cockroachdb/cockroach/pkg/workload/rand" "github.com/cockroachdb/errors" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" "github.com/lib/pq/oid" "github.com/spf13/pflag" ) diff --git a/pkg/workload/schemachange/BUILD.bazel b/pkg/workload/schemachange/BUILD.bazel index 3a6da0648bde..aef5564a3f5e 100644 --- a/pkg/workload/schemachange/BUILD.bazel +++ b/pkg/workload/schemachange/BUILD.bazel @@ -35,9 +35,9 @@ go_library( "//pkg/workload", "//pkg/workload/histogram", "@com_github_cockroachdb_errors//:errors", - "@com_github_jackc_pgconn//:pgconn", - "@com_github_jackc_pgx_v4//:pgx", - "@com_github_jackc_pgx_v4//pgxpool", + "@com_github_jackc_pgx_v5//:pgx", + "@com_github_jackc_pgx_v5//pgconn", + "@com_github_jackc_pgx_v5//pgxpool", "@com_github_lib_pq//oid", "@com_github_spf13_pflag//:pflag", ], diff --git a/pkg/workload/schemachange/error_screening.go b/pkg/workload/schemachange/error_screening.go index 4f0eeadb5b75..a6e7e0e849e1 100644 --- a/pkg/workload/schemachange/error_screening.go +++ b/pkg/workload/schemachange/error_screening.go @@ -21,8 +21,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/errors" - "github.com/jackc/pgconn" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" ) func (og *operationGenerator) tableExists( diff --git a/pkg/workload/schemachange/operation_generator.go b/pkg/workload/schemachange/operation_generator.go index b061af7b047a..28698d62db73 100644 --- a/pkg/workload/schemachange/operation_generator.go +++ b/pkg/workload/schemachange/operation_generator.go @@ -34,8 +34,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/errors" - "github.com/jackc/pgconn" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" ) // seqNum may be shared across multiple instances of this, so it should only diff --git a/pkg/workload/schemachange/schemachange.go b/pkg/workload/schemachange/schemachange.go index dfd6046ecd0b..b28f0cbb6281 100644 --- a/pkg/workload/schemachange/schemachange.go +++ b/pkg/workload/schemachange/schemachange.go @@ -20,7 +20,6 @@ import ( "math/rand" "os" "regexp" - "runtime" "sync" "time" @@ -30,8 +29,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/cockroach/pkg/workload/histogram" "github.com/cockroachdb/errors" - "github.com/jackc/pgconn" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" "github.com/spf13/pflag" ) @@ -67,8 +66,8 @@ const ( type schemaChange struct { flags workload.Flags + connFlags *workload.ConnFlags 
dbOverride string - concurrency int maxOpsPerWorker int errorRate int enumPct int @@ -95,8 +94,6 @@ var schemaChangeMeta = workload.Meta{ s.flags.FlagSet = pflag.NewFlagSet(`schemachange`, pflag.ContinueOnError) s.flags.StringVar(&s.dbOverride, `db`, ``, `Override for the SQL database to use. If empty, defaults to the generator name`) - s.flags.IntVar(&s.concurrency, `concurrency`, 2*runtime.GOMAXPROCS(0), /* TODO(spaskob): sensible default? */ - `Number of concurrent workers`) s.flags.IntVar(&s.maxOpsPerWorker, `max-ops-per-worker`, defaultMaxOpsPerWorker, `Number of operations to execute in a single transaction`) s.flags.IntVar(&s.errorRate, `error-rate`, defaultErrorRate, @@ -122,6 +119,7 @@ var schemaChangeMeta = workload.Meta{ defaultDeclarativeSchemaMaxStmtsPerTxn, `Number of statements per-txn used by the declarative schema changer.`) + s.connFlags = workload.NewConnFlags(&s.flags) return s }, } @@ -162,9 +160,7 @@ func (s *schemaChange) Ops( if err != nil { return workload.QueryLoad{}, err } - cfg := workload.MultiConnPoolCfg{ - MaxTotalConnections: s.concurrency * 2, //TODO(spaskob): pick a sensible default. - } + cfg := workload.NewMultiConnPoolCfgFromFlags(s.connFlags) pool, err := workload.NewMultiConnPool(ctx, cfg, urls...) if err != nil { return workload.QueryLoad{}, err @@ -204,7 +200,7 @@ func (s *schemaChange) Ops( s.dumpLogsOnce = &sync.Once{} - for i := 0; i < s.concurrency; i++ { + for i := 0; i < s.connFlags.Concurrency; i++ { opGeneratorParams := operationGeneratorParams{ seqNum: seqNum, diff --git a/pkg/workload/schemachange/type_resolver.go b/pkg/workload/schemachange/type_resolver.go index 8e8be208c35e..13dbb825941b 100644 --- a/pkg/workload/schemachange/type_resolver.go +++ b/pkg/workload/schemachange/type_resolver.go @@ -19,7 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/errors" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" "github.com/lib/pq/oid" ) diff --git a/pkg/workload/schemachange/watch_dog.go b/pkg/workload/schemachange/watch_dog.go index 4ed58f04f115..672220f566e9 100644 --- a/pkg/workload/schemachange/watch_dog.go +++ b/pkg/workload/schemachange/watch_dog.go @@ -17,8 +17,8 @@ import ( "time" "github.com/cockroachdb/errors" - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/pgxpool" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" ) // schemaChangeWatchDog connection watch dog object. diff --git a/pkg/workload/sql_runner.go b/pkg/workload/sql_runner.go index 22980e901ace..8f579a69258c 100644 --- a/pkg/workload/sql_runner.go +++ b/pkg/workload/sql_runner.go @@ -13,11 +13,10 @@ package workload import ( "context" "fmt" - "strings" "github.com/cockroachdb/errors" - "github.com/jackc/pgconn" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" ) // SQLRunner is a helper for issuing SQL statements; it supports multiple @@ -49,24 +48,9 @@ type SQLRunner struct { // The fields below are set by Init. initialized bool - method method mcp *MultiConnPool } -type method int - -const ( - prepare method = iota - noprepare - simple -) - -var stringToMethod = map[string]method{ - "prepare": prepare, - "noprepare": noprepare, - "simple": simple, -} - // Define creates a handle for the given statement. The handle can be used after // Init is called. 
func (sr *SQLRunner) Define(sql string) StmtHandle { @@ -83,39 +67,13 @@ func (sr *SQLRunner) Define(sql string) StmtHandle { // // The name is used for naming prepared statements. Multiple workers that use // the same set of defined queries can and should use the same name. -// -// The way we issue queries is set by flags.Method: -// -// - "prepare": explicitly prepare the query once per connection, then we reuse -// it for each execution. This results in a Bind and Execute on the server -// each time we run a query (on the given connection). Note that it's -// important to prepare on separate connections if there are many parallel -// workers; this avoids lock contention in the sql.Rows objects they produce. -// See #30811. -// -// - "noprepare": each query is issued separately (on the given connection). -// This results in Parse, Bind, Execute on the server each time we run a -// query. The statement is an anonymous prepared statement; that is, the -// name is the empty string. -// -// - "simple": each query is issued in a single string; parameters are -// rendered inside the string. This results in a single SimpleExecute -// request to the server for each query. Note that only a few parameter types -// are supported. -func (sr *SQLRunner) Init( - ctx context.Context, name string, mcp *MultiConnPool, flags *ConnFlags, -) error { +func (sr *SQLRunner) Init(ctx context.Context, name string, mcp *MultiConnPool) error { if sr.initialized { panic("already initialized") } - var ok bool - sr.method, ok = stringToMethod[strings.ToLower(flags.Method)] - if !ok { - return errors.Errorf("unknown method %s", flags.Method) - } - - if sr.method == prepare { + switch mcp.Method() { + case pgx.QueryExecModeCacheStatement, pgx.QueryExecModeCacheDescribe, pgx.QueryExecModeDescribeExec: for i, s := range sr.stmts { stmtName := fmt.Sprintf("%s-%d", name, i+1) s.preparedName = stmtName @@ -152,19 +110,21 @@ type StmtHandle struct { // See pgx.Conn.Exec. func (h StmtHandle) Exec(ctx context.Context, args ...interface{}) (pgconn.CommandTag, error) { h.check() - p := h.s.sr.mcp.Get() - switch h.s.sr.method { - case prepare: - return p.Exec(ctx, h.s.preparedName, args...) + conn, err := h.s.sr.mcp.Get().Acquire(ctx) + if err != nil { + return pgconn.CommandTag{}, err + } + defer conn.Release() - case noprepare: - return p.Exec(ctx, h.s.sql, args...) + switch h.s.sr.mcp.Method() { + case pgx.QueryExecModeCacheStatement, pgx.QueryExecModeCacheDescribe, pgx.QueryExecModeDescribeExec: + return conn.Exec(ctx, h.s.preparedName, args...) - case simple: - return p.Exec(ctx, h.s.sql, prependQuerySimpleProtocol(args)...) + case pgx.QueryExecModeSimpleProtocol, pgx.QueryExecModeExec: + return conn.Exec(ctx, h.s.sql, args...) default: - panic("invalid method") + return pgconn.CommandTag{}, errors.Errorf("unsupported method %q", h.s.sr.mcp.Method()) } } @@ -175,18 +135,15 @@ func (h StmtHandle) ExecTx( ctx context.Context, tx pgx.Tx, args ...interface{}, ) (pgconn.CommandTag, error) { h.check() - switch h.s.sr.method { - case prepare: + switch h.s.sr.mcp.Method() { + case pgx.QueryExecModeCacheStatement, pgx.QueryExecModeCacheDescribe, pgx.QueryExecModeDescribeExec: return tx.Exec(ctx, h.s.preparedName, args...) - case noprepare: + case pgx.QueryExecModeSimpleProtocol, pgx.QueryExecModeExec: return tx.Exec(ctx, h.s.sql, args...) - case simple: - return tx.Exec(ctx, h.s.sql, prependQuerySimpleProtocol(args)...) 
-
 	default:
-		panic("invalid method")
+		return pgconn.CommandTag{}, errors.Errorf("unsupported method %q", h.s.sr.mcp.Method())
 	}
 }
 
@@ -196,18 +153,15 @@ func (h StmtHandle) ExecTx(
 func (h StmtHandle) Query(ctx context.Context, args ...interface{}) (pgx.Rows, error) {
 	h.check()
 	p := h.s.sr.mcp.Get()
-	switch h.s.sr.method {
-	case prepare:
+	switch h.s.sr.mcp.Method() {
+	case pgx.QueryExecModeCacheStatement, pgx.QueryExecModeCacheDescribe, pgx.QueryExecModeDescribeExec:
 		return p.Query(ctx, h.s.preparedName, args...)
 
-	case noprepare:
+	case pgx.QueryExecModeSimpleProtocol, pgx.QueryExecModeExec:
 		return p.Query(ctx, h.s.sql, args...)
 
-	case simple:
-		return p.Query(ctx, h.s.sql, prependQuerySimpleProtocol(args)...)
-
 	default:
-		panic("invalid method")
+		return nil, errors.Errorf("unsupported method %q", h.s.sr.mcp.Method())
 	}
 }
 
@@ -216,18 +170,15 @@ func (h StmtHandle) Query(ctx context.Context, args ...interface{}) (pgx.Rows, e
 // See pgx.Tx.Query.
 func (h StmtHandle) QueryTx(ctx context.Context, tx pgx.Tx, args ...interface{}) (pgx.Rows, error) {
 	h.check()
-	switch h.s.sr.method {
-	case prepare:
+	switch h.s.sr.mcp.Method() {
+	case pgx.QueryExecModeCacheStatement, pgx.QueryExecModeCacheDescribe, pgx.QueryExecModeDescribeExec:
 		return tx.Query(ctx, h.s.preparedName, args...)
 
-	case noprepare:
+	case pgx.QueryExecModeSimpleProtocol, pgx.QueryExecModeExec:
 		return tx.Query(ctx, h.s.sql, args...)
 
-	case simple:
-		return tx.Query(ctx, h.s.sql, prependQuerySimpleProtocol(args)...)
-
 	default:
-		panic("invalid method")
+		return nil, errors.Errorf("unsupported method %q", h.s.sr.mcp.Method())
 	}
 }
 
@@ -237,16 +188,13 @@ func (h StmtHandle) QueryTx(ctx context.Context, tx pgx.Tx, args ...interface{})
 func (h StmtHandle) QueryRow(ctx context.Context, args ...interface{}) pgx.Row {
 	h.check()
 	p := h.s.sr.mcp.Get()
-	switch h.s.sr.method {
-	case prepare:
+	switch h.s.sr.mcp.Method() {
+	case pgx.QueryExecModeCacheStatement, pgx.QueryExecModeCacheDescribe, pgx.QueryExecModeDescribeExec:
 		return p.QueryRow(ctx, h.s.preparedName, args...)
 
-	case noprepare:
+	case pgx.QueryExecModeSimpleProtocol, pgx.QueryExecModeExec:
 		return p.QueryRow(ctx, h.s.sql, args...)
 
-	case simple:
-		return p.QueryRow(ctx, h.s.sql, prependQuerySimpleProtocol(args)...)
-
 	default:
 		panic("invalid method")
 	}
@@ -258,30 +206,17 @@ func (h StmtHandle) QueryRow(ctx context.Context, args ...interface{}) pgx.Row {
 // See pgx.Conn.QueryRow.
 func (h StmtHandle) QueryRowTx(ctx context.Context, tx pgx.Tx, args ...interface{}) pgx.Row {
 	h.check()
-	switch h.s.sr.method {
-	case prepare:
+	switch h.s.sr.mcp.Method() {
+	case pgx.QueryExecModeCacheStatement, pgx.QueryExecModeCacheDescribe, pgx.QueryExecModeDescribeExec:
 		return tx.QueryRow(ctx, h.s.preparedName, args...)
 
-	case noprepare:
+	case pgx.QueryExecModeSimpleProtocol, pgx.QueryExecModeExec:
 		return tx.QueryRow(ctx, h.s.sql, args...)
 
-	case simple:
-		return tx.QueryRow(ctx, h.s.sql, prependQuerySimpleProtocol(args)...)
-
 	default:
 		panic("invalid method")
 	}
 }
 
-// prependQuerySimpleProtocol inserts pgx.QuerySimpleProtocol(true) at the
-// beginning of the slice. It is based on
-// https://github.com/golang/go/wiki/SliceTricks.
-func prependQuerySimpleProtocol(args []interface{}) []interface{} {
-	args = append(args, pgx.QuerySimpleProtocol(true))
-	copy(args[1:], args)
-	args[0] = pgx.QuerySimpleProtocol(true)
-	return args
-}
-
 // Appease the linter.
 var _ = StmtHandle.QueryRow
diff --git a/pkg/workload/tpcc/BUILD.bazel b/pkg/workload/tpcc/BUILD.bazel
index b0f1e704c2c5..76044879a329 100644
--- a/pkg/workload/tpcc/BUILD.bazel
+++ b/pkg/workload/tpcc/BUILD.bazel
@@ -32,11 +32,11 @@ go_library(
         "//pkg/workload",
         "//pkg/workload/histogram",
         "//pkg/workload/workloadimpl",
-        "@com_github_cockroachdb_cockroach_go_v2//crdb/crdbpgx",
+        "@com_github_cockroachdb_cockroach_go_v2//crdb/crdbpgxv5",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_codahale_hdrhistogram//:hdrhistogram",
        "@com_github_jackc_pgtype//:pgtype",
-        "@com_github_jackc_pgx_v4//:pgx",
+        "@com_github_jackc_pgx_v5//:pgx",
         "@com_github_lib_pq//:pq",
         "@com_github_prometheus_client_golang//prometheus",
         "@com_github_prometheus_client_golang//prometheus/promauto",
diff --git a/pkg/workload/tpcc/delivery.go b/pkg/workload/tpcc/delivery.go
index f6a86f84e296..6f15b782d9ab 100644
--- a/pkg/workload/tpcc/delivery.go
+++ b/pkg/workload/tpcc/delivery.go
@@ -17,11 +17,11 @@ import (
 	"strings"
 	"sync/atomic"
 
-	"github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx"
+	crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/workload"
 	"github.com/cockroachdb/errors"
-	"github.com/jackc/pgx/v4"
+	"github.com/jackc/pgx/v5"
 	"golang.org/x/exp/rand"
 )
 
@@ -72,7 +72,7 @@ func createDelivery(
 		WHERE ol_w_id = $1 AND ol_d_id = $2 AND ol_o_id = $3`,
 	)
 
-	if err := del.sr.Init(ctx, "delivery", mcp, config.connFlags); err != nil {
+	if err := del.sr.Init(ctx, "delivery", mcp); err != nil {
 		return nil, err
 	}
 
diff --git a/pkg/workload/tpcc/new_order.go b/pkg/workload/tpcc/new_order.go
index 5956f760e007..0bd9435d335d 100644
--- a/pkg/workload/tpcc/new_order.go
+++ b/pkg/workload/tpcc/new_order.go
@@ -18,11 +18,11 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx"
+	crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/workload"
 	"github.com/cockroachdb/errors"
-	"github.com/jackc/pgx/v4"
+	"github.com/jackc/pgx/v5"
 	"github.com/lib/pq"
 	"golang.org/x/exp/rand"
 )
 
@@ -123,7 +123,7 @@ func createNewOrder(
 		VALUES ($1, $2, $3)`,
 	)
 
-	if err := n.sr.Init(ctx, "new-order", mcp, config.connFlags); err != nil {
+	if err := n.sr.Init(ctx, "new-order", mcp); err != nil {
 		return nil, err
 	}
 
diff --git a/pkg/workload/tpcc/order_status.go b/pkg/workload/tpcc/order_status.go
index 8ac970d964d9..db2c6a82819a 100644
--- a/pkg/workload/tpcc/order_status.go
+++ b/pkg/workload/tpcc/order_status.go
@@ -15,13 +15,13 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx"
+	crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
 	"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/workload"
 	"github.com/cockroachdb/errors"
 	"github.com/jackc/pgtype"
-	"github.com/jackc/pgx/v4"
+	"github.com/jackc/pgx/v5"
 	"golang.org/x/exp/rand"
 )
 
@@ -109,7 +109,7 @@ func createOrderStatus(
 		WHERE ol_w_id = $1 AND ol_d_id = $2 AND ol_o_id = $3`,
 	)
 
-	if err := o.sr.Init(ctx, "order-status", mcp, config.connFlags); err != nil {
+	if err := o.sr.Init(ctx, "order-status", mcp); err != nil {
 		return nil, err
 	}
 
diff --git a/pkg/workload/tpcc/payment.go b/pkg/workload/tpcc/payment.go
index e7ea325921ea..994a9ff3817f 100644
--- a/pkg/workload/tpcc/payment.go
+++ b/pkg/workload/tpcc/payment.go
@@ -16,12 +16,12 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx"
+	crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
 	"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/workload"
 	"github.com/cockroachdb/errors"
-	"github.com/jackc/pgx/v4"
+	"github.com/jackc/pgx/v5"
 	"golang.org/x/exp/rand"
 )
 
@@ -142,7 +142,7 @@ func createPayment(ctx context.Context, config *tpcc, mcp *workload.MultiConnPoo
 		VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
 	)
 
-	if err := p.sr.Init(ctx, "payment", mcp, config.connFlags); err != nil {
+	if err := p.sr.Init(ctx, "payment", mcp); err != nil {
 		return nil, err
 	}
 
diff --git a/pkg/workload/tpcc/stock_level.go b/pkg/workload/tpcc/stock_level.go
index 25fc709d604a..bd83e92de662 100644
--- a/pkg/workload/tpcc/stock_level.go
+++ b/pkg/workload/tpcc/stock_level.go
@@ -13,10 +13,10 @@ package tpcc
 import (
 	"context"
 
-	"github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx"
+	crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/workload"
-	"github.com/jackc/pgx/v4"
+	"github.com/jackc/pgx/v5"
 	"golang.org/x/exp/rand"
 )
 
@@ -83,7 +83,7 @@ func createStockLevel(
 	)`,
 	)
 
-	if err := s.sr.Init(ctx, "stock-level", mcp, config.connFlags); err != nil {
+	if err := s.sr.Init(ctx, "stock-level", mcp); err != nil {
 		return nil, err
 	}
 
diff --git a/pkg/workload/tpcc/tpcc.go b/pkg/workload/tpcc/tpcc.go
index 1c18f8ed9315..bc17ba676290 100644
--- a/pkg/workload/tpcc/tpcc.go
+++ b/pkg/workload/tpcc/tpcc.go
@@ -27,7 +27,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/workload/histogram"
 	"github.com/cockroachdb/cockroach/pkg/workload/workloadimpl"
 	"github.com/cockroachdb/errors"
-	"github.com/jackc/pgx/v4"
+	"github.com/jackc/pgx/v5"
 	"github.com/spf13/pflag"
 	"golang.org/x/exp/rand"
 	"golang.org/x/sync/errgroup"
@@ -162,7 +162,6 @@ var tpccMeta = workload.Meta{
 		g := &tpcc{}
 		g.flags.FlagSet = pflag.NewFlagSet(`tpcc`, pflag.ContinueOnError)
 		g.flags.Meta = map[string]workload.FlagMeta{
-			`db`:                {RuntimeOnly: true},
 			`mix`:               {RuntimeOnly: true},
 			`partitions`:        {RuntimeOnly: true},
 			`client-partitions`: {RuntimeOnly: true},
@@ -765,12 +764,12 @@ func (w *tpcc) Ops(
 
 	// We can't use a single MultiConnPool because we want to implement partition
 	// affinity. Instead we have one MultiConnPool per server.
-	cfg := workload.MultiConnPoolCfg{
-		MaxTotalConnections: (w.numConns + len(urls) - 1) / len(urls), // round up
-		// Limit the number of connections per pool (otherwise preparing statements
-		// at startup can be slow).
-		MaxConnsPerPool: 50,
-	}
+	cfg := workload.NewMultiConnPoolCfgFromFlags(w.connFlags)
+	cfg.MaxTotalConnections = (w.numConns + len(urls) - 1) / len(urls) // round up
+
+	// Limit the number of connections per pool (otherwise preparing statements at
+	// startup can be slow).
+	cfg.MaxConnsPerPool = w.connFlags.Concurrency
 
 	fmt.Printf("Initializing %d connections...\n", w.numConns)
 	dbs := make([]*workload.MultiConnPool, len(urls))
diff --git a/pkg/workload/ycsb/BUILD.bazel b/pkg/workload/ycsb/BUILD.bazel
index fd9a7f4180a2..e43fba4f7495 100644
--- a/pkg/workload/ycsb/BUILD.bazel
+++ b/pkg/workload/ycsb/BUILD.bazel
@@ -21,11 +21,11 @@ go_library(
         "//pkg/util/timeutil",
         "//pkg/workload",
         "//pkg/workload/histogram",
-        "@com_github_cockroachdb_cockroach_go_v2//crdb/crdbpgx",
+        "@com_github_cockroachdb_cockroach_go_v2//crdb/crdbpgxv5",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_jackc_pgconn//:pgconn",
-        "@com_github_jackc_pgx_v4//:pgx",
-        "@com_github_jackc_pgx_v4//pgxpool",
+        "@com_github_jackc_pgx_v5//:pgx",
+        "@com_github_jackc_pgx_v5//pgxpool",
         "@com_github_spf13_pflag//:pflag",
         "@org_golang_x_exp//rand",
     ],
diff --git a/pkg/workload/ycsb/ycsb.go b/pkg/workload/ycsb/ycsb.go
index 3a00c785a17b..0237ee8808b6 100644
--- a/pkg/workload/ycsb/ycsb.go
+++ b/pkg/workload/ycsb/ycsb.go
@@ -21,7 +21,7 @@ import (
 	"strings"
 	"sync/atomic"
 
-	"github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx"
+	crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
 	"github.com/cockroachdb/cockroach/pkg/col/coldata"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
@@ -31,8 +31,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/workload/histogram"
 	"github.com/cockroachdb/errors"
 	"github.com/jackc/pgconn"
-	"github.com/jackc/pgx/v4"
-	"github.com/jackc/pgx/v4/pgxpool"
+	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgxpool"
 	"github.com/spf13/pflag"
 	"golang.org/x/exp/rand"
 )
@@ -460,10 +460,8 @@ func (g *ycsb) Ops(
 	if err != nil {
 		return workload.QueryLoad{}, err
 	}
-	pool, err := workload.NewMultiConnPool(ctx, workload.MultiConnPoolCfg{
-		// We want number of connections = number of workers.
-		MaxTotalConnections: g.connFlags.Concurrency,
-	}, urls...)
+	cfg := workload.NewMultiConnPoolCfgFromFlags(g.connFlags)
+	pool, err := workload.NewMultiConnPool(ctx, cfg, urls...)
 	if err != nil {
 		return workload.QueryLoad{}, err
 	}
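
For reference, a minimal sketch (illustrative only, not part of the patch) of how a workload wires up its connection pool and SQLRunner after this change. The helper name exampleRunnerSetup, the package name, and the `SELECT 1` statement are invented; NewConnFlags, NewMultiConnPoolCfgFromFlags, NewMultiConnPool, SQLRunner.Define, and the new SQLRunner.Init(ctx, name, mcp) signature are the APIs touched by the hunks above.

package workloadexample // hypothetical package, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/workload"
)

// exampleRunnerSetup (hypothetical) builds a MultiConnPool from the shared
// connection flags and runs one statement through a SQLRunner.
func exampleRunnerSetup(
	ctx context.Context, connFlags *workload.ConnFlags, urls []string,
) (*workload.MultiConnPool, error) {
	// Pool sizing and the pgx query exec mode now come from the shared
	// connection flags (e.g. --concurrency) rather than per-workload fields.
	cfg := workload.NewMultiConnPoolCfgFromFlags(connFlags)
	pool, err := workload.NewMultiConnPool(ctx, cfg, urls...)
	if err != nil {
		return nil, err
	}

	var sr workload.SQLRunner
	sel := sr.Define(`SELECT 1`)
	// Init no longer takes ConnFlags: whether statements are prepared is
	// derived from the pool's pgx.QueryExecMode (MultiConnPool.Method()).
	if err := sr.Init(ctx, "example", pool); err != nil {
		return nil, err
	}

	// StmtHandle picks prepared vs. simple execution based on the same mode.
	if _, err := sel.Exec(ctx); err != nil {
		return nil, err
	}
	return pool, nil
}

This is the pattern the schemachange, tpcc, and ycsb hunks above converge on: build cfg with NewMultiConnPoolCfgFromFlags and override individual fields (as tpcc does for MaxTotalConnections and MaxConnsPerPool) only where a workload needs something non-default.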