Skip to content

Commit

Permalink
raftlog: introduce & use in loqrecovery, kvserver
Browse files Browse the repository at this point in the history
This commit introduces a new package `raftlog`. Aspirationally, this
will at some point become the de facto way of encapsulating the raft log
encoding and may play a role in programmatically constructing and
inspecting raft logs (e.g.  for testing).

For now, we introduce two concepts:

- `raftlog.Entry`, which wraps a `raftpb.Entry` and all of the
  information derived from it, such as the command ID, the
`kvserverpb.RaftCommand`, the configuration change (if any), etc.

- `raftlog.Iterator`, a way to iterate over the raft log in terms of
  `raftlog.Entry` (as opposed to `raftpb.Entry` which requires lots of
manual processing).

Both are then applied across the codebase, concretely:

- `loqrecovery` is simplified via `raftlog.Iterator` to pull commit
  triggers out of the raft log.
- debug pretty-printing is simpler thanks to use of `raftlog.Entry`.
- `decodedRaftEntry` is now structurally a `raftpb.Entry`, and again
  lots manual custom unmarshaling code evaporates.

It's currently difficult to create "interesting" raft log entries if
trying to stay away from manual population of large datastructures
(which is prone to rotting), so there's zero unit testing of
`raftlog.Iterator`. However, the code is not new, instead it was
deduplicated from a few places, and is now tested through all of them;
so I don't feel to bad about it. I still think it is a priority to be
able to "comfortably" create at least "simple" raft logs, meaning we
need to be able to string together `batcheval` and entry creation at
least in a rudimentary fashion. I intend to look into this next and
add comprehensive unit tests for `raftlog.{Entry,Iterator}`.

Touches cockroachdb#75729.

Release note: None
  • Loading branch information
