Merge #85853 #85878
85853: kv: ensure secondary tenants route follower reads to the closest replica r=arulajmani a=arulajmani

The dist sender uses node locality information to rank replicas of a
range by latency. Previously, this node locality information was read
off a node descriptor available in Gossip. Unfortunately, secondary
tenants do not have access to Gossip, and as such, would end up
randomizing this list of replicas. This manifested itself through
unpredictable latencies when running follower reads.

This patch removes that hazard by eschewing the need for a node descriptor
from gossip in the DistSender; instead, we now instantiate the DistSender
with locality information directly.

However, we do still use Gossip to get the current node's
ID when ranking replicas. This is done to ascertain if there is a local
replica, and if there is, to always route to it. Unfortunately, because
secondary tenants don't have access to Gossip, they can't conform to
these semantics. They're susceptible to a hazard where a request may
be routed to another replica in the same locality tier as the client
even though the client has a local replica as well. This shouldn't be
a concern in practice given the diversity heuristic.
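
To illustrate the ranking heuristic, here is a minimal, self-contained sketch. It is not the DistSender's actual code -- the Tier, Locality, and Replica types and the rankReplicas helper are simplified stand-ins -- but it shows the idea: replicas sharing more locality tiers with the client sort earlier, and a replica on the client's own node (when the node ID is known) sorts first.

```go
package main

import (
	"fmt"
	"sort"
)

// Tier and Locality loosely mirror the shape of roachpb.Tier and
// roachpb.Locality; they are simplified here for illustration only.
type Tier struct{ Key, Value string }
type Locality struct{ Tiers []Tier }

// matchingTiers counts how many leading tiers two localities share. More
// shared tiers is used as a proxy for lower latency.
func matchingTiers(a, b Locality) int {
	n := 0
	for i := 0; i < len(a.Tiers) && i < len(b.Tiers); i++ {
		if a.Tiers[i] != b.Tiers[i] {
			break
		}
		n++
	}
	return n
}

// Replica is a stand-in for a replica descriptor plus the locality of the
// node it lives on.
type Replica struct {
	NodeID   int
	Locality Locality
}

// rankReplicas orders replicas by closeness to the client. localNodeID may
// be 0 when unknown (as for secondary tenants without gossip), in which case
// only the locality heuristic applies and a same-tier sibling may win over a
// truly local replica.
func rankReplicas(replicas []Replica, client Locality, localNodeID int) {
	sort.SliceStable(replicas, func(i, j int) bool {
		if localNodeID != 0 &&
			(replicas[i].NodeID == localNodeID) != (replicas[j].NodeID == localNodeID) {
			return replicas[i].NodeID == localNodeID
		}
		return matchingTiers(replicas[i].Locality, client) > matchingTiers(replicas[j].Locality, client)
	})
}

func main() {
	region := func(r string) Locality {
		return Locality{Tiers: []Tier{{Key: "region", Value: r}}}
	}
	replicas := []Replica{
		{NodeID: 1, Locality: region("us-west")},
		{NodeID: 2, Locality: region("us-east")},
		{NodeID: 3, Locality: region("eu-west")},
	}
	rankReplicas(replicas, region("us-east"), 0 /* node ID unknown */)
	fmt.Println(replicas[0].NodeID) // 2: the replica in the client's region ranks first
}
```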

Resolves #81000

Release note (bug fix): Fixed an issue where secondary tenants could route
follower reads to a random, faraway replica instead of a closer one.
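
For context, a follower read is an ordinary historical query run at a timestamp that has already closed on all replicas, so the nearest replica can serve it. A minimal client-side sketch (the connection URL, database, and table are placeholders) looks like this:

```go
package main

import (
	gosql "database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // any Postgres-wire driver works; this one is an assumption
)

func main() {
	// Placeholder connection URL for a tenant SQL pod; adjust host, port, and
	// certificates for a real deployment.
	db, err := gosql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// AS OF SYSTEM TIME follower_read_timestamp() picks a timestamp that has
	// closed on all replicas, so the read may be served by the nearest one.
	var k int
	if err := db.QueryRow(
		`SELECT k FROM t.test AS OF SYSTEM TIME follower_read_timestamp() WHERE k = 2`,
	).Scan(&k); err != nil {
		log.Fatal(err)
	}
	fmt.Println(k)
}
```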

85878: gcjob: issue DeleteRange tombstones and then wait for GC r=ajwerner a=ajwerner

Note that this does not change anything about tenant GC.

Fixes #70427

Release note (sql change): The asynchronous garbage collection process has
been changed such that very soon after dropping a table, index, or database, or
after refreshing a materialized view, the system will issue range deletion
tombstones over the dropped data. These tombstones will result in the KV
statistics properly counting these bytes as garbage. Before this change, the
asynchronous "gc job" would wait out the TTL and then issue a lower-level
operation to clear out the data. That meant that while the job was waiting
out the TTL, the data would still appear as live in the statistics, which
was confusing.
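
The reordering can be sketched as follows; issueDeleteRange and waitForMVCCGC below are hypothetical stand-ins for the gcjob internals, not the package's real API:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"
)

// Span, issueDeleteRange, and waitForMVCCGC are hypothetical stand-ins for
// the gcjob internals; they only model the ordering described above.
type Span struct{ StartKey, EndKey string }

// issueDeleteRange represents writing an MVCC range tombstone over the
// dropped data, so KV statistics immediately count those bytes as garbage.
func issueDeleteRange(ctx context.Context, sp Span) error {
	fmt.Printf("issuing DeleteRange tombstone over [%s, %s)\n", sp.StartKey, sp.EndKey)
	return nil
}

// waitForMVCCGC represents waiting until the TTL has elapsed and MVCC GC has
// actually reclaimed the data under the tombstone.
func waitForMVCCGC(ctx context.Context, sp Span, ttl time.Duration) error {
	select {
	case <-time.After(ttl):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runGC reflects the new ordering: tombstone first, then wait, instead of
// waiting out the TTL before issuing a lower-level clear operation.
func runGC(ctx context.Context, sp Span, ttl time.Duration) error {
	if err := issueDeleteRange(ctx, sp); err != nil {
		return err
	}
	return waitForMVCCGC(ctx, sp, ttl)
}

func main() {
	if err := runGC(context.Background(),
		Span{StartKey: "/Table/104", EndKey: "/Table/105"}, time.Second); err != nil {
		log.Fatal(err)
	}
}
```

The tombstone makes the bytes count as garbage right away; actual space reclamation still happens only after the TTL, as before.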

Co-authored-by: Arul Ajmani <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
3 people committed Aug 14, 2022
3 parents ec5847d + 0e08303 + de12cee commit b6d1689
Showing 75 changed files with 1,498 additions and 444 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -289,4 +289,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 22.1-54 set the active cluster version in the format '<major>.<minor>'
version version 22.1-58 set the active cluster version in the format '<major>.<minor>'
3 changes: 2 additions & 1 deletion docs/generated/settings/settings.html
@@ -86,6 +86,7 @@
<tr><td><code>server.oidc_authentication.redirect_url</code></td><td>string</td><td><code>https://localhost:8080/oidc/v1/callback</code></td><td>sets OIDC redirect URL via a URL string or a JSON string containing a required `redirect_urls` key with an object that maps from region keys to URL strings (URLs should point to your load balancer and must route to the path /oidc/v1/callback) </td></tr>
<tr><td><code>server.oidc_authentication.scopes</code></td><td>string</td><td><code>openid</code></td><td>sets OIDC scopes to include with authentication request (space delimited list of strings, required to start with `openid`)</td></tr>
<tr><td><code>server.rangelog.ttl</code></td><td>duration</td><td><code>720h0m0s</code></td><td>if nonzero, range log entries older than this duration are deleted every 10m0s. Should not be lowered below 24 hours.</td></tr>
<tr><td><code>server.secondary_tenants.redact_trace.enabled</code></td><td>boolean</td><td><code>true</code></td><td>controls if server side traces are redacted for tenant operations</td></tr>
<tr><td><code>server.shutdown.connection_wait</code></td><td>duration</td><td><code>0s</code></td><td>the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td></tr>
<tr><td><code>server.shutdown.drain_wait</code></td><td>duration</td><td><code>0s</code></td><td>the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.)</td></tr>
<tr><td><code>server.shutdown.lease_transfer_wait</code></td><td>duration</td><td><code>5s</code></td><td>the timeout for a single iteration of the range lease transfer phase of draining (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td></tr>
@@ -219,6 +220,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-54</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-58</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
15 changes: 9 additions & 6 deletions pkg/ccl/backupccl/backup_test.go
@@ -8963,12 +8963,15 @@ func TestGCDropIndexSpanExpansion(t *testing.T) {
// the tenant. More investigation is required. Tracked with #76378.
DisableDefaultTestTenant: true,
Knobs: base.TestingKnobs{
GCJob: &sql.GCJobTestingKnobs{RunBeforePerformGC: func(id jobspb.JobID) error {
gcJobID = id
aboutToGC <- struct{}{}
<-allowGC
return nil
}},
GCJob: &sql.GCJobTestingKnobs{
RunBeforePerformGC: func(id jobspb.JobID) error {
gcJobID = id
aboutToGC <- struct{}{}
<-allowGC
return nil
},
SkipWaitingForMVCCGC: true,
},
},
}})
defer tc.Stopper().Stop(ctx)
10 changes: 9 additions & 1 deletion pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/security/username"
clustersettings "github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
@@ -48,15 +49,22 @@ func TestFullClusterBackup(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

settings := clustersettings.MakeTestingClusterSettings()
params := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
// We compare job progress before and after a restore. Disable
// the automatic jobs checkpointing which could possibly mutate
// the progress data during the backup/restore process.
JobDisablePersistingCheckpoints: true,
},
GCJob: &sql.GCJobTestingKnobs{
// We want to run the GC job to completion without waiting for
// MVCC GC.
SkipWaitingForMVCCGC: true,
},
},
}}
const numAccounts = 10
@@ -67,7 +75,7 @@

// Closed when the restore is allowed to progress with the rest of the backup.
allowProgressAfterPreRestore := make(chan struct{})
// Closed to signal the the zones have been restored.
// Closed to signal the zones have been restored.
restoredZones := make(chan struct{})
for _, server := range tcRestore.Servers {
registry := server.JobRegistry().(*jobs.Registry)
11 changes: 8 additions & 3 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
@@ -124,9 +124,14 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
return errors.Errorf("unexpected SST ingestion: %v", t)

case *roachpb.RangeFeedDeleteRange:
// For now, we just error on MVCC range tombstones. These are currently
// For now, we just ignore MVCC range tombstones. These are currently
// only expected to be used by schema GC and IMPORT INTO, and such spans
// should not have active changefeeds across them.
// should not have active changefeeds across them, at least at the times
// of interest. A case where one will show up in a changefeed is when
// the primary index changes while we're watching it and then the old
// primary index is dropped. In this case, we'll get a schema event to
// restart into the new primary index, but the DeleteRange may come
// through before the schema event.
//
// TODO(erikgrinaker): Write an end-to-end test which verifies that an
// IMPORT INTO which gets rolled back using MVCC range tombstones will
@@ -136,7 +141,7 @@
// catchup scans should detect that this happened and prevent reading
// anything in that timespan. See:
// https://github.com/cockroachdb/cockroach/issues/70433
return errors.Errorf("unexpected MVCC range deletion: %v", t)
continue

default:
return errors.Errorf("unexpected RangeFeedEvent variant %v", t)
2 changes: 2 additions & 0 deletions pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel
@@ -45,6 +45,7 @@ go_test(
embed = [":kvfollowerreadsccl"],
deps = [
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/utilccl",
"//pkg/keys",
"//pkg/kv",
@@ -57,6 +58,7 @@ go_test(
"//pkg/rpc",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
1 change: 1 addition & 0 deletions pkg/ccl/kvccl/kvfollowerreadsccl/boundedstaleness_test.go
@@ -273,6 +273,7 @@ func TestBoundedStalenessDataDriven(t *testing.T) {
for i := 0; i < numNodes; i++ {
i := i
clusterArgs.ServerArgsPerNode[i] = base.TestServerArgs{
DisableDefaultTestTenant: true,
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
WithStatementTrace: func(trace tracingpb.Recording, stmt string) {
216 changes: 214 additions & 2 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -10,11 +10,16 @@ package kvfollowerreadsccl

import (
"context"
gosql "database/sql"
"fmt"
"math"
"net/url"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
// Blank import kvtenantccl so that we can create a tenant.
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
@@ -24,10 +29,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
@@ -568,13 +575,17 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
tc := testcluster.StartTestCluster(t, 4,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{UseDatabase: "t"},
ServerArgs: base.TestServerArgs{
DisableDefaultTestTenant: true,
UseDatabase: "t",
},
// n4 pretends to have low latency to n2 and n3, so that it tries to use
// them for follower reads.
// Also, we're going to collect a trace of the test's final query.
ServerArgsPerNode: map[int]base.TestServerArgs{
3: {
UseDatabase: "t",
DisableDefaultTestTenant: true,
UseDatabase: "t",
Knobs: base.TestingKnobs{
KVClient: &kvcoord.ClientTestingKnobs{
// Inhibit the checking of connection health done by the
@@ -690,3 +701,204 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
require.NoError(t, err)
require.Greater(t, followerReadsCountAfter, followerReadsCountBefore)
}

// TestSecondaryTenantFollowerReadsRouting ensures that secondary tenants route
// their requests to the nearest replica. The test runs two versions -- one
// where accurate latency information between nodes is available and another
// where it needs to be estimated using node localities.
func TestSecondaryTenantFollowerReadsRouting(t *testing.T) {
defer leaktest.AfterTest(t)()
defer utilccl.TestingEnableEnterprise()()

skip.UnderStressRace(t, "times out")

testutils.RunTrueAndFalse(t, "valid-latency-func", func(t *testing.T, validLatencyFunc bool) {
const numNodes = 4

serverArgs := make(map[int]base.TestServerArgs)
localities := make(map[int]roachpb.Locality)
for i := 0; i < numNodes; i++ {
regionName := fmt.Sprintf("region_%d", i)
if i == 3 {
// Make it such that n4 and n2 are in the same region. Below, we'll
// expect a follower read from n4 to be served by n2 because they're
// in the same locality (when validLatencyFunc is false).
regionName = fmt.Sprintf("region_%d", 1)
}
locality := roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: regionName}},
}
localities[i] = locality
serverArgs[i] = base.TestServerArgs{
Locality: localities[i],
DisableDefaultTestTenant: true, // we'll create one ourselves below.
}
}
tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: serverArgs,
})
ctx := context.Background()
defer tc.Stopper().Stop(ctx)

historicalQuery := `SELECT * FROM t.test AS OF SYSTEM TIME follower_read_timestamp() WHERE k=2`
recCh := make(chan tracingpb.Recording, 1)

var tenants [numNodes]serverutils.TestTenantInterface
for i := 0; i < numNodes; i++ {
knobs := base.TestingKnobs{}
if i == 3 { // n4
knobs = base.TestingKnobs{
KVClient: &kvcoord.ClientTestingKnobs{
DontConsiderConnHealth: true,
// For the validLatencyFunc=true version of the test, the client
// pretends to have a low latency connection to n2. As a result, we
// expect n2 to be used for follower reads originating from n4.
//
// For the variant where no latency information is available, we
// expect n2 to serve follower reads as well, but because it
// is in the same locality as the client.
LatencyFunc: func(addr string) (time.Duration, bool) {
if !validLatencyFunc {
return 0, false
}
if addr == tc.Server(1).RPCAddr() {
return time.Millisecond, true
}
return 100 * time.Millisecond, true
},
},
SQLExecutor: &sql.ExecutorTestingKnobs{
WithStatementTrace: func(trace tracingpb.Recording, stmt string) {
if stmt == historicalQuery {
recCh <- trace
}
},
},
}
}
tt, err := tc.Server(i).StartTenant(ctx, base.TestTenantArgs{
TenantID: serverutils.TestTenantID(),
Locality: localities[i],
TestingKnobs: knobs,
})
require.NoError(t, err)
tenants[i] = tt
}

// Speed up closing of timestamps in order to sleep less below before we can
// use follower_read_timestamp(). Note that we need to override the setting
// for the tenant as well, because the builtin is run in the tenant's sql pod.
systemSQL := sqlutils.MakeSQLRunner(tc.Conns[0])
systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.1s'`)
systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s'`)
systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.1s'`)
systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.1s'`)
systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s'`)
systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.1s'`)
// We're making assertions on traces collected by the tenant using log lines
// in KV so we must ensure they're not redacted.
systemSQL.Exec(t, `SET CLUSTER SETTING server.secondary_tenants.redact_trace.enabled = 'false'`)

// Wait until all tenant servers are aware of the setting override.
testutils.SucceedsSoon(t, func() error {
settingNames := []string{
"kv.closed_timestamp.target_duration", "kv.closed_timestamp.side_transport_interval", "kv.closed_timestamp.propagation_slack",
}
for _, settingName := range settingNames {
for i := 0; i < numNodes; i++ {
pgURL, cleanup := sqlutils.PGUrl(t, tenants[i].SQLAddr(), "Tenant", url.User(username.RootUser))
defer cleanup()
db, err := gosql.Open("postgres", pgURL.String())
if err != nil {
t.Fatal(err)
}
defer db.Close()

var val string
err = db.QueryRow(
fmt.Sprintf("SHOW CLUSTER SETTING %s", settingName),
).Scan(&val)
require.NoError(t, err)
if val != "00:00:00.1" {
return errors.Errorf("tenant server %d is still waiting for %s update: currently %s",
i,
settingName,
val,
)
}
}
}
return nil
})

pgURL, cleanupPGUrl := sqlutils.PGUrl(
t, tenants[3].SQLAddr(), "Tenant", url.User(username.RootUser),
)
defer cleanupPGUrl()
tenantSQLDB, err := gosql.Open("postgres", pgURL.String())
require.NoError(t, err)
defer tenantSQLDB.Close()
tenantSQL := sqlutils.MakeSQLRunner(tenantSQLDB)

tenantSQL.Exec(t, `CREATE DATABASE t`)
tenantSQL.Exec(t, `CREATE TABLE t.test (k INT PRIMARY KEY)`)

startKey := keys.MakeSQLCodec(serverutils.TestTenantID()).TenantPrefix()
tc.AddVotersOrFatal(t, startKey, tc.Target(1), tc.Target(2))
desc := tc.LookupRangeOrFatal(t, startKey)
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
}, desc.Replicas().Descriptors())

// Sleep so that we can perform follower reads. The read timestamp needs to be
// above the timestamp when the table was created.
log.Infof(ctx, "test sleeping for the follower read timestamps to pass the table creation timestamp...")
time.Sleep(500 * time.Millisecond)
log.Infof(ctx, "test sleeping... done")

getFollowerReadCounts := func() [numNodes]int64 {
var counts [numNodes]int64
for i := range tc.Servers {
err := tc.Servers[i].Stores().VisitStores(func(s *kvserver.Store) error {
counts[i] = s.Metrics().FollowerReadsCount.Count()
return nil
})
require.NoError(t, err)
}
return counts
}

// Check that the cache was indeed populated.
tenantSQL.Exec(t, `SELECT * FROM t.test WHERE k = 1`)
tablePrefix := keys.MustAddr(keys.MakeSQLCodec(serverutils.TestTenantID()).TenantPrefix())
cache := tenants[3].DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache()
entry := cache.GetCached(ctx, tablePrefix, false /* inverted */)
require.NotNil(t, entry)
require.False(t, entry.Lease().Empty())
require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID)
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
}, entry.Desc().Replicas().Descriptors())

followerReadCountsBefore := getFollowerReadCounts()
tenantSQL.Exec(t, historicalQuery)
followerReadsCountsAfter := getFollowerReadCounts()

rec := <-recCh
// Look at the trace and check that we've served a follower read.
require.True(t, kv.OnlyFollowerReads(rec), "query was not served through follower reads: %s", rec)

for i := 0; i < numNodes; i++ {
if i == 1 { // n2
require.Greater(t, followerReadsCountsAfter[i], followerReadCountsBefore[i])
continue
}
require.Equal(t, followerReadsCountsAfter[i], followerReadCountsBefore[i])
}
})
}