…achdb#109238

108922: sql: use seeded random for schemachange opsgen r=annrpom a=annrpom

A schemachange TestWorkload failure was difficult to reproduce; to address this, this patch uses randutil's `NewTestRand()` to allow a global seed to be set when stressing the test (a sketch of the pattern follows below).

Epic: none
Informs: cockroachdb#108695
Informs: cockroachdb#105517
Informs: cockroachdb#109218
Release note: none
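
For reference, below is a minimal Go sketch of the seeding pattern this relies on. The helper name `newSeededRand` and the environment-variable fallback are illustrative assumptions; the workload itself uses randutil's `NewTestRand()`.

```go
// Sketch only: shows how a test can obtain a reproducible RNG from a global
// seed so that a failing run can be replayed. The env-var name and fallback
// behavior here are assumptions, not the randutil implementation.
package main

import (
	"fmt"
	"math/rand"
	"os"
	"strconv"
	"time"
)

// newSeededRand returns an RNG seeded either from COCKROACH_RANDOM_SEED (if
// set) or from the current time, and reports the seed used.
func newSeededRand() (*rand.Rand, int64) {
	seed := time.Now().UnixNano()
	if s, ok := os.LookupEnv("COCKROACH_RANDOM_SEED"); ok {
		if parsed, err := strconv.ParseInt(s, 10, 64); err == nil {
			seed = parsed
		}
	}
	return rand.New(rand.NewSource(seed)), seed
}

func main() {
	rng, seed := newSeededRand()
	fmt.Printf("random seed: %d\n", seed) // log the seed so the run can be reproduced
	fmt.Println(rng.Intn(100))            // deterministic for a fixed seed
}
```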

109223: loqrecovery: add logging for network errors r=pavelkalinnikov a=aliher1911

Previously, unrecoverable errors that caused nodes to be skipped were handled opaquely, with no visibility into what the actual error was.
This PR adds extra logging for such errors at info level (see the sketch below). Since the number of dead nodes is not expected to be high (otherwise the LOQ tools wouldn't provide much benefit), the extra logging is justified: it adds visibility that helps discover more retryable cases.

Epic: none

Release note: None

Touches: cockroachdb#108429
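
A minimal sketch of the visit-with-retry pattern described above (not the actual loqrecovery code): `isUnavailable` stands in for a real classification check such as `grpcutil.IsConnectionUnavailable`, and `logf` stands in for `log.Infof`.

```go
// Sketch of the retry loop: permanently unreachable nodes are logged at info
// level and skipped, while other errors are retried a bounded number of times.
package main

import (
	"errors"
	"fmt"
)

var errUnavailable = errors.New("node unavailable")

// isUnavailable is a placeholder for a real error-classification helper.
func isUnavailable(err error) bool { return errors.Is(err, errUnavailable) }

func visitNode(nodeID int, dial func() error, logf func(string, ...interface{})) error {
	const maxAttempts = 3
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = dial(); err == nil {
			return nil
		}
		if isUnavailable(err) {
			// Surface why the node was skipped instead of failing opaquely.
			logf("skipping node n%d, suspected un-retryable error: %v", nodeID, err)
			return nil
		}
	}
	return fmt.Errorf("giving up on node n%d after %d attempts: %w", nodeID, maxAttempts, err)
}

func main() {
	_ = visitNode(1,
		func() error { return errUnavailable },
		func(format string, args ...interface{}) { fmt.Printf(format+"\n", args...) })
}
```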

109232: server: properly connect the scheduler latency sampler to its listener r=irfansharif,stevendanna a=knz

Prior to this patch, if we had N servers running (e.g. in tests), we would have N^2 callbacks invoked every sample round (one sampler per server, N calls per sampler) because a global callback registry was used.

It also wasn't a particularly elegant design; we should eschew global registries altogether.

This patch improves the situation by having only one listener per sampler (see the sketch below).

Release note: None

Needed to solve cockroachdb#109225.
Epic: CRDB-26691
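
A minimal sketch of the "one listener per sampler" shape (type and method names here are illustrative; the real types are `schedulerlatency.LatencyObserver` and the sampler in `pkg/util/schedulerlatency`):

```go
// Sketch: the sampler holds its single listener directly instead of consulting
// a global callback registry, so N servers mean N callbacks per round, not N^2.
package main

import (
	"fmt"
	"time"
)

// LatencyObserver receives the latest p99 scheduling latency each sample period.
type LatencyObserver interface {
	SchedulerLatency(p99, period time.Duration)
}

type sampler struct {
	listener LatencyObserver
}

func newSampler(listener LatencyObserver) *sampler { return &sampler{listener: listener} }

// onTick invokes the single listener, if any, with the latest sample.
func (s *sampler) onTick(p99, period time.Duration) {
	if s.listener != nil {
		s.listener.SchedulerLatency(p99, period)
	}
}

type printObserver struct{}

func (printObserver) SchedulerLatency(p99, period time.Duration) {
	fmt.Printf("p99=%s over %s\n", p99, period)
}

func main() {
	newSampler(printObserver{}).onTick(2*time.Millisecond, 10*time.Second)
}
```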

109238: cmd: error when trying to use --stream-output with --stress r=liamgillies a=liamgillies

Previously, running these two flags together would force serialization on `--stress`, which doesn't make sense. This PR adds an error when the two flags are used together (see the sketch below).

Fixes: cockroachdb#108150
Release note: None
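
A standalone sketch of the flag-conflict check (the flag names mirror the dev tool's `--stress` and `--stream-output`; the command setup around them is illustrative):

```go
// Sketch: reject --stress combined with --stream-output instead of silently
// changing behavior.
package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func main() {
	var stress, streamOutput bool
	cmd := &cobra.Command{
		Use: "test",
		RunE: func(cmd *cobra.Command, args []string) error {
			if stress && streamOutput {
				return fmt.Errorf("cannot combine --stress and --stream-output")
			}
			return nil
		},
	}
	cmd.Flags().BoolVar(&stress, "stress", false, "run tests under stress")
	cmd.Flags().BoolVar(&streamOutput, "stream-output", false, "stream test output")
	if err := cmd.Execute(); err != nil {
		os.Exit(1)
	}
}
```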

Co-authored-by: Annie Pompa <[email protected]>
Co-authored-by: Oleg Afanasyev <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: Liam Gillies <[email protected]>
5 people committed Aug 22, 2023
5 parents 33d6552 + b21b3e5 + 95ea6ef + 9d6e4b0 + 4ba98af commit a03be32
Showing 14 changed files with 93 additions and 96 deletions.
4 changes: 4 additions & 0 deletions pkg/cmd/dev/test.go
@@ -202,6 +202,10 @@ func (d *dev) test(cmd *cobra.Command, commandLine []string) error {
}
}

if stress && streamOutput {
return fmt.Errorf("cannot combine --%s and --%s", stressFlag, streamOutputFlag)
}

if rewrite {
ignoreCache = true
disableTestSharding = true
12 changes: 6 additions & 6 deletions pkg/cmd/dev/testdata/datadriven/testlogic
@@ -57,28 +57,28 @@ getenv DEV_I_UNDERSTAND_ABOUT_STRESS
bazel test //pkg/sql/logictest/tests/... --test_env=GOTRACEBACK=all --local_cpu_resources=8 --test_arg -show-sql --test_timeout=65 --test_arg -test.timeout=1m0s --test_env=COCKROACH_STRESS=true --notest_keep_going --runs_per_test=500 --test_filter auto_span_config_reconciliation/ --test_sharding_strategy=disabled --test_output errors

exec
dev testlogic base --files=auto_span_config_reconciliation --stress --stream-output
dev testlogic base --files=auto_span_config_reconciliation --stress
----
bazel info workspace --color=no
bazel info workspace --color=no
bazel run pkg/cmd/generate-logictest -- -out-dir=crdb-checkout
getenv DEV_I_UNDERSTAND_ABOUT_STRESS
bazel test //pkg/sql/logictest/tests/... --test_env=GOTRACEBACK=all --test_arg -show-sql --test_env=COCKROACH_STRESS=true --notest_keep_going --runs_per_test=500 --test_filter auto_span_config_reconciliation/ --test_sharding_strategy=disabled --test_output streamed
bazel test //pkg/sql/logictest/tests/... --test_env=GOTRACEBACK=all --test_arg -show-sql --test_env=COCKROACH_STRESS=true --notest_keep_going --runs_per_test=500 --test_filter auto_span_config_reconciliation/ --test_sharding_strategy=disabled --test_output errors

exec
dev testlogic base --files=auto_span_config_reconciliation --stress --stream-output
dev testlogic base --files=auto_span_config_reconciliation --stress
----
bazel info workspace --color=no
bazel info workspace --color=no
bazel run pkg/cmd/generate-logictest -- -out-dir=crdb-checkout
getenv DEV_I_UNDERSTAND_ABOUT_STRESS
bazel test //pkg/sql/logictest/tests/... --test_env=GOTRACEBACK=all --test_arg -show-sql --test_env=COCKROACH_STRESS=true --notest_keep_going --runs_per_test=500 --test_filter auto_span_config_reconciliation/ --test_sharding_strategy=disabled --test_output streamed
bazel test //pkg/sql/logictest/tests/... --test_env=GOTRACEBACK=all --test_arg -show-sql --test_env=COCKROACH_STRESS=true --notest_keep_going --runs_per_test=500 --test_filter auto_span_config_reconciliation/ --test_sharding_strategy=disabled --test_output errors

exec
dev testlogic base --files=auto_span_config_reconciliation --stress --stream-output
dev testlogic base --files=auto_span_config_reconciliation --stress
----
bazel info workspace --color=no
bazel info workspace --color=no
bazel run pkg/cmd/generate-logictest -- -out-dir=crdb-checkout
getenv DEV_I_UNDERSTAND_ABOUT_STRESS
bazel test //pkg/sql/logictest/tests/... --test_env=GOTRACEBACK=all --test_arg -show-sql --test_env=COCKROACH_STRESS=true --notest_keep_going --runs_per_test=500 --test_filter auto_span_config_reconciliation/ --test_sharding_strategy=disabled --test_output streamed
bazel test //pkg/sql/logictest/tests/... --test_env=GOTRACEBACK=all --test_arg -show-sql --test_env=COCKROACH_STRESS=true --notest_keep_going --runs_per_test=500 --test_filter auto_span_config_reconciliation/ --test_sharding_strategy=disabled --test_output errors
3 changes: 3 additions & 0 deletions pkg/cmd/dev/testlogic.go
@@ -240,6 +240,9 @@ func (d *dev) testlogic(cmd *cobra.Command, commandLine []string) error {
args = append(args, fmt.Sprintf("--test_env=COCKROACH_WORKSPACE=%s", workspace))
args = append(args, "--test_arg", "-rewrite")
}
if stress && streamOutput {
return fmt.Errorf("cannot combine --%s and --%s", stressFlag, streamOutputFlag)
}
if showDiff {
args = append(args, "--test_arg", "-show-diff")
}
29 changes: 29 additions & 0 deletions pkg/kv/kvserver/loqrecovery/server.go
@@ -637,6 +637,19 @@ func checkRangeHealth(
return loqrecoverypb.RangeHealth_LOSS_OF_QUORUM
}

// makeVisitAvailableNodes creates a function to visit available remote nodes.
//
// The returned function dials all cluster nodes found in gossip and executes
// the visitor function with an admin client once a connection is established.
// The function retries both the dial operation and the visitor execution.
//
// For the former, a grpcutil.IsConnectionUnavailable check on the returned
// error aborts the retry loop because it indicates that the node is not
// available. The expectation here is that we don't know whether nodes in
// gossip are available, and we don't want to block on dead nodes indefinitely.
//
// For the latter, errors carrying the errMarkRetry marker are retried. It is
// up to the visitor to mark the appropriate errors as retryable.
func makeVisitAvailableNodes(
g *gossip.Gossip, loc roachpb.Locality, rpcCtx *rpc.Context,
) visitNodeAdminFn {
@@ -655,6 +668,8 @@ func makeVisitAvailableNodes(
// them and let caller handle incomplete info.
if err != nil {
if grpcutil.IsConnectionUnavailable(err) {
log.Infof(ctx, "rejecting node n%d because of suspected un-retryable error: %s",
node.NodeID, err)
return nil
}
// This was an initial heartbeat type error, we must retry as node seems
@@ -706,6 +721,18 @@ func makeVisitAvailableNodes(
}
}

// makeVisitNode creates a function to visit a remote node.
//
// The returned function dials a node and executes the visitor function with a
// status client once a connection is established. The function retries both
// the dial operation and the visitor execution.
//
// For the former, closed-connection errors abort the retry loop because they
// indicate that the node is not available. The expectation here is that we
// are trying to talk to available nodes and all other errors are transient.
//
// For the latter, errors carrying the errMarkRetry marker are retried. It is
// up to the visitor to mark the appropriate errors as retryable.
func makeVisitNode(g *gossip.Gossip, loc roachpb.Locality, rpcCtx *rpc.Context) visitNodeStatusFn {
return func(ctx context.Context, nodeID roachpb.NodeID, retryOpts retry.Options,
visitor func(client serverpb.StatusClient) error,
Expand All @@ -721,6 +748,8 @@ func makeVisitNode(g *gossip.Gossip, loc roachpb.Locality, rpcCtx *rpc.Context)
conn, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, rpc.DefaultClass).Connect(ctx)
if err != nil {
if grpcutil.IsClosedConnection(err) {
log.Infof(ctx, "can't dial node n%d because connection is permanently closed: %s",
node.NodeID, err)
return err
}
// Retry any other transient connection flakes.
11 changes: 2 additions & 9 deletions pkg/server/server.go
@@ -925,15 +925,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
}
blobspb.RegisterBlobServer(grpcServer.Server, blobService)

{ // wire up admission control's scheduler latency listener
slcbID := schedulerlatency.RegisterCallback(
node.storeCfg.SchedulerLatencyListener.SchedulerLatency,
)
stopper.AddCloser(stop.CloserFn(func() {
schedulerlatency.UnregisterCallback(slcbID)
}))
}

replicationReporter := reports.NewReporter(
db, node.stores, storePool, st, nodeLiveness, internalExecutor, systemConfigWatcher,
)
@@ -1793,6 +1784,8 @@ func (s *topLevelServer) PreStart(ctx context.Context) error {
// Start measuring the Go scheduler latency.
if err := schedulerlatency.StartSampler(
workersCtx, s.st, s.stopper, s.registry, base.DefaultMetricsSampleInterval,
// Wire up admission control's scheduler latency listener.
s.node.storeCfg.SchedulerLatencyListener,
); err != nil {
return err
}
1 change: 1 addition & 0 deletions pkg/server/tenant.go
@@ -622,6 +622,7 @@ func (s *SQLServerWrapper) PreStart(ctx context.Context) error {
// Start measuring the Go scheduler latency.
if err := schedulerlatency.StartSampler(
workersCtx, s.sqlServer.cfg.Settings, s.stopper, s.registry, base.DefaultMetricsSampleInterval,
nil, /* listener */
); err != nil {
return err
}
1 change: 1 addition & 0 deletions pkg/util/admission/BUILD.bazel
@@ -35,6 +35,7 @@ go_library(
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/schedulerlatency",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
5 changes: 2 additions & 3 deletions pkg/util/admission/admission.go
@@ -132,6 +132,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/schedulerlatency"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
)
@@ -361,9 +362,7 @@ type elasticCPULimiter interface {

// SchedulerLatencyListener listens to the latest scheduler latency data. We
// expect this to be called every scheduler_latency.sample_period.
type SchedulerLatencyListener interface {
SchedulerLatency(p99, period time.Duration)
}
type SchedulerLatencyListener = schedulerlatency.LatencyObserver

// grantKind represents the two kind of ways we grant admission: using a slot
// or a token. The slot terminology is akin to a scheduler, where a scheduling
1 change: 1 addition & 0 deletions pkg/util/schedulerlatency/BUILD.bazel
@@ -39,6 +39,7 @@ go_test(
"//pkg/util/stop",
"//pkg/util/syncutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
57 changes: 5 additions & 52 deletions pkg/util/schedulerlatency/callbacks.go
@@ -10,57 +10,10 @@

package schedulerlatency

import (
"time"
import "time"

"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// Callback is provided the current value of the scheduler's p99 latency and the
// period over which the measurement applies.
type Callback func(p99 time.Duration, period time.Duration)

// RegisterCallback registers a callback to be run with observed scheduling
// latencies every scheduler_latency.sample_period.
func RegisterCallback(cb Callback) (id int64) {
globallyRegisteredCallbacks.mu.Lock()
defer globallyRegisteredCallbacks.mu.Unlock()

id = globallyRegisteredCallbacks.mu.nextID
globallyRegisteredCallbacks.mu.nextID++
globallyRegisteredCallbacks.mu.callbacks = append(globallyRegisteredCallbacks.mu.callbacks,
callbackWithID{
id: id,
cb: cb,
})
return id
type LatencyObserver interface {
// SchedulerLatency is provided the current value of the scheduler's p99 latency and the
// period over which the measurement applies.
SchedulerLatency(p99 time.Duration, period time.Duration)
}

// UnregisterCallback unregisters the callback to be run with observed
// scheduling latencies.
func UnregisterCallback(id int64) {
globallyRegisteredCallbacks.mu.Lock()
defer globallyRegisteredCallbacks.mu.Unlock()

newCBs := []callbackWithID(nil)
for i := range globallyRegisteredCallbacks.mu.callbacks {
if globallyRegisteredCallbacks.mu.callbacks[i].id == id {
continue
}
newCBs = append(newCBs, globallyRegisteredCallbacks.mu.callbacks[i])
}
globallyRegisteredCallbacks.mu.callbacks = newCBs
}

type callbackWithID struct {
cb Callback
id int64 // used to uniquely identify a registered callback; used when unregistering
}

var globallyRegisteredCallbacks = struct {
mu struct {
syncutil.Mutex
nextID int64 // used to allocate IDs to registered callbacks
callbacks []callbackWithID
}
}{}
18 changes: 9 additions & 9 deletions pkg/util/schedulerlatency/sampler.go
@@ -74,6 +74,7 @@ func StartSampler(
stopper *stop.Stopper,
registry *metric.Registry,
statsInterval time.Duration,
listener LatencyObserver,
) error {
return stopper.RunAsyncTask(ctx, "scheduler-latency-sampler", func(ctx context.Context) {
settingsValuesMu := struct {
@@ -84,7 +85,7 @@
settingsValuesMu.period = samplePeriod.Get(&st.SV)
settingsValuesMu.duration = sampleDuration.Get(&st.SV)

s := newSampler(settingsValuesMu.period, settingsValuesMu.duration)
s := newSampler(settingsValuesMu.period, settingsValuesMu.duration, listener)
_ = stopper.RunAsyncTask(ctx, "export-scheduler-stats", func(ctx context.Context) {
// cpuSchedulerLatencyBuckets are prometheus histogram buckets
// suitable for a histogram that records a (second-denominated)
@@ -157,15 +158,16 @@ func StartSampler(

// sampler contains the local state maintained across scheduler latency samples.
type sampler struct {
mu struct {
listener LatencyObserver
mu struct {
syncutil.Mutex
ringBuffer ring.Buffer[*metrics.Float64Histogram]
lastIntervalHistogram *metrics.Float64Histogram
}
}

func newSampler(period, duration time.Duration) *sampler {
s := &sampler{}
func newSampler(period, duration time.Duration, listener LatencyObserver) *sampler {
s := &sampler{listener: listener}
s.mu.ringBuffer = ring.MakeBuffer(([]*metrics.Float64Histogram)(nil))
s.setPeriodAndDuration(period, duration)
return s
@@ -197,11 +199,9 @@ func (s *sampler) sampleOnTickAndInvokeCallbacks(period time.Duration) {
s.mu.lastIntervalHistogram = sub(latestCumulative, oldestCumulative)
p99 := time.Duration(int64(percentile(s.mu.lastIntervalHistogram, 0.99) * float64(time.Second.Nanoseconds())))

globallyRegisteredCallbacks.mu.Lock()
defer globallyRegisteredCallbacks.mu.Unlock()
cbs := globallyRegisteredCallbacks.mu.callbacks
for i := range cbs {
cbs[i].cb(p99, period)
// Perform the callback if there's a listener.
if s.listener != nil {
s.listener.SchedulerLatency(p99, period)
}
}

34 changes: 22 additions & 12 deletions pkg/util/schedulerlatency/scheduler_latency_test.go
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

@@ -63,20 +64,10 @@ func TestSchedulerLatencySampler(t *testing.T) {
}(i)
}

mu := struct {
syncutil.Mutex
p99 time.Duration
}{}
slcbID := RegisterCallback(func(p99 time.Duration, period time.Duration) {
require.Equal(t, samplePeriod.Default(), period)
mu.Lock()
defer mu.Unlock()
mu.p99 = p99
})
defer UnregisterCallback(slcbID)
mu := testListener{}

reg := metric.NewRegistry()
require.NoError(t, StartSampler(ctx, st, stopper, reg, 10*time.Second))
require.NoError(t, StartSampler(ctx, st, stopper, reg, 10*time.Second, &mu))
testutils.SucceedsSoon(t, func() error {
mu.Lock()
defer mu.Unlock()
@@ -98,6 +89,25 @@ func TestSchedulerLatencySampler(t *testing.T) {
})
return err
})

if mu.err != nil {
t.Fatal(mu.err)
}
}

type testListener struct {
syncutil.Mutex
err error
p99 time.Duration
}

func (l *testListener) SchedulerLatency(p99 time.Duration, period time.Duration) {
l.Lock()
defer l.Unlock()
if samplePeriod.Default() != period {
l.err = errors.CombineErrors(l.err, errors.Newf("mismatch: expected %v, got %v", samplePeriod.Default(), period))
}
l.p99 = p99
}

func TestComputeSchedulerPercentile(t *testing.T) {
1 change: 1 addition & 0 deletions pkg/workload/schemachange/BUILD.bazel
@@ -30,6 +30,7 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/encoding",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/workload",
Expand Down