From 701537e9c58595c9d6cd68f4b4b82d1251aca9bc Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 10 Mar 2017 16:27:00 -0800 Subject: [PATCH 1/3] Limit parallelism during garbage collection This PR introduces a parallelism limit during garbage collection. This is used to avoid large resource usage spikes if garbage collecting many allocations at once. --- client/client.go | 8 +- client/config/config.go | 13 +- client/gc.go | 296 +++++++++++------- command/agent/agent.go | 1 + command/agent/config-test-fixtures/basic.hcl | 1 + command/agent/config.go | 8 + command/agent/config_parse.go | 1 + command/agent/config_parse_test.go | 1 + command/agent/config_test.go | 1 + .../docs/agent/configuration/client.html.md | 4 + 10 files changed, 208 insertions(+), 126 deletions(-) diff --git a/client/client.go b/client/client.go index e5329e2a875..1dc8d19b83d 100644 --- a/client/client.go +++ b/client/client.go @@ -238,6 +238,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg DiskUsageThreshold: cfg.GCDiskUsageThreshold, InodeUsageThreshold: cfg.GCInodeUsageThreshold, Interval: cfg.GCInterval, + ParallelDestroys: cfg.GCParallelDestroys, ReservedDiskMB: cfg.Node.Reserved.DiskMB, } c.garbageCollector = NewAllocGarbageCollector(logger, statsCollector, gcConfig) @@ -1832,10 +1833,11 @@ func (c *Client) removeAlloc(alloc *structs.Allocation) error { delete(c.allocs, alloc.ID) c.allocLock.Unlock() - // Remove the allocrunner from garbage collector - c.garbageCollector.Remove(ar) + // Ensure the GC has a reference and then collect. Collecting through the GC + // applies rate limiting + c.garbageCollector.MarkForCollection(ar) + go c.garbageCollector.Collect(alloc.ID) - ar.Destroy() return nil } diff --git a/client/config/config.go b/client/config/config.go index 02f17c873aa..6924fd7229e 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -155,12 +155,16 @@ type Config struct { // collection GCInterval time.Duration - // GCDiskUsageThreshold is the disk usage threshold beyond which the Nomad - // client triggers GC of terminal allocations + // GCParallelDestroys is the number of parallel destroys the garbage + // collector will allow. + GCParallelDestroys int + + // GCDiskUsageThreshold is the disk usage threshold given as a percent + // beyond which the Nomad client triggers GC of terminal allocations GCDiskUsageThreshold float64 - // GCInodeUsageThreshold is the inode usage threshold beyond which the Nomad - // client triggers GC of the terminal allocations + // GCInodeUsageThreshold is the inode usage threshold given as a percent + // beyond which the Nomad client triggers GC of the terminal allocations GCInodeUsageThreshold float64 // LogLevel is the level of the logs to putout @@ -194,6 +198,7 @@ func DefaultConfig() *Config { TLSConfig: &config.TLSConfig{}, LogLevel: "DEBUG", GCInterval: 1 * time.Minute, + GCParallelDestroys: 2, GCDiskUsageThreshold: 80, GCInodeUsageThreshold: 70, } diff --git a/client/gc.go b/client/gc.go index 5861dee0659..6199521b985 100644 --- a/client/gc.go +++ b/client/gc.go @@ -14,113 +14,11 @@ import ( const ( // MB is a constant which converts values in bytes to MB MB = 1024 * 1024 -) - -// GCAlloc wraps an allocation runner and an index enabling it to be used within -// a PQ -type GCAlloc struct { - timeStamp time.Time - allocRunner *AllocRunner - index int -} - -type GCAllocPQImpl []*GCAlloc - -func (pq GCAllocPQImpl) Len() int { - return len(pq) -} - -func (pq GCAllocPQImpl) Less(i, j int) bool { - return pq[i].timeStamp.Before(pq[j].timeStamp) -} - -func (pq GCAllocPQImpl) Swap(i, j int) { - pq[i], pq[j] = pq[j], pq[i] - pq[i].index = i - pq[j].index = j -} - -func (pq *GCAllocPQImpl) Push(x interface{}) { - n := len(*pq) - item := x.(*GCAlloc) - item.index = n - *pq = append(*pq, item) -} -func (pq *GCAllocPQImpl) Pop() interface{} { - old := *pq - n := len(old) - item := old[n-1] - item.index = -1 // for safety - *pq = old[0 : n-1] - return item -} - -// IndexedGCAllocPQ is an indexed PQ which maintains a list of allocation runner -// based on their termination time. -type IndexedGCAllocPQ struct { - index map[string]*GCAlloc - heap GCAllocPQImpl - - pqLock sync.Mutex -} - -func NewIndexedGCAllocPQ() *IndexedGCAllocPQ { - return &IndexedGCAllocPQ{ - index: make(map[string]*GCAlloc), - heap: make(GCAllocPQImpl, 0), - } -} - -func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) error { - i.pqLock.Lock() - defer i.pqLock.Unlock() - - alloc := ar.Alloc() - if _, ok := i.index[alloc.ID]; ok { - return fmt.Errorf("alloc %v already being tracked for GC", alloc.ID) - } - gcAlloc := &GCAlloc{ - timeStamp: time.Now(), - allocRunner: ar, - } - i.index[alloc.ID] = gcAlloc - heap.Push(&i.heap, gcAlloc) - return nil -} - -func (i *IndexedGCAllocPQ) Pop() *GCAlloc { - i.pqLock.Lock() - defer i.pqLock.Unlock() - - if len(i.heap) == 0 { - return nil - } - - gcAlloc := heap.Pop(&i.heap).(*GCAlloc) - delete(i.index, gcAlloc.allocRunner.Alloc().ID) - return gcAlloc -} - -func (i *IndexedGCAllocPQ) Remove(allocID string) (*GCAlloc, error) { - i.pqLock.Lock() - defer i.pqLock.Unlock() - - if gcAlloc, ok := i.index[allocID]; ok { - heap.Remove(&i.heap, gcAlloc.index) - delete(i.index, allocID) - return gcAlloc, nil - } - - return nil, fmt.Errorf("alloc %q not present", allocID) -} - -func (i *IndexedGCAllocPQ) Length() int { - i.pqLock.Lock() - defer i.pqLock.Unlock() - - return len(i.heap) -} + // destroyWaitLimit is the timeout after which the max destroy parallelism + // is abandoned to continue to make progress + destroyWaitLimit = 3 * time.Minute +) // GCConfig allows changing the behaviour of the garbage collector type GCConfig struct { @@ -128,6 +26,7 @@ type GCConfig struct { InodeUsageThreshold float64 Interval time.Duration ReservedDiskMB int + ParallelDestroys int } // AllocGarbageCollector garbage collects terminated allocations on a node @@ -136,21 +35,29 @@ type AllocGarbageCollector struct { statsCollector stats.NodeStatsCollector config *GCConfig logger *log.Logger + destroyCh chan struct{} shutdownCh chan struct{} } // NewAllocGarbageCollector returns a garbage collector for terminated // allocations on a node. func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, config *GCConfig) *AllocGarbageCollector { + // Require at least 1 to make progress + if config.ParallelDestroys <= 0 { + logger.Printf("[WARN] client: garbage collector defaulting parallism to 1 due to invalid input value of %d", config.ParallelDestroys) + config.ParallelDestroys = 1 + } + gc := &AllocGarbageCollector{ allocRunners: NewIndexedGCAllocPQ(), statsCollector: statsCollector, config: config, logger: logger, + destroyCh: make(chan struct{}, config.ParallelDestroys), shutdownCh: make(chan struct{}), } - go gc.run() + go gc.run() return gc } @@ -173,6 +80,12 @@ func (a *AllocGarbageCollector) run() { // allocations to make disk space available. func (a *AllocGarbageCollector) keepUsageBelowThreshold() error { for { + select { + case <-a.shutdownCh: + return nil + default: + } + // Check if we have enough free space err := a.statsCollector.Collect() if err != nil { @@ -207,17 +120,46 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error { a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID) // Destroy the alloc runner and wait until it exits - ar.Destroy() + a.destroyAllocRunner(ar) + } + return nil +} + +// destroyAllocRunner is used to destroy an allocation runner. It will acquire a +// lock to restrict parallelism and then destroy the alloc runner, returning +// once the allocation has been destroyed. +func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner) { + // Acquire the destroy lock + locked := false + select { + case a.destroyCh <- struct{}{}: + locked = true + case <-time.After(destroyWaitLimit): + a.logger.Printf("[WARN] client: garbage collecting contention when attempting destroy of allocation %q", ar.Alloc().ID) + case <-a.shutdownCh: + return + } + + ar.Destroy() + + select { + case <-ar.WaitCh(): + case <-a.shutdownCh: + } + + a.logger.Printf("[DEBUG] client: garbage collected %q", ar.Alloc().ID) + + if locked { select { - case <-ar.WaitCh(): - case <-a.shutdownCh: + case <-a.destroyCh: + default: } } - return nil } func (a *AllocGarbageCollector) Stop() { close(a.shutdownCh) + close(a.destroyCh) } // Collect garbage collects a single allocation on a node @@ -229,21 +171,28 @@ func (a *AllocGarbageCollector) Collect(allocID string) error { ar := gcAlloc.allocRunner a.logger.Printf("[INFO] client: garbage collecting allocation %q", ar.Alloc().ID) - ar.Destroy() + a.destroyAllocRunner(ar) return nil } // CollectAll garbage collects all termianated allocations on a node func (a *AllocGarbageCollector) CollectAll() error { for { + select { + case <-a.shutdownCh: + return nil + default: + } + gcAlloc := a.allocRunners.Pop() if gcAlloc == nil { break } + ar := gcAlloc.allocRunner a.logger.Printf("[INFO] client: garbage collecting alloc runner for alloc %q", ar.Alloc().ID) - ar.Destroy() + go a.destroyAllocRunner(ar) } return nil } @@ -274,6 +223,12 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e var diskCleared int for { + select { + case <-a.shutdownCh: + return nil + default: + } + // Collect host stats and see if we still need to remove older // allocations var allocDirStats *stats.DiskStats @@ -305,11 +260,7 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID) // Destroy the alloc runner and wait until it exits - ar.Destroy() - select { - case <-ar.WaitCh(): - case <-a.shutdownCh: - } + a.destroyAllocRunner(ar) // Call stats collect again diskCleared += alloc.Resources.DiskMB @@ -324,7 +275,7 @@ func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) error { } if ar.Alloc() == nil { a.logger.Printf("[INFO] client: alloc is nil, so garbage collecting") - ar.Destroy() + a.destroyAllocRunner(ar) } a.logger.Printf("[INFO] client: marking allocation %v for GC", ar.Alloc().ID) @@ -342,3 +293,110 @@ func (a *AllocGarbageCollector) Remove(ar *AllocRunner) { a.logger.Printf("[INFO] client: removed alloc runner %v from garbage collector", alloc.ID) } } + +// GCAlloc wraps an allocation runner and an index enabling it to be used within +// a PQ +type GCAlloc struct { + timeStamp time.Time + allocRunner *AllocRunner + index int +} + +type GCAllocPQImpl []*GCAlloc + +func (pq GCAllocPQImpl) Len() int { + return len(pq) +} + +func (pq GCAllocPQImpl) Less(i, j int) bool { + return pq[i].timeStamp.Before(pq[j].timeStamp) +} + +func (pq GCAllocPQImpl) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *GCAllocPQImpl) Push(x interface{}) { + n := len(*pq) + item := x.(*GCAlloc) + item.index = n + *pq = append(*pq, item) +} + +func (pq *GCAllocPQImpl) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +// IndexedGCAllocPQ is an indexed PQ which maintains a list of allocation runner +// based on their termination time. +type IndexedGCAllocPQ struct { + index map[string]*GCAlloc + heap GCAllocPQImpl + + pqLock sync.Mutex +} + +func NewIndexedGCAllocPQ() *IndexedGCAllocPQ { + return &IndexedGCAllocPQ{ + index: make(map[string]*GCAlloc), + heap: make(GCAllocPQImpl, 0), + } +} + +func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) error { + i.pqLock.Lock() + defer i.pqLock.Unlock() + + alloc := ar.Alloc() + if _, ok := i.index[alloc.ID]; ok { + // No work to do + return nil + } + gcAlloc := &GCAlloc{ + timeStamp: time.Now(), + allocRunner: ar, + } + i.index[alloc.ID] = gcAlloc + heap.Push(&i.heap, gcAlloc) + return nil +} + +func (i *IndexedGCAllocPQ) Pop() *GCAlloc { + i.pqLock.Lock() + defer i.pqLock.Unlock() + + if len(i.heap) == 0 { + return nil + } + + gcAlloc := heap.Pop(&i.heap).(*GCAlloc) + delete(i.index, gcAlloc.allocRunner.Alloc().ID) + return gcAlloc +} + +func (i *IndexedGCAllocPQ) Remove(allocID string) (*GCAlloc, error) { + i.pqLock.Lock() + defer i.pqLock.Unlock() + + if gcAlloc, ok := i.index[allocID]; ok { + heap.Remove(&i.heap, gcAlloc.index) + delete(i.index, allocID) + return gcAlloc, nil + } + + return nil, fmt.Errorf("alloc %q not present", allocID) +} + +func (i *IndexedGCAllocPQ) Length() int { + i.pqLock.Lock() + defer i.pqLock.Unlock() + + return len(i.heap) +} diff --git a/command/agent/agent.go b/command/agent/agent.go index 61d1985c002..4c9ca7d0073 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -310,6 +310,7 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) { // Set the GC related configs conf.GCInterval = a.config.Client.GCInterval + conf.GCParallelDestroys = a.config.Client.GCParallelDestroys conf.GCDiskUsageThreshold = a.config.Client.GCDiskUsageThreshold conf.GCInodeUsageThreshold = a.config.Client.GCInodeUsageThreshold conf.NoHostUUID = a.config.Client.NoHostUUID diff --git a/command/agent/config-test-fixtures/basic.hcl b/command/agent/config-test-fixtures/basic.hcl index 8974fdf35fb..28d71e64a8c 100644 --- a/command/agent/config-test-fixtures/basic.hcl +++ b/command/agent/config-test-fixtures/basic.hcl @@ -54,6 +54,7 @@ client { collection_interval = "5s" } gc_interval = "6s" + gc_parallel_destroys = 6 gc_disk_usage_threshold = 82 gc_inode_usage_threshold = 91 no_host_uuid = true diff --git a/command/agent/config.go b/command/agent/config.go index ccb5db5609b..ce14d03b63a 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -202,6 +202,10 @@ type ClientConfig struct { // collection GCInterval time.Duration `mapstructure:"gc_interval"` + // GCParallelDestroys is the number of parallel destroys the garbage + // collector will allow. + GCParallelDestroys int `mapstructure:"gc_parallel_destroys"` + // GCInodeUsageThreshold is the inode usage threshold beyond which the Nomad // client triggers GC of the terminal allocations GCDiskUsageThreshold float64 `mapstructure:"gc_disk_usage_threshold"` @@ -524,6 +528,7 @@ func DefaultConfig() *Config { ClientMaxPort: 14512, Reserved: &Resources{}, GCInterval: 1 * time.Minute, + GCParallelDestroys: 2, GCInodeUsageThreshold: 70, GCDiskUsageThreshold: 80, }, @@ -929,6 +934,9 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig { if b.GCInterval != 0 { result.GCInterval = b.GCInterval } + if b.GCParallelDestroys != 0 { + result.GCParallelDestroys = b.GCParallelDestroys + } if b.GCDiskUsageThreshold != 0 { result.GCDiskUsageThreshold = b.GCDiskUsageThreshold } diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go index ea2e4e0fa9a..f2bda68f149 100644 --- a/command/agent/config_parse.go +++ b/command/agent/config_parse.go @@ -344,6 +344,7 @@ func parseClient(result **ClientConfig, list *ast.ObjectList) error { "gc_interval", "gc_disk_usage_threshold", "gc_inode_usage_threshold", + "gc_parallel_destroys", "no_host_uuid", } if err := checkHCLKeys(listVal, valid); err != nil { diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index f0da05e7f04..db5dca3319e 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -71,6 +71,7 @@ func TestConfig_Parse(t *testing.T) { ParsedReservedPorts: []int{1, 10, 11, 12, 100}, }, GCInterval: 6 * time.Second, + GCParallelDestroys: 6, GCDiskUsageThreshold: 82, GCInodeUsageThreshold: 91, NoHostUUID: true, diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 9cce09e24f9..ae5f5653398 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -212,6 +212,7 @@ func TestConfig_Merge(t *testing.T) { ParsedReservedPorts: []int{1, 2, 3}, }, GCInterval: 6 * time.Second, + GCParallelDestroys: 6, GCDiskUsageThreshold: 71, GCInodeUsageThreshold: 86, }, diff --git a/website/source/docs/agent/configuration/client.html.md b/website/source/docs/agent/configuration/client.html.md index 77aa64e7700..025107d29d9 100644 --- a/website/source/docs/agent/configuration/client.html.md +++ b/website/source/docs/agent/configuration/client.html.md @@ -92,6 +92,10 @@ client { - `gc_inode_usage_threshold` `(float: 70)` - Specifies the inode usage percent which Nomad tries to maintain by garbage collecting terminal allocations. +- `gc_parallel_destroys` `(int: 2)` - Specifies the maximum number of + parallel destroys allowed by the garbage collector. This value should be + relatively low to avoid high resource usage during garbage collections. + - `no_host_uuid` `(bool: false)` - Force the UUID generated by the client to be randomly generated and not be based on the host's UUID. From 050cdd5b14a8795030ec22bf2fef6e13413d734e Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 10 Mar 2017 16:46:22 -0800 Subject: [PATCH 2/3] remove escape --- client/gc.go | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/client/gc.go b/client/gc.go index 6199521b985..57770acff9d 100644 --- a/client/gc.go +++ b/client/gc.go @@ -14,10 +14,6 @@ import ( const ( // MB is a constant which converts values in bytes to MB MB = 1024 * 1024 - - // destroyWaitLimit is the timeout after which the max destroy parallelism - // is abandoned to continue to make progress - destroyWaitLimit = 3 * time.Minute ) // GCConfig allows changing the behaviour of the garbage collector @@ -130,12 +126,8 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error { // once the allocation has been destroyed. func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner) { // Acquire the destroy lock - locked := false select { case a.destroyCh <- struct{}{}: - locked = true - case <-time.After(destroyWaitLimit): - a.logger.Printf("[WARN] client: garbage collecting contention when attempting destroy of allocation %q", ar.Alloc().ID) case <-a.shutdownCh: return } @@ -149,11 +141,9 @@ func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner) { a.logger.Printf("[DEBUG] client: garbage collected %q", ar.Alloc().ID) - if locked { - select { - case <-a.destroyCh: - default: - } + select { + case <-a.destroyCh: + default: } } From 38ebed5e0ed3f958492185fe85e67f962f980a49 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 14 Mar 2017 10:45:15 -0700 Subject: [PATCH 3/3] Review fixes --- client/gc.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/client/gc.go b/client/gc.go index 57770acff9d..a07db1415a8 100644 --- a/client/gc.go +++ b/client/gc.go @@ -127,9 +127,9 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error { func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner) { // Acquire the destroy lock select { - case a.destroyCh <- struct{}{}: case <-a.shutdownCh: return + case a.destroyCh <- struct{}{}: } ar.Destroy() @@ -141,15 +141,12 @@ func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner) { a.logger.Printf("[DEBUG] client: garbage collected %q", ar.Alloc().ID) - select { - case <-a.destroyCh: - default: - } + // Release the lock + <-a.destroyCh } func (a *AllocGarbageCollector) Stop() { close(a.shutdownCh) - close(a.destroyCh) } // Collect garbage collects a single allocation on a node