Skip to content

Commit

Permalink
misc updates for consistency, typos and better error tracking (#33)
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar authored Aug 4, 2020
1 parent 7b3da6b commit 5c1dedb
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 42 deletions.
47 changes: 31 additions & 16 deletions surveyor/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,83 +67,93 @@ func (o *jsAdvisoryOptions) Validate() error {
var (
// API Audit
jsAPIAuditCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_api_audit"),
Name: prometheus.BuildFQName("nats", "jetstream", "api_audit"),
Help: "JetStream API access audit events",
}, []string{"server", "subject", "account"})

jsAPIErrorsCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_api_errors"),
Name: prometheus.BuildFQName("nats", "jetstream", "api_errors"),
Help: "JetStream API Errors Count",
}, []string{"server", "subject", "account"})

// Delivery Exceeded
jsDeliveryExceededCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_delivery_exceeded_count"),
Name: prometheus.BuildFQName("nats", "jetstream", "delivery_exceeded_count"),
Help: "Advisories about JetStream Consumer Delivery Exceeded events",
}, []string{"account", "stream", "consumer"})

jsDeliveryTerminatedCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_delivery_terminated_count"),
Name: prometheus.BuildFQName("nats", "jetstream", "delivery_terminated_count"),
Help: "Advisories about JetStream Consumer Delivery Terminated events",
}, []string{"account", "stream", "consumer"})

// Ack Samples
jsAckMetricDelay = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_acknowledgement_duration"),
Name: prometheus.BuildFQName("nats", "jetstream", "acknowledgement_duration"),
Help: "How long an Acknowledged message took to be Acknowledged",
}, []string{"account", "stream", "consumer"})

jsAckMetricDeliveries = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_acknowledgement_deliveries"),
Name: prometheus.BuildFQName("nats", "jetstream", "acknowledgement_deliveries"),
Help: "How many times messages took to be delivered and Acknowledged",
}, []string{"account", "stream", "consumer"})

// Misc
jsAdvisoriesGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_advisory_count"),
Name: prometheus.BuildFQName("nats", "jetstream", "advisory_count"),
Help: "Number of JetStream Advisory listeners that are running",
})

jsUnknownAdvisoryCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_unknown_advisories"),
Name: prometheus.BuildFQName("nats", "jetstream", "unknown_advisories"),
Help: "Unsupported JetStream Advisory types received",
}, []string{"schema", "account"})

jsTotalAdvisoryCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "jetstream", "total_advisories"),
Help: "Unsupported JetStream Advisories handled in total",
}, []string{"account"})

jsAdvisoryParseErrorCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "jetstream", "advisory_parse_errors"),
Help: "Number of advisories that could not be parsed",
}, []string{"account"})

// Stream and Consumer actions
jsConsumerActionCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_consumer_actions"),
Name: prometheus.BuildFQName("nats", "jetstream", "consumer_actions"),
Help: "Actions performed on consumers",
}, []string{"account", "stream", "action"})

jsStreamActionCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_stream_actions"),
Name: prometheus.BuildFQName("nats", "jetstream", "stream_actions"),
Help: "Actions performed on streams",
}, []string{"account", "stream", "action"})

// Snapshot create
jsSnapshotSizeCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_snapshot_size_bytes"),
Name: prometheus.BuildFQName("nats", "jetstream", "snapshot_size_bytes"),
Help: "The size of snapshots being created",
}, []string{"account", "stream"})

jsSnapthotDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_snapshot_duration"),
Name: prometheus.BuildFQName("nats", "jetstream", "snapshot_duration"),
Help: "How long a snapshot takes to be processed",
}, []string{"account", "stream"})

// Restore
jsRestoreCreatedCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_restore_created_count"),
Name: prometheus.BuildFQName("nats", "jetstream", "restore_created_count"),
Help: "How many restore operations were started",
}, []string{"account", "stream"})

jsRestoreSizeCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_restore_size_bytes"),
Name: prometheus.BuildFQName("nats", "jetstream", "restore_size_bytes"),
Help: "The size of restores that was completed",
}, []string{"account", "stream"})

jsRestoreDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("nats", "survey", "jetstream_restore_duration"),
Name: prometheus.BuildFQName("nats", "jetstream", "restore_duration"),
Help: "How long a restore took to be processed",
}, []string{"account", "stream"})
)
Expand All @@ -164,6 +174,8 @@ func init() {
prometheus.MustRegister(jsRestoreSizeCtr)
prometheus.MustRegister(jsRestoreDuration)
prometheus.MustRegister(jsRestoreCreatedCtr)
prometheus.MustRegister(jsTotalAdvisoryCtr)
prometheus.MustRegister(jsAdvisoryParseErrorCtr)
}

// NewJetStreamAdvisoryListener creates a new JetStream advisory reporter
Expand All @@ -186,7 +198,7 @@ func NewJetStreamAdvisoryListener(f string, sopts Options) (*JSAdvisoryListener,

sopts.Name = fmt.Sprintf("%s (jetstream %s)", sopts.Name, opts.AccountName)
sopts.Credentials = opts.Credentials
nc, err := connect(&sopts)
nc, err := connect(&sopts, fmt.Sprintf("%s JetStream %s", sopts.Name, opts.AccountName))
if err != nil {
return nil, fmt.Errorf("nats connection failed: %s", err)
}
Expand Down Expand Up @@ -222,10 +234,13 @@ func (o *JSAdvisoryListener) Start() error {
func (o *JSAdvisoryListener) advisoryHandler(m *nats.Msg) {
schema, event, err := jsm.ParseEvent(m.Data)
if err != nil {
jsAdvisoryParseErrorCtr.WithLabelValues(o.opts.AccountName).Inc()
log.Printf("Could not parse JetStream API Audit Advisory: %s", err)
return
}

jsTotalAdvisoryCtr.WithLabelValues(o.opts.AccountName).Inc()

switch event := event.(type) {
case *advisory.JetStreamAPIAuditV1:
if strings.HasPrefix(event.Response, api.ErrPrefix) {
Expand Down
20 changes: 10 additions & 10 deletions surveyor/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,47 +71,47 @@ func (o *serviceObsOptions) Validate() error {

var (
observationsGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName("nats", "survey", "observerations_count"),
Name: prometheus.BuildFQName("nats", "latency", "observations_count"),
Help: "Number of Service Latency listeners that are running",
})

observationsReceived = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "nats_latency_observation_count",
Name: prometheus.BuildFQName("nats", "latency", "observations_received_count"),
Help: "Number of observations received by this surveyor across all services",
}, []string{"service", "app"})

serviceRequestStatus = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "nats_latency_observation_status_count",
Name: prometheus.BuildFQName("nats", "latency", "observation_status_count"),
Help: "The status result codes for requests to a service",
}, []string{"service", "status"})

invalidObservationsReceived = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "nats_latency_observation_error_count",
Name: prometheus.BuildFQName("nats", "latency", "observation_error_count"),
Help: "Number of observations received by this surveyor across all services that could not be handled",
}, []string{"service"})

serviceLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "nats_latency_service_duration",
Name: prometheus.BuildFQName("nats", "latency", "service_duration"),
Help: "Time spent serving the request in the service",
}, []string{"service", "app"})

totalLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "nats_latency_total_duration",
Name: prometheus.BuildFQName("nats", "latency", "total_duration"),
Help: "Total time spent serving a service including network overheads",
}, []string{"service", "app"})

requestorRTT = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "nats_latency_requestor_rtt",
Name: prometheus.BuildFQName("nats", "latency", "requestor_rtt"),
Help: "The RTT to the client making a request",
}, []string{"service", "app"})

responderRTT = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "nats_latency_responder_rtt",
Name: prometheus.BuildFQName("nats", "latency", "responder_rtt"),
Help: "The RTT to the service serving the request",
}, []string{"service", "app"})

