From 85df557a10706f6c13b18e36d7255fdf41bb0a3c Mon Sep 17 00:00:00 2001 From: Ted Xu Date: Wed, 11 Sep 2024 14:51:07 +0800 Subject: [PATCH] enhance: improve log encoding performance on proxy nodes (#36123) See #36122 This PR is designed to enhance log performance through two improvements: 1. Optimize JSON encoding by switching JSON serializer to `json-iterator`. 2. Adding support of lazy initialization `WithLazy`. --------- Signed-off-by: Ted Xu --- go.mod | 3 +- go.sum | 9 +--- internal/proxy/impl.go | 8 ++-- internal/proxy/task_search.go | 2 +- pkg/go.mod | 6 +-- pkg/go.sum | 4 ++ pkg/log/global.go | 4 +- pkg/log/lazy_with.go | 74 ++++++++++++++++++++++++++++ pkg/log/mlogger.go | 5 +- pkg/log/zap_text_encoder.go | 6 +-- pkg/log/zap_text_encoder_test.go | 82 ++++++++++++++++++++++++++++++++ 11 files changed, 181 insertions(+), 22 deletions(-) create mode 100644 pkg/log/lazy_with.go create mode 100644 pkg/log/zap_text_encoder_test.go diff --git a/go.mod b/go.mod index 17012275450cb..1ac7ef8d6208c 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( go.opentelemetry.io/otel/trace v1.28.0 go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 - go.uber.org/zap v1.24.0 + go.uber.org/zap v1.27.0 golang.org/x/crypto v0.25.0 golang.org/x/exp v0.0.0-20230728194245-b0cb94b80691 golang.org/x/net v0.27.0 @@ -95,7 +95,6 @@ require ( github.com/bytedance/sonic/loader v0.2.0 // indirect github.com/campoy/embedmd v1.0.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/cilium/ebpf v0.11.0 // indirect github.com/cloudwego/base64x v0.1.4 // indirect github.com/cloudwego/iasm v0.2.0 // indirect diff --git a/go.sum b/go.sum index 640a6bbce8850..e5c034510d10c 100644 --- a/go.sum +++ b/go.sum @@ -128,9 +128,6 @@ github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= -github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= -github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= -github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= github.com/bytedance/sonic v1.12.2 h1:oaMFuRTpMHYLpCntGca65YWt5ny+wAceDERTkT2L9lg= github.com/bytedance/sonic v1.12.2/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKzMzT9r/rk= github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= @@ -153,9 +150,6 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= -github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= -github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -1005,7 +999,8 @@ go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.20.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= -golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k= golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index a1c7d236f651a..71579c6889403 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2976,10 +2976,10 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) zap.String("role", typeutil.ProxyRole), zap.String("db", request.DbName), zap.String("collection", request.CollectionName), - zap.Any("partitions", request.PartitionNames), - zap.Any("dsl", request.Dsl), - zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)), - zap.Any("OutputFields", request.OutputFields), + zap.Strings("partitions", request.PartitionNames), + zap.String("dsl", request.Dsl), + zap.Int("len(PlaceholderGroup)", len(request.PlaceholderGroup)), + zap.Strings("OutputFields", request.OutputFields), zap.Any("search_params", request.SearchParams), zap.String("ConsistencyLevel", request.GetConsistencyLevel().String()), zap.Bool("useDefaultConsistency", request.GetUseDefaultConsistency()), diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index b8aa6be104818..812906081fc53 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -550,7 +550,7 @@ func (t *searchTask) tryParsePartitionIDsFromPlan(plan *planpb.PlanNode) ([]int6 func (t *searchTask) Execute(ctx context.Context) error { ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Search-Execute") defer sp.End() - log := log.Ctx(ctx).With(zap.Int64("nq", t.SearchRequest.GetNq())) + log := log.Ctx(ctx).WithLazy(zap.Int64("nq", t.SearchRequest.GetNq())) tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute search %d", t.ID())) defer tr.CtxElapse(ctx, "done") diff --git a/pkg/go.mod b/pkg/go.mod index 0b9765782063e..fba4d00bf1e1f 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -12,6 +12,7 @@ require ( github.com/containerd/cgroups/v3 v3.0.3 github.com/expr-lang/expr v1.15.7 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 + github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.17.7 github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240822040249-4bbc8f623cbb github.com/nats-io/nats-server/v2 v2.10.12 @@ -45,7 +46,7 @@ require ( go.opentelemetry.io/otel/trace v1.28.0 go.uber.org/atomic v1.10.0 go.uber.org/automaxprocs v1.5.3 - go.uber.org/zap v1.20.0 + go.uber.org/zap v1.27.0 golang.org/x/crypto v0.25.0 golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 golang.org/x/net v0.27.0 @@ -105,7 +106,6 @@ require ( github.com/hashicorp/hcl v1.0.0 // indirect github.com/ianlancetaylor/cgosymbolizer v0.0.0-20221217025313-27d3c9f66b6a // indirect github.com/jonboulle/clockwork v0.2.2 // indirect - github.com/json-iterator/go v1.1.12 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/linkedin/goavro/v2 v2.11.1 // indirect @@ -165,7 +165,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect go.opentelemetry.io/otel/metric v1.28.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect - go.uber.org/multierr v1.7.0 // indirect + go.uber.org/multierr v1.11.0 // indirect golang.org/x/oauth2 v0.20.0 // indirect golang.org/x/term v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect diff --git a/pkg/go.sum b/pkg/go.sum index 242f315cdd565..b81ae8d933c9f 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -842,11 +842,15 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.20.0 h1:N4oPlghZwYG55MlU6LXk/Zp00FVNE9X9wrYO8CEs4lc= go.uber.org/zap v1.20.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/pkg/log/global.go b/pkg/log/global.go index 297c906fcf0eb..879da86bddb94 100644 --- a/pkg/log/global.go +++ b/pkg/log/global.go @@ -102,7 +102,9 @@ func RatedWarn(cost float64, msg string, fields ...zap.Field) bool { // Fields added to the child don't affect the parent, and vice versa. func With(fields ...zap.Field) *MLogger { return &MLogger{ - Logger: L().With(fields...).WithOptions(zap.AddCallerSkip(-1)), + Logger: L().WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core { + return NewLazyWith(core, fields) + })).WithOptions(zap.AddCallerSkip(-1)), } } diff --git a/pkg/log/lazy_with.go b/pkg/log/lazy_with.go new file mode 100644 index 0000000000000..3d66ce3df8796 --- /dev/null +++ b/pkg/log/lazy_with.go @@ -0,0 +1,74 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "sync" + "sync/atomic" + + "go.uber.org/zap/zapcore" +) + +// lazyWithCore wraps zapcore.Core with lazy initialization. +// Copied from https://github.com/uber-go/zap/issues/1426 to avoid data race. +type lazyWithCore struct { + corePtr atomic.Pointer[zapcore.Core] + once sync.Once + fields []zapcore.Field +} + +var _ zapcore.Core = (*lazyWithCore)(nil) + +func NewLazyWith(core zapcore.Core, fields []zapcore.Field) zapcore.Core { + d := lazyWithCore{fields: fields} + d.corePtr.Store(&core) + return &d +} + +func (d *lazyWithCore) initOnce() zapcore.Core { + core := *d.corePtr.Load() + d.once.Do(func() { + core = core.With(d.fields) + d.corePtr.Store(&core) + }) + return core +} + +func (d *lazyWithCore) Enabled(level zapcore.Level) bool { + // Init not needed + return (*d.corePtr.Load()).Enabled(level) +} + +func (d *lazyWithCore) Sync() error { + // Init needed + return d.initOnce().Sync() +} + +// Write implements zapcore.Core. +func (d *lazyWithCore) Write(entry zapcore.Entry, fields []zapcore.Field) error { + return (*d.corePtr.Load()).Write(entry, fields) +} + +func (d *lazyWithCore) With(fields []zapcore.Field) zapcore.Core { + d.initOnce() + return (*d.corePtr.Load()).With(fields) +} + +func (d *lazyWithCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + d.initOnce() + return (*d.corePtr.Load()).Check(e, ce) +} diff --git a/pkg/log/mlogger.go b/pkg/log/mlogger.go index e697237fd4b1c..8f4bb14b0c84a 100644 --- a/pkg/log/mlogger.go +++ b/pkg/log/mlogger.go @@ -21,6 +21,7 @@ import ( "github.com/uber/jaeger-client-go/utils" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) // MLogger is a wrapper type of zap.Logger. @@ -32,7 +33,9 @@ type MLogger struct { // With encapsulates zap.Logger With method to return MLogger instance. func (l *MLogger) With(fields ...zap.Field) *MLogger { nl := &MLogger{ - Logger: l.Logger.With(fields...), + Logger: l.Logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core { + return NewLazyWith(core, fields) + })), } return nl } diff --git a/pkg/log/zap_text_encoder.go b/pkg/log/zap_text_encoder.go index 6bd0bacfa7960..70c7bfd3a9ae5 100644 --- a/pkg/log/zap_text_encoder.go +++ b/pkg/log/zap_text_encoder.go @@ -35,13 +35,13 @@ package log import ( "encoding/base64" - "encoding/json" "fmt" "math" "sync" "time" "unicode/utf8" + jsoniter "github.com/json-iterator/go" "go.uber.org/zap/buffer" "go.uber.org/zap/zapcore" ) @@ -102,7 +102,7 @@ type textEncoder struct { // for encoding generic values by reflection reflectBuf *buffer.Buffer - reflectEnc *json.Encoder + reflectEnc *jsoniter.Encoder } func NewTextEncoder(encoderConfig *zapcore.EncoderConfig, spaced bool, disableErrorVerbose bool) zapcore.Encoder { @@ -196,7 +196,7 @@ func (enc *textEncoder) AddInt64(key string, val int64) { func (enc *textEncoder) resetReflectBuf() { if enc.reflectBuf == nil { enc.reflectBuf = _pool.Get() - enc.reflectEnc = json.NewEncoder(enc.reflectBuf) + enc.reflectEnc = jsoniter.NewEncoder(enc.reflectBuf) } else { enc.reflectBuf.Reset() } diff --git a/pkg/log/zap_text_encoder_test.go b/pkg/log/zap_text_encoder_test.go new file mode 100644 index 0000000000000..ca604863586d2 --- /dev/null +++ b/pkg/log/zap_text_encoder_test.go @@ -0,0 +1,82 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "fmt" + "testing" + + "go.uber.org/zap" +) + +type foo struct { + Key string + Value string +} + +func BenchmarkZapReflect(b *testing.B) { + payload := make([]foo, 10) + for i := 0; i < len(payload); i++ { + payload[i] = foo{Key: fmt.Sprintf("key%d", i), Value: fmt.Sprintf("value%d", i)} + } + b.ResetTimer() + + for i := 0; i < b.N; i++ { + With(zap.Any("payload", payload)) + } +} + +func BenchmarkZapWithLazy(b *testing.B) { + payload := make([]foo, 10) + for i := 0; i < len(payload); i++ { + payload[i] = foo{Key: fmt.Sprintf("key%d", i), Value: fmt.Sprintf("value%d", i)} + } + b.ResetTimer() + + for i := 0; i < b.N; i++ { + L().WithLazy(zap.Any("payload", payload)) + } +} + +// The following two benchmarks are validations if `WithLazy` has the same performance as `With` in the worst case. +func BenchmarkWithLazyLog(b *testing.B) { + payload := make([]foo, 10) + for i := 0; i < len(payload); i++ { + payload[i] = foo{Key: fmt.Sprintf("key%d", i), Value: fmt.Sprintf("value%d", i)} + } + b.ResetTimer() + + for i := 0; i < b.N; i++ { + log := L().WithLazy(zap.Any("payload", payload)) + log.Info("test") + log.Warn("test") + } +} + +func BenchmarkWithLog(b *testing.B) { + payload := make([]foo, 10) + for i := 0; i < len(payload); i++ { + payload[i] = foo{Key: fmt.Sprintf("key%d", i), Value: fmt.Sprintf("value%d", i)} + } + b.ResetTimer() + + for i := 0; i < b.N; i++ { + log := L().With(zap.Any("payload", payload)) + log.Info("test") + log.Warn("test") + } +}