Skip to content

Commit

Permalink
add more metrics about processing sync/async events
Browse files Browse the repository at this point in the history
  • Loading branch information
Maciej Winnicki committed Nov 9, 2017
1 parent 4cff750 commit 9221bed
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 8 deletions.
55 changes: 49 additions & 6 deletions router/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,62 @@ import (
)

func init() {
prometheus.MustRegister(routerBacklog, routerDroppedEvents, routerProcessingDuration)
prometheus.MustRegister(routerEventsAsyncReceived)
prometheus.MustRegister(routerEventsAsyncDropped)
prometheus.MustRegister(routerEventsAsyncProceeded)
prometheus.MustRegister(routerEventsSyncReceived)
prometheus.MustRegister(routerEventsSyncProceeded)
prometheus.MustRegister(routerBacklog)
prometheus.MustRegister(routerProcessingDuration)
}

var routerDroppedEvents = prometheus.NewCounter(
var routerEventsAsyncReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "gateway",
Subsystem: "router",
Name: "dropped_events_total",
Help: "Dropped events due to insufficient processing power.",
Name: "events_async_received_total",
Help: "Total of asynchronously handled events received (including system events).",
})

var routerEventsAsyncDropped = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "gateway",
Subsystem: "router",
Name: "events_async_dropped_total",
Help: "Total of asynchronously handled events dropped due to insufficient processing power.",
})

var routerEventsAsyncProceeded = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "gateway",
Subsystem: "router",
Name: "events_async_proceeded_total",
Help: "Total of asynchronously proceeded events.",
})

var routerEventsSyncReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "gateway",
Subsystem: "router",
Name: "events_sync_received_total",
Help: "Total of synchronously handled (HTTP and invoke) events received.",
})

var routerEventsSyncProceeded = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "gateway",
Subsystem: "router",
Name: "events_sync_proceeded_total",
Help: "Total of synchronously proceeded events. This counter excludes events for which there was no function " +
"registered or error occured during processing phase.",
})

var routerBacklog = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "gateway",
Subsystem: "router",
Name: "backlog_events",
Help: "Gauge of events count waiting to be processed by the router.",
Help: "Gauge of asynchronous events count waiting to be processed by the router.",
})

var routerProcessingDuration = prometheus.NewHistogram(
Expand All @@ -33,20 +72,24 @@ var routerProcessingDuration = prometheus.NewHistogram(
Subsystem: "router",
Name: "event_processing_seconds",
Help: "Bucketed histogram of processing duration of an event in the router. " +
"From receiving the event to calling a function.",
"From receiving the asynchronous event to calling a function.",
Buckets: prometheus.ExponentialBuckets(0.00001, 2, 20),
})

var receivedEventsMutex = sync.Mutex{}
var receivedEvents = map[string]time.Time{}

func reportReceivedEvent(id string) {
routerEventsAsyncReceived.Inc()

receivedEventsMutex.Lock()
defer receivedEventsMutex.Unlock()
receivedEvents[id] = time.Now()
}

func reportProceededEvent(id string) {
routerEventsAsyncProceeded.Inc()

receivedEventsMutex.Lock()
defer receivedEventsMutex.Unlock()
if startTime, ok := receivedEvents[id]; ok {
Expand Down
12 changes: 10 additions & 2 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func (router *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// isHTTPEvent checks if a request carries HTTP event. It also accepts pre-flight CORS requests because CORS is
// resolved downstream.
if isHTTPEvent(r) {
routerEventsSyncReceived.Inc()

event, _, err := router.eventFromRequest(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
Expand All @@ -76,9 +78,9 @@ func (router *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

if event.Type == eventpkg.TypeInvoke {
routerEventsSyncReceived.Inc()
router.handleInvokeEvent(event, w, r)
} else if !event.IsSystem() {
reportReceivedEvent(event.ID)
router.enqueueWork(path, event)
w.WriteHeader(http.StatusAccepted)
}
Expand Down Expand Up @@ -286,6 +288,8 @@ func (router *Router) handleHTTPEvent(event *eventpkg.Event, w http.ResponseWrit
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

routerEventsSyncProceeded.Inc()
}

if corsConfig == nil {
Expand Down Expand Up @@ -316,9 +320,13 @@ func (router *Router) handleInvokeEvent(event *eventpkg.Event, w http.ResponseWr
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

routerEventsSyncProceeded.Inc()
}

func (router *Router) enqueueWork(path string, event *eventpkg.Event) {
reportReceivedEvent(event.ID)

if event.IsSystem() {
router.log.Debug("System event received.", zap.Object("event", event))
}
Expand All @@ -331,7 +339,7 @@ func (router *Router) enqueueWork(path string, event *eventpkg.Event) {
routerBacklog.Inc()
default:
// We could not submit any work, this is NOT good but we will sacrifice consistency for availability for now.
routerDroppedEvents.Inc()
routerEventsAsyncDropped.Inc()
}
}

Expand Down

0 comments on commit 9221bed

Please sign in to comment.