diff --git a/cmd/katalyst-agent/app/options/qrm/memory_plugin.go b/cmd/katalyst-agent/app/options/qrm/memory_plugin.go index f4de2ce2f..60874886a 100644 --- a/cmd/katalyst-agent/app/options/qrm/memory_plugin.go +++ b/cmd/katalyst-agent/app/options/qrm/memory_plugin.go @@ -37,6 +37,7 @@ type MemoryOptions struct { SockMemOptions LogCacheOptions + FragMemOptions } type SockMemOptions struct { @@ -64,6 +65,13 @@ type LogCacheOptions struct { FileFilters []string } +type FragMemOptions struct { + EnableSettingFragMem bool + // SetMemFragScoreAsync sets the threshold of frag score for async memory compaction. + // The async compaction behavior will be triggered when this score is exceeded. + SetMemFragScoreAsync int +} + func NewMemoryOptions() *MemoryOptions { return &MemoryOptions{ PolicyName: "dynamic", @@ -87,6 +95,10 @@ func NewMemoryOptions() *MemoryOptions { PathList: []string{}, FileFilters: []string{".*\\.log.*"}, }, + FragMemOptions: FragMemOptions{ + EnableSettingFragMem: false, + SetMemFragScoreAsync: 80, + }, } } @@ -131,6 +143,10 @@ func (o *MemoryOptions) AddFlags(fss *cliflag.NamedFlagSets) { "the absolute path list where files will be checked to evict page cache") fs.StringSliceVar(&o.FileFilters, "qrm-memory-logcache-file-filters", o.FileFilters, "string list to filter log files, default to *log*") + fs.BoolVar(&o.EnableSettingFragMem, "enable-setting-mem-compaction", + o.EnableSettingFragMem, "if set true, we will enable memory compaction related features") + fs.IntVar(&o.SetMemFragScoreAsync, "qrm-memory-frag-score-async", + o.SetMemFragScoreAsync, "set the threshold of frag score for async memory compaction") } func (o *MemoryOptions) ApplyTo(conf *qrmconfig.MemoryQRMPluginConfig) error { @@ -153,5 +169,7 @@ func (o *MemoryOptions) ApplyTo(conf *qrmconfig.MemoryQRMPluginConfig) error { conf.MaxInterval = o.MaxInterval conf.PathList = o.PathList conf.FileFilters = o.FileFilters + conf.EnableSettingFragMem = o.EnableSettingFragMem + 
conf.SetMemFragScoreAsync = o.SetMemFragScoreAsync return nil } diff --git a/pkg/agent/qrm-plugins/memory/consts/consts.go b/pkg/agent/qrm-plugins/memory/consts/consts.go index bec95e8ec..25e0cfe34 100644 --- a/pkg/agent/qrm-plugins/memory/consts/consts.go +++ b/pkg/agent/qrm-plugins/memory/consts/consts.go @@ -34,4 +34,5 @@ const ( CommunicateWithAdvisor = MemoryPluginDynamicPolicyName + "_communicate_with_advisor" DropCache = MemoryPluginDynamicPolicyName + "_drop_cache" EvictLogCache = MemoryPluginDynamicPolicyName + "_evict_log_cache" + SetMemCompact = MemoryPluginDynamicPolicyName + "_mem_compact" ) diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go index df84789d3..0aeb097e7 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go @@ -41,6 +41,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/oom" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/state" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/handlers/fragmem" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/handlers/logcache" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/handlers/sockmem" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" @@ -149,6 +150,7 @@ type DynamicPolicy struct { enableSettingMemoryMigrate bool enableSettingSockMem bool + enableSettingFragMem bool enableMemoryAdvisor bool memoryAdvisorSocketAbsPath string memoryPluginSocketAbsPath string @@ -219,6 +221,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration defaultAsyncLimitedWorkers: asyncworker.NewAsyncLimitedWorkers(memoryPluginAsyncWorkersName, defaultAsyncWorkLimit, wrappedEmitter), enableSettingMemoryMigrate: 
conf.EnableSettingMemoryMigrate, enableSettingSockMem: conf.EnableSettingSockMem, + enableSettingFragMem: conf.EnableSettingFragMem, enableMemoryAdvisor: conf.EnableMemoryAdvisor, memoryAdvisorSocketAbsPath: conf.MemoryAdvisorSocketAbsPath, memoryPluginSocketAbsPath: conf.MemoryPluginSocketAbsPath, @@ -414,6 +417,15 @@ func (p *DynamicPolicy) Start() (err error) { general.Errorf("evictLogCache failed, err=%v", err) } } + if p.enableSettingFragMem { + general.Infof("setFragMem enabled") + err := periodicalhandler.RegisterPeriodicalHandlerWithHealthz(memconsts.SetMemCompact, + general.HealthzCheckStateNotReady, qrm.QRMMemoryPluginPeriodicalHandlerGroupName, + fragmem.SetMemCompact, 1800*time.Second, healthCheckTolerationTimes) + if err != nil { + general.Infof("setFragMem failed, err=%v", err) + } + } go wait.Until(func() { periodicalhandler.ReadyToStartHandlersByGroup(qrm.QRMMemoryPluginPeriodicalHandlerGroupName) diff --git a/pkg/agent/qrm-plugins/memory/handlers/fragmem/const.go b/pkg/agent/qrm-plugins/memory/handlers/fragmem/const.go new file mode 100644 index 000000000..d799f73ad --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/handlers/fragmem/const.go @@ -0,0 +1,36 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package fragmem + +const ( + // Constants for fragmem related kernel features + hostCompactProactivenessFile = "/proc/sys/vm/compaction_proactiveness" + hostMemNodePath = "/sys/devices/system/node/node" + + fragScoreMin = 60.0 + fragScoreMax = 95.0 + minFragScoreGap = 8 + delayCompactTimes = 10 + sleepCompactTime = 10 + minHostLoad = 100 + + commandKcompactd = "kcompactd" +) + +const ( + metricNameMemoryCompaction = "async_handler_memory_compaction" +) diff --git a/pkg/agent/qrm-plugins/memory/handlers/fragmem/fragmem_linux.go b/pkg/agent/qrm-plugins/memory/handlers/fragmem/fragmem_linux.go new file mode 100644 index 000000000..a830352c4 --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/handlers/fragmem/fragmem_linux.go @@ -0,0 +1,167 @@ +//go:build linux +// +build linux + +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package fragmem + +import ( + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/errors" + + memconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/consts" + coreconfig "github.com/kubewharf/katalyst-core/pkg/config" + dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" + "github.com/kubewharf/katalyst-core/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/helper" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/general" + "github.com/kubewharf/katalyst-core/pkg/util/process" +) + +var ( + delayTimes int = 1 + mu sync.RWMutex +) + +// SetDelayTimes sets the value of the global delayTimes +func SetDelayTimes(value int) { + mu.Lock() + defer mu.Unlock() + delayTimes = value +} + +// GetDelayTimes returns the value of the global delayTimes +func GetDelayTimes() int { + mu.RLock() + defer mu.RUnlock() + return delayTimes +} + +func isHighSystemLoad(metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter) bool { + load, err := helper.GetNodeMetricWithTime(metaServer.MetricsFetcher, emitter, consts.MetricLoad5MinSystem) + if err != nil { + general.Errorf("Failed to get load: %v", err) + return false + } + + numCPU := metaServer.CPUTopology.NumCPUs + loadPerCPU := int(load.Value) * 100 / numCPU + general.Infof("Host load info: load: %v, numCPU: %v, loadPerCPU: %v", load.Value, numCPU, loadPerCPU) + + return loadPerCPU > minHostLoad +} + +func memCompacWithBestEffort(conf *coreconfig.Configuration, metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter) { + fragScoreWatermark := uint64(general.Clamp(float64(conf.SetMemFragScoreAsync), fragScoreMin, fragScoreMax)) + for _, numaID := range metaServer.CPUDetails.NUMANodes().ToSliceNoSortInt() { + // Step 1.0, get fragScore from Malachite + score, err := helper.GetNumaMetricWithTime(metaServer.MetricsFetcher, emitter, 
consts.MetricMemFragScoreNuma, numaID) + if err != nil { + general.Errorf("Failed to get frag score:%v", err) + continue + } + fragScore := int(score.Value) + general.Infof("NUMA fragScore info: node:%d, fragScore:%d, fragScoreGate:%d", numaID, fragScore, fragScoreWatermark) + // Step 1.1, if the fragScore exceeds the target, we will initiate memory compaction + if fragScore < int(fragScoreWatermark) { + continue + } + + // Step 2, check if kcompactd is in D state + if process.IsCommandInDState(commandKcompactd) { + general.Infof("kcompactd is in D state") + return + } + + // Step 3, do memory compaction in node level + _ = emitter.StoreInt64(metricNameMemoryCompaction, 1, metrics.MetricTypeNameRaw) + setHostMemCompact(numaID) + time.Sleep(sleepCompactTime * time.Second) + + // Step 4, if memory compaction is not effective, extend the check interval + /* + * If mem fragScore shows no significant improvement after compaction + * (frag_score < (fragScoreWatermark-minFragScoreGap)), + * it indicates that the system has minimal physical memory fragmentation + */ + newScore, err := helper.GetNumaMetricWithTime(metaServer.MetricsFetcher, emitter, consts.MetricMemFragScoreNuma, numaID) + if err != nil { + continue + } + newFragScore := int(newScore.Value) + general.Infof("Node fragScore new info: node:%d, fragScore:%d", numaID, newFragScore) + // compare the new and old fragScore to avoid ineffective compaction. + if newFragScore >= int(fragScoreWatermark-minFragScoreGap) { + general.Infof("Not so much memory fragmentation in this node, increase the scanning cycle") + SetDelayTimes(delayCompactTimes) + } + } +} + +/* SetMemCompact is the unified solution for memory compaction. +* it includes 3 parts: +* 1, set the threshold of fragmentation score that triggers synchronous memory compaction in the memory slow path. +* 2, if has proactive compaction feature, then set the threshold of fragmentation score for asynchronous memory compaction through compaction_proactiveness. 
+* 3, if no proactive compaction feature, then use the threshold of fragmentation score to manually trigger memory compaction. + */ +func SetMemCompact(conf *coreconfig.Configuration, + _ interface{}, _ *dynamicconfig.DynamicAgentConfiguration, + emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer, +) { + general.Infof("called") + + var errList []error + defer func() { + _ = general.UpdateHealthzStateByError(memconsts.SetMemCompact, errors.NewAggregate(errList)) + }() + + if conf == nil || emitter == nil || metaServer == nil { + general.Errorf("nil input, conf:%v, emitter:%v, metaServer:%v", conf, emitter, metaServer) + return + } + + if delay := GetDelayTimes(); delay > 0 { + general.Infof("No memory fragmentation in this node, skip this scanning cycle, delay=%d", delay) + SetDelayTimes(delay - 1) + return + } + + // EnableSettingFragMem feature gate. + if !conf.EnableSettingFragMem { + general.Infof("EnableSettingFragMem disabled") + return + } + + // Step1, check proactive compaction. + // if proactive compaction feature enabled, then return. + if !checkCompactionProactivenessDisabled(hostCompactProactivenessFile) { + general.Infof("proactive compaction enabled, then return") + return + } + + // Step2, avoid too much host pressure. + if isHighSystemLoad(metaServer, emitter) { + return + } + + // Step3, user space memory compaction will be triggered when the frag score exceeds fragScoreWatermark(default:80). + memCompacWithBestEffort(conf, metaServer, emitter) +} diff --git a/pkg/agent/qrm-plugins/memory/handlers/fragmem/fragmem_linux_test.go b/pkg/agent/qrm-plugins/memory/handlers/fragmem/fragmem_linux_test.go new file mode 100644 index 000000000..8b1f542fe --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/handlers/fragmem/fragmem_linux_test.go @@ -0,0 +1,215 @@ +//go:build linux +// +build linux + +/* +Copyright 2022 The Katalyst Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fragmem + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + + coreconfig "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/config/agent" + configagent "github.com/kubewharf/katalyst-core/pkg/config/agent" + dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + metaagent "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/machine" +) + +func makeMetaServer() (*metaserver.MetaServer, error) { + server := &metaserver.MetaServer{ + MetaAgent: &metaagent.MetaAgent{}, + } + + cpuTopology, err := machine.GenerateDummyCPUTopology(16, 1, 2) + if err != nil { + return nil, err + } + + server.KatalystMachineInfo = &machine.KatalystMachineInfo{ + CPUTopology: cpuTopology, + } + server.MetricsFetcher = metric.NewFakeMetricsFetcher(metrics.DummyMetrics{}) + return server, nil +} + +func TestSetMemCompact(t *testing.T) { + t.Parallel() + SetMemCompact(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: 
&configagent.StaticAgentConfiguration{ + QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + FragMemOptions: qrm.FragMemOptions{ + EnableSettingFragMem: false, + }, + }, + }, + }, + }, + }, nil, &dynamicconfig.DynamicAgentConfiguration{}, nil, nil) + + SetMemCompact(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ + QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + FragMemOptions: qrm.FragMemOptions{ + EnableSettingFragMem: true, + }, + }, + }, + }, + }, + }, nil, &dynamicconfig.DynamicAgentConfiguration{}, metrics.DummyMetrics{}, nil) + + SetMemCompact(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ + QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + FragMemOptions: qrm.FragMemOptions{ + EnableSettingFragMem: true, + }, + }, + }, + }, + }, + }, metrics.DummyMetrics{}, &dynamicconfig.DynamicAgentConfiguration{}, metrics.DummyMetrics{}, nil) + + metaServer, err := makeMetaServer() + assert.NoError(t, err) + metaServer.PodFetcher = &pod.PodFetcherStub{PodList: []*v1.Pod{}} + SetMemCompact(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ + QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + FragMemOptions: qrm.FragMemOptions{ + EnableSettingFragMem: true, + }, + }, + }, + }, + }, + }, metrics.DummyMetrics{}, &dynamicconfig.DynamicAgentConfiguration{}, metrics.DummyMetrics{}, metaServer) + + SetMemCompact(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ + 
QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + FragMemOptions: qrm.FragMemOptions{ + EnableSettingFragMem: false, + }, + }, + }, + }, + }, + }, metrics.DummyMetrics{}, &dynamicconfig.DynamicAgentConfiguration{}, metrics.DummyMetrics{}, metaServer) + + SetMemCompact(&coreconfig.Configuration{ + AgentConfiguration: &agent.AgentConfiguration{ + StaticAgentConfiguration: &configagent.StaticAgentConfiguration{ + QRMPluginsConfiguration: &qrm.QRMPluginsConfiguration{ + MemoryQRMPluginConfig: &qrm.MemoryQRMPluginConfig{ + FragMemOptions: qrm.FragMemOptions{ + EnableSettingFragMem: true, + SetMemFragScoreAsync: 80, + }, + }, + }, + }, + }, + }, metrics.DummyMetrics{}, &dynamicconfig.DynamicAgentConfiguration{}, metrics.DummyMetrics{}, metaServer) +} + +// Helper function to create a temporary file with given content +func createTempFile(t *testing.T, content string) string { + file, err := ioutil.TempFile("", "testfile") + assert.NoError(t, err) + defer file.Close() + + _, err = file.WriteString(content) + assert.NoError(t, err) + + return file.Name() +} + +func TestCheckCompactionProactivenessDisabled(t *testing.T) { + t.Parallel() + tests := []struct { + name string + fileContent string + expected bool + }{ + {"File does not exist", "", true}, + {"File content is empty", "", true}, + {"File content is zero", "0", true}, + {"File content is negative", "-1", true}, + {"File content is positive", "1", false}, + {"File content is not a number", "abc", true}, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + tt := tt // Reassign the loop variable + t.Parallel() + var filePath string + if tt.fileContent != "" { + filePath = createTempFile(t, tt.fileContent) + defer os.Remove(filePath) // Clean up + } else { + filePath = "nonexistentfile" + } + + result := checkCompactionProactivenessDisabled(filePath) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestSetDelayTimes(t 
*testing.T) { + t.Parallel() + // Test case 1: Set delayTimes to a positive value + SetDelayTimes(10) + assert.Equal(t, 10, GetDelayTimes(), "Expected delayTimes to be 10") + + // Test case 2: Set delayTimes to zero + SetDelayTimes(0) + assert.Equal(t, 0, GetDelayTimes(), "Expected delayTimes to be 0") + + // Test case 3: Set delayTimes to a negative value + SetDelayTimes(-5) + assert.Equal(t, -5, GetDelayTimes(), "Expected delayTimes to be -5") +} + +func TestSetHostCompact(t *testing.T) { + t.Parallel() + setHostMemCompact(25535) +} diff --git a/pkg/agent/qrm-plugins/memory/handlers/fragmem/fragmem_unsupported.go b/pkg/agent/qrm-plugins/memory/handlers/fragmem/fragmem_unsupported.go new file mode 100644 index 000000000..04a76a451 --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/handlers/fragmem/fragmem_unsupported.go @@ -0,0 +1,31 @@ +//go:build !linux + +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package fragmem + +import ( + coreconfig "github.com/kubewharf/katalyst-core/pkg/config" + dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metrics" +) + +func SetMemCompact(conf *coreconfig.Configuration, + _ interface{}, _ *dynamicconfig.DynamicAgentConfiguration, + emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer) { +} diff --git a/pkg/agent/qrm-plugins/memory/handlers/fragmem/utils_linux.go b/pkg/agent/qrm-plugins/memory/handlers/fragmem/utils_linux.go new file mode 100644 index 000000000..4100af473 --- /dev/null +++ b/pkg/agent/qrm-plugins/memory/handlers/fragmem/utils_linux.go @@ -0,0 +1,57 @@ +//go:build linux +// +build linux + +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fragmem + +import ( + "fmt" + "io/ioutil" + "os" + "strconv" + "strings" +) + +// checkCompactionProactivenessDisabled checks the content of /proc/sys/vm/compaction_proactiveness +// and returns false if the file exists and its content is greater than 0, otherwise returns true. 
+func checkCompactionProactivenessDisabled(filePath string) bool { + // Check if the file exists + if _, err := os.Stat(filePath); os.IsNotExist(err) { + return true + } + + // Read the file content + content, err := ioutil.ReadFile(filePath) + if err != nil { + return true + } + + // Convert content to integer + value, err := strconv.Atoi(strings.TrimSpace(string(content))) + if err != nil { + return true + } + + // Return false if value is greater than 0 + return value <= 0 +} + +func setHostMemCompact(node int) { + targetFile := hostMemNodePath + strconv.Itoa(node) + "/compact" + _ = os.WriteFile(targetFile, []byte(fmt.Sprintf("%d", 1)), 0o644) +} diff --git a/pkg/config/agent/qrm/memory_plugin.go b/pkg/config/agent/qrm/memory_plugin.go index 5c463c5b1..655f24c62 100644 --- a/pkg/config/agent/qrm/memory_plugin.go +++ b/pkg/config/agent/qrm/memory_plugin.go @@ -44,6 +44,8 @@ type MemoryQRMPluginConfig struct { SockMemQRMPluginConfig // LogCacheQRMPluginConfig: the configuration for logcache evicting LogCacheQRMPluginConfig + // FragMemOptions: the configuration for memory compaction related features + FragMemOptions } type SockMemQRMPluginConfig struct { @@ -72,6 +74,13 @@ type LogCacheQRMPluginConfig struct { FileFilters []string } +type FragMemOptions struct { + EnableSettingFragMem bool + // SetMemFragScoreAsync sets the threshold of frag score for async memory compaction. + // The async compaction behavior will be triggered when this score is exceeded. 
+ SetMemFragScoreAsync int +} + func NewMemoryQRMPluginConfig() *MemoryQRMPluginConfig { return &MemoryQRMPluginConfig{} } diff --git a/pkg/consts/metric.go b/pkg/consts/metric.go index f8411623a..7e62775ae 100644 --- a/pkg/consts/metric.go +++ b/pkg/consts/metric.go @@ -160,6 +160,7 @@ const ( MetricMemLatencyReadNuma = "mem.latency.read.numa" MetricMemLatencyWriteNuma = "mem.latency.write.numa" MetricMemAMDL3MissLatencyNuma = "mem.latency.amd.l3.miss" + MetricMemFragScoreNuma = "mem.frag.score.numa" MetricCPUUsageNuma = "cpu.usage.numa" ) diff --git a/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go b/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go index 0cce6ba77..1356cc131 100644 --- a/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go +++ b/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go @@ -141,6 +141,7 @@ func (m *MalachiteMetricsProvisioner) updateSystemStats() error { } else { m.processSystemMemoryData(systemMemoryData) m.processSystemNumaData(systemMemoryData, systemComputeData) + m.processSystemExtFragData(systemMemoryData) } systemIOData, err := m.malachiteClient.GetSystemIOStats() @@ -527,6 +528,15 @@ func (m *MalachiteMetricsProvisioner) processSystemNumaData(systemMemoryData *ma } } +func (m *MalachiteMetricsProvisioner) processSystemExtFragData(systemMemoryData *malachitetypes.SystemMemoryData) { + updateTime := time.Unix(systemMemoryData.UpdateTime, 0) + + for _, numa := range systemMemoryData.ExtFrag { + m.metricStore.SetNumaMetric(numa.ID, consts.MetricMemFragScoreNuma, + utilmetric.MetricData{Value: float64(numa.MemFragScore), Time: &updateTime}) + } +} + func (m *MalachiteMetricsProvisioner) processSystemCPUComputeData(systemComputeData *malachitetypes.SystemComputeData) { // todo, currently we only get a unified data for the whole system compute data updateTime := time.Unix(systemComputeData.UpdateTime, 0) diff --git 
a/pkg/metaserver/agent/metric/provisioner/malachite/provisioner_test.go b/pkg/metaserver/agent/metric/provisioner/malachite/provisioner_test.go index 586181b94..199e4464a 100644 --- a/pkg/metaserver/agent/metric/provisioner/malachite/provisioner_test.go +++ b/pkg/metaserver/agent/metric/provisioner/malachite/provisioner_test.go @@ -123,6 +123,7 @@ func Test_noneExistMetricsProvisioner(t *testing.T) { implement.(*MalachiteMetricsProvisioner).processSystemMemoryData(fakeSystemMemory) implement.(*MalachiteMetricsProvisioner).processSystemIOData(fakeSystemIO) implement.(*MalachiteMetricsProvisioner).processSystemNumaData(fakeSystemMemory, fakeSystemCompute) + implement.(*MalachiteMetricsProvisioner).processSystemExtFragData(fakeSystemMemory) implement.(*MalachiteMetricsProvisioner).processSystemCPUComputeData(fakeSystemCompute) implement.(*MalachiteMetricsProvisioner).processSystemNetData(fakeSystemNet) diff --git a/pkg/metaserver/agent/metric/provisioner/malachite/types/system.go b/pkg/metaserver/agent/metric/provisioner/malachite/types/system.go index cad02f0de..a794eae3b 100644 --- a/pkg/metaserver/agent/metric/provisioner/malachite/types/system.go +++ b/pkg/metaserver/agent/metric/provisioner/malachite/types/system.go @@ -129,9 +129,10 @@ type MalachiteSystemMemoryResponse struct { } type SystemMemoryData struct { - System System `json:"system"` - Numa []Numa `json:"numa"` - UpdateTime int64 `json:"update_time"` + System System `json:"system"` + Numa []Numa `json:"numa"` + ExtFrag []ExtFrag `json:"extfrag"` + UpdateTime int64 `json:"update_time"` } type System struct { @@ -185,6 +186,11 @@ type Numa struct { AMDL3MissLatencyMax float64 `json:"amd_l3_miss_latency_max"` } +type ExtFrag struct { + ID int `json:"id"` + MemFragScore uint64 `json:"mem_frag_score"` +} + type Some struct { Avg10 float64 `json:"avg10"` Avg60 float64 `json:"avg60"` diff --git a/pkg/util/process/system.go b/pkg/util/process/system.go index 1e5fd28df..06e03a83f 100644 --- 
a/pkg/util/process/system.go +++ b/pkg/util/process/system.go @@ -17,7 +17,9 @@ limitations under the License. package process import ( + "bytes" "fmt" + "os/exec" "strconv" "strings" @@ -62,3 +64,18 @@ func CPUSetParse(s string) (sets.Int, error) { return b, nil } + +// IsCommandInDState checks if the specified command is in the D (uninterruptible sleep) state. +func IsCommandInDState(command string) bool { + // Execute the command: ps -aux | grep ' D' | grep <command> | grep -v grep + cmd := exec.Command("sh", "-c", "ps -aux | grep ' D' | grep "+command+" | grep -v grep") + var out bytes.Buffer + cmd.Stdout = &out + err := cmd.Run() + + // If the command runs successfully and output is not empty, the process is in D state + if err == nil && strings.TrimSpace(out.String()) != "" { + return true + } + return false +} diff --git a/pkg/util/process/system_test.go b/pkg/util/process/system_test.go new file mode 100644 index 000000000..215eb7aeb --- /dev/null +++ b/pkg/util/process/system_test.go @@ -0,0 +1,29 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package process + +import ( + "testing" +) + +func TestIsCommandInDState(t *testing.T) { + t.Parallel() + got := IsCommandInDState("NotExisted") + if got != false { + t.Errorf("IsCommandInDState() = %v, want false", got) + } +}