Skip to content

Commit

Permalink
kvserver: move test to ccl
Browse files Browse the repository at this point in the history
Previously the test was validating the behavior with Global reads,
however since the test was not in the ccl package it required
complicated mangaling of the span configs.  Now it simply uses the
standard SQL commands to create the table.

Epic: none
Fixes: #119230

Release note: None
  • Loading branch information
andrewbaptist committed Feb 26, 2024
1 parent 903e2fc commit efc85ce
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 135 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/multiregionccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ go_test(
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/allocator/allocatorimpl",
"//pkg/kv/kvserver/closedts",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security/securityassets",
Expand Down Expand Up @@ -100,6 +101,7 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/quotapool",
Expand Down
124 changes: 124 additions & 0 deletions pkg/ccl/multiregionccl/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,25 @@ import (
"fmt"
"sort"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl/multiregionccltestutils"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -1064,3 +1072,119 @@ INSERT INTO db.rbr VALUES (1,1),(2,2),(3,3);
}
}
}

// TestStoreRangeSplitAndMergeWithGlobalReads tests that a range configured to
// serve global reads can be split and merged. In essence, this tests whether
// the split and merge transactions can handle having their timestamp bumped by
// the closed timestamp on the ranges they're operating on.
func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Detect splits and merges over the global read ranges. Assert that the split
// and merge transactions commit with pushed write timestamps, and that the
// commit-wait sleep for these transactions is performed before running their
// commit triggers instead of run on the kv client. For details on why this is
// necessary, see maybeCommitWaitBeforeCommitTrigger.
var clockPtr atomic.Pointer[hlc.Clock]
var splits, merges int64
respFilter := func(ctx context.Context, ba *kvpb.BatchRequest, br *kvpb.BatchResponse) *kvpb.Error {
clock := clockPtr.Load()
if clock == nil {
return nil
}
if req, ok := ba.GetArg(kvpb.EndTxn); ok {
endTxn := req.(*kvpb.EndTxnRequest)
if br.Txn.Status == roachpb.COMMITTED && br.Txn.MinTimestamp.Less(br.Txn.WriteTimestamp) {
if ct := endTxn.InternalCommitTrigger; ct != nil {
// The server-side commit-wait sleep should ensure that the commit
// triggers are only run after the commit timestamp is below present
// time.
now := clock.Now()
require.True(t, br.Txn.WriteTimestamp.Less(now))

switch {
case ct.SplitTrigger != nil:
atomic.AddInt64(&splits, 1)
case ct.MergeTrigger != nil:
atomic.AddInt64(&merges, 1)
}
}
}
}
return nil
}
ctx := context.Background()

// Propagate the span configs faster.
st := cluster.MakeTestingClusterSettings()
closedts.TargetDuration.Override(ctx, &st.SV, 20*time.Millisecond)
closedts.SideTransportCloseInterval.Override(ctx, &st.SV, 20*time.Millisecond)
kvserver.RangeFeedRefreshInterval.Override(ctx, &st.SV, 20*time.Millisecond)
s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{
// TODO(baptist): Secondary tenants can't run AdminMerge and there is no
// ability to grant that capability in tenantcapabilities.
DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
Settings: st,
Locality: roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}}},
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// Don't let the queues modify our range, control it manually.
DisableMergeQueue: true,
DisableSplitQueue: true,
TestingResponseFilter: respFilter,
},
},
})
defer s.Stopper().Stop(ctx)

// Set the closed_timestamp interval to be short to shorten the test duration
// because we need to wait for a checkpoint on the system config.
tdb := sqlutils.MakeSQLRunner(sqlDB)
store, err := s.StorageLayer().GetStores().(*kvserver.Stores).GetStore(s.StorageLayer().GetFirstStoreID())
require.NoError(t, err)

// Create a table that uses global locality.
var tableID int
tdb.Exec(t, `CREATE DATABASE db WITH PRIMARY REGION "us-east";
CREATE TABLE db.foo() LOCALITY GLOBAL;`)
tdb.QueryRow(t, `SELECT table_id FROM crdb_internal.tables WHERE name = 'foo' AND database_name = 'db'`).Scan(&tableID)

// Split the range off immediately to allow it to get its own SpanConfig.
descKey := s.ApplicationLayer().Codec().TablePrefix(uint32(tableID))
require.NoError(t, kvDB.AdminSplit(ctx, descKey, hlc.MaxTimestamp))

// Wait for the SpanConfig to propagate to the range.
testutils.SucceedsSoon(t, func() error {
repl := store.LookupReplica(roachpb.RKey(descKey))
conf, err := repl.LoadSpanConfig(ctx)
if err == nil && conf.GlobalReads {
return nil
}
return errors.Errorf("expected LEAD_FOR_GLOBAL_READS policy for key %s", descKey)
})
// Validate there are no commit waits.
require.Equal(t, int64(0), store.Metrics().CommitWaitsBeforeCommitTrigger.Count())

// Set the clock to the store's clock, which also serves to engage the
// response filter.
clockPtr.Store(s.Clock())

// Write to the range, which has the effect of bumping the closed timestamp.
require.NoError(t, kvDB.Put(ctx, descKey, []byte("foo")))