tbg committed Feb 11, 2022
1 parent 4ea464d commit 8bea106
Show file tree
Hide file tree
Showing 15 changed files with 653 additions and 324 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ ALL_TESTS = [
"//pkg/kv/kvserver/protectedts/ptverifier:ptverifier_test",
"//pkg/kv/kvserver/protectedts:protectedts_test",
"//pkg/kv/kvserver/raftentry:raftentry_test",
"//pkg/kv/kvserver/raftlog:raftlog_test",
"//pkg/kv/kvserver/rangefeed:rangefeed_test",
"//pkg/kv/kvserver/rditer:rditer_test",
"//pkg/kv/kvserver/reports:reports_test",
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ go_library(
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/rditer",
"//pkg/kv/kvserver/readsummary",
Expand Down Expand Up @@ -336,6 +337,7 @@ go_test(
"//pkg/kv/kvserver/protectedts/ptstorage",
"//pkg/kv/kvserver/protectedts/ptverifier",
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/rditer",
"//pkg/kv/kvserver/readsummary/rspb",
"//pkg/kv/kvserver/spanset",
Expand Down
51 changes: 10 additions & 41 deletions pkg/kv/kvserver/debug_print.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand Down Expand Up @@ -206,52 +206,21 @@ func decodeWriteBatch(writeBatch *kvserverpb.WriteBatch) (string, error) {
}

func tryRaftLogEntry(kv storage.MVCCKeyValue) (string, error) {
var ent raftpb.Entry
if err := maybeUnmarshalInline(kv.Value, &ent); err != nil {
e, err := raftlog.NewEntryFromRawValue(kv.Value)
if err != nil {
return "", err
}
defer e.Release()

var cmd kvserverpb.RaftCommand
switch ent.Type {
case raftpb.EntryNormal:
if len(ent.Data) == 0 {
return fmt.Sprintf("%s: EMPTY\n", &ent), nil
}
_, cmdData := kvserverbase.DecodeRaftCommand(ent.Data)
if err := protoutil.Unmarshal(cmdData, &cmd); err != nil {
return "", err
}
case raftpb.EntryConfChange, raftpb.EntryConfChangeV2:
var c raftpb.ConfChangeI
if ent.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
if err := protoutil.Unmarshal(ent.Data, &cc); err != nil {
return "", err
}
c = cc
} else {
var cc raftpb.ConfChangeV2
if err := protoutil.Unmarshal(ent.Data, &cc); err != nil {
return "", err
}
c = cc
}

var ctx kvserverpb.ConfChangeContext
if err := protoutil.Unmarshal(c.AsV2().Context, &ctx); err != nil {
return "", err
}
if err := protoutil.Unmarshal(ctx.Payload, &cmd); err != nil {
return "", err
}
default:
return "", fmt.Errorf("unknown log entry type: %s", &ent)
if len(e.Data) == 0 {
return fmt.Sprintf("%s: EMPTY\n", &e.Entry), nil
}
ent.Data = nil
e.Data = nil
cmd := e.Cmd

var leaseStr string
if l := cmd.DeprecatedProposerLease; l != nil {
leaseStr = l.String() // use full lease, if available
leaseStr = l.String() // use the full lease, if available
} else {
leaseStr = fmt.Sprintf("lease #%d", cmd.ProposerLeaseSequence)
}
Expand All @@ -262,7 +231,7 @@ func tryRaftLogEntry(kv storage.MVCCKeyValue) (string, error) {
}
cmd.WriteBatch = nil

return fmt.Sprintf("%s by %s\n%s\nwrite batch:\n%s", &ent, leaseStr, &cmd, wbStr), nil
return fmt.Sprintf("%s by %s\n%s\nwrite batch:\n%s", &e.Entry, leaseStr, &cmd, wbStr), nil
}

func tryTxn(kv storage.MVCCKeyValue) (string, error) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/kvserverbase/raftversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func DecodeRaftCommand(data []byte) (CmdIDKey, []byte) {
// raft log entries. No correctness guarantees are provided.
//
// See: #75729
//
// TODO(tbg): this method can go away since EncodeRaftCommand is exported, too.
func EncodeTestRaftCommand(command []byte, commandID CmdIDKey) []byte {
return EncodeRaftCommand(RaftVersionStandard, commandID, command)
}
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/loqrecovery/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,18 @@ go_library(
deps = [
"//pkg/keys",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/loqrecovery/loqrecoverypb",
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/stateloader",
"//pkg/roachpb:with-mocks",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@io_etcd_go_etcd_raft_v3//raftpb",
],
)

Expand Down
92 changes: 9 additions & 83 deletions pkg/kv/kvserver/loqrecovery/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,13 @@ import (
"context"
"math"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3/raftpb"
)

// CollectReplicaInfo captures states of all replicas in all stores for the sake of quorum recovery.
Expand Down Expand Up @@ -80,88 +75,15 @@ func CollectReplicaInfo(
return loqrecoverypb.NodeReplicaInfo{Replicas: replicas}, nil
}

// GetDescriptorChangesFromRaftLog iterates over raft log between indicies
// GetDescriptorChangesFromRaftLog iterates over raft log between indices
// lo (inclusive) and hi (exclusive) and searches for changes to range
// descriptor. Changes are identified by commit trigger content which is
// extracted either from EntryNormal where change updates key range info
// (split/merge) or from EntryConfChange* for changes in replica set.
// descriptors, as identified by presence of a commit trigger.
func GetDescriptorChangesFromRaftLog(
rangeID roachpb.RangeID, lo, hi uint64, reader storage.Reader,
) ([]loqrecoverypb.DescriptorChangeInfo, error) {
var changes []loqrecoverypb.DescriptorChangeInfo

key := keys.RaftLogKey(rangeID, lo)
endKey := keys.RaftLogKey(rangeID, hi)
iter := reader.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
UpperBound: endKey,
})
defer iter.Close()

var meta enginepb.MVCCMetadata
var ent raftpb.Entry

decodeRaftChange := func(ccI raftpb.ConfChangeI) ([]byte, error) {
var ccC kvserverpb.ConfChangeContext
if err := protoutil.Unmarshal(ccI.AsV2().Context, &ccC); err != nil {
return nil, errors.Wrap(err, "while unmarshaling CCContext")
}
return ccC.Payload, nil
}

iter.SeekGE(storage.MakeMVCCMetadataKey(key))
for ; ; iter.Next() {
ok, err := iter.Valid()
if err != nil {
return nil, err
}
if !ok {
return changes, nil
}
if err := protoutil.Unmarshal(iter.UnsafeValue(), &meta); err != nil {
return nil, errors.Wrap(err, "unable to decode raft log MVCCMetadata")
}
if err := storage.MakeValue(meta).GetProto(&ent); err != nil {
return nil, errors.Wrap(err, "unable to unmarshal raft Entry")
}
if len(ent.Data) == 0 {
continue
}
// Following code extracts our raft command from raft log entry. Depending
// on entry type we either need to extract encoded command from configuration
// change (for replica changes) or from normal command (for splits and
// merges).
var payload []byte
switch ent.Type {
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
if err := protoutil.Unmarshal(ent.Data, &cc); err != nil {
return nil, errors.Wrap(err, "while unmarshaling ConfChange")
}
payload, err = decodeRaftChange(cc)
if err != nil {
return nil, err
}
case raftpb.EntryConfChangeV2:
var cc raftpb.ConfChangeV2
if err := protoutil.Unmarshal(ent.Data, &cc); err != nil {
return nil, errors.Wrap(err, "while unmarshaling ConfChangeV2")
}
payload, err = decodeRaftChange(cc)
if err != nil {
return nil, err
}
case raftpb.EntryNormal:
_, payload = kvserverbase.DecodeRaftCommand(ent.Data)
default:
continue
}
if len(payload) == 0 {
continue
}
var raftCmd kvserverpb.RaftCommand
if err := protoutil.Unmarshal(payload, &raftCmd); err != nil {
return nil, errors.Wrap(err, "unable to unmarshal raft command")
}
if err := raftlog.Visit(context.Background(), rangeID, reader, lo, hi, func(ctx context.Context, e *raftlog.Entry) error {
raftCmd := e.Cmd
switch {
case raftCmd.ReplicatedEvalResult.Split != nil:
changes = append(changes,
Expand All @@ -183,5 +105,9 @@ func GetDescriptorChangesFromRaftLog(
Desc: raftCmd.ReplicatedEvalResult.ChangeReplicas.Desc,
})
}
return nil
}); err != nil {
return nil, err
}
return changes, nil
}
42 changes: 42 additions & 0 deletions pkg/kv/kvserver/raftlog/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "raftlog",
srcs = [
"entry.go",
"iterator.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/roachpb",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/iterutil",
"//pkg/util/protoutil",
"@com_github_cockroachdb_errors//:errors",
"@io_etcd_go_etcd_raft_v3//raftpb",
],
)

go_test(
name = "raftlog_test",
srcs = ["iterator_test.go"],
embed = [":raftlog"],
deps = [
"//pkg/keys",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/roachpb",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/protoutil",
"@com_github_stretchr_testify//require",
"@io_etcd_go_etcd_raft_v3//raftpb",
],
)
Loading

0 comments on commit 8bea106

Please sign in to comment.