diff --git a/DEPS.bzl b/DEPS.bzl index c3b323070410..66033e13aebc 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -4809,10 +4809,10 @@ def go_deps(): name = "com_github_jackc_pgservicefile", build_file_proto_mode = "disable_global", importpath = "github.com/jackc/pgservicefile", - sha256 = "8422a25b9d2b0be05c66ee1ccfdbaab144ce98f1ac678bc647064c560d4cd6e2", - strip_prefix = "github.com/jackc/pgservicefile@v0.0.0-20200714003250-2b9c44734f2b", + sha256 = "1f8bdf75b2a0d750e56c2a94b1d1b0b5be4b29d6df056aebd997162c29bfd8ab", + strip_prefix = "github.com/jackc/pgservicefile@v0.0.0-20221227161230-091c0ba34f0a", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgservicefile/com_github_jackc_pgservicefile-v0.0.0-20200714003250-2b9c44734f2b.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgservicefile/com_github_jackc_pgservicefile-v0.0.0-20221227161230-091c0ba34f0a.zip", ], ) go_repository( @@ -4839,10 +4839,10 @@ def go_deps(): name = "com_github_jackc_pgx_v5", build_file_proto_mode = "disable_global", importpath = "github.com/jackc/pgx/v5", - sha256 = "e05b4284fb33e5c0c648b269070dedac6759711c411283177261228ab684f45f", - strip_prefix = "github.com/jackc/pgx/v5@v5.2.0", + sha256 = "e2f4a98f6b8716a6854d0a910c12c3527d35ff78ec5f2d16bf49f85601071bf0", + strip_prefix = "github.com/jackc/pgx/v5@v5.3.1", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v5/com_github_jackc_pgx_v5-v5.2.0.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v5/com_github_jackc_pgx_v5-v5.3.1.zip", ], ) go_repository( @@ -4859,10 +4859,10 @@ def go_deps(): name = "com_github_jackc_puddle_v2", build_file_proto_mode = "disable_global", importpath = "github.com/jackc/puddle/v2", - sha256 = "73ea72b52d0a680442d535cf5d9a9713cb0803929c0b4a8e553eda47ee217c44", - strip_prefix = "github.com/jackc/puddle/v2@v2.1.2", + sha256 = "b99ea95df0c0298caf2be786c9eba511bfde2046eccfaa06e89b3e460ab406b0", + strip_prefix = "github.com/jackc/puddle/v2@v2.2.0", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/puddle/v2/com_github_jackc_puddle_v2-v2.1.2.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/puddle/v2/com_github_jackc_puddle_v2-v2.2.0.zip", ], ) go_repository( diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index 2f42131aae12..60fe83d7f748 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -631,12 +631,12 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgpassfile/com_github_jackc_pgpassfile-v1.0.0.zip": "1cc79fb0b80f54b568afd3f4648dd1c349f746ad7c379df8d7f9e0eb1cac938b", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgproto3/com_github_jackc_pgproto3-v1.1.0.zip": "e3766bee50ed74e49a067b2c4797a2c69015cf104bf3f3624cd483a9e940b4ee", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgproto3/v2/com_github_jackc_pgproto3_v2-v2.3.1.zip": "57884e299825af31fd01268659f1e671883b73b708a51230da14d6f8ee0e4e36", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgservicefile/com_github_jackc_pgservicefile-v0.0.0-20200714003250-2b9c44734f2b.zip": "8422a25b9d2b0be05c66ee1ccfdbaab144ce98f1ac678bc647064c560d4cd6e2", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgservicefile/com_github_jackc_pgservicefile-v0.0.0-20221227161230-091c0ba34f0a.zip": 
"1f8bdf75b2a0d750e56c2a94b1d1b0b5be4b29d6df056aebd997162c29bfd8ab", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgtype/com_github_jackc_pgtype-v1.11.0.zip": "6a257b81c0bd386d6241219a14ebd41d574a02aeaeb3942670c06441b864dcad", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v4/com_github_jackc_pgx_v4-v4.16.1.zip": "c3a169a68ff0e56f9f81eee4de4d2fd2a5ec7f4d6be159159325f4863c80bd10", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v5/com_github_jackc_pgx_v5-v5.2.0.zip": "e05b4284fb33e5c0c648b269070dedac6759711c411283177261228ab684f45f", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v5/com_github_jackc_pgx_v5-v5.3.1.zip": "e2f4a98f6b8716a6854d0a910c12c3527d35ff78ec5f2d16bf49f85601071bf0", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/puddle/com_github_jackc_puddle-v1.2.1.zip": "40d73550686666eb1f6df02b65008b2a4c98cfed1254dc4866e6ebe95fbc5c95", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/puddle/v2/com_github_jackc_puddle_v2-v2.1.2.zip": "73ea72b52d0a680442d535cf5d9a9713cb0803929c0b4a8e553eda47ee217c44", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/puddle/v2/com_github_jackc_puddle_v2-v2.2.0.zip": "b99ea95df0c0298caf2be786c9eba511bfde2046eccfaa06e89b3e460ab406b0", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jaegertracing/jaeger/com_github_jaegertracing_jaeger-v1.18.1.zip": "256a95b2a52a66494aca6d354224bb450ff38ce3ea1890af46a7c8dc39203891", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jcmturner/aescts/v2/com_github_jcmturner_aescts_v2-v2.0.0.zip": "717a211ad4aac248cf33cadde73059c13f8e9462123a0ab2fed5c5e61f7739d7", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jcmturner/dnsutils/v2/com_github_jcmturner_dnsutils_v2-v2.0.0.zip": "f9188186b672e547cfaef66107aa62d65054c5d4f10d4dcd1ff157d6bf8c275d", diff --git a/go.mod b/go.mod index 289141404fa0..376236d7822b 100644 --- a/go.mod +++ b/go.mod @@ -71,7 +71,7 @@ require ( github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgproto3/v2 v2.3.1 - github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgtype v1.11.0 github.com/jackc/pgx/v4 v4.16.1 github.com/jackc/puddle v1.2.1 // indirect @@ -156,6 +156,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/guptarohit/asciigraph v0.5.5 github.com/irfansharif/recorder v0.0.0-20211218081646-a21b46510fd6 + github.com/jackc/pgx/v5 v5.3.1 github.com/jaegertracing/jaeger v1.18.1 github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible github.com/jordanlewis/gcassert v0.0.0-20221027203946-81f097ad35a0 @@ -314,6 +315,7 @@ require ( github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639 // indirect github.com/imdario/mergo v0.3.13 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/jackc/puddle/v2 v2.2.0 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/gofork v1.7.6 // indirect diff --git a/go.sum b/go.sum index 830802ebe01c..981a5371c37a 100644 --- a/go.sum +++ b/go.sum @@ -1377,8 +1377,9 @@ github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwX github.com/jackc/pgproto3/v2 v2.3.0/go.mod 
h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgproto3/v2 v2.3.1 h1:nwj7qwf0S+Q7ISFfBndqeLwSwxs+4DPsbRFjECT1Y4Y= github.com/jackc/pgproto3/v2 v2.3.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= @@ -1391,11 +1392,15 @@ github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQ github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= github.com/jackc/pgx/v4 v4.16.1 h1:JzTglcal01DrghUqt+PmzWsZx/Yh7SC/CTQmSBMTd0Y= github.com/jackc/pgx/v4 v4.16.1/go.mod h1:SIhx0D5hoADaiXZVyv+3gSm3LCIIINTVO0PficsvWGQ= +github.com/jackc/pgx/v5 v5.3.1 h1:Fcr8QJ1ZeLi5zsPZqQeUZhNhxfkkKBOgJuYkJHoBOtU= +github.com/jackc/pgx/v5 v5.3.1/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.2.1 h1:gI8os0wpRXFd4FiAY2dWiqRK037tjj3t7rKFeO4X5iw= github.com/jackc/puddle v1.2.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle/v2 v2.2.0 h1:RdcDk92EJBuBS55nQMMYFXTxwstHug4jkhT5pq8VxPk= +github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jaegertracing/jaeger v1.18.1 h1:eFqjEpTKq2FfiZ/YX53oxeCePdIZyWvDfXaTAGj0r5E= github.com/jaegertracing/jaeger v1.18.1/go.mod h1:WRzMFH62rje1VgbShlgk6UbWUNoo08uFFvs/x50aZKk= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= diff --git a/pkg/workload/BUILD.bazel b/pkg/workload/BUILD.bazel index 080484f07f8a..9f498cbf37b0 100644 --- a/pkg/workload/BUILD.bazel +++ b/pkg/workload/BUILD.bazel @@ -28,9 +28,10 @@ go_library( "//pkg/util/timeutil", "//pkg/workload/histogram", "@com_github_cockroachdb_errors//:errors", - "@com_github_jackc_pgconn//:pgconn", - "@com_github_jackc_pgx_v4//:pgx", - "@com_github_jackc_pgx_v4//pgxpool", + "@com_github_jackc_pgx_v5//:pgx", + "@com_github_jackc_pgx_v5//pgconn", + "@com_github_jackc_pgx_v5//pgxpool", + "@com_github_jackc_pgx_v5//tracelog", "@com_github_lib_pq//:pq", "@com_github_spf13_pflag//:pflag", "@org_golang_x_sync//errgroup", diff --git a/pkg/workload/connection.go b/pkg/workload/connection.go index d97e96be1122..ca3dc4e701a5 100644 --- a/pkg/workload/connection.go +++ b/pkg/workload/connection.go @@ -15,6 +15,7 @@ import ( "net/url" "runtime" "strings" + "time" "github.com/spf13/pflag" ) @@ -24,8 +25,14 @@ type ConnFlags struct { *pflag.FlagSet DBOverride string Concurrency int - // Method for issuing 
queries; see SQLRunner. - Method string + Method string // Method for issuing queries; see SQLRunner. + + ConnHealthCheckPeriod time.Duration + MaxConnIdleTime time.Duration + MaxConnLifetime time.Duration + MaxConnLifetimeJitter time.Duration + MinConns int + WarmupConns int } // NewConnFlags returns an initialized ConnFlags. @@ -36,14 +43,35 @@ func NewConnFlags(genFlags *Flags) *ConnFlags { `Override for the SQL database to use. If empty, defaults to the generator name`) c.IntVar(&c.Concurrency, `concurrency`, 2*runtime.GOMAXPROCS(0), `Number of concurrent workers`) - c.StringVar(&c.Method, `method`, `prepare`, `SQL issue method (prepare, noprepare, simple)`) + c.StringVar(&c.Method, `method`, `cache_statement`, `SQL issue method (cache_statement, cache_describe, describe_exec, exec, simple_protocol)`) + c.DurationVar(&c.ConnHealthCheckPeriod, `conn-healthcheck-period`, 30*time.Second, `Interval that health checks are run on connections`) + c.IntVar(&c.MinConns, `min-conns`, 0, `Minimum number of connections to attempt to keep in the pool`) + c.DurationVar(&c.MaxConnIdleTime, `max-conn-idle-time`, 150*time.Second, `Max time an idle connection will be kept around`) + c.DurationVar(&c.MaxConnLifetime, `max-conn-lifetime`, 300*time.Second, `Max connection lifetime`) + c.DurationVar(&c.MaxConnLifetimeJitter, `max-conn-lifetime-jitter`, 150*time.Second, `Jitter max connection lifetime by this amount`) + c.IntVar(&c.WarmupConns, `warmup-conns`, 0, `Number of connections to warmup in each connection pool`) genFlags.AddFlagSet(c.FlagSet) if genFlags.Meta == nil { genFlags.Meta = make(map[string]FlagMeta) } - genFlags.Meta[`db`] = FlagMeta{RuntimeOnly: true} - genFlags.Meta[`concurrency`] = FlagMeta{RuntimeOnly: true} - genFlags.Meta[`method`] = FlagMeta{RuntimeOnly: true} + for _, k := range []string{ + `concurrency`, + `conn-healthcheck-period`, + `db`, + `max-conn-idle-time`, + `max-conn-lifetime-jitter`, + `max-conn-lifetime`, + `method`, + `min-conns`, + `warmup-conns`, + } { + v, ok := genFlags.Meta[k] + if !ok { + v = FlagMeta{} + } + v.RuntimeOnly = true + genFlags.Meta[k] = v + } return c } diff --git a/pkg/workload/connectionlatency/BUILD.bazel b/pkg/workload/connectionlatency/BUILD.bazel index 0ee0d706b864..87b652616098 100644 --- a/pkg/workload/connectionlatency/BUILD.bazel +++ b/pkg/workload/connectionlatency/BUILD.bazel @@ -11,7 +11,7 @@ go_library( "//pkg/util/timeutil", "//pkg/workload", "//pkg/workload/histogram", - "@com_github_jackc_pgx_v4//:pgx", + "@com_github_jackc_pgx_v5//:pgx", "@com_github_spf13_pflag//:pflag", ], ) diff --git a/pkg/workload/connectionlatency/connectionlatency.go b/pkg/workload/connectionlatency/connectionlatency.go index b1358cf6c3a3..1975557a1ba9 100644 --- a/pkg/workload/connectionlatency/connectionlatency.go +++ b/pkg/workload/connectionlatency/connectionlatency.go @@ -19,7 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/cockroach/pkg/workload/histogram" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" "github.com/spf13/pflag" ) diff --git a/pkg/workload/histogram/histogram.go b/pkg/workload/histogram/histogram.go index 22e9ed6043a7..ce32ff414021 100644 --- a/pkg/workload/histogram/histogram.go +++ b/pkg/workload/histogram/histogram.go @@ -73,6 +73,7 @@ func (w *Registry) newNamedHistogramLocked(name string) *NamedHistogram { // Record saves a new datapoint and should be called once per logical operation. 
func (w *NamedHistogram) Record(elapsed time.Duration) { w.prometheusHistogram.Observe(float64(elapsed.Nanoseconds()) / float64(time.Second)) + w.mu.Lock() maxLatency := time.Duration(w.mu.current.HighestTrackableValue()) if elapsed < minLatency { elapsed = minLatency @@ -80,7 +81,6 @@ func (w *NamedHistogram) Record(elapsed time.Duration) { elapsed = maxLatency } - w.mu.Lock() err := w.mu.current.RecordValue(elapsed.Nanoseconds()) w.mu.Unlock() diff --git a/pkg/workload/indexes/indexes.go b/pkg/workload/indexes/indexes.go index 12f80920877d..8df0849e5cc9 100644 --- a/pkg/workload/indexes/indexes.go +++ b/pkg/workload/indexes/indexes.go @@ -158,9 +158,8 @@ func (w *indexes) Ops( if err != nil { return workload.QueryLoad{}, err } - cfg := workload.MultiConnPoolCfg{ - MaxTotalConnections: w.connFlags.Concurrency + 1, - } + cfg := workload.NewMultiConnPoolCfgFromFlags(w.connFlags) + cfg.MaxTotalConnections = w.connFlags.Concurrency + 1 mcp, err := workload.NewMultiConnPool(ctx, cfg, urls...) if err != nil { return workload.QueryLoad{}, err @@ -176,7 +175,7 @@ func (w *indexes) Ops( buf: make([]byte, w.payload), } op.stmt = op.sr.Define(stmt) - if err := op.sr.Init(ctx, "indexes", mcp, w.connFlags); err != nil { + if err := op.sr.Init(ctx, "indexes", mcp); err != nil { return workload.QueryLoad{}, err } ql.WorkerFns = append(ql.WorkerFns, op.run) diff --git a/pkg/workload/kv/BUILD.bazel b/pkg/workload/kv/BUILD.bazel index dd0308b1030e..b6730c84cca1 100644 --- a/pkg/workload/kv/BUILD.bazel +++ b/pkg/workload/kv/BUILD.bazel @@ -18,8 +18,8 @@ go_library( "//pkg/workload", "//pkg/workload/histogram", "@com_github_cockroachdb_errors//:errors", - "@com_github_jackc_pgconn//:pgconn", - "@com_github_jackc_pgx_v4//:pgx", + "@com_github_jackc_pgx_v5//:pgx", + "@com_github_jackc_pgx_v5//pgconn", "@com_github_spf13_pflag//:pflag", ], ) diff --git a/pkg/workload/kv/kv.go b/pkg/workload/kv/kv.go index 6483d8b1bdc2..8fd5b92cec66 100644 --- a/pkg/workload/kv/kv.go +++ b/pkg/workload/kv/kv.go @@ -32,8 +32,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/cockroach/pkg/workload/histogram" "github.com/cockroachdb/errors" - "github.com/jackc/pgconn" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" "github.com/spf13/pflag" ) @@ -347,9 +347,8 @@ func (w *kv) Ops( if err != nil { return workload.QueryLoad{}, err } - cfg := workload.MultiConnPoolCfg{ - MaxTotalConnections: w.connFlags.Concurrency + 1, - } + cfg := workload.NewMultiConnPoolCfgFromFlags(w.connFlags) + cfg.MaxTotalConnections = w.connFlags.Concurrency + 1 mcp, err := workload.NewMultiConnPool(ctx, cfg, urls...) if err != nil { return workload.QueryLoad{}, err @@ -445,7 +444,7 @@ func (w *kv) Ops( } op.spanStmt = op.sr.Define(spanStmtStr) op.delStmt = op.sr.Define(delStmtStr) - if err := op.sr.Init(ctx, "kv", mcp, w.connFlags); err != nil { + if err := op.sr.Init(ctx, "kv", mcp); err != nil { return workload.QueryLoad{}, err } op.mcp = mcp @@ -557,9 +556,11 @@ func (o *kvOp) run(ctx context.Context) (retErr error) { // that each run call makes 1 attempt, so that rate limiting in workerRun // behaves as expected. 
var tx pgx.Tx - if tx, err = o.mcp.Get().Begin(ctx); err != nil { + tx, err := o.mcp.Get().BeginTx(ctx, pgx.TxOptions{}) + if err != nil { return err } + defer func() { rollbackErr := tx.Rollback(ctx) if !errors.Is(rollbackErr, pgx.ErrTxClosed) { diff --git a/pkg/workload/pgx_helpers.go b/pkg/workload/pgx_helpers.go index e19709707d25..714d7d99d72a 100644 --- a/pkg/workload/pgx_helpers.go +++ b/pkg/workload/pgx_helpers.go @@ -14,11 +14,14 @@ import ( "context" "strings" "sync/atomic" + "time" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/pgxpool" + "github.com/cockroachdb/errors" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/jackc/pgx/v5/tracelog" "golang.org/x/sync/errgroup" ) @@ -33,6 +36,7 @@ type MultiConnPool struct { // preparedStatements is a map from name to SQL. The statements in the map // are prepared whenever a new connection is acquired from the pool. preparedStatements map[string]string + method pgx.QueryExecMode } } @@ -48,16 +52,85 @@ type MultiConnPoolCfg struct { // If 0, there is no per-pool maximum (other than the total maximum number of // connections which still applies). MaxConnsPerPool int + + // ConnHealthCheckPeriod specifies the amount of time between connection + // health checks. Defaults to 10% of MaxConnLifetime. + ConnHealthCheckPeriod time.Duration + + // MaxConnIdleTime specifies the amount of time a connection will be idle + // before being closed by the health checker. Defaults to 50% of the + // MaxConnLifetime. + MaxConnIdleTime time.Duration + + // MaxConnLifetime specifies the max age of individual connections in + // connection pools. If 0, a default value of 5 minutes is used. + MaxConnLifetime time.Duration + + // MaxConnLifetimeJitter shortens the max age of a connection by a random + // duration less than the specified jitter. If 0, defaults to 50% of + // MaxConnLifetime. + MaxConnLifetimeJitter time.Duration + + // Method specifies the query execution mode to use for the PG wire protocol. + Method string + + // MinConns is the minimum number of connections the connection pool will + // attempt to keep. Connection count may dip below this value periodically; + // see pgxpool documentation for details. + MinConns int + + // WarmupConns specifies the number of connections to prewarm when + // initializing a MultiConnPool. A value of 0 automatically initializes the + // max number of connections per pool. A value less than 0 skips the + // connection warmup phase. + WarmupConns int + + // LogLevel specifies the log level (default: warn). + LogLevel tracelog.LogLevel +} + +// NewMultiConnPoolCfgFromFlags constructs a new MultiConnPoolCfg object based +// on the connection flags. +func NewMultiConnPoolCfgFromFlags(cf *ConnFlags) MultiConnPoolCfg { + return MultiConnPoolCfg{ + ConnHealthCheckPeriod: cf.ConnHealthCheckPeriod, + MaxConnIdleTime: cf.MaxConnIdleTime, + MaxConnLifetime: cf.MaxConnLifetime, + MaxConnLifetimeJitter: cf.MaxConnLifetimeJitter, + MaxConnsPerPool: cf.Concurrency, + MaxTotalConnections: cf.Concurrency, + Method: cf.Method, + MinConns: cf.MinConns, + WarmupConns: cf.WarmupConns, + } +} + +// String values taken from pgx.ParseConfigWithOptions() to maintain +// compatibility with pgx. See [1] and [2] for additional details.
+// +// [1] https://github.com/jackc/pgx/blob/fa5fbed497bc75acee05c1667a8760ce0d634cba/conn.go#L167-L182 +// [2] https://github.com/jackc/pgx/blob/fa5fbed497bc75acee05c1667a8760ce0d634cba/conn.go#L578-L612 +var stringToMethod = map[string]pgx.QueryExecMode{ + "cache_statement": pgx.QueryExecModeCacheStatement, + "cache_describe": pgx.QueryExecModeCacheDescribe, + "describe_exec": pgx.QueryExecModeDescribeExec, + "exec": pgx.QueryExecModeExec, + "simple_protocol": pgx.QueryExecModeSimpleProtocol, + + // Preserve backward compatibility with the original workload --method values. + "prepare": pgx.QueryExecModeCacheStatement, + "noprepare": pgx.QueryExecModeExec, + "simple": pgx.QueryExecModeSimpleProtocol, +} // pgxLogger implements the pgx.Logger interface. type pgxLogger struct{} -var _ pgx.Logger = pgxLogger{} +var _ tracelog.Logger = pgxLogger{} // Log implements the pgx.Logger interface. func (p pgxLogger) Log( - ctx context.Context, level pgx.LogLevel, msg string, data map[string]interface{}, + ctx context.Context, level tracelog.LogLevel, msg string, data map[string]interface{}, ) { if ctx.Err() != nil { // Don't log anything from pgx if the context was canceled by the workload @@ -76,7 +149,7 @@ func (p pgxLogger) Log( return } } - log.Infof(ctx, "pgx logger [%s]: %s logParams=%v", level.String(), msg, data) + log.VInfof(ctx, log.Level(level), "pgx logger [%s]: %s logParams=%v", level.String(), msg, data) } // NewMultiConnPool creates a new MultiConnPool. @@ -92,27 +165,63 @@ func NewMultiConnPool( m := &MultiConnPool{} m.mu.preparedStatements = map[string]string{} + logLevel := tracelog.LogLevelWarn + if cfg.LogLevel != 0 { + logLevel = cfg.LogLevel + } + maxConnLifetime := 300 * time.Second + if cfg.MaxConnLifetime > 0 { + maxConnLifetime = cfg.MaxConnLifetime + } + maxConnLifetimeJitter := time.Duration(0.5 * float64(maxConnLifetime)) + if cfg.MaxConnLifetimeJitter > 0 { + maxConnLifetimeJitter = cfg.MaxConnLifetimeJitter + } + connHealthCheckPeriod := time.Duration(0.1 * float64(maxConnLifetime)) + if cfg.ConnHealthCheckPeriod > 0 { + connHealthCheckPeriod = cfg.ConnHealthCheckPeriod + } + maxConnIdleTime := time.Duration(0.5 * float64(maxConnLifetime)) + if cfg.MaxConnIdleTime > 0 { + maxConnIdleTime = cfg.MaxConnIdleTime + } + minConns := 0 + if cfg.MinConns > 0 { + minConns = cfg.MinConns + } + connsPerURL := distribute(cfg.MaxTotalConnections, len(urls)) maxConnsPerPool := cfg.MaxConnsPerPool if maxConnsPerPool == 0 { maxConnsPerPool = cfg.MaxTotalConnections } - var warmupConns [][]*pgxpool.Conn + // See + // https://github.com/jackc/pgx/blob/fa5fbed497bc75acee05c1667a8760ce0d634cba/conn.go#L578-L612 + // for details on the specifics of each query mode. + queryMode, ok := stringToMethod[strings.ToLower(cfg.Method)] + if !ok { + return nil, errors.Errorf("unknown method %s", cfg.Method) + } + m.mu.method = queryMode + for i := range urls { connsPerPool := distributeMax(connsPerURL[i], maxConnsPerPool) for _, numConns := range connsPerPool { - connCfg, err := pgxpool.ParseConfig(urls[i]) + poolCfg, err := pgxpool.ParseConfig(urls[i]) if err != nil { return nil, err } - // Disable the automatic prepared statement cache. We've seen a lot of - // churn in this cache since workloads create many of different queries.
- connCfg.ConnConfig.BuildStatementCache = nil - connCfg.ConnConfig.LogLevel = pgx.LogLevelWarn - connCfg.ConnConfig.Logger = pgxLogger{} - connCfg.MaxConns = int32(numConns) - connCfg.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool { + poolCfg.HealthCheckPeriod = connHealthCheckPeriod + poolCfg.MaxConnLifetime = maxConnLifetime + poolCfg.MaxConnLifetimeJitter = maxConnLifetimeJitter + poolCfg.MaxConnIdleTime = maxConnIdleTime + poolCfg.MaxConns = int32(numConns) + if minConns > numConns { + minConns = numConns + } + poolCfg.MinConns = int32(minConns) + poolCfg.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool { m.mu.RLock() defer m.mu.RUnlock() for name, sql := range m.mu.preparedStatements { @@ -126,51 +235,25 @@ func NewMultiConnPool( } return true } - p, err := pgxpool.ConnectConfig(ctx, connCfg) + + connCfg := poolCfg.ConnConfig + connCfg.DefaultQueryExecMode = queryMode + connCfg.Tracer = &tracelog.TraceLog{ + Logger: &pgxLogger{}, + LogLevel: logLevel, + } + p, err := pgxpool.NewWithConfig(ctx, poolCfg) if err != nil { return nil, err } - warmupConns = append(warmupConns, make([]*pgxpool.Conn, numConns)) m.Pools = append(m.Pools, p) } } - // "Warm up" the pools so we don't have to establish connections later (which - // would affect the observed latencies of the first requests, especially when - // prepared statements are used). We do this by - // acquiring connections (in parallel), then releasing them back to the - // pool. - var g errgroup.Group - // Limit concurrent connection establishment. Allowing this to run - // at maximum parallelism would trigger syn flood protection on the - // host, which combined with any packet loss could cause Acquire to - // return an error and fail the whole function. The value 100 is - // chosen because it is less than the default value for SOMAXCONN - // (128). - sem := make(chan struct{}, 100) - for i, p := range m.Pools { - p := p - conns := warmupConns[i] - for j := range conns { - j := j - sem <- struct{}{} - g.Go(func() error { - var err error - conns[j], err = p.Acquire(ctx) - <-sem - return err - }) - } - } - if err := g.Wait(); err != nil { + if err := m.WarmupConns(ctx, cfg.WarmupConns); err != nil { return nil, err } - for i := range m.Pools { - for _, c := range warmupConns[i] { - c.Release() - } - } return m, nil } @@ -186,11 +269,13 @@ func (m *MultiConnPool) AddPreparedStatement(name string, statement string) { // Get returns one of the pools, in round-robin manner. func (m *MultiConnPool) Get() *pgxpool.Pool { - if len(m.Pools) == 1 { + numPools := uint32(len(m.Pools)) + if numPools == 1 { return m.Pools[0] } i := atomic.AddUint32(&m.counter, 1) - 1 - return m.Pools[i%uint32(len(m.Pools))] + + return m.Pools[i%numPools] } // Close closes all the pools. @@ -200,6 +285,92 @@ func (m *MultiConnPool) Close() { } } +// Method returns the query execution mode of the connection pool. +func (m *MultiConnPool) Method() pgx.QueryExecMode { + m.mu.Lock() + defer m.mu.Unlock() + return m.mu.method +} + +// WarmupConns warms up numConns connections across all pools contained within +// MultiConnPool. The max number of connections is warmed up if numConns is +// set to 0. +func (m *MultiConnPool) WarmupConns(ctx context.Context, numConns int) error { + if numConns < 0 { + return nil + } + + // NOTE(seanc@): see context cancellation note below.
+ warmupCtx, cancel := context.WithCancel(ctx) + defer cancel() + + var numWarmupConns int + if numConns == 0 { + for _, p := range m.Pools { + numWarmupConns += int(p.Config().MaxConns) + } + } else { + numWarmupConns = numConns + } + + // "Warm up" the pools so we don't have to establish connections later (which + // would affect the observed latencies of the first requests, especially when + // prepared statements are used). We do this by + // acquiring connections (in parallel), then releasing them back to the + // pool. + var g errgroup.Group + + // Limit concurrent connection establishment. Allowing this to run + // at maximum parallelism would trigger syn flood protection on the + // host, which combined with any packet loss could cause Acquire to + // return an error and fail the whole function. The value 100 is + // chosen because it is less than the default value for SOMAXCONN + // (128). + g.SetLimit(100) + + warmupConns := make(chan *pgxpool.Conn, numWarmupConns) + connsPerURL := distribute(numWarmupConns, len(m.Pools)) + for i, p := range m.Pools { + p := p + maxConns := connsPerURL[i] + for k := 0; k < maxConns && k < int(p.Config().MaxConns); k++ { + g.Go(func() error { + conn, err := p.Acquire(warmupCtx) + if err != nil { + return err + } + warmupConns <- conn + return nil + }) + } + } + + estConns := make([]*pgxpool.Conn, 0, numWarmupConns) + defer func() { + for _, conn := range estConns { + // NOTE(seanc@): Release() connections before canceling the warmupCtx to + // prevent partially established connections from being Acquire()'ed. + conn.Release() + } + }() + +WARMUP: + for i := 0; i < numWarmupConns; i++ { + select { + case conn := <-warmupConns: + estConns = append(estConns, conn) + case <-warmupCtx.Done(): + if err := warmupCtx.Err(); err != nil { + return err + } + + break WARMUP + } + } + + return nil +} + // distribute returns a slice of <num> integers that add up to <total> and are // within +/-1 of each other.
func distribute(total, num int) []int { diff --git a/pkg/workload/querylog/BUILD.bazel b/pkg/workload/querylog/BUILD.bazel index 66f32b8adb92..cb2fbe0530af 100644 --- a/pkg/workload/querylog/BUILD.bazel +++ b/pkg/workload/querylog/BUILD.bazel @@ -17,7 +17,7 @@ go_library( "//pkg/workload/histogram", "//pkg/workload/rand", "@com_github_cockroachdb_errors//:errors", - "@com_github_jackc_pgx_v4//:pgx", + "@com_github_jackc_pgx_v5//:pgx", "@com_github_lib_pq//oid", "@com_github_spf13_pflag//:pflag", ], diff --git a/pkg/workload/querylog/querylog.go b/pkg/workload/querylog/querylog.go index 2ead08dfd7e2..aef7aa4c3a60 100644 --- a/pkg/workload/querylog/querylog.go +++ b/pkg/workload/querylog/querylog.go @@ -34,7 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/workload/histogram" workloadrand "github.com/cockroachdb/cockroach/pkg/workload/rand" "github.com/cockroachdb/errors" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" "github.com/lib/pq/oid" "github.com/spf13/pflag" ) diff --git a/pkg/workload/schemachange/BUILD.bazel b/pkg/workload/schemachange/BUILD.bazel index 3a6da0648bde..aef5564a3f5e 100644 --- a/pkg/workload/schemachange/BUILD.bazel +++ b/pkg/workload/schemachange/BUILD.bazel @@ -35,9 +35,9 @@ go_library( "//pkg/workload", "//pkg/workload/histogram", "@com_github_cockroachdb_errors//:errors", - "@com_github_jackc_pgconn//:pgconn", - "@com_github_jackc_pgx_v4//:pgx", - "@com_github_jackc_pgx_v4//pgxpool", + "@com_github_jackc_pgx_v5//:pgx", + "@com_github_jackc_pgx_v5//pgconn", + "@com_github_jackc_pgx_v5//pgxpool", "@com_github_lib_pq//oid", "@com_github_spf13_pflag//:pflag", ], diff --git a/pkg/workload/schemachange/error_screening.go b/pkg/workload/schemachange/error_screening.go index 4f0eeadb5b75..a6e7e0e849e1 100644 --- a/pkg/workload/schemachange/error_screening.go +++ b/pkg/workload/schemachange/error_screening.go @@ -21,8 +21,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/errors" - "github.com/jackc/pgconn" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" ) func (og *operationGenerator) tableExists( diff --git a/pkg/workload/schemachange/operation_generator.go b/pkg/workload/schemachange/operation_generator.go index b061af7b047a..28698d62db73 100644 --- a/pkg/workload/schemachange/operation_generator.go +++ b/pkg/workload/schemachange/operation_generator.go @@ -34,8 +34,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/errors" - "github.com/jackc/pgconn" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" ) // seqNum may be shared across multiple instances of this, so it should only diff --git a/pkg/workload/schemachange/schemachange.go b/pkg/workload/schemachange/schemachange.go index dfd6046ecd0b..b28f0cbb6281 100644 --- a/pkg/workload/schemachange/schemachange.go +++ b/pkg/workload/schemachange/schemachange.go @@ -20,7 +20,6 @@ import ( "math/rand" "os" "regexp" - "runtime" "sync" "time" @@ -30,8 +29,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/cockroach/pkg/workload/histogram" "github.com/cockroachdb/errors" - "github.com/jackc/pgconn" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" "github.com/spf13/pflag" ) @@ -67,8 +66,8 @@ const ( type schemaChange struct { flags workload.Flags + connFlags *workload.ConnFlags 
dbOverride string - concurrency int maxOpsPerWorker int errorRate int enumPct int @@ -95,8 +94,6 @@ var schemaChangeMeta = workload.Meta{ s.flags.FlagSet = pflag.NewFlagSet(`schemachange`, pflag.ContinueOnError) s.flags.StringVar(&s.dbOverride, `db`, ``, `Override for the SQL database to use. If empty, defaults to the generator name`) - s.flags.IntVar(&s.concurrency, `concurrency`, 2*runtime.GOMAXPROCS(0), /* TODO(spaskob): sensible default? */ - `Number of concurrent workers`) s.flags.IntVar(&s.maxOpsPerWorker, `max-ops-per-worker`, defaultMaxOpsPerWorker, `Number of operations to execute in a single transaction`) s.flags.IntVar(&s.errorRate, `error-rate`, defaultErrorRate, @@ -122,6 +119,7 @@ var schemaChangeMeta = workload.Meta{ defaultDeclarativeSchemaMaxStmtsPerTxn, `Number of statements per-txn used by the declarative schema changer.`) + s.connFlags = workload.NewConnFlags(&s.flags) return s }, } @@ -162,9 +160,7 @@ func (s *schemaChange) Ops( if err != nil { return workload.QueryLoad{}, err } - cfg := workload.MultiConnPoolCfg{ - MaxTotalConnections: s.concurrency * 2, //TODO(spaskob): pick a sensible default. - } + cfg := workload.NewMultiConnPoolCfgFromFlags(s.connFlags) pool, err := workload.NewMultiConnPool(ctx, cfg, urls...) if err != nil { return workload.QueryLoad{}, err @@ -204,7 +200,7 @@ func (s *schemaChange) Ops( s.dumpLogsOnce = &sync.Once{} - for i := 0; i < s.concurrency; i++ { + for i := 0; i < s.connFlags.Concurrency; i++ { opGeneratorParams := operationGeneratorParams{ seqNum: seqNum, diff --git a/pkg/workload/schemachange/type_resolver.go b/pkg/workload/schemachange/type_resolver.go index 8e8be208c35e..13dbb825941b 100644 --- a/pkg/workload/schemachange/type_resolver.go +++ b/pkg/workload/schemachange/type_resolver.go @@ -19,7 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/errors" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" "github.com/lib/pq/oid" ) diff --git a/pkg/workload/schemachange/watch_dog.go b/pkg/workload/schemachange/watch_dog.go index 4ed58f04f115..672220f566e9 100644 --- a/pkg/workload/schemachange/watch_dog.go +++ b/pkg/workload/schemachange/watch_dog.go @@ -17,8 +17,8 @@ import ( "time" "github.com/cockroachdb/errors" - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/pgxpool" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" ) // schemaChangeWatchDog connection watch dog object. diff --git a/pkg/workload/sql_runner.go b/pkg/workload/sql_runner.go index 22980e901ace..8f579a69258c 100644 --- a/pkg/workload/sql_runner.go +++ b/pkg/workload/sql_runner.go @@ -13,11 +13,10 @@ package workload import ( "context" "fmt" - "strings" "github.com/cockroachdb/errors" - "github.com/jackc/pgconn" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" ) // SQLRunner is a helper for issuing SQL statements; it supports multiple @@ -49,24 +48,9 @@ type SQLRunner struct { // The fields below are set by Init. initialized bool - method method mcp *MultiConnPool } -type method int - -const ( - prepare method = iota - noprepare - simple -) - -var stringToMethod = map[string]method{ - "prepare": prepare, - "noprepare": noprepare, - "simple": simple, -} - // Define creates a handle for the given statement. The handle can be used after // Init is called. 
func (sr *SQLRunner) Define(sql string) StmtHandle { @@ -83,39 +67,13 @@ func (sr *SQLRunner) Define(sql string) StmtHandle { // // The name is used for naming prepared statements. Multiple workers that use // the same set of defined queries can and should use the same name. -// -// The way we issue queries is set by flags.Method: -// -// - "prepare": explicitly prepare the query once per connection, then we reuse -// it for each execution. This results in a Bind and Execute on the server -// each time we run a query (on the given connection). Note that it's -// important to prepare on separate connections if there are many parallel -// workers; this avoids lock contention in the sql.Rows objects they produce. -// See #30811. -// -// - "noprepare": each query is issued separately (on the given connection). -// This results in Parse, Bind, Execute on the server each time we run a -// query. The statement is an anonymous prepared statement; that is, the -// name is the empty string. -// -// - "simple": each query is issued in a single string; parameters are -// rendered inside the string. This results in a single SimpleExecute -// request to the server for each query. Note that only a few parameter types -// are supported. -func (sr *SQLRunner) Init( - ctx context.Context, name string, mcp *MultiConnPool, flags *ConnFlags, -) error { +func (sr *SQLRunner) Init(ctx context.Context, name string, mcp *MultiConnPool) error { if sr.initialized { panic("already initialized") } - var ok bool - sr.method, ok = stringToMethod[strings.ToLower(flags.Method)] - if !ok { - return errors.Errorf("unknown method %s", flags.Method) - } - - if sr.method == prepare { + switch mcp.Method() { + case pgx.QueryExecModeCacheStatement, pgx.QueryExecModeCacheDescribe, pgx.QueryExecModeDescribeExec: for i, s := range sr.stmts { stmtName := fmt.Sprintf("%s-%d", name, i+1) s.preparedName = stmtName @@ -152,19 +110,21 @@ type StmtHandle struct { // See pgx.Conn.Exec. func (h StmtHandle) Exec(ctx context.Context, args ...interface{}) (pgconn.CommandTag, error) { h.check() - p := h.s.sr.mcp.Get() - switch h.s.sr.method { - case prepare: - return p.Exec(ctx, h.s.preparedName, args...) + conn, err := h.s.sr.mcp.Get().Acquire(ctx) + if err != nil { + return pgconn.CommandTag{}, err + } + defer conn.Release() - case noprepare: - return p.Exec(ctx, h.s.sql, args...) + switch h.s.sr.mcp.Method() { + case pgx.QueryExecModeCacheStatement, pgx.QueryExecModeCacheDescribe, pgx.QueryExecModeDescribeExec: + return conn.Exec(ctx, h.s.preparedName, args...) - case simple: - return p.Exec(ctx, h.s.sql, prependQuerySimpleProtocol(args)...) + case pgx.QueryExecModeSimpleProtocol, pgx.QueryExecModeExec: + return conn.Exec(ctx, h.s.sql, args...) default: - panic("invalid method") + return pgconn.CommandTag{}, errors.Errorf("unsupported method %q", h.s.sr.mcp.Method()) } } @@ -175,18 +135,15 @@ func (h StmtHandle) ExecTx( ctx context.Context, tx pgx.Tx, args ...interface{}, ) (pgconn.CommandTag, error) { h.check() - switch h.s.sr.method { - case prepare: + switch h.s.sr.mcp.Method() { + case pgx.QueryExecModeCacheStatement, pgx.QueryExecModeCacheDescribe, pgx.QueryExecModeDescribeExec: return tx.Exec(ctx, h.s.preparedName, args...) - case noprepare: + case pgx.QueryExecModeSimpleProtocol, pgx.QueryExecModeExec: return tx.Exec(ctx, h.s.sql, args...) - case simple: - return tx.Exec(ctx, h.s.sql, prependQuerySimpleProtocol(args)...) 
- default: - panic("invalid method") + return pgconn.CommandTag{}, errors.Errorf("unsupported method %q", h.s.sr.mcp.Method()) } } @@ -196,18 +153,15 @@ func (h StmtHandle) ExecTx( func (h StmtHandle) Query(ctx context.Context, args ...interface{}) (pgx.Rows, error) { h.check() p := h.s.sr.mcp.Get() - switch h.s.sr.method { - case prepare: + switch h.s.sr.mcp.Method() { + case pgx.QueryExecModeCacheStatement, pgx.QueryExecModeCacheDescribe, pgx.QueryExecModeDescribeExec: return p.Query(ctx, h.s.preparedName, args...) - case noprepare: + case pgx.QueryExecModeSimpleProtocol, pgx.QueryExecModeExec: return p.Query(ctx, h.s.sql, args...) - case simple: - return p.Query(ctx, h.s.sql, prependQuerySimpleProtocol(args)...) - default: - panic("invalid method") + return nil, errors.Errorf("unsupported method %q", h.s.sr.mcp.Method()) } } @@ -216,18 +170,15 @@ func (h StmtHandle) Query(ctx context.Context, args ...interface{}) (pgx.Rows, e // See pgx.Tx.Query. func (h StmtHandle) QueryTx(ctx context.Context, tx pgx.Tx, args ...interface{}) (pgx.Rows, error) { h.check() - switch h.s.sr.method { - case prepare: + switch h.s.sr.mcp.Method() { + case pgx.QueryExecModeCacheStatement, pgx.QueryExecModeCacheDescribe, pgx.QueryExecModeDescribeExec: return tx.Query(ctx, h.s.preparedName, args...) - case noprepare: + case pgx.QueryExecModeSimpleProtocol, pgx.QueryExecModeExec: return tx.Query(ctx, h.s.sql, args...) - case simple: - return tx.Query(ctx, h.s.sql, prependQuerySimpleProtocol(args)...) - default: - panic("invalid method") + return nil, errors.Errorf("unsupported method %q", h.s.sr.mcp.Method()) } } @@ -237,16 +188,13 @@ func (h StmtHandle) QueryTx(ctx context.Context, tx pgx.Tx, args ...interface{}) func (h StmtHandle) QueryRow(ctx context.Context, args ...interface{}) pgx.Row { h.check() p := h.s.sr.mcp.Get() - switch h.s.sr.method { - case prepare: + switch h.s.sr.mcp.Method() { + case pgx.QueryExecModeCacheStatement, pgx.QueryExecModeCacheDescribe, pgx.QueryExecModeDescribeExec: return p.QueryRow(ctx, h.s.preparedName, args...) - case noprepare: + case pgx.QueryExecModeSimpleProtocol, pgx.QueryExecModeExec: return p.QueryRow(ctx, h.s.sql, args...) - case simple: - return p.QueryRow(ctx, h.s.sql, prependQuerySimpleProtocol(args)...) - default: panic("invalid method") } @@ -258,30 +206,17 @@ func (h StmtHandle) QueryRow(ctx context.Context, args ...interface{}) pgx.Row { // See pgx.Conn.QueryRow. func (h StmtHandle) QueryRowTx(ctx context.Context, tx pgx.Tx, args ...interface{}) pgx.Row { h.check() - switch h.s.sr.method { - case prepare: + switch h.s.sr.mcp.Method() { + case pgx.QueryExecModeCacheStatement, pgx.QueryExecModeCacheDescribe, pgx.QueryExecModeDescribeExec: return tx.QueryRow(ctx, h.s.preparedName, args...) - case noprepare: + case pgx.QueryExecModeSimpleProtocol, pgx.QueryExecModeExec: return tx.QueryRow(ctx, h.s.sql, args...) - case simple: - return tx.QueryRow(ctx, h.s.sql, prependQuerySimpleProtocol(args)...) - default: panic("invalid method") } } -// prependQuerySimpleProtocol inserts pgx.QuerySimpleProtocol(true) at the -// beginning of the slice. It is based on -// https://github.com/golang/go/wiki/SliceTricks. -func prependQuerySimpleProtocol(args []interface{}) []interface{} { - args = append(args, pgx.QuerySimpleProtocol(true)) - copy(args[1:], args) - args[0] = pgx.QuerySimpleProtocol(true) - return args -} - // Appease the linter. 
var _ = StmtHandle.QueryRow diff --git a/pkg/workload/tpcc/BUILD.bazel b/pkg/workload/tpcc/BUILD.bazel index b0f1e704c2c5..76044879a329 100644 --- a/pkg/workload/tpcc/BUILD.bazel +++ b/pkg/workload/tpcc/BUILD.bazel @@ -32,11 +32,11 @@ go_library( "//pkg/workload", "//pkg/workload/histogram", "//pkg/workload/workloadimpl", - "@com_github_cockroachdb_cockroach_go_v2//crdb/crdbpgx", + "@com_github_cockroachdb_cockroach_go_v2//crdb/crdbpgxv5", "@com_github_cockroachdb_errors//:errors", "@com_github_codahale_hdrhistogram//:hdrhistogram", "@com_github_jackc_pgtype//:pgtype", - "@com_github_jackc_pgx_v4//:pgx", + "@com_github_jackc_pgx_v5//:pgx", "@com_github_lib_pq//:pq", "@com_github_prometheus_client_golang//prometheus", "@com_github_prometheus_client_golang//prometheus/promauto", diff --git a/pkg/workload/tpcc/delivery.go b/pkg/workload/tpcc/delivery.go index f6a86f84e296..6f15b782d9ab 100644 --- a/pkg/workload/tpcc/delivery.go +++ b/pkg/workload/tpcc/delivery.go @@ -17,11 +17,11 @@ import ( "strings" "sync/atomic" - "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx" + crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/errors" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" "golang.org/x/exp/rand" ) @@ -72,7 +72,7 @@ func createDelivery( WHERE ol_w_id = $1 AND ol_d_id = $2 AND ol_o_id = $3`, ) - if err := del.sr.Init(ctx, "delivery", mcp, config.connFlags); err != nil { + if err := del.sr.Init(ctx, "delivery", mcp); err != nil { return nil, err } diff --git a/pkg/workload/tpcc/new_order.go b/pkg/workload/tpcc/new_order.go index 5956f760e007..0bd9435d335d 100644 --- a/pkg/workload/tpcc/new_order.go +++ b/pkg/workload/tpcc/new_order.go @@ -18,11 +18,11 @@ import ( "sync/atomic" "time" - "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx" + crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/errors" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" "github.com/lib/pq" "golang.org/x/exp/rand" ) @@ -123,7 +123,7 @@ func createNewOrder( VALUES ($1, $2, $3)`, ) - if err := n.sr.Init(ctx, "new-order", mcp, config.connFlags); err != nil { + if err := n.sr.Init(ctx, "new-order", mcp); err != nil { return nil, err } diff --git a/pkg/workload/tpcc/order_status.go b/pkg/workload/tpcc/order_status.go index 8ac970d964d9..db2c6a82819a 100644 --- a/pkg/workload/tpcc/order_status.go +++ b/pkg/workload/tpcc/order_status.go @@ -15,13 +15,13 @@ import ( "sync/atomic" "time" - "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx" + crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/errors" "github.com/jackc/pgtype" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" "golang.org/x/exp/rand" ) @@ -109,7 +109,7 @@ func createOrderStatus( WHERE ol_w_id = $1 AND ol_d_id = $2 AND ol_o_id = $3`, ) - if err := o.sr.Init(ctx, "order-status", mcp, config.connFlags); err != nil { + if err := o.sr.Init(ctx, "order-status", mcp); err != nil { return nil, err } diff --git a/pkg/workload/tpcc/payment.go b/pkg/workload/tpcc/payment.go index e7ea325921ea..994a9ff3817f 100644 --- a/pkg/workload/tpcc/payment.go +++ b/pkg/workload/tpcc/payment.go @@ 
-16,12 +16,12 @@ import ( "sync/atomic" "time" - "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx" + crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/errors" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" "golang.org/x/exp/rand" ) @@ -142,7 +142,7 @@ func createPayment(ctx context.Context, config *tpcc, mcp *workload.MultiConnPoo VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, ) - if err := p.sr.Init(ctx, "payment", mcp, config.connFlags); err != nil { + if err := p.sr.Init(ctx, "payment", mcp); err != nil { return nil, err } diff --git a/pkg/workload/tpcc/stock_level.go b/pkg/workload/tpcc/stock_level.go index 25fc709d604a..bd83e92de662 100644 --- a/pkg/workload/tpcc/stock_level.go +++ b/pkg/workload/tpcc/stock_level.go @@ -13,10 +13,10 @@ package tpcc import ( "context" - "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx" + crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" "golang.org/x/exp/rand" ) @@ -83,7 +83,7 @@ func createStockLevel( )`, ) - if err := s.sr.Init(ctx, "stock-level", mcp, config.connFlags); err != nil { + if err := s.sr.Init(ctx, "stock-level", mcp); err != nil { return nil, err } diff --git a/pkg/workload/tpcc/tpcc.go b/pkg/workload/tpcc/tpcc.go index 1c18f8ed9315..bc17ba676290 100644 --- a/pkg/workload/tpcc/tpcc.go +++ b/pkg/workload/tpcc/tpcc.go @@ -27,7 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/workload/histogram" "github.com/cockroachdb/cockroach/pkg/workload/workloadimpl" "github.com/cockroachdb/errors" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" "github.com/spf13/pflag" "golang.org/x/exp/rand" "golang.org/x/sync/errgroup" @@ -162,7 +162,6 @@ var tpccMeta = workload.Meta{ g := &tpcc{} g.flags.FlagSet = pflag.NewFlagSet(`tpcc`, pflag.ContinueOnError) g.flags.Meta = map[string]workload.FlagMeta{ - `db`: {RuntimeOnly: true}, `mix`: {RuntimeOnly: true}, `partitions`: {RuntimeOnly: true}, `client-partitions`: {RuntimeOnly: true}, @@ -765,12 +764,12 @@ func (w *tpcc) Ops( // We can't use a single MultiConnPool because we want to implement partition // affinity. Instead we have one MultiConnPool per server. - cfg := workload.MultiConnPoolCfg{ - MaxTotalConnections: (w.numConns + len(urls) - 1) / len(urls), // round up - // Limit the number of connections per pool (otherwise preparing statements - // at startup can be slow). - MaxConnsPerPool: 50, - } + cfg := workload.NewMultiConnPoolCfgFromFlags(w.connFlags) + cfg.MaxTotalConnections = (w.numConns + len(urls) - 1) / len(urls) // round up + + // Limit the number of connections per pool (otherwise preparing statements at + // startup can be slow). 
+ cfg.MaxConnsPerPool = w.connFlags.Concurrency fmt.Printf("Initializing %d connections...\n", w.numConns) dbs := make([]*workload.MultiConnPool, len(urls)) diff --git a/pkg/workload/ycsb/BUILD.bazel b/pkg/workload/ycsb/BUILD.bazel index fd9a7f4180a2..e43fba4f7495 100644 --- a/pkg/workload/ycsb/BUILD.bazel +++ b/pkg/workload/ycsb/BUILD.bazel @@ -21,11 +21,11 @@ go_library( "//pkg/util/timeutil", "//pkg/workload", "//pkg/workload/histogram", - "@com_github_cockroachdb_cockroach_go_v2//crdb/crdbpgx", + "@com_github_cockroachdb_cockroach_go_v2//crdb/crdbpgxv5", "@com_github_cockroachdb_errors//:errors", "@com_github_jackc_pgconn//:pgconn", - "@com_github_jackc_pgx_v4//:pgx", - "@com_github_jackc_pgx_v4//pgxpool", + "@com_github_jackc_pgx_v5//:pgx", + "@com_github_jackc_pgx_v5//pgxpool", "@com_github_spf13_pflag//:pflag", "@org_golang_x_exp//rand", ], diff --git a/pkg/workload/ycsb/ycsb.go b/pkg/workload/ycsb/ycsb.go index 3a00c785a17b..0237ee8808b6 100644 --- a/pkg/workload/ycsb/ycsb.go +++ b/pkg/workload/ycsb/ycsb.go @@ -21,7 +21,7 @@ import ( "strings" "sync/atomic" - "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx" + crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -31,8 +31,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/workload/histogram" "github.com/cockroachdb/errors" "github.com/jackc/pgconn" - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/pgxpool" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" "github.com/spf13/pflag" "golang.org/x/exp/rand" ) @@ -460,10 +460,8 @@ func (g *ycsb) Ops( if err != nil { return workload.QueryLoad{}, err } - pool, err := workload.NewMultiConnPool(ctx, workload.MultiConnPoolCfg{ - // We want number of connections = number of workers. - MaxTotalConnections: g.connFlags.Concurrency, - }, urls...) + cfg := workload.NewMultiConnPoolCfgFromFlags(g.connFlags) + pool, err := workload.NewMultiConnPool(ctx, cfg, urls...) if err != nil { return workload.QueryLoad{}, err }
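The sketch below is not part of this diff; it illustrates how a workload's Ops implementation is expected to wire the new pieces together after the pgx v5 migration: pool tuning now comes from the shared ConnFlags via NewMultiConnPoolCfgFromFlags, and SQLRunner.Init no longer takes the flags because --method is resolved to a pgx.QueryExecMode when the pool is built. The package name, the exampleOps function, and the statement are hypothetical; the workload APIs referenced are the ones shown in the diff above.

package exampleworkload

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/workload"
)

// exampleOps mirrors the pattern this diff applies to kv.go and indexes.go.
func exampleOps(ctx context.Context, urls []string, connFlags *workload.ConnFlags) error {
	// Pool tuning comes from the shared connection flags; a workload can still
	// override individual fields such as MaxTotalConnections.
	cfg := workload.NewMultiConnPoolCfgFromFlags(connFlags)
	cfg.MaxTotalConnections = connFlags.Concurrency + 1

	mcp, err := workload.NewMultiConnPool(ctx, cfg, urls...)
	if err != nil {
		return err
	}
	defer mcp.Close()

	var sr workload.SQLRunner
	stmt := sr.Define(`SELECT 1`)
	// Init no longer takes ConnFlags: the --method flag was already mapped to a
	// pgx.QueryExecMode when the pool was built, and mcp.Method() reports it.
	if err := sr.Init(ctx, "example", mcp); err != nil {
		return err
	}

	_, err = stmt.Exec(ctx)
	return err
}

Compared to the old API, the issue method is now fixed per pool rather than per SQLRunner, which is why the Ops call sites in this diff only drop the trailing connFlags argument from Init.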
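For the --method flag itself, the legacy values are kept as aliases in stringToMethod above (prepare maps to cache_statement, noprepare to exec, simple to simple_protocol). Outside the workload harness, the same settings map onto the upstream pgx v5 API roughly as in the sketch below. This is illustrative only: the connection string and logging callback are placeholders, and it is not a line-for-line account of what NewMultiConnPool does internally.

package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/jackc/pgx/v5/tracelog"
)

func main() {
	ctx := context.Background()

	// Placeholder connection string.
	cfg, err := pgxpool.ParseConfig("postgres://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}

	// Equivalent of the old --method=noprepare: no statement caching, one
	// anonymous prepared statement per query (QueryExecModeExec in pgx v5).
	cfg.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeExec

	// Route pgx's internal logging through tracelog, as pgx_helpers.go now does
	// with its pgxLogger adapter.
	cfg.ConnConfig.Tracer = &tracelog.TraceLog{
		Logger: tracelog.LoggerFunc(func(ctx context.Context, level tracelog.LogLevel, msg string, data map[string]interface{}) {
			log.Printf("pgx [%s]: %s %v", level, msg, data)
		}),
		LogLevel: tracelog.LogLevelWarn,
	}

	pool, err := pgxpool.NewWithConfig(ctx, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	var one int
	if err := pool.QueryRow(ctx, "SELECT 1").Scan(&one); err != nil {
		log.Fatal(err)
	}
}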