108483: roachtest: wait smaller duration before timing out lease prefs r=erikgrinaker a=kvoli

Previously, the `lease-preferences` roachtests would fail on the test
timeout, despite the test actually failing much earlier. Five minutes
after stopping a node, the node is considered dead and up-replication
begins. If the cluster doesn't satisfy lease preferences within that 5
minute window, the test will reliably fail -- making the remainder of
the test timeout (30 or 60 minutes) wasted time.

Fail earlier by adhering to a post-event timeout, which is set to
10 minutes.
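
The shape of the change, as a minimal sketch (function and variable names below are illustrative, not the test's actual API):

```go
package sketch

import (
	"context"
	"errors"
	"time"
)

// waitForCondition polls conditionMet but gives up once maxWaitDuration
// elapses, so the test fails quickly instead of burning the remaining
// 30-60 minutes of the overall test timeout.
func waitForCondition(
	ctx context.Context, maxWaitDuration time.Duration, conditionMet func() bool,
) error {
	deadline := time.After(maxWaitDuration)
	ticker := time.NewTicker(10 * time.Second) // metrics refresh roughly every 10s
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-deadline:
			return errors.New("timed out before lease preferences were satisfied")
		case <-ticker.C:
			if conditionMet() {
				return nil
			}
		}
	}
}
```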

Informs: #108425
Epic: none
Release note: None

108775: kvserver: fix double span Finish on reproposals r=erikgrinaker a=pavelkalinnikov

Since #106750, `ProposalData` is being copied on superseding reproposals. The caller only knows about the original proposal, and only detaches its context / tracing span when abandoning the request (e.g. on timeout). By the time of abandoning the request, there might have been a few superseding reproposals, and the context / tracing span is being used by multiple `ProposalData` objects.

In addition to rewriting the original `proposal.ctx`, we should do the same for all the `ProposalData` objects corresponding to the same request. This commit unbinds the context from older proposals when reproposing them, and unbinds the latest reproposal's context when cleaning up / abandoning the request.
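
Roughly, the ownership transfer works like the sketch below (placeholder types stand in for the real `ProposalData` and tracing span; the commit func runs only after the reproposal has been successfully submitted):

```go
package sketch

import "context"

// Placeholder stand-ins for the real tracing span and ProposalData types.
type span struct{}

type proposal struct {
	ctx            context.Context
	sp             *span         // owned by exactly one live proposal at a time
	doneCh         chan struct{} // signals the waiting client
	seedProposal   *proposal     // the proposal the client actually waits on
	lastReproposal *proposal     // set on the seed; cleanup unbinds its ctx
}

// makeReproposal copies what must survive into the new proposal and returns a
// commit func. The commit func runs only if the reproposal was successfully
// submitted; otherwise the original keeps its span and done channel, so
// nothing is leaked or double-finished.
func makeReproposal(orig *proposal, detachedCtx context.Context) (*proposal, func()) {
	seed := orig.seedProposal
	if seed == nil {
		seed = orig
	}
	next := &proposal{
		ctx:          orig.ctx, // the request's trace follows the reproposal
		sp:           orig.sp,
		doneCh:       orig.doneCh,
		seedProposal: seed,
	}
	return next, func() {
		orig.sp = nil              // span now belongs to next: no double Finish
		orig.doneCh = nil          // only the latest reproposal signals the client
		orig.ctx = detachedCtx     // old proposal no longer logs into the span
		seed.lastReproposal = next // cleanup can find and unbind the latest ctx
	}
}
```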

Fixes #107521
Fixes #108534
Fixes #108241
Fixes #108663
Fixes #108696
Fixes #108837

Epic: CRDB-25287
Release note: none

108942: backupccl: deflake memory monitor restore test r=rhu713 a=rhu713

Deflake the TestRestoreMemoryMonitoring tests by asserting only a lower bound on the number of files produced in the backup, to account for the elastic CPU limiter.
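
The gist of the assertion change, as a sketch (the helper name is made up; the real test derives both counts from the number of splits and `SHOW BACKUP FILES`):

```go
package sketch

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// assertAtLeastExpectedFiles is illustrative only.
func assertAtLeastExpectedFiles(t *testing.T, actualNumFiles, expectedNumFiles int) {
	// An exact-equality assertion was flaky: the elastic CPU limiter can
	// preempt export responses, producing more files than the split count
	// predicts. A lower bound is stable.
	require.GreaterOrEqual(t, actualNumFiles, expectedNumFiles)
}
```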

Fixes: #108239

Release note: None

109040: server: add more info to the combined statement endpoint r=maryliag a=maryliag

Previously, it was hard to know which tables were
used to populate the SQL Activity, which made debugging it complicated.
This commit adds extra information to the
`combinedstmts` response to help:
- oldestAggregatedTsReturned
- stmtsSourceTable
- txnsSourceTable

Returning a value for `oldestAggregatedTsReturned` will also allow us to show a proper warning when the oldest timestamp doesn't match the start date of the selected time period in the Search Criteria.
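
For illustration, a hypothetical consumer of the new `oldestAggregatedTsReturned` field might decide whether to surface that warning like this (function and parameter names are made up):

```go
package sketch

import "time"

// shouldWarnAboutMissingData sketches how a client of combinedstmts could use
// the new field: if the oldest aggregated timestamp in the response is later
// than the start of the requested time period, the source table doesn't cover
// the full range and a warning should be shown.
func shouldWarnAboutMissingData(oldestAggregatedTsReturned, requestedStart time.Time) bool {
	return oldestAggregatedTsReturned.After(requestedStart)
}
```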

Part Of #108989

Release note: None

Co-authored-by: Austen McClernon <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
Co-authored-by: Rui Hu <[email protected]>
Co-authored-by: maryliag <[email protected]>
5 people committed Aug 21, 2023
5 parents 9ad8453 + 664cc2c + 238c9eb + 149ad11 + 0ad6fc0 commit 2ac44e6
Showing 10 changed files with 333 additions and 136 deletions.
6 changes: 6 additions & 0 deletions docs/generated/http/full.md
@@ -4102,6 +4102,9 @@ tenant pods.
| transactions | [StatementsResponse.ExtendedCollectedTransactionStatistics](#cockroach.server.serverpb.StatementsResponse-cockroach.server.serverpb.StatementsResponse.ExtendedCollectedTransactionStatistics) | repeated | Transactions is transaction-level statistics for the collection of statements in this response. | [reserved](#support-status) |
| stmts_total_runtime_secs | [float](#cockroach.server.serverpb.StatementsResponse-float) | | | [reserved](#support-status) |
| txns_total_runtime_secs | [float](#cockroach.server.serverpb.StatementsResponse-float) | | | [reserved](#support-status) |
| oldest_aggregated_ts_returned | [google.protobuf.Timestamp](#cockroach.server.serverpb.StatementsResponse-google.protobuf.Timestamp) | | OldestAggregatedTsReturned is the timestamp of the oldest entry returned, or null if there is no data returned. | [reserved](#support-status) |
| stmts_source_table | [string](#cockroach.server.serverpb.StatementsResponse-string) | | StmtsSourceTable returns the table used to return the statements data. | [reserved](#support-status) |
| txns_source_table | [string](#cockroach.server.serverpb.StatementsResponse-string) | | TxnsSourceTable returns the table used to return the transactions data. | [reserved](#support-status) |



@@ -4214,6 +4217,9 @@ Support status: [reserved](#support-status)
| transactions | [StatementsResponse.ExtendedCollectedTransactionStatistics](#cockroach.server.serverpb.StatementsResponse-cockroach.server.serverpb.StatementsResponse.ExtendedCollectedTransactionStatistics) | repeated | Transactions is transaction-level statistics for the collection of statements in this response. | [reserved](#support-status) |
| stmts_total_runtime_secs | [float](#cockroach.server.serverpb.StatementsResponse-float) | | | [reserved](#support-status) |
| txns_total_runtime_secs | [float](#cockroach.server.serverpb.StatementsResponse-float) | | | [reserved](#support-status) |
| oldest_aggregated_ts_returned | [google.protobuf.Timestamp](#cockroach.server.serverpb.StatementsResponse-google.protobuf.Timestamp) | | OldestAggregatedTsReturned is the timestamp of the oldest entry returned, or null if there is no data returned. | [reserved](#support-status) |
| stmts_source_table | [string](#cockroach.server.serverpb.StatementsResponse-string) | | StmtsSourceTable returns the table used to return the statements data. | [reserved](#support-status) |
| txns_source_table | [string](#cockroach.server.serverpb.StatementsResponse-string) | | TxnsSourceTable returns the table used to return the transactions data. | [reserved](#support-status) |



15 changes: 8 additions & 7 deletions pkg/ccl/backupccl/utils_test.go
@@ -583,6 +583,7 @@ func runTestRestoreMemoryMonitoring(t *testing.T, numSplits, numInc, restoreProc
const splitSize = 10
numAccounts := numSplits * splitSize
var expectedNumFiles int
var actualNumFiles int
restoreProcessorKnobCount := atomic.Uint32{}
args := base.TestServerArgs{
DefaultTestTenant: base.TODOTestTenantDisabled,
@@ -593,7 +594,7 @@ func runTestRestoreMemoryMonitoring(t *testing.T, numSplits, numInc, restoreProc
RunAfterProcessingRestoreSpanEntry: func(ctx context.Context, entry *execinfrapb.RestoreSpanEntry) {
// The total size of the backup files should be less than the target
// SST size, thus should all fit in one import span.
require.Equal(t, expectedNumFiles, len(entry.Files))
require.Equal(t, actualNumFiles, len(entry.Files))
restoreProcessorKnobCount.Add(1)
},
},
@@ -631,11 +632,11 @@ func runTestRestoreMemoryMonitoring(t *testing.T, numSplits, numInc, restoreProc
numIncFiles += len(incSplitsWithFile)
}

// Verify the file counts in the backup is at least what's expected. The
// actual number can be more due to elastic CPU preempting export responses.
expectedNumFiles += numSplits + numIncFiles
// Verify the file counts in the backup.
var numFiles int
sqlDB.QueryRow(t, "SELECT count(*) FROM [SHOW BACKUP FILES FROM latest IN 'userfile:///backup']").Scan(&numFiles)
require.Equal(t, expectedNumFiles, numFiles)
sqlDB.QueryRow(t, "SELECT count(*) FROM [SHOW BACKUP FILES FROM latest IN 'userfile:///backup']").Scan(&actualNumFiles)
require.GreaterOrEqual(t, actualNumFiles, expectedNumFiles)

sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", restoreProcessorMaxFiles*sstReaderOverheadBytesPerFile)

@@ -644,8 +645,8 @@ func runTestRestoreMemoryMonitoring(t *testing.T, numSplits, numInc, restoreProc

// Assert that the restore processor is processing the same span multiple
// times, and the count is based on what's expected from the memory budget.
// The expected number is just the ceiling of expectedNumFiles/restoreProcessorMaxFiles.
require.Equal(t, (expectedNumFiles-1)/restoreProcessorMaxFiles+1, int(restoreProcessorKnobCount.Load()))
// The expected number is just the ceiling of actualNumFiles/restoreProcessorMaxFiles.
require.Equal(t, (actualNumFiles-1)/restoreProcessorMaxFiles+1, int(restoreProcessorKnobCount.Load()))

// Verify data in the restored table.
expectedFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data.bank")
49 changes: 28 additions & 21 deletions pkg/cmd/roachtest/tests/lease_preferences.go
@@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

@@ -48,11 +49,12 @@ import (
type leasePreferencesEventFn func(context.Context, test.Test, cluster.Cluster)

type leasePreferencesSpec struct {
preferences string
ranges, replFactor int
eventFn leasePreferencesEventFn
checkNodes []int
waitForLessPreferred bool
preferences string
ranges, replFactor int
eventFn leasePreferencesEventFn
checkNodes []int
waitForLessPreferred bool
postEventWaitDuration time.Duration
}

// makeStopNodesEventFn returns a leasePreferencesEventFn which stops the
@@ -103,12 +105,13 @@ func registerLeasePreferences(r registry.Registry) {
Cluster: r.MakeClusterSpec(5, spec.CPU(4)),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runLeasePreferences(ctx, t, c, leasePreferencesSpec{
preferences: `[+dc=1],[+dc=2]`,
ranges: 1000,
replFactor: 5,
checkNodes: []int{1, 3, 4, 5},
eventFn: makeStopNodesEventFn(2 /* targets */),
waitForLessPreferred: false,
preferences: `[+dc=1],[+dc=2]`,
ranges: 1000,
replFactor: 5,
checkNodes: []int{1, 3, 4, 5},
eventFn: makeStopNodesEventFn(2 /* targets */),
waitForLessPreferred: false,
postEventWaitDuration: 10 * time.Minute,
})
},
})
@@ -126,12 +129,13 @@ func registerLeasePreferences(r registry.Registry) {
Cluster: r.MakeClusterSpec(5, spec.CPU(4)),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runLeasePreferences(ctx, t, c, leasePreferencesSpec{
preferences: `[+dc=1],[+dc=2]`,
ranges: 1000,
replFactor: 5,
eventFn: makeStopNodesEventFn(1, 2 /* targets */),
checkNodes: []int{3, 4, 5},
waitForLessPreferred: false,
preferences: `[+dc=1],[+dc=2]`,
ranges: 1000,
replFactor: 5,
eventFn: makeStopNodesEventFn(1, 2 /* targets */),
checkNodes: []int{3, 4, 5},
waitForLessPreferred: false,
postEventWaitDuration: 10 * time.Minute,
})
},
})
@@ -150,8 +154,9 @@ func registerLeasePreferences(r registry.Registry) {
replFactor: 5,
eventFn: makeTransferLeasesEventFn(
5 /* gateway */, 5 /* target */),
checkNodes: []int{1, 2, 3, 4, 5},
waitForLessPreferred: false,
checkNodes: []int{1, 2, 3, 4, 5},
waitForLessPreferred: false,
postEventWaitDuration: 10 * time.Minute,
})
},
})
@@ -224,7 +229,7 @@ func runLeasePreferences(

checkLeasePreferenceConformance := func(ctx context.Context) {
result, err := waitForLeasePreferences(
ctx, t, c, spec.checkNodes, spec.waitForLessPreferred, stableDuration)
ctx, t, c, spec.checkNodes, spec.waitForLessPreferred, stableDuration, spec.postEventWaitDuration)
require.NoError(t, err, result)
require.Truef(t, !result.violating(), "violating lease preferences %s", result)
if spec.waitForLessPreferred {
@@ -338,7 +343,7 @@ func waitForLeasePreferences(
c cluster.Cluster,
nodes []int,
waitForLessPreferred bool,
stableDuration time.Duration,
stableDuration, maxWaitDuration time.Duration,
) (leasePreferencesResult, error) {
// NB: We are querying metrics, expect these to be populated approximately
// every 10s.
@@ -370,6 +375,8 @@
select {
case <-ctx.Done():
return ret, ctx.Err()
case <-time.After(maxWaitDuration):
return ret, errors.Errorf("timed out before lease preferences satisfied")
case <-checkTimer.C:
checkTimer.Read = true
violating, lessPreferred := preferenceMetrics(ctx)
135 changes: 74 additions & 61 deletions pkg/kv/kvserver/replica_application_result.go
@@ -266,38 +266,27 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
}
}

func (r *Replica) tryReproposeWithNewLeaseIndex(ctx context.Context, origCmd *replicatedCmd) error {
// NB: `origCmd` remains "Local". It's just not going to signal anyone
// makeReproposal returns a new re-proposal of the given proposal, and a
// function that must be called if this re-proposal is successfully proposed.
//
// We want to move a few items from origP to the new command, but only if we
// managed to propose the new command. For example, if we move the latches over
// too early but then fail to actually get the new proposal started, the old
// proposal will not release the latches. This would result in a lost latch.
func (r *Replica) makeReproposal(origP *ProposalData) (reproposal *ProposalData, success func()) {
// NB: original command remains "Local". It's just not going to signal anyone
// or release any latches.

origP := origCmd.proposal

// We want to move a few items from origCmd to the new command, but only if we
// managed to propose the new command. For example, if we move the latches
// over too early but then fail to actually get the new proposal started, the
// old proposal will not release the latches. This would result in a lost
// latch.
var success bool
seedP := origP.seedProposal
if seedP == nil {
seedP = origP
}

// Go through the original proposal field by field and decide what transfers
// to the new proposal (and how that affects the old proposal). The overall
// goal is that the old proposal remains a local proposal (switching it to
// non-local now invites logic bugs) but not bound to the caller.

// NB: quotaAlloc is always nil here, because we already
// released the quota unconditionally in retrieveLocalProposals.
// So the below is a no-op.
//
// TODO(tbg): if we shifted the release of proposal quota to *after*
// successful application, we could move the quota over
// prematurely releasing it here.
newQuotaAlloc := origP.quotaAlloc
defer func() {
if success {
origP.quotaAlloc = nil
}
}()

newCommand := kvserverpb.RaftCommand{
ProposerLeaseSequence: origP.command.ProposerLeaseSequence,
ReplicatedEvalResult: origP.command.ReplicatedEvalResult,
@@ -312,25 +301,10 @@ func (r *Replica) tryReproposeWithNewLeaseIndex(ctx context.Context, origCmd *re
AdmissionOriginNode: 0, // assigned on flush
}

// Now we construct the remainder of the ProposalData. First, the pieces
// that actively "move over", i.e. those that have to do with the latches
// held and the caller waiting to be signaled.

// `ec` (latches, etc) transfers to the new proposal.
newEC := origP.ec
defer func() {
if success {
origP.ec = makeEmptyEndCmds()
}
}()

// Ditto doneCh (signal to proposer).
newDoneCh := origP.doneCh
defer func() {
if success {
origP.doneCh = nil
}
}()
// Now we construct the remainder of the ProposalData. The pieces that
// actively "move over", are removed from the original proposal in the
// deferred func below. For example, those fields that have to do with the
// latches held and the caller waiting to be signaled.

// TODO(tbg): work on the lifecycle of ProposalData. This struct (and the
// surrounding replicatedCmd) are populated in an overly ad-hoc manner.
@@ -345,15 +319,27 @@ func (r *Replica) tryReproposeWithNewLeaseIndex(ctx context.Context, origCmd *re
// for (some reincarnation of) the command to eventually apply, its trace
// will reflect the reproposal as well.
ctx: origP.ctx,
sp: origP.sp, // NB: special handling below
idKey: raftlog.MakeCmdIDKey(),
proposedAtTicks: 0, // set in registerProposalLocked
createdAtTicks: 0, // set in registerProposalLocked
command: &newCommand,
quotaAlloc: newQuotaAlloc,
ec: newEC,
applied: false,
doneCh: newDoneCh,

// Next comes the block of fields that are "moved" to the new proposal. See
// the deferred function call below which, correspondingly, clears these
// fields in the original proposal.
sp: origP.sp,
// NB: quotaAlloc is always nil here, because we already released the quota
// unconditionally in retrieveLocalProposals. So the below is a no-op.
//
// TODO(tbg): if we shifted the release of proposal quota to *after*
// successful application, we could move the quota over prematurely
// releasing it here.
quotaAlloc: origP.quotaAlloc,
ec: origP.ec,
doneCh: origP.doneCh,

applied: false,

// Local is copied over. It won't be used on the old proposal (since that
// proposal got rejected), but since it's still "local" we don't want to put
// it into an undefined state by removing its response. The same goes for
@@ -365,19 +351,46 @@ func (r *Replica) tryReproposeWithNewLeaseIndex(ctx context.Context, origCmd *re
encodedCommand: nil,
raftAdmissionMeta: nil,
v2SeenDuringApplication: false,

seedProposal: seedP,
}
// If the original proposal had an explicit span, it's an async consensus
// proposal and the span would be finished momentarily (when we return to
// the caller) if we didn't unlink it here, but we want it to continue
// tracking newProposal. We leave it in `origP.ctx` though, since that
// context will become unused once the application of this (soft-failed)
// proposal concludes, i.e. soon after this method returns, in case there
// is anything left to log into it.
defer func() {
if success {
origP.sp = nil
}
}()

return newProposal, func() {
// If the original proposal had an explicit span, it's an async consensus
// proposal and the span would be finished momentarily (when we return to
// the caller) if we didn't unlink it here, but we want it to continue
// tracking newProposal. We leave it in `origP.ctx` though, since that
// context will become unused once the application of this (soft-failed)
// proposal concludes, i.e. soon after this method returns, in case there is
// anything left to log into it.
origP.sp = nil
origP.quotaAlloc = nil
origP.ec = makeEmptyEndCmds()
origP.doneCh = nil

// If the proposal is synchronous, the client is waiting on the seed
// proposal. By the time it has to act on the result, a bunch of reproposals
// can have happened, and some may still be running and using the
// context/tracing span (probably only the latest one, but we assume any,
// for defence-in-depth).
//
// Unbind the latest reproposal's context so that it no longer posts updates
// to the tracing span (it won't apply anyway). Link to the new latest
// reproposal, so that the client can clear its context at post-processing.
// This is effectively a "move" of the context to the reproposal.
//
// TODO(pavelkalinnikov): there should be a better way, after ProposalData
// lifecycle is reconsidered.
//
// TODO(radu): Should this context be created via tracer.ForkSpan?
// We'd need to make sure the span is finished eventually.
origP.ctx = r.AnnotateCtx(context.TODO())
seedP.lastReproposal = newProposal
}
}

func (r *Replica) tryReproposeWithNewLeaseIndex(ctx context.Context, origCmd *replicatedCmd) error {
newProposal, onSuccess := r.makeReproposal(origCmd.proposal)

// We need to track the request again in order to protect its timestamp until
// it gets reproposed.
@@ -411,7 +424,7 @@ func (r *Replica) tryReproposeWithNewLeaseIndex(ctx context.Context, origCmd *re
}
log.VEventf(ctx, 2, "reproposed command %x", newProposal.idKey)

success = true
onSuccess()
return nil
}
