From ee076e0ab2bf07c6117df1570fc5fedec8910a55 Mon Sep 17 00:00:00 2001
From: reus
Date: Tue, 19 Nov 2024 12:35:31 +0800
Subject: [PATCH 1/2] fileservice: refactor http trace

---
 pkg/fileservice/http_trace.go                 | 154 ++++++++++++++++++
 pkg/fileservice/object_storage_http_trace.go  |  87 +++++++++++++
 pkg/fileservice/reuse.go                      |  27 ----
 pkg/fileservice/s3_fs.go                      |  86 +-----------
 pkg/fileservice/s3_fs_test.go                 | 103 +--------------
 .../v2/dashboard/grafana_dashboard_fs.go      |  53 +++++---
 pkg/util/metric/v2/fileservice.go             |  39 +++---
 pkg/util/metric/v2/metrics.go                 |   4 +-
 8 files changed, 301 insertions(+), 252 deletions(-)
 create mode 100644 pkg/fileservice/http_trace.go
 create mode 100644 pkg/fileservice/object_storage_http_trace.go
 delete mode 100644 pkg/fileservice/reuse.go

diff --git a/pkg/fileservice/http_trace.go b/pkg/fileservice/http_trace.go
new file mode 100644
index 0000000000000..7ca6c54caf3be
--- /dev/null
+++ b/pkg/fileservice/http_trace.go
@@ -0,0 +1,154 @@
+// Copyright 2024 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileservice
+
+import (
+	"crypto/tls"
+	"net/http/httptrace"
+	"time"
+
+	"github.com/matrixorigin/matrixone/pkg/common/reuse"
+	"github.com/matrixorigin/matrixone/pkg/logutil"
+	metric "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
+	"go.uber.org/zap"
+)
+
+// traceInfo couples an httptrace.ClientTrace with the timestamps recorded by
+// its hooks. Instances are obtained from, and returned to, a reuse pool.
+//
+// NOTE(review): net/http/httptrace documents that hooks may run concurrently
+// and after the request has finished, while times is unsynchronized and is
+// cleared by resetTracePoint when the value is pooled; confirm this is fine.
+type traceInfo struct {
+	times traceTimes
+	trace *httptrace.ClientTrace
+}
+
+// traceTimes stores when each traced phase of a request started.
+type traceTimes struct {
+	GetConn           time.Time
+	GotConn           time.Time
+	DNSStart          time.Time
+	ConnectStart      time.Time
+	TLSHandshakeStart time.Time
+}
+
+// newTraceInfo builds a traceInfo whose ClientTrace hooks are bound to the
+// methods of that same value.
+func newTraceInfo() *traceInfo {
+	info := new(traceInfo)
+	info.trace = &httptrace.ClientTrace{
+		GetConn:              info.GetConn,
+		GotConn:              info.GotConn,
+		PutIdleConn:          info.PutIdleConn,
+		GotFirstResponseByte: info.GotFirstResponseByte,
+		DNSStart:             info.DNSStart,
+		DNSDone:              info.DNSDone,
+		ConnectStart:         info.ConnectStart,
+		ConnectDone:          info.ConnectDone,
+		TLSHandshakeStart:    info.TLSHandshakeStart,
+		TLSHandshakeDone:     info.TLSHandshakeDone,
+	}
+	return info
+}
+
+func init() {
+	reuse.CreatePool(
+		newTraceInfo,
+		resetTracePoint,
+		reuse.DefaultOptions[traceInfo]().
+			WithEnableChecker())
+}
+
+// TypeName returns the name that identifies traceInfo.
+func (traceInfo) TypeName() string {
+	return "fileservice.traceInfo"
+}
+
+// resetTracePoint clears the recorded timestamps; the hooks stay bound.
+func resetTracePoint(info *traceInfo) {
+	info.times = traceTimes{}
+}
+
+// GetConn records when a connection was requested and counts the event.
+func (t *traceInfo) GetConn(hostPort string) {
+	t.times.GetConn = time.Now()
+	metric.FSHTTPTraceCounter.WithLabelValues("GetConn").Inc()
+}
+
+// GotConn records when a connection was obtained, counts it, and observes how
+// long acquiring it took. Reused and previously idle connections are counted
+// separately, and only when the transport reports them as such.
+func (t *traceInfo) GotConn(info httptrace.GotConnInfo) {
+	t.times.GotConn = time.Now()
+	metric.FSHTTPTraceCounter.WithLabelValues("GotConn").Inc()
+	if info.Reused {
+		metric.FSHTTPTraceCounter.WithLabelValues("GotConnReused").Inc()
+	}
+	if info.WasIdle {
+		metric.FSHTTPTraceCounter.WithLabelValues("GotConnIdle").Inc()
+	}
+	metric.S3GetConnDurationHistogram.Observe(time.Since(t.times.GetConn).Seconds())
+}
+
+// PutIdleConn logs a failure to return the connection to the idle pool. A nil
+// err means the connection was returned successfully, so nothing is logged.
+func (t *traceInfo) PutIdleConn(err error) {
+	if err == nil {
+		return
+	}
+	logutil.Info("PutIdleConn error",
+		zap.Error(err),
+	)
+}
+
+// GotFirstResponseByte observes the time from obtaining the connection to the
+// first response byte.
+func (t *traceInfo) GotFirstResponseByte() {
+	metric.S3GotFirstResponseDurationHistogram.Observe(time.Since(t.times.GotConn).Seconds())
+}
+
+// DNSStart records when a DNS lookup began and counts the event.
+func (t *traceInfo) DNSStart(di httptrace.DNSStartInfo) {
+	t.times.DNSStart = time.Now()
+	metric.FSHTTPTraceCounter.WithLabelValues("DNSStart").Inc()
+}
+
+// DNSDone observes how long the DNS lookup took.
+func (t *traceInfo) DNSDone(di httptrace.DNSDoneInfo) {
+	metric.S3DNSResolveDurationHistogram.Observe(time.Since(t.times.DNSStart).Seconds())
+}
+
+// ConnectStart records when a dial began and counts the event.
+func (t *traceInfo) ConnectStart(network, addr string) {
+	t.times.ConnectStart = time.Now()
+	metric.FSHTTPTraceCounter.WithLabelValues("ConnectStart").Inc()
+}
+
+// ConnectDone observes how long the dial took.
+func (t *traceInfo) ConnectDone(network, addr string, err error) {
+	metric.S3ConnectDurationHistogram.Observe(time.Since(t.times.ConnectStart).Seconds())
+}
+
+// TLSHandshakeStart counts the event and records when the handshake began.
+func (t *traceInfo) TLSHandshakeStart() {
+	metric.FSHTTPTraceCounter.WithLabelValues("TLSHandshakeStart").Inc()
+	t.times.TLSHandshakeStart = time.Now()
+}
+
+// TLSHandshakeDone observes how long the TLS handshake took.
+func (t *traceInfo) TLSHandshakeDone(cs tls.ConnectionState, err error) {
+	metric.S3TLSHandshakeDurationHistogram.Observe(time.Since(t.times.TLSHandshakeStart).Seconds())
+}
diff --git a/pkg/fileservice/object_storage_http_trace.go
b/pkg/fileservice/object_storage_http_trace.go
new file mode 100644
index 0000000000000..784f686fd9688
--- /dev/null
+++ b/pkg/fileservice/object_storage_http_trace.go
@@ -0,0 +1,106 @@
+// Copyright 2024 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileservice
+
+import (
+	"context"
+	"io"
+	"iter"
+	"net/http/httptrace"
+	"time"
+
+	"github.com/matrixorigin/matrixone/pkg/common/reuse"
+)
+
+// objectStorageHTTPTrace wraps an ObjectStorage and attaches an
+// httptrace.ClientTrace to the context of every call it forwards.
+type objectStorageHTTPTrace struct {
+	upstream ObjectStorage
+}
+
+// newObjectStorageHTTPTrace wraps upstream with HTTP tracing.
+func newObjectStorageHTTPTrace(upstream ObjectStorage) *objectStorageHTTPTrace {
+	return &objectStorageHTTPTrace{
+		upstream: upstream,
+	}
+}
+
+var _ ObjectStorage = new(objectStorageHTTPTrace)
+
+// Delete forwards to upstream with a traced context.
+func (o *objectStorageHTTPTrace) Delete(ctx context.Context, keys ...string) (err error) {
+	traceInfo := o.newTraceInfo()
+	defer o.closeTraceInfo(traceInfo)
+	ctx = httptrace.WithClientTrace(ctx, traceInfo.trace)
+	return o.upstream.Delete(ctx, keys...)
+}
+
+// Exists forwards to upstream with a traced context.
+func (o *objectStorageHTTPTrace) Exists(ctx context.Context, key string) (bool, error) {
+	traceInfo := o.newTraceInfo()
+	defer o.closeTraceInfo(traceInfo)
+	ctx = httptrace.WithClientTrace(ctx, traceInfo.trace)
+	return o.upstream.Exists(ctx, key)
+}
+
+// List returns a sequence that forwards to upstream with a traced context.
+// The trace is allocated when iteration starts and released when it ends, so
+// it is still alive for requests issued while the sequence is consumed,
+// rather than being released as soon as List returns.
+func (o *objectStorageHTTPTrace) List(ctx context.Context, prefix string) iter.Seq2[*DirEntry, error] {
+	return func(yield func(*DirEntry, error) bool) {
+		info := o.newTraceInfo()
+		defer o.closeTraceInfo(info)
+		tracedCtx := httptrace.WithClientTrace(ctx, info.trace)
+		o.upstream.List(tracedCtx, prefix)(yield)
+	}
+}
+
+// Read forwards to upstream with a traced context.
+//
+// NOTE(review): the trace is released when Read returns, before the caller
+// consumes r; confirm that no hook still needs it while the body is read.
+func (o *objectStorageHTTPTrace) Read(ctx context.Context, key string, min *int64, max *int64) (r io.ReadCloser, err error) {
+	traceInfo := o.newTraceInfo()
+	defer o.closeTraceInfo(traceInfo)
+	ctx = httptrace.WithClientTrace(ctx, traceInfo.trace)
+	return o.upstream.Read(ctx, key, min, max)
+}
+
+// Stat forwards to upstream with a traced context.
+func (o *objectStorageHTTPTrace) Stat(ctx context.Context, key string) (size int64, err error) {
+	traceInfo := o.newTraceInfo()
+	defer o.closeTraceInfo(traceInfo)
+	ctx = httptrace.WithClientTrace(ctx, traceInfo.trace)
+	return o.upstream.Stat(ctx, key)
+}
+
+// Write forwards to upstream with a traced context.
+func (o *objectStorageHTTPTrace) Write(ctx context.Context, key string, r io.Reader, size int64, expire *time.Time) (err error) {
+	traceInfo := o.newTraceInfo()
+	defer o.closeTraceInfo(traceInfo)
+	ctx = httptrace.WithClientTrace(ctx, traceInfo.trace)
+	return o.upstream.Write(ctx, key, r, size, expire)
+}
+
+// newTraceInfo takes a traceInfo from the reuse pool.
+func (o *objectStorageHTTPTrace) newTraceInfo() *traceInfo {
+	return reuse.Alloc[traceInfo](nil)
+}
+
+// closeTraceInfo returns info to the reuse pool.
+func (o *objectStorageHTTPTrace) closeTraceInfo(info *traceInfo) {
+	reuse.Free(info, nil)
+}
diff --git a/pkg/fileservice/reuse.go b/pkg/fileservice/reuse.go
deleted file mode 100644
index d7565770901c7..0000000000000
--- a/pkg/fileservice/reuse.go
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright 2023 Matrix Origin
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package fileservice - -import ( - "github.com/matrixorigin/matrixone/pkg/common/reuse" -) - -func init() { - reuse.CreatePool[tracePoint]( - newTracePoint, - resetTracePoint, - reuse.DefaultOptions[tracePoint](). - WithEnableChecker()) -} diff --git a/pkg/fileservice/s3_fs.go b/pkg/fileservice/s3_fs.go index d610c133516e9..55a726ba2d1ed 100644 --- a/pkg/fileservice/s3_fs.go +++ b/pkg/fileservice/s3_fs.go @@ -17,11 +17,9 @@ package fileservice import ( "bytes" "context" - "crypto/tls" "errors" "io" "iter" - "net/http/httptrace" pathpkg "path" "runtime" "sort" @@ -30,7 +28,6 @@ import ( "time" "github.com/matrixorigin/matrixone/pkg/common/moerr" - "github.com/matrixorigin/matrixone/pkg/common/reuse" "github.com/matrixorigin/matrixone/pkg/fileservice/fscache" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/perfcounter" @@ -138,6 +135,9 @@ func NewS3FS( "s3", ) + // http trace + fs.storage = newObjectStorageHTTPTrace(fs.storage) + // cache if !noCache { if err := fs.initCaches(ctx, cacheConfig); err != nil { @@ -370,10 +370,6 @@ func (s *S3FS) Write(ctx context.Context, vector IOVector) (err error) { return err } - tp := reuse.Alloc[tracePoint](nil) - defer reuse.Free(tp, nil) - ctx = httptrace.WithClientTrace(ctx, tp.getClientTrace()) - var bytesWritten int start := time.Now() defer func() { @@ -477,10 +473,6 @@ func (s *S3FS) Read(ctx context.Context, vector *IOVector) (err error) { LogSlowEvent(ctx, time.Millisecond*500) }() - tp := reuse.Alloc[tracePoint](nil) - defer reuse.Free(tp, nil) - ctx = 
httptrace.WithClientTrace(ctx, tp.getClientTrace()) - if len(vector.Entries) == 0 { return moerr.NewEmptyVectorNoCtx() } @@ -972,75 +964,3 @@ func (s *S3FS) Cost() *CostAttr { List: CostHigh, } } - -type tracePoint struct { - start time.Time - dnsStart time.Time - connectStart time.Time - tlsHandshakeStart time.Time - ct *httptrace.ClientTrace -} - -func newTracePoint() *tracePoint { - tp := &tracePoint{ - ct: &httptrace.ClientTrace{}, - } - tp.ct.GetConn = tp.getConnPoint - tp.ct.GotConn = tp.gotConnPoint - tp.ct.DNSStart = tp.dnsStartPoint - tp.ct.DNSDone = tp.dnsDonePoint - tp.ct.ConnectStart = tp.connectStartPoint - tp.ct.ConnectDone = tp.connectDonePoint - tp.ct.TLSHandshakeStart = tp.tlsHandshakeStartPoint - tp.ct.TLSHandshakeDone = tp.tlsHandshakeDonePoint - return tp -} - -func (tp tracePoint) TypeName() string { - return "fileservice.tracePoint" -} - -func resetTracePoint(tp *tracePoint) { - tp.start = time.Time{} - tp.dnsStart = time.Time{} - tp.connectStart = time.Time{} - tp.tlsHandshakeStart = time.Time{} -} - -func (tp *tracePoint) getClientTrace() *httptrace.ClientTrace { - return tp.ct -} - -func (tp *tracePoint) getConnPoint(hostPort string) { - tp.start = time.Now() -} - -func (tp *tracePoint) gotConnPoint(info httptrace.GotConnInfo) { - metric.S3GetConnDurationHistogram.Observe(time.Since(tp.start).Seconds()) -} - -func (tp *tracePoint) dnsStartPoint(di httptrace.DNSStartInfo) { - metric.S3DNSResolveCounter.Inc() - tp.dnsStart = time.Now() -} - -func (tp *tracePoint) dnsDonePoint(di httptrace.DNSDoneInfo) { - metric.S3DNSResolveDurationHistogram.Observe(time.Since(tp.dnsStart).Seconds()) -} - -func (tp *tracePoint) connectStartPoint(network, addr string) { - metric.S3ConnectCounter.Inc() - tp.connectStart = time.Now() -} - -func (tp *tracePoint) connectDonePoint(network, addr string, err error) { - metric.S3ConnectDurationHistogram.Observe(time.Since(tp.connectStart).Seconds()) -} - -func (tp *tracePoint) tlsHandshakeStartPoint() { - 
tp.tlsHandshakeStart = time.Now() -} - -func (tp *tracePoint) tlsHandshakeDonePoint(cs tls.ConnectionState, err error) { - metric.S3TLSHandshakeDurationHistogram.Observe(time.Since(tp.tlsHandshakeStart).Seconds()) -} diff --git a/pkg/fileservice/s3_fs_test.go b/pkg/fileservice/s3_fs_test.go index 8925abf40c524..926cb204b3c50 100644 --- a/pkg/fileservice/s3_fs_test.go +++ b/pkg/fileservice/s3_fs_test.go @@ -20,12 +20,9 @@ import ( "encoding/csv" "encoding/json" "encoding/xml" - "fmt" - "net/http/httptrace" "os" "strings" "sync" - "sync/atomic" "testing" "time" @@ -486,6 +483,8 @@ func TestS3FSWithSubPath(t *testing.T) { } func BenchmarkS3ConcurrentRead(b *testing.B) { + ctx := context.Background() + config, err := loadS3TestConfig(b) if err != nil { b.Fatal(err) @@ -498,51 +497,6 @@ func BenchmarkS3ConcurrentRead(b *testing.B) { b.Setenv("AWS_ACCESS_KEY_ID", config.APIKey) b.Setenv("AWS_SECRET_ACCESS_KEY", config.APISecret) - var numRead atomic.Int64 - var numGotConn, numReuse, numConnect atomic.Int64 - var numTLSHandshake atomic.Int64 - ctx := context.Background() - trace := &httptrace.ClientTrace{ - - GetConn: func(hostPort string) { - //fmt.Printf("get conn: %s\n", hostPort) - }, - - GotConn: func(info httptrace.GotConnInfo) { - numGotConn.Add(1) - if info.Reused { - numReuse.Add(1) - } - //fmt.Printf("got conn: %+v\n", info) - }, - - PutIdleConn: func(err error) { - //if err != nil { - // fmt.Printf("put idle conn failed: %v\n", err) - //} - }, - - ConnectStart: func(network, addr string) { - numConnect.Add(1) - //fmt.Printf("connect %v %v\n", network, addr) - }, - - TLSHandshakeStart: func() { - numTLSHandshake.Add(1) - }, - } - - ctx = httptrace.WithClientTrace(ctx, trace) - defer func() { - fmt.Printf("read %v, got %v conns, reuse %v, connect %v, tls handshake %v\n", - numRead.Load(), - numGotConn.Load(), - numReuse.Load(), - numConnect.Load(), - numTLSHandshake.Load(), - ) - }() - fs, err := NewS3FS( ctx, ObjectStorageArguments{ @@ -599,7 +553,6 @@ func 
BenchmarkS3ConcurrentRead(b *testing.B) { if err != nil { panic(err) } - numRead.Add(1) }() } for i := 0; i < cap(sem); i++ { @@ -610,6 +563,8 @@ func BenchmarkS3ConcurrentRead(b *testing.B) { } func TestSequentialS3Read(t *testing.T) { + ctx := context.Background() + config, err := loadS3TestConfig(t) if err != nil { t.Fatal(err) @@ -619,55 +574,6 @@ func TestSequentialS3Read(t *testing.T) { t.Setenv("AWS_ACCESS_KEY_ID", config.APIKey) t.Setenv("AWS_SECRET_ACCESS_KEY", config.APISecret) - var numRead atomic.Int64 - var numGotConn, numReuse, numConnect atomic.Int64 - var numTLSHandshake atomic.Int64 - ctx := context.Background() - trace := &httptrace.ClientTrace{ - - GetConn: func(hostPort string) { - fmt.Printf("get conn: %s\n", hostPort) - }, - - GotConn: func(info httptrace.GotConnInfo) { - numGotConn.Add(1) - if info.Reused { - numReuse.Add(1) - } else { - fmt.Printf("got conn not reuse: %+v\n", info) - } - }, - - PutIdleConn: func(err error) { - if err != nil { - fmt.Printf("put idle conn failed: %v\n", err) - } - }, - - ConnectDone: func(network string, addr string, err error) { - numConnect.Add(1) - fmt.Printf("connect done: %v %v\n", network, addr) - if err != nil { - fmt.Printf("connect error: %v\n", err) - } - }, - - TLSHandshakeStart: func() { - numTLSHandshake.Add(1) - }, - } - - ctx = httptrace.WithClientTrace(ctx, trace) - defer func() { - fmt.Printf("read %v, got %v conns, reuse %v, connect %v, tls handshake %v\n", - numRead.Load(), - numGotConn.Load(), - numReuse.Load(), - numConnect.Load(), - numTLSHandshake.Load(), - ) - }() - fs, err := NewS3FS( ctx, ObjectStorageArguments{ @@ -715,7 +621,6 @@ func TestSequentialS3Read(t *testing.T) { if err != nil { t.Fatal(err) } - numRead.Add(1) } } diff --git a/pkg/util/metric/v2/dashboard/grafana_dashboard_fs.go b/pkg/util/metric/v2/dashboard/grafana_dashboard_fs.go index e2f9e78107067..08043e4d0bde5 100644 --- a/pkg/util/metric/v2/dashboard/grafana_dashboard_fs.go +++ 
b/pkg/util/metric/v2/dashboard/grafana_dashboard_fs.go
@@ -39,7 +39,7 @@ func (c *DashboardCreator) initFileServiceDashboard() error {
 			c.initFSObjectStorageRow(),
 			c.initFSIOMergerDurationRow(),
 			c.initFSReadWriteDurationRow(),
-			c.initFSMallocRow(),
+			c.initFSHTTPTraceRow(),
 			c.initFSReadWriteBytesRow(),
 			c.initFSS3ConnOverviewRow(),
 			c.initFSS3ConnDurationRow(),
@@ -202,12 +202,14 @@ func (c *DashboardCreator) initFSS3ConnDurationRow() dashboard.Option {
 			[]string{
 				c.getMetricWithFilter(`mo_fs_s3_conn_duration_seconds_bucket`, `type="connect"`),
 				c.getMetricWithFilter(`mo_fs_s3_conn_duration_seconds_bucket`, `type="get-conn"`),
+				c.getMetricWithFilter(`mo_fs_s3_conn_duration_seconds_bucket`, `type="got-first-response"`),
 				c.getMetricWithFilter(`mo_fs_s3_conn_duration_seconds_bucket`, `type="dns-resolve"`),
 				c.getMetricWithFilter(`mo_fs_s3_conn_duration_seconds_bucket`, `type="tls-handshake"`),
 			},
 			[]string{
 				"connect",
 				"get-conn",
+				"got-first-response",
 				"dns-resolve",
 				"tls-handshake",
 			},
@@ -286,26 +288,6 @@ func (c *DashboardCreator) initFSReadWriteDurationRow() dashboard.Option {
 	)
 }
 
-func (c *DashboardCreator) initFSMallocRow() dashboard.Option {
-	return dashboard.Row(
-		"malloc stats",
-
-		c.withMultiGraph(
-			"active objects",
-			3,
-			[]string{
-				`sum(` + c.getMetricWithFilter("mo_fs_malloc_live_objects", `type="io_entry_data"`) + `)`,
-				`sum(` + c.getMetricWithFilter("mo_fs_malloc_live_objects", `type="bytes"`) + `)`,
-				`sum(` + c.getMetricWithFilter("mo_fs_malloc_live_objects", `type="memory_cache"`) + `)`,
-			},
-			[]string{
-				"io_entry_data",
-				"bytes",
-				"memory_cache",
-			}),
-	)
-}
-
 func (c *DashboardCreator) initFSObjectStorageRow() dashboard.Option {
 	return dashboard.Row(
 		"Object Storage",
@@ -357,3 +339,35 @@ func (c *DashboardCreator) initFSObjectStorageRow() dashboard.Option {
 		),
 	)
 }
+
+// initFSHTTPTraceRow builds the "HTTP Trace" row: one graph that sums the
+// mo_fs_http_trace counter per op label. Each op filter must equal the label
+// value the counter is recorded with, otherwise its series stays empty.
+func (c *DashboardCreator) initFSHTTPTraceRow() dashboard.Option {
+	return dashboard.Row(
+		"HTTP Trace",
+
+		c.withMultiGraph(
+			"trace",
+			4,
+			[]string{
+				`sum(` + c.getMetricWithFilter("mo_fs_http_trace", `op="GetConn"`) + `)`,
+				`sum(` + c.getMetricWithFilter("mo_fs_http_trace", `op="GotConn"`) + `)`,
+				`sum(` + c.getMetricWithFilter("mo_fs_http_trace", `op="GotConnReused"`) + `)`,
+				`sum(` + c.getMetricWithFilter("mo_fs_http_trace", `op="GotConnIdle"`) + `)`,
+				`sum(` + c.getMetricWithFilter("mo_fs_http_trace", `op="DNSStart"`) + `)`,
+				`sum(` + c.getMetricWithFilter("mo_fs_http_trace", `op="ConnectStart"`) + `)`,
+				`sum(` + c.getMetricWithFilter("mo_fs_http_trace", `op="TLSHandshakeStart"`) + `)`,
+			},
+			[]string{
+				"GetConn",
+				"GotConn",
+				"GotConnReused",
+				"GotConnIdle",
+				"DNSStart",
+				"ConnectStart",
+				"TLSHandshakeStart",
+			},
+		),
+	)
+}
diff --git a/pkg/util/metric/v2/fileservice.go b/pkg/util/metric/v2/fileservice.go
index 93377ade75a74..591efa19c3fb9 100644
--- a/pkg/util/metric/v2/fileservice.go
+++ b/pkg/util/metric/v2/fileservice.go
@@ -19,22 +19,6 @@ import (
 )
 
 var (
-	S3ConnectCounter = prometheus.NewCounter(
-		prometheus.CounterOpts{
-			Namespace: "mo",
-			Subsystem: "fs",
-			Name:      "s3_connect_total",
-			Help:      "Total number of s3 connect count.",
-		})
-
-	S3DNSResolveCounter = prometheus.NewCounter(
-		prometheus.CounterOpts{
-			Namespace: "mo",
-			Subsystem: "fs",
-			Name:      "s3_dns_resolve_total",
-			Help:      "Total number of s3 dns resolve count.",
-		})
-
 	fsReadCounter = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: "mo",
@@ -73,10 +57,11 @@ var (
 			Help:      "Bucketed histogram of s3 get conn duration.",
 			Buckets:   getDurationBuckets(),
 		}, []string{"type"})
-	S3GetConnDurationHistogram      = s3ConnDurationHistogram.WithLabelValues("get-conn")
-	S3DNSResolveDurationHistogram   = s3ConnDurationHistogram.WithLabelValues("dns-resolve")
-	S3ConnectDurationHistogram      = s3ConnDurationHistogram.WithLabelValues("connect")
-	S3TLSHandshakeDurationHistogram = s3ConnDurationHistogram.WithLabelValues("tls-handshake")
+	S3GetConnDurationHistogram          = s3ConnDurationHistogram.WithLabelValues("get-conn")
+
S3GotFirstResponseDurationHistogram = s3ConnDurationHistogram.WithLabelValues("got-first-response") + S3DNSResolveDurationHistogram = s3ConnDurationHistogram.WithLabelValues("dns-resolve") + S3ConnectDurationHistogram = s3ConnDurationHistogram.WithLabelValues("connect") + S3TLSHandshakeDurationHistogram = s3ConnDurationHistogram.WithLabelValues("tls-handshake") localIOBytesHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -183,3 +168,17 @@ func GetFsCacheBytesGauge(name, typ string) (inuse prometheus.Gauge, capacity pr return fsCacheBytes.WithLabelValues(component, "inuse"), fsCacheBytes.WithLabelValues(component, "cap") } + +var ( + FSHTTPTraceCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "mo", + Subsystem: "fs", + Name: "http_trace", + Help: "http trace statistics", + }, + []string{ + "op", + }, + ) +) diff --git a/pkg/util/metric/v2/metrics.go b/pkg/util/metric/v2/metrics.go index dae5b5c976ba8..75f6568f1666d 100644 --- a/pkg/util/metric/v2/metrics.go +++ b/pkg/util/metric/v2/metrics.go @@ -91,8 +91,6 @@ func initTaskMetrics() { func initFileServiceMetrics() { registry.MustRegister(fsReadCounter) registry.MustRegister(fsCacheBytes) - registry.MustRegister(S3ConnectCounter) - registry.MustRegister(S3DNSResolveCounter) registry.MustRegister(s3IOBytesHistogram) registry.MustRegister(s3ConnDurationHistogram) @@ -102,6 +100,8 @@ func initFileServiceMetrics() { registry.MustRegister(ioMergerDuration) registry.MustRegister(fsReadWriteDuration) registry.MustRegister(FSObjectStorageOperations) + + registry.MustRegister(FSHTTPTraceCounter) } func initLogtailMetrics() { From a8b15a897949494797b6555296094aa99bca7860 Mon Sep 17 00:00:00 2001 From: reus Date: Wed, 20 Nov 2024 12:35:19 +0800 Subject: [PATCH 2/2] fileservice: set default dns resolver to caching resolver --- pkg/fileservice/http_client.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/fileservice/http_client.go 
b/pkg/fileservice/http_client.go index 230b4cb110b1f..3ecd1752d351f 100644 --- a/pkg/fileservice/http_client.go +++ b/pkg/fileservice/http_client.go @@ -37,10 +37,14 @@ var ( ) var dnsResolver = dns.NewCachingResolver( - net.DefaultResolver, + nil, dns.MaxCacheEntries(128), ) +func init() { + net.DefaultResolver = dnsResolver +} + var httpDialer = &net.Dialer{ Timeout: connectTimeout, Resolver: dnsResolver,