Skip to content

Commit

Permalink
kvserver: add metric and log when raft.Storage returns an error
Browse files Browse the repository at this point in the history
The raft.storage.error metric is incremented on an error, and the error
is logged every 30s (across all replicas).

This was motivated by a test cluster that slowed to a crawl because of
deliberate data loss, but was hard to diagnose. The metric could be used
for alerting, since we don't expect to see transient errors.

Informs #113053

Epic: none

Release note: None
  • Loading branch information
sumeerbhola committed Oct 27, 2023
1 parent 39ea7d8 commit 0618e95
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,12 @@ cache will already have moved on to newer entries.
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaRaftStorageError = metric.Metadata{
Name: "raft.storage.error",
Help: "Number of calls to the raft.Storage API that returned an error",
Measurement: "Error Count",
Unit: metric.Unit_COUNT,
}

// Raft message metrics.
metaRaftRcvdProp = metric.Metadata{
Expand Down Expand Up @@ -2503,6 +2509,7 @@ type StoreMetrics struct {
RaftSchedulerLatency metric.IHistogram
RaftTimeoutCampaign *metric.Counter
RaftStorageReadBytes *metric.Counter
RaftStorageError *metric.Counter
WALBytesWritten *metric.Gauge
WALBytesIn *metric.Gauge

Expand Down Expand Up @@ -3205,6 +3212,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
}),
RaftTimeoutCampaign: metric.NewCounter(metaRaftTimeoutCampaign),
RaftStorageReadBytes: metric.NewCounter(metaRaftStorageReadBytes),
RaftStorageError: metric.NewCounter(metaRaftStorageError),

// Raft message metrics.
RaftRcvdMessages: [maxRaftMsgType + 1]*metric.Counter{
Expand Down
27 changes: 26 additions & 1 deletion pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState,
hs, err := r.mu.stateLoader.LoadHardState(ctx, r.store.TODOEngine())
// For uninitialized ranges, membership is unknown at this point.
if raft.IsEmptyHardState(hs) || err != nil {
if err != nil {
r.reportRaftStorageError(err)
}
return raftpb.HardState{}, raftpb.ConfState{}, err
}
cs := r.mu.state.Desc.Replicas().ConfState()
Expand All @@ -97,7 +100,11 @@ func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState,
//
// Entries can return log entries that are not yet stable in durable storage.
func (r *replicaRaftStorage) Entries(lo, hi uint64, maxBytes uint64) ([]raftpb.Entry, error) {
return r.TypedEntries(kvpb.RaftIndex(lo), kvpb.RaftIndex(hi), maxBytes)
entries, err := r.TypedEntries(kvpb.RaftIndex(lo), kvpb.RaftIndex(hi), maxBytes)
if err != nil {
r.reportRaftStorageError(err)
}
return entries, err
}

func (r *replicaRaftStorage) TypedEntries(
Expand Down Expand Up @@ -128,6 +135,9 @@ const invalidLastTerm = 0
// Term implements the raft.Storage interface.
func (r *replicaRaftStorage) Term(i uint64) (uint64, error) {
term, err := r.TypedTerm(kvpb.RaftIndex(i))
if err != nil {
r.reportRaftStorageError(err)
}
return uint64(term), err
}

Expand Down Expand Up @@ -166,6 +176,9 @@ func (r *Replica) raftLastIndexRLocked() kvpb.RaftIndex {
// LastIndex requires that r.mu is held for reading.
func (r *replicaRaftStorage) LastIndex() (uint64, error) {
index, err := r.TypedLastIndex()
if err != nil {
r.reportRaftStorageError(err)
}
return uint64(index), err
}

Expand All @@ -190,6 +203,9 @@ func (r *Replica) raftFirstIndexRLocked() kvpb.RaftIndex {
// FirstIndex requires that r.mu is held for reading.
func (r *replicaRaftStorage) FirstIndex() (uint64, error) {
index, err := r.TypedFirstIndex()
if err != nil {
r.reportRaftStorageError(err)
}
return uint64(index), err
}

Expand Down Expand Up @@ -237,6 +253,15 @@ func (r *replicaRaftStorage) Snapshot() (raftpb.Snapshot, error) {
}, nil
}

var raftStorageErrorLogger = log.Every(30 * time.Second)

func (r *replicaRaftStorage) reportRaftStorageError(err error) {
if raftStorageErrorLogger.ShouldLog() {
log.Errorf(r.raftCtx, "error in raft.Storage %v", err)
}
r.store.metrics.RaftStorageError.Inc(1)
}

// raftSnapshotLocked requires that r.mu is held for writing.
func (r *Replica) raftSnapshotLocked() (raftpb.Snapshot, error) {
return (*replicaRaftStorage)(r).Snapshot()
Expand Down

0 comments on commit 0618e95

Please sign in to comment.