Skip to content

Commit

Permalink
raftlog: introduce & use in loqrecovery, kvserver
Browse files Browse the repository at this point in the history
This commit introduces a new package `raftlog`. Aspirationally, this
will at some point become the de facto way of encapsulating the raft log
encoding and may play a role in programmatically constructing and
inspecting raft logs (e.g.  for testing).

For now, we introduce two concepts:

- `raftlog.Entry`, which wraps a `raftpb.Entry` and all of the
  information derived from it, such as the command ID, the
`kvserverpb.RaftCommand`, the configuration change (if any), etc.

- `raftlog.Iterator`, a way to iterate over the raft log in terms of
  `raftlog.Entry` (as opposed to `raftpb.Entry` which requires lots of
manual processing).

Both are then applied across the codebase, concretely:

- `loqrecovery` is simplified via `raftlog.Iterator` to pull commit
  triggers out of the raft log.
- debug pretty-printing is simpler thanks to use of `raftlog.Entry`.
- `decodedRaftEntry` is now structurally a `raftpb.Entry`, and again
  lots manual custom unmarshaling code evaporates.

It's currently difficult to create "interesting" raft log entries if
trying to stay away from manual population of large datastructures
(which is prone to rotting), so there's zero unit testing of
`raftlog.Iterator`. However, the code is not new, instead it was
deduplicated from a few places, and is now tested through all of them;
so I don't feel to bad about it. I still think it is a priority to be
able to "comfortably" create at least "simple" raft logs, meaning we
need to be able to string together `batcheval` and entry creation at
least in a rudimentary fashion. I intend to look into this next and
add comprehensive unit tests for `raftlog.{Entry,Iterator}`.

Touches cockroachdb#75729.

Release note: None
  • Loading branch information