// Split the range. Should succeed.
splitKey := append(descKey, []byte("split")...)
require.NoError(t, kvDB.AdminSplit(ctx, splitKey, hlc.MaxTimestamp))
require.Equal(t, int64(1), store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
require.Equal(t, int64(1), atomic.LoadInt64(&splits))
repl := store.LookupReplica(roachpb.RKey(splitKey))
require.Equal(t, splitKey, repl.Desc().StartKey.AsRawKey())

// Merge the range. Should succeed.
require.NoError(t, kvDB.AdminMerge(ctx, descKey))
require.Equal(t, int64(2), store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
require.Equal(t, int64(1), atomic.LoadInt64(&merges))
repl = store.LookupReplica(roachpb.RKey(splitKey))
require.Equal(t, descKey, repl.Desc().StartKey.AsRawKey())
}
135 changes: 0 additions & 135 deletions pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3593,141 +3593,6 @@ func TestSplitBlocksReadsToRHS(t *testing.T) {
require.Nil(t, g.Wait())
}

// TestStoreRangeSplitAndMergeWithGlobalReads tests that a range configured to
// serve global reads can be split and merged. In essence, this tests whether
// the split and merge transactions can handle having their timestamp bumped by
// the closed timestamp on the ranges they're operating on.
func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 119230)

// Detect splits and merges over the global read ranges. Assert that the split
// and merge transactions commit with pushed write timestamps, and that the
// commit-wait sleep for these transactions is performed before running their
// commit triggers instead of run on the kv client. For details on why this is
// necessary, see maybeCommitWaitBeforeCommitTrigger.
var clockPtr atomic.Pointer[hlc.Clock]
var splits, merges int64
respFilter := func(ctx context.Context, ba *kvpb.BatchRequest, br *kvpb.BatchResponse) *kvpb.Error {
clock := clockPtr.Load()
if clock == nil {
return nil
}
if req, ok := ba.GetArg(kvpb.EndTxn); ok {
endTxn := req.(*kvpb.EndTxnRequest)
if br.Txn.Status == roachpb.COMMITTED && br.Txn.MinTimestamp.Less(br.Txn.WriteTimestamp) {
if ct := endTxn.InternalCommitTrigger; ct != nil {
// The server-side commit-wait sleep should ensure that the commit
// triggers are only run after the commit timestamp is below present
// time.
now := clock.Now()
require.True(t, br.Txn.WriteTimestamp.Less(now))

switch {
case ct.SplitTrigger != nil:
atomic.AddInt64(&splits, 1)
case ct.MergeTrigger != nil:
atomic.AddInt64(&merges, 1)
}
}
}
}
return nil
}
// Set global reads.
zoneConfig := zonepb.DefaultZoneConfig()
zoneConfig.GlobalReads = proto.Bool(true)

ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DefaultZoneConfigOverride: &zoneConfig,
},

Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
TestingResponseFilter: respFilter,
},
},
})

defer s.Stopper().Stop(ctx)
// Set the closed_timestamp interval to be short to shorten the test duration
// because we need to wait for a checkpoint on the system config.
tdb := sqlutils.MakeSQLRunner(sqlDB)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`)
tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval = '20ms'`)
store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
require.NoError(t, err)
config.TestingSetupZoneConfigHook(s.Stopper())

// Split off the range for the test.
descID := bootstrap.TestingUserDescID(0)
descKey := keys.SystemSQLCodec.TablePrefix(descID)
splitArgs := adminSplitArgs(descKey)
_, pErr := kv.SendWrapped(ctx, store.TestSender(), splitArgs)
require.Nil(t, pErr)

// Set the clock to the store's clock, which also serves to engage the
// response filter.
clockPtr.Store(s.Clock())

// Perform a write to the system config span being watched by
// the SystemConfigProvider.
tdb.Exec(t, "CREATE TABLE foo ()")
testutils.SucceedsSoon(t, func() error {
repl := store.LookupReplica(roachpb.RKey(descKey))
if repl.ClosedTimestampPolicy() != roachpb.LEAD_FOR_GLOBAL_READS {
return errors.Errorf("expected LEAD_FOR_GLOBAL_READS policy")
}
return nil
})

// The commit wait count is 1 due to the split above since global reads are
// set for the default config.
var splitCount = int64(1)
testutils.SucceedsSoon(t, func() error {
if splitCount != store.Metrics().CommitWaitsBeforeCommitTrigger.Count() {
return errors.Errorf("commit wait count is %d", store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
}
if splitCount != atomic.LoadInt64(&splits) {
return errors.Errorf("num splits is %d", atomic.LoadInt64(&splits))
}
return nil
})

// Write to the range, which has the effect of bumping the closed timestamp.
pArgs := putArgs(descKey, []byte("foo"))
_, pErr = kv.SendWrapped(ctx, store.TestSender(), pArgs)
require.Nil(t, pErr)

// Split the range. Should succeed.
splitKey := append(descKey, []byte("split")...)
splitArgs = adminSplitArgs(splitKey)
_, pErr = kv.SendWrapped(ctx, store.TestSender(), splitArgs)
require.Nil(t, pErr)
splitCount++
require.Equal(t, splitCount, store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
require.Equal(t, splitCount, atomic.LoadInt64(&splits))

repl := store.LookupReplica(roachpb.RKey(splitKey))
require.Equal(t, splitKey, repl.Desc().StartKey.AsRawKey())

// Merge the range. Should succeed.
mergeArgs := adminMergeArgs(descKey)
_, pErr = kv.SendWrapped(ctx, store.TestSender(), mergeArgs)
require.Nil(t, pErr)
require.Equal(t, splitCount+1, store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
require.Equal(t, int64(1), atomic.LoadInt64(&merges))

repl = store.LookupReplica(roachpb.RKey(splitKey))
require.Equal(t, descKey, repl.Desc().StartKey.AsRawKey())
}

// TestLBSplitUnsafeKeys tests that load based splits do not split between table
// rows, even when the suggested load based split key is itself between a table row.
func TestLBSplitUnsafeKeys(t *testing.T) {
Expand Down

0 comments on commit efc85ce

Please sign in to comment.