Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM] KV pushdown bisect #95793

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions pkg/cmd/roachtest/tests/multitenant_tpch.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
// runMultiTenantTPCH runs TPCH queries on a cluster that is first used as a
// single-tenant deployment followed by a run of all queries in a multi-tenant
// deployment with a single SQL instance.
func runMultiTenantTPCH(ctx context.Context, t test.Test, c cluster.Cluster) {
func runMultiTenantTPCH(
ctx context.Context, t test.Test, c cluster.Cluster, enableDirectScans bool,
) {
c.Put(ctx, t.Cockroach(), "./cockroach", c.All())
c.Put(ctx, t.DeprecatedWorkload(), "./workload", c.Node(1))
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(install.SecureOption(true)), c.All())
Expand All @@ -40,7 +42,12 @@ func runMultiTenantTPCH(ctx context.Context, t test.Test, c cluster.Cluster) {
// one at a time (using the given url as a parameter to the 'workload run'
// command). The runtimes are accumulated in the perf helper.
runTPCH := func(conn *gosql.DB, url string, setupIdx int) {
t.Status("restoring TPCH dataset for Scale Factor 1 in %s", setupNames[setupIdx])
setting := fmt.Sprintf("SET CLUSTER SETTING sql.distsql.direct_columnar_scans.enabled = %t", enableDirectScans)
t.Status(setting)
if _, err := conn.Exec(setting); err != nil {
t.Fatal(err)
}
t.Status("restoring TPCH dataset for Scale Factor 1 in ", setupNames[setupIdx])
if err := loadTPCHDataset(
ctx, t, c, conn, 1 /* sf */, c.NewMonitor(ctx), c.All(), false, /* disableMergeQueue */
); err != nil {
Expand Down Expand Up @@ -93,6 +100,17 @@ func registerMultiTenantTPCH(r registry.Registry) {
Name: "multitenant/tpch",
Owner: registry.OwnerSQLQueries,
Cluster: r.MakeClusterSpec(1 /* nodeCount */),
Run: runMultiTenantTPCH,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runMultiTenantTPCH(ctx, t, c, false /* enableDirectScans */)
},
})

r.Add(registry.TestSpec{
Name: "multitenant/tpch_direct_scans",
Owner: registry.OwnerSQLQueries,
Cluster: r.MakeClusterSpec(1 /* nodeCount */),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runMultiTenantTPCH(ctx, t, c, true /* enableDirectScans */)
},
})
}
1 change: 0 additions & 1 deletion pkg/col/coldata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ go_test(
deps = [
"//pkg/col/coldatatestutils",
"//pkg/sql/colconv",
"//pkg/sql/randgen",
"//pkg/sql/types",
"//pkg/util/leaktest",
"//pkg/util/randutil",
Expand Down
13 changes: 11 additions & 2 deletions pkg/col/coldata/bytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"testing"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -510,6 +509,16 @@ func TestProportionalSize(t *testing.T) {

const letters = "abcdefghijklmnopqrstuvwxyz"

// randString generates a random string of the desired length from the input
// alphabet.
func randString(rng *rand.Rand, length int, alphabet string) string {
buf := make([]byte, length)
for i := range buf {
buf[i] = alphabet[rng.Intn(len(alphabet))]
}
return string(buf)
}

func TestToArrowSerializationFormat(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand All @@ -523,7 +532,7 @@ func TestToArrowSerializationFormat(t *testing.T) {
if rng.Float64() < nullChance {
continue
}
element := []byte(randgen.RandString(rng, 1+rng.Intn(maxStringLength), letters))
element := []byte(randString(rng, 1+rng.Intn(maxStringLength), letters))
b.Set(i, element)
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ go_library(
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//types",
"@com_github_kr_pretty//:pretty",
"@org_golang_x_time//rate",
Expand Down
25 changes: 25 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
)

func init() {
Expand Down Expand Up @@ -63,6 +65,29 @@ func ReverseScan(
return result.Result{}, err
}
reply.BatchResponses = scanRes.KVData
case roachpb.COL_BATCH_RESPONSE:
var msg proto.Message
var da types.DynamicAny
if err := types.UnmarshalAny(cArgs.Header.IndexFetchSpec, &da); err != nil {
return result.Result{}, err
}
msg = da.Message
scanRes, err = storage.MVCCScanToCols(
ctx, reader, msg, args.Key, args.EndKey, h.Timestamp, opts,
)
if err != nil {
return result.Result{}, err
}
if scanRes.ColBatches != nil {
// TODO: consider changing scanRes.ColBatches to be a slice of
// interface{}.
reply.ColBatches.ColBatches = make([]interface{}, len(scanRes.ColBatches))
for i := range scanRes.ColBatches {
reply.ColBatches.ColBatches[i] = scanRes.ColBatches[i]
}
} else {
reply.BatchResponses = scanRes.KVData
}
case roachpb.KEY_VALUES:
scanRes, err = storage.MVCCScan(
ctx, reader, args.Key, args.EndKey, h.Timestamp, opts)
Expand Down
25 changes: 25 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
)

func init() {
Expand Down Expand Up @@ -64,6 +66,29 @@ func Scan(
return result.Result{}, err
}
reply.BatchResponses = scanRes.KVData
case roachpb.COL_BATCH_RESPONSE:
var msg proto.Message
var da types.DynamicAny
if err := types.UnmarshalAny(cArgs.Header.IndexFetchSpec, &da); err != nil {
return result.Result{}, err
}
msg = da.Message
scanRes, err = storage.MVCCScanToCols(
ctx, reader, msg, args.Key, args.EndKey, h.Timestamp, opts,
)
if err != nil {
return result.Result{}, err
}
if scanRes.ColBatches != nil {
// TODO: consider changing scanRes.ColBatches to be a slice of
// interface{}.
reply.ColBatches.ColBatches = make([]interface{}, len(scanRes.ColBatches))
for i := range scanRes.ColBatches {
reply.ColBatches.ColBatches[i] = scanRes.ColBatches[i]
}
} else {
reply.BatchResponses = scanRes.KVData
}
case roachpb.KEY_VALUES:
scanRes, err = storage.MVCCScan(
ctx, reader, args.Key, args.EndKey, h.Timestamp, opts)
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func acquireUnreplicatedLocksOnKeys(
scanRes *storage.MVCCScanResult,
) error {
res.Local.AcquiredLocks = make([]roachpb.LockAcquisition, scanRes.NumKeys)
// TODO: add COL_BATCH_RESPONSE support.
switch scanFmt {
case roachpb.BATCH_RESPONSE:
var i int
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,9 @@ func (r *Replica) collectSpansRead(
resp := br.Responses[i].GetInner()

if ba.WaitPolicy == lock.WaitPolicy_SkipLocked && roachpb.CanSkipLocked(req) {
if ba.IndexFetchSpec != nil {
return nil, nil, errors.AssertionFailedf("unexpectedly ColBatchScans are used with SKIP LOCKED wait policy")
}
// If the request is using a SkipLocked wait policy, it behaves as if run
// at a lower isolation level for any keys that it skips over. If the read
// request did not return a key, it does not need to check for conflicts
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/replica_tscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

Expand Down Expand Up @@ -99,6 +100,9 @@ func (r *Replica) updateTimestampCache(
start, end := header.Key, header.EndKey

if ba.WaitPolicy == lock.WaitPolicy_SkipLocked && roachpb.CanSkipLocked(req) {
if ba.IndexFetchSpec != nil {
log.Errorf(ctx, "%v", errors.AssertionFailedf("unexpectedly ColBatchScans are used with SKIP LOCKED wait policy"))
}
// If the request is using a SkipLocked wait policy, it behaves as if run
// at a lower isolation level for any keys that it skips over. If the read
// request did not return a key, it does not make a claim about whether
Expand Down
1 change: 1 addition & 0 deletions pkg/roachpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ proto_library(
"//pkg/util/tracing/tracingpb:tracingpb_proto",
"@com_github_cockroachdb_errors//errorspb:errorspb_proto",
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
"@com_google_protobuf//:any_proto",
"@com_google_protobuf//:duration_proto",
"@com_google_protobuf//:timestamp_proto",
],
Expand Down
3 changes: 3 additions & 0 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,9 @@ func (sr *ScanResponse) combine(c combinable) error {
sr.Rows = append(sr.Rows, otherSR.Rows...)
sr.IntentRows = append(sr.IntentRows, otherSR.IntentRows...)
sr.BatchResponses = append(sr.BatchResponses, otherSR.BatchResponses...)
// TODO: we cannot intertwine responses in different format when we need
// to maintain the ordering.
sr.ColBatches.ColBatches = append(sr.ColBatches.ColBatches, otherSR.ColBatches.ColBatches...)
if err := sr.ResponseHeader.combine(otherSR.Header()); err != nil {
return err
}
Expand Down
25 changes: 23 additions & 2 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import "util/hlc/timestamp.proto";
import "util/tracing/tracingpb/recorded_span.proto";
import "util/tracing/tracingpb/tracing.proto";
import "gogoproto/gogo.proto";
import "google/protobuf/any.proto";
import "google/protobuf/duration.proto";

// ReadConsistencyType specifies what type of consistency is observed
Expand Down Expand Up @@ -491,6 +492,8 @@ enum ScanFormat {
// The batch_response format: a byte slice of alternating keys and values,
// each prefixed by their length as a varint.
BATCH_RESPONSE = 1;
// TODO: comment.
COL_BATCH_RESPONSE = 2;
}


Expand Down Expand Up @@ -542,6 +545,18 @@ message ScanResponse {
// entry. There are num_keys total pairs across all entries, as defined by the
// ResponseHeader. If set, rows will not be set and vice versa.
repeated bytes batch_responses = 4;

ColBatches col_batches = 5 [(gogoproto.nullable) = false];
}

message ColBatches {
option (gogoproto.gostring) = false;
option (gogoproto.equal) = false;
option (gogoproto.stringer) = false;
option (gogoproto.marshaler) = false;
option (gogoproto.sizer) = false;
option (gogoproto.unmarshaler) = false;
repeated bytes col_batches = 1 [(gogoproto.customtype) = "interface{}", (gogoproto.nullable) = false];
}

// A ReverseScanRequest is the argument to the ReverseScan() method. It specifies the
Expand Down Expand Up @@ -592,6 +607,8 @@ message ReverseScanResponse {
// entry. There are num_keys total pairs across all entries, as defined by the
// ResponseHeader. If set, rows will not be set and vice versa.
repeated bytes batch_responses = 4;

ColBatches col_batches = 5 [(gogoproto.nullable) = false];
}


Expand Down Expand Up @@ -2452,8 +2469,6 @@ message Header {
// The given value specifies the maximum number of keys in a row (i.e. the
// number of column families). If any larger rows are found at the end of the
// result, an error is returned.
//
// Added in 22.1, callers must check the ScanWholeRows version gate first.
int32 whole_rows_of_size = 26;
// If true, allow returning an empty result when the first result exceeds a
// limit (e.g. TargetBytes). Only supported by Get, Scan, and ReverseScan.
Expand Down Expand Up @@ -2528,6 +2543,12 @@ message Header {

util.tracing.tracingpb.TraceInfo trace_info = 25;

// This "any type" field is a serialized descpb.IndexFetchSpec that can be
// used to initialize a columnar scan for MVCC. We use this generic type to
// resolve some painful circular dependency issues.
// TODO: better comment.
google.protobuf.Any index_fetch_spec = 29;

reserved 7, 10, 12, 14, 20;
}

Expand Down
20 changes: 20 additions & 0 deletions pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,9 @@ func (ba *BatchRequest) RefreshSpanIterate(br *BatchResponse, fn func(Span)) err
resp = br.Responses[i].GetInner()
}
if ba.WaitPolicy == lock.WaitPolicy_SkipLocked && CanSkipLocked(req) {
if ba.IndexFetchSpec != nil {
return errors.AssertionFailedf("unexpectedly ColBatchScans are used with SKIP LOCKED wait policy")
}
// If the request is using a SkipLocked wait policy, it behaves as if run
// at a lower isolation level for any keys that it skips over. For this
// reason, the request only adds point reads for the individual keys
Expand Down Expand Up @@ -582,6 +585,9 @@ func ResponseKeyIterate(req Request, resp Response, fn func(Key)) error {
}); err != nil {
return err
}
if v.ColBatches.ColBatches != nil {
return errors.AssertionFailedf("unexpectedly non-nil ColBatches")
}
case *ReverseScanResponse:
// If ScanFormat == KEY_VALUES.
for _, kv := range v.Rows {
Expand All @@ -595,6 +601,9 @@ func ResponseKeyIterate(req Request, resp Response, fn func(Key)) error {
}); err != nil {
return err
}
if v.ColBatches.ColBatches != nil {
return errors.AssertionFailedf("unexpectedly non-nil ColBatches")
}
default:
return errors.Errorf("cannot iterate over response keys of %s request", req.Method())
}
Expand Down Expand Up @@ -845,3 +854,14 @@ func (ba BatchRequest) ValidateForEvaluation() error {
}
return nil
}

// Size reports that ColBatches occupies zero bytes on the wire. Together with
// the no-op marshaler below this makes the field invisible to proto
// serialization: the contained batches are apparently only meant to be handed
// between components in-process, never encoded.
func (ColBatches) Size() int { return 0 }

// MarshalToSizedBuffer writes nothing, consistent with Size returning 0.
func (ColBatches) MarshalToSizedBuffer([]byte) (int, error) { return 0, nil }

// Unmarshal accepts only an empty payload. A non-empty buffer indicates that
// some sender actually serialized ColBatches data — which should never happen
// given the no-op marshaler — so it is reported as an assertion failure.
func (ColBatches) Unmarshal(b []byte) error {
	if len(b) > 0 {
		return errors.AssertionFailedf("unexpectedly unmarshaling a non-empty ColBatches")
	}
	return nil
}
2 changes: 2 additions & 0 deletions pkg/sql/colexec/colbuilder/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//pkg/col/coldata",
"//pkg/col/coldataext",
"//pkg/col/typeconv",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/settings",
"//pkg/sql/catalog/descpb",
"//pkg/sql/colconv",
Expand All @@ -31,6 +32,7 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfra/execreleasable",
"//pkg/sql/execinfrapb",
"//pkg/sql/row",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sem/tree/treebin",
Expand Down
Loading