tbg committed Feb 6, 2022
1 parent 4ea464d commit 49a0636
Show file tree
Hide file tree
Showing 12 changed files with 379 additions and 318 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ go_library(
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/rditer",
"//pkg/kv/kvserver/readsummary",
Expand Down Expand Up @@ -336,6 +337,7 @@ go_test(
"//pkg/kv/kvserver/protectedts/ptstorage",
"//pkg/kv/kvserver/protectedts/ptverifier",
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/rditer",
"//pkg/kv/kvserver/readsummary/rspb",
"//pkg/kv/kvserver/spanset",
Expand Down
50 changes: 9 additions & 41 deletions pkg/kv/kvserver/debug_print.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand Down Expand Up @@ -206,52 +206,20 @@ func decodeWriteBatch(writeBatch *kvserverpb.WriteBatch) (string, error) {
}

func tryRaftLogEntry(kv storage.MVCCKeyValue) (string, error) {
var ent raftpb.Entry
if err := maybeUnmarshalInline(kv.Value, &ent); err != nil {
var e raftlog.Entry
if err := e.LoadFromRawValue(kv.Value); err != nil {
return "", err
}

var cmd kvserverpb.RaftCommand
switch ent.Type {
case raftpb.EntryNormal:
if len(ent.Data) == 0 {
return fmt.Sprintf("%s: EMPTY\n", &ent), nil
}
_, cmdData := kvserverbase.DecodeRaftCommand(ent.Data)
if err := protoutil.Unmarshal(cmdData, &cmd); err != nil {
return "", err
}
case raftpb.EntryConfChange, raftpb.EntryConfChangeV2:
var c raftpb.ConfChangeI
if ent.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
if err := protoutil.Unmarshal(ent.Data, &cc); err != nil {
return "", err
}
c = cc
} else {
var cc raftpb.ConfChangeV2
if err := protoutil.Unmarshal(ent.Data, &cc); err != nil {
return "", err
}
c = cc
}

var ctx kvserverpb.ConfChangeContext
if err := protoutil.Unmarshal(c.AsV2().Context, &ctx); err != nil {
return "", err
}
if err := protoutil.Unmarshal(ctx.Payload, &cmd); err != nil {
return "", err
}
default:
return "", fmt.Errorf("unknown log entry type: %s", &ent)
if len(e.Ent.Data) == 0 {
return fmt.Sprintf("%s: EMPTY\n", &e.Ent), nil
}
ent.Data = nil
e.Ent.Data = nil
cmd := e.Cmd

var leaseStr string
if l := cmd.DeprecatedProposerLease; l != nil {
leaseStr = l.String() // use full lease, if available
leaseStr = l.String() // use the full lease, if available
} else {
leaseStr = fmt.Sprintf("lease #%d", cmd.ProposerLeaseSequence)
}
Expand All @@ -262,7 +230,7 @@ func tryRaftLogEntry(kv storage.MVCCKeyValue) (string, error) {
}
cmd.WriteBatch = nil

return fmt.Sprintf("%s by %s\n%s\nwrite batch:\n%s", &ent, leaseStr, &cmd, wbStr), nil
return fmt.Sprintf("%s by %s\n%s\nwrite batch:\n%s", &e.Ent, leaseStr, &cmd, wbStr), nil
}

func tryTxn(kv storage.MVCCKeyValue) (string, error) {
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/loqrecovery/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,18 @@ go_library(
deps = [
"//pkg/keys",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/loqrecovery/loqrecoverypb",
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/stateloader",
"//pkg/roachpb:with-mocks",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@io_etcd_go_etcd_raft_v3//raftpb",
],
)

Expand Down
87 changes: 10 additions & 77 deletions pkg/kv/kvserver/loqrecovery/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,13 @@ import (
"context"
"math"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3/raftpb"
)

// CollectReplicaInfo captures states of all replicas in all stores for the sake of quorum recovery.
Expand Down Expand Up @@ -80,88 +75,25 @@ func CollectReplicaInfo(
return loqrecoverypb.NodeReplicaInfo{Replicas: replicas}, nil
}

// GetDescriptorChangesFromRaftLog iterates over raft log between indicies
// GetDescriptorChangesFromRaftLog iterates over raft log between indices
// lo (inclusive) and hi (exclusive) and searches for changes to range
// descriptor. Changes are identified by commit trigger content which is
// extracted either from EntryNormal where change updates key range info
// (split/merge) or from EntryConfChange* for changes in replica set.
// descriptors, as identified by presence of a commit trigger.
func GetDescriptorChangesFromRaftLog(
rangeID roachpb.RangeID, lo, hi uint64, reader storage.Reader,
) ([]loqrecoverypb.DescriptorChangeInfo, error) {
it := raftlog.NewIterator(rangeID, reader)
defer it.Close()
it.Seek(lo, hi)
var changes []loqrecoverypb.DescriptorChangeInfo

key := keys.RaftLogKey(rangeID, lo)
endKey := keys.RaftLogKey(rangeID, hi)
iter := reader.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
UpperBound: endKey,
})
defer iter.Close()

var meta enginepb.MVCCMetadata
var ent raftpb.Entry

decodeRaftChange := func(ccI raftpb.ConfChangeI) ([]byte, error) {
var ccC kvserverpb.ConfChangeContext
if err := protoutil.Unmarshal(ccI.AsV2().Context, &ccC); err != nil {
return nil, errors.Wrap(err, "while unmarshaling CCContext")
}
return ccC.Payload, nil
}

iter.SeekGE(storage.MakeMVCCMetadataKey(key))
for ; ; iter.Next() {
ok, err := iter.Valid()
for ; ; it.Next() {
ok, err := it.Valid()
if err != nil {
return nil, err
}
if !ok {
return changes, nil
}
if err := protoutil.Unmarshal(iter.UnsafeValue(), &meta); err != nil {
return nil, errors.Wrap(err, "unable to decode raft log MVCCMetadata")
}
if err := storage.MakeValue(meta).GetProto(&ent); err != nil {
return nil, errors.Wrap(err, "unable to unmarshal raft Entry")
}
if len(ent.Data) == 0 {
continue
}
// Following code extracts our raft command from raft log entry. Depending
// on entry type we either need to extract encoded command from configuration
// change (for replica changes) or from normal command (for splits and
// merges).
var payload []byte
switch ent.Type {
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
if err := protoutil.Unmarshal(ent.Data, &cc); err != nil {
return nil, errors.Wrap(err, "while unmarshaling ConfChange")
}
payload, err = decodeRaftChange(cc)
if err != nil {
return nil, err
}
case raftpb.EntryConfChangeV2:
var cc raftpb.ConfChangeV2
if err := protoutil.Unmarshal(ent.Data, &cc); err != nil {
return nil, errors.Wrap(err, "while unmarshaling ConfChangeV2")
}
payload, err = decodeRaftChange(cc)
if err != nil {
return nil, err
}
case raftpb.EntryNormal:
_, payload = kvserverbase.DecodeRaftCommand(ent.Data)
default:
continue
}
if len(payload) == 0 {
continue
}
var raftCmd kvserverpb.RaftCommand
if err := protoutil.Unmarshal(payload, &raftCmd); err != nil {
return nil, errors.Wrap(err, "unable to unmarshal raft command")
}
raftCmd := it.UnsafeEntry().Cmd
switch {
case raftCmd.ReplicatedEvalResult.Split != nil:
changes = append(changes,
Expand All @@ -184,4 +116,5 @@ func GetDescriptorChangesFromRaftLog(
})
}
}
return changes, nil
}
22 changes: 22 additions & 0 deletions pkg/kv/kvserver/raftlog/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "raftlog",
srcs = [
"entry.go",
"iterator.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/roachpb:with-mocks",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/protoutil",
"@com_github_cockroachdb_errors//:errors",
"@io_etcd_go_etcd_raft_v3//raftpb",
],
)
101 changes: 101 additions & 0 deletions pkg/kv/kvserver/raftlog/entry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package raftlog

import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3/raftpb"
)

// Entry contains data related to the current raft log entry.
type Entry struct {
Meta enginepb.MVCCMetadata // optional
Ent raftpb.Entry
ID kvserverbase.CmdIDKey
CC1 *raftpb.ConfChange
CC2 *raftpb.ConfChangeV2
CCC *kvserverpb.ConfChangeContext
Cmd kvserverpb.RaftCommand
}

func (e *Entry) CC() raftpb.ConfChangeI {
if e.CC1 != nil {
return e.CC1
}
if e.CC2 != nil {
return e.CC2
}
// NB: nil != interface{}(nil).
return nil
}

func (e *Entry) LoadFromRawValue(b []byte) error {
if err := protoutil.Unmarshal(b, &e.Meta); err != nil {
return errors.Wrap(err, "decoding raft log MVCCMetadata")
}

if err := storage.MakeValue(e.Meta).GetProto(&e.Ent); err != nil {
return errors.Wrap(err, "unmarshalling raft Entry")
}

return e.load()
}

func (e *Entry) Load() error {
e.Meta = enginepb.MVCCMetadata{}
return e.load()
}

func (e *Entry) load() error {
if len(e.Ent.Data) == 0 {
// Raft-proposed empty entry.
return nil
}

var payload []byte
switch e.Ent.Type {
case raftpb.EntryNormal:
e.ID, payload = kvserverbase.DecodeRaftCommand(e.Ent.Data)
case raftpb.EntryConfChange:
e.CC1 = &raftpb.ConfChange{}
if err := protoutil.Unmarshal(e.Ent.Data, e.CC1); err != nil {
return errors.Wrap(err, "unmarshalling ConfChange")
}
e.CCC = &kvserverpb.ConfChangeContext{}
if err := protoutil.Unmarshal(e.CC1.Context, e.CCC); err != nil {
return errors.Wrap(err, "unmarshalling ConfChangeContext")
}
payload = e.CCC.Payload
e.ID = kvserverbase.CmdIDKey(e.CCC.CommandID)
case raftpb.EntryConfChangeV2:
e.CC2 = &raftpb.ConfChangeV2{}
if err := protoutil.Unmarshal(e.Ent.Data, e.CC2); err != nil {
return errors.Wrap(err, "unmarshalling ConfChangeV2")
}
e.CCC = &kvserverpb.ConfChangeContext{}
if err := protoutil.Unmarshal(e.CC2.Context, e.CCC); err != nil {
return errors.Wrap(err, "unmarshalling ConfChangeContext")
}
payload = e.CCC.Payload
e.ID = kvserverbase.CmdIDKey(e.CCC.CommandID)
default:
return errors.AssertionFailedf("unknown entry type %d", e.Ent.Type)
}

// TODO(tbg): can len(payload)==0 if we propose an empty command to wake up leader?
// If so, is that a problem here?
return errors.Wrap(protoutil.Unmarshal(payload, &e.Cmd), "unmarshalling RaftCommand")
}
Loading

0 comments on commit 49a0636

Please sign in to comment.