From b43590a0fc6a83f386c55e23db1fc41eae22dd2c Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 6 Oct 2023 13:20:59 -0400 Subject: [PATCH 1/2] roachtest: fix read committed variant of ycsb This was broken by 53ed1cd2. Epic: None Release note: None --- pkg/cmd/roachtest/tests/ycsb.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/pkg/cmd/roachtest/tests/ycsb.go b/pkg/cmd/roachtest/tests/ycsb.go index 14b60e7c2560..2871440f264d 100644 --- a/pkg/cmd/roachtest/tests/ycsb.go +++ b/pkg/cmd/roachtest/tests/ycsb.go @@ -12,6 +12,7 @@ package tests import ( "context" + gosql "database/sql" "fmt" "os" @@ -69,8 +70,13 @@ func registerYCSB(r registry.Registry) { c.Put(ctx, t.Cockroach(), "./cockroach", c.Range(1, nodes)) c.Put(ctx, t.DeprecatedWorkload(), "./workload", c.Node(nodes+1)) c.Start(ctx, t.L(), option.DefaultStartOptsNoBackups(), settings, c.Range(1, nodes)) - err := WaitFor3XReplication(ctx, t, c.Conn(ctx, t.L(), 1)) + + db := c.Conn(ctx, t.L(), 1) + err := enableIsolationLevels(ctx, t, db) + require.NoError(t, err) + err = WaitFor3XReplication(ctx, t, db) require.NoError(t, err) + require.NoError(t, db.Close()) t.Status("running workload") m := c.NewMonitor(ctx, c.Range(1, nodes)) @@ -163,3 +169,15 @@ func registerYCSB(r registry.Registry) { } } } + +func enableIsolationLevels(ctx context.Context, t test.Test, db *gosql.DB) error { + for _, cmd := range []string{ + `SET CLUSTER SETTING sql.txn.snapshot_isolation_syntax.enabled = 'true';`, + `SET CLUSTER SETTING sql.txn.read_committed_syntax.enabled = 'true';`, + } { + if _, err := db.ExecContext(ctx, cmd); err != nil { + return err + } + } + return nil +} From 889aab9eb487b882c7703fc7d88290c18052672a Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 9 Oct 2023 13:53:30 -0700 Subject: [PATCH 2/2] rowflow: remove redundant copyingRowReceiver in gateway row-based flow Currently, in row-based flows we always wrap the output of each processor (most commonly `RowChannel`) with `copyingRowReceiver` which performs a shallow copy of each row. This is needed because the contract of `RowSource.Next` is such that the rows are only safe until the next call to `Next`. Each processor could run in a separate goroutine, so we need this copying behavior to protect rows from corruption. However, we often can fuse processors together (meaning that we remove the RowChannel between two processors and make two of them run in the same goroutine), and in that case we already remove this `copyingRowReceiver` to avoid unnecessary row copies. This commit adds another case where we can safely remove the redundant `copyingRowReceiver` - when we have a flow on the gateway, then the output of the "head" processor is `DistSQLReceiver`, and it's pushed into from the same goroutine as the "head" processor, so we don't need row copies. Epic: None Release note: None --- pkg/sql/rowflow/BUILD.bazel | 1 + pkg/sql/rowflow/row_based_flow.go | 13 +++++++++++++ 2 files changed, 14 insertions(+) diff --git a/pkg/sql/rowflow/BUILD.bazel b/pkg/sql/rowflow/BUILD.bazel index 7b2062cb3846..993280199c2f 100644 --- a/pkg/sql/rowflow/BUILD.bazel +++ b/pkg/sql/rowflow/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", "//pkg/sql/types", + "//pkg/util/buildutil", "//pkg/util/log", "//pkg/util/mon", "//pkg/util/syncutil", diff --git a/pkg/sql/rowflow/row_based_flow.go b/pkg/sql/rowflow/row_based_flow.go index 3ddf30adf7c5..83dcf079738b 100644 --- a/pkg/sql/rowflow/row_based_flow.go +++ b/pkg/sql/rowflow/row_based_flow.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -169,6 +170,18 @@ func (f *rowBasedFlow) setupProcessors( outputs = append(outputs, output) } } + if f.Gateway { + // On the gateway flow, the output of the "head" processor is the + // DistSQLReceiver wrapped with copyingRowReceiver. The latter is + // redundant because DistSQLReceiver.Push happens from the same + // goroutine as headProc.Next, so we don't actually need to make row + // copies. + if crr, ok := outputs[len(outputs)-1].(*copyingRowReceiver); ok { + outputs[len(outputs)-1] = crr.RowReceiver + } else if buildutil.CrdbTestBuild { + panic(errors.AssertionFailedf("head processor output is not a copyingRowReceiver: %T", outputs[len(outputs)-1])) + } + } return f.SetProcessorsAndOutputs(processors, outputs) }