From ee693515f4d75ef8eb13f1efa1c74b9662c2e5bb Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Sat, 11 Aug 2018 21:47:38 -0700 Subject: [PATCH] etcdserver/api/rafthttp: add v3 snapshot send/receive metrics etcd_network_snapshot_send etcd_network_snapshot_send_failures etcd_network_snapshot_send_total_duration_seconds etcd_network_snapshot_receive etcd_network_snapshot_receive_failures etcd_network_snapshot_receive_total_duration_seconds Signed-off-by: Gyuho Lee --- etcdserver/api/rafthttp/http.go | 29 ++++++--- etcdserver/api/rafthttp/metrics.go | 70 ++++++++++++++++++++++ etcdserver/api/rafthttp/snapshot_sender.go | 17 ++++-- 3 files changed, 103 insertions(+), 13 deletions(-) diff --git a/etcdserver/api/rafthttp/http.go b/etcdserver/api/rafthttp/http.go index 6f49ca146542..7111638af80d 100644 --- a/etcdserver/api/rafthttp/http.go +++ b/etcdserver/api/rafthttp/http.go @@ -22,6 +22,7 @@ import ( "net/http" "path" "strings" + "time" "github.com/coreos/etcd/etcdserver/api/snap" pioutil "github.com/coreos/etcd/pkg/ioutil" @@ -195,9 +196,12 @@ func newSnapshotHandler(t *Transport, r Raft, snapshotter *snap.Snapshotter, cid // received and processed. // 2. this case should happen rarely, so no further optimization is done. func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + start := time.Now() + if r.Method != "POST" { w.Header().Set("Allow", "POST") http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + snapshotReceiveFailures.WithLabelValues("").Inc() return } @@ -205,6 +209,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil { http.Error(w, err.Error(), http.StatusPreconditionFailed) + snapshotReceiveFailures.WithLabelValues("").Inc() return } @@ -213,13 +218,14 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { dec := &messageDecoder{r: r.Body} // let snapshots be very large since they can exceed 512MB for large installations m, err := dec.decodeLimit(uint64(1 << 63)) + from := types.ID(m.From).String() if err != nil { msg := fmt.Sprintf("failed to decode raft message (%v)", err) if h.lg != nil { h.lg.Warn( "failed to decode Raft message", zap.String("local-member-id", h.localID.String()), - zap.String("remote-snapshot-sender-id", types.ID(m.From).String()), + zap.String("remote-snapshot-sender-id", from), zap.Error(err), ) } else { @@ -227,24 +233,26 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } http.Error(w, msg, http.StatusBadRequest) recvFailures.WithLabelValues(r.RemoteAddr).Inc() + snapshotReceiveFailures.WithLabelValues(from).Inc() return } msgSize := m.Size() - receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(msgSize)) + receivedBytes.WithLabelValues(from).Add(float64(msgSize)) if m.Type != raftpb.MsgSnap { if h.lg != nil { h.lg.Warn( "unexpected Raft message type", zap.String("local-member-id", h.localID.String()), - zap.String("remote-snapshot-sender-id", types.ID(m.From).String()), + zap.String("remote-snapshot-sender-id", from), zap.String("message-type", m.Type.String()), ) } else { plog.Errorf("unexpected raft message type %s on snapshot path", m.Type) } http.Error(w, "wrong raft message type", http.StatusBadRequest) + snapshotReceiveFailures.WithLabelValues(from).Inc() return } @@ -252,7 +260,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.lg.Info( "receiving database snapshot", zap.String("local-member-id", h.localID.String()), - zap.String("remote-snapshot-sender-id", types.ID(m.From).String()), + zap.String("remote-snapshot-sender-id", from), zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index), zap.Int("incoming-snapshot-message-size-bytes", msgSize), zap.String("incoming-snapshot-message-size", humanize.Bytes(uint64(msgSize))), @@ -269,7 +277,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.lg.Warn( "failed to save incoming database snapshot", zap.String("local-member-id", h.localID.String()), - zap.String("remote-snapshot-sender-id", types.ID(m.From).String()), + zap.String("remote-snapshot-sender-id", from), zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index), zap.Error(err), ) @@ -277,16 +285,17 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { plog.Error(msg) } http.Error(w, msg, http.StatusInternalServerError) + snapshotReceiveFailures.WithLabelValues(from).Inc() return } - receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n)) + receivedBytes.WithLabelValues(from).Add(float64(n)) if h.lg != nil { h.lg.Info( "received and saved database snapshot", zap.String("local-member-id", h.localID.String()), - zap.String("remote-snapshot-sender-id", types.ID(m.From).String()), + zap.String("remote-snapshot-sender-id", from), zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index), zap.Int64("incoming-snapshot-size-bytes", n), zap.String("incoming-snapshot-size", humanize.Bytes(uint64(n))), @@ -307,13 +316,14 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.lg.Warn( "failed to process Raft message", zap.String("local-member-id", h.localID.String()), - zap.String("remote-snapshot-sender-id", types.ID(m.From).String()), + zap.String("remote-snapshot-sender-id", from), zap.Error(err), ) } else { plog.Error(msg) } http.Error(w, msg, http.StatusInternalServerError) + snapshotReceiveFailures.WithLabelValues(from).Inc() } return } @@ -321,6 +331,9 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Write StatusNoContent header after the message has been processed by // raft, which facilitates the client to report MsgSnap status. w.WriteHeader(http.StatusNoContent) + + snapshotReceive.WithLabelValues(from).Inc() + snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds()) } type streamHandler struct { diff --git a/etcdserver/api/rafthttp/metrics.go b/etcdserver/api/rafthttp/metrics.go index 4df45d35e906..f7a31bd2301f 100644 --- a/etcdserver/api/rafthttp/metrics.go +++ b/etcdserver/api/rafthttp/metrics.go @@ -71,6 +71,68 @@ var ( []string{"From"}, ) + snapshotSend = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "snapshot_send", + Help: "Total number of snapshot sends", + }, + []string{"To"}, + ) + + snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "snapshot_send_failures", + Help: "Total number of snapshot send failures", + }, + []string{"To"}, + ) + + snapshotSendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "snapshot_send_total_duration_seconds", + Help: "Total latency distributions of v3 snapshot sends", + + // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2 + // highest bucket start of 0.001 sec * 2^13 == 8.192 sec + Buckets: prometheus.ExponentialBuckets(0.001, 2, 14), + }, + []string{"To"}, + ) + + snapshotReceive = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "snapshot_receive", + Help: "Total number of snapshot receives", + }, + []string{"From"}, + ) + + snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "snapshot_receive_failures", + Help: "Total number of snapshot receive failures", + }, + []string{"From"}, + ) + + snapshotReceiveSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "snapshot_receive_total_duration_seconds", + Help: "Total latency distributions of v3 snapshot receives", + + // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2 + // highest bucket start of 0.001 sec * 2^13 == 8.192 sec + Buckets: prometheus.ExponentialBuckets(0.001, 2, 14), + }, + []string{"From"}, + ) + rttSec = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "etcd", Subsystem: "network", @@ -92,5 +154,13 @@ func init() { prometheus.MustRegister(receivedBytes) prometheus.MustRegister(sentFailures) prometheus.MustRegister(recvFailures) + + prometheus.MustRegister(snapshotSend) + prometheus.MustRegister(snapshotSendFailures) + prometheus.MustRegister(snapshotSendSeconds) + prometheus.MustRegister(snapshotReceive) + prometheus.MustRegister(snapshotReceiveFailures) + prometheus.MustRegister(snapshotReceiveSeconds) + prometheus.MustRegister(rttSec) } diff --git a/etcdserver/api/rafthttp/snapshot_sender.go b/etcdserver/api/rafthttp/snapshot_sender.go index 675de33e18bb..fec6238e6d28 100644 --- a/etcdserver/api/rafthttp/snapshot_sender.go +++ b/etcdserver/api/rafthttp/snapshot_sender.go @@ -67,7 +67,10 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe func (s *snapshotSender) stop() { close(s.stopc) } func (s *snapshotSender) send(merged snap.Message) { + start := time.Now() + m := merged.Message + to := types.ID(m.To).String() body := createSnapBody(s.tr.Logger, merged) defer body.Close() @@ -79,7 +82,7 @@ func (s *snapshotSender) send(merged snap.Message) { s.tr.Logger.Info( "sending database snapshot", zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index), - zap.String("remote-peer-id", types.ID(m.To).String()), + zap.String("remote-peer-id", to), zap.Int64("bytes", merged.TotalSize), zap.String("size", humanize.Bytes(uint64(merged.TotalSize))), ) @@ -94,7 +97,7 @@ func (s *snapshotSender) send(merged snap.Message) { s.tr.Logger.Warn( "failed to send database snapshot", zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index), - zap.String("remote-peer-id", types.ID(m.To).String()), + zap.String("remote-peer-id", to), zap.Int64("bytes", merged.TotalSize), zap.String("size", humanize.Bytes(uint64(merged.TotalSize))), zap.Error(err), @@ -116,7 +119,8 @@ func (s *snapshotSender) send(merged snap.Message) { // machine knows about it, it would pause a while and retry sending // new snapshot message. s.r.ReportSnapshot(m.To, raft.SnapshotFailure) - sentFailures.WithLabelValues(types.ID(m.To).String()).Inc() + sentFailures.WithLabelValues(to).Inc() + snapshotSendFailures.WithLabelValues(to).Inc() return } s.status.activate() @@ -126,7 +130,7 @@ func (s *snapshotSender) send(merged snap.Message) { s.tr.Logger.Info( "sent database snapshot", zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index), - zap.String("remote-peer-id", types.ID(m.To).String()), + zap.String("remote-peer-id", to), zap.Int64("bytes", merged.TotalSize), zap.String("size", humanize.Bytes(uint64(merged.TotalSize))), ) @@ -134,7 +138,10 @@ func (s *snapshotSender) send(merged snap.Message) { plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To)) } - sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize)) + sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize)) + + snapshotSend.WithLabelValues(to).Inc() + snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds()) } // post posts the given request.