diff --git a/etcdserver/metrics.go b/etcdserver/metrics.go index 60e5395a300..733de474135 100644 --- a/etcdserver/metrics.go +++ b/etcdserver/metrics.go @@ -78,6 +78,12 @@ var ( Name: "proposals_failed_total", Help: "The total number of failed proposals seen.", }) + slowReadIndex = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "slow_read_indexes_total", + Help: "The total number of pending read indexes not in sync with leader's or timed out read index requests.", + }) leaseExpired = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "etcd_debugging", Subsystem: "server", @@ -109,6 +115,7 @@ func init() { prometheus.MustRegister(proposalsApplied) prometheus.MustRegister(proposalsPending) prometheus.MustRegister(proposalsFailed) + prometheus.MustRegister(slowReadIndex) prometheus.MustRegister(leaseExpired) prometheus.MustRegister(quotaBackendBytes) prometheus.MustRegister(currentVersion) diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 0a6fa7df4a8..48c345d72f6 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -18,7 +18,6 @@ import ( "bytes" "context" "encoding/binary" - "fmt" "time" "github.com/coreos/etcd/auth" @@ -632,8 +631,9 @@ func (s *EtcdServer) linearizableReadLoop() { var rs raft.ReadState for { - ctx := make([]byte, 8) - binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next()) + ctxToSend := make([]byte, 8) + id1 := s.reqIDGen.Next() + binary.BigEndian.PutUint64(ctxToSend, id1) select { case <-s.readwaitc: @@ -650,7 +650,7 @@ func (s *EtcdServer) linearizableReadLoop() { lg := s.getLogger() cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) - if err := s.r.ReadIndex(cctx, ctx); err != nil { + if err := s.r.ReadIndex(cctx, ctxToSend); err != nil { cancel() if err == raft.ErrStopped { return @@ -672,19 +672,24 @@ func (s *EtcdServer) linearizableReadLoop() { for !timeout && !done { select { case rs = <-s.r.readStateC: - done = bytes.Equal(rs.RequestCtx, ctx) + done = bytes.Equal(rs.RequestCtx, ctxToSend) if !done { // a previous request might time out. now we should ignore the response of it and // continue waiting for the response of the current requests. + id2 := uint64(0) + if len(rs.RequestCtx) == 8 { + id2 = binary.BigEndian.Uint64(rs.RequestCtx) + } if lg != nil { lg.Warn( - "ignored out-of-date read index response", - zap.String("ctx-expected", fmt.Sprintf("%+v", string(rs.RequestCtx))), - zap.String("ctx-got", fmt.Sprintf("%+v", string(ctx))), + "ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader", + zap.Uint64("sent-request-id", id1), + zap.Uint64("received-request-id", id2), ) } else { - plog.Warningf("ignored out-of-date read index response (want %v, got %v)", rs.RequestCtx, ctx) + plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2) } + slowReadIndex.Inc() } case <-time.After(s.Cfg.ReqTimeout()): if lg != nil { @@ -694,6 +699,7 @@ func (s *EtcdServer) linearizableReadLoop() { } nr.notify(ErrTimeout) timeout = true + slowReadIndex.Inc() case <-s.stopping: return }