Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add v3 snapshot metrics (fsync, network) #9997

Merged
merged 2 commits into from
Aug 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions etcdserver/api/rafthttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"path"
"strings"
"time"

"github.com/coreos/etcd/etcdserver/api/snap"
pioutil "github.com/coreos/etcd/pkg/ioutil"
Expand Down Expand Up @@ -185,6 +186,8 @@ func newSnapshotHandler(t *Transport, r Raft, snapshotter *snap.Snapshotter, cid
}
}

const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER"

// ServeHTTP serves HTTP request to receive and process snapshot message.
//
// If request sender dies without closing underlying TCP connection,
Expand All @@ -195,16 +198,20 @@ func newSnapshotHandler(t *Transport, r Raft, snapshotter *snap.Snapshotter, cid
// received and processed.
// 2. this case should happen rarely, so no further optimization is done.
func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
start := time.Now()

if r.Method != "POST" {
w.Header().Set("Allow", "POST")
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
return
}

w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())

if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
http.Error(w, err.Error(), http.StatusPreconditionFailed)
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
return
}

Expand All @@ -213,46 +220,49 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
dec := &messageDecoder{r: r.Body}
// let snapshots be very large since they can exceed 512MB for large installations
m, err := dec.decodeLimit(uint64(1 << 63))
from := types.ID(m.From).String()
if err != nil {
msg := fmt.Sprintf("failed to decode raft message (%v)", err)
if h.lg != nil {
h.lg.Warn(
"failed to decode Raft message",
zap.String("local-member-id", h.localID.String()),
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
zap.String("remote-snapshot-sender-id", from),
zap.Error(err),
)
} else {
plog.Error(msg)
}
http.Error(w, msg, http.StatusBadRequest)
recvFailures.WithLabelValues(r.RemoteAddr).Inc()
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}

msgSize := m.Size()
receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(msgSize))
receivedBytes.WithLabelValues(from).Add(float64(msgSize))

if m.Type != raftpb.MsgSnap {
if h.lg != nil {
h.lg.Warn(
"unexpected Raft message type",
zap.String("local-member-id", h.localID.String()),
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
zap.String("remote-snapshot-sender-id", from),
zap.String("message-type", m.Type.String()),
)
} else {
plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
}
http.Error(w, "wrong raft message type", http.StatusBadRequest)
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}

if h.lg != nil {
h.lg.Info(
"receiving database snapshot",
zap.String("local-member-id", h.localID.String()),
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
zap.String("remote-snapshot-sender-id", from),
zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
zap.Int("incoming-snapshot-message-size-bytes", msgSize),
zap.String("incoming-snapshot-message-size", humanize.Bytes(uint64(msgSize))),
Expand All @@ -269,24 +279,25 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.lg.Warn(
"failed to save incoming database snapshot",
zap.String("local-member-id", h.localID.String()),
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
zap.String("remote-snapshot-sender-id", from),
zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
zap.Error(err),
)
} else {
plog.Error(msg)
}
http.Error(w, msg, http.StatusInternalServerError)
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}

receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n))
receivedBytes.WithLabelValues(from).Add(float64(n))

if h.lg != nil {
h.lg.Info(
"received and saved database snapshot",
zap.String("local-member-id", h.localID.String()),
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
zap.String("remote-snapshot-sender-id", from),
zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
zap.Int64("incoming-snapshot-size-bytes", n),
zap.String("incoming-snapshot-size", humanize.Bytes(uint64(n))),
Expand All @@ -307,20 +318,24 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.lg.Warn(
"failed to process Raft message",
zap.String("local-member-id", h.localID.String()),
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
zap.String("remote-snapshot-sender-id", from),
zap.Error(err),
)
} else {
plog.Error(msg)
}
http.Error(w, msg, http.StatusInternalServerError)
snapshotReceiveFailures.WithLabelValues(from).Inc()
}
return
}

// Write StatusNoContent header after the message has been processed by
// raft, which facilitates the client to report MsgSnap status.
w.WriteHeader(http.StatusNoContent)

snapshotReceive.WithLabelValues(from).Inc()
snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
}

type streamHandler struct {
Expand Down
70 changes: 70 additions & 0 deletions etcdserver/api/rafthttp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,68 @@ var (
[]string{"From"},
)

snapshotSend = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_success",
Help: "Total number of successful snapshot sends",
},
[]string{"To"},
)

snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_failures",
Help: "Total number of snapshot send failures",
},
[]string{"To"},
)

snapshotSendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_total_duration_seconds",
Help: "Total latency distributions of v3 snapshot sends",

// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
},
[]string{"To"},
)

