Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
98655: rowflow: remove redundant copyingRowReceiver in gateway row-based flow r=yuzefovich a=yuzefovich

Currently, in row-based flows we always wrap the output of each processor
(most commonly `RowChannel`) with `copyingRowReceiver` which performs
a shallow copy of each row. This is needed because the contract of
`RowSource.Next` is such that the rows are only safe until the next call
to `Next`. Each processor could run in a separate goroutine, so we need
this copying behavior to protect rows from corruption. However, we often
can fuse processors together (meaning that we remove the RowChannel
between two processors and make two of them run in the same goroutine),
and in that case we already remove this `copyingRowReceiver` to avoid
unnecessary row copies.

This commit adds another case where we can safely remove the redundant
`copyingRowReceiver` - when we have a flow on the gateway, then the
output of the "head" processor is `DistSQLReceiver`, and it's pushed
into from the same goroutine as the "head" processor, so we don't need
row copies.

Epic: None

Release note: None

111925: roachtest: fix read committed variant of ycsb r=rafiss a=nvanbenschoten

This was broken by 53ed1cd.

Epic: None
Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
3 people committed Oct 10, 2023
3 parents 559008f + 889aab9 + b43590a commit 7e1f909
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 1 deletion.
20 changes: 19 additions & 1 deletion pkg/cmd/roachtest/tests/ycsb.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package tests

import (
"context"
gosql "database/sql"
"fmt"
"os"

Expand Down Expand Up @@ -69,8 +70,13 @@ func registerYCSB(r registry.Registry) {
c.Put(ctx, t.Cockroach(), "./cockroach", c.Range(1, nodes))
c.Put(ctx, t.DeprecatedWorkload(), "./workload", c.Node(nodes+1))
c.Start(ctx, t.L(), option.DefaultStartOptsNoBackups(), settings, c.Range(1, nodes))
err := WaitFor3XReplication(ctx, t, c.Conn(ctx, t.L(), 1))

db := c.Conn(ctx, t.L(), 1)
err := enableIsolationLevels(ctx, t, db)
require.NoError(t, err)
err = WaitFor3XReplication(ctx, t, db)
require.NoError(t, err)
require.NoError(t, db.Close())

t.Status("running workload")
m := c.NewMonitor(ctx, c.Range(1, nodes))
Expand Down Expand Up @@ -163,3 +169,15 @@ func registerYCSB(r registry.Registry) {
}
}
}

func enableIsolationLevels(ctx context.Context, t test.Test, db *gosql.DB) error {
for _, cmd := range []string{
`SET CLUSTER SETTING sql.txn.snapshot_isolation_syntax.enabled = 'true';`,
`SET CLUSTER SETTING sql.txn.read_committed_syntax.enabled = 'true';`,
} {
if _, err := db.ExecContext(ctx, cmd); err != nil {
return err
}
}
return nil
}
1 change: 1 addition & 0 deletions pkg/sql/rowflow/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/buildutil",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/syncutil",
Expand Down
13 changes: 13 additions & 0 deletions pkg/sql/rowflow/row_based_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -169,6 +170,18 @@ func (f *rowBasedFlow) setupProcessors(
outputs = append(outputs, output)
}
}
if f.Gateway {
// On the gateway flow, the output of the "head" processor is the
// DistSQLReceiver wrapped with copyingRowReceiver. The latter is
// redundant because DistSQLReceiver.Push happens from the same
// goroutine as headProc.Next, so we don't actually need to make row
// copies.
if crr, ok := outputs[len(outputs)-1].(*copyingRowReceiver); ok {
outputs[len(outputs)-1] = crr.RowReceiver
} else if buildutil.CrdbTestBuild {
panic(errors.AssertionFailedf("head processor output is not a copyingRowReceiver: %T", outputs[len(outputs)-1]))
}
}
return f.SetProcessorsAndOutputs(processors, outputs)
}

Expand Down

0 comments on commit 7e1f909

Please sign in to comment.