From 19d99e51a60bc31bf50b00ab7c1a4b347015eb34 Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Mon, 25 Nov 2024 13:34:20 -0800 Subject: [PATCH 01/15] fix(storage): populate monitored resource correctly --- storage/grpc_metrics.go | 56 ++++++++++++++++-------------------- storage/grpc_metrics_test.go | 24 ++-------------- 2 files changed, 26 insertions(+), 54 deletions(-) diff --git a/storage/grpc_metrics.go b/storage/grpc_metrics.go index 149b37807ed4..ef458fa54067 100644 --- a/storage/grpc_metrics.go +++ b/storage/grpc_metrics.go @@ -17,7 +17,6 @@ package storage import ( "context" "fmt" - "log" "strings" "time" @@ -82,13 +81,6 @@ func metricFormatter(m metricdata.Metrics) string { return metricPrefix + strings.ReplaceAll(string(m.Name), ".", "/") } -func gcpAttributeExpectedDefaults() []attribute.KeyValue { - return []attribute.KeyValue{ - {Key: "location", Value: attribute.StringValue("global")}, - {Key: "cloud_platform", Value: attribute.StringValue("unknown")}, - {Key: "host_id", Value: attribute.StringValue("unknown")}} -} - // Added to help with tests type preparedResource struct { projectToUse string @@ -103,29 +95,34 @@ func newPreparedResource(ctx context.Context, project string, resourceOptions [] preparedResource := &preparedResource{} s := detectedAttrs.Set() p, present := s.Value("cloud.account.id") - if present { + // Precedence for user provided project when set + if present && project == "" { preparedResource.projectToUse = p.AsString() } else { preparedResource.projectToUse = project } - updates := []attribute.KeyValue{} - for _, kv := range gcpAttributeExpectedDefaults() { - if val, present := s.Value(kv.Key); !present || val.AsString() == "" { - updates = append(updates, attribute.KeyValue{Key: kv.Key, Value: kv.Value}) - } + mrAttrs := []attribute.KeyValue{ + {Key: "gcp.resource_type", Value: attribute.StringValue(monitoredResourceName)}, + {Key: "project_id", Value: attribute.StringValue(project)}, + {Key: "api", Value: attribute.StringValue("grpc")}, + {Key: "instance_id", Value: attribute.StringValue(uuid.New().String())}, + } + if v, ok := s.Value("location"); ok { + mrAttrs = append(mrAttrs, attribute.KeyValue{Key: "location", Value: v}) + } else { + mrAttrs = append(mrAttrs, attribute.KeyValue{Key: "location", Value: attribute.StringValue("global")}) + } + if v, ok := s.Value("cloud.platform"); ok { + mrAttrs = append(mrAttrs, attribute.KeyValue{Key: "cloud_platform", Value: v}) + } else { + mrAttrs = append(mrAttrs, attribute.KeyValue{Key: "cloud_platform", Value: attribute.StringValue("unknown")}) + } + if v, ok := s.Value("host.id"); ok { + mrAttrs = append(mrAttrs, attribute.KeyValue{Key: "host_id", Value: v}) + } else { + mrAttrs = append(mrAttrs, attribute.KeyValue{Key: "host_id", Value: attribute.StringValue("unknown")}) } - r, err := resource.New( - ctx, - resource.WithAttributes( - attribute.KeyValue{Key: "gcp.resource_type", Value: attribute.StringValue(monitoredResourceName)}, - attribute.KeyValue{Key: "instance_id", Value: attribute.StringValue(uuid.New().String())}, - attribute.KeyValue{Key: "project_id", Value: attribute.StringValue(project)}, - attribute.KeyValue{Key: "api", Value: attribute.StringValue("grpc")}, - ), - resource.WithAttributes(detectedAttrs.Attributes()...), - // Last duplicate key / value wins - resource.WithAttributes(updates...), - ) + r, err := resource.New(ctx, resource.WithAttributes(mrAttrs...)) if err != nil { return nil, err } @@ -162,17 +159,12 @@ func newGRPCMetricContext(ctx context.Context, project string, config storageCon if err != nil { return nil, err } - meterOpts = append(meterOpts, metric.WithResource(preparedResource.resource)) // Implementation requires a project, if one is not determined possibly user // credentials. Then we will fail stating gRPC Metrics require a project-id. if project == "" && preparedResource.projectToUse == "" { return nil, fmt.Errorf("google cloud project is required to start client-side metrics") } - // If projectTouse isn't the same as project provided to Storage client, then - // emit a log stating which project is being used to emit metrics to. - if project != preparedResource.projectToUse { - log.Printf("The Project ID configured for metrics is %s, but the Project ID of the storage client is %s. Make sure that the service account in use has the required metric writing role (roles/monitoring.metricWriter) in the project projectIdToUse or metrics will not be written.", preparedResource.projectToUse, project) - } + meterOpts = append(meterOpts, metric.WithResource(preparedResource.resource)) meOpts := []mexporter.Option{ mexporter.WithProjectID(preparedResource.projectToUse), mexporter.WithMetricDescriptorTypeFormatter(metricFormatter), diff --git a/storage/grpc_metrics_test.go b/storage/grpc_metrics_test.go index 23b3cf981e1c..9dbb592f576c 100644 --- a/storage/grpc_metrics_test.go +++ b/storage/grpc_metrics_test.go @@ -59,9 +59,9 @@ func TestNewPreparedResource(t *testing.T) { detectedAttributes: []attribute.KeyValue{ {Key: "location", Value: attribute.StringValue("us-central1")}, - {Key: "cloud_platform", + {Key: "cloud.platform", Value: attribute.StringValue("gcp")}, - {Key: "host_id", + {Key: "host.id", Value: attribute.StringValue("gce-instance-id")}, }, wantAttributes: attribute.NewSet(attribute.KeyValue{ @@ -74,26 +74,6 @@ func TestNewPreparedResource(t *testing.T) { Key: "host_id", Value: attribute.StringValue("gce-instance-id"), }), - }, { - desc: "use default when value is empty string", - detectedAttributes: []attribute.KeyValue{ - {Key: "location", - Value: attribute.StringValue("us-central1")}, - {Key: "cloud_platform", - Value: attribute.StringValue("")}, - {Key: "host_id", - Value: attribute.StringValue("")}, - }, - wantAttributes: attribute.NewSet(attribute.KeyValue{ - Key: "location", - Value: attribute.StringValue("us-central1"), - }, attribute.KeyValue{ - Key: "cloud_platform", - Value: attribute.StringValue("unknown"), - }, attribute.KeyValue{ - Key: "host_id", - Value: attribute.StringValue("unknown"), - }), }, } { t.Run(test.desc, func(t *testing.T) { From 004f8f6cab935f605a4597926a26e87295abf59f Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Mon, 25 Nov 2024 17:23:15 -0800 Subject: [PATCH 02/15] refactor with additional integration tests --- storage/grpc_metrics.go | 65 ++++++++++++++++------------ storage/grpc_metrics_test.go | 17 ++++---- storage/integration_test.go | 83 ++++++++++++++++++++++++++++++++++++ storage/option.go | 15 +++++++ 4 files changed, 144 insertions(+), 36 deletions(-) diff --git a/storage/grpc_metrics.go b/storage/grpc_metrics.go index ef458fa54067..cb91879499b5 100644 --- a/storage/grpc_metrics.go +++ b/storage/grpc_metrics.go @@ -16,6 +16,7 @@ package storage import ( "context" + "errors" "fmt" "strings" "time" @@ -82,28 +83,34 @@ func metricFormatter(m metricdata.Metrics) string { } // Added to help with tests -type preparedResource struct { - projectToUse string - resource *resource.Resource +type storageMonitoredResource struct { + project string + resource *resource.Resource } -func newPreparedResource(ctx context.Context, project string, resourceOptions []resource.Option) (*preparedResource, error) { - detectedAttrs, err := resource.New(ctx, resourceOptions...) +func (smr *storageMonitoredResource) name() string { + return monitoredResourceName +} + +func (smr *storageMonitoredResource) attributes() []string { + return []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"} +} + +func (smr *storageMonitoredResource) detectFromGCP(ctx context.Context, opts ...resource.Option) error { + aopts := append([]resource.Option{resource.WithDetectors(gcp.NewDetector())}, opts...) + detectedAttrs, err := resource.New(ctx, aopts...) if err != nil { - return nil, err + return err } - preparedResource := &preparedResource{} s := detectedAttrs.Set() - p, present := s.Value("cloud.account.id") - // Precedence for user provided project when set - if present && project == "" { - preparedResource.projectToUse = p.AsString() - } else { - preparedResource.projectToUse = project + if p, present := s.Value("cloud.account.id"); present && smr.project == "" { + smr.project = p.AsString() + } else if !present && smr.project == "" { + return errors.New("google cloud project is required to start client-side metrics") } mrAttrs := []attribute.KeyValue{ {Key: "gcp.resource_type", Value: attribute.StringValue(monitoredResourceName)}, - {Key: "project_id", Value: attribute.StringValue(project)}, + {Key: "project_id", Value: attribute.StringValue(smr.project)}, {Key: "api", Value: attribute.StringValue("grpc")}, {Key: "instance_id", Value: attribute.StringValue(uuid.New().String())}, } @@ -124,10 +131,10 @@ func newPreparedResource(ctx context.Context, project string, resourceOptions [] } r, err := resource.New(ctx, resource.WithAttributes(mrAttrs...)) if err != nil { - return nil, err + return err } - preparedResource.resource = r - return preparedResource, nil + smr.resource = r + return nil } type metricsContext struct { @@ -155,25 +162,24 @@ func newGRPCMetricContext(ctx context.Context, project string, config storageCon if config.metricExporter != nil { exporter = *config.metricExporter } else { - preparedResource, err := newPreparedResource(ctx, project, []resource.Option{resource.WithDetectors(gcp.NewDetector())}) - if err != nil { - return nil, err + smr := &storageMonitoredResource{ + project: project, } - // Implementation requires a project, if one is not determined possibly user - // credentials. Then we will fail stating gRPC Metrics require a project-id. - if project == "" && preparedResource.projectToUse == "" { - return nil, fmt.Errorf("google cloud project is required to start client-side metrics") + if err := smr.detectFromGCP(ctx); err != nil { + return nil, err } - meterOpts = append(meterOpts, metric.WithResource(preparedResource.resource)) + meterOpts = append(meterOpts, metric.WithResource(smr.resource)) meOpts := []mexporter.Option{ - mexporter.WithProjectID(preparedResource.projectToUse), + mexporter.WithProjectID(smr.project), mexporter.WithMetricDescriptorTypeFormatter(metricFormatter), mexporter.WithCreateServiceTimeSeries(), - mexporter.WithMonitoredResourceDescription(monitoredResourceName, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"})} - exporter, err = mexporter.New(meOpts...) + mexporter.WithMonitoredResourceDescription(smr.name(), smr.attributes()), + } + ex, err := mexporter.New(meOpts...) if err != nil { return nil, err } + exporter = ex } // Metric views update histogram boundaries to be relevant to GCS // otherwise default OTel histogram boundaries are used. @@ -188,6 +194,9 @@ func newGRPCMetricContext(ctx context.Context, project string, config storageCon } meterOpts = append(meterOpts, metric.WithReader(metric.NewPeriodicReader(&exporterLogSuppressor{exporter: exporter}, metric.WithInterval(interval))), metric.WithView(metricViews...)) + if config.testReader != nil { + meterOpts = append(meterOpts, metric.WithReader(config.testReader)) + } provider := metric.NewMeterProvider(meterOpts...) mo := opentelemetry.MetricsOptions{ MeterProvider: provider, diff --git a/storage/grpc_metrics_test.go b/storage/grpc_metrics_test.go index 9dbb592f576c..d66f7a1722ab 100644 --- a/storage/grpc_metrics_test.go +++ b/storage/grpc_metrics_test.go @@ -34,7 +34,7 @@ func TestMetricFormatter(t *testing.T) { } } -func TestNewPreparedResource(t *testing.T) { +func TestStorageMonitoredResource(t *testing.T) { ctx := context.Background() for _, test := range []struct { desc string @@ -77,20 +77,21 @@ func TestNewPreparedResource(t *testing.T) { }, } { t.Run(test.desc, func(t *testing.T) { - resourceOptions := []resource.Option{resource.WithAttributes(test.detectedAttributes...)} - result, err := newPreparedResource(ctx, "project", resourceOptions) - if err != nil { - t.Errorf("newPreparedResource: %v", err) + smr := &storageMonitoredResource{ + project: "project", } - resultSet := result.resource.Set() + if err := smr.detectFromGCP(ctx, resource.WithAttributes(test.detectedAttributes...)); err != nil { + t.Errorf("detectFromGCP: %v", err) + } + resultSet := smr.resource.Set() for _, want := range test.wantAttributes.ToSlice() { got, exists := resultSet.Value(want.Key) if !exists { - t.Errorf("newPreparedResource: %v not set", want.Key) + t.Errorf("detectFromGCP: %v not set", want.Key) continue } if got != want.Value { - t.Errorf("newPreparedResource: want[%v] = %v, got: %v", want.Key, want.Value, got) + t.Errorf("detectFromGCP: want[%v] = %v, got: %v", want.Key, want.Value, got) continue } } diff --git a/storage/integration_test.go b/storage/integration_test.go index a9dc265c0ba5..ffaaae3b3818 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -54,6 +54,8 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/googleapis/gax-go/v2/apierror" "go.opentelemetry.io/contrib/detectors/gcp" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" "golang.org/x/oauth2/google" "google.golang.org/api/googleapi" @@ -413,6 +415,87 @@ func TestIntegration_DoNotDetectDirectConnectivityWhenDisabled(t *testing.T) { }, internaloption.EnableDirectPath(false)) } +func TestIntegration_MetricsEnablement(t *testing.T) { + ctx := skipHTTP("grpc only test") + mr := metric.NewManualReader() + multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, prefix string, client *Client) { + it := client.Bucket(bucket).Objects(ctx, nil) + _, err := it.Next() + if err != iterator.Done { + t.Errorf("Objects.Next: expected iterator.Done got %v", err) + } + metricCheck := map[string]bool{ + "grpc.client.attempt.started": false, + "grpc.client.attempt.duration": false, + "grpc.client.attempt.sent_total_compressed_message_size": false, + "grpc.client.attempt.rcvd_total_compressed_message_size": false, + "grpc.client.call.duration": false, + } + rm := metricdata.ResourceMetrics{} + if err := mr.Collect(context.Background(), &rm); err != nil { + t.Errorf("ManualReader.Collect: %v", err) + } + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + metricCheck[m.Name] = true + } + } + for k, v := range metricCheck { + if !v { + t.Errorf("metric %v not found", k) + } + } + }, withTestMetricReader(mr)) +} + +func TestIntegration_MetricsEnablementInGCE(t *testing.T) { + ctx := skipHTTP("grpc only test") + mr := metric.NewManualReader() + multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, prefix string, client *Client) { + detectedAttrs, err := resource.New(ctx, resource.WithDetectors(gcp.NewDetector())) + if err != nil { + t.Fatalf("resource.New: %v", err) + } + attrs := detectedAttrs.Set() + if v, exists := attrs.Value("cloud.platform"); !exists || v.AsString() != "gcp_compute_engine" { + t.Skip("only testable in a GCE instance") + } + it := client.Buckets(ctx, testutil.ProjID()) + _, _ = it.Next() + metricCheck := map[string]bool{ + "grpc.client.attempt.started": false, + "grpc.client.attempt.duration": false, + "grpc.client.attempt.sent_total_compressed_message_size": false, + "grpc.client.attempt.rcvd_total_compressed_message_size": false, + "grpc.client.call.duration": false, + "grpc.lb.rls.cache_entries": false, + "grpc.lb.rls.cache_size": false, + "grpc.lb.rls.default_target_picks": false, + // TODO: determine a way to force these metrics to be collected + // "grpc.lb.wrr.rr_fallback": false, + // "grpc.lb.wrr.endpoint_weight_not_yet_usable": false, + // "grpc.lb.wrr.endpoint_weight_stale": false, + // "grpc.lb.wrr.endpoint_weights": false, + // "grpc.lb.rls.target_picks": false, + // "grpc.lb.rls.failed_picks": false, + } + rm := metricdata.ResourceMetrics{} + if err := mr.Collect(context.Background(), &rm); err != nil { + t.Errorf("ManualReader.Collect: %v", err) + } + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + metricCheck[m.Name] = true + } + } + for k, v := range metricCheck { + if !v { + t.Errorf("metric %v not found", k) + } + } + }, withTestMetricReader(mr)) +} + func TestIntegration_BucketCreateDelete(t *testing.T) { ctx := skipJSONReads(context.Background(), "no reads in test") multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, _ string, prefix string, client *Client) { diff --git a/storage/option.go b/storage/option.go index 3b0cf9e71831..7714371226b9 100644 --- a/storage/option.go +++ b/storage/option.go @@ -79,6 +79,7 @@ type storageConfig struct { disableClientMetrics bool metricExporter *metric.Exporter metricInterval time.Duration + testReader *metric.ManualReader readStallTimeoutConfig *experimental.ReadStallTimeoutConfig } @@ -192,6 +193,20 @@ func (w *withMetricExporterConfig) ApplyStorageOpt(c *storageConfig) { c.metricExporter = w.metricExporter } +type withTestMetricReaderConfig struct { + internaloption.EmbeddableAdapter + // exporter override + metricReader *metric.ManualReader +} + +func withTestMetricReader(ex *metric.ManualReader) option.ClientOption { + return &withTestMetricReaderConfig{metricReader: ex} +} + +func (w *withTestMetricReaderConfig) ApplyStorageOpt(c *storageConfig) { + c.testReader = w.metricReader +} + // WithReadStallTimeout is an option that may be passed to [NewClient]. // It enables the client to retry the stalled read request, happens as part of // storage.Reader creation. As the name suggest, timeout is adjusted dynamically From 09fc863a42d0ad88baf81e5c146279b5a453c1fe Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Tue, 26 Nov 2024 13:57:36 -0800 Subject: [PATCH 03/15] refactor initialization of metricsContext --- storage/grpc_client.go | 14 ++ storage/grpc_metrics.go | 282 +++++++++++++++++------------------ storage/grpc_metrics_test.go | 36 +++-- 3 files changed, 179 insertions(+), 153 deletions(-) diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 937360a4afd8..10e324591b33 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -34,6 +34,7 @@ import ( "google.golang.org/api/iterator" "google.golang.org/api/option" "google.golang.org/api/option/internaloption" + "google.golang.org/api/transport" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/encoding" @@ -117,6 +118,19 @@ type grpcStorageClient struct { settings *settings } +func enableClientMetrics(ctx context.Context, s *settings, config storageConfig) (*metricsContext, error) { + var project string + c, err := transport.Creds(ctx, s.clientOption...) + if err == nil { + project = c.ProjectID + } + metricsContext, err := newGRPCMetricContext(ctx, project, config.metricInterval, config.testReader) + if err != nil { + return nil, fmt.Errorf("gRPC Metrics: %w", err) + } + return metricsContext, nil +} + // newGRPCStorageClient initializes a new storageClient that uses the gRPC // Storage API. func newGRPCStorageClient(ctx context.Context, opts ...storageOption) (storageClient, error) { diff --git a/storage/grpc_metrics.go b/storage/grpc_metrics.go index cb91879499b5..358559d9bbe9 100644 --- a/storage/grpc_metrics.go +++ b/storage/grpc_metrics.go @@ -29,8 +29,8 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" "google.golang.org/api/option" - "google.golang.org/api/transport" "google.golang.org/grpc" + "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/stats/opentelemetry" ) @@ -39,102 +39,80 @@ const ( metricPrefix = "storage.googleapis.com/client/" ) -func latencyHistogramBoundaries() []float64 { - boundaries := []float64{} - boundary := 0.0 - increment := 0.002 - // 2ms buckets for first 100ms, so we can have higher resolution for uploads and downloads in the 100 KiB range - for i := 0; i < 50; i++ { - boundaries = append(boundaries, boundary) - // increment by 2ms - boundary += increment - } - // For the remaining buckets do 10 10ms, 10 20ms, and so on, up until 5 minutes - for i := 0; i < 150 && boundary < 300; i++ { - boundaries = append(boundaries, boundary) - if i != 0 && i%10 == 0 { - increment *= 2 - } - boundary += increment - } - return boundaries -} - -func sizeHistogramBoundaries() []float64 { - kb := 1024.0 - mb := 1024.0 * kb - gb := 1024.0 * mb - boundaries := []float64{} - boundary := 0.0 - increment := 128 * kb - // 128 KiB increments up to 4MiB, then exponential growth - for len(boundaries) < 200 && boundary <= 16*gb { - boundaries = append(boundaries, boundary) - boundary += increment - if boundary >= 4*mb { - increment *= 2 - } - } - return boundaries -} - -func metricFormatter(m metricdata.Metrics) string { - return metricPrefix + strings.ReplaceAll(string(m.Name), ".", "/") -} - // Added to help with tests type storageMonitoredResource struct { - project string - resource *resource.Resource + project string + api string + location string + instanceID string + cloudPlatform string + hostID string + resource *resource.Resource } -func (smr *storageMonitoredResource) name() string { - return monitoredResourceName +func (smr *storageMonitoredResource) toResource() *resource.Resource { + if smr.resource != nil { + return smr.resource + } + // Ignore err since we aren't detecting attributes + smr.resource, _ = resource.New(context.TODO(), resource.WithAttributes([]attribute.KeyValue{ + {Key: "gcp.resource_type", Value: attribute.StringValue(monitoredResourceName)}, + {Key: "project_id", Value: attribute.StringValue(smr.project)}, + {Key: "api", Value: attribute.StringValue(smr.api)}, + {Key: "instance_id", Value: attribute.StringValue(smr.instanceID)}, + {Key: "location", Value: attribute.StringValue(smr.location)}, + {Key: "cloud_platform", Value: attribute.StringValue(smr.cloudPlatform)}, + {Key: "host_id", Value: attribute.StringValue(smr.hostID)}, + }...)) + return smr.resource } -func (smr *storageMonitoredResource) attributes() []string { - return []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"} +func (smr *storageMonitoredResource) exporter() (metric.Exporter, error) { + exporter, err := mexporter.New( + mexporter.WithProjectID(smr.project), + mexporter.WithMetricDescriptorTypeFormatter(metricFormatter), + mexporter.WithCreateServiceTimeSeries(), + mexporter.WithMonitoredResourceDescription(monitoredResourceName, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"}), + ) + if err != nil { + return nil, err + } + return exporter, nil } -func (smr *storageMonitoredResource) detectFromGCP(ctx context.Context, opts ...resource.Option) error { +func newStorageMonitoredResource(ctx context.Context, project, api string, opts ...resource.Option) (*storageMonitoredResource, error) { aopts := append([]resource.Option{resource.WithDetectors(gcp.NewDetector())}, opts...) detectedAttrs, err := resource.New(ctx, aopts...) if err != nil { - return err + return nil, err + } + smr := &storageMonitoredResource{ + instanceID: uuid.New().String(), + api: api, + project: project, } s := detectedAttrs.Set() if p, present := s.Value("cloud.account.id"); present && smr.project == "" { smr.project = p.AsString() } else if !present && smr.project == "" { - return errors.New("google cloud project is required to start client-side metrics") - } - mrAttrs := []attribute.KeyValue{ - {Key: "gcp.resource_type", Value: attribute.StringValue(monitoredResourceName)}, - {Key: "project_id", Value: attribute.StringValue(smr.project)}, - {Key: "api", Value: attribute.StringValue("grpc")}, - {Key: "instance_id", Value: attribute.StringValue(uuid.New().String())}, + return nil, errors.New("google cloud project is required to start client-side metrics") } if v, ok := s.Value("location"); ok { - mrAttrs = append(mrAttrs, attribute.KeyValue{Key: "location", Value: v}) + smr.location = v.AsString() } else { - mrAttrs = append(mrAttrs, attribute.KeyValue{Key: "location", Value: attribute.StringValue("global")}) + smr.location = "global" } if v, ok := s.Value("cloud.platform"); ok { - mrAttrs = append(mrAttrs, attribute.KeyValue{Key: "cloud_platform", Value: v}) + smr.cloudPlatform = v.AsString() } else { - mrAttrs = append(mrAttrs, attribute.KeyValue{Key: "cloud_platform", Value: attribute.StringValue("unknown")}) + smr.cloudPlatform = "unknown" } if v, ok := s.Value("host.id"); ok { - mrAttrs = append(mrAttrs, attribute.KeyValue{Key: "host_id", Value: v}) + smr.hostID = v.AsString() } else { - mrAttrs = append(mrAttrs, attribute.KeyValue{Key: "host_id", Value: attribute.StringValue("unknown")}) - } - r, err := resource.New(ctx, resource.WithAttributes(mrAttrs...)) - if err != nil { - return err + smr.hostID = "unknown" } - smr.resource = r - return nil + return smr, nil } type metricsContext struct { @@ -146,61 +124,41 @@ type metricsContext struct { close func() } -func createHistogramView(name string, boundaries []float64) metric.View { - return metric.NewView(metric.Instrument{ - Name: name, - Kind: metric.InstrumentKindHistogram, - }, metric.Stream{ - Name: name, - Aggregation: metric.AggregationExplicitBucketHistogram{Boundaries: boundaries}, - }) -} - -func newGRPCMetricContext(ctx context.Context, project string, config storageConfig) (*metricsContext, error) { - var exporter metric.Exporter - meterOpts := []metric.Option{} - if config.metricExporter != nil { - exporter = *config.metricExporter - } else { - smr := &storageMonitoredResource{ - project: project, - } - if err := smr.detectFromGCP(ctx); err != nil { - return nil, err - } - meterOpts = append(meterOpts, metric.WithResource(smr.resource)) - meOpts := []mexporter.Option{ - mexporter.WithProjectID(smr.project), - mexporter.WithMetricDescriptorTypeFormatter(metricFormatter), - mexporter.WithCreateServiceTimeSeries(), - mexporter.WithMonitoredResourceDescription(smr.name(), smr.attributes()), - } - ex, err := mexporter.New(meOpts...) - if err != nil { - return nil, err - } - exporter = ex +func newGRPCMetricContext(ctx context.Context, project string, interval time.Duration, testReader metric.Reader) (*metricsContext, error) { + smr, err := newStorageMonitoredResource(ctx, project, "grpc") + if err != nil { + return nil, err } - // Metric views update histogram boundaries to be relevant to GCS - // otherwise default OTel histogram boundaries are used. - metricViews := []metric.View{ - createHistogramView("grpc.client.attempt.duration", latencyHistogramBoundaries()), - createHistogramView("grpc.client.attempt.rcvd_total_compressed_message_size", sizeHistogramBoundaries()), - createHistogramView("grpc.client.attempt.sent_total_compressed_message_size", sizeHistogramBoundaries()), + exporter, err := smr.exporter() + if err != nil { + return nil, err + } + if interval == 0 { + interval = time.Minute } - interval := time.Minute - if config.metricInterval > 0 { - interval = config.metricInterval + meterOpts := []metric.Option{ + metric.WithResource(smr.resource), + metric.WithReader( + metric.NewPeriodicReader(&exporterLogSuppressor{exporter: exporter}, metric.WithInterval(interval))), + // Metric views update histogram boundaries to be relevant to GCS + // otherwise default OTel histogram boundaries are used. + metric.WithView( + createHistogramView("grpc.client.attempt.duration", latencyHistogramBoundaries()), + createHistogramView("grpc.client.attempt.rcvd_total_compressed_message_size", sizeHistogramBoundaries()), + createHistogramView("grpc.client.attempt.sent_total_compressed_message_size", sizeHistogramBoundaries())), } - meterOpts = append(meterOpts, metric.WithReader(metric.NewPeriodicReader(&exporterLogSuppressor{exporter: exporter}, metric.WithInterval(interval))), - metric.WithView(metricViews...)) - if config.testReader != nil { - meterOpts = append(meterOpts, metric.WithReader(config.testReader)) + if testReader != nil { + meterOpts = append(meterOpts, metric.WithReader(testReader)) } provider := metric.NewMeterProvider(meterOpts...) mo := opentelemetry.MetricsOptions{ MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics().Add( + Metrics: stats.NewMetrics( + "grpc.client.attempt.started", + "grpc.client.attempt.duration", + "grpc.client.attempt.sent_total_compressed_message_size", + "grpc.client.attempt.rcvd_total_compressed_message_size", + "grpc.client.call.duration", "grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", @@ -209,41 +167,26 @@ func newGRPCMetricContext(ctx context.Context, project string, config storageCon "grpc.lb.rls.cache_size", "grpc.lb.rls.default_target_picks", "grpc.lb.rls.target_picks", - "grpc.lb.rls.failed_picks"), + "grpc.lb.rls.failed_picks", + ), OptionalLabels: []string{"grpc.lb.locality"}, } opts := []option.ClientOption{ - option.WithGRPCDialOption(opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo})), - option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.StaticMethodCallOption{})), + option.WithGRPCDialOption( + opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo})), + option.WithGRPCDialOption( + grpc.WithDefaultCallOptions(grpc.StaticMethodCallOption{})), } context := &metricsContext{ clientOpts: opts, provider: provider, - close: createShutdown(ctx, provider), + close: func() { + provider.Shutdown(ctx) + }, } return context, nil } -func enableClientMetrics(ctx context.Context, s *settings, config storageConfig) (*metricsContext, error) { - var project string - c, err := transport.Creds(ctx, s.clientOption...) - if err == nil { - project = c.ProjectID - } - // Enable client-side metrics for gRPC - metricsContext, err := newGRPCMetricContext(ctx, project, config) - if err != nil { - return nil, fmt.Errorf("gRPC Metrics: %w", err) - } - return metricsContext, nil -} - -func createShutdown(ctx context.Context, provider *metric.MeterProvider) func() { - return func() { - provider.Shutdown(ctx) - } -} - // Silences permission errors after initial error is emitted to prevent // chatty logs. type exporterLogSuppressor struct { @@ -280,3 +223,56 @@ func (e *exporterLogSuppressor) ForceFlush(ctx context.Context) error { func (e *exporterLogSuppressor) Shutdown(ctx context.Context) error { return e.exporter.Shutdown(ctx) } + +func latencyHistogramBoundaries() []float64 { + boundaries := []float64{} + boundary := 0.0 + increment := 0.002 + // 2ms buckets for first 100ms, so we can have higher resolution for uploads and downloads in the 100 KiB range + for i := 0; i < 50; i++ { + boundaries = append(boundaries, boundary) + // increment by 2ms + boundary += increment + } + // For the remaining buckets do 10 10ms, 10 20ms, and so on, up until 5 minutes + for i := 0; i < 150 && boundary < 300; i++ { + boundaries = append(boundaries, boundary) + if i != 0 && i%10 == 0 { + increment *= 2 + } + boundary += increment + } + return boundaries +} + +func sizeHistogramBoundaries() []float64 { + kb := 1024.0 + mb := 1024.0 * kb + gb := 1024.0 * mb + boundaries := []float64{} + boundary := 0.0 + increment := 128 * kb + // 128 KiB increments up to 4MiB, then exponential growth + for len(boundaries) < 200 && boundary <= 16*gb { + boundaries = append(boundaries, boundary) + boundary += increment + if boundary >= 4*mb { + increment *= 2 + } + } + return boundaries +} + +func createHistogramView(name string, boundaries []float64) metric.View { + return metric.NewView(metric.Instrument{ + Name: name, + Kind: metric.InstrumentKindHistogram, + }, metric.Stream{ + Name: name, + Aggregation: metric.AggregationExplicitBucketHistogram{Boundaries: boundaries}, + }) +} + +func metricFormatter(m metricdata.Metrics) string { + return metricPrefix + strings.ReplaceAll(string(m.Name), ".", "/") +} diff --git a/storage/grpc_metrics_test.go b/storage/grpc_metrics_test.go index d66f7a1722ab..075f06d47bf4 100644 --- a/storage/grpc_metrics_test.go +++ b/storage/grpc_metrics_test.go @@ -38,11 +38,15 @@ func TestStorageMonitoredResource(t *testing.T) { ctx := context.Background() for _, test := range []struct { desc string + project string + api string detectedAttributes []attribute.KeyValue wantAttributes attribute.Set }{ { - desc: "default values set when GCP attributes are not detected", + desc: "default values set when GCP attributes are not detected", + project: "project-id", + api: "grpc", wantAttributes: attribute.NewSet(attribute.KeyValue{ Key: "location", Value: attribute.StringValue("global"), @@ -52,10 +56,18 @@ func TestStorageMonitoredResource(t *testing.T) { }, attribute.KeyValue{ Key: "host_id", Value: attribute.StringValue("unknown"), + }, attribute.KeyValue{ + Key: "project_id", + Value: attribute.StringValue("project-id"), + }, attribute.KeyValue{ + Key: "api", + Value: attribute.StringValue("grpc"), }), }, { - desc: "use detected values when GCP attributes are detected", + desc: "use detected values when GCP attributes are detected", + project: "project-id", + api: "grpc", detectedAttributes: []attribute.KeyValue{ {Key: "location", Value: attribute.StringValue("us-central1")}, @@ -73,25 +85,29 @@ func TestStorageMonitoredResource(t *testing.T) { }, attribute.KeyValue{ Key: "host_id", Value: attribute.StringValue("gce-instance-id"), + }, attribute.KeyValue{ + Key: "project_id", + Value: attribute.StringValue("project-id"), + }, attribute.KeyValue{ + Key: "api", + Value: attribute.StringValue("grpc"), }), }, } { t.Run(test.desc, func(t *testing.T) { - smr := &storageMonitoredResource{ - project: "project", - } - if err := smr.detectFromGCP(ctx, resource.WithAttributes(test.detectedAttributes...)); err != nil { - t.Errorf("detectFromGCP: %v", err) + smr, err := newStorageMonitoredResource(ctx, test.project, test.api, resource.WithAttributes(test.detectedAttributes...)) + if err != nil { + t.Errorf("newStorageMonitoredResource: %v", err) } - resultSet := smr.resource.Set() + resultSet := smr.toResource().Set() for _, want := range test.wantAttributes.ToSlice() { got, exists := resultSet.Value(want.Key) if !exists { - t.Errorf("detectFromGCP: %v not set", want.Key) + t.Errorf("resultSet[%v] not set", want.Key) continue } if got != want.Value { - t.Errorf("detectFromGCP: want[%v] = %v, got: %v", want.Key, want.Value, got) + t.Errorf("want[%v] = %v, got: %v", want.Key, want.Value.AsString(), got.AsString()) continue } } From be04e92e294310a976bd7a5499e68e4b94d94cdd Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Tue, 26 Nov 2024 15:03:21 -0800 Subject: [PATCH 04/15] remove exporter experimental option and add manual reader for tests --- storage/experimental/experimental.go | 8 -------- storage/grpc_client.go | 6 +++++- storage/grpc_metrics.go | 19 +++++++++++++------ storage/internal/experimental.go | 4 ---- storage/option.go | 20 ++------------------ storage/option_test.go | 24 +++++++----------------- 6 files changed, 27 insertions(+), 54 deletions(-) diff --git a/storage/experimental/experimental.go b/storage/experimental/experimental.go index b35de64d39d2..64066ecca33d 100644 --- a/storage/experimental/experimental.go +++ b/storage/experimental/experimental.go @@ -25,7 +25,6 @@ import ( "time" "cloud.google.com/go/storage/internal" - "go.opentelemetry.io/otel/sdk/metric" "google.golang.org/api/option" ) @@ -37,13 +36,6 @@ func WithMetricInterval(metricInterval time.Duration) option.ClientOption { return internal.WithMetricInterval.(func(time.Duration) option.ClientOption)(metricInterval) } -// WithMetricExporter provides a [option.ClientOption] that may be passed to [storage.NewGRPCClient]. -// Set an alternate client-side metric Exporter to emit metrics through. -// Must implement [metric.Exporter] -func WithMetricExporter(ex *metric.Exporter) option.ClientOption { - return internal.WithMetricExporter.(func(*metric.Exporter) option.ClientOption)(ex) -} - // WithReadStallTimeout provides a [option.ClientOption] that may be passed to [storage.NewClient]. // It enables the client to retry stalled requests when starting a download from // Cloud Storage. If the timeout elapses with no response from the server, the request diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 10e324591b33..a3fec2b25fb2 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -124,7 +124,11 @@ func enableClientMetrics(ctx context.Context, s *settings, config storageConfig) if err == nil { project = c.ProjectID } - metricsContext, err := newGRPCMetricContext(ctx, project, config.metricInterval, config.testReader) + metricsContext, err := newGRPCMetricContext(ctx, metricsConfig{ + project: project, + interval: config.metricInterval, + manualReader: config.manualReader}, + ) if err != nil { return nil, fmt.Errorf("gRPC Metrics: %w", err) } diff --git a/storage/grpc_metrics.go b/storage/grpc_metrics.go index 358559d9bbe9..10c60cd205d0 100644 --- a/storage/grpc_metrics.go +++ b/storage/grpc_metrics.go @@ -124,8 +124,14 @@ type metricsContext struct { close func() } -func newGRPCMetricContext(ctx context.Context, project string, interval time.Duration, testReader metric.Reader) (*metricsContext, error) { - smr, err := newStorageMonitoredResource(ctx, project, "grpc") +type metricsConfig struct { + project string + interval time.Duration + manualReader *metric.ManualReader +} + +func newGRPCMetricContext(ctx context.Context, cfg metricsConfig) (*metricsContext, error) { + smr, err := newStorageMonitoredResource(ctx, cfg.project, "grpc") if err != nil { return nil, err } @@ -133,8 +139,9 @@ func newGRPCMetricContext(ctx context.Context, project string, interval time.Dur if err != nil { return nil, err } - if interval == 0 { - interval = time.Minute + interval := time.Minute + if cfg.interval > 0 { + interval = cfg.interval } meterOpts := []metric.Option{ metric.WithResource(smr.resource), @@ -147,8 +154,8 @@ func newGRPCMetricContext(ctx context.Context, project string, interval time.Dur createHistogramView("grpc.client.attempt.rcvd_total_compressed_message_size", sizeHistogramBoundaries()), createHistogramView("grpc.client.attempt.sent_total_compressed_message_size", sizeHistogramBoundaries())), } - if testReader != nil { - meterOpts = append(meterOpts, metric.WithReader(testReader)) + if cfg.manualReader != nil { + meterOpts = append(meterOpts, metric.WithReader(cfg.manualReader)) } provider := metric.NewMeterProvider(meterOpts...) mo := opentelemetry.MetricsOptions{ diff --git a/storage/internal/experimental.go b/storage/internal/experimental.go index 7db3ff52743c..70c51f34fcfd 100644 --- a/storage/internal/experimental.go +++ b/storage/internal/experimental.go @@ -22,10 +22,6 @@ var ( // greater than 1 minute. WithMetricInterval any // func (*time.Duration) option.ClientOption - // WithMetricExporter is a function which is implemented by storage package. - // Set an alternate client-side metric Exporter to emit metrics through. - WithMetricExporter any // func (*metric.Exporter) option.ClientOption - // WithReadStallTimeout is a function which is implemented by storage package. // It takes ReadStallTimeoutConfig as inputs and returns a option.ClientOption. WithReadStallTimeout any // func (*ReadStallTimeoutConfig) option.ClientOption diff --git a/storage/option.go b/storage/option.go index 7714371226b9..8103c9556cca 100644 --- a/storage/option.go +++ b/storage/option.go @@ -37,7 +37,6 @@ const ( func init() { // initialize experimental options - storageinternal.WithMetricExporter = withMetricExporter storageinternal.WithMetricInterval = withMetricInterval storageinternal.WithReadStallTimeout = withReadStallTimeout } @@ -77,9 +76,8 @@ type storageConfig struct { useJSONforReads bool readAPIWasSet bool disableClientMetrics bool - metricExporter *metric.Exporter metricInterval time.Duration - testReader *metric.ManualReader + manualReader *metric.ManualReader readStallTimeoutConfig *experimental.ReadStallTimeoutConfig } @@ -179,20 +177,6 @@ func (w *withMeterOptions) ApplyStorageOpt(c *storageConfig) { c.metricInterval = w.interval } -type withMetricExporterConfig struct { - internaloption.EmbeddableAdapter - // exporter override - metricExporter *metric.Exporter -} - -func withMetricExporter(ex *metric.Exporter) option.ClientOption { - return &withMetricExporterConfig{metricExporter: ex} -} - -func (w *withMetricExporterConfig) ApplyStorageOpt(c *storageConfig) { - c.metricExporter = w.metricExporter -} - type withTestMetricReaderConfig struct { internaloption.EmbeddableAdapter // exporter override @@ -204,7 +188,7 @@ func withTestMetricReader(ex *metric.ManualReader) option.ClientOption { } func (w *withTestMetricReaderConfig) ApplyStorageOpt(c *storageConfig) { - c.testReader = w.metricReader + c.manualReader = w.metricReader } // WithReadStallTimeout is an option that may be passed to [NewClient]. diff --git a/storage/option_test.go b/storage/option_test.go index ea7c3915fab8..84ac0a1fea33 100644 --- a/storage/option_test.go +++ b/storage/option_test.go @@ -21,7 +21,7 @@ import ( "cloud.google.com/go/storage/experimental" "github.com/google/go-cmp/cmp" - "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + "go.opentelemetry.io/otel/sdk/metric" "google.golang.org/api/option" ) @@ -39,7 +39,6 @@ func TestApplyStorageOpt(t *testing.T) { readAPIWasSet: true, disableClientMetrics: false, metricInterval: 0, - metricExporter: nil, }, }, { @@ -50,7 +49,6 @@ func TestApplyStorageOpt(t *testing.T) { readAPIWasSet: true, disableClientMetrics: false, metricInterval: 0, - metricExporter: nil, }, }, { @@ -61,7 +59,6 @@ func TestApplyStorageOpt(t *testing.T) { readAPIWasSet: true, disableClientMetrics: false, metricInterval: 0, - metricExporter: nil, }, }, { @@ -72,7 +69,6 @@ func TestApplyStorageOpt(t *testing.T) { readAPIWasSet: false, disableClientMetrics: false, metricInterval: 0, - metricExporter: nil, }, }, { @@ -83,7 +79,6 @@ func TestApplyStorageOpt(t *testing.T) { readAPIWasSet: false, disableClientMetrics: false, metricInterval: 0, - metricExporter: nil, }, }, { @@ -94,7 +89,6 @@ func TestApplyStorageOpt(t *testing.T) { readAPIWasSet: false, disableClientMetrics: true, metricInterval: 0, - metricExporter: nil, }, }, { @@ -105,7 +99,6 @@ func TestApplyStorageOpt(t *testing.T) { readAPIWasSet: false, disableClientMetrics: false, metricInterval: time.Minute * 5, - metricExporter: nil, }, }, { @@ -152,21 +145,18 @@ func TestApplyStorageOpt(t *testing.T) { } } -func TestSetCustomExporter(t *testing.T) { - exporter, err := stdoutmetric.New() - if err != nil { - t.Errorf("TestSetCustomExporter: %v", err) - } +func TestSetManualReader(t *testing.T) { + manualReader := metric.NewManualReader() want := storageConfig{ - metricExporter: &exporter, + manualReader: manualReader, } var got storageConfig - opt := experimental.WithMetricExporter(&exporter) + opt := withTestMetricReader(manualReader) if storageOpt, ok := opt.(storageClientOption); ok { storageOpt.ApplyStorageOpt(&got) } - if got.metricExporter != want.metricExporter { - t.Errorf("TestSetCustomExpoerter: metricExporter want=%v, got=%v", want.metricExporter, got.metricExporter) + if got.manualReader != want.manualReader { + t.Errorf("TestSetCustomExpoerter: manualReader want=%v, got=%v", want.manualReader, got.manualReader) } } From 256b835a1c90f435e331b4783d018951e5f33b84 Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Tue, 26 Nov 2024 16:04:08 -0800 Subject: [PATCH 05/15] go mod tidy --- storage/go.mod | 1 - storage/go.sum | 2 -- 2 files changed, 3 deletions(-) diff --git a/storage/go.mod b/storage/go.mod index 711999ed472b..a5535fecca00 100644 --- a/storage/go.mod +++ b/storage/go.mod @@ -15,7 +15,6 @@ require ( github.com/googleapis/gax-go/v2 v2.13.0 go.opentelemetry.io/contrib/detectors/gcp v1.29.0 go.opentelemetry.io/otel v1.29.0 - go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.29.0 go.opentelemetry.io/otel/sdk v1.29.0 go.opentelemetry.io/otel/sdk/metric v1.29.0 golang.org/x/oauth2 v0.23.0 diff --git a/storage/go.sum b/storage/go.sum index 482c6888d478..9581ded222ab 100644 --- a/storage/go.sum +++ b/storage/go.sum @@ -115,8 +115,6 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+n go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8= go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= -go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.29.0 h1:WDdP9acbMYjbKIyJUhTvtzj601sVJOqgWdUxSdR/Ysc= -go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.29.0/go.mod h1:BLbf7zbNIONBLPwvFnwNHGj4zge8uTCM/UPIVW1Mq2I= go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo= From 1e2cc6f4d11d4221e5782348ef8528258ecb650d Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Sun, 1 Dec 2024 15:37:05 -0800 Subject: [PATCH 06/15] dependency inject gcp detector for testing --- storage/grpc_metrics.go | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/storage/grpc_metrics.go b/storage/grpc_metrics.go index 10c60cd205d0..17b950361836 100644 --- a/storage/grpc_metrics.go +++ b/storage/grpc_metrics.go @@ -44,9 +44,9 @@ type storageMonitoredResource struct { project string api string location string - instanceID string + instance string cloudPlatform string - hostID string + host string resource *resource.Resource } @@ -59,10 +59,10 @@ func (smr *storageMonitoredResource) toResource() *resource.Resource { {Key: "gcp.resource_type", Value: attribute.StringValue(monitoredResourceName)}, {Key: "project_id", Value: attribute.StringValue(smr.project)}, {Key: "api", Value: attribute.StringValue(smr.api)}, - {Key: "instance_id", Value: attribute.StringValue(smr.instanceID)}, + {Key: "instance_id", Value: attribute.StringValue(smr.instance)}, {Key: "location", Value: attribute.StringValue(smr.location)}, {Key: "cloud_platform", Value: attribute.StringValue(smr.cloudPlatform)}, - {Key: "host_id", Value: attribute.StringValue(smr.hostID)}, + {Key: "host_id", Value: attribute.StringValue(smr.host)}, }...)) return smr.resource } @@ -81,15 +81,14 @@ func (smr *storageMonitoredResource) exporter() (metric.Exporter, error) { } func newStorageMonitoredResource(ctx context.Context, project, api string, opts ...resource.Option) (*storageMonitoredResource, error) { - aopts := append([]resource.Option{resource.WithDetectors(gcp.NewDetector())}, opts...) - detectedAttrs, err := resource.New(ctx, aopts...) + detectedAttrs, err := resource.New(ctx, opts...) if err != nil { return nil, err } smr := &storageMonitoredResource{ - instanceID: uuid.New().String(), - api: api, - project: project, + instance: uuid.New().String(), + api: api, + project: project, } s := detectedAttrs.Set() if p, present := s.Value("cloud.account.id"); present && smr.project == "" { @@ -108,9 +107,9 @@ func newStorageMonitoredResource(ctx context.Context, project, api string, opts smr.cloudPlatform = "unknown" } if v, ok := s.Value("host.id"); ok { - smr.hostID = v.AsString() + smr.host = v.AsString() } else { - smr.hostID = "unknown" + smr.host = "unknown" } return smr, nil } @@ -127,11 +126,11 @@ type metricsContext struct { type metricsConfig struct { project string interval time.Duration - manualReader *metric.ManualReader + manualReader *metric.ManualReader // used by tests } func newGRPCMetricContext(ctx context.Context, cfg metricsConfig) (*metricsContext, error) { - smr, err := newStorageMonitoredResource(ctx, cfg.project, "grpc") + smr, err := newStorageMonitoredResource(ctx, cfg.project, "grpc", resource.WithDetectors(gcp.NewDetector())) if err != nil { return nil, err } @@ -184,14 +183,13 @@ func newGRPCMetricContext(ctx context.Context, cfg metricsConfig) (*metricsConte option.WithGRPCDialOption( grpc.WithDefaultCallOptions(grpc.StaticMethodCallOption{})), } - context := &metricsContext{ + return &metricsContext{ clientOpts: opts, provider: provider, close: func() { provider.Shutdown(ctx) }, - } - return context, nil + }, nil } // Silences permission errors after initial error is emitted to prevent From bb67dd3d38220eeb1c3264d9d617b172eb808c7b Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Mon, 2 Dec 2024 11:47:49 -0800 Subject: [PATCH 07/15] revert removal of metricExporter option --- storage/experimental/experimental.go | 8 +++++++ storage/go.mod | 1 + storage/go.sum | 2 ++ storage/grpc_metrics.go | 33 +++++++++++++++++----------- storage/internal/experimental.go | 4 ++++ storage/option.go | 18 ++++++++++++++- storage/option_test.go | 19 ++++++++++++++++ 7 files changed, 71 insertions(+), 14 deletions(-) diff --git a/storage/experimental/experimental.go b/storage/experimental/experimental.go index 64066ecca33d..b35de64d39d2 100644 --- a/storage/experimental/experimental.go +++ b/storage/experimental/experimental.go @@ -25,6 +25,7 @@ import ( "time" "cloud.google.com/go/storage/internal" + "go.opentelemetry.io/otel/sdk/metric" "google.golang.org/api/option" ) @@ -36,6 +37,13 @@ func WithMetricInterval(metricInterval time.Duration) option.ClientOption { return internal.WithMetricInterval.(func(time.Duration) option.ClientOption)(metricInterval) } +// WithMetricExporter provides a [option.ClientOption] that may be passed to [storage.NewGRPCClient]. +// Set an alternate client-side metric Exporter to emit metrics through. +// Must implement [metric.Exporter] +func WithMetricExporter(ex *metric.Exporter) option.ClientOption { + return internal.WithMetricExporter.(func(*metric.Exporter) option.ClientOption)(ex) +} + // WithReadStallTimeout provides a [option.ClientOption] that may be passed to [storage.NewClient]. // It enables the client to retry stalled requests when starting a download from // Cloud Storage. If the timeout elapses with no response from the server, the request diff --git a/storage/go.mod b/storage/go.mod index a5535fecca00..711999ed472b 100644 --- a/storage/go.mod +++ b/storage/go.mod @@ -15,6 +15,7 @@ require ( github.com/googleapis/gax-go/v2 v2.13.0 go.opentelemetry.io/contrib/detectors/gcp v1.29.0 go.opentelemetry.io/otel v1.29.0 + go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.29.0 go.opentelemetry.io/otel/sdk v1.29.0 go.opentelemetry.io/otel/sdk/metric v1.29.0 golang.org/x/oauth2 v0.23.0 diff --git a/storage/go.sum b/storage/go.sum index 9581ded222ab..482c6888d478 100644 --- a/storage/go.sum +++ b/storage/go.sum @@ -115,6 +115,8 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+n go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8= go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.29.0 h1:WDdP9acbMYjbKIyJUhTvtzj601sVJOqgWdUxSdR/Ysc= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.29.0/go.mod h1:BLbf7zbNIONBLPwvFnwNHGj4zge8uTCM/UPIVW1Mq2I= go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo= diff --git a/storage/grpc_metrics.go b/storage/grpc_metrics.go index 17b950361836..42ab50a2b6d4 100644 --- a/storage/grpc_metrics.go +++ b/storage/grpc_metrics.go @@ -124,26 +124,33 @@ type metricsContext struct { } type metricsConfig struct { - project string - interval time.Duration - manualReader *metric.ManualReader // used by tests + project string + interval time.Duration + customExporter *metric.Exporter + manualReader *metric.ManualReader // used by tests } func newGRPCMetricContext(ctx context.Context, cfg metricsConfig) (*metricsContext, error) { - smr, err := newStorageMonitoredResource(ctx, cfg.project, "grpc", resource.WithDetectors(gcp.NewDetector())) - if err != nil { - return nil, err - } - exporter, err := smr.exporter() - if err != nil { - return nil, err + var exporter metric.Exporter + meterOpts := []metric.Option{} + if cfg.customExporter == nil { + smr, err := newStorageMonitoredResource(ctx, cfg.project, "grpc", resource.WithDetectors(gcp.NewDetector())) + if err != nil { + return nil, err + } + exporter, err = smr.exporter() + if err != nil { + return nil, err + } + meterOpts = append(meterOpts, metric.WithResource(smr.resource)) + } else { + exporter = *cfg.customExporter } interval := time.Minute if cfg.interval > 0 { interval = cfg.interval } - meterOpts := []metric.Option{ - metric.WithResource(smr.resource), + meterOpts = append(meterOpts, metric.WithReader( metric.NewPeriodicReader(&exporterLogSuppressor{exporter: exporter}, metric.WithInterval(interval))), // Metric views update histogram boundaries to be relevant to GCS @@ -152,7 +159,7 @@ func newGRPCMetricContext(ctx context.Context, cfg metricsConfig) (*metricsConte createHistogramView("grpc.client.attempt.duration", latencyHistogramBoundaries()), createHistogramView("grpc.client.attempt.rcvd_total_compressed_message_size", sizeHistogramBoundaries()), createHistogramView("grpc.client.attempt.sent_total_compressed_message_size", sizeHistogramBoundaries())), - } + ) if cfg.manualReader != nil { meterOpts = append(meterOpts, metric.WithReader(cfg.manualReader)) } diff --git a/storage/internal/experimental.go b/storage/internal/experimental.go index 70c51f34fcfd..7db3ff52743c 100644 --- a/storage/internal/experimental.go +++ b/storage/internal/experimental.go @@ -22,6 +22,10 @@ var ( // greater than 1 minute. WithMetricInterval any // func (*time.Duration) option.ClientOption + // WithMetricExporter is a function which is implemented by storage package. + // Set an alternate client-side metric Exporter to emit metrics through. + WithMetricExporter any // func (*metric.Exporter) option.ClientOption + // WithReadStallTimeout is a function which is implemented by storage package. // It takes ReadStallTimeoutConfig as inputs and returns a option.ClientOption. WithReadStallTimeout any // func (*ReadStallTimeoutConfig) option.ClientOption diff --git a/storage/option.go b/storage/option.go index 8103c9556cca..a7474842b78a 100644 --- a/storage/option.go +++ b/storage/option.go @@ -37,6 +37,7 @@ const ( func init() { // initialize experimental options + storageinternal.WithMetricExporter = withMetricExporter storageinternal.WithMetricInterval = withMetricInterval storageinternal.WithReadStallTimeout = withReadStallTimeout } @@ -76,6 +77,7 @@ type storageConfig struct { useJSONforReads bool readAPIWasSet bool disableClientMetrics bool + metricExporter *metric.Exporter metricInterval time.Duration manualReader *metric.ManualReader readStallTimeoutConfig *experimental.ReadStallTimeoutConfig @@ -177,9 +179,23 @@ func (w *withMeterOptions) ApplyStorageOpt(c *storageConfig) { c.metricInterval = w.interval } -type withTestMetricReaderConfig struct { +type withMetricExporterConfig struct { internaloption.EmbeddableAdapter // exporter override + metricExporter *metric.Exporter +} + +func withMetricExporter(ex *metric.Exporter) option.ClientOption { + return &withMetricExporterConfig{metricExporter: ex} +} + +func (w *withMetricExporterConfig) ApplyStorageOpt(c *storageConfig) { + c.metricExporter = w.metricExporter +} + +type withTestMetricReaderConfig struct { + internaloption.EmbeddableAdapter + // reader override metricReader *metric.ManualReader } diff --git a/storage/option_test.go b/storage/option_test.go index 84ac0a1fea33..be11682c4ae3 100644 --- a/storage/option_test.go +++ b/storage/option_test.go @@ -21,6 +21,7 @@ import ( "cloud.google.com/go/storage/experimental" "github.com/google/go-cmp/cmp" + "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" "go.opentelemetry.io/otel/sdk/metric" "google.golang.org/api/option" ) @@ -145,6 +146,24 @@ func TestApplyStorageOpt(t *testing.T) { } } +func TestSetCustomExporter(t *testing.T) { + exporter, err := stdoutmetric.New() + if err != nil { + t.Errorf("TestSetCustomExporter: %v", err) + } + want := storageConfig{ + metricExporter: &exporter, + } + var got storageConfig + opt := experimental.WithMetricExporter(&exporter) + if storageOpt, ok := opt.(storageClientOption); ok { + storageOpt.ApplyStorageOpt(&got) + } + if got.metricExporter != want.metricExporter { + t.Errorf("TestSetCustomExpoerter: metricExporter want=%v, got=%v", want.metricExporter, got.metricExporter) + } +} + func TestSetManualReader(t *testing.T) { manualReader := metric.NewManualReader() want := storageConfig{ From 7e4095b803312978dbb4d534309c93a2853fd401 Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Mon, 2 Dec 2024 11:56:35 -0800 Subject: [PATCH 08/15] revert changes to metricExporter option tests --- storage/option_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/storage/option_test.go b/storage/option_test.go index be11682c4ae3..1e21a875be76 100644 --- a/storage/option_test.go +++ b/storage/option_test.go @@ -40,6 +40,7 @@ func TestApplyStorageOpt(t *testing.T) { readAPIWasSet: true, disableClientMetrics: false, metricInterval: 0, + metricExporter: nil, }, }, { @@ -50,6 +51,7 @@ func TestApplyStorageOpt(t *testing.T) { readAPIWasSet: true, disableClientMetrics: false, metricInterval: 0, + metricExporter: nil, }, }, { @@ -60,6 +62,7 @@ func TestApplyStorageOpt(t *testing.T) { readAPIWasSet: true, disableClientMetrics: false, metricInterval: 0, + metricExporter: nil, }, }, { @@ -70,6 +73,7 @@ func TestApplyStorageOpt(t *testing.T) { readAPIWasSet: false, disableClientMetrics: false, metricInterval: 0, + metricExporter: nil, }, }, { @@ -80,6 +84,7 @@ func TestApplyStorageOpt(t *testing.T) { readAPIWasSet: false, disableClientMetrics: false, metricInterval: 0, + metricExporter: nil, }, }, { @@ -90,6 +95,7 @@ func TestApplyStorageOpt(t *testing.T) { readAPIWasSet: false, disableClientMetrics: true, metricInterval: 0, + metricExporter: nil, }, }, { @@ -100,6 +106,7 @@ func TestApplyStorageOpt(t *testing.T) { readAPIWasSet: false, disableClientMetrics: false, metricInterval: time.Minute * 5, + metricExporter: nil, }, }, { From 751b810addf56d177fc1a9c5ab2c8ce5ba803189 Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Mon, 2 Dec 2024 14:51:38 -0800 Subject: [PATCH 09/15] use struct embedding --- storage/grpc_client.go | 1 + storage/grpc_metrics.go | 22 +++------------------- storage/grpc_metrics_test.go | 22 ++++------------------ 3 files changed, 8 insertions(+), 37 deletions(-) diff --git a/storage/grpc_client.go b/storage/grpc_client.go index a3fec2b25fb2..904ad48453a6 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -120,6 +120,7 @@ type grpcStorageClient struct { func enableClientMetrics(ctx context.Context, s *settings, config storageConfig) (*metricsContext, error) { var project string + // TODO: use new auth client c, err := transport.Creds(ctx, s.clientOption...) if err == nil { project = c.ProjectID diff --git a/storage/grpc_metrics.go b/storage/grpc_metrics.go index 42ab50a2b6d4..3e91b08e77ed 100644 --- a/storage/grpc_metrics.go +++ b/storage/grpc_metrics.go @@ -152,7 +152,7 @@ func newGRPCMetricContext(ctx context.Context, cfg metricsConfig) (*metricsConte } meterOpts = append(meterOpts, metric.WithReader( - metric.NewPeriodicReader(&exporterLogSuppressor{exporter: exporter}, metric.WithInterval(interval))), + metric.NewPeriodicReader(&exporterLogSuppressor{Exporter: exporter}, metric.WithInterval(interval))), // Metric views update histogram boundaries to be relevant to GCS // otherwise default OTel histogram boundaries are used. metric.WithView( @@ -202,7 +202,7 @@ func newGRPCMetricContext(ctx context.Context, cfg metricsConfig) (*metricsConte // Silences permission errors after initial error is emitted to prevent // chatty logs. type exporterLogSuppressor struct { - exporter metric.Exporter + metric.Exporter emittedFailure bool } @@ -210,7 +210,7 @@ type exporterLogSuppressor struct { // lack of credentials after initial failure. // https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric@v1.28.0#Exporter func (e *exporterLogSuppressor) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error { - if err := e.exporter.Export(ctx, rm); err != nil && !e.emittedFailure { + if err := e.Exporter.Export(ctx, rm); err != nil && !e.emittedFailure { if strings.Contains(err.Error(), "PermissionDenied") { e.emittedFailure = true return fmt.Errorf("gRPC metrics failed due permission issue: %w", err) @@ -220,22 +220,6 @@ func (e *exporterLogSuppressor) Export(ctx context.Context, rm *metricdata.Resou return nil } -func (e *exporterLogSuppressor) Temporality(k metric.InstrumentKind) metricdata.Temporality { - return e.exporter.Temporality(k) -} - -func (e *exporterLogSuppressor) Aggregation(k metric.InstrumentKind) metric.Aggregation { - return e.exporter.Aggregation(k) -} - -func (e *exporterLogSuppressor) ForceFlush(ctx context.Context) error { - return e.exporter.ForceFlush(ctx) -} - -func (e *exporterLogSuppressor) Shutdown(ctx context.Context) error { - return e.exporter.Shutdown(ctx) -} - func latencyHistogramBoundaries() []float64 { boundaries := []float64{} boundary := 0.0 diff --git a/storage/grpc_metrics_test.go b/storage/grpc_metrics_test.go index 075f06d47bf4..c897849a6ddd 100644 --- a/storage/grpc_metrics_test.go +++ b/storage/grpc_metrics_test.go @@ -117,7 +117,7 @@ func TestStorageMonitoredResource(t *testing.T) { func TestNewExporterLogSuppressor(t *testing.T) { ctx := context.Background() - s := &exporterLogSuppressor{exporter: &failingExporter{}} + s := &exporterLogSuppressor{Exporter: &failingExporter{}} if err := s.Export(ctx, nil); err == nil { t.Errorf("exporterLogSuppressor: did not emit an error when one was expected") } @@ -126,24 +126,10 @@ func TestNewExporterLogSuppressor(t *testing.T) { } } -type failingExporter struct{} +type failingExporter struct { + metric.Exporter +} func (f *failingExporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error { return fmt.Errorf("PermissionDenied") } - -func (f *failingExporter) Temporality(m metric.InstrumentKind) metricdata.Temporality { - return metricdata.CumulativeTemporality -} - -func (f *failingExporter) Aggregation(ik metric.InstrumentKind) metric.Aggregation { - return metric.AggregationDefault{} -} - -func (f *failingExporter) ForceFlush(ctx context.Context) error { - return nil -} - -func (f *failingExporter) Shutdown(ctx context.Context) error { - return nil -} From 3906ab7ac41ef57cfbe794d4d61a030c9ca9389a Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Tue, 3 Dec 2024 14:12:12 -0800 Subject: [PATCH 10/15] pass in resource correctly to exporter --- storage/grpc_metrics.go | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/storage/grpc_metrics.go b/storage/grpc_metrics.go index 3e91b08e77ed..bd63feea993a 100644 --- a/storage/grpc_metrics.go +++ b/storage/grpc_metrics.go @@ -50,23 +50,6 @@ type storageMonitoredResource struct { resource *resource.Resource } -func (smr *storageMonitoredResource) toResource() *resource.Resource { - if smr.resource != nil { - return smr.resource - } - // Ignore err since we aren't detecting attributes - smr.resource, _ = resource.New(context.TODO(), resource.WithAttributes([]attribute.KeyValue{ - {Key: "gcp.resource_type", Value: attribute.StringValue(monitoredResourceName)}, - {Key: "project_id", Value: attribute.StringValue(smr.project)}, - {Key: "api", Value: attribute.StringValue(smr.api)}, - {Key: "instance_id", Value: attribute.StringValue(smr.instance)}, - {Key: "location", Value: attribute.StringValue(smr.location)}, - {Key: "cloud_platform", Value: attribute.StringValue(smr.cloudPlatform)}, - {Key: "host_id", Value: attribute.StringValue(smr.host)}, - }...)) - return smr.resource -} - func (smr *storageMonitoredResource) exporter() (metric.Exporter, error) { exporter, err := mexporter.New( mexporter.WithProjectID(smr.project), @@ -111,6 +94,18 @@ func newStorageMonitoredResource(ctx context.Context, project, api string, opts } else { smr.host = "unknown" } + smr.resource, err = resource.New(ctx, resource.WithAttributes([]attribute.KeyValue{ + {Key: "gcp.resource_type", Value: attribute.StringValue(monitoredResourceName)}, + {Key: "project_id", Value: attribute.StringValue(smr.project)}, + {Key: "api", Value: attribute.StringValue(smr.api)}, + {Key: "instance_id", Value: attribute.StringValue(smr.instance)}, + {Key: "location", Value: attribute.StringValue(smr.location)}, + {Key: "cloud_platform", Value: attribute.StringValue(smr.cloudPlatform)}, + {Key: "host_id", Value: attribute.StringValue(smr.host)}, + }...)) + if err != nil { + return nil, err + } return smr, nil } From 34e04cbd001be7fbd3e497da6a5f44e30bb9a075 Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Tue, 3 Dec 2024 14:16:32 -0800 Subject: [PATCH 11/15] update test using resource value --- storage/grpc_metrics_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/grpc_metrics_test.go b/storage/grpc_metrics_test.go index c897849a6ddd..276e2aa5ce21 100644 --- a/storage/grpc_metrics_test.go +++ b/storage/grpc_metrics_test.go @@ -99,7 +99,7 @@ func TestStorageMonitoredResource(t *testing.T) { if err != nil { t.Errorf("newStorageMonitoredResource: %v", err) } - resultSet := smr.toResource().Set() + resultSet := smr.resource.Set() for _, want := range test.wantAttributes.ToSlice() { got, exists := resultSet.Value(want.Key) if !exists { From 6308f2f12bf7ded7dd595d03789bc27fd6933b31 Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Tue, 3 Dec 2024 16:25:19 -0800 Subject: [PATCH 12/15] add tests checking resource values --- storage/grpc_metrics.go | 18 ++++++-- storage/grpc_metrics_test.go | 83 ++++++++++++++++++++++++++++++++++-- storage/integration_test.go | 43 +++++++++++++++---- 3 files changed, 128 insertions(+), 16 deletions(-) diff --git a/storage/grpc_metrics.go b/storage/grpc_metrics.go index bd63feea993a..99257d917c60 100644 --- a/storage/grpc_metrics.go +++ b/storage/grpc_metrics.go @@ -79,7 +79,7 @@ func newStorageMonitoredResource(ctx context.Context, project, api string, opts } else if !present && smr.project == "" { return nil, errors.New("google cloud project is required to start client-side metrics") } - if v, ok := s.Value("location"); ok { + if v, ok := s.Value("cloud.region"); ok { smr.location = v.AsString() } else { smr.location = "global" @@ -91,6 +91,8 @@ func newStorageMonitoredResource(ctx context.Context, project, api string, opts } if v, ok := s.Value("host.id"); ok { smr.host = v.AsString() + } else if v, ok := s.Value("faas.id"); ok { + smr.host = v.AsString() } else { smr.host = "unknown" } @@ -123,13 +125,20 @@ type metricsConfig struct { interval time.Duration customExporter *metric.Exporter manualReader *metric.ManualReader // used by tests + resourceOpts []resource.Option // used by tests } func newGRPCMetricContext(ctx context.Context, cfg metricsConfig) (*metricsContext, error) { var exporter metric.Exporter meterOpts := []metric.Option{} if cfg.customExporter == nil { - smr, err := newStorageMonitoredResource(ctx, cfg.project, "grpc", resource.WithDetectors(gcp.NewDetector())) + var ropts []resource.Option + if cfg.resourceOpts != nil { + ropts = cfg.resourceOpts + } else { + ropts = []resource.Option{resource.WithDetectors(gcp.NewDetector())} + } + smr, err := newStorageMonitoredResource(ctx, cfg.project, "grpc", ropts...) if err != nil { return nil, err } @@ -146,8 +155,6 @@ func newGRPCMetricContext(ctx context.Context, cfg metricsConfig) (*metricsConte interval = cfg.interval } meterOpts = append(meterOpts, - metric.WithReader( - metric.NewPeriodicReader(&exporterLogSuppressor{Exporter: exporter}, metric.WithInterval(interval))), // Metric views update histogram boundaries to be relevant to GCS // otherwise default OTel histogram boundaries are used. metric.WithView( @@ -157,6 +164,9 @@ func newGRPCMetricContext(ctx context.Context, cfg metricsConfig) (*metricsConte ) if cfg.manualReader != nil { meterOpts = append(meterOpts, metric.WithReader(cfg.manualReader)) + } else { + meterOpts = append(meterOpts, metric.WithReader( + metric.NewPeriodicReader(&exporterLogSuppressor{Exporter: exporter}, metric.WithInterval(interval)))) } provider := metric.NewMeterProvider(meterOpts...) mo := opentelemetry.MetricsOptions{ diff --git a/storage/grpc_metrics_test.go b/storage/grpc_metrics_test.go index 276e2aa5ce21..05caefba4f67 100644 --- a/storage/grpc_metrics_test.go +++ b/storage/grpc_metrics_test.go @@ -65,14 +65,14 @@ func TestStorageMonitoredResource(t *testing.T) { }), }, { - desc: "use detected values when GCP attributes are detected", + desc: "use detected values when GCE attributes are detected", project: "project-id", api: "grpc", detectedAttributes: []attribute.KeyValue{ - {Key: "location", + {Key: "cloud.region", Value: attribute.StringValue("us-central1")}, {Key: "cloud.platform", - Value: attribute.StringValue("gcp")}, + Value: attribute.StringValue("gce")}, {Key: "host.id", Value: attribute.StringValue("gce-instance-id")}, }, @@ -81,7 +81,7 @@ func TestStorageMonitoredResource(t *testing.T) { Value: attribute.StringValue("us-central1"), }, attribute.KeyValue{ Key: "cloud_platform", - Value: attribute.StringValue("gcp"), + Value: attribute.StringValue("gce"), }, attribute.KeyValue{ Key: "host_id", Value: attribute.StringValue("gce-instance-id"), @@ -93,6 +93,35 @@ func TestStorageMonitoredResource(t *testing.T) { Value: attribute.StringValue("grpc"), }), }, + { + desc: "use detected values when FAAS attributes are detected", + project: "project-id", + api: "grpc", + detectedAttributes: []attribute.KeyValue{ + {Key: "cloud.region", + Value: attribute.StringValue("us-central1")}, + {Key: "cloud.platform", + Value: attribute.StringValue("cloud-run")}, + {Key: "faas.id", + Value: attribute.StringValue("run-instance-id")}, + }, + wantAttributes: attribute.NewSet(attribute.KeyValue{ + Key: "location", + Value: attribute.StringValue("us-central1"), + }, attribute.KeyValue{ + Key: "cloud_platform", + Value: attribute.StringValue("cloud-run"), + }, attribute.KeyValue{ + Key: "host_id", + Value: attribute.StringValue("run-instance-id"), + }, attribute.KeyValue{ + Key: "project_id", + Value: attribute.StringValue("project-id"), + }, attribute.KeyValue{ + Key: "api", + Value: attribute.StringValue("grpc"), + }), + }, } { t.Run(test.desc, func(t *testing.T) { smr, err := newStorageMonitoredResource(ctx, test.project, test.api, resource.WithAttributes(test.detectedAttributes...)) @@ -115,6 +144,52 @@ func TestStorageMonitoredResource(t *testing.T) { } } +func TestNewGRPCMetricContext(t *testing.T) { + ctx := context.Background() + mr := metric.NewManualReader() + attrs := []attribute.KeyValue{ + {Key: "cloud.region", + Value: attribute.StringValue("us-central1")}, + {Key: "cloud.platform", + Value: attribute.StringValue("gcp")}, + {Key: "host.id", + Value: attribute.StringValue("gce-instance-id")}, + } + cfg := metricsConfig{ + project: "project-id", + manualReader: mr, + resourceOpts: []resource.Option{resource.WithAttributes(attrs...)}, + } + mc, err := newGRPCMetricContext(ctx, cfg) + if err != nil { + t.Errorf("newGRPCMetricContext: %v", err) + } + defer mc.close() + rm := metricdata.ResourceMetrics{} + if err := mr.Collect(ctx, &rm); err != nil { + t.Errorf("ManualReader.Collect: %v", err) + } + monitoredResourceWant := map[string]string{ + "gcp.resource_type": "storage.googleapis.com/Client", + "api": "grpc", + "cloud_platform": "gcp", + "host_id": "gce-instance-id", + "location": "us-central1", + "project_id": "project-id", + "instance_id": "ignore", + } + for _, attr := range rm.Resource.Attributes() { + want := monitoredResourceWant[string(attr.Key)] + if want == "ignore" { + continue + } + got := attr.Value.AsString() + if want != got { + t.Errorf("got: %v want: %v", got, want) + } + } +} + func TestNewExporterLogSuppressor(t *testing.T) { ctx := context.Background() s := &exporterLogSuppressor{Exporter: &failingExporter{}} diff --git a/storage/integration_test.go b/storage/integration_test.go index ffaaae3b3818..8bb7614ba679 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -424,6 +424,10 @@ func TestIntegration_MetricsEnablement(t *testing.T) { if err != iterator.Done { t.Errorf("Objects.Next: expected iterator.Done got %v", err) } + rm := metricdata.ResourceMetrics{} + if err := mr.Collect(ctx, &rm); err != nil { + t.Errorf("ManualReader.Collect: %v", err) + } metricCheck := map[string]bool{ "grpc.client.attempt.started": false, "grpc.client.attempt.duration": false, @@ -431,10 +435,6 @@ func TestIntegration_MetricsEnablement(t *testing.T) { "grpc.client.attempt.rcvd_total_compressed_message_size": false, "grpc.client.call.duration": false, } - rm := metricdata.ResourceMetrics{} - if err := mr.Collect(context.Background(), &rm); err != nil { - t.Errorf("ManualReader.Collect: %v", err) - } for _, sm := range rm.ScopeMetrics { for _, m := range sm.Metrics { metricCheck[m.Name] = true @@ -460,8 +460,39 @@ func TestIntegration_MetricsEnablementInGCE(t *testing.T) { if v, exists := attrs.Value("cloud.platform"); !exists || v.AsString() != "gcp_compute_engine" { t.Skip("only testable in a GCE instance") } + instance, exists := attrs.Value("host.id") + if !exists { + t.Skip("GCE instance id not detected") + } + if v, exists := attrs.Value("cloud.region"); !exists || !strings.Contains(strings.ToLower(v.AsString()), "us-west1") { + t.Skip("inside a GCE instance but region is not us-west1") + } it := client.Buckets(ctx, testutil.ProjID()) _, _ = it.Next() + rm := metricdata.ResourceMetrics{} + if err := mr.Collect(ctx, &rm); err != nil { + t.Errorf("ManualReader.Collect: %v", err) + } + + monitoredResourceWant := map[string]string{ + "gcp.resource_type": "storage.googleapis.com/Client", + "api": "grpc", + "cloud_platform": "gcp_compute_engine", + "host_id": instance.AsString(), + "location": "us-west1", + "project_id": testutil.ProjID(), + "instance_id": "ignore", // generated UUID + } + for _, attr := range rm.Resource.Attributes() { + want := monitoredResourceWant[string(attr.Key)] + if want == "ignore" { + continue + } + got := attr.Value.AsString() + if want != got { + t.Errorf("got: %v want: %v", got, want) + } + } metricCheck := map[string]bool{ "grpc.client.attempt.started": false, "grpc.client.attempt.duration": false, @@ -479,10 +510,6 @@ func TestIntegration_MetricsEnablementInGCE(t *testing.T) { // "grpc.lb.rls.target_picks": false, // "grpc.lb.rls.failed_picks": false, } - rm := metricdata.ResourceMetrics{} - if err := mr.Collect(context.Background(), &rm); err != nil { - t.Errorf("ManualReader.Collect: %v", err) - } for _, sm := range rm.ScopeMetrics { for _, m := range sm.Metrics { metricCheck[m.Name] = true From 585b72de63b936cc7d9c66b3b3665e47ac6dd05b Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Wed, 4 Dec 2024 09:37:03 -0800 Subject: [PATCH 13/15] add option to disable exporter in unit tests --- storage/grpc_metrics.go | 14 ++++++++------ storage/grpc_metrics_test.go | 7 ++++--- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/storage/grpc_metrics.go b/storage/grpc_metrics.go index 99257d917c60..b8f7963a47d5 100644 --- a/storage/grpc_metrics.go +++ b/storage/grpc_metrics.go @@ -121,11 +121,12 @@ type metricsContext struct { } type metricsConfig struct { - project string - interval time.Duration - customExporter *metric.Exporter - manualReader *metric.ManualReader // used by tests - resourceOpts []resource.Option // used by tests + project string + interval time.Duration + customExporter *metric.Exporter + manualReader *metric.ManualReader // used by tests + disableExporter bool // used by tests disables exports + resourceOpts []resource.Option // used by tests } func newGRPCMetricContext(ctx context.Context, cfg metricsConfig) (*metricsContext, error) { @@ -164,7 +165,8 @@ func newGRPCMetricContext(ctx context.Context, cfg metricsConfig) (*metricsConte ) if cfg.manualReader != nil { meterOpts = append(meterOpts, metric.WithReader(cfg.manualReader)) - } else { + } + if !cfg.disableExporter { meterOpts = append(meterOpts, metric.WithReader( metric.NewPeriodicReader(&exporterLogSuppressor{Exporter: exporter}, metric.WithInterval(interval)))) } diff --git a/storage/grpc_metrics_test.go b/storage/grpc_metrics_test.go index 05caefba4f67..d4bf38443f88 100644 --- a/storage/grpc_metrics_test.go +++ b/storage/grpc_metrics_test.go @@ -156,9 +156,10 @@ func TestNewGRPCMetricContext(t *testing.T) { Value: attribute.StringValue("gce-instance-id")}, } cfg := metricsConfig{ - project: "project-id", - manualReader: mr, - resourceOpts: []resource.Option{resource.WithAttributes(attrs...)}, + project: "project-id", + manualReader: mr, + disableExporter: true, // disable since this is a unit test + resourceOpts: []resource.Option{resource.WithAttributes(attrs...)}, } mc, err := newGRPCMetricContext(ctx, cfg) if err != nil { From 3de98fc7f7de0ce27065cf177c3aa2bf5d628320 Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Fri, 20 Dec 2024 09:35:39 -0800 Subject: [PATCH 14/15] address feedback --- storage/grpc_metrics.go | 4 +++- storage/grpc_metrics_test.go | 16 ++++------------ storage/integration_test.go | 7 ------- 3 files changed, 7 insertions(+), 20 deletions(-) diff --git a/storage/grpc_metrics.go b/storage/grpc_metrics.go index b8f7963a47d5..f7bebd1defa7 100644 --- a/storage/grpc_metrics.go +++ b/storage/grpc_metrics.go @@ -58,7 +58,7 @@ func (smr *storageMonitoredResource) exporter() (metric.Exporter, error) { mexporter.WithMonitoredResourceDescription(monitoredResourceName, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"}), ) if err != nil { - return nil, err + return nil, fmt.Errorf("storage: creating metrics exporter: %w", err) } return exporter, nil } @@ -74,6 +74,8 @@ func newStorageMonitoredResource(ctx context.Context, project, api string, opts project: project, } s := detectedAttrs.Set() + // Attempt to use resource detector project id if project id wasn't + // identified using ADC as a last resort. Otherwise metrics cannot be started. if p, present := s.Value("cloud.account.id"); present && smr.project == "" { smr.project = p.AsString() } else if !present && smr.project == "" { diff --git a/storage/grpc_metrics_test.go b/storage/grpc_metrics_test.go index d4bf38443f88..75297cb8695a 100644 --- a/storage/grpc_metrics_test.go +++ b/storage/grpc_metrics_test.go @@ -38,15 +38,11 @@ func TestStorageMonitoredResource(t *testing.T) { ctx := context.Background() for _, test := range []struct { desc string - project string - api string detectedAttributes []attribute.KeyValue wantAttributes attribute.Set }{ { - desc: "default values set when GCP attributes are not detected", - project: "project-id", - api: "grpc", + desc: "default values set when GCP attributes are not detected", wantAttributes: attribute.NewSet(attribute.KeyValue{ Key: "location", Value: attribute.StringValue("global"), @@ -65,9 +61,7 @@ func TestStorageMonitoredResource(t *testing.T) { }), }, { - desc: "use detected values when GCE attributes are detected", - project: "project-id", - api: "grpc", + desc: "use detected values when GCE attributes are detected", detectedAttributes: []attribute.KeyValue{ {Key: "cloud.region", Value: attribute.StringValue("us-central1")}, @@ -94,9 +88,7 @@ func TestStorageMonitoredResource(t *testing.T) { }), }, { - desc: "use detected values when FAAS attributes are detected", - project: "project-id", - api: "grpc", + desc: "use detected values when FAAS attributes are detected", detectedAttributes: []attribute.KeyValue{ {Key: "cloud.region", Value: attribute.StringValue("us-central1")}, @@ -124,7 +116,7 @@ func TestStorageMonitoredResource(t *testing.T) { }, } { t.Run(test.desc, func(t *testing.T) { - smr, err := newStorageMonitoredResource(ctx, test.project, test.api, resource.WithAttributes(test.detectedAttributes...)) + smr, err := newStorageMonitoredResource(ctx, "project_id", "grpc", resource.WithAttributes(test.detectedAttributes...)) if err != nil { t.Errorf("newStorageMonitoredResource: %v", err) } diff --git a/storage/integration_test.go b/storage/integration_test.go index 791edf63256d..ef384296d0b5 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -505,13 +505,6 @@ func TestIntegration_MetricsEnablementInGCE(t *testing.T) { "grpc.lb.rls.cache_entries": false, "grpc.lb.rls.cache_size": false, "grpc.lb.rls.default_target_picks": false, - // TODO: determine a way to force these metrics to be collected - // "grpc.lb.wrr.rr_fallback": false, - // "grpc.lb.wrr.endpoint_weight_not_yet_usable": false, - // "grpc.lb.wrr.endpoint_weight_stale": false, - // "grpc.lb.wrr.endpoint_weights": false, - // "grpc.lb.rls.target_picks": false, - // "grpc.lb.rls.failed_picks": false, } for _, sm := range rm.ScopeMetrics { for _, m := range sm.Metrics { From 728feae9fd5d4e540b38001862ea04b2674570ca Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Fri, 20 Dec 2024 09:53:37 -0800 Subject: [PATCH 15/15] use project-id not project_id... --- storage/grpc_metrics_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/grpc_metrics_test.go b/storage/grpc_metrics_test.go index 75297cb8695a..44d5ed89bd03 100644 --- a/storage/grpc_metrics_test.go +++ b/storage/grpc_metrics_test.go @@ -116,7 +116,7 @@ func TestStorageMonitoredResource(t *testing.T) { }, } { t.Run(test.desc, func(t *testing.T) { - smr, err := newStorageMonitoredResource(ctx, "project_id", "grpc", resource.WithAttributes(test.detectedAttributes...)) + smr, err := newStorageMonitoredResource(ctx, "project-id", "grpc", resource.WithAttributes(test.detectedAttributes...)) if err != nil { t.Errorf("newStorageMonitoredResource: %v", err) }