diff --git a/rafthttp/http.go b/rafthttp/http.go index 55df26e9b75..1b02d7296d4 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -21,6 +21,7 @@ import ( "net/http" "path" "strings" + "time" pioutil "github.com/coreos/etcd/pkg/ioutil" "github.com/coreos/etcd/pkg/types" @@ -153,6 +154,8 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c } } +const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER" + // ServeHTTP serves HTTP request to receive and process snapshot message. // // If request sender dies without closing underlying TCP connection, @@ -163,9 +166,12 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c // received and processed. // 2. this case should happen rarely, so no further optimization is done. func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + start := time.Now() + if r.Method != "POST" { w.Header().Set("Allow", "POST") http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc() return } @@ -173,6 +179,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil { http.Error(w, err.Error(), http.StatusPreconditionFailed) + snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc() return } @@ -185,19 +192,22 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { dec := &messageDecoder{r: r.Body} // let snapshots be very large since they can exceed 512MB for large installations m, err := dec.decodeLimit(uint64(1 << 63)) + from := types.ID(m.From).String() if err != nil { msg := fmt.Sprintf("failed to decode raft message (%v)", err) plog.Errorf(msg) http.Error(w, msg, http.StatusBadRequest) recvFailures.WithLabelValues(r.RemoteAddr).Inc() + snapshotReceiveFailures.WithLabelValues(from).Inc() return } - receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size())) + receivedBytes.WithLabelValues(from).Add(float64(m.Size())) if m.Type != raftpb.MsgSnap { plog.Errorf("unexpected raft message type %s on snapshot path", m.Type) http.Error(w, "wrong raft message type", http.StatusBadRequest) + snapshotReceiveFailures.WithLabelValues(from).Inc() return } @@ -208,9 +218,10 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { msg := fmt.Sprintf("failed to save KV snapshot (%v)", err) plog.Error(msg) http.Error(w, msg, http.StatusInternalServerError) + snapshotReceiveFailures.WithLabelValues(from).Inc() return } - receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n)) + receivedBytes.WithLabelValues(from).Add(float64(n)) plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From)) if err := h.r.Process(context.TODO(), m); err != nil { @@ -223,12 +234,16 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { msg := fmt.Sprintf("failed to process raft message (%v)", err) plog.Warningf(msg) http.Error(w, msg, http.StatusInternalServerError) + snapshotReceiveFailures.WithLabelValues(from).Inc() } return } // Write StatusNoContent header after the message has been processed by // raft, which facilitates the client to report MsgSnap status. w.WriteHeader(http.StatusNoContent) + + snapshotReceive.WithLabelValues(from).Inc() + snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds()) } type streamHandler struct { diff --git a/rafthttp/metrics.go b/rafthttp/metrics.go index 320bfe72661..2066663c691 100644 --- a/rafthttp/metrics.go +++ b/rafthttp/metrics.go @@ -53,6 +53,68 @@ var ( []string{"From"}, ) + snapshotSend = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "snapshot_send_success", + Help: "Total number of successful snapshot sends", + }, + []string{"To"}, + ) + + snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "snapshot_send_failures", + Help: "Total number of snapshot send failures", + }, + []string{"To"}, + ) + + snapshotSendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "snapshot_send_total_duration_seconds", + Help: "Total latency distributions of v3 snapshot sends", + + // lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2 + // highest bucket start of 0.1 sec * 2^9 == 51.2 sec + Buckets: prometheus.ExponentialBuckets(0.1, 2, 10), + }, + []string{"To"}, + ) + + snapshotReceive = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "snapshot_receive_success", + Help: "Total number of successful snapshot receives", + }, + []string{"From"}, + ) + + snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "snapshot_receive_failures", + Help: "Total number of snapshot receive failures", + }, + []string{"From"}, + ) + + snapshotReceiveSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "etcd", + Subsystem: "network", + Name: "snapshot_receive_total_duration_seconds", + Help: "Total latency distributions of v3 snapshot receives", + + // lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2 + // highest bucket start of 0.1 sec * 2^9 == 51.2 sec + Buckets: prometheus.ExponentialBuckets(0.1, 2, 10), + }, + []string{"From"}, + ) + rtts = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "etcd", Subsystem: "network", @@ -69,5 +131,13 @@ func init() { prometheus.MustRegister(receivedBytes) prometheus.MustRegister(sentFailures) prometheus.MustRegister(recvFailures) + + prometheus.MustRegister(snapshotSend) + prometheus.MustRegister(snapshotSendFailures) + prometheus.MustRegister(snapshotSendSeconds) + prometheus.MustRegister(snapshotReceive) + prometheus.MustRegister(snapshotReceiveFailures) + prometheus.MustRegister(snapshotReceiveSeconds) + prometheus.MustRegister(rtts) } diff --git a/rafthttp/snapshot_sender.go b/rafthttp/snapshot_sender.go index 52273c9d195..24eb53553f5 100644 --- a/rafthttp/snapshot_sender.go +++ b/rafthttp/snapshot_sender.go @@ -64,7 +64,10 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe func (s *snapshotSender) stop() { close(s.stopc) } func (s *snapshotSender) send(merged snap.Message) { + start := time.Now() + m := merged.Message + to := types.ID(m.To).String() body := createSnapBody(merged) defer body.Close() @@ -92,14 +95,18 @@ func (s *snapshotSender) send(merged snap.Message) { // machine knows about it, it would pause a while and retry sending // new snapshot message. s.r.ReportSnapshot(m.To, raft.SnapshotFailure) - sentFailures.WithLabelValues(types.ID(m.To).String()).Inc() + sentFailures.WithLabelValues(to).Inc() + snapshotSendFailures.WithLabelValues(to).Inc() return } s.status.activate() s.r.ReportSnapshot(m.To, raft.SnapshotFinish) plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To)) - sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize)) + sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize)) + + snapshotSend.WithLabelValues(to).Inc() + snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds()) } // post posts the given request.