108175: kvserver: unskip lease preferences during outage  r=andrewbaptist a=kvoli

Previously, `TestLeasePreferenceDuringOutage` would force replication
queue processing of the test range, then assert that the range
up-replicated and lease transferred to a preferred locality.

This test was skipped, and two of the assumptions it relied on to pass
were no longer true.

After #85219, the replicate queue no longer
re-processes replicas. Instead, the queue requeues replicas after
processing, at the appropriate priority. This broke the test because the
replicate queue was disabled, which made the re-queue a no-op.

After #94023, the replicate queue no longer looks for lease transfers
after processing a replication action. Combined with #85219, the queue
is now guaranteed not to process both up-replication and a lease
transfer from a single enqueue.

Update the test to not require manual processing; instead, use a queue
range filter (sketched below), which allows tests that disable automatic
replication to still process filtered ranges via the various replica
queues. Also ensure that the non-stopped stores are considered live
targets after simulating an outage (bumping manual clocks, stopping
servers), so that the expected up-replication, then lease transfer, can
proceed.
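
As a rough illustration of the filter mechanism — a minimal sketch with illustrative names, not the actual CockroachDB testing knobs:

```go
package main

import "fmt"

// RangeID stands in for roachpb.RangeID in this sketch.
type RangeID int64

// baseQueue sketches a replica queue that is disabled for automatic
// processing but still admits ranges matched by a test-installed filter.
type baseQueue struct {
	disabled    bool
	rangeFilter func(RangeID) bool
}

// shouldProcess reports whether the queue may process a range: always when
// enabled, and only filter-admitted ranges while replication is disabled.
func (bq *baseQueue) shouldProcess(id RangeID) bool {
	if !bq.disabled {
		return true
	}
	return bq.rangeFilter != nil && bq.rangeFilter(id)
}

func main() {
	testRange := RangeID(42)
	bq := &baseQueue{
		disabled:    true, // the test disables automatic replication
		rangeFilter: func(id RangeID) bool { return id == testRange },
	}
	fmt.Println(bq.shouldProcess(testRange)) // true: the filtered range is still processed
	fmt.Println(bq.shouldProcess(7))         // false: other ranges stay unprocessed
}
```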

Fixes: #88769
Release note: None

109432: cluster-ui: handle partial response errors on the database details page r=THardy98 a=THardy98

Part of: #102386

**Demos** (Note: these demos show the same logic applied to the databases and database table pages as well):
DB-Console
- https://www.loom.com/share/5108dd655ad342f28323e72eaf68219c
- https://www.loom.com/share/1973383dacd7494a84e10bf39e5b85a3

This change applies the same error handling ideas from #109245 to the
database details page, enabling non-admin users to use the database
details page and providing better transparency into data-fetching issues.

Errors encountered while fetching table details can be viewed via the
tooltip on the `Caution` icon next to the table's name.
`unavailable` cells also provide a tooltip that displays the error
impacting that exact cell.

Release note (ui change): Non-admin users are able to use the database
details page.

110292: c2c: use separate spanConfigEventStreamSpec in the span config event stream r=stevendanna a=msbutler

Previously, the spanConfigEventStream used a streamPartitionSpec, which contained a number of fields unnecessary for span config streaming. This patch creates a new spanConfigEventStreamSpec, which contains only the fields necessary for span config event streaming.
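
For reference, a minimal Go sketch of the new spec's shape, inferred from the fields used in the diffs later in this commit (`Span`, `TenantID`, `MinCheckpointFrequency`); the real type is protobuf-generated in `streampb`, so the actual definition differs:

```go
package streamspec

import "time"

// Stand-ins for the roachpb types in this sketch.
type Span struct{ Key, EndKey []byte }
type TenantID struct{ InternalValue uint64 }

// SpanConfigEventStreamSpec carries only what span config streaming needs:
// the single span over the span configurations table to watch, the tenant
// whose configs are streamed, and the checkpoint cadence.
type SpanConfigEventStreamSpec struct {
	Span                   Span
	TenantID               TenantID
	MinCheckpointFrequency time.Duration
}
```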

Informs #109059

Release note: None

110309: teamcity-trigger: ensure that `race` tag is only passed once r=healthy-pod a=healthy-pod

When running under `-race`, the go command defines the `race` build tag for us [1].

Previously, we also defined it under `TAGS` to let the issue poster know that this was a failure under `race` and indicate that in the issue.

At the time, defining the tag twice didn't cause issues, but after #109773 it led to build failures [2].

To reproduce locally:
```
bazel test -s --config=race pkg/util/ctxgroup:all --test_env=GOTRACEBACK=all --define gotags=bazel,gss,race
```
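
For background, `-race` implying the `race` build tag is standard Go behavior, which is why also passing `race` via `gotags` defines it twice. A minimal sketch of how code can observe the tag (hypothetical file and package names):

```go
// race_on.go
//go:build race

package buildutil

// RaceEnabled is true only in binaries built with -race, because the go
// command implicitly defines the `race` build tag for such builds.
const RaceEnabled = true
```

```go
// race_off.go
//go:build !race

package buildutil

const RaceEnabled = false
```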

As a follow-up, we should find another way to let the issue poster know that a failure occurred under `race`.

[1] https://go.dev/doc/articles/race_detector#Excluding_Tests
[2] #109994 (comment)

Epic: none
Release note: None

Co-authored-by: Austen McClernon <[email protected]>
Co-authored-by: Thomas Hardy <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: healthy-pod <[email protected]>
5 people committed Sep 11, 2023
5 parents 2810022 + fe8b67b + 001ca87 + fcf2650 + d99f8d5 commit 5fc80c3
Showing 17 changed files with 499 additions and 395 deletions.
21 changes: 7 additions & 14 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
```diff
@@ -484,10 +484,11 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) error {
 	}
 }
 
+const defaultBatchSize = 1 << 20
+
 func setConfigDefaults(cfg *streampb.StreamPartitionSpec_ExecutionConfig) {
 	const defaultInitialScanParallelism = 16
 	const defaultMinCheckpointFrequency = 10 * time.Second
-	const defaultBatchSize = 1 << 20
 
 	if cfg.InitialScanParallelism <= 0 {
 		cfg.InitialScanParallelism = defaultInitialScanParallelism
@@ -502,27 +503,19 @@ func setConfigDefaults(cfg *streampb.StreamPartitionSpec_ExecutionConfig) {
 	}
 }
 
-func validateSpecs(evalCtx *eval.Context, spec streampb.StreamPartitionSpec) error {
-	if !evalCtx.SessionData().AvoidBuffering {
-		return errors.New("partition streaming requires 'SET avoid_buffering = true' option")
-	}
-	if len(spec.Spans) == 0 {
-		return errors.AssertionFailedf("expected at least one span, got none")
-	}
-	return nil
-}
-
 func streamPartition(
 	evalCtx *eval.Context, streamID streampb.StreamID, opaqueSpec []byte,
 ) (eval.ValueGenerator, error) {
 	var spec streampb.StreamPartitionSpec
 	if err := protoutil.Unmarshal(opaqueSpec, &spec); err != nil {
 		return nil, errors.Wrapf(err, "invalid partition spec for stream %d", streamID)
 	}
-	if err := validateSpecs(evalCtx, spec); err != nil {
-		return nil, err
+	if !evalCtx.SessionData().AvoidBuffering {
+		return nil, errors.New("partition streaming requires 'SET avoid_buffering = true' option")
 	}
+	if len(spec.Spans) == 0 {
+		return nil, errors.AssertionFailedf("expected at least one span, got none")
+	}
 
 	setConfigDefaults(&spec.Config)
 
 	execCfg := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)
```
39 changes: 17 additions & 22 deletions pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go
```diff
@@ -33,7 +33,7 @@ import (
 
 type spanConfigEventStream struct {
 	execCfg *sql.ExecutorConfig
-	spec    streampb.StreamPartitionSpec
+	spec    streampb.SpanConfigEventStreamSpec
 	mon     *mon.BytesMonitor
 	acc     mon.BoundAccount
 
@@ -82,15 +82,15 @@ func (s *spanConfigEventStream) Start(ctx context.Context, txn *kv.Txn) error {
 	s.doneChan = make(chan struct{})
 
 	// Reserve batch kvsSize bytes from monitor.
-	if err := s.acc.Grow(ctx, s.spec.Config.BatchByteSize); err != nil {
-		return errors.Wrapf(err, "failed to allocate %d bytes from monitor", s.spec.Config.BatchByteSize)
+	if err := s.acc.Grow(ctx, defaultBatchSize); err != nil {
+		return errors.Wrapf(err, "failed to allocate %d bytes from monitor", defaultBatchSize)
 	}
 
 	s.rfc = rangefeedcache.NewWatcher(
 		"spanconfig-subscriber",
 		s.execCfg.Clock, s.execCfg.RangeFeedFactory,
-		int(s.spec.Config.BatchByteSize),
-		s.spec.Spans,
+		defaultBatchSize,
+		roachpb.Spans{s.spec.Span},
 		true, // withPrevValue
 		spanconfigkvsubscriber.NewSpanConfigDecoder().TranslateEvent,
 		s.handleUpdate,
@@ -212,13 +212,10 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error {
 	// TODO(msbutler): We may not need a pacer, given how little traffic will come from this
 	// stream. That being said, we'd still want to buffer updates to ensure we
 	// don't clog up the rangefeed. Consider using async flushing.
-	pacer := makeCheckpointPacer(s.spec.Config.MinCheckpointFrequency)
+	pacer := makeCheckpointPacer(s.spec.MinCheckpointFrequency)
 	bufferedEvents := make([]streampb.StreamedSpanConfigEntry, 0)
 	batcher := makeStreamEventBatcher()
-	frontier, err := makeSpanConfigFrontier(s.spec.Spans)
-	if err != nil {
-		return err
-	}
+	frontier := makeSpanConfigFrontier(s.spec.Span)
 
 	for {
 		select {
@@ -247,7 +244,7 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error {
 		if err != nil {
 			return err
 		}
-		if !tenantID.Equal(s.spec.Config.SpanConfigForTenant) {
+		if !tenantID.Equal(s.spec.TenantID) {
 			continue
 		}
 
@@ -278,19 +275,17 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error {
 	}
 }
 
-func makeSpanConfigFrontier(spans roachpb.Spans) (*spanConfigFrontier, error) {
-	if len(spans) != 1 {
-		return nil, errors.AssertionFailedf("unexpected input span length %d", len(spans))
-	}
+func makeSpanConfigFrontier(span roachpb.Span) *spanConfigFrontier {
+
 	checkpoint := streampb.StreamEvent_StreamCheckpoint{
 		ResolvedSpans: []jobspb.ResolvedSpan{{
-			Span: spans[0],
+			Span: span,
 		},
 		},
 	}
 	return &spanConfigFrontier{
 		checkpoint: checkpoint,
-	}, nil
+	}
 }
 
 type spanConfigFrontier struct {
@@ -301,13 +296,13 @@ func (spf *spanConfigFrontier) update(frontier hlc.Timestamp) {
 	spf.checkpoint.ResolvedSpans[0].Timestamp = frontier
 }
 
-func streamSpanConfigPartition(
-	evalCtx *eval.Context, spec streampb.StreamPartitionSpec,
+func streamSpanConfigs(
+	evalCtx *eval.Context, spec streampb.SpanConfigEventStreamSpec,
 ) (eval.ValueGenerator, error) {
-	if err := validateSpecs(evalCtx, spec); err != nil {
-		return nil, err
+
+	if !evalCtx.SessionData().AvoidBuffering {
+		return nil, errors.New("partition streaming requires 'SET avoid_buffering = true' option")
 	}
-	setConfigDefaults(&spec.Config)
 
 	return &spanConfigEventStream{
 		spec: spec,
```
14 changes: 6 additions & 8 deletions pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
```diff
@@ -354,19 +354,17 @@ func setupSpanConfigsStream(
 	}); err != nil {
 		return nil, err
 	}
-
 	spanConfigKey := evalCtx.Codec.TablePrefix(uint32(spanConfigID))
 
 	// TODO(msbutler): crop this span to the keyspan within the span config
 	// table relevant to this specific tenant once I teach the client.Subscribe()
 	// to stream span configs, which will make testing easier.
 	span := roachpb.Span{Key: spanConfigKey, EndKey: spanConfigKey.PrefixEnd()}
 
-	spec := streampb.StreamPartitionSpec{
-		Spans: roachpb.Spans{span},
-		Config: streampb.StreamPartitionSpec_ExecutionConfig{
-			MinCheckpointFrequency: streamingccl.StreamReplicationMinCheckpointFrequency.Get(&evalCtx.Settings.SV),
-			SpanConfigForTenant:    tenantID,
-		}}
-	return streamSpanConfigPartition(evalCtx, spec)
+	spec := streampb.SpanConfigEventStreamSpec{
+		Span:                   span,
+		TenantID:               tenantID,
+		MinCheckpointFrequency: streamingccl.StreamReplicationMinCheckpointFrequency.Get(&evalCtx.Settings.SV),
+	}
+	return streamSpanConfigs(evalCtx, spec)
 }
```
1 change: 0 additions & 1 deletion pkg/cmd/teamcity-trigger/main.go
```diff
@@ -194,7 +194,6 @@ func runTC(queueBuild func(string, map[string]string)) {
 		opts["env.EXTRA_BAZEL_FLAGS"] = fmt.Sprintf("%s --@io_bazel_rules_go//go/config:race --test_env=GORACE=halt_on_error=1", extraBazelFlags)
 		opts["env.STRESSFLAGS"] = fmt.Sprintf("-maxruns %d -maxtime %s -maxfails %d -p %d",
 			maxRuns, maxTime, maxFails, noParallelism)
-		opts["env.TAGS"] = "race"
 		queueBuildThenWait(buildID, opts)
 		delete(opts, "env.TAGS")
 	}
```