83531: rfc: add rfc for invisible index feature r=wenyihu6 a=wenyihu6

This commit adds an RFC for the invisible index feature.

Related issue: #72576, #82363

Release justification: low risk to the existing functionality; this commit only
adds an RFC.

Release Note: none

86267: allocator: select a good enough store for decom/recovery r=lidorcarmel a=lidorcarmel

Until now, when decommissioning a node or recovering from a dead node, the
allocator tried to pick one of the best possible stores as the target for
the recovery.

Because of that, we sometimes saw multiple stores recover replicas to the
same store, for example when decommissioning a node while adding a new node
at the same time.

This PR changes the way we select a destination store by choosing a random
store out of all the stores that are "good enough" for the replica.
Diversity is still enforced, but we may, for example, recover a replica to
a store that is considered "over full".

Note that during upreplication the allocator will still try to use one of
the "best" stores as the target.
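
Roughly, "best" restricts the pick to the top-ranked ties, while "good"
draws at random from every acceptable candidate. Below is a minimal,
hedged sketch of that difference using hypothetical names and scores; the
real selectors (`selectBest`/`selectGood`) operate on the allocator's
ranked candidate list, as shown in the diff further down.

```go
package main

import (
	"fmt"
	"math/rand"
)

// candidate is a hypothetical stand-in for the allocator's candidate type.
type candidate struct {
	storeID int
	score   float64 // higher is better; the slice is sorted descending
}

// selectBest picks randomly among the top-scoring ties, concentrating
// new replicas on the "best" stores (the upreplication behavior).
func selectBest(cl []candidate, rng *rand.Rand) candidate {
	best := []candidate{cl[0]}
	for _, c := range cl[1:] {
		if c.score < best[0].score {
			break
		}
		best = append(best, c)
	}
	return best[rng.Intn(len(best))]
}

// selectGood picks randomly among all candidates that are good enough,
// spreading recovery traffic across many stores.
func selectGood(cl []candidate, rng *rand.Rand) candidate {
	return cl[rng.Intn(len(cl))]
}

func main() {
	rng := rand.New(rand.NewSource(1))
	// All four stores are valid targets; stores 1 and 2 tie for best.
	cl := []candidate{{1, 0.9}, {2, 0.9}, {3, 0.5}, {4, 0.2}}
	fmt.Println("best:", selectBest(cl, rng)) // store 1 or 2 only
	fmt.Println("good:", selectGood(cl, rng)) // any of stores 1 through 4
}
```

With the "good" selector, multiple recovering nodes no longer all converge
on the same few low-range-count stores.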

Fixes: #86265

Release note: None

Release justification: a relatively small change, and it can be
reverted by setting kv.allocator.recovery_store_selector=best.

86345: clusterversion: prevent upgrades from development versions r=ajwerner a=dt

This change defines a new "unstableVersionsAbove" point on the cluster
version line, above which any cluster versions are considered unstable
development-only versions which are still subject to change.

Performing an upgrade to a version while it is still unstable leaves a
cluster in a state where it persists a version claiming it has run that
upgrade and all prior ones, even though those upgrades are still subject
to change by nature of being unstable. If the cluster is subsequently
upgraded to a stable version, this could result in subtle and nearly
impossible to detect issues, since being at or above a particular version
is used to assume that all prior version upgrades _as released_ were run;
on a cluster that ran an earlier iteration of an upgrade, this does not hold.

Thus, to prevent clusters that upgrade to development versions from
subsequently upgrading to a stable version, we offset all development
versions -- those above the unstableVersionsAbove point -- into the far
future by adding one million to their major version, e.g. v22.x-y becomes
v1000022.x-y. This means an attempt to subsequently "upgrade" to a stable
version -- such as v22.2 -- will look like a downgrade and be forbidden.
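
As a worked example of the arithmetic, here is a hedged, simplified sketch
in Go, assuming a toy version type (the real implementation is in
pkg/clusterversion/cockroach_versions.go, shown in the diff below):

```go
package main

import "fmt"

// version is a toy stand-in for roachpb.Version.
type version struct{ Major, Minor int }

// devOffset matches the one-million offset described above.
const devOffset = 1000000

// offsetIfDev moves a development version into the far future so that a
// later move to a stable release compares as a downgrade.
func offsetIfDev(v version, unstable bool) version {
	if unstable {
		v.Major += devOffset
	}
	return v
}

func main() {
	dev := offsetIfDev(version{Major: 22, Minor: 1}, true) // v22.1 dev build
	stable := version{Major: 22, Minor: 2}                 // released v22.2
	fmt.Println(dev) // {1000022 1}, i.e. v1000022.1
	// The "upgrade" to stable v22.2 would be a major-version decrease,
	// so it is rejected as a downgrade.
	fmt.Println(stable.Major < dev.Major) // true
}
```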

On the release branch, prior to starting to publish upgradable releases,
the unstableVersionsAbove value should be set to invalidVersionKey to
reflect that all version upgrades in that release branch are now
considered to be stable, meaning they must be treated as immutable and
append-only.

Release note (ops change): clusters that are upgraded to an alpha or other
manual build from the development branch cannot subsequently be upgraded
to a release build.

Release justification: high-priority change to existing functionality, to
allow releasing alphas with known version upgrade bugs while ensuring they
do not subsequently upgrade into stable versions and become silently
corrupted clusters.

86630: kvserver: add additional testing to multiqueue r=AlexTalks a=AlexTalks

Add testing for cancelation of multi-queue requests
and fix a bug where the channel wasn't closed on task
cancelation.
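
For illustration, a hedged sketch of the fixed pattern using hypothetical,
simplified types (the real code is kvserver's multi-queue): canceling a
queued task must close its permit channel so that waiters blocked on it
are released instead of hanging.

```go
package main

import "fmt"

// task is a hypothetical queued work item; permitC is closed (or sent on)
// when the task may proceed or has been canceled.
type task struct {
	permitC chan struct{}
}

// multiQueue is a toy stand-in for the kvserver multi-queue.
type multiQueue struct {
	waiting []*task
}

// cancel removes a queued task and closes its channel; the bug being
// fixed was leaving the channel open, which left waiters blocked.
func (q *multiQueue) cancel(t *task) {
	for i, w := range q.waiting {
		if w == t {
			q.waiting = append(q.waiting[:i], q.waiting[i+1:]...)
			close(t.permitC)
			return
		}
	}
}

func main() {
	q := &multiQueue{}
	t := &task{permitC: make(chan struct{})}
	q.waiting = append(q.waiting, t)
	q.cancel(t)
	_, ok := <-t.permitC // returns immediately: the channel is closed
	fmt.Println("waiter released, channel open:", ok) // false
}
```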

Release justification: Test-only change.
Release note: None

86801: ttl: add queries to job details r=otan a=rafiss

fixes #81905

This helps with observability so users can understand what the TTL job
is doing behind the scenes.

The job details in the DB console will show:
```
ttl for defaultdb.public.t
-- for each range, iterate to find rows:
SELECT id FROM [108 AS tbl_name]
AS OF SYSTEM TIME '30s'
WHERE <crdb_internal_expiration OR ttl_expiration_expression> <= $1
AND (id) > (<range start>) AND (id) < (<range end>)
ORDER BY id
LIMIT <ttl_select_batch_size>
-- then delete with:
DELETE FROM [108 AS tbl_name]
WHERE <crdb_internal_expiration OR ttl_expiration_expression> <= $1
AND (id) IN (<rows selected above>)
```

Release note: None

Release justification: low risk change

86876: kv: Include error information in `crdb_internal.active_range_feeds` r=miretskiy a=miretskiy

Include the error count and the last error information in the
`crdb_internal.active_range_feeds` table whenever a rangefeed disconnects
due to an error.

Release justification: observability improvement.
Release note: None

86901: sql: fix cluster_execution_insights priority level r=j82w a=j82w

This fixes the priority level and converts it to a string.

closes: #86900, closes #86867

Release justification: Category 2: Bug fixes and
low-risk updates to new functionality

Release note (sql change): Fixed the insight execution priority to have
the correct value instead of always being the default. Changed the column
to a string to avoid converting it in the UI.

86921: kvserver: incorporate sending queue priority into snapshot requests r=AlexTalks a=AlexTalks

This change modifies the `(Delegated)SnapshotRequest` Raft RPCs to
incorporate the name of the sending queue, as well as the sending queue's
priority, so they can be used to prioritize queued snapshots on a
receiving store.
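
As an illustration only, a hedged sketch of what a receiver could do with
that metadata, using hypothetical Go types (the actual change extends the
(Delegated)SnapshotRequest protobufs):

```go
package main

import (
	"fmt"
	"sort"
)

// snapshotRequest is a hypothetical view of the new request metadata.
type snapshotRequest struct {
	rangeID             int
	senderQueueName     string  // e.g. "raftSnapshotQueue" (assumed name)
	senderQueuePriority float64 // priority assigned by the sending queue
}

// sortByPriority orders queued snapshots so higher-priority sends are
// serviced first on the receiving store.
func sortByPriority(reqs []snapshotRequest) {
	sort.Slice(reqs, func(i, j int) bool {
		return reqs[i].senderQueuePriority > reqs[j].senderQueuePriority
	})
}

func main() {
	reqs := []snapshotRequest{
		{rangeID: 7, senderQueueName: "replicateQueue", senderQueuePriority: 1},
		{rangeID: 9, senderQueueName: "raftSnapshotQueue", senderQueuePriority: 5},
	}
	sortByPriority(reqs)
	fmt.Println(reqs[0].rangeID) // 9: the higher-priority snapshot first
}
```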

Release justification: Low-risk change to existing functionality.
Release note: None

86923: dev: bump version r=yuzefovich a=yuzefovich

This commit bumps version since there appears to have been a "merge
skew" between #85095 and #86167, and somehow I had a `dev` binary that
didn't include the benchmark fix from the latter.

Release justification: test-only change.

Release note: None

Co-authored-by: wenyihu3 <[email protected]>
Co-authored-by: Lidor Carmel <[email protected]>
Co-authored-by: David Taylor <[email protected]>
Co-authored-by: Alex Sarkesian <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: j82w <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
9 people committed Aug 26, 2022
10 parents 5c0af43 + 8c4d31f + 3ce0fd5 + ebb812e + 73aa6f2 + bc2789e + 07cb344 + f4f3dc4 + 9913ba5 + 28db5f3 commit 3688055
Showing 42 changed files with 1,288 additions and 251 deletions.
2 changes: 1 addition & 1 deletion dev
@@ -8,7 +8,7 @@ fi
set -euo pipefail

# Bump this counter to force rebuilding `dev` on all machines.
DEV_VERSION=53
DEV_VERSION=54

THIS_DIR=$(cd "$(dirname "$0")" && pwd)
BINARY_DIR=$THIS_DIR/bin/dev-versions
428 changes: 428 additions & 0 deletions docs/RFCS/20220628_invisible_index.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -294,4 +294,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 22.1-62 set the active cluster version in the format '<major>.<minor>'
version version 1000022.1-62 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -228,6 +228,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-62</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>1000022.1-62</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
@@ -1727,6 +1727,7 @@ GO_TARGETS = [
"//pkg/sql/syntheticprivilege:syntheticprivilege_test",
"//pkg/sql/tests:tests",
"//pkg/sql/tests:tests_test",
"//pkg/sql/ttl/ttlbase:ttlbase",
"//pkg/sql/ttl/ttljob:ttljob",
"//pkg/sql/ttl/ttljob:ttljob_test",
"//pkg/sql/ttl/ttlschedule:ttlschedule",
@@ -2752,6 +2753,7 @@ GET_X_DATA_TARGETS = [
"//pkg/sql/storageparam/tablestorageparam:get_x_data",
"//pkg/sql/syntheticprivilege:get_x_data",
"//pkg/sql/tests:get_x_data",
"//pkg/sql/ttl/ttlbase:get_x_data",
"//pkg/sql/ttl/ttljob:get_x_data",
"//pkg/sql/ttl/ttlschedule:get_x_data",
"//pkg/sql/types:get_x_data",
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_test.go
@@ -8228,7 +8228,7 @@ func TestManifestTooNew(t *testing.T) {
require.NoError(t, protoutil.Unmarshal(manifestData, &backupManifest))

// Bump the version and write it back out to make it look newer.
backupManifest.ClusterVersion = roachpb.Version{Major: 99, Minor: 1}
backupManifest.ClusterVersion = roachpb.Version{Major: math.MaxInt32, Minor: 1}
manifestData, err = protoutil.Marshal(&backupManifest)
require.NoError(t, err)
require.NoError(t, os.WriteFile(manifestPath, manifestData, 0644 /* perm */))
@@ -8238,7 +8238,7 @@ func TestManifestTooNew(t *testing.T) {
require.NoError(t, os.WriteFile(manifestPath+backupinfo.BackupManifestChecksumSuffix, checksum, 0644 /* perm */))

// Verify we reject it.
sqlDB.ExpectErr(t, "backup from version 99.1 is newer than current version", `RESTORE DATABASE r1 FROM 'nodelocal://0/too_new'`)
sqlDB.ExpectErr(t, "backup from version 2147483647.1 is newer than current version", `RESTORE DATABASE r1 FROM 'nodelocal://0/too_new'`)

// Bump the version down and write it back out to make it look older.
backupManifest.ClusterVersion = roachpb.Version{Major: 20, Minor: 2, Internal: 2}
4 changes: 2 additions & 2 deletions pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
@@ -360,7 +360,7 @@ select crdb_internal.get_vmodule()
·

query T
select regexp_replace(crdb_internal.node_executable_version()::string, '(-\d+)?$', '');
select regexp_replace(regexp_replace(crdb_internal.node_executable_version()::string, '(-\d+)?$', ''), '10000', '');
----
22.1

@@ -453,7 +453,7 @@ select * from crdb_internal.gossip_alerts

# Anyone can see the executable version.
query T
select regexp_replace(crdb_internal.node_executable_version()::string, '(-\d+)?$', '');
select regexp_replace(regexp_replace(crdb_internal.node_executable_version()::string, '(-\d+)?$', ''), '10000', '');
----
22.1

51 changes: 43 additions & 8 deletions pkg/clusterversion/cockroach_versions.go
@@ -154,7 +154,7 @@ type Key int
//
//go:generate stringer -type=Key
const (
_ Key = iota - 1 // want first named one to start at zero
invalidVersionKey Key = iota - 1 // want first named one to start at zero

// V21_2 is CockroachDB v21.2. It's used for all v21.2.x patch releases.
V21_2
@@ -302,8 +302,8 @@ const TODOPreV21_2 = V21_2
// previously referenced a < 22.1 version until that check/gate can be removed.
const TODOPreV22_1 = V22_1

// versionsSingleton lists all historical versions here in chronological order,
// with comments describing what backwards-incompatible features were
// rawVersionsSingleton lists all historical versions here in chronological
// order, with comments describing what backwards-incompatible features were
// introduced.
//
// A roachpb.Version has the colloquial form MAJOR.MINOR[.PATCH][-INTERNAL],
@@ -319,7 +319,11 @@ const TODOPreV22_1 = V22_1
// Such clusters would need to be wiped. As a result, do not bump the major or
// minor version until we are absolutely sure that no new migrations will need
// to be added (i.e., when cutting the final release candidate).
var versionsSingleton = keyedVersions{
//
// rawVersionsSingleton is converted to versionsSingleton below, by adding a
// large number to every major if building from master, so as to ensure that
// master builds cannot be upgraded to release-branch builds.
var rawVersionsSingleton = keyedVersions{
{
// V21_2 is CockroachDB v21.2. It's used for all v21.2.x patch releases.
Key: V21_2,
@@ -479,6 +483,36 @@ var versionsSingleton = keyedVersions{
// *************************************************
}

const (
// unstableVersionsAbove is a cluster version Key above which any upgrades in
// this version are considered unstable development-only versions if it is not
// negative, and upgrading to them should permanently move a cluster to
// development versions. On master it should be the minted version of the last
// release, while on release branches it can be set to invalidVersionKey to
// disable marking any versions as development versions.
unstableVersionsAbove = V22_1

// finalVersion should be set on a release branch to the minted final cluster
// version key, e.g. to V22_2 on the release-22.2 branch once it is minted.
// Setting it has the effect of ensuring no versions are subsequently added.
finalVersion = invalidVersionKey
)

var versionsSingleton = func() keyedVersions {
if unstableVersionsAbove > invalidVersionKey {
const devOffset = 1000000
// Throw every version above the last release (which will be none on a release
// branch) 1 million major versions into the future, so any "upgrade" to a
// release branch build will be a downgrade and thus blocked.
for i := range rawVersionsSingleton {
if rawVersionsSingleton[i].Key > unstableVersionsAbove {
rawVersionsSingleton[i].Major += devOffset
}
}
}
return rawVersionsSingleton
}()

// TODO(irfansharif): clusterversion.binary{,MinimumSupported}Version
// feels out of place. A "cluster version" and a "binary version" are two
// separate concepts.
@@ -497,11 +531,12 @@ var (
)

func init() {
const isReleaseBranch = false
if isReleaseBranch {
if binaryVersion != ByKey(V21_2) {
panic("unexpected cluster version greater than release's binary version")
if finalVersion > invalidVersionKey {
if binaryVersion != ByKey(finalVersion) {
panic("binary version does not match final version")
}
} else if binaryVersion.Internal == 0 {
panic("a non-upgrade cluster version must be the final version")
}
}

8 changes: 5 additions & 3 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default.

12 changes: 12 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -225,6 +225,8 @@ type PartialRangeFeed struct {
CreatedTime time.Time
LastValueReceived time.Time
Resolved hlc.Timestamp
NumErrs int
LastErr error
}

// ActiveRangeFeedIterFn is an iterator function which is passed PartialRangeFeed structure.
@@ -280,6 +282,14 @@ func (a *activeRangeFeed) onRangeEvent(
a.RangeID = rangeID
}

func (a *activeRangeFeed) setLastError(err error) {
a.Lock()
defer a.Unlock()
a.LastErr = errors.Wrapf(err, "disconnect at %s: checkpoint %s/-%s",
timeutil.Now().Format(time.RFC3339), a.Resolved, timeutil.Since(a.Resolved.GoTime()))
a.NumErrs++
}

// rangeFeedRegistry is responsible for keeping track of currently executing
// range feeds.
type rangeFeedRegistry struct {
@@ -389,6 +399,8 @@ func (ds *DistSender) partialRangeFeed(
startAfter.Forward(maxTS)

if err != nil {
active.setLastError(err)

if log.V(1) {
log.Infof(ctx, "RangeFeed %s disconnected with last checkpoint %s ago: %v",
span, timeutil.Since(startAfter.GoTime()), err)
76 changes: 72 additions & 4 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -87,6 +87,20 @@ var leaseRebalancingAggressiveness = settings.RegisterFloatSetting(
settings.NonNegativeFloat,
)

// recoveryStoreSelector controls the strategy for choosing a store to recover
// replicas to: either to any valid store ("good") or to a store that has low
// range count ("best"). With this set to "good", recovering from a dead node or
// from a decommissioning node can be faster, because nodes can send replicas to
// more target stores (instead of multiple nodes sending replicas to a few
// stores with a low range count).
var recoveryStoreSelector = settings.RegisterStringSetting(
settings.SystemOnly,
"kv.allocator.recovery_store_selector",
"if set to 'good', the allocator may recover replicas to any valid store, if set "+
"to 'best' it will pick one of the most ideal stores",
"good",
)

// AllocatorAction enumerates the various replication adjustments that may be
// recommended by the allocator.
type AllocatorAction int
@@ -850,21 +864,72 @@ type decisionDetails struct {
Existing string `json:",omitempty"`
}

// CandidateSelector is an interface to select a store from a list of
// candidates.
type CandidateSelector interface {
selectOne(cl candidateList) *candidate
}

// BestCandidateSelector is used to choose the best store to allocate.
type BestCandidateSelector struct {
randGen allocatorRand
}

// NewBestCandidateSelector returns a CandidateSelector for choosing the best
// candidate store.
func (a *Allocator) NewBestCandidateSelector() CandidateSelector {
return &BestCandidateSelector{a.randGen}
}

func (s *BestCandidateSelector) selectOne(cl candidateList) *candidate {
return cl.selectBest(s.randGen)
}

// GoodCandidateSelector is used to choose a random store out of the stores that
// are good enough.
type GoodCandidateSelector struct {
randGen allocatorRand
}

// NewGoodCandidateSelector returns a CandidateSelector for choosing a random store
// out of the stores that are good enough.
func (a *Allocator) NewGoodCandidateSelector() CandidateSelector {
return &GoodCandidateSelector{a.randGen}
}

func (s *GoodCandidateSelector) selectOne(cl candidateList) *candidate {
return cl.selectGood(s.randGen)
}

func (a *Allocator) allocateTarget(
ctx context.Context,
conf roachpb.SpanConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
replicaStatus ReplicaStatus,
targetType TargetReplicaType,
) (roachpb.ReplicationTarget, string, error) {
candidateStoreList, aliveStoreCount, throttled := a.StorePool.GetStoreList(storepool.StoreFilterThrottled)

// If the replica is alive we are upreplicating, and in that case we want to
// allocate new replicas on the best possible store. Otherwise, the replica is
// dead or decommissioned, and we want to recover the missing replica as soon
// as possible, and therefore any store that is good enough will be
// considered.
var selector CandidateSelector
if replicaStatus == Alive || recoveryStoreSelector.Get(&a.StorePool.St.SV) == "best" {
selector = a.NewBestCandidateSelector()
} else {
selector = a.NewGoodCandidateSelector()
}

target, details := a.AllocateTargetFromList(
ctx,
candidateStoreList,
conf,
existingVoters,
existingNonVoters,
a.ScorerOptions(ctx),
selector,
// When allocating a *new* replica, we explicitly disregard nodes with any
// existing replicas. This is important for multi-store scenarios as
// otherwise, stores on the nodes that have existing replicas are simply
@@ -902,8 +967,9 @@ func (a *Allocator) AllocateVoter(
ctx context.Context,
conf roachpb.SpanConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
replicaStatus ReplicaStatus,
) (roachpb.ReplicationTarget, string, error) {
return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, VoterTarget)
return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget)
}

// AllocateNonVoter returns a suitable store for a new allocation of a
@@ -913,8 +979,9 @@ func (a *Allocator) AllocateNonVoter(
ctx context.Context,
conf roachpb.SpanConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
replicaStatus ReplicaStatus,
) (roachpb.ReplicationTarget, string, error) {
return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, NonVoterTarget)
return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget)
}

// AllocateTargetFromList returns a suitable store for a new allocation of a
@@ -926,6 +993,7 @@ func (a *Allocator) AllocateTargetFromList(
conf roachpb.SpanConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
options ScorerOptions,
selector CandidateSelector,
allowMultipleReplsPerNode bool,
targetType TargetReplicaType,
) (roachpb.ReplicationTarget, string) {
@@ -967,7 +1035,7 @@
)

log.VEventf(ctx, 3, "allocate %s: %s", targetType, candidates)
if target := candidates.selectGood(a.randGen); target != nil {
if target := selector.selectOne(candidates); target != nil {
log.VEventf(ctx, 3, "add target: %s", target)
details := decisionDetails{Target: target.compactString()}
detailsBytes, err := json.Marshal(details)
@@ -1101,7 +1169,7 @@ func (a Allocator) RemoveTarget(
)

log.VEventf(ctx, 3, "remove %s: %s", targetType, rankedCandidates)
if bad := rankedCandidates.selectBad(a.randGen); bad != nil {
if bad := rankedCandidates.selectWorst(a.randGen); bad != nil {
for _, exist := range existingReplicas {
if exist.StoreID == bad.store.StoreID {
log.VEventf(ctx, 3, "remove target: %s", bad)