From a76f1f3f4b14294c5e23ef2502d0224983f806bb Mon Sep 17 00:00:00 2001 From: Alexander Matyushentsev Date: Tue, 22 Aug 2023 18:47:40 -0700 Subject: [PATCH] fix: stop creating new otel interceptor to avoid memory leak (#15174) Signed-off-by: Alexander Matyushentsev Signed-off-by: jmilic1 <70441727+jmilic1@users.noreply.github.com> --- cmpserver/apiclient/clientset.go | 5 ++--- pkg/apiclient/apiclient.go | 5 ++--- reposerver/apiclient/clientset.go | 5 ++--- server/server.go | 5 ++--- util/grpc/trace.go | 33 +++++++++++++++++++++++++++++++ 5 files changed, 41 insertions(+), 12 deletions(-) create mode 100644 util/grpc/trace.go diff --git a/cmpserver/apiclient/clientset.go b/cmpserver/apiclient/clientset.go index 6b4c19f0261ff..025625ff8092e 100644 --- a/cmpserver/apiclient/clientset.go +++ b/cmpserver/apiclient/clientset.go @@ -7,7 +7,6 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" log "github.com/sirupsen/logrus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -47,8 +46,8 @@ func NewConnection(address string) (*grpc.ClientConn, error) { grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(retryOpts...)), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(unaryInterceptors...)), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxGRPCMessageSize), grpc.MaxCallSendMsgSize(MaxGRPCMessageSize)), - grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), - grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()), + grpc.WithUnaryInterceptor(grpc_util.OTELUnaryClientInterceptor()), + grpc.WithStreamInterceptor(grpc_util.OTELStreamClientInterceptor()), } dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) diff --git a/pkg/apiclient/apiclient.go b/pkg/apiclient/apiclient.go index 6eefb0f672790..8f9a32d8e255e 100644 --- a/pkg/apiclient/apiclient.go +++ b/pkg/apiclient/apiclient.go @@ -22,7 +22,6 @@ import ( grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "github.com/hashicorp/go-retryablehttp" log "github.com/sirupsen/logrus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "golang.org/x/oauth2" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -525,8 +524,8 @@ func (c *client) newConn() (*grpc.ClientConn, io.Closer, error) { dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxGRPCMessageSize), grpc.MaxCallSendMsgSize(MaxGRPCMessageSize))) dialOpts = append(dialOpts, grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(retryOpts...))) dialOpts = append(dialOpts, grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(grpc_retry.UnaryClientInterceptor(retryOpts...)))) - dialOpts = append(dialOpts, grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor())) - dialOpts = append(dialOpts, grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor())) + dialOpts = append(dialOpts, grpc.WithUnaryInterceptor(grpc_util.OTELUnaryClientInterceptor())) + dialOpts = append(dialOpts, grpc.WithStreamInterceptor(grpc_util.OTELStreamClientInterceptor())) ctx := context.Background() diff --git a/reposerver/apiclient/clientset.go b/reposerver/apiclient/clientset.go index 4a42235b7049c..417dc758ef5bd 100644 --- a/reposerver/apiclient/clientset.go +++ b/reposerver/apiclient/clientset.go @@ -9,7 +9,6 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" log "github.com/sirupsen/logrus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -67,8 +66,8 @@ func NewConnection(address string, timeoutSeconds int, tlsConfig *TLSConfigurati grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(retryOpts...)), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(unaryInterceptors...)), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxGRPCMessageSize), grpc.MaxCallSendMsgSize(MaxGRPCMessageSize)), - grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), - grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()), + grpc.WithUnaryInterceptor(argogrpc.OTELUnaryClientInterceptor()), + grpc.WithStreamInterceptor(argogrpc.OTELStreamClientInterceptor()), } tlsC := &tls.Config{} diff --git a/server/server.go b/server/server.go index 28a1da6ec72fb..4dff6de3621b4 100644 --- a/server/server.go +++ b/server/server.go @@ -62,7 +62,6 @@ import ( "github.com/argoproj/argo-cd/v2/pkg/apiclient" accountpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/account" applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" - applicationsetpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/applicationset" certificatepkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/certificate" clusterpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/cluster" @@ -435,8 +434,8 @@ func (a *ArgoCDServer) Listen() (*Listeners, error) { var dOpts []grpc.DialOption dOpts = append(dOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(apiclient.MaxGRPCMessageSize))) dOpts = append(dOpts, grpc.WithUserAgent(fmt.Sprintf("%s/%s", common.ArgoCDUserAgentName, common.GetVersion().Version))) - dOpts = append(dOpts, grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor())) - dOpts = append(dOpts, grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor())) + dOpts = append(dOpts, grpc.WithUnaryInterceptor(grpc_util.OTELUnaryClientInterceptor())) + dOpts = append(dOpts, grpc.WithStreamInterceptor(grpc_util.OTELStreamClientInterceptor())) if a.useTLS() { // The following sets up the dial Options for grpc-gateway to talk to gRPC server over TLS. // grpc-gateway is just translating HTTP/HTTPS requests as gRPC requests over localhost, diff --git a/util/grpc/trace.go b/util/grpc/trace.go new file mode 100644 index 0000000000000..484e2b61dc253 --- /dev/null +++ b/util/grpc/trace.go @@ -0,0 +1,33 @@ +package grpc + +import ( + "sync" + + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "google.golang.org/grpc" +) + +var ( + otelUnaryInterceptor grpc.UnaryClientInterceptor + otelStreamInterceptor grpc.StreamClientInterceptor + interceptorsInitialized = sync.Once{} +) + +// otel interceptors must be created once to avoid memory leak +// see https://github.com/open-telemetry/opentelemetry-go-contrib/issues/4226 for details +func ensureInitialized() { + interceptorsInitialized.Do(func() { + otelUnaryInterceptor = otelgrpc.UnaryClientInterceptor() + otelStreamInterceptor = otelgrpc.StreamClientInterceptor() + }) +} + +func OTELUnaryClientInterceptor() grpc.UnaryClientInterceptor { + ensureInitialized() + return otelUnaryInterceptor +} + +func OTELStreamClientInterceptor() grpc.StreamClientInterceptor { + ensureInitialized() + return otelStreamInterceptor +}