systemRTT = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "nats_latency_system_rtt",
Name: prometheus.BuildFQName("nats", "latency", "system_rtt"),
Help: "The RTT within the NATS system - time traveling clusters, gateways and leaf nodes",
}, []string{"service", "app"})
)
Expand Down Expand Up @@ -148,7 +148,7 @@ func NewServiceObservation(f string, sopts Options) (*ServiceObsListener, error)

sopts.Name = fmt.Sprintf("%s (observing %s)", sopts.Name, opts.ServiceName)
sopts.Credentials = opts.Credentials
nc, err := connect(&sopts)
nc, err := connect(&sopts, fmt.Sprintf("%s Service %s", sopts.Name, opts.ServiceName))
if err != nil {
return nil, fmt.Errorf("nats connection failed: %s", err)
}
Expand Down
34 changes: 18 additions & 16 deletions surveyor/surveyor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,41 +103,43 @@ type Surveyor struct {
jsAPIAudits []*JSAdvisoryListener
}

func connect(opts *Options) (*nats.Conn, error) {
reconnCtr := prometheus.NewCounter(prometheus.CounterOpts{
func connect(opts *Options, name string) (*nats.Conn, error) {
reconnCtr := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("nats", "survey", "nats_reconnects"),
Help: "Number of times the surveyor reconnected to the NATS cluster",
})
}, []string{"name"})
prometheus.Register(reconnCtr)

nopts := []nats.Option{nats.Name(opts.Name)}
if opts.Credentials != "" {
nopts := []nats.Option{nats.Name(name)}

switch {
case opts.Credentials != "":
nopts = append(nopts, nats.UserCredentials(opts.Credentials))
} else if opts.Nkey != "" {
case opts.Nkey != "":
o, err := nats.NkeyOptionFromSeed(opts.Nkey)
if err != nil {
return nil, fmt.Errorf("unable to load nkey: %v", err)
}
nopts = append(nopts, o)
} else if opts.NATSUser != "" {
case opts.NATSUser != "":
nopts = append(nopts, nats.UserInfo(opts.NATSUser, opts.NATSPassword))
}

nopts = append(nopts, nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
log.Printf("Disconnected, will possibly miss replies: %s", err)
nopts = append(nopts, nats.DisconnectErrHandler(func(c *nats.Conn, err error) {
log.Printf("%q disconnected, will possibly miss replies: %s", c.Opts.Name, err)
}))
nopts = append(nopts, nats.ReconnectHandler(func(c *nats.Conn) {
reconnCtr.Inc()
log.Printf("Reconnected to %v", c.ConnectedAddr())
reconnCtr.WithLabelValues(c.Opts.Name).Inc()
log.Printf("%q reconnected to %v", c.Opts.Name, c.ConnectedAddr())
}))
nopts = append(nopts, nats.ClosedHandler(func(_ *nats.Conn) {
log.Println("Connection permanently lost!")
nopts = append(nopts, nats.ClosedHandler(func(c *nats.Conn) {
log.Printf("%q connection permanently lost!", c.Opts.Name)
}))
nopts = append(nopts, nats.ErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) {
if s != nil {
log.Printf("Error: err=%v", err)
log.Printf("Error: name=%q err=%v", c.Opts.Name, err)
} else {
log.Printf("Error: subject=%s, err=%v", s.Subject, err)
log.Printf("Error: name=%q, subject=%s, err=%v", c.Opts.Name, s.Subject, err)
}
}))
nopts = append(nopts, nats.MaxReconnects(10240))
Expand All @@ -161,7 +163,7 @@ func connect(opts *Options) (*nats.Conn, error) {

// NewSurveyor creates a surveyor
func NewSurveyor(opts *Options) (*Surveyor, error) {
nc, err := connect(opts)
nc, err := connect(opts, fmt.Sprintf("%s Messaging", opts.Name))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 5c1dedb

Please sign in to comment.