Skip to content

Commit

Permalink
sql/row: fix multi-range reads from PCR standby
Browse files Browse the repository at this point in the history
Reads from tables with external row data (i.e. reads from a PCR standby
cluster) need to use the fixed timestamp specified by the external row
data. This timestamp might be different from the transaction timestamp,
so we were explicitly setting BatchRequest.Timestamp in
kv_batch_fetcher.

The KV API only allows BatchRequest.Timestamp to be set for
non-transactional requests (i.e. requests sent with a
NonTransactionalSender, which is a CrossRangeTxnWrapperSender in this
case). We were using a NonTransactionalSender, but this had two
problems:

1. CrossRangeTxnWrapperSender in turn sends the BatchRequest with a
   transactional sender, which again does not allow
   BatchRequest.Timestamp to be set.
2. CrossRangeTxnWrapperSender uses `kv.(*Txn).CommitInBatch`, which does
   not provide the 1-to-1 request-response guarantee required by
   txnKVFetcher. It is `kv.(*Txn).Send` which provides this guarantee.

Because of these two problems, whenever the txnKVFetcher would send a
multi-range-spanning BatchRequest to CrossRangeTxnWrapperSender, it
would either fail with a "transactional request must not set batch
timestamp" error or would return an unexpected number of responses,
violating the txnKVFetcher's assumed mapping from request to response.

To fix both these problems, instead of using a NonTransactionalSender,
change the txnKVFetcher to open a new root transaction with the correct
fixed timestamp, and then use txn.Send.

Fixes: cockroachdb#132608

Release note: None
  • Loading branch information
michae2 committed Oct 21, 2024
1 parent 4a5e4d2 commit 121da38
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 3 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/testccl/sqlccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_test(
"session_revival_test.go",
"show_create_test.go",
"show_transfer_state_test.go",
"standby_read_test.go",
"temp_table_clean_test.go",
"tenant_gc_test.go",
],
Expand Down Expand Up @@ -39,6 +40,8 @@ go_test(
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/sql",
"//pkg/sql/catalog/lease",
"//pkg/sql/catalog/replication",
"//pkg/sql/gcjob",
"//pkg/sql/isql",
"//pkg/sql/lexbase",
Expand Down
110 changes: 110 additions & 0 deletions pkg/ccl/testccl/sqlccl/standby_read_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package sqlccl

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/replication"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func TestStandbyRead(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testcases := []struct {
standby bool
stmt string
expected [][]string
}{
{stmt: `CREATE TABLE abc (a INT PRIMARY KEY, b INT, c JSONB)`},
{stmt: `INSERT INTO abc VALUES (1, 10, '[100]'), (3, 30, '[300]'), (5, 50, '[500]')`},
{stmt: `ALTER TABLE abc SPLIT AT VALUES (2), (4)`},
{stmt: `SELECT count(*) FROM [SHOW TABLES]`, expected: [][]string{{"1"}}},
{stmt: `SELECT count(*) FROM abc`, expected: [][]string{{"3"}}},
{standby: true, stmt: `SELECT count(*) FROM [SHOW TABLES]`, expected: [][]string{{"1"}}},
{standby: true, stmt: `SELECT count(*) FROM abc`, expected: [][]string{{"3"}}},
}

ctx := context.Background()
tc := serverutils.StartCluster(t, 3, /* numNodes */
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
DefaultTestTenant: base.TestControlsTenantsExplicitly,
},
})
defer tc.Stopper().Stop(ctx)
ts := tc.Server(0)

_, srcDB, err := ts.TenantController().StartSharedProcessTenant(ctx,
base.TestSharedProcessTenantArgs{
TenantID: serverutils.TestTenantID(),
TenantName: "src",
UseDatabase: "defaultdb",
},
)
require.NoError(t, err)
dstTenant, dstDB, err := ts.TenantController().StartSharedProcessTenant(ctx,
base.TestSharedProcessTenantArgs{
TenantID: serverutils.TestTenantID2(),
TenantName: "dst",
UseDatabase: "defaultdb",
},
)
require.NoError(t, err)

srcRunner := sqlutils.MakeSQLRunner(srcDB)
dstRunner := sqlutils.MakeSQLRunner(dstDB)
dstInternal := dstTenant.InternalDB().(*sql.InternalDB)

dstRunner.Exec(t, `SET CLUSTER SETTING sql.defaults.distsql = always`)
dstRunner.Exec(t, `SET distsql = always`)

waitForReplication := func() {
now := ts.Clock().Now()
err := replication.SetupOrAdvanceStandbyReaderCatalog(
ctx, serverutils.TestTenantID(), now, dstInternal, dstTenant.ClusterSettings(),
)
if err != nil {
t.Fatal(err)
}
now = ts.Clock().Now()
lm := dstTenant.LeaseManager().(*lease.Manager)
testutils.SucceedsSoon(t, func() error {
if lm.GetSafeReplicationTS().Less(now) {
return errors.AssertionFailedf("waiting for descriptor close timestamp to catch up")
}
return nil
})
}

for _, tc := range testcases {
var runner *sqlutils.SQLRunner
if tc.standby {
waitForReplication()
runner = dstRunner
} else {
runner = srcRunner
}
if tc.expected == nil {
runner.Exec(t, tc.stmt)
} else {
runner.CheckQueryResultsRetry(t, tc.stmt, tc.expected)
}
}
}
27 changes: 24 additions & 3 deletions pkg/sql/row/kv_batch_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,6 @@ func makeExternalSpanSendFunc(
ext *fetchpb.IndexFetchSpec_ExternalRowData, db *kv.DB, batchRequestsIssued *int64,
) sendFunc {
return func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
ba.Timestamp = ext.AsOf
for _, req := range ba.Requests {
// We only allow external row data for a few known types of request.
switch r := req.GetInner().(type) {
Expand All @@ -310,12 +309,34 @@ func makeExternalSpanSendFunc(
}
}
log.VEventf(ctx, 2, "kv external fetcher: sending a batch with %d requests", len(ba.Requests))
res, err := db.NonTransactionalSender().Send(ctx, ba)

// Open a new transaction with fixed timestamp set to the external
// timestamp. We must do this with txn.Send rather than using
// db.NonTransactionalSender to get the 1-to-1 request-response guarantee
// required by txnKVFetcher.
// TODO(michae2): Explore whether we should keep this transaction open for
// the duration of the surrounding transaction.
var res *kvpb.BatchResponse
err := db.TxnWithAdmissionControl(
ctx, ba.AdmissionHeader.Source, admissionpb.WorkPriority(ba.AdmissionHeader.Priority),
kv.SteppingDisabled,
func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetFixedTimestamp(ctx, ext.AsOf); err != nil {
return err
}
var err *kvpb.Error
res, err = txn.Send(ctx, ba)
if err != nil {
return err.GoError()
}
return nil
})

// Note that in some code paths there is no concurrency when using the
// sendFunc, but we choose to unconditionally use atomics here since its
// overhead should be negligible in the grand scheme of things anyway.
atomic.AddInt64(batchRequestsIssued, 1)
return res, err.GoError()
return res, err
}
}

Expand Down

0 comments on commit 121da38

Please sign in to comment.