Skip to content

Commit

Permalink
Switch to pipeline for gathering many queue sizes quickly, remove exp…
Browse files Browse the repository at this point in the history
…licit default queue size, #197
  • Loading branch information
mperham committed Jan 16, 2019
1 parent 4098ac0 commit 06b0850
Showing 1 changed file with 20 additions and 14 deletions.
34 changes: 20 additions & 14 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/contribsys/faktory/manager"
"github.com/contribsys/faktory/storage"
"github.com/contribsys/faktory/util"
"github.com/go-redis/redis"
)

type RuntimeStats struct {
Expand Down Expand Up @@ -308,37 +309,42 @@ func (s *Server) uptimeInSeconds() int {
}

func (s *Server) CurrentState() (map[string]interface{}, error) {
defalt, err := s.store.GetQueue("default")
queueCmd := map[string]*redis.IntCmd{}
_, err := s.store.Redis().Pipelined(func(pipe redis.Pipeliner) error {
s.store.EachQueue(func(q storage.Queue) {
queueCmd[q.Name()] = pipe.LLen(q.Name())
})
return nil
})
if err != nil {
return nil, err
}

totalQueued := 0
totalQueues := 0
queues := make(map[string]interface{})
// queue size is cached so this should be very efficient.
s.store.EachQueue(func(q storage.Queue) {
queueSize := int(q.Size())
queues[string(q.Name())] = queueSize
totalQueued += queueSize
totalQueues++
})
queues := map[string]int64{}
totalQueued := int64(0)
totalQueues := len(queueCmd)
for name, cmd := range queueCmd {
qsize := cmd.Val()
totalQueued += qsize
queues[name] = qsize
}

return map[string]interface{}{
"server_utc_time": time.Now().UTC().Format("03:04:05 UTC"),
"faktory": map[string]interface{}{
"default_size": defalt.Size(),
"total_failures": s.store.TotalFailures(),
"total_processed": s.store.TotalProcessed(),
"total_enqueued": totalQueued,
"total_queues": totalQueues,
"queues": queues,
"tasks": s.taskRunner.Stats()},
"tasks": s.taskRunner.Stats(),
},
"server": map[string]interface{}{
"faktory_version": client.Version,
"uptime": s.uptimeInSeconds(),
"connections": atomic.LoadUint64(&s.Stats.Connections),
"command_count": atomic.LoadUint64(&s.Stats.Commands),
"used_memory_mb": util.MemoryUsage()},
"used_memory_mb": util.MemoryUsage(),
},
}, nil
}

0 comments on commit 06b0850

Please sign in to comment.