Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allocation resources returned in a struct #1260

Merged
merged 3 commits into from
Jun 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (a *Allocations) Info(allocID string, q *QueryOptions) (*Allocation, *Query
return &resp, qm, nil
}

func (a *Allocations) Stats(alloc *Allocation, q *QueryOptions) (map[string]*TaskResourceUsage, error) {
func (a *Allocations) Stats(alloc *Allocation, q *QueryOptions) (*AllocResourceUsage, error) {
node, _, err := a.client.Nodes().Info(alloc.NodeID, q)
if err != nil {
return nil, err
Expand All @@ -58,13 +58,9 @@ func (a *Allocations) Stats(alloc *Allocation, q *QueryOptions) (map[string]*Tas
if err != nil {
return nil, err
}
resp := make(map[string][]*TaskResourceUsage)
var resp AllocResourceUsage
_, err = client.query("/v1/client/allocation/"+alloc.ID+"/stats", &resp, nil)
res := make(map[string]*TaskResourceUsage)
for task, ru := range resp {
res[task] = ru[0]
}
return res, err
return &resp, err
}

// Allocation is used for serialization of allocations.
Expand Down
5 changes: 2 additions & 3 deletions api/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,11 @@ func (n *Nodes) Stats(nodeID string, q *QueryOptions) (*HostStats, error) {
if err != nil {
return nil, err
}
var resp []HostStats
var resp HostStats
if _, err := client.query("/v1/client/stats", &resp, nil); err != nil {
return nil, err
}

return &resp[0], nil
return &resp, nil
}

// Node is used to deserialize a node entry.
Expand Down
8 changes: 8 additions & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ type TaskResourceUsage struct {
Pids map[string]*ResourceUsage
}

// AllocResourceUsage holds the aggregated task resource usage of the
// allocation. It is the type that Allocations.Stats decodes the
// /v1/client/allocation/<id>/stats response into.
type AllocResourceUsage struct {
// ResourceUsage is the allocation-level usage.
// NOTE(review): presumably the sum over all entries in Tasks, mirroring
// the client-side struct of the same name; confirm.
ResourceUsage *ResourceUsage
// Tasks holds the per-task usage.
// NOTE(review): presumably keyed by task name; confirm against the client.
Tasks map[string]*TaskResourceUsage
// Timestamp is when the stats were sampled.
// NOTE(review): units and which task's sample time is used are not
// visible here; confirm.
Timestamp int64
}

// RestartPolicy defines how the Nomad client restarts
// tasks in a taskgroup when they fail
type RestartPolicy struct {
Expand Down
63 changes: 46 additions & 17 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/nomad/structs"

cstructs "github.com/hashicorp/nomad/client/structs"
)

const (
Expand All @@ -28,9 +30,8 @@ const (
// AllocStateUpdater is used to update the status of an allocation
type AllocStateUpdater func(alloc *structs.Allocation)

// AllocStatsReporter exposes stats related APIs of an allocation runner
type AllocStatsReporter interface {
AllocStats() map[string]TaskStatsReporter
LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error)
}

// AllocRunner is used to wrap an allocation and provide the execution context.
Expand Down Expand Up @@ -482,27 +483,55 @@ func (r *AllocRunner) StatsReporter() AllocStatsReporter {
return r
}

// AllocStats returns the stats reporter of all the tasks running in the
// allocation
func (r *AllocRunner) AllocStats() map[string]TaskStatsReporter {
// LatestAllocStats returns the latest allocation stats. If the optional
// taskFilter is set, the allocation stats will only include the given task,
// and an error is returned if the allocation has no task with that name.
func (r *AllocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) {
Copy link
Contributor

@diptanu diptanu Jun 12, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dadgar I think we should keep the StatsReporter interface so that we can add methods around histograms, etc. Otherwise, we will have to add those methods in the client as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

r.taskLock.RLock()
defer r.taskLock.RUnlock()
res := make(map[string]TaskStatsReporter)
for task, tr := range r.tasks {
res[task] = tr.StatsReporter()

astat := &cstructs.AllocResourceUsage{
Tasks: make(map[string]*cstructs.TaskResourceUsage),
}
return res
}

// TaskStats returns the stats reporter for a specific task running in the
// allocation
func (r *AllocRunner) TaskStats(task string) (TaskStatsReporter, error) {
tr, ok := r.tasks[task]
if !ok {
return nil, fmt.Errorf("task %q not running in allocation %v", task, r.alloc.ID)
var flat []*cstructs.TaskResourceUsage
if taskFilter != "" {
tr, ok := r.tasks[taskFilter]
if !ok {
return nil, fmt.Errorf("allocation %q has no task %q", r.alloc.ID, taskFilter)
}
l := tr.LatestResourceUsage()
if l != nil {
astat.Tasks[taskFilter] = l
flat = []*cstructs.TaskResourceUsage{l}
astat.Timestamp = l.Timestamp
}
} else {
for task, tr := range r.tasks {
l := tr.LatestResourceUsage()
if l != nil {
astat.Tasks[task] = l
flat = append(flat, l)
if l.Timestamp > astat.Timestamp {
astat.Timestamp = l.Timestamp
}
}
}
}

return tr.StatsReporter(), nil
astat.ResourceUsage = sumTaskResourceUsage(flat)
return astat, nil
}

// sumTaskResourceUsage folds the ResourceUsage of every entry in usages into
// a single total by calling ResourceUsage.Add once per entry, in slice order.
//
// The returned value always has non-nil MemoryStats and CpuStats, even when
// usages is nil or empty. Entries themselves must be non-nil; the caller is
// expected to have filtered out nil usages beforehand.
func sumTaskResourceUsage(usages []*cstructs.TaskResourceUsage) *cstructs.ResourceUsage {
	total := new(cstructs.ResourceUsage)
	total.MemoryStats = new(cstructs.MemoryStats)
	total.CpuStats = new(cstructs.CpuStats)

	for i := range usages {
		total.Add(usages[i].ResourceUsage)
	}
	return total
}

// shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and
Expand Down
82 changes: 18 additions & 64 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,12 @@ const (
// ClientStatsReporter exposes all the APIs related to resource usage of a Nomad
// Client
type ClientStatsReporter interface {
// AllocStats returns a map of alloc ids and their corresponding stats
// collector
AllocStats() map[string]AllocStatsReporter
// GetAllocStats returns the AllocStatsReporter for the passed allocation.
// If it does not exist an error is reported.
GetAllocStats(allocID string) (AllocStatsReporter, error)

// HostStats returns resource usage stats for the host
HostStats() []*stats.HostStats

// HostStatsTS returns a time series of host resource usage stats
HostStatsTS(since int64) []*stats.HostStats
// LatestHostStats returns the latest resource usage stats for the host
LatestHostStats() *stats.HostStats
}

// Client is used to implement the client interaction with Nomad. Clients
Expand Down Expand Up @@ -138,7 +135,7 @@ type Client struct {

// HostStatsCollector collects host resource usage stats
hostStatsCollector *stats.HostStatsCollector
resourceUsage *stats.RingBuff
resourceUsage *stats.HostStats
resourceUsageLock sync.RWMutex

shutdown bool
Expand All @@ -151,11 +148,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer) (*Client, error)
// Create a logger
logger := log.New(cfg.LogOutput, "", log.LstdFlags)

resourceUsage, err := stats.NewRingBuff(cfg.StatsDataPoints)
if err != nil {
return nil, err
}

// Create the client
c := &Client{
config: cfg,
Expand All @@ -164,7 +156,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer) (*Client, error)
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
logger: logger,
hostStatsCollector: stats.NewHostStatsCollector(),
resourceUsage: resourceUsage,
allocs: make(map[string]*AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
Expand Down Expand Up @@ -400,58 +391,21 @@ func (c *Client) StatsReporter() ClientStatsReporter {
return c
}

// AllocStats returns all the stats reporter of the allocations running on a
// Nomad client
func (c *Client) AllocStats() map[string]AllocStatsReporter {
res := make(map[string]AllocStatsReporter)
for alloc, ar := range c.getAllocRunners() {
res[alloc] = ar
func (c *Client) GetAllocStats(allocID string) (AllocStatsReporter, error) {
c.allocLock.RLock()
defer c.allocLock.RUnlock()
ar, ok := c.allocs[allocID]
if !ok {
return nil, fmt.Errorf("unknown allocation ID %q", allocID)
}
return res
return ar.StatsReporter(), nil
}

// HostStats returns all the stats related to a Nomad client
func (c *Client) HostStats() []*stats.HostStats {
c.resourceUsageLock.RLock()
defer c.resourceUsageLock.RUnlock()
val := c.resourceUsage.Peek()
ru, _ := val.(*stats.HostStats)
return []*stats.HostStats{ru}
}

func (c *Client) HostStatsTS(since int64) []*stats.HostStats {
func (c *Client) LatestHostStats() *stats.HostStats {
c.resourceUsageLock.RLock()
defer c.resourceUsageLock.RUnlock()

values := c.resourceUsage.Values()
low := 0
high := len(values) - 1
var idx int

for {
mid := (low + high) >> 1
midVal, _ := values[mid].(*stats.HostStats)
if midVal.Timestamp < since {
low = mid + 1
} else if midVal.Timestamp > since {
high = mid - 1
} else if midVal.Timestamp == since {
idx = mid
break
}
if low > high {
idx = low
break
}
}
values = values[idx:]
ts := make([]*stats.HostStats, len(values))
for index, val := range values {
ru, _ := val.(*stats.HostStats)
ts[index] = ru
}
return ts

return c.resourceUsage
}

// GetAllocFS returns the AllocFS interface for the alloc dir of an allocation
Expand Down Expand Up @@ -1437,9 +1391,9 @@ func (c *Client) collectHostStats() {
continue
}

c.resourceUsageLock.RLock()
c.resourceUsage.Enqueue(ru)
c.resourceUsageLock.RUnlock()
c.resourceUsageLock.Lock()
c.resourceUsage = ru
c.resourceUsageLock.Unlock()
c.emitStats(ru)
case <-c.shutdownCh:
return
Expand Down
5 changes: 0 additions & 5 deletions client/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,6 @@ type Config struct {
// ConsulConfig is this Agent's Consul configuration
ConsulConfig *config.ConsulConfig

// StatsDataPoints is the number of resource usage data points the Nomad
// client keeps in memory
StatsDataPoints int

// StatsCollectionInterval is the interval at which the Nomad client
// collects resource usage stats
StatsCollectionInterval time.Duration
Expand All @@ -143,7 +139,6 @@ func DefaultConfig() *Config {
},
LogOutput: os.Stderr,
Region: "global",
StatsDataPoints: 60,
StatsCollectionInterval: 1 * time.Second,
}
}
Expand Down
15 changes: 8 additions & 7 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/helper/fields"
shelpers "github.com/hashicorp/nomad/helper/stats"
Expand Down Expand Up @@ -140,7 +141,7 @@ type DockerHandle struct {
maxKillTimeout time.Duration
resourceUsageLock sync.RWMutex
resourceUsage *cstructs.TaskResourceUsage
waitCh chan *cstructs.WaitResult
waitCh chan *dstructs.WaitResult
doneCh chan bool
}

Expand Down Expand Up @@ -541,7 +542,7 @@ func (d *DockerDriver) recoverablePullError(err error, image string) error {
if imageNotFoundMatcher.MatchString(err.Error()) {
recoverable = false
}
return cstructs.NewRecoverableError(fmt.Errorf("Failed to pull `%s`: %s", image, err), recoverable)
return dstructs.NewRecoverableError(fmt.Errorf("Failed to pull `%s`: %s", image, err), recoverable)
}

func (d *DockerDriver) Periodic() (bool, time.Duration) {
Expand Down Expand Up @@ -797,7 +798,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
maxKillTimeout: maxKill,
doneCh: make(chan bool),
waitCh: make(chan *cstructs.WaitResult, 1),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := exec.SyncServices(consulContext(d.config, container.ID)); err != nil {
d.logger.Printf("[ERR] driver.docker: error registering services with consul for task: %q: %v", task.Name, err)
Expand Down Expand Up @@ -872,7 +873,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
killTimeout: pid.KillTimeout,
maxKillTimeout: pid.MaxKillTimeout,
doneCh: make(chan bool),
waitCh: make(chan *cstructs.WaitResult, 1),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := exec.SyncServices(consulContext(d.config, pid.ContainerID)); err != nil {
h.logger.Printf("[ERR] driver.docker: error registering services with consul: %v", err)
Expand Down Expand Up @@ -904,7 +905,7 @@ func (h *DockerHandle) ContainerID() string {
return h.containerID
}

func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult {
func (h *DockerHandle) WaitCh() chan *dstructs.WaitResult {
return h.waitCh
}

Expand Down Expand Up @@ -961,7 +962,7 @@ func (h *DockerHandle) run() {
}

close(h.doneCh)
h.waitCh <- cstructs.NewWaitResult(exitCode, 0, err)
h.waitCh <- dstructs.NewWaitResult(exitCode, 0, err)
close(h.waitCh)

// Remove services
Expand Down
5 changes: 3 additions & 2 deletions client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/nomad/structs"

cstructs "github.com/hashicorp/nomad/client/driver/structs"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
)

// BuiltinDrivers contains the built in registered drivers
Expand Down Expand Up @@ -105,7 +106,7 @@ type DriverHandle interface {
ID() string

// WaitCh is used to return a channel used wait for task completion
WaitCh() chan *cstructs.WaitResult
WaitCh() chan *dstructs.WaitResult

// Update is used to update the task if possible and update task related
// configurations.
Expand Down
Loading