Skip to content

Commit

Permalink
stats - moving queues map inside faktory.queues on /stats
Browse files Browse the repository at this point in the history
  • Loading branch information
thapakazi committed Jan 11, 2019
1 parent 06abefa commit 1c9f857
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 55 deletions.
16 changes: 6 additions & 10 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,13 @@ func (s *Server) CurrentState() (map[string]interface{}, error) {

totalQueued := 0
totalQueues := 0
// this map exposes all queues: backlog information
queues := make(map[string]interface{})
// queue size is cached so this should be very efficient.
s.store.EachQueue(func(q storage.Queue) {
totalQueued += int(q.Size())
queueSize := int(q.Size())
queues[string(q.Name())] = queueSize
totalQueued += queueSize
totalQueues++
})

Expand All @@ -329,6 +333,7 @@ func (s *Server) CurrentState() (map[string]interface{}, error) {
"total_processed": s.store.TotalProcessed(),
"total_enqueued": totalQueued,
"total_queues": totalQueues,
"queues": queues,
"tasks": s.taskRunner.Stats()},
"server": map[string]interface{}{
"faktory_version": client.Version,
Expand All @@ -338,12 +343,3 @@ func (s *Server) CurrentState() (map[string]interface{}, error) {
"used_memory_mb": util.MemoryUsage()},
}, nil
}

func (s *Server) CurrentQueueState() (map[string]interface{}, error) {

queues := make(map[string]interface{})
s.Store().EachQueue(func(q storage.Queue) {
queues[string(q.Name())] = q.Size()
})
return queues, nil
}
17 changes: 0 additions & 17 deletions webui/pages.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,6 @@ import (
"github.com/contribsys/faktory/server"
)

func statsQueueHandler(w http.ResponseWriter, r *http.Request) {
hash, err := ctx(r).Server().CurrentQueueState()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
data, err := json.Marshal(hash)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Header().Add("Content-Type", "application/json")
w.Header().Add("Cache-Control", "no-cache")
w.Write(data)
}

func statsHandler(w http.ResponseWriter, r *http.Request) {
hash, err := ctx(r).Server().CurrentState()
if err != nil {
Expand Down
39 changes: 12 additions & 27 deletions webui/pages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ func TestPages(t *testing.T) {
assert.NoError(t, err)

s.Stats.StartedAt = time.Now().Add(-1234567 * time.Second)
str := s.Store()
str.GetQueue("default")
q, _ := str.GetQueue("foobar")
q.Clear()
args := []string{"faktory", "rocks", "!!", ":)"}
for _, v := range args {
q.Push(5, []byte(v))
}

w := httptest.NewRecorder()
statsHandler(w, req)
Expand All @@ -50,35 +58,12 @@ func TestPages(t *testing.T) {
s := content["server"].(map[string]interface{})
uid := s["uptime"].(float64)
assert.Equal(t, float64(1234567), uid)
})

t.Run("Stats/Queues", func(t *testing.T) {
req, err := ui.NewRequest("GET", "http://localhost:7420/stats/queues", nil)
assert.NoError(t, err)

str := s.Store()
str.GetQueue("default")
q, _ := str.GetQueue("foobar")
q.Clear()
q.Push(5, []byte("faktory"))
q.Push(5, []byte("rocks"))
q.Push(5, []byte("!!"))

w := httptest.NewRecorder()
statsQueueHandler(w, req)
assert.Equal(t, 200, w.Code)
assert.Equal(t, "application/json", w.Header().Get("Content-Type"))

var content map[string]interface{}
err = json.Unmarshal(w.Body.Bytes(), &content)
assert.NoError(t, err)

defaultQ := content["default"].(float64)
queues := content["faktory"].(map[string]interface{})["queues"].(map[string]interface{})
defaultQ := queues["default"].(float64)
assert.Equal(t, 0.0, defaultQ)

foobarQ := content["foobar"].(float64)
assert.Equal(t, 3.0, foobarQ)

foobarQ := queues["foobar"].(float64)
assert.Equal(t, float64(len(args)), foobarQ)
})

t.Run("Queues", func(t *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion webui/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func newWeb(s *server.Server, opts Options) *WebUI {

ui.Mux.HandleFunc("/static/", staticHandler)
ui.Mux.HandleFunc("/stats", DebugLog(ui, statsHandler))
ui.Mux.HandleFunc("/stats/queues", DebugLog(ui, statsQueueHandler))

ui.Mux.HandleFunc("/", Log(ui, GetOnly(indexHandler)))
ui.Mux.HandleFunc("/queues", Log(ui, queuesHandler))
Expand Down

0 comments on commit 1c9f857

Please sign in to comment.