Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…db#93912

56675: rfc: closed timestamps v2 RFC r=andreimatei a=andreimatei

We need a more flexible closed timestamps mechanism because the
Non-blocking Transactions project will use two different closed
timestamps policies for different ranges. The existing mechanism closes
a single timestamps for all the ranges for which a node is the
leaseholder, which is not good enough.
Besides, the existing mechanism is quite complex. The alternative
proposed by this RFC seems significantly simpler.

Release note: None
Epic: None

93239: kvserver: decouple cmd checks in replicaAppBatch r=pavelkalinnikov a=tbg

This refactors the command application pre-flight checks
on replicaAppBatch such that they can move to appBatch
once that struct evolves from a stub into an actual
implementation of `apply.Batch`.

Epic: CRDB-220
Release note: None

93910: go.mod: bump Pebble to 6ca7956d84c1 r=nicktrav a=jbowens

```
6ca7956d db: tweak NextPrefix semantics in prefix-iteration mode
7fced9a5 metamorphic: fix NextPrefix integration
33377d59 metamorphic: fix NextPrefix integration
```

Epic: None
Release note: none

93912: scripts: adapt bump-pebble.sh for removal of vendor r=jbowens a=jbowens

Remove all references to the vendor directory in the bump-pebble script.

Release note: None
Epic: None

Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Jackson Owens <[email protected]>
  • Loading branch information
