108197: kvserver: pass Desc and SpanConf through allocator r=kvoli a=andrewbaptist

Previously, the different layers of the allocator loaded the Desc and
SpanConf as needed. This risked the two changing between loads, which
could cause strange, hard-to-track-down races. Now they are loaded once
and passed through all the layers.
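
As an editorial aside, the hazard and the fix look roughly like this. This is a minimal sketch with hypothetical names (rangeState, decide, helper), not the actual allocator code:

```go
package main

import "fmt"

// rangeState stands in for the Desc/SpanConf pair; the real types are
// *roachpb.RangeDescriptor and roachpb.SpanConfig.
type rangeState struct{ generation int }

type replica struct{ current *rangeState }

// Racy shape: each layer re-reads the replica's state, so two reads
// within one decision can observe different generations.
func (r *replica) decideRacy() bool {
	first := r.current.generation // load #1
	second := r.helperRacy()      // load #2: may differ under concurrency
	return first == second
}

func (r *replica) helperRacy() int { return r.current.generation }

// Fixed shape: load once at the top and thread the same snapshot
// through every layer, so all layers reason about the same state.
func (r *replica) decide(state *rangeState) bool {
	return r.helper(state) == state.generation // always consistent
}

func (r *replica) helper(state *rangeState) int { return state.generation }

func main() {
	r := &replica{current: &rangeState{generation: 7}}
	state := r.current // the single load
	fmt.Println(r.decide(state))
}
```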

Epic: none

Release note: None

108399: roachtest: Treat job query error as retryable r=miretskiy a=miretskiy

Retry attempts to retrieve job status (in the cdc test) as long as the
parent context is active. Attempts to query the jobs table may fail if
the cluster is under significant load.
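
A minimal sketch of this retry shape, with hypothetical names (pollJobStatus, check) and a slightly cleaned-up failure counter compared to the actual waitForChangefeed change below:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollJobStatus polls check on a ticker until it reports done, the
// parent context is cancelled, or too many consecutive failures occur.
func pollJobStatus(ctx context.Context, check func() (bool, error)) error {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	const maxFailures = 5
	failures := 0
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err() // parent context gone: stop retrying
		}
		done, err := check()
		if err != nil {
			// Treat the error as transient (e.g. the jobs table query
			// failing under load) for up to maxFailures tries.
			failures++
			if failures > maxFailures {
				return fmt.Errorf("too many failed status queries: %w", err)
			}
			continue
		}
		failures = 0
		if done {
			return nil
		}
	}
}
```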

Epic: None
Fixes #108399

Release note: None

Co-authored-by: Andrew Baptist <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
3 people committed Aug 9, 2023
3 parents b91c919 + 8478b85 + 8fc7974 commit 1925449
Showing 14 changed files with 169 additions and 108 deletions.
21 changes: 15 additions & 6 deletions pkg/cmd/roachtest/tests/cdc_bench.go
@@ -28,6 +28,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
+	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/workload/histogram"
@@ -293,7 +294,7 @@ func runCDCBenchScan(
 	// Wait for the changefeed to complete, and compute throughput.
 	m.Go(func(ctx context.Context) error {
 		t.L().Printf("waiting for changefeed to finish")
-		info, err := waitForChangefeed(ctx, conn, jobID, func(info changefeedInfo) (bool, error) {
+		info, err := waitForChangefeed(ctx, conn, jobID, t.L(), func(info changefeedInfo) (bool, error) {
 			switch jobs.Status(info.status) {
 			case jobs.StatusSucceeded:
 				return true, nil
@@ -444,7 +445,7 @@ func runCDCBenchWorkload(
 	// the changefeed wasn't lagging by more than 1-2 minutes, but with 100k
 	// ranges it was found to sometimes lag by over 8 minutes.
 	m.Go(func(ctx context.Context) error {
-		info, err := waitForChangefeed(ctx, conn, jobID, func(info changefeedInfo) (bool, error) {
+		info, err := waitForChangefeed(ctx, conn, jobID, t.L(), func(info changefeedInfo) (bool, error) {
 			switch jobs.Status(info.status) {
 			case jobs.StatusPending, jobs.StatusRunning:
 				doneValue := done.Load()
@@ -465,7 +466,7 @@
 		now := timeutil.Now()
 		t.L().Printf("waiting for changefeed watermark to reach current time (%s)",
 			now.Format(time.RFC3339))
-		info, err := waitForChangefeed(ctx, conn, jobID, func(info changefeedInfo) (bool, error) {
+		info, err := waitForChangefeed(ctx, conn, jobID, t.L(), func(info changefeedInfo) (bool, error) {
 			switch jobs.Status(info.status) {
 			case jobs.StatusPending, jobs.StatusRunning:
 				return info.highwaterTime.After(now), nil
@@ -539,11 +540,15 @@ func getAllZoneTargets(ctx context.Context, t test.Test, conn *gosql.DB) []string
 
 // waitForChangefeed waits until the changefeed satisfies the given closure.
 func waitForChangefeed(
-	ctx context.Context, conn *gosql.DB, jobID int, f func(changefeedInfo) (bool, error),
+	ctx context.Context,
+	conn *gosql.DB,
+	jobID int,
+	logger *logger.Logger,
+	f func(changefeedInfo) (bool, error),
 ) (changefeedInfo, error) {
 	ticker := time.NewTicker(5 * time.Second)
 	defer ticker.Stop()
-	for {
+	for attempt := 0; ; attempt++ {
 		select {
 		case <-ticker.C:
 		case <-ctx.Done():
@@ -552,7 +557,11 @@
 
 		info, err := getChangefeedInfo(conn, jobID)
 		if err != nil {
-			return changefeedInfo{}, err
+			logger.Errorf("error getting changefeed info: %v (attempt %d)", err, attempt+1)
+			if attempt > 5 {
+				return changefeedInfo{}, errors.Wrap(err, "failed 5 attempts to get changefeed info")
+			}
+			continue
 		} else if info.errMsg != "" {
 			return changefeedInfo{}, errors.Errorf("changefeed error: %s", info.errMsg)
 		}
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -1961,13 +1961,13 @@ func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerOptions
 func (a *Allocator) ValidLeaseTargets(
 	ctx context.Context,
 	storePool storepool.AllocatorStorePool,
+	desc *roachpb.RangeDescriptor,
 	conf roachpb.SpanConfig,
 	existing []roachpb.ReplicaDescriptor,
 	leaseRepl interface {
 		StoreID() roachpb.StoreID
 		RaftStatus() *raft.Status
 		GetFirstIndex() kvpb.RaftIndex
-		Desc() *roachpb.RangeDescriptor
 	},
 	opts allocator.TransferLeaseOptions,
 ) []roachpb.ReplicaDescriptor {
@@ -2014,9 +2014,8 @@
 	// replica set, however are in the candidate list. Uninitialized
 	// replicas will always need a snapshot.
 	existingCandidates := []roachpb.ReplicaDescriptor{}
-	rangeDesc := leaseRepl.Desc()
 	for _, candidate := range candidates {
-		if _, ok := rangeDesc.GetReplicaDescriptor(candidate.StoreID); ok {
+		if _, ok := desc.GetReplicaDescriptor(candidate.StoreID); ok {
 			existingCandidates = append(existingCandidates, candidate)
 		} else {
 			validSnapshotCandidates = append(validSnapshotCandidates, candidate)
@@ -2027,7 +2026,7 @@
 
 	status := leaseRepl.RaftStatus()
 	if a.knobs != nil && a.knobs.RaftStatusFn != nil {
-		status = a.knobs.RaftStatusFn(leaseRepl)
+		status = a.knobs.RaftStatusFn(desc, leaseRepl.StoreID())
 	}
 
 	candidates = append(validSnapshotCandidates, excludeReplicasInNeedOfSnapshots(
@@ -2220,14 +2219,14 @@ func (a *Allocator) IOOverloadOptions() IOOverloadOptions {
 func (a *Allocator) TransferLeaseTarget(
 	ctx context.Context,
 	storePool storepool.AllocatorStorePool,
+	desc *roachpb.RangeDescriptor,
 	conf roachpb.SpanConfig,
 	existing []roachpb.ReplicaDescriptor,
 	leaseRepl interface {
 		StoreID() roachpb.StoreID
 		GetRangeID() roachpb.RangeID
 		RaftStatus() *raft.Status
 		GetFirstIndex() kvpb.RaftIndex
-		Desc() *roachpb.RangeDescriptor
 	},
 	usageInfo allocator.RangeUsageInfo,
 	forceDecisionWithoutStats bool,
@@ -2259,7 +2258,7 @@
 		return roachpb.ReplicaDescriptor{}
 	}
 
-	validTargets := a.ValidLeaseTargets(ctx, storePool, conf, existing, leaseRepl, opts)
+	validTargets := a.ValidLeaseTargets(ctx, storePool, desc, conf, existing, leaseRepl, opts)
 
 	// Short-circuit if there are no valid targets out there.
 	if len(validTargets) == 0 || (len(validTargets) == 1 && validTargets[0].StoreID == leaseRepl.StoreID()) {
@@ -2494,13 +2493,13 @@ func getLoadDelta(
 func (a *Allocator) ShouldTransferLease(
 	ctx context.Context,
 	storePool storepool.AllocatorStorePool,
+	desc *roachpb.RangeDescriptor,
 	conf roachpb.SpanConfig,
 	existing []roachpb.ReplicaDescriptor,
 	leaseRepl interface {
 		StoreID() roachpb.StoreID
 		RaftStatus() *raft.Status
 		GetFirstIndex() kvpb.RaftIndex
-		Desc() *roachpb.RangeDescriptor
 	},
 	usageInfo allocator.RangeUsageInfo,
 ) bool {
@@ -2510,6 +2509,7 @@
 	existing = a.ValidLeaseTargets(
 		ctx,
 		storePool,
+		desc,
 		conf,
 		existing,
 		leaseRepl,
14 changes: 14 additions & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
@@ -1967,6 +1967,7 @@ func TestAllocatorTransferLeaseTarget(t *testing.T) {
 			target := a.TransferLeaseTarget(
 				ctx,
 				sp,
+				&roachpb.RangeDescriptor{},
 				emptySpanConfig(),
 				c.existing,
 				&mockRepl{
@@ -2096,6 +2097,7 @@ func TestAllocatorTransferLeaseTargetIOOverloadCheck(t *testing.T) {
 			target := a.TransferLeaseTarget(
 				ctx,
 				sp,
+				&roachpb.RangeDescriptor{},
 				emptySpanConfig(),
 				existing,
 				&mockRepl{
@@ -2211,6 +2213,7 @@ func TestAllocatorTransferLeaseToReplicasNeedingSnapshot(t *testing.T) {
 			target := a.TransferLeaseTarget(
 				ctx,
 				sp,
+				&roachpb.RangeDescriptor{},
 				emptySpanConfig(),
 				c.existing,
 				repl,
@@ -2303,6 +2306,7 @@ func TestAllocatorTransferLeaseTargetConstraints(t *testing.T) {
 			target := a.TransferLeaseTarget(
 				context.Background(),
 				sp,
+				&roachpb.RangeDescriptor{},
 				c.conf,
 				c.existing,
 				&mockRepl{
@@ -2415,6 +2419,7 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) {
 			target := a.TransferLeaseTarget(
 				ctx,
 				storePool,
+				&roachpb.RangeDescriptor{},
 				c.conf,
 				c.existing,
 				&mockRepl{
@@ -2699,6 +2704,7 @@ func TestAllocatorShouldTransferLease(t *testing.T) {
 			result := a.ShouldTransferLease(
 				ctx,
 				sp,
+				&roachpb.RangeDescriptor{},
 				emptySpanConfig(),
 				c.existing,
 				&mockRepl{
@@ -2767,6 +2773,7 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) {
 			result := a.ShouldTransferLease(
 				ctx,
 				storePool,
+				&roachpb.RangeDescriptor{},
 				emptySpanConfig(),
 				c.existing,
 				&mockRepl{
@@ -2814,6 +2821,7 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) {
 		result := a.ShouldTransferLease(
 			ctx,
 			storePool,
+			&roachpb.RangeDescriptor{},
 			emptySpanConfig(),
 			replicas(1, 2, 3),
 			&mockRepl{storeID: 2, replicationFactor: 3},
@@ -2955,6 +2963,7 @@ func TestAllocatorLeasePreferences(t *testing.T) {
 			result := a.ShouldTransferLease(
 				ctx,
 				sp,
+				&roachpb.RangeDescriptor{},
 				conf,
 				c.existing,
 				&mockRepl{
@@ -2970,6 +2979,7 @@
 			target := a.TransferLeaseTarget(
 				ctx,
 				sp,
+				&roachpb.RangeDescriptor{},
 				conf,
 				c.existing,
 				&mockRepl{
Expand All @@ -2989,6 +2999,7 @@ func TestAllocatorLeasePreferences(t *testing.T) {
target = a.TransferLeaseTarget(
ctx,
sp,
&roachpb.RangeDescriptor{},
conf,
c.existing,
&mockRepl{
@@ -3082,6 +3093,7 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) {
 			target := a.TransferLeaseTarget(
 				ctx,
 				sp,
+				&roachpb.RangeDescriptor{},
 				conf,
 				c.existing,
 				&mockRepl{
Expand All @@ -3102,6 +3114,7 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) {
target = a.TransferLeaseTarget(
ctx,
sp,
&roachpb.RangeDescriptor{},
conf,
c.existing,
&mockRepl{
@@ -5732,6 +5745,7 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {
 			target := a.TransferLeaseTarget(
 				ctx,
 				storePool,
+				&roachpb.RangeDescriptor{},
 				emptySpanConfig(),
 				existing,
 				&mockRepl{
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/allocator/base.go
@@ -91,10 +91,10 @@ type TestingKnobs struct {
 	// targets produced by the Allocator to include replicas that may be waiting
 	// for snapshots.
 	AllowLeaseTransfersToReplicasNeedingSnapshots bool
-	RaftStatusFn func(r interface {
-		Desc() *roachpb.RangeDescriptor
-		StoreID() roachpb.StoreID
-	}) *raft.Status
+	RaftStatusFn func(
+		desc *roachpb.RangeDescriptor,
+		storeID roachpb.StoreID,
+	) *raft.Status
 	// BlockTransferTarget can be used to block returning any transfer targets
 	// from TransferLeaseTarget.
 	BlockTransferTarget func() bool
