Skip to content

Commit

Permalink
kvserver,keys: force-flush when needed by split/merge/migrate
Browse files Browse the repository at this point in the history
- ReplicatedEvalResult.DoTimelyApplicationToAllReplicas is added and set
  in splitTriggerHelper, MergeTrigger, Migrate, Subsume.
- The previous setting to true is gated on cluster version
  V25_1_AddRangeForceFlushKey.
- This causes ReplicaState.ForceFlushIndex to be set, and
  RangeForceFlushKey (a replicated range-id local key) to be written when
  applying the corresponding batch to the state machine. The index is set
  to the index of the entry being applied, and is monotonically
  increasing.
- replica_rac2.Processor and rac2.RangeController have
  ForceFlushIndexChangedLocked methods that are called whenever the
  Replica sees a change in the force-flush-index. When in pull-mode the
  rangeController acts on the latest value it has seem to force-flush
  a replicaSendStream that has a send-queue that has some entries at
  or equal to the force-flush-index.

Fixes #135601

Epic: CRDB-37515

Release note: None
  • Loading branch information
sumeerbhola authored and kvoli committed Nov 26, 2024
1 parent aded21f commit 8d6aa4f
Show file tree
Hide file tree
Showing 28 changed files with 1,157 additions and 66 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -401,4 +401,4 @@ trace.span_registry.enabled boolean false if set, ongoing traces can be seen at
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. application
ui.database_locality_metadata.enabled boolean true if enabled shows extended locality data about databases and tables in DB Console which can be expensive to compute application
ui.display_timezone enumeration etc/utc the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1] application
version version 1000024.3-upgrading-to-1000025.1-step-004 set the active cluster version in the format '<major>.<minor>' application
version version 1000024.3-upgrading-to-1000025.1-step-006 set the active cluster version in the format '<major>.<minor>' application
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,6 @@
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-ui-database-locality-metadata-enabled" class="anchored"><code>ui.database_locality_metadata.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled shows extended locality data about databases and tables in DB Console which can be expensive to compute</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-ui-display-timezone" class="anchored"><code>ui.display_timezone</code></div></td><td>enumeration</td><td><code>etc/utc</code></td><td>the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1]</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000024.3-upgrading-to-1000025.1-step-004</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000024.3-upgrading-to-1000025.1-step-006</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
</tbody>
</table>
6 changes: 6 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ const (
// V25_1_AddJobsTables added new jobs tables.
V25_1_AddJobsTables

// V25_1_AddRangeForceFlushKey adds the RangeForceFlushKey, a replicated
// range-ID local key, which is written below raft.
V25_1_AddRangeForceFlushKey

// *************************************************
// Step (1) Add new versions above this comment.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -295,6 +299,8 @@ var versionTable = [numKeys]roachpb.Version{

V25_1_AddJobsTables: {Major: 24, Minor: 3, Internal: 4},

V25_1_AddRangeForceFlushKey: {Major: 24, Minor: 3, Internal: 6},

// *************************************************
// Step (2): Add new versions above this comment.
// Do not add new versions to a patch release.
Expand Down
2 changes: 2 additions & 0 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ var (
// LocalRangeAppliedStateSuffix is the suffix for the range applied state
// key.
LocalRangeAppliedStateSuffix = []byte("rask")
// LocalRangeForceFlushSuffix is the suffix for the range force flush key.
LocalRangeForceFlushSuffix = []byte("rffk")
// This was previously used for the replicated RaftTruncatedState. It is no
// longer used and this key has been removed via a migration. See
// LocalRaftTruncatedStateSuffix for the corresponding unreplicated
Expand Down
13 changes: 7 additions & 6 deletions pkg/keys/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,14 @@ var _ = [...]interface{}{
// range as a whole. Though they are replicated, they are unaddressable.
// Typical examples are MVCC stats and the abort span. They all share
// `LocalRangeIDPrefix` and `LocalRangeIDReplicatedInfix`.
AbortSpanKey, // "abc-"
AbortSpanKey, // "abc-"
RangeGCThresholdKey, // "lgc-"
RangeAppliedStateKey, // "rask"
RangeForceFlushKey, // "rffk"
RangeLeaseKey, // "rll-"
RangePriorReadSummaryKey, // "rprs"
ReplicatedSharedLocksTransactionLatchingKey, // "rsl-"
RangeGCThresholdKey, // "lgc-"
RangeAppliedStateKey, // "rask"
RangeLeaseKey, // "rll-"
RangePriorReadSummaryKey, // "rprs"
RangeVersionKey, // "rver"
RangeVersionKey, // "rver"

// 2. Unreplicated range-ID local keys: These contain metadata that
// pertain to just one replica of a range. They are unreplicated and
Expand Down
11 changes: 11 additions & 0 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ func RangeAppliedStateKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RangeAppliedStateKey()
}

// RangeForceFlushKey returns a system-local key for the range force flush key.
func RangeForceFlushKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RangeForceFlushKey()
}

// RangeLeaseKey returns a system-local key for a range lease.
func RangeLeaseKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RangeLeaseKey()
Expand Down Expand Up @@ -1129,6 +1134,12 @@ func (b RangeIDPrefixBuf) RangeAppliedStateKey() roachpb.Key {
return append(b.replicatedPrefix(), LocalRangeAppliedStateSuffix...)
}

// RangeForceFlushKey returns a system-local key for the range force flush
// key.
func (b RangeIDPrefixBuf) RangeForceFlushKey() roachpb.Key {
return append(b.replicatedPrefix(), LocalRangeForceFlushSuffix...)
}

// RangeLeaseKey returns a system-local key for a range lease.
func (b RangeIDPrefixBuf) RangeLeaseKey() roachpb.Key {
return append(b.replicatedPrefix(), LocalRangeLeaseSuffix...)
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/build",
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/abortspan",
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
Expand Down Expand Up @@ -1363,6 +1364,10 @@ func splitTriggerHelper(
// hand side range (i.e. it goes from zero to its stats).
RHSDelta: *h.AbsPostSplitRight(),
}
// TODO: explain with reference to comment in split_delay_helper.go.
if rec.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) {
pd.Replicated.DoTimelyApplicationToAllReplicas = true
}

pd.Local.Metrics = &result.Metrics{
SplitsWithEstimatedStats: h.splitsWithEstimates,
Expand Down Expand Up @@ -1459,6 +1464,14 @@ func mergeTrigger(
pd.Replicated.Merge = &kvserverpb.Merge{
MergeTrigger: *merge,
}
// TODO: explain with reference to kvserver.AdminMerge. This is not
// technically necessary since the call to waitForApplication happens
// earlier in kvserver.AdminMerge, when sending a kvpb.SubsumeRequest, but
// since we have force-flushed once during the merge txn anyway, let's
// complete the story and finish the merge on all replicas.
if rec.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) {
pd.Replicated.DoTimelyApplicationToAllReplicas = true
}

{
// If we have GC hints populated that means we are trying to perform
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
Expand Down Expand Up @@ -81,6 +82,11 @@ func Migrate(
// as all below-raft migrations (the only users of Migrate) were introduced
// after it.
pd.Replicated.State.Version = &migrationVersion
// TODO: explain with reference to the waitForApplication call in
// replica_write.go for a MigrateRequest.
if cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) {
pd.Replicated.DoTimelyApplicationToAllReplicas = true
}
return pd, nil
}

Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_subsume.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
Expand Down Expand Up @@ -156,5 +157,10 @@ func Subsume(
reply.ReadSummary = &priorReadSum
reply.ClosedTimestamp = cArgs.EvalCtx.GetCurrentClosedTimestamp(ctx)

return result.Result{}, nil
var pd result.Result
// TODO: explain with reference to kvserver.AdminMerge.
if cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) {
pd.Replicated.DoTimelyApplicationToAllReplicas = true
}
return pd, nil
}
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ func (p *Result) MergeAndDestroy(q Result) error {
log.Fatalf(context.TODO(), "unhandled EvalResult: %s",
pretty.Diff(*q.Replicated.State, kvserverpb.ReplicaState{}))
}
if q.Replicated.State.ForceFlushIndex != (roachpb.ForceFlushIndex{}) {
return errors.AssertionFailedf("must not specify ForceFlushIndex")
}
q.Replicated.State = nil
}

Expand Down Expand Up @@ -350,6 +353,11 @@ func (p *Result) MergeAndDestroy(q Result) error {
}
q.Replicated.IsProbe = false

if q.Replicated.DoTimelyApplicationToAllReplicas {
p.Replicated.DoTimelyApplicationToAllReplicas = true
}
q.Replicated.DoTimelyApplicationToAllReplicas = false

if p.Local.EncounteredIntents == nil {
p.Local.EncounteredIntents = q.Local.EncounteredIntents
} else {
Expand Down
Loading

0 comments on commit 8d6aa4f

Please sign in to comment.