Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: refactor replicaAppBatch for standalone log application #93266

Merged
merged 20 commits into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions pkg/kv/kvserver/app_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,37 @@ package kvserver
import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

type appBatchStats struct {
// TODO(sep-raft-log):
// numEntries
// numEntriesBytes
// numEntriesEmpty
numMutations int
numEntriesProcessed int
numEntriesProcessedBytes int64
numEmptyEntries int
followerStoreWriteBytes kvadmission.FollowerStoreWriteBytes
// NB: update `merge` when adding a new field.
}

func (s *appBatchStats) merge(ss appBatchStats) {
s.numMutations += ss.numMutations
s.numEntriesProcessed += ss.numEntriesProcessed
s.numEntriesProcessedBytes += ss.numEntriesProcessedBytes
ss.numEmptyEntries += ss.numEmptyEntries
s.followerStoreWriteBytes.Merge(ss.followerStoreWriteBytes)
}

// appBatch is the in-progress foundation for standalone log entry
// application[^1], i.e. the act of applying raft log entries to the state
// machine in a library-style fashion, without a running CockroachDB server.
Expand All @@ -44,6 +68,7 @@ import (
//
// [^1]: https://github.com/cockroachdb/cockroach/issues/75729
type appBatch struct {
appBatchStats
// TODO(tbg): this will absorb the following fields from replicaAppBatch:
//
// - batch
Expand Down Expand Up @@ -88,6 +113,49 @@ func (b *appBatch) toCheckedCmd(
cmd.Cmd.LogicalOpLog = nil
cmd.Cmd.ClosedTimestamp = nil
} else {
// If the command was using the deprecated version of the MVCCStats proto,
// migrate it to the new version and clear out the field.
res := cmd.ReplicatedResult()
if deprecatedDelta := res.DeprecatedDelta; deprecatedDelta != nil {
tbg marked this conversation as resolved.
Show resolved Hide resolved
if res.Delta != (enginepb.MVCCStatsDelta{}) {
log.Fatalf(ctx, "stats delta not empty but deprecated delta provided: %+v", cmd)
}
res.Delta = deprecatedDelta.ToStatsDelta()
res.DeprecatedDelta = nil
}
log.Event(ctx, "applying command")
}
}

// runPreAddTriggers runs any triggers that must fire before the command is
// added to the appBatch's pebble batch. That is, the pebble batch at this point
// will have materialized the raft log up to but excluding the current command.
func (b *appBatch) runPreAddTriggers(ctx context.Context, cmd *raftlog.ReplicatedCmd) error {
// None currently.
return nil
}

// addWriteBatch adds the command's writes to the batch.
func (b *appBatch) addWriteBatch(
ctx context.Context, batch storage.Batch, cmd *replicatedCmd,
tbg marked this conversation as resolved.
Show resolved Hide resolved
) error {
wb := cmd.Cmd.WriteBatch
if wb == nil {
return nil
}
if mutations, err := storage.PebbleBatchCount(wb.Data); err != nil {
log.Errorf(ctx, "unable to read header of committed WriteBatch: %+v", err)
} else {
b.numMutations += mutations
}
if err := batch.ApplyBatchRepr(wb.Data, false); err != nil {
return errors.Wrapf(err, "unable to apply WriteBatch")
}
return nil
}

func (b *appBatch) runPostAddTriggers(ctx context.Context, cmd *raftlog.ReplicatedCmd) error {
// TODO(sep-raft-log): currently they are commingled in runPostAddTriggersReplicaOnly,
// extract them from that method.
return nil
}
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/apply/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
// All state transitions performed by the state machine are expected to be
// deterministic, which ensures that if each instance is driven from the
// same consistent shared log, they will all stay in sync.
//
// The implementation may not be and commonly is not thread safe.
type StateMachine interface {
// NewEphemeralBatch creates an EphemeralBatch. This kind of batch is not able
// to make changes to the StateMachine, but can be used for the purpose of
Expand All @@ -39,6 +41,9 @@ type StateMachine interface {
// that a group of Commands will have on the replicated state machine.
// Commands are staged in the batch one-by-one and then the entire batch is
// applied to the StateMachine at once via its ApplyToStateMachine method.
//
// There must only be a single EphemeralBatch *or* Batch open at any given
// point in time.
NewBatch() Batch
// ApplySideEffects applies the in-memory side-effects of a Command to
// the replicated state machine. The method will be called in the order
Expand Down Expand Up @@ -66,6 +71,10 @@ var ErrRemoved = errors.New("replica removed")
type EphemeralBatch interface {
// Stage inserts a Command into the Batch. In doing so, the Command is
// checked for rejection and a CheckedCommand is returned.
//
// TODO(tbg): consider renaming this to Add, so that in implementations
// of this we less unambiguously refer to "staging" commands into the
// pebble batch.
Stage(context.Context, Command) (CheckedCommand, error)
// Close closes the batch and releases any resources that it holds.
Close()
Expand Down
Loading