Merge #63045 #63900

63045: kvserver: rationalize timestamps in proposals  r=andreimatei a=andreimatei

see individual commits

63900: sql: support spilling to disk for bufferNode r=yuzefovich a=yuzefovich

This commit refactors several `planNode`s that need to buffer rows to
use a disk-backed row container instead of a purely in-memory one. To
achieve that, a couple of light wrappers are introduced on top of the
corresponding container, along with an iterator over it.

Still, one (probably important) case is not fixed properly: if a
subquery is executed in AllRows or AllRowsNormalized mode, we first
buffer the rows into the disk-backed container only to materialize them
later into a single tuple. Addressing this is left as a TODO.
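
As a rough, self-contained illustration of the buffering pattern described above (all names below are hypothetical stand-ins for exposition, not the actual types this PR introduces), the wrapper hides whether the buffered rows live in memory or have spilled to disk:

package main

import "fmt"

// row stands in for a SQL row.
type row []interface{}

// bufferedRows is a hypothetical wrapper that hides whether rows are held
// in memory or have been spilled to a disk-backed container.
type bufferedRows struct {
	memBudget int   // max rows to keep in memory before spilling
	mem       []row // in-memory buffer
	disk      []row // stand-in for a real disk-backed row container
}

// addRow buffers a row, transparently spilling once the budget is hit.
func (b *bufferedRows) addRow(r row) {
	if b.disk == nil && len(b.mem) >= b.memBudget {
		// Spill everything buffered so far.
		b.disk = append(make([]row, 0, len(b.mem)), b.mem...)
		b.mem = nil
	}
	if b.disk != nil {
		b.disk = append(b.disk, r)
		return
	}
	b.mem = append(b.mem, r)
}

// forEach visits all buffered rows regardless of where they live.
func (b *bufferedRows) forEach(f func(row)) {
	for _, r := range b.disk {
		f(r)
	}
	for _, r := range b.mem {
		f(r)
	}
}

func main() {
	b := &bufferedRows{memBudget: 2}
	for i := 0; i < 5; i++ {
		b.addRow(row{i})
	}
	b.forEach(func(r row) { fmt.Println(r) })
}

The real implementation delegates to a container that charges a memory account and switches to temp storage under pressure, rather than a plain slice; callers never need to know whether a spill happened.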

Fixes: #62301.
Fixes: #62674.

Release note (sql change): CockroachDB should now be more stable when
executing queries with subqueries that produce many rows (previously
such queries could crash the server with an out-of-memory error; now
they spill to temporary disk storage).

Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
3 people committed Apr 22, 2021
3 parents 20e7fc3 + 3f38eb2 + 0a8baa9 commit bb86f86
Showing 30 changed files with 731 additions and 332 deletions.
2 changes: 2 additions & 0 deletions docs/generated/redact_safe.md
@@ -4,8 +4,10 @@ File | Type
--|--
pkg/cli/exit/exit.go | `Code`
pkg/jobs/jobspb/wrap.go | `Type`
pkg/kv/kvserver/closedts/ctpb/entry.go | `LAI`
pkg/kv/kvserver/concurrency/lock/locking.go | `WaitPolicy`
pkg/kv/kvserver/raft.go | `SnapshotRequest_Type`
pkg/roachpb/data.go | `LeaseSequence`
pkg/roachpb/data.go | `ReplicaChangeType`
pkg/roachpb/metadata.go | `NodeID`
pkg/roachpb/metadata.go | `StoreID`
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_raft_test.go
@@ -4702,7 +4702,7 @@ func TestAckWriteBeforeApplication(t *testing.T) {
blockPreApplication, blockPostApplication := make(chan struct{}), make(chan struct{})
applyFilterFn := func(ch chan struct{}) kvserverbase.ReplicaApplyFilter {
return func(filterArgs kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
if atomic.LoadInt32(&filterActive) == 1 && filterArgs.Timestamp == magicTS {
if atomic.LoadInt32(&filterActive) == 1 && filterArgs.WriteTimestamp == magicTS {
<-ch
}
return 0, nil
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_test.go
@@ -3342,7 +3342,7 @@ func TestProposalOverhead(t *testing.T) {
}
// Sometimes the logical portion of the timestamp can be non-zero, which makes
// the overhead non-deterministic.
args.Cmd.ReplicatedEvalResult.Timestamp.Logical = 0
args.Cmd.ReplicatedEvalResult.WriteTimestamp.Logical = 0
atomic.StoreUint32(&overhead, uint32(args.Cmd.Size()-args.Cmd.WriteBatch.Size()))
// We don't want to print the WriteBatch because it's explicitly
// excluded from the size computation. Nil'ing it out does not
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/closedts/ctpb/entry.go
@@ -25,6 +25,9 @@ type Epoch int64
// mix-ups in positional arguments.
type LAI int64

// SafeValue implements the redact.SafeValue interface.
func (LAI) SafeValue() {}

// String formats Entry for human consumption as well as testing (by avoiding
// randomness in the output caused by map iteration order).
func (e Entry) String() string {
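
For context, a minimal sketch of what the SafeValue marker buys (assuming the github.com/cockroachdb/redact package; the local LAI type below just mirrors the one in the diff): safe values print unredacted, while ordinary strings are wrapped in redaction markers.

package main

import (
	"fmt"

	"github.com/cockroachdb/redact"
)

// LAI mirrors the ctpb type above; SafeValue marks it safe to log as-is.
type LAI int64

func (LAI) SafeValue() {}

func main() {
	s := redact.Sprintf("lai=%d key=%s", LAI(17), "user-data")
	fmt.Println(s) // lai=17 key=‹user-data› (only the unsafe string gets markers)
}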
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvserverbase/BUILD.bazel
@@ -16,5 +16,6 @@ go_library(
"//pkg/roachpb",
"//pkg/settings",
"//pkg/util/hlc",
"@com_github_cockroachdb_redact//:redact",
],
)
14 changes: 13 additions & 1 deletion pkg/kv/kvserver/kvserverbase/base.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/redact"
)

// MergeQueueEnabled is a setting that controls whether the merge queue is
@@ -39,9 +40,20 @@ var MergeQueueEnabled = settings.RegisterBoolSetting(
// larger than the heartbeat interval used by the coordinator.
const TxnCleanupThreshold = time.Hour

// CmdIDKey is a Raft command id.
// CmdIDKey is a Raft command id. This will be logged unredacted - keep it random.
type CmdIDKey string

// SafeFormat implements redact.SafeFormatter.
func (s CmdIDKey) SafeFormat(sp redact.SafePrinter, verb rune) {
sp.Printf("%q", redact.SafeString(s))
}

func (s CmdIDKey) String() string {
return redact.StringWithoutMarkers(s)
}

var _ redact.SafeFormatter = CmdIDKey("")

// FilterArgs groups the arguments to a ReplicaCommandFilter.
type FilterArgs struct {
Ctx context.Context
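
A hedged usage sketch for the SafeFormat implementation above (expected output shown in comments): a CmdIDKey renders as a quoted string without redaction markers, whereas an ordinary string stays redactable.

package main

import (
	"fmt"

	"github.com/cockroachdb/redact"
)

// CmdIDKey mirrors the kvserverbase type above.
type CmdIDKey string

// SafeFormat implements redact.SafeFormatter, as in the diff above.
func (s CmdIDKey) SafeFormat(sp redact.SafePrinter, verb rune) {
	sp.Printf("%q", redact.SafeString(s))
}

func main() {
	fmt.Println(redact.Sprint(CmdIDKey("0a1b2c3d"))) // "0a1b2c3d" (safe, unredacted)
	fmt.Println(redact.Sprint("user-data"))          // ‹user-data› (redactable)
}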
210 changes: 106 additions & 104 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.proto
@@ -125,10 +125,12 @@ message ReplicatedEvalResult {
Merge merge = 4;
ComputeChecksum compute_checksum = 21;
bool is_lease_request = 6;
// Duplicates BatchRequest.Timestamp for proposer-evaluated KV. Used
// to verify the validity of the command (for lease coverage and GC
// threshold).
util.hlc.Timestamp timestamp = 8 [(gogoproto.nullable) = false];
// The timestamp at which this command is writing. Used to verify the validity
// of the command against the GC threshold and to update the followers'
// clocks. If the request that produced this command is not an IntentWrite
// one, then the request's write timestamp is meaningless; for such requests,
// this field is simply a clock reading from the proposer.
util.hlc.Timestamp write_timestamp = 8 [(gogoproto.nullable) = false];
// The stats delta corresponding to the data in this WriteBatch. On
// a split, contains only the contributions to the left-hand side.
storage.enginepb.MVCCStats deprecated_delta = 10; // See #18828
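
To restate the new field's semantics in code (a hypothetical helper, not part of this diff; BatchRequest.IsIntentWrite and WriteTimestamp are the same methods the apply-side assertions below rely on):

package kvserver

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// writeTimestampForProposal sketches how a proposer could fill in
// write_timestamp: intent-writing requests supply their write timestamp,
// while for any other request the field is just a proposer clock reading.
func writeTimestampForProposal(ba *roachpb.BatchRequest, now hlc.Timestamp) hlc.Timestamp {
	if ba.IsIntentWrite() {
		return ba.WriteTimestamp()
	}
	// Not an intent write: the request's write timestamp is meaningless,
	// so record a clock reading from the proposer instead.
	return now
}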
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/replica.go
@@ -568,6 +568,9 @@ type Replica struct {
failureToGossipSystemConfig bool

tenantID roachpb.TenantID // Set when first initialized, not modified after

// Historical information about the command that set the closed timestamp.
closedTimestampSetter closedTimestampSetterInfo
}

rangefeedMu struct {
@@ -1358,8 +1361,8 @@ func (r *Replica) checkSpanInRangeRLocked(ctx context.Context, rspan roachpb.RSpan
)
}

// checkTSAboveGCThresholdRLocked returns an error if a request (identified
// by its MVCC timestamp) can be run on the replica.
// checkTSAboveGCThresholdRLocked returns an error if a request (identified by
// its read timestamp) wants to read below the range's GC threshold.
func (r *Replica) checkTSAboveGCThresholdRLocked(
ts hlc.Timestamp, st kvserverpb.LeaseStatus, isAdmin bool,
) error {
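
A simplified sketch of the check the updated comment describes (hypothetical and stripped down; the real method also consults the lease status and is more lenient with admin requests):

package kvserver

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// checkTSAboveGCThreshold allows a read at ts only if ts is strictly above
// the range's GC threshold; below it, the data may already be garbage
// collected.
func checkTSAboveGCThreshold(ts, threshold hlc.Timestamp) *roachpb.Error {
	if threshold.Less(ts) {
		return nil // reading above the GC threshold is fine
	}
	return roachpb.NewError(&roachpb.BatchTimestampBeforeGCError{
		Timestamp: ts,
		Threshold: threshold,
	})
}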
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_application_result.go
@@ -67,7 +67,7 @@ func isTrivial(r *kvserverpb.ReplicatedEvalResult) bool {
// it is trivial.
allowlist := *r
allowlist.Delta = enginepb.MVCCStatsDelta{}
allowlist.Timestamp = hlc.Timestamp{}
allowlist.WriteTimestamp = hlc.Timestamp{}
allowlist.DeprecatedDelta = nil
allowlist.PrevLeaseProposal = nil
allowlist.State = nil
@@ -85,7 +85,7 @@ func clearTrivialReplicatedEvalResultFields(r *kvserverpb.ReplicatedEvalResult)
// they don't trigger an assertion at the end of the application process
// (which checks that all fields were handled).
r.IsLeaseRequest = false
r.Timestamp = hlc.Timestamp{}
r.WriteTimestamp = hlc.Timestamp{}
r.PrevLeaseProposal = nil
// The state fields cleared here were already applied to the in-memory view of
// replica state for this batch.
175 changes: 153 additions & 22 deletions pkg/kv/kvserver/replica_application_state_machine.go
@@ -17,15 +17,19 @@ import (

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/kr/pretty"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
@@ -318,16 +322,19 @@ func checkForcedErr(
)
}

// Verify that the batch timestamp is after the GC threshold. This is
// Verify that the command is not trying to write below the GC threshold. This is
// necessary because not all commands declare read access on the GC
// threshold key, even though they implicitly depend on it. This means
// that access to this state will not be serialized by latching,
// so we must perform this check upstream and downstream of raft.
// See #14833.
ts := raftCmd.ReplicatedEvalResult.Timestamp
if ts.LessEq(*replicaState.GCThreshold) {
// TODO(andrei,nvanbenschoten,bdarnell): Is this check below-Raft actually
// necessary, given that we've checked at evaluation time that the request
// evaluates at a timestamp above the GC threshold? Does it actually matter if
// the GC threshold has advanced since then?
wts := raftCmd.ReplicatedEvalResult.WriteTimestamp
if !wts.IsEmpty() && wts.LessEq(*replicaState.GCThreshold) {
return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.BatchTimestampBeforeGCError{
Timestamp: ts,
Timestamp: wts,
Threshold: *replicaState.GCThreshold,
})
}
@@ -353,6 +360,7 @@ func (sm *replicaStateMachine) NewBatch(ephemeral bool) apply.Batch {
b.state = r.mu.state
b.state.Stats = &b.stats
*b.state.Stats = *r.mu.state.Stats
b.closedTimestampSetter = r.mu.closedTimestampSetter
r.mu.RUnlock()
b.start = timeutil.Now()
return b
@@ -375,6 +383,9 @@ type replicaAppBatch struct {
// under the Replica.mu when the batch is initialized and is updated in
// stageTrivialReplicatedEvalResult.
state kvserverpb.ReplicaState
// closedTimestampSetter maintains historical information about the
// advancement of the closed timestamp.
closedTimestampSetter closedTimestampSetterInfo
// stats is stored on the application batch to avoid an allocation in
// tracking the batch's view of replicaState. All pointer fields in
// replicaState other than Stats are overwritten completely rather than
@@ -451,17 +462,11 @@ func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error
cmd.raftCmd.LogicalOpLog = nil
cmd.raftCmd.ClosedTimestamp = nil
} else {
// Assert that we're not writing under the closed timestamp. We can only do
// these checks on IsIntentWrite requests, since others (for example,
// EndTxn) can operate below the closed timestamp. In turn, this means that
// we can only assert on the leaseholder, as only that replica has
// cmd.proposal.Request filled in.
if cmd.IsLocal() && cmd.proposal.Request.IsIntentWrite() {
wts := cmd.proposal.Request.WriteTimestamp()
if wts.LessEq(b.state.RaftClosedTimestamp) {
return nil, makeNonDeterministicFailure("writing at %s below closed ts: %s (%s)",
wts, b.state.RaftClosedTimestamp.String(), cmd.proposal.Request.String())
}
if err := b.assertNoCmdClosedTimestampRegression(cmd); err != nil {
return nil, err
}
if err := b.assertNoWriteBelowClosedTimestamp(cmd); err != nil {
return nil, err
}
log.Event(ctx, "applying command")
}
Expand All @@ -484,7 +489,7 @@ func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error
}

// Update the batch's max timestamp.
if clockTS, ok := cmd.replicatedResult().Timestamp.TryToClockTimestamp(); ok {
if clockTS, ok := cmd.replicatedResult().WriteTimestamp.TryToClockTimestamp(); ok {
b.maxTS.Forward(clockTS)
}

@@ -828,12 +833,8 @@ func (b *replicaAppBatch) stageTrivialReplicatedEvalResult(
b.state.LeaseAppliedIndex = leaseAppliedIndex
}
if cts := cmd.raftCmd.ClosedTimestamp; cts != nil && !cts.IsEmpty() {
if cts.Less(b.state.RaftClosedTimestamp) {
log.Fatalf(ctx,
"closed timestamp regressing from %s to %s when applying command %x",
b.state.RaftClosedTimestamp, cts, cmd.idKey)
}
b.state.RaftClosedTimestamp = *cts
b.closedTimestampSetter.record(cmd, b.state.Lease)
if clockTS, ok := cts.TryToClockTimestamp(); ok {
b.maxTS.Forward(clockTS)
}
@@ -896,6 +897,17 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
r.mu.Lock()
r.mu.state.RaftAppliedIndex = b.state.RaftAppliedIndex
r.mu.state.LeaseAppliedIndex = b.state.LeaseAppliedIndex

// Sanity check that the RaftClosedTimestamp doesn't go backwards.
existingClosed := r.mu.state.RaftClosedTimestamp
newClosed := b.state.RaftClosedTimestamp
if !newClosed.IsEmpty() && newClosed.Less(existingClosed) && raftClosedTimestampAssertionsEnabled {
return errors.AssertionFailedf(
"raft closed timestamp regression; replica has: %s, new batch has: %s.",
existingClosed.String(), newClosed.String())
}
r.mu.closedTimestampSetter = b.closedTimestampSetter

closedTimestampUpdated := r.mu.state.RaftClosedTimestamp.Forward(b.state.RaftClosedTimestamp)
prevStats := *r.mu.state.Stats
*r.mu.state.Stats = *b.state.Stats
@@ -1009,6 +1021,77 @@ func (b *replicaAppBatch) Close() {
*b = replicaAppBatch{}
}

// raftClosedTimestampAssertionsEnabled provides an emergency way of shutting
// down assertions.
var raftClosedTimestampAssertionsEnabled = envutil.EnvOrDefaultBool("COCKROACH_RAFT_CLOSEDTS_ASSERTIONS_ENABLED", true)

// Assert that the current command is not writing under the closed timestamp.
// This check only applies to IntentWrite commands, since others (for example,
// EndTxn) can operate below the closed timestamp.
//
// Note that we check that we're not writing below
// b.state.RaftClosedTimestamp (i.e. the timestamp closed by previous
// commands), not below cmd.raftCmd.ClosedTimestamp. A command is allowed to
// write below the closed timestamp carried by itself; in other words,
// cmd.raftCmd.ClosedTimestamp is a promise about future commands, not about
// the command carrying it.
func (b *replicaAppBatch) assertNoWriteBelowClosedTimestamp(cmd *replicatedCmd) error {
if !cmd.IsLocal() || !cmd.proposal.Request.IsIntentWrite() {
return nil
}
if !raftClosedTimestampAssertionsEnabled {
return nil
}
wts := cmd.raftCmd.ReplicatedEvalResult.WriteTimestamp
if !wts.IsEmpty() && wts.LessEq(b.state.RaftClosedTimestamp) {
wts := wts // Make a shadow variable that escapes to the heap.
var req redact.StringBuilder
if cmd.proposal != nil {
req.Print(cmd.proposal.Request)
} else {
req.SafeString("request unknown; not leaseholder")
}
return wrapWithNonDeterministicFailure(errors.AssertionFailedf(
"command writing below closed timestamp; cmd: %x, write ts: %s, "+
"batch state closed: %s, command closed: %s, request: %s, lease: %s",
cmd.idKey, wts,
b.state.RaftClosedTimestamp, cmd.raftCmd.ClosedTimestamp,
req, b.state.Lease),
"command writing below closed timestamp")
}
return nil
}

// Assert that the closed timestamp carried by the command is not below one from
// previous commands.
func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression(cmd *replicatedCmd) error {
if !raftClosedTimestampAssertionsEnabled {
return nil
}
existingClosed := &b.state.RaftClosedTimestamp
newClosed := cmd.raftCmd.ClosedTimestamp
if newClosed != nil && !newClosed.IsEmpty() && newClosed.Less(*existingClosed) {
var req redact.StringBuilder
if cmd.IsLocal() && cmd.proposal.Request.IsIntentWrite() {
req.Print(cmd.proposal.Request)
} else {
req.SafeString("<unknown; not leaseholder>")
}
var prevReq redact.StringBuilder
if req := b.closedTimestampSetter.leaseReq; req != nil {
prevReq.Printf("lease acquisition: %s (prev: %s)", req.Lease, req.PrevLease)
} else {
prevReq.SafeString("<unknown; not leaseholder or not lease request>")
}

return errors.AssertionFailedf(
"raft closed timestamp regression in cmd: %x; batch state: %s, command: %s, lease: %s, req: %s, applying at LAI: %d.\n"+
"Closed timestamp was set by req: %s under lease: %s; applied at LAI: %d. Batch idx: %d.",
cmd.idKey, existingClosed, newClosed, b.state.Lease, req, cmd.leaseIndex,
prevReq, b.closedTimestampSetter.lease, b.closedTimestampSetter.leaseIdx, b.entries)
}
return nil
}

// ephemeralReplicaAppBatch implements the apply.Batch interface.
//
// The batch performs the bare-minimum amount of work to be able to
@@ -1257,3 +1340,51 @@ func (sm *replicaStateMachine) moveStats() applyCommittedEntriesStats {
sm.stats = applyCommittedEntriesStats{}
return stats
}

// closedTimestampSetterInfo contains information about the command that last
// bumped the closed timestamp.
type closedTimestampSetterInfo struct {
// lease represents the lease under which the command is being applied.
lease *roachpb.Lease
// leaseIdx is the LAI of the command.
leaseIdx ctpb.LAI
// leaseReq is set if the request that generated this command was a
// RequestLeaseRequest. This is only ever set on the leaseholder replica since
// only the leaseholder has information about the request corresponding to a
// command.
// NOTE: We only keep track of lease requests because keeping track of all
// requests would be too expensive: cloning the request is expensive and also
// requests can be large in memory.
leaseReq *roachpb.RequestLeaseRequest
// split and merge are set if the request was an EndTxn with the respective
// commit trigger set.
split, merge bool
}

// record saves information about the command that updates the replica's closed
// timestamp.
func (s *closedTimestampSetterInfo) record(cmd *replicatedCmd, lease *roachpb.Lease) {
*s = closedTimestampSetterInfo{}
s.leaseIdx = ctpb.LAI(cmd.leaseIndex)
s.lease = lease
if !cmd.IsLocal() {
return
}
req := cmd.proposal.Request
et, ok := req.GetArg(roachpb.EndTxn)
if ok {
endTxn := et.(*roachpb.EndTxnRequest)
if trig := endTxn.InternalCommitTrigger; trig != nil {
if trig.SplitTrigger != nil {
s.split = true
} else if trig.MergeTrigger != nil {
s.merge = true
}
}
} else if req.IsLeaseRequest() {
// Make a deep copy since we're not allowed to hold on to request
// memory.
lr, _ := req.GetArg(roachpb.RequestLease)
s.leaseReq = protoutil.Clone(lr).(*roachpb.RequestLeaseRequest)
}
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_state_machine_test.go
@@ -123,7 +123,7 @@ func TestReplicaStateMachineChangeReplicas(t *testing.T) {
ReplicatedEvalResult: kvserverpb.ReplicatedEvalResult{
State: &kvserverpb.ReplicaState{Desc: &newDesc},
ChangeReplicas: &kvserverpb.ChangeReplicas{ChangeReplicasTrigger: trigger},
Timestamp: r.mu.state.GCThreshold.Add(1, 0),
WriteTimestamp: r.mu.state.GCThreshold.Add(1, 0),
},
},
confChange: &decodedConfChange{