Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
76858: kvserver: allow circuit-breaker to serve reads r=erikgrinaker a=tbg

This commit revamps an earlier implementation (#71806) of per-Replica
circuit breakers (#33007). The earlier implementation relied on context
cancellation and coarsely failed all requests addressing the Replica
when the breaker was tripped.

This had two downsides: First, there was a (small) performance overhead
for implementing the cancellation that was paid even in the common case
of a healthy Replica. Second, and more importantly, the coarseness meant
that we'd potentially fail many requests that would otherwise succeed,
and in particular follower reads.

`@nvanbenschoten` suggested in #74799 that latching could be extended with
the concept of "poisoning" and that this could result in fine-grained
circuit breaker behavior where only requests that are truly affected by
unavailability (at the replication layer) would be rejected.

This commit implements that strategy:

A request's latches are poisoned if its completion is predicated on the
replication layer being healthy.  In other words, when the breaker
trips, all inflight proposals have their latches poisoned and new
proposals are failed fast. However, and this is the big difference,
reads can potentially still be accepted in either of two scenarios:

- a valid follower read remains valid regardless of the circuit breaker
  status, and also regardless of inflight proposals (follower reads
  don't observe latches).
- a read that can be served under the current lease and which does
  not conflict with any of the stuck proposals in the replication
  layer (= poisoned latches) can also be served.

In short, reads only fail fast if they encounter a poisoned latch or
need to request a lease. (If they opted out of fail-fast behavior,
they behave as today).

Latch poisoning is added as a first-class concept in the `concurrency`
package, and a structured error `PoisonError` is introduced. This error
in particular contains the span and timestamp of the poisoned latch that
prompted the fail-fast.

Lease proposals now always use `poison.Policy_Wait`, leaving the
fail-fast behavior to the caller. This simplifies things since multiple
callers with their own `poison.Policy` can multiplex onto a single
inflight lease proposal.

Addresses #74799.

Release note: None
Release justification: 22.1 project work

77221: changefeedccl: Fix flaky test. r=miretskiy a=miretskiy

Fix flaky TestChangefeedHandlesDrainingNodes test.
The source of the flake was that cluster setting updates propagate
asynchronously to the other nodes in the cluster.  Thus, it was possible
for the test to flake because some of the nodes were observing the
old value for the setting.

The flake is fixed by introducing testing utility function that
sets the setting and ensures the setting propagates to all nodes in
the test cluster.

Fixes #76806

Release Notes: none

77317: util/log: remove Safe in favor of redact.Safe r=yuzefovich a=yuzefovich

My desire to make this change is to break the dependency of `treewindow`
on `util/log` (which is a part of the effort to clean up the
dependencies of `execgen`).

Addresses: #77234.

Release note: None

Release justification: low risk change to clean up the dependencies
a bit.

Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
4 people committed Mar 3, 2022
4 parents d075b79 + 55fc27d + 8850dc1 + 8a77fe5 commit fd15d3a
Show file tree
Hide file tree
Showing 137 changed files with 1,486 additions and 1,088 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ go_library(
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_google_btree//:btree",
"@com_github_linkedin_goavro_v2//:goavro",
"@com_github_shopify_sarama//:sarama",
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

type changeAggregator struct {
Expand Down Expand Up @@ -1349,7 +1350,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error {
if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(cf.highWaterAtStart) {
logcrash.ReportOrPanic(cf.Ctx, &cf.flowCtx.Cfg.Settings.SV,
`got a span level timestamp %s for %s that is less than the initial high-water %s`,
log.Safe(resolved.Timestamp), resolved.Span, log.Safe(cf.highWaterAtStart))
redact.Safe(resolved.Timestamp), resolved.Span, redact.Safe(cf.highWaterAtStart))
continue
}
if err := cf.forwardFrontier(resolved); err != nil {
Expand Down
37 changes: 20 additions & 17 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4227,8 +4227,8 @@ func TestChangefeedNodeShutdown(t *testing.T) {
defer tc.Stopper().Stop(context.Background())

db := tc.ServerConn(1)
serverutils.SetClusterSetting(t, tc, "changefeed.experimental_poll_interval", time.Millisecond)
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '0ns'`)
sqlDB.Exec(t, `CREATE DATABASE d`)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
Expand Down Expand Up @@ -4439,19 +4439,22 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) {
skip.UnderRace(t, "Takes too long with race enabled")

shouldDrain := true
knobs := base.TestingKnobs{DistSQL: &execinfra.TestingKnobs{
DrainFast: true,
Changefeed: &TestingKnobs{},
Flowinfra: &flowinfra.TestingKnobs{
FlowRegistryDraining: func() bool {
if shouldDrain {
shouldDrain = false
return true
}
return false
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
DrainFast: true,
Changefeed: &TestingKnobs{},
Flowinfra: &flowinfra.TestingKnobs{
FlowRegistryDraining: func() bool {
if shouldDrain {
shouldDrain = false
return true
}
return false
},
},
},
}}
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}

sinkDir, cleanupFn := testutils.TempDir(t)
defer cleanupFn()
Expand All @@ -4466,9 +4469,9 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) {

db := tc.ServerConn(1)
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'`)
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`)
serverutils.SetClusterSetting(t, tc, "kv.rangefeed.enabled", true)
serverutils.SetClusterSetting(t, tc, "kv.closed_timestamp.target_duration", time.Second)
serverutils.SetClusterSetting(t, tc, "changefeed.experimental_poll_interval", 10*time.Millisecond)

sqlutils.CreateTable(
t, db, "foo",
Expand All @@ -4491,9 +4494,9 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) {
defer closeFeed(t, feed)

// At this point, the job created by feed will fail to start running on node 0 due to draining
// registry. However, this job will be retried, and it should succeeded.
// registry. However, this job will be retried, and it should succeed.
// Note: This test is a bit unrealistic in that if the registry is draining, that
// means that the server is draining (i.e being shut down). We don't do a full shutdown
// means that the server is draining (i.e. being shut down). We don't do a full shutdown
// here, but we are simulating a restart by failing to start a flow the first time around.
assertPayloads(t, feed, []string{
`foo: [1]->{"after": {"k": 1, "v": 1}}`,
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/kvccl/kvtenantccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ go_test(
"//pkg/util/stop",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_redact//:redact",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//codes",
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/kvccl/kvtenantccl/tenant_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/redact"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -57,7 +58,7 @@ func testTenantTracesAreRedactedImpl(t *testing.T, redactable bool) {
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
TestingEvalFilter: func(args kvserverbase.FilterArgs) *roachpb.Error {
log.Eventf(args.Ctx, "%v", sensitiveString)
log.Eventf(args.Ctx, "%v", log.Safe(visibleString))
log.Eventf(args.Ctx, "%v", redact.Safe(visibleString))
return nil
},
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/cli/mt_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -147,7 +148,7 @@ func waitForSignals(
}

log.Ops.Shoutf(ctx, severity.ERROR,
"received signal '%s' during shutdown, initiating hard shutdown", log.Safe(sig))
"received signal '%s' during shutdown, initiating hard shutdown", redact.Safe(sig))
panic("terminate")
case <-stopper.IsStopped():
const msgDone = "server shutdown completed"
Expand Down
3 changes: 2 additions & 1 deletion pkg/cli/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -398,7 +399,7 @@ func handleNodeDecommissionSelf(
cliflags.NodeDecommissionSelf.Name)
}

log.Infof(ctx, "%s node %d", log.Safe(command), localNodeID)
log.Infof(ctx, "%s node %d", redact.Safe(command), localNodeID)
return []roachpb.NodeID{localNodeID}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@ func waitForShutdown(
// shutdown process.
log.Ops.Shoutf(shutdownCtx, severity.ERROR,
"received signal '%s' during shutdown, initiating hard shutdown%s",
log.Safe(sig), log.Safe(hardShutdownHint))
redact.Safe(sig), redact.Safe(hardShutdownHint))
handleSignalDuringShutdown(sig)
panic("unreachable")

Expand Down Expand Up @@ -1187,7 +1187,7 @@ func setupAndInitializeLoggingAndProfiling(
"- %s\n"+
"- %s",
build.MakeIssueURL(53404),
log.Safe(docs.URL("secure-a-cluster.html")),
redact.Safe(docs.URL("secure-a-cluster.html")),
)
}

Expand All @@ -1201,7 +1201,7 @@ func setupAndInitializeLoggingAndProfiling(
"For more information, see:\n\n" +
"- %s"
log.Shoutf(ctx, severity.WARNING, warningString,
log.Safe(docs.URL("cockroach-start.html#locality")))
redact.Safe(docs.URL("cockroach-start.html#locality")))
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/gen/protobuf.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ PROTOBUF_SRCS = [
"//pkg/kv/kvnemesis:kvnemesis_go_proto",
"//pkg/kv/kvserver/closedts/ctpb:ctpb_go_proto",
"//pkg/kv/kvserver/concurrency/lock:lock_go_proto",
"//pkg/kv/kvserver/concurrency/poison:poison_go_proto",
"//pkg/kv/kvserver/kvserverpb:kvserverpb_go_proto",
"//pkg/kv/kvserver/liveness/livenesspb:livenesspb_go_proto",
"//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_go_proto",
Expand Down
3 changes: 2 additions & 1 deletion pkg/gossip/infostore.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

type stringMatcher interface {
Expand Down Expand Up @@ -253,7 +254,7 @@ func (is *infoStore) addInfo(key string, i *Info) error {
if highWaterStamp, ok := is.highWaterStamps[i.NodeID]; ok && highWaterStamp >= i.OrigStamp {
// Report both timestamps in the crash.
log.Fatalf(context.Background(),
"high water stamp %d >= %d", log.Safe(highWaterStamp), log.Safe(i.OrigStamp))
"high water stamp %d >= %d", redact.Safe(highWaterStamp), redact.Safe(i.OrigStamp))
}
}
// Update info map.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ func (ds *DistSender) getRoutingInfo(
}
if !containsFn(returnToken.Desc(), descKey) {
log.Fatalf(ctx, "programming error: range resolution returning non-matching descriptor: "+
"desc: %s, key: %s, reverse: %t", returnToken.Desc(), descKey, log.Safe(useReverseScan))
"desc: %s, key: %s, reverse: %t", returnToken.Desc(), descKey, redact.Safe(useReverseScan))
}
}

Expand Down Expand Up @@ -764,7 +764,7 @@ func (ds *DistSender) Send(
// We already verified above that the batch contains only scan requests of the same type.
// Such a batch should never need splitting.
log.Fatalf(ctx, "batch with MaxSpanRequestKeys=%d, TargetBytes=%d needs splitting",
log.Safe(ba.MaxSpanRequestKeys), log.Safe(ba.TargetBytes))
redact.Safe(ba.MaxSpanRequestKeys), redact.Safe(ba.TargetBytes))
}
var singleRplChunk [1]*roachpb.BatchResponse
rplChunks := singleRplChunk[:0:1]
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
],
)

Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/rangefeed/rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
)

//go:generate mockgen -destination=mocks_generated_test.go --package=rangefeed . DB
Expand Down Expand Up @@ -299,7 +300,7 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
}
if err != nil && ctx.Err() == nil && restartLogEvery.ShouldLog() {
log.Warningf(ctx, "rangefeed failed %d times, restarting: %v",
log.Safe(i), err)
redact.Safe(i), err)
}
if ctx.Err() != nil {
log.VEventf(ctx, 1, "exiting rangefeed")
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ go_library(
"replica_backpressure.go",
"replica_batch_updates.go",
"replica_circuit_breaker.go",
"replica_circuit_breaker_cancelstorage.go",
"replica_closedts.go",
"replica_command.go",
"replica_consistency.go",
Expand Down Expand Up @@ -124,6 +123,7 @@ go_library(
"//pkg/kv/kvserver/closedts/sidetransport",
"//pkg/kv/kvserver/closedts/tracker",
"//pkg/kv/kvserver/concurrency",
"//pkg/kv/kvserver/concurrency/poison",
"//pkg/kv/kvserver/constraint",
"//pkg/kv/kvserver/gc",
"//pkg/kv/kvserver/idalloc",
Expand Down Expand Up @@ -225,7 +225,6 @@ go_test(
"client_rangefeed_test.go",
"client_relocate_range_test.go",
"client_replica_backpressure_test.go",
"client_replica_circuit_breaker_bench_test.go",
"client_replica_circuit_breaker_test.go",
"client_replica_gc_test.go",
"client_replica_test.go",
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ go_library(
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//types",
"@com_github_kr_pretty//:pretty",
"@org_golang_x_time//rate",
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

func init() {
Expand Down Expand Up @@ -268,10 +269,10 @@ func PushTxn(
s = "failed to push"
}
log.Infof(ctx, "%s %s (push type=%s) %s: %s (pushee last active: %s)",
args.PusherTxn.Short(), log.Safe(s),
log.Safe(pushType),
args.PusherTxn.Short(), redact.Safe(s),
redact.Safe(pushType),
args.PusheeTxn.Short(),
log.Safe(reason),
redact.Safe(reason),
reply.PusheeTxn.LastActive())
}

Expand Down
Loading

0 comments on commit fd15d3a

Please sign in to comment.