4 people committed Dec 19, 2022
5 parents 91fbbad + 99a5df9 + 711893e + 2611320 + e49ed89 commit bb6ed57
Show file tree
Hide file tree
Showing 14 changed files with 803 additions and 100 deletions.
6 changes: 3 additions & 3 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1485,10 +1485,10 @@ def go_deps():
patches = [
"@com_github_cockroachdb_cockroach//build/patches:com_github_cockroachdb_pebble.patch",
],
sha256 = "0401364025128ab1d3d592293d0c3e1109eb6b50f6c30bb948c5743e7200e9e1",
strip_prefix = "github.com/cockroachdb/[email protected]20221212215000-0893071d8a52",
sha256 = "8e17511f72ac42694d17f2fc293a484a6f243d0ea6bb592120ca9424bce3dbfb",
strip_prefix = "github.com/cockroachdb/[email protected]20221219140626-6ca7956d84c1",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20221212215000-0893071d8a52.zip",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20221219140626-6ca7956d84c1.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion build/bazelutil/distdir_files.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/google-api-go-client/com_github_cockroachdb_google_api_go_client-v0.80.1-0.20221117193156-6a9f7150cb93.zip": "b3378c579f4f4340403038305907d672c86f615f8233118a8873ebe4229c4f39",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/gostdlib/com_github_cockroachdb_gostdlib-v1.19.0.zip": "c4d516bcfe8c07b6fc09b8a9a07a95065b36c2855627cb3514e40c98f872b69e",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/logtags/com_github_cockroachdb_logtags-v0.0.0-20211118104740-dabe8e521a4f.zip": "1972c3f171f118add3fd9e64bcea6cbb9959a3b7fa0ada308e8a7310813fea74",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20221212215000-0893071d8a52.zip": "0401364025128ab1d3d592293d0c3e1109eb6b50f6c30bb948c5743e7200e9e1",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20221219140626-6ca7956d84c1.zip": "8e17511f72ac42694d17f2fc293a484a6f243d0ea6bb592120ca9424bce3dbfb",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/redact/com_github_cockroachdb_redact-v1.1.3.zip": "7778b1e4485e4f17f35e5e592d87eb99c29e173ac9507801d000ad76dd0c261e",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/returncheck/com_github_cockroachdb_returncheck-v0.0.0-20200612231554-92cdbca611dd.zip": "ce92ba4352deec995b1f2eecf16eba7f5d51f5aa245a1c362dfe24c83d31f82b",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/sentry-go/com_github_cockroachdb_sentry_go-v0.6.1-cockroachdb.2.zip": "fbb2207d02aecfdd411b1357efe1192dbb827959e36b7cab7491731ac55935c9",
Expand Down
581 changes: 581 additions & 0 deletions docs/RFCS/20191108_closed_timestamps_v2.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ require (
github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55
github.com/cockroachdb/gostdlib v1.19.0
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f
github.com/cockroachdb/pebble v0.0.0-20221212215000-0893071d8a52
github.com/cockroachdb/pebble v0.0.0-20221219140626-6ca7956d84c1
github.com/cockroachdb/redact v1.1.3
github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd
github.com/cockroachdb/stress v0.0.0-20220803192808-1806698b1b7b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,8 @@ github.com/cockroachdb/gostdlib v1.19.0/go.mod h1:+dqqpARXbE/gRDEhCak6dm0l14AaTy
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI=
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f h1:6jduT9Hfc0njg5jJ1DdKCFPdMBrp/mdZfCpa5h+WM74=
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs=
github.com/cockroachdb/pebble v0.0.0-20221212215000-0893071d8a52 h1:kT8xHygytiJSQpqKv01NpGMG3ZdTlCjr9lW5K5RDh+k=
github.com/cockroachdb/pebble v0.0.0-20221212215000-0893071d8a52/go.mod h1:JsehdjcR1QgLZkqBeYrbVdE3cdxbdrycA/PN+Cg+RNw=
github.com/cockroachdb/pebble v0.0.0-20221219140626-6ca7956d84c1 h1:blSuAUKDTqzi+l1WP0RvB2uh4AZYLuAKUm7Xc6e2G68=
github.com/cockroachdb/pebble v0.0.0-20221219140626-6ca7956d84c1/go.mod h1:JsehdjcR1QgLZkqBeYrbVdE3cdxbdrycA/PN+Cg+RNw=
github.com/cockroachdb/redact v1.1.3 h1:AKZds10rFSIj7qADf0g46UixK8NNLwWTNdCIGS5wfSQ=
github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd h1:KFOt5I9nEKZgCnOSmy8r4Oykh8BYQO8bFOTgHDS8YZA=
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
srcs = [
"addressing.go",
"allocation_op.go",
"app_batch.go",
"consistency_queue.go",
"debug_print.go",
"doc.go",
Expand Down
93 changes: 93 additions & 0 deletions pkg/kv/kvserver/app_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver

import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// appBatch is the in-progress foundation for standalone log entry
// application[^1], i.e. the act of applying raft log entries to the state
// machine in a library-style fashion, without a running CockroachDB server.
//
// The intended usage is as follows. Starting with a ReplicatedCmd per Entry,
//
// 1. check it via assertAndCheckCommand followed by toCheckedCmd
// 2. run pre-add triggers (which may augment the WriteBatch)
// 3. stage the WriteBatch into a pebble Batch
// 4. run post-add triggers (metrics, etc)
//
// when all Entries have been added, the batch can be committed. In the course
// of time, appBatch will become an implementation of apply.Batch itself; at the
// time of writing it is only used by the replicaAppBatch implementation of
// apply.Batch, which goes through the above steps while interspersing:
//
// 1a. testing interceptors between assertAndCheckCommand and toCheckedCmd
// 2b. pre-add triggers specific to online command application (e.g. acquiring locks
// during replica-spanning operations), and
// 4b. post-add triggers specific to online command application (e.g. updates to
// Replica in-mem state)
//
// [^1]: https://github.com/cockroachdb/cockroach/issues/75729
type appBatch struct {
// TODO(tbg): this will absorb the following fields from replicaAppBatch:
//
// - batch
// - state
// - changeRemovesReplica
}

func (b *appBatch) assertAndCheckCommand(
ctx context.Context, cmd *raftlog.ReplicatedCmd, state *kvserverpb.ReplicaState, isLocal bool,
) (kvserverbase.ForcedErrResult, error) {
if log.V(4) {
log.Infof(ctx, "processing command %x: raftIndex=%d maxLeaseIndex=%d closedts=%s",
cmd.ID, cmd.Index(), cmd.Cmd.MaxLeaseIndex, cmd.Cmd.ClosedTimestamp)
}

if cmd.Index() == 0 {
return kvserverbase.ForcedErrResult{}, errors.AssertionFailedf("processRaftCommand requires a non-zero index")
}
if idx, applied := cmd.Index(), state.RaftAppliedIndex; idx != applied+1 {
// If we have an out-of-order index, there's corruption. No sense in
// trying to update anything or running the command. Simply return.
return kvserverbase.ForcedErrResult{}, errors.AssertionFailedf("applied index jumped from %d to %d", applied, idx)
}

// TODO(sep-raft-log): move the closedts checks from replicaAppBatch here as
// well. This just needs a bit more untangling as they reference *Replica, but
// for no super-convincing reason.

return kvserverbase.CheckForcedErr(ctx, cmd.ID, &cmd.Cmd, isLocal, state), nil
}

func (b *appBatch) toCheckedCmd(
ctx context.Context, cmd *raftlog.ReplicatedCmd, fr kvserverbase.ForcedErrResult,
) {
cmd.ForcedErrResult = fr
if cmd.Rejected() {
log.VEventf(ctx, 1, "applying command with forced error: %s", cmd.ForcedError)

// Apply an empty command.
cmd.Cmd.ReplicatedEvalResult = kvserverpb.ReplicatedEvalResult{}
cmd.Cmd.WriteBatch = nil
cmd.Cmd.LogicalOpLog = nil
cmd.Cmd.ClosedTimestamp = nil
} else {
log.Event(ctx, "applying command")
}
}
81 changes: 59 additions & 22 deletions pkg/kv/kvserver/kvserverbase/forced_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ var noopOnEmptyRaftCommandErr = roachpb.NewErrorf("no-op on empty Raft entry")
// corresponding to a ProbeRequest is handled.
var NoopOnProbeCommandErr = roachpb.NewErrorf("no-op on ProbeRequest")

// ForcedErrResult is the output from CheckForcedErr.
type ForcedErrResult struct {
LeaseIndex uint64
Rejection ProposalRejectionType
ForcedError *roachpb.Error
}

// CheckForcedErr determines whether or not a command should be applied to the
// replicated state machine after it has been committed to the Raft log. This
// decision is deterministic on all replicas, such that a command that is
Expand Down Expand Up @@ -69,13 +76,16 @@ func CheckForcedErr(
raftCmd *kvserverpb.RaftCommand,
isLocal bool,
replicaState *kvserverpb.ReplicaState,
) (uint64, ProposalRejectionType, *roachpb.Error) {
) ForcedErrResult {
if raftCmd.ReplicatedEvalResult.IsProbe {
// A Probe is handled by forcing an error during application (which
// avoids a separate "success" code path for this type of request)
// that we can special case as indicating success of the probe above
// raft.
return 0, ProposalRejectionPermanent, NoopOnProbeCommandErr
return ForcedErrResult{
Rejection: ProposalRejectionPermanent,
ForcedError: NoopOnProbeCommandErr,
}
}
leaseIndex := replicaState.LeaseAppliedIndex
isLeaseRequest := raftCmd.ReplicatedEvalResult.IsLeaseRequest
Expand All @@ -89,7 +99,11 @@ func CheckForcedErr(
// Nothing to do here except making sure that the corresponding batch
// (which is bogus) doesn't get executed (for it is empty and so
// properties like key range are undefined).
return leaseIndex, ProposalRejectionPermanent, noopOnEmptyRaftCommandErr
return ForcedErrResult{
LeaseIndex: leaseIndex,
Rejection: ProposalRejectionPermanent,
ForcedError: noopOnEmptyRaftCommandErr,
}
}

// Verify the lease matches the proposer's expectation. We rely on
Expand Down Expand Up @@ -162,11 +176,15 @@ func CheckForcedErr(
// For lease requests we return a special error that
// redirectOnOrAcquireLease() understands. Note that these
// requests don't go through the DistSender.
return leaseIndex, ProposalRejectionPermanent, roachpb.NewError(&roachpb.LeaseRejectedError{
Existing: *replicaState.Lease,
Requested: requestedLease,
Message: "proposed under invalid lease",
})
return ForcedErrResult{
LeaseIndex: leaseIndex,
Rejection: ProposalRejectionPermanent,
ForcedError: roachpb.NewError(&roachpb.LeaseRejectedError{
Existing: *replicaState.Lease,
Requested: requestedLease,
Message: "proposed under invalid lease",
}),
}
}
// We return a NotLeaseHolderError so that the DistSender retries.
// NB: we set proposerStoreID to 0 because we don't know who proposed the
Expand All @@ -176,7 +194,11 @@ func CheckForcedErr(
fmt.Sprintf(
"stale proposal: command was proposed under lease #%d but is being applied "+
"under lease: %s", raftCmd.ProposerLeaseSequence, replicaState.Lease))
return leaseIndex, ProposalRejectionPermanent, roachpb.NewError(nlhe)
return ForcedErrResult{
LeaseIndex: leaseIndex,
Rejection: ProposalRejectionPermanent,
ForcedError: roachpb.NewError(nlhe),
}
}

if isLeaseRequest {
Expand All @@ -188,11 +210,15 @@ func CheckForcedErr(
// However, leases get special vetting to make sure we don't give one to a replica that was
// since removed (see #15385 and a comment in redirectOnOrAcquireLease).
if _, ok := replicaState.Desc.GetReplicaDescriptor(requestedLease.Replica.StoreID); !ok {
return leaseIndex, ProposalRejectionPermanent, roachpb.NewError(&roachpb.LeaseRejectedError{
Existing: *replicaState.Lease,
Requested: requestedLease,
Message: "replica not part of range",
})
return ForcedErrResult{
LeaseIndex: leaseIndex,
Rejection: ProposalRejectionPermanent,
ForcedError: roachpb.NewError(&roachpb.LeaseRejectedError{
Existing: *replicaState.Lease,
Requested: requestedLease,
Message: "replica not part of range",
}),
}
}
} else if replicaState.LeaseAppliedIndex < raftCmd.MaxLeaseIndex {
// The happy case: the command is applying at or ahead of the minimal
Expand All @@ -219,9 +245,12 @@ func CheckForcedErr(
)
retry = ProposalRejectionIllegalLeaseIndex
}
return leaseIndex, retry, roachpb.NewErrorf(
"command observed at lease index %d, but required < %d", leaseIndex, raftCmd.MaxLeaseIndex,
)
return ForcedErrResult{
LeaseIndex: leaseIndex,
Rejection: retry,
ForcedError: roachpb.NewErrorf(
"command observed at lease index %d, but required < %d", leaseIndex, raftCmd.MaxLeaseIndex,
)}
}

// Verify that command is not trying to write below the GC threshold. This is
Expand All @@ -235,10 +264,18 @@ func CheckForcedErr(
// the GC threshold has advanced since then?
wts := raftCmd.ReplicatedEvalResult.WriteTimestamp
if !wts.IsEmpty() && wts.LessEq(*replicaState.GCThreshold) {
return leaseIndex, ProposalRejectionPermanent, roachpb.NewError(&roachpb.BatchTimestampBeforeGCError{
Timestamp: wts,
Threshold: *replicaState.GCThreshold,
})
return ForcedErrResult{
LeaseIndex: leaseIndex,
Rejection: ProposalRejectionPermanent,
ForcedError: roachpb.NewError(&roachpb.BatchTimestampBeforeGCError{
Timestamp: wts,
Threshold: *replicaState.GCThreshold,
}),
}
}
return ForcedErrResult{
LeaseIndex: leaseIndex,
Rejection: ProposalRejectionPermanent,
ForcedError: nil,
}
return leaseIndex, ProposalRejectionPermanent, nil
}
13 changes: 5 additions & 8 deletions pkg/kv/kvserver/raftlog/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3/raftpb"
)
Expand All @@ -32,13 +31,11 @@ import (
// changes replica boundaries.
type ReplicatedCmd struct {
*Entry
// The following fields are set in shouldApplyCommand when we validate that
// a command applies given the current lease and GC threshold. The process
// of setting these fields is what transforms an apply.Command into an
// The following struct is populated in shouldApplyCommand when we validate
// that a command applies given the current lease and GC threshold. The
// process of populating it is what transforms an apply.Command into an
// apply.CheckedCommand.
LeaseIndex uint64
ForcedErr *roachpb.Error
Rejection kvserverbase.ProposalRejectionType
kvserverbase.ForcedErrResult
}

var _ apply.Command = (*ReplicatedCmd)(nil)
Expand Down Expand Up @@ -106,7 +103,7 @@ func (c *ReplicatedCmd) AckOutcomeAndFinish(context.Context) error { return nil
// A command is rejected if it has a ForcedErr, i.e. if the state machines
// (deterministically) apply the associated entry as a no-op. See
// kvserverbase.CheckForcedErr.
func (c *ReplicatedCmd) Rejected() bool { return c.ForcedErr != nil }
func (c *ReplicatedCmd) Rejected() bool { return c.ForcedError != nil }

// CanAckBeforeApplication implements apply.CheckedCommand.
//
Expand Down
Loading

0 comments on commit bb6ed57

Please sign in to comment.