From ceb04ead4eb770bed0f87ef63c3847c09e2bd911 Mon Sep 17 00:00:00 2001 From: Kosuke Morimoto Date: Mon, 18 Dec 2023 16:34:18 +0900 Subject: [PATCH] implement malloc metrics (#2161) * add malloc_info Signed-off-by: Kosuke Morimoto * add malloc_info metrics Signed-off-by: Kosuke Morimoto * fix Signed-off-by: Kosuke Morimoto * fix Signed-off-by: Kosuke Morimoto * add malloc trim Signed-off-by: Kosuke Morimoto * add core ngt benchmark Signed-off-by: Kosuke Morimoto * add memory metrics Signed-off-by: Kosuke Morimoto * fix Signed-off-by: Kosuke Morimoto * style: format code with Gofumpt and Prettier This commit fixes the style issues introduced in 94ff86c according to the output from Gofumpt and Prettier. Details: https://github.com/vdaas/vald/pull/2161 * fix Signed-off-by: Kosuke Morimoto * resolve golangci Signed-off-by: Kosuke Morimoto * make format Signed-off-by: Kosuke Morimoto * style: format code with Gofumpt and Prettier This commit fixes the style issues introduced in 6dfa2c8 according to the output from Gofumpt and Prettier. Details: https://github.com/vdaas/vald/pull/2161 * make format Signed-off-by: Kosuke Morimoto * fix Signed-off-by: Kosuke Morimoto * fix Signed-off-by: Kosuke Morimoto * fix Signed-off-by: Kosuke Morimoto * make format Signed-off-by: Kosuke Morimoto * style: format code with Gofumpt and Prettier This commit fixes the style issues introduced in 8c29a17 according to the output from Gofumpt and Prettier. Details: https://github.com/vdaas/vald/pull/2161 * fix Signed-off-by: Kosuke Morimoto * make format Signed-off-by: Kosuke Morimoto * remove malloc_info Signed-off-by: Kosuke Morimoto * remove unused error Signed-off-by: Kosuke Morimoto * make format Signed-off-by: Kosuke Morimoto * style: format code with Gofumpt and Prettier This commit fixes the style issues introduced in c6e1311 according to the output from Gofumpt and Prettier. Details: https://github.com/vdaas/vald/pull/2161 * type assertion check Signed-off-by: Kosuke Morimoto * style: format code with Gofumpt and Prettier This commit fixes the style issues introduced in c8b302e according to the output from Gofumpt and Prettier. Details: https://github.com/vdaas/vald/pull/2161 * skipcq: GO-R1005 Signed-off-by: Kosuke Morimoto --------- Signed-off-by: Kosuke Morimoto Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com> --- .../observability/metrics/mem/index/index.go | 390 ----------- .../metrics/mem/index/index_test.go | 262 -------- .../metrics/mem/malloc/malloc_test.go | 174 ----- internal/observability/metrics/mem/mem.go | 614 ++++++++++++++++++ internal/observability/observability.go | 4 +- pkg/agent/core/ngt/usecase/agentd.go | 2 + 6 files changed, 618 insertions(+), 828 deletions(-) delete mode 100644 internal/observability/metrics/mem/index/index.go delete mode 100644 internal/observability/metrics/mem/index/index_test.go delete mode 100644 internal/observability/metrics/mem/malloc/malloc_test.go create mode 100644 internal/observability/metrics/mem/mem.go diff --git a/internal/observability/metrics/mem/index/index.go b/internal/observability/metrics/mem/index/index.go deleted file mode 100644 index 40775d18b3..0000000000 --- a/internal/observability/metrics/mem/index/index.go +++ /dev/null @@ -1,390 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package index - -import ( - "context" - "runtime" - "time" - - "github.com/vdaas/vald/internal/observability/metrics" - "go.opentelemetry.io/otel/sdk/metric/aggregation" - "go.opentelemetry.io/otel/sdk/metric/view" -) - -const ( - allocMetricsName = "alloc_bytes" - allocMetricsDescription = "Currently allocated number of bytes on the heap" - - totalAllocMetricsName = "alloc_bytes_total" - totalAllocMetricsDescription = "Cumulative bytes allocated for heap objects" - - sysMetricsName = "sys_bytes" - sysMetricsDescription = "Total bytes of memory obtained from the OS" - - mallocsMetricsName = "mallocs_total" - mallocsMetricsDescription = "The cumulative count of heap objects allocated" - - freesMetricsName = "frees_total" - freesMetricsDescription = "The cumulative count of heap objects freed" - - heapAllocMetricsName = "heap_alloc_bytes" - heapAllocMetricsDescription = "Bytes of allocated heap object" - - heapSysMetricsName = "heap_sys_bytes" - heapSysMetricsDescription = "Bytes of heap memory obtained from the OS" - - heapIdleMetricsName = "heap_idle_bytes" - heapIdleMetricsDescription = "Bytes in idle (unused) spans" - - heapInuseMetricsName = "heap_inuse_bytes" - heapInuseMetricsDescription = "Bytes in in-use spans" - - heapReleasedMetricsName = "heap_released_bytes" - heapReleasedMetricsDescription = "Bytes of physical memory returned to the OS" - - stackInuseMetricsName = "stack_inuse_bytes" - stackInuseMetricsDescription = "Bytes in stack spans" - - stackSysMetricsName = "stack_sys_bytes" - stackSysMetricsDescription = "Bytes of stack memory obtained from the OS" - - pauseTotalMsMetricsName = "pause_ms_total" - pauseTotalMsMetricsDescription = "The cumulative milliseconds in GC" - - numGCMetricsName = "gc_count" - numGCMetricsDescription = "The number of completed GC cycles" -) - -type memoryMetrics struct{} - -func New() metrics.Metric { - return &memoryMetrics{} -} - -func (*memoryMetrics) View() ([]*metrics.View, error) { - alloc, err := view.New( - view.MatchInstrumentName(allocMetricsDescription), - view.WithSetDescription(allocMetricsDescription), - view.WithSetAggregation(aggregation.LastValue{}), - ) - if err != nil { - return nil, err - } - - totalAlloc, err := view.New( - view.MatchInstrumentName(totalAllocMetricsDescription), - view.WithSetDescription(totalAllocMetricsDescription), - view.WithSetAggregation(aggregation.LastValue{}), - ) - if err != nil { - return nil, err - } - - sys, err := view.New( - view.MatchInstrumentName(sysMetricsName), - view.WithSetDescription(sysMetricsDescription), - view.WithSetAggregation(aggregation.LastValue{}), - ) - if err != nil { - return nil, err - } - - mallocs, err := view.New( - view.MatchInstrumentName(mallocsMetricsName), - view.WithSetDescription(mallocsMetricsDescription), - view.WithSetAggregation(aggregation.LastValue{}), - ) - if err != nil { - return nil, err - } - - frees, err := view.New( - view.MatchInstrumentName(freesMetricsName), - view.WithSetDescription(freesMetricsDescription), - view.WithSetAggregation(aggregation.LastValue{}), - ) - if err != nil { - return nil, err - } - - heapAlloc, err := view.New( - view.MatchInstrumentName(heapAllocMetricsName), - view.WithSetDescription(heapAllocMetricsDescription), - view.WithSetAggregation(aggregation.LastValue{}), - ) - if err != nil { - return nil, err - } - - heapSys, err := view.New( - view.MatchInstrumentName(heapSysMetricsName), - view.WithSetDescription(heapSysMetricsDescription), - view.WithSetAggregation(aggregation.LastValue{}), - ) - if err != nil { - return nil, err - } - - heapIdle, err := view.New( - view.MatchInstrumentName(heapIdleMetricsName), - view.WithSetDescription(heapIdleMetricsDescription), - view.WithSetAggregation(aggregation.LastValue{}), - ) - if err != nil { - return nil, err - } - - heapInuse, err := view.New( - view.MatchInstrumentName(heapInuseMetricsName), - view.WithSetDescription(heapInuseMetricsDescription), - view.WithSetAggregation(aggregation.LastValue{}), - ) - if err != nil { - return nil, err - } - - heapReleased, err := view.New( - view.MatchInstrumentName(heapReleasedMetricsName), - view.WithSetDescription(heapReleasedMetricsDescription), - view.WithSetAggregation(aggregation.LastValue{}), - ) - if err != nil { - return nil, err - } - - stackInuse, err := view.New( - view.MatchInstrumentName(stackInuseMetricsName), - view.WithSetDescription(stackInuseMetricsDescription), - view.WithSetAggregation(aggregation.LastValue{}), - ) - if err != nil { - return nil, err - } - - stackSys, err := view.New( - view.MatchInstrumentName(stackSysMetricsName), - view.WithSetDescription(stackSysMetricsDescription), - view.WithSetAggregation(aggregation.LastValue{}), - ) - if err != nil { - return nil, err - } - - pauseTotalMs, err := view.New( - view.MatchInstrumentName(pauseTotalMsMetricsName), - view.WithSetDescription(pauseTotalMsMetricsDescription), - view.WithSetAggregation(aggregation.LastValue{}), - ) - if err != nil { - return nil, err - } - - numGC, err := view.New( - view.MatchInstrumentName(numGCMetricsName), - view.WithSetDescription(numGCMetricsDescription), - view.WithSetAggregation(aggregation.LastValue{}), - ) - if err != nil { - return nil, err - } - - return []*metrics.View{ - &alloc, - &totalAlloc, - &sys, - &mallocs, - &frees, - &heapAlloc, - &heapSys, - &heapIdle, - &heapInuse, - &heapReleased, - &stackInuse, - &stackSys, - &pauseTotalMs, - &numGC, - }, nil -} - -func (*memoryMetrics) Register(m metrics.Meter) error { - alloc, err := m.AsyncInt64().Gauge( - allocMetricsName, - metrics.WithDescription(allocMetricsDescription), - metrics.WithUnit(metrics.Bytes), - ) - if err != nil { - return err - } - - totalAlloc, err := m.AsyncInt64().Gauge( - totalAllocMetricsDescription, - metrics.WithDescription(totalAllocMetricsDescription), - metrics.WithUnit(metrics.Bytes), - ) - if err != nil { - return err - } - - sys, err := m.AsyncInt64().Gauge( - sysMetricsName, - metrics.WithDescription(sysMetricsDescription), - metrics.WithUnit(metrics.Bytes), - ) - if err != nil { - return err - } - - mallocs, err := m.AsyncInt64().Gauge( - mallocsMetricsName, - metrics.WithDescription(mallocsMetricsDescription), - metrics.WithUnit(metrics.Dimensionless), - ) - if err != nil { - return err - } - - frees, err := m.AsyncInt64().Gauge( - freesMetricsName, - metrics.WithDescription(freesMetricsDescription), - metrics.WithUnit(metrics.Dimensionless), - ) - if err != nil { - return err - } - - heapAlloc, err := m.AsyncInt64().Gauge( - heapAllocMetricsName, - metrics.WithDescription(heapAllocMetricsDescription), - metrics.WithUnit(metrics.Bytes), - ) - if err != nil { - return err - } - - heapSys, err := m.AsyncInt64().Gauge( - heapSysMetricsName, - metrics.WithDescription(heapSysMetricsDescription), - metrics.WithUnit(metrics.Bytes), - ) - if err != nil { - return err - } - - heapIdle, err := m.AsyncInt64().Gauge( - heapIdleMetricsName, - metrics.WithDescription(heapIdleMetricsDescription), - metrics.WithUnit(metrics.Bytes), - ) - if err != nil { - return err - } - - heapInuse, err := m.AsyncInt64().Gauge( - heapInuseMetricsName, - metrics.WithDescription(heapInuseMetricsDescription), - metrics.WithUnit(metrics.Bytes), - ) - if err != nil { - return err - } - - heapReleased, err := m.AsyncInt64().Gauge( - heapReleasedMetricsName, - metrics.WithDescription(heapReleasedMetricsDescription), - metrics.WithUnit(metrics.Bytes), - ) - if err != nil { - return err - } - - stackInuse, err := m.AsyncInt64().Gauge( - stackInuseMetricsName, - metrics.WithDescription(stackInuseMetricsDescription), - metrics.WithUnit(metrics.Bytes), - ) - if err != nil { - return err - } - - stackSys, err := m.AsyncInt64().Gauge( - stackSysMetricsName, - metrics.WithDescription(stackSysMetricsDescription), - metrics.WithUnit(metrics.Bytes), - ) - if err != nil { - return err - } - - pauseTotalMs, err := m.AsyncInt64().Gauge( - pauseTotalMsMetricsName, - metrics.WithDescription(pauseTotalMsMetricsDescription), - metrics.WithUnit(metrics.Milliseconds), - ) - if err != nil { - return err - } - - numGC, err := m.AsyncInt64().Gauge( - numGCMetricsName, - metrics.WithDescription(numGCMetricsDescription), - metrics.WithUnit(metrics.Bytes), - ) - if err != nil { - return err - } - - return m.RegisterCallback( - []metrics.AsynchronousInstrument{ - alloc, - totalAlloc, - sys, - mallocs, - frees, - heapAlloc, - heapSys, - heapIdle, - heapInuse, - heapReleased, - stackInuse, - stackSys, - pauseTotalMs, - numGC, - }, - func(ctx context.Context) { - var mstats runtime.MemStats - runtime.ReadMemStats(&mstats) - - alloc.Observe(ctx, int64(mstats.Alloc)) - totalAlloc.Observe(ctx, int64(mstats.TotalAlloc)) - sys.Observe(ctx, int64(mstats.Sys)) - mallocs.Observe(ctx, int64(mstats.Mallocs)) - frees.Observe(ctx, int64(mstats.Frees)) - heapAlloc.Observe(ctx, int64(mstats.HeapAlloc)) - heapSys.Observe(ctx, int64(mstats.HeapSys)) - heapIdle.Observe(ctx, int64(mstats.HeapIdle)) - heapInuse.Observe(ctx, int64(mstats.HeapInuse)) - heapReleased.Observe(ctx, int64(mstats.HeapReleased)) - stackInuse.Observe(ctx, int64(mstats.StackInuse)) - stackSys.Observe(ctx, int64(mstats.StackSys)) - - ptMs := int64(0) - if mstats.PauseTotalNs > 0 { - ptMs = int64(mstats.PauseTotalNs / uint64(time.Millisecond)) - } - pauseTotalMs.Observe(ctx, ptMs) - numGC.Observe(ctx, int64(mstats.NextGC)) - }, - ) -} diff --git a/internal/observability/metrics/mem/index/index_test.go b/internal/observability/metrics/mem/index/index_test.go deleted file mode 100644 index 5f514693b1..0000000000 --- a/internal/observability/metrics/mem/index/index_test.go +++ /dev/null @@ -1,262 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package index - -// NOT IMPLEMENTED BELOW -// -// func TestNew(t *testing.T) { -// type want struct { -// want metrics.Metric -// } -// type test struct { -// name string -// want want -// checkFunc func(want, metrics.Metric) error -// beforeFunc func(*testing.T) -// afterFunc func(*testing.T) -// } -// defaultCheckFunc := func(w want, got metrics.Metric) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// -// got := New() -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func Test_memoryMetrics_View(t *testing.T) { -// type want struct { -// want []*metrics.View -// err error -// } -// type test struct { -// name string -// m *memoryMetrics -// want want -// checkFunc func(want, []*metrics.View, error) error -// beforeFunc func(*testing.T) -// afterFunc func(*testing.T) -// } -// defaultCheckFunc := func(w want, got []*metrics.View, err error) error { -// if !errors.Is(err, w.err) { -// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) -// } -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// m := &memoryMetrics{} -// -// got, err := m.View() -// if err := checkFunc(test.want, got, err); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func Test_memoryMetrics_Register(t *testing.T) { -// type args struct { -// m metrics.Meter -// } -// type want struct { -// err error -// } -// type test struct { -// name string -// args args -// m *memoryMetrics -// want want -// checkFunc func(want, error) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, err error) error { -// if !errors.Is(err, w.err) { -// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// m:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// m:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// m := &memoryMetrics{} -// -// err := m.Register(test.args.m) -// if err := checkFunc(test.want, err); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } diff --git a/internal/observability/metrics/mem/malloc/malloc_test.go b/internal/observability/metrics/mem/malloc/malloc_test.go deleted file mode 100644 index c71e1540db..0000000000 --- a/internal/observability/metrics/mem/malloc/malloc_test.go +++ /dev/null @@ -1,174 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package malloc - -// NOT IMPLEMENTED BELOW -// -// func TestNew(t *testing.T) { -// type want struct { -// want metrics.Metrics -// } -// type test struct { -// name string -// want want -// checkFunc func(want, metrics.Metrics) error -// beforeFunc func(*testing.T) -// afterFunc func(*testing.T) -// } -// defaultCheckFunc := func(w want, got metrics.Metrics) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// -// got := New() -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func Test_mallocMetrics_View(t *testing.T) { -// type want struct { -// want []*metrics.View -// err error -// } -// type test struct { -// name string -// m *mallocMetrics -// want want -// checkFunc func(want, []*metrics.View, error) error -// beforeFunc func(*testing.T) -// afterFunc func(*testing.T) -// } -// defaultCheckFunc := func(w want, got []*metrics.View, err error) error { -// if !errors.Is(err, w.err) { -// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) -// } -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// m := &mallocMetrics{} -// -// got, err := m.View() -// if err := checkFunc(test.want, got, err); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } diff --git a/internal/observability/metrics/mem/mem.go b/internal/observability/metrics/mem/mem.go new file mode 100644 index 0000000000..f582f59c12 --- /dev/null +++ b/internal/observability/metrics/mem/mem.go @@ -0,0 +1,614 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package mem + +import ( + "context" + "fmt" + "os" + "runtime" + "strconv" + "strings" + "time" + + "github.com/vdaas/vald/internal/conv" + "github.com/vdaas/vald/internal/observability/metrics" + "go.opentelemetry.io/otel/metric/instrument/asyncint64" + "go.opentelemetry.io/otel/metric/unit" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/view" +) + +const ( + // metrics from runtime.Memstats + allocMetricsName = "alloc_bytes" + allocMetricsDescription = "Currently allocated number of bytes on the heap" + + totalAllocMetricsName = "alloc_bytes_total" + totalAllocMetricsDescription = "Cumulative bytes allocated for heap objects" + + sysMetricsName = "sys_bytes" + sysMetricsDescription = "Total bytes of memory obtained from the OS" + + lookupsMetricsName = "lookups_count" + lookupsMetricsDescription = "The number of pointers" + + mallocsMetricsName = "mallocs_total" + mallocsMetricsDescription = "The cumulative count of heap objects allocated" + + freesMetricsName = "frees_total" + freesMetricsDescription = "The cumulative count of heap objects freed" + + heapAllocMetricsName = "heap_alloc_bytes" + heapAllocMetricsDescription = "Bytes of allocated heap object" + + heapSysMetricsName = "heap_sys_bytes" + heapSysMetricsDescription = "Bytes of heap memory obtained from the OS" + + heapIdleMetricsName = "heap_idle_bytes" + heapIdleMetricsDescription = "Bytes in idle (unused) spans" + + heapInuseMetricsName = "heap_inuse_bytes" + heapInuseMetricsDescription = "Bytes in in-use spans" + + heapReleasedMetricsName = "heap_released_bytes" + heapReleasedMetricsDescription = "Bytes of physical memory returned to the OS" + + heapObjectsMetricsName = "heap_objects_count" + heapObjectsMetricsDescription = "The number of allocated heap objects" + + stackInuseMetricsName = "stack_inuse_bytes" + stackInuseMetricsDescription = "Bytes in stack spans" + + stackSysMetricsName = "stack_sys_bytes" + stackSysMetricsDescription = "Bytes of stack memory obtained from the OS" + + mspanInuseMetricsName = "mspan_inuse_bytes" + mspanInuseMetricsDescription = "Bytes of allocated mspan structures" + + mspanSysMetricsName = "mspan_sys_bytes" + mspanSysMetricsDescription = "Bytes of memory obtained from the OS for mspan structures" + + mcacheInuseMetricsName = "mcache_inuse_bytes" + mcacheInuseMetricsDescription = "Bytes of allocated mcache structures" + + mcacheSysMetricsName = "mcache_sys_bytes" + mcacheSysMetricsDescription = "Bytes of memory obtained from the OS mcache structures" + + buckHashSysMetricsName = "buckhash_sys_bytes" + buckHashSysMetricsDescription = "Bytes of memory in profiling bucket hash tables" + + gcSysMetricsName = "gc_sys_bytes" + gcSysMetricsDescription = "Bytes of memory in GC metadata" + + otherSysMetricsName = "other_sys_bytes" + otherSysMetricsDescription = "Bytes of memory in misc off-heap runtime allocations" + + nextGcSysMetricsName = "next_gc_bytes" + nextGcSysMetricsDescription = "Target heap size of the next GC" + + pauseTotalMsMetricsName = "pause_ms_total" + pauseTotalMsMetricsDescription = "The cumulative milliseconds in GC" + + numGCMetricsName = "gc_count" + numGCMetricsDescription = "The number of completed GC cycles" + + numForcedGCMetricsName = "forced_gc_count" + numForcedGCMetricsDescription = "The number of GC cycles called by the application" + + heapWillReturnMetricsName = "heap_will_return_bytes" + heapWillReturnMetricsDescription = "Bytes of returning to OS. It contains the two following parts (heapWillReturn = heapIdle - heapReleased)" + + liveObjectsMetricsName = "live_objects_count" + liveObjectsMetricsDescription = "The cumulative count of living heap objects allocated. It contains the two following parts (liveObjects = mallocs - frees)" + + // metrics from /proc//status + vmpeakMetricsName = "vmpeak_bytes" + vmpeakMetricsDescription = "peak virtual memory size" + + vmsizeMetricsName = "vmsize_bytes" + vmsizeMetricsDescription = "toal program size" + + vmdataMetricsName = "vmdata_bytes" + vmdataMetricsDescription = "size of private data segments" + + vmrssMetricsName = "vmrss_bytes" + vmrssMetricsDescription = "size of memory portions. It contains the three following parts (VmRSS = RssAnon + RssFile + RssShmem)" + + vmhwmMetricsName = "vmhwm_bytes" + vmhwmMetricsDescription = "peak resident set size (\"high water mark\")" + + vmstkMetricsName = "vmstk_bytes" + vmstkMetricsDescription = "size of stack segments" + + vmswapMetricsName = "vmswap_bytes" + vmswapMetricsDescription = "amount of swap used by anonymous private data (shmem swap usage is not included)" + + vmexeMetricsName = "vmexe_bytes" + vmexeMetricsDescription = "size of text segment" + + vmlibMetricsName = "vmlib_bytes" + vmlibMetricsDescription = "size of shared library code" + + vmlckMetricsName = "vmlck_bytes" + vmlckMetricsDescription = "locked memory size" + + vmpinMetricsName = "vmpin_bytes" + vmpinMetricsDescription = "pinned memory size" + + vmpteMetricsName = "vmpte_bytes" + vmpteMetricsDescription = "size of page table entries" + + k = 1024 +) + +type metricsInfo struct { + Name string + Desc string + Unit metrics.Unit + Value func() int64 +} + +func getMemstatsMetrics() []*metricsInfo { + var m runtime.MemStats + runtime.ReadMemStats(&m) + return []*metricsInfo{ + { + Name: allocMetricsName, + Desc: allocMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.Alloc) + }, + }, + { + Name: totalAllocMetricsName, + Desc: totalAllocMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.TotalAlloc) + }, + }, + { + Name: sysMetricsName, + Desc: sysMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.Sys) + }, + }, + { + Name: lookupsMetricsName, + Desc: lookupsMetricsDescription, + Unit: metrics.Dimensionless, + Value: func() int64 { + return int64(m.Lookups) + }, + }, + { + Name: mallocsMetricsName, + Desc: mallocsMetricsDescription, + Unit: metrics.Dimensionless, + Value: func() int64 { + return int64(m.Mallocs) + }, + }, + { + Name: freesMetricsName, + Desc: freesMetricsDescription, + Unit: metrics.Dimensionless, + Value: func() int64 { + return int64(m.Frees) + }, + }, + { + Name: heapAllocMetricsName, + Desc: heapAllocMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.HeapAlloc) + }, + }, + { + Name: heapSysMetricsName, + Desc: heapSysMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.HeapSys) + }, + }, + { + Name: heapIdleMetricsName, + Desc: heapIdleMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.HeapIdle) + }, + }, + { + Name: heapInuseMetricsName, + Desc: heapInuseMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.HeapInuse) + }, + }, + { + Name: heapReleasedMetricsName, + Desc: heapReleasedMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.HeapReleased) + }, + }, + { + Name: heapObjectsMetricsName, + Desc: heapObjectsMetricsDescription, + Unit: metrics.Dimensionless, + Value: func() int64 { + return int64(m.HeapObjects) + }, + }, + { + Name: stackInuseMetricsName, + Desc: stackInuseMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.StackInuse) + }, + }, + { + Name: stackSysMetricsName, + Desc: stackSysMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.StackSys) + }, + }, + { + Name: mspanInuseMetricsName, + Desc: mspanInuseMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.MSpanInuse) + }, + }, + { + Name: mspanSysMetricsName, + Desc: mspanSysMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.MSpanSys) + }, + }, + { + Name: mcacheInuseMetricsName, + Desc: mcacheInuseMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.MCacheInuse) + }, + }, + { + Name: mcacheSysMetricsName, + Desc: mcacheSysMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.MCacheSys) + }, + }, + { + Name: buckHashSysMetricsName, + Desc: buckHashSysMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.BuckHashSys) + }, + }, + { + Name: gcSysMetricsName, + Desc: gcSysMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.GCSys) + }, + }, + { + Name: otherSysMetricsName, + Desc: otherSysMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.OtherSys) + }, + }, + { + Name: nextGcSysMetricsName, + Desc: nextGcSysMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.NextGC) + }, + }, + { + Name: pauseTotalMsMetricsName, + Desc: pauseTotalMsMetricsDescription, + Unit: metrics.Milliseconds, + Value: func() int64 { + ptMs := int64(0) + if m.PauseTotalNs > 0 { + ptMs = int64(m.PauseTotalNs / uint64(time.Millisecond)) + } + return ptMs + }, + }, + { + Name: numGCMetricsName, + Desc: numGCMetricsDescription, + Unit: metrics.Dimensionless, + Value: func() int64 { + return int64(m.NumGC) + }, + }, + { + Name: numForcedGCMetricsName, + Desc: numForcedGCMetricsDescription, + Unit: metrics.Dimensionless, + Value: func() int64 { + return int64(m.NumForcedGC) + }, + }, + { + Name: heapWillReturnMetricsName, + Desc: heapWillReturnMetricsDescription, + Unit: metrics.Bytes, + Value: func() int64 { + return int64(m.HeapIdle - m.HeapReleased) + }, + }, + { + Name: liveObjectsMetricsName, + Desc: liveObjectsMetricsDescription, + Unit: metrics.Dimensionless, + Value: func() int64 { + return int64(m.Mallocs - m.Frees) + }, + }, + } +} + +// skipcq: GO-R1005 +func getProcStatusMetrics(pid int) ([]*metricsInfo, error) { + buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/status", pid)) + if err != nil { + return nil, err + } + lines := strings.Split(conv.Btoa(buf), "\n") + m := make([]*metricsInfo, 0) + for _, line := range lines { + fields := strings.Fields(line) + switch { + case strings.HasPrefix(line, "VmPeak"): + f, err := strconv.ParseInt(fields[1], 10, 64) + if err == nil { + m = append(m, &metricsInfo{ + Name: vmpeakMetricsName, + Desc: vmpeakMetricsDescription, + Unit: unit.Bytes, + Value: func() int64 { + return f * k + }, + }) + } + case strings.HasPrefix(line, "VmSize"): + f, err := strconv.ParseInt(fields[1], 10, 64) + if err == nil { + m = append(m, &metricsInfo{ + Name: vmsizeMetricsName, + Desc: vmsizeMetricsDescription, + Unit: unit.Bytes, + Value: func() int64 { + return f * k + }, + }) + } + case strings.HasPrefix(line, "VmHWM"): + f, err := strconv.ParseInt(fields[1], 10, 64) + if err == nil { + m = append(m, &metricsInfo{ + Name: vmhwmMetricsName, + Desc: vmhwmMetricsDescription, + Unit: unit.Bytes, + Value: func() int64 { + return f * k + }, + }) + } + case strings.HasPrefix(line, "VmRSS"): + f, err := strconv.ParseInt(fields[1], 10, 64) + if err == nil { + m = append(m, &metricsInfo{ + Name: vmrssMetricsName, + Desc: vmrssMetricsDescription, + Unit: unit.Bytes, + Value: func() int64 { + return f * k + }, + }) + } + case strings.HasPrefix(line, "VmData"): + f, err := strconv.ParseInt(fields[1], 10, 64) + if err == nil { + m = append(m, &metricsInfo{ + Name: vmdataMetricsName, + Desc: vmdataMetricsDescription, + Unit: unit.Bytes, + Value: func() int64 { + return f * k + }, + }) + } + case strings.HasPrefix(line, "VmStk"): + f, err := strconv.ParseInt(fields[1], 10, 64) + if err == nil { + m = append(m, &metricsInfo{ + Name: vmstkMetricsName, + Desc: vmstkMetricsDescription, + Unit: unit.Bytes, + Value: func() int64 { + return f * k + }, + }) + } + case strings.HasPrefix(line, "VmExe"): + f, err := strconv.ParseInt(fields[1], 10, 64) + if err == nil { + m = append(m, &metricsInfo{ + Name: vmexeMetricsName, + Desc: vmexeMetricsDescription, + Unit: unit.Bytes, + Value: func() int64 { + return f * k + }, + }) + } + case strings.HasPrefix(line, "VmLck"): + f, err := strconv.ParseInt(fields[1], 10, 64) + if err == nil { + m = append(m, &metricsInfo{ + Name: vmlckMetricsName, + Desc: vmlckMetricsDescription, + Unit: unit.Bytes, + Value: func() int64 { + return f * k + }, + }) + } + case strings.HasPrefix(line, "VmLib"): + f, err := strconv.ParseInt(fields[1], 10, 64) + if err == nil { + m = append(m, &metricsInfo{ + Name: vmlibMetricsName, + Desc: vmlibMetricsDescription, + Unit: unit.Bytes, + Value: func() int64 { + return f * k + }, + }) + } + case strings.HasPrefix(line, "VmPTE"): + f, err := strconv.ParseInt(fields[1], 10, 64) + if err == nil { + m = append(m, &metricsInfo{ + Name: vmpteMetricsName, + Desc: vmpteMetricsDescription, + Unit: unit.Bytes, + Value: func() int64 { + return f * k + }, + }) + } + case strings.HasPrefix(line, "VmSwap"): + f, err := strconv.ParseInt(fields[1], 10, 64) + if err == nil { + m = append(m, &metricsInfo{ + Name: vmswapMetricsName, + Desc: vmswapMetricsDescription, + Unit: unit.Bytes, + Value: func() int64 { + return f * k + }, + }) + } + case strings.HasPrefix(line, "VmPin"): + f, err := strconv.ParseInt(fields[1], 10, 64) + if err == nil { + m = append(m, &metricsInfo{ + Name: vmpinMetricsName, + Desc: vmpinMetricsDescription, + Unit: unit.Bytes, + Value: func() int64 { + return f * k + }, + }) + } + } + } + return m, nil +} + +type memMetrics struct { + pid int +} + +func New() metrics.Metric { + return &memMetrics{ + pid: os.Getpid(), + } +} + +func (mm *memMetrics) View() ([]*metrics.View, error) { + mInfo := getMemstatsMetrics() + if m, err := getProcStatusMetrics(mm.pid); err == nil { + mInfo = append(mInfo, m...) + } + + views := make([]*metrics.View, 0, len(mInfo)) + for _, m := range mInfo { + v, err := view.New( + view.MatchInstrumentName(m.Name), + view.WithSetDescription(m.Desc), + view.WithSetAggregation(aggregation.LastValue{}), + ) + if err != nil { + return nil, err + } + views = append(views, &v) + } + return views, nil +} + +func (mm *memMetrics) Register(m metrics.Meter) error { + mInfo := getMemstatsMetrics() + if metrics, err := getProcStatusMetrics(mm.pid); err == nil { + mInfo = append(mInfo, metrics...) + } + + instruments := make([]metrics.AsynchronousInstrument, 0, len(mInfo)) + for _, info := range mInfo { + instrument, err := m.AsyncInt64().Gauge( + info.Name, + metrics.WithDescription(info.Desc), + metrics.WithUnit(info.Unit), + ) + if err != nil { + return err + } + instruments = append(instruments, instrument) + } + + return m.RegisterCallback( + instruments, + func(ctx context.Context) { + var mstats runtime.MemStats + runtime.ReadMemStats(&mstats) + + for i, instrument := range instruments { + g, ok := instrument.(asyncint64.Gauge) + if ok { + g.Observe(ctx, mInfo[i].Value()) + } + } + }, + ) +} diff --git a/internal/observability/observability.go b/internal/observability/observability.go index 340f547a77..d8d32f9454 100644 --- a/internal/observability/observability.go +++ b/internal/observability/observability.go @@ -24,7 +24,7 @@ import ( "github.com/vdaas/vald/internal/observability/exporter/otlp" "github.com/vdaas/vald/internal/observability/metrics" "github.com/vdaas/vald/internal/observability/metrics/grpc" - "github.com/vdaas/vald/internal/observability/metrics/mem/index" + "github.com/vdaas/vald/internal/observability/metrics/mem" "github.com/vdaas/vald/internal/observability/metrics/runtime/cgo" "github.com/vdaas/vald/internal/observability/metrics/runtime/goroutine" "github.com/vdaas/vald/internal/observability/metrics/version" @@ -58,7 +58,7 @@ func NewWithConfig(cfg *config.Observability, ms ...metrics.Metric) (Observabili ms = append(ms, goroutine.New()) } if cfg.Metrics.EnableMemory { - ms = append(ms, index.New()) + ms = append(ms, mem.New()) } if cfg.Metrics.EnableVersionInfo { ms = append(ms, version.New(cfg.Metrics.VersionInfoLabels...)) diff --git a/pkg/agent/core/ngt/usecase/agentd.go b/pkg/agent/core/ngt/usecase/agentd.go index b1b1fb1fb3..fe1d63ab35 100644 --- a/pkg/agent/core/ngt/usecase/agentd.go +++ b/pkg/agent/core/ngt/usecase/agentd.go @@ -26,6 +26,7 @@ import ( "github.com/vdaas/vald/internal/observability" ngtmetrics "github.com/vdaas/vald/internal/observability/metrics/agent/core/ngt" infometrics "github.com/vdaas/vald/internal/observability/metrics/info" + memmetrics "github.com/vdaas/vald/internal/observability/metrics/mem" "github.com/vdaas/vald/internal/runner" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/servers/server" @@ -97,6 +98,7 @@ func New(cfg *config.Data) (r runner.Runner, err error) { cfg.Observability, ngtmetrics.New(ngt), infometrics.New("agent_core_ngt_info", "Agent NGT info", *cfg.NGT), + memmetrics.New(), ) if err != nil { return nil, err