Commit 8944fd0
60719: sql/opt: propagate row-level locking mode to index, lookup, inverted, and zigzag joins r=nvanbenschoten a=nvanbenschoten

Fixes #56941.

The first commit updates the execbuilder to propagate row-level locking modes
through transformations from standard Scan and Join operations to
specialized IndexJoin, LookupJoin, and InvertedJoin operations.

The second commit updates the execbuilder to propagate row-level locking modes
through transformations from standard Scan and Join operations to
ZigZagJoins, and allows for the use of zigzag joins when explicit
row-level locking modes are in use.

Release note (sql change): table scans performed as part of index
joins, lookup joins, and inverted joins now respect the row-level
locking strength and wait policy specified by the optional
FOR SHARE/UPDATE [NOWAIT] clause on SELECT statements.

Release note (sql change): table scans performed as part of zigzag
joins now respect the row-level locking strength and wait policy
specified by the optional FOR SHARE/UPDATE [NOWAIT] clause on SELECT
statements.
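
As an illustration, here is a minimal Go sketch of the kind of statement this affects. The `customers`/`orders` tables, the connection string, and the driver choice are hypothetical (not taken from this PR); the point is that a join like this may be planned as a lookup join, and its lookup scans now carry the requested row locks:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumption: any Postgres-wire driver works with CockroachDB
)

func main() {
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	tx, err := db.Begin()
	if err != nil {
		log.Fatal(err)
	}
	defer tx.Rollback()

	// The join below may execute as a lookup join into a secondary index
	// on orders(customer_id); after this change, those scans acquire the
	// row locks requested by FOR UPDATE rather than silently dropping them.
	rows, err := tx.Query(
		`SELECT o.id, o.total
		   FROM customers AS c
		   JOIN orders AS o ON o.customer_id = c.id
		  WHERE c.region = $1
		  FOR UPDATE`, "us-east1")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
	for rows.Next() {
		var id int64
		var total float64
		if err := rows.Scan(&id, &total); err != nil {
			log.Fatal(err)
		}
		fmt.Println(id, total)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}
}
```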

80070: sql: do not close stmt buffer of internal executor in errCallback r=yuzefovich a=yuzefovich

Previously, we would close the stmt buffer of the internal executor in
`errCallback`, "just to be safe", since it was assumed that the buffer was
already closed by the time the callback ran. The callback runs whenever the
`run()` loop of the connExecutor exits with an error.

However, it is possible for the following sequence of events to happen:
- The new goroutine is spun up for the internal executor before any
commands are pushed into the stmt buffer.
- The context is canceled before the new goroutine blocks waiting for
the command to execute, i.e. the `run()` loop exits before any commands
are executed.
- The `errCallback` with the context cancellation error is evaluated.
This closes the stmt buffer. The goroutine exits.
- The main goroutine tries to push some commands into the buffer only to
find that it was already closed. An assertion error is returned, and
a sentry event is created.

I think we should just not close the stmt buffer in the `errCallback`,
since doing so was never necessary and it can lead to the scenario described
above, where no sentry event should be emitted.
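
A minimal, self-contained Go model of the race shape (all names here are illustrative stand-ins for the real `pkg/sql` types): the consumer goroutine exits on context cancellation and its error callback closes the buffer, so the producer's later Push fails with the assertion error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

type stmtBuf struct {
	mu     sync.Mutex
	closed bool
	cmds   []string
}

func (b *stmtBuf) Push(cmd string) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return errors.New("buffer is closed") // surfaces as the sentry-worthy assertion
	}
	b.cmds = append(b.cmds, cmd)
	return nil
}

func (b *stmtBuf) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.closed = true
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	buf := &stmtBuf{}
	done := make(chan struct{})
	go func() { // stands in for the internal executor's run() goroutine
		defer close(done)
		<-ctx.Done() // run() exits before any command is executed
		buf.Close()  // errCallback closing the buffer: the step this PR removes
	}()
	cancel() // context canceled before any command is pushed
	<-done
	if err := buf.Push("SELECT 1"); err != nil {
		fmt.Println("push failed:", err) // the failure the main goroutine observed
	}
}
```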

Fixes: #79746.

Release note: None

80190: kvserver: version gate locality-aware load-based rebalancing r=aayushshah15 a=aayushshah15

This commit introduces a set of deprecated store rebalancer methods
corresponding to the pre-22.1 load-rebalancing scheme. Until a store detects
that the version upgrade (to 22.1) has been finalized, the store will fall back
to the old (pre-22.1) load-based rebalancing logic that wasn't locality aware.

This is done to minimize the risk of unexpected behavior in mixed-version
clusters.
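
A runnable toy model of the fallback pattern; the version handle and method names below are illustrative stand-ins, not the actual cluster-version machinery in `pkg/kv/kvserver`:

```go
package main

import (
	"context"
	"fmt"
)

type versionHandle struct{ finalized bool } // stand-in for the real cluster-version handle

func (v versionHandle) isActive(ctx context.Context) bool { return v.finalized }

type storeRebalancer struct{ version versionHandle }

func (sr storeRebalancer) rebalance(ctx context.Context) {
	if !sr.version.isActive(ctx) {
		// The 22.1 upgrade is not yet finalized, so the cluster may still
		// contain pre-22.1 nodes: fall back to the deprecated,
		// locality-oblivious load-based rebalancing.
		fmt.Println("using deprecated store rebalancer")
		return
	}
	fmt.Println("using locality-aware load-based rebalancing")
}

func main() {
	ctx := context.Background()
	storeRebalancer{versionHandle{finalized: false}}.rebalance(ctx) // during upgrade
	storeRebalancer{versionHandle{finalized: true}}.rebalance(ctx)  // upgrade finalized
}
```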

All the tests corresponding to the old logic have been re-introduced, and are unchanged.

Resolves #76702

Release note: None


80263: bazel: run `returncheck` lint in `nogo` r=rail a=rickystewart

Unfortunately `returncheck` is not implemented as an `Analyzer`, so it
can't be integrated with `nogo` directly. Instead, I've copied and adapted
the existing code from https://github.com/cockroachdb/returncheck. The
vendored `returncheck` can be deleted from the tree once we no longer need
to support `make lint`.
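
For context, a hedged sketch of what "implemented as an `Analyzer`" means: `nogo` consumes checks exposed as `golang.org/x/tools/go/analysis.Analyzer` values, so the adapted `returncheck` has to take roughly this shape (the detection logic below is simplified and illustrative, not the vendored code):

```go
package returncheck

import (
	"go/ast"
	"go/types"

	"golang.org/x/tools/go/analysis"
)

// Analyzer is the nogo-compatible entry point.
var Analyzer = &analysis.Analyzer{
	Name: "returncheck",
	Doc:  "reports calls whose error result is silently discarded",
	Run:  run,
}

func run(pass *analysis.Pass) (interface{}, error) {
	for _, file := range pass.Files {
		ast.Inspect(file, func(n ast.Node) bool {
			// An expression statement discards every result of its call.
			stmt, ok := n.(*ast.ExprStmt)
			if !ok {
				return true
			}
			call, ok := stmt.X.(*ast.CallExpr)
			if !ok {
				return true
			}
			if tv, ok := pass.TypesInfo.Types[ast.Expr(call)]; ok && returnsError(tv.Type) {
				pass.Reportf(call.Lparen, "unchecked error return")
			}
			return true
		})
	}
	return nil, nil
}

// returnsError reports whether the (possibly multi-valued) result type
// ends in the built-in error type.
func returnsError(t types.Type) bool {
	if tup, ok := t.(*types.Tuple); ok {
		if tup.Len() == 0 {
			return false
		}
		t = tup.At(tup.Len() - 1).Type()
	}
	named, ok := t.(*types.Named)
	return ok && named.Obj().Pkg() == nil && named.Obj().Name() == "error"
}
```

`nogo` then runs such analyzers over each compiled package as part of the Bazel build.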

Closes #73391.

Release note: None

80317: bazel,dev: provide opt-out for building w/ `nogo` r=rail a=rickystewart

Now we provide `--//build/toolchains:nogo_disable` as an option that
forces `nogo` checks OFF even if you have globally configured
`lintonbuild`. End users can use this for builds, but primarily I want
a way to force builds to finish faster when `nogo` checks are not
particularly useful: namely, when building `dev` for the first time (via
the `dev` script) and when building the `bazel-remote` binary. `dev build`
also infers when passing `nogo_disable` is acceptable and passes it in
when appropriate.

Release note: None

80551: sql/gcjob/gcjobnotifier: fix rare panic r=ajwerner a=ajwerner

If a notifier is added before the initial system config has been received, it
can result in a crash. This may be more common in 22.1, where the initial
config takes longer to be populated.
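
A hedged Go sketch of the guard (field and type names are illustrative): a notifyee registered before gossip delivers the first system config must tolerate a nil config instead of dereferencing it.

```go
package gcjobnotifier

import "sync"

type systemConfig struct{} // stand-in for the real system config type

type notifier struct {
	mu struct {
		sync.Mutex
		cfg       *systemConfig
		notifyees map[chan struct{}]struct{}
	}
}

// maybeNotify signals all registered notifyees that the config changed.
// Before the fix, a nil cfg here (possible when a notifyee is added before
// the initial config arrives) could crash the node.
func (n *notifier) maybeNotify() {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.mu.cfg == nil {
		return // no initial system config yet; nothing to diff or announce
	}
	for ch := range n.mu.notifyees {
		select {
		case ch <- struct{}{}:
		default: // a notification is already pending on this channel
		}
	}
}
```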

Fixes #77425

Release note (bug fix): Fixes a rare crash that can occur when restarting a
node after dropping tables.

Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Aayush Shah <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
6 people committed Apr 26, 2022
7 parents dbbe083 + 75ac30c + d1898a9 + d819235 + 64cb8f2 + 7f1e0c8 + 7234a57 commit 8944fd0
Showing 46 changed files with 2,990 additions and 211 deletions.
1 change: 1 addition & 0 deletions BUILD.bazel
@@ -194,6 +194,7 @@ nogo(
"//pkg/testutils/lint/passes/leaktestcall",
"//pkg/testutils/lint/passes/nilness",
"//pkg/testutils/lint/passes/nocopy",
"//pkg/testutils/lint/passes/returncheck",
"//pkg/testutils/lint/passes/returnerrcheck",
"//pkg/testutils/lint/passes/timer",
"//pkg/testutils/lint/passes/unconvert",
9 changes: 9 additions & 0 deletions build/bazelutil/nogo_config.json
@@ -111,6 +111,15 @@
"cockroach/pkg/.*$": "first-party code"
}
},
"returncheck": {
"exclude_files": {
"cockroach/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go": "existing failure",
"cockroach/pkg/kv/txn_external_test.go": "existing failure"
},
"only_files": {
"cockroach/pkg/.*$": "first-party code"
}
},
"returnerrcheck": {
"only_files": {
"cockroach/pkg/.*$": "first-party code"
9 changes: 9 additions & 0 deletions build/toolchains/BUILD.bazel
@@ -412,6 +412,7 @@ config_setting(
name = "nogo",
flag_values = {
":nogo_flag": "true",
":nogo_disable_flag": "false",
},
)

@@ -433,6 +434,14 @@ config_setting(
visibility = ["//build/bazelutil:__pkg__"],
)

# Note: nogo_disable can be set to force nogo checks off even if
# `build --config lintonbuild` is set in .bazelrc.
bool_flag(
name = "nogo_disable_flag",
build_setting_default = False,
visibility = ["//visibility:public"],
)

bool_flag(
name = "force_build_cdeps_flag",
build_setting_default = False,
4 changes: 2 additions & 2 deletions dev
@@ -16,8 +16,8 @@ fi
if [[ ! -f "$BINARY_PATH" ]]; then
echo "$BINARY_PATH not found, building..."
mkdir -p $BINARY_DIR
bazel build //pkg/cmd/dev --//build/toolchains:nogo_flag
cp $(bazel info bazel-bin --//build/toolchains:nogo_flag)/pkg/cmd/dev/dev_/dev $BINARY_PATH
bazel build //pkg/cmd/dev --//build/toolchains:nogo_disable_flag
cp $(bazel info bazel-bin --//build/toolchains:nogo_disable_flag)/pkg/cmd/dev/dev_/dev $BINARY_PATH
# The Bazel-built binary won't have write permissions.
chmod a+w $BINARY_PATH
fi
13 changes: 12 additions & 1 deletion pkg/cmd/dev/build.go
@@ -28,7 +28,8 @@ import (
)

const (
crossFlag = "cross"
nogoDisableFlag = "--//build/toolchains:nogo_disable_flag"
)

type buildTarget struct {
@@ -316,6 +317,7 @@ func (d *dev) getBasicBuildArgs(
args = append(args, fmt.Sprintf("--local_cpu_resources=%d", numCPUs))
}

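// Track whether nogo can be skipped for this invocation: any first-party
// ("//...") target forces nogo to stay enabled, while builds touching only
// external ("@repo//...") targets can safely pass nogo_disable.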
canDisableNogo := true
shouldBuildWithTestConfig := false
for _, target := range targets {
target = strings.TrimPrefix(target, "./")
@@ -338,6 +340,9 @@
if typ == "go_test" || typ == "go_transition_test" || typ == "test_suite" {
shouldBuildWithTestConfig = true
}
if strings.HasPrefix(fullTargetName, "//") {
canDisableNogo = false
}
}
continue
}
@@ -354,6 +359,9 @@
} else {
buildTargets = append(buildTargets, buildTarget{fullName: aliased, kind: "go_binary"})
}
if strings.HasPrefix(aliased, "//") {
canDisableNogo = false
}
}

// Add --config=with_ui iff we're building a target that needs it.
@@ -366,6 +374,9 @@
if shouldBuildWithTestConfig {
args = append(args, "--config=test")
}
if canDisableNogo {
args = append(args, nogoDisableFlag)
}
return args, buildTargets, nil
}

4 changes: 2 additions & 2 deletions pkg/cmd/dev/cache.go
@@ -136,11 +136,11 @@ func (d *dev) setUpCache(ctx context.Context) (string, error) {

log.Printf("Configuring cache...\n")

err := d.exec.CommandContextInheritingStdStreams(ctx, "bazel", "build", bazelRemoteTarget, "--//build/toolchains:nonogo_explicit_flag")
err := d.exec.CommandContextInheritingStdStreams(ctx, "bazel", "build", bazelRemoteTarget, nogoDisableFlag)
if err != nil {
return "", err
}
bazelRemoteLoc, err := d.exec.CommandContextSilent(ctx, "bazel", "run", bazelRemoteTarget, "--//build/toolchains:nonogo_explicit_flag", "--run_under=//build/bazelutil/whereis")
bazelRemoteLoc, err := d.exec.CommandContextSilent(ctx, "bazel", "run", bazelRemoteTarget, nogoDisableFlag, "--run_under=//build/bazelutil/whereis")
if err != nil {
return "", err
}
2 changes: 1 addition & 1 deletion pkg/cmd/dev/testdata/datadriven/dev-build
@@ -60,7 +60,7 @@ cp sandbox/pkg/cmd/cockroach/cockroach_/cockroach crdb-checkout/cockroach
exec
dev build stress
----
bazel build @com_github_cockroachdb_stress//:stress
bazel build @com_github_cockroachdb_stress//:stress --//build/toolchains:nogo_disable_flag
bazel info workspace --color=no
mkdir crdb-checkout/bin
bazel info bazel-bin --color=no
105 changes: 81 additions & 24 deletions pkg/cmd/roachtest/tests/rebalance_load.go
@@ -14,6 +14,7 @@ import (
"context"
gosql "database/sql"
"fmt"
"math/rand"
"sort"
"strings"
"time"
@@ -26,6 +27,7 @@
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

@@ -52,15 +54,34 @@
rebalanceMode string,
maxDuration time.Duration,
concurrency int,
mixedVersion bool,
) {
roachNodes := c.Range(1, c.Spec().NodeCount-1)
appNode := c.Node(c.Spec().NodeCount)
splits := len(roachNodes) - 1 // n-1 splits => n ranges => 1 lease per node

c.Put(ctx, t.Cockroach(), "./cockroach", roachNodes)
startOpts := option.DefaultStartOpts()
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs, "--vmodule=store_rebalancer=5,allocator=5,allocator_scorer=5,replicate_queue=5")
c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), roachNodes)
settings := install.MakeClusterSettings()
if mixedVersion {
predecessorVersion, err := PredecessorVersion(*t.BuildVersion())
require.NoError(t, err)
settings.Binary = uploadVersion(ctx, t, c, c.All(), predecessorVersion)
// Upgrade some (or all) of the first N-1 CRDB nodes. We ignore the last
// CRDB node (to leave at least one node on the older version), and the
// app node.
lastNodeToUpgrade := rand.Intn(c.Spec().NodeCount-2) + 1
t.L().Printf("upgrading %d nodes to the current cockroach binary", lastNodeToUpgrade)
nodesToUpgrade := c.Range(1, lastNodeToUpgrade)
// An empty string means that the cockroach binary specified by the
// `cockroach` flag will be used.
const newVersion = ""
c.Start(ctx, t.L(), startOpts, settings, roachNodes)
upgradeNodes(ctx, nodesToUpgrade, newVersion, t, c)
} else {
c.Put(ctx, t.Cockroach(), "./cockroach", roachNodes)
c.Start(ctx, t.L(), startOpts, settings, roachNodes)
}

c.Put(ctx, t.DeprecatedWorkload(), "./workload", appNode)
c.Run(ctx, appNode, fmt.Sprintf("./workload init kv --drop --splits=%d {pgurl:1}", splits))
@@ -131,30 +152,66 @@

concurrency := 128

r.Add(registry.TestSpec{
Name: `rebalance/by-load/leases`,
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(4), // the last node is just used to generate load
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
if c.IsLocal() {
concurrency = 32
fmt.Printf("lowering concurrency to %d in local testing\n", concurrency)
}
rebalanceLoadRun(ctx, t, c, "leases", 3*time.Minute, concurrency)
r.Add(
registry.TestSpec{
Name: `rebalance/by-load/leases`,
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(4), // the last node is just used to generate load
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
if c.IsLocal() {
concurrency = 32
fmt.Printf("lowering concurrency to %d in local testing\n", concurrency)
}
rebalanceLoadRun(ctx, t, c, "leases", 3*time.Minute, concurrency, false /* mixedVersion */)
},
},
})
r.Add(registry.TestSpec{
Name: `rebalance/by-load/replicas`,
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(7), // the last node is just used to generate load
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
if c.IsLocal() {
concurrency = 32
fmt.Printf("lowering concurrency to %d in local testing\n", concurrency)
}
rebalanceLoadRun(ctx, t, c, "leases and replicas", 5*time.Minute, concurrency)
)
r.Add(
registry.TestSpec{
Name: `rebalance/by-load/leases/mixed-version`,
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(4), // the last node is just used to generate load
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
if c.IsLocal() {
concurrency = 32
fmt.Printf("lowering concurrency to %d in local testing\n", concurrency)
}
rebalanceLoadRun(ctx, t, c, "leases", 3*time.Minute, concurrency, true /* mixedVersion */)
},
},
)
r.Add(
registry.TestSpec{
Name: `rebalance/by-load/replicas`,
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(7), // the last node is just used to generate load
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
if c.IsLocal() {
concurrency = 32
fmt.Printf("lowering concurrency to %d in local testing\n", concurrency)
}
rebalanceLoadRun(
ctx, t, c, "leases and replicas", 5*time.Minute, concurrency, false, /* mixedVersion */
)
},
},
)
r.Add(
registry.TestSpec{
Name: `rebalance/by-load/replicas/mixed-version`,
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(7), // the last node is just used to generate load
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
if c.IsLocal() {
concurrency = 32
fmt.Printf("lowering concurrency to %d in local testing\n", concurrency)
}
rebalanceLoadRun(
ctx, t, c, "leases and replicas", 5*time.Minute, concurrency, true, /* mixedVersion */
)
},
},
})
)
}

func isLoadEvenlyDistributed(l *logger.Logger, db *gosql.DB, numNodes int) (bool, error) {
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -10,6 +10,7 @@ go_library(
"compact_span_client.go",
"consistency_queue.go",
"debug_print.go",
"deprecated_store_rebalancer.go",
"doc.go",
"lease_history.go",
"log.go",
@@ -239,6 +240,7 @@ go_test(
"closed_timestamp_test.go",
"consistency_queue_test.go",
"debug_print_test.go",
"deprecated_store_rebalancer_test.go",
"gossip_test.go",
"helpers_test.go",
"intent_resolver_integration_test.go",
21 changes: 12 additions & 9 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -943,7 +943,7 @@ func (a *Allocator) AllocateTargetFromList(
candidateStores storepool.StoreList,
conf roachpb.SpanConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
options *RangeCountScorerOptions,
options ScorerOptions,
allowMultipleReplsPerNode bool,
targetType TargetReplicaType,
) (roachpb.ReplicationTarget, string) {
@@ -1449,7 +1449,7 @@ func (a *Allocator) ScorerOptions(ctx context.Context) *RangeCountScorerOptions
return &RangeCountScorerOptions{
StoreHealthOptions: a.StoreHealthOptions(ctx),
deterministic: a.StorePool.Deterministic,
rangeRebalanceThreshold: rangeRebalanceThreshold.Get(&a.StorePool.St.SV),
rangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.StorePool.St.SV),
}
}

@@ -1467,7 +1467,7 @@ func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerO
// made by the replicateQueue during normal course of operations. In other
// words, we don't want stores that are too far away from the mean to be
// affected by the jitter.
jitter: rangeRebalanceThreshold.Get(&a.StorePool.St.SV),
jitter: RangeRebalanceThreshold.Get(&a.StorePool.St.SV),
}
}

@@ -1532,7 +1532,7 @@ func (a *Allocator) ValidLeaseTargets(

// Determine which store(s) is preferred based on user-specified preferences.
// If any stores match, only consider those stores as candidates.
preferred := a.preferredLeaseholders(conf, candidates)
preferred := a.PreferredLeaseholders(conf, candidates)
if len(preferred) > 0 {
candidates = preferred
}
@@ -1573,7 +1573,7 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences(
)
// If there are any replicas that do match lease preferences, then we check if
// the existing leaseholder is one of them.
preferred := a.preferredLeaseholders(conf, candidates)
preferred := a.PreferredLeaseholders(conf, candidates)
if len(preferred) == 0 {
return false
}
@@ -1754,9 +1754,10 @@ func (a *Allocator) TransferLeaseTarget(
candidates,
storeDescMap,
&QPSScorerOptions{
StoreHealthOptions: a.StoreHealthOptions(ctx),
QPSRebalanceThreshold: allocator.QPSRebalanceThreshold.Get(&a.StorePool.St.SV),
MinRequiredQPSDiff: allocator.MinQPSDifferenceForTransfers.Get(&a.StorePool.St.SV),
StoreHealthOptions: a.StoreHealthOptions(ctx),
DeprecatedRangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.StorePool.St.SV),
QPSRebalanceThreshold: allocator.QPSRebalanceThreshold.Get(&a.StorePool.St.SV),
MinRequiredQPSDiff: allocator.MinQPSDifferenceForTransfers.Get(&a.StorePool.St.SV),
},
)

@@ -2171,7 +2172,9 @@ func (a Allocator) shouldTransferLeaseForLeaseCountConvergence(
return false
}

func (a Allocator) preferredLeaseholders(
// PreferredLeaseholders returns a slice of replica descriptors corresponding to
// replicas that meet lease preferences (among the `existing` replicas).
func (a Allocator) PreferredLeaseholders(
conf roachpb.SpanConfig, existing []roachpb.ReplicaDescriptor,
) []roachpb.ReplicaDescriptor {
// Go one preference at a time. As soon as we've found replicas that match a