snapshotReceive = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_success",
Help: "Total number of successful snapshot receives",
},
[]string{"From"},
)

snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_failures",
Help: "Total number of snapshot receive failures",
},
[]string{"From"},
)

snapshotReceiveSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_total_duration_seconds",
Help: "Total latency distributions of v3 snapshot receives",

// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
},
[]string{"From"},
)

rttSec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
Expand All @@ -92,5 +154,13 @@ func init() {
prometheus.MustRegister(receivedBytes)
prometheus.MustRegister(sentFailures)
prometheus.MustRegister(recvFailures)

prometheus.MustRegister(snapshotSend)
prometheus.MustRegister(snapshotSendFailures)
prometheus.MustRegister(snapshotSendSeconds)
prometheus.MustRegister(snapshotReceive)
prometheus.MustRegister(snapshotReceiveFailures)
prometheus.MustRegister(snapshotReceiveSeconds)

prometheus.MustRegister(rttSec)
}
17 changes: 12 additions & 5 deletions etcdserver/api/rafthttp/snapshot_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe
func (s *snapshotSender) stop() { close(s.stopc) }

func (s *snapshotSender) send(merged snap.Message) {
start := time.Now()

m := merged.Message
to := types.ID(m.To).String()

body := createSnapBody(s.tr.Logger, merged)
defer body.Close()
Expand All @@ -79,7 +82,7 @@ func (s *snapshotSender) send(merged snap.Message) {
s.tr.Logger.Info(
"sending database snapshot",
zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
zap.String("remote-peer-id", types.ID(m.To).String()),
zap.String("remote-peer-id", to),
zap.Int64("bytes", merged.TotalSize),
zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
)
Expand All @@ -94,7 +97,7 @@ func (s *snapshotSender) send(merged snap.Message) {
s.tr.Logger.Warn(
"failed to send database snapshot",
zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
zap.String("remote-peer-id", types.ID(m.To).String()),
zap.String("remote-peer-id", to),
zap.Int64("bytes", merged.TotalSize),
zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
zap.Error(err),
Expand All @@ -116,7 +119,8 @@ func (s *snapshotSender) send(merged snap.Message) {
// machine knows about it, it would pause a while and retry sending
// new snapshot message.
s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
sentFailures.WithLabelValues(to).Inc()
snapshotSendFailures.WithLabelValues(to).Inc()
return
}
s.status.activate()
Expand All @@ -126,15 +130,18 @@ func (s *snapshotSender) send(merged snap.Message) {
s.tr.Logger.Info(
"sent database snapshot",
zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
zap.String("remote-peer-id", types.ID(m.To).String()),
zap.String("remote-peer-id", to),
zap.Int64("bytes", merged.TotalSize),
zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
)
} else {
plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
}

sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize))
sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))

snapshotSend.WithLabelValues(to).Inc()
snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
}

// post posts the given request.
Expand Down
6 changes: 6 additions & 0 deletions etcdserver/api/snap/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"time"

"github.com/coreos/etcd/pkg/fileutil"

Expand All @@ -33,14 +34,18 @@ var ErrNoDBSnapshot = errors.New("snap: snapshot file doesn't exist")
// SaveDBFrom saves snapshot of the database from the given reader. It
// guarantees the save operation is atomic.
func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
start := time.Now()

f, err := ioutil.TempFile(s.dir, "tmp")
if err != nil {
return 0, err
}
var n int64
n, err = io.Copy(f, r)
if err == nil {
fsyncStart := time.Now()
err = fileutil.Fsync(f)
snapDBFsyncSec.Observe(time.Since(fsyncStart).Seconds())
}
f.Close()
if err != nil {
Expand Down Expand Up @@ -69,6 +74,7 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
plog.Infof("saved database snapshot to disk [total bytes: %d]", n)
}

snapDBSaveSec.Observe(time.Since(start).Seconds())
return n, nil
}

Expand Down
24 changes: 24 additions & 0 deletions etcdserver/api/snap/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,34 @@ var (
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})

snapDBSaveSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "snap_db",
Name: "save_total_duration_seconds",
Help: "The total latency distributions of v3 snapshot save",

// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
})

snapDBFsyncSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "snap_db",
Name: "fsync_duration_seconds",
Help: "The latency distributions of fsyncing .snap.db file",

// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})
)

func init() {
prometheus.MustRegister(snapMarshallingSec)
prometheus.MustRegister(snapSaveSec)
prometheus.MustRegister(snapFsyncSec)
prometheus.MustRegister(snapDBSaveSec)
prometheus.MustRegister(snapDBFsyncSec)
}