From 47f1cee1bfd460a83adc9e7ab5d8dbe31861aa10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Getulio=20Valentin=20S=C3=A1nchez?= Date: Mon, 24 Aug 2020 14:47:49 -0400 Subject: [PATCH] stackdriver: Add stackdriver support for CRfA MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This adds a method to change the stackdriver query's fields to the ones needed by Cloud Run for Anthos. Signed-off-by: Getulio Valentin Sánchez --- cmd/operator/main.go | 88 ++++++-- cmd/operator/rollout.go | 34 +-- cmd/operator/service.go | 96 ++++++--- go.mod | 1 + internal/config/config.go | 48 ++++- internal/config/config_test.go | 58 +++++- internal/knative/cloudrun/gke.go | 81 ++++++++ internal/knative/cloudrun/provider.go | 142 +++++++++++++ internal/knative/mock/provider.go | 36 ++++ internal/knative/provider.go | 15 ++ internal/metrics/stackdriver/metrics.go | 194 +++++++++++++++++ internal/metrics/stackdriver/platform.go | 25 +++ internal/metrics/stackdriver/stackdriver.go | 218 ++++---------------- internal/rollout/rollout.go | 33 ++- internal/rollout/rollout_test.go | 11 +- internal/rollout/traffic_test.go | 6 +- internal/run/mock/runclient.go | 24 --- internal/run/wrapper.go | 100 --------- 18 files changed, 798 insertions(+), 412 deletions(-) create mode 100644 internal/knative/cloudrun/gke.go create mode 100644 internal/knative/cloudrun/provider.go create mode 100644 internal/knative/mock/provider.go create mode 100644 internal/knative/provider.go create mode 100644 internal/metrics/stackdriver/metrics.go create mode 100644 internal/metrics/stackdriver/platform.go delete mode 100644 internal/run/mock/runclient.go delete mode 100644 internal/run/wrapper.go diff --git a/cmd/operator/main.go b/cmd/operator/main.go index f99b4bf..4348442 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -59,10 +59,20 @@ var ( flCLI bool flCLILoopInterval time.Duration flHTTPAddr string - flProject string - flLabelSelector string - // Empty array means all regions. + // Generic target configuration + flPlatform string + flProject string + flLabelSelector string + + // Anthos target configuration + flGKEClusterLocation string + flGKECluster string + // TODO(gvso): Support multiple namespaces or all namespaces if none is + // specified. + flGKENamespace string + + // Fully-managed target config. Empty array means all regions. flRegions []string flRegionsString string @@ -91,8 +101,12 @@ func init() { flag.BoolVar(&flCLI, "cli", false, "run as CLI application to manage rollout in intervals") flag.DurationVar(&flCLILoopInterval, "cli-run-interval", 60*time.Second, "the time between each rollout process (in seconds)") flag.StringVar(&flHTTPAddr, "http-addr", defaultAddr, "address where to listen to http requests (e.g. :8080)") + flag.StringVar(&flPlatform, "platform", config.PlatformManaged, "target platform to query for Knative services (managed or gke)") flag.StringVar(&flProject, "project", "", "project in which the service is deployed") flag.StringVar(&flLabelSelector, "label", "rollout-strategy=gradual", "filter services based on a label (e.g. team=backend)") + flag.StringVar(&flGKEClusterLocation, "cluster-location", "", "zone in which the cluster is located") + flag.StringVar(&flGKECluster, "cluster", "", "ID of the cluster") + flag.StringVar(&flGKENamespace, "namespace", "default", "Kubernetes namespace where to look for Knative services") flag.StringVar(&flRegionsString, "regions", "", "the Cloud Run regions where the services should be looked at") flag.Var(&flSteps, "step", "a percentage in traffic the candidate should go through") flag.StringVar(&flStepsString, "steps", "5,20,50,80", "define steps in one flag separated by commas (e.g. 5,30,60)") @@ -150,7 +164,12 @@ func main() { logger.Debug(flagsToString()) // Configuration. - target := config.NewTarget(flProject, flRegions, flLabelSelector) + var target config.Target + if flPlatform == config.PlatformManaged { + target = config.NewManagedTarget(flProject, flRegions, flLabelSelector) + } else { + target = config.NewGKETarget(flProject, flGKEClusterLocation, flGKECluster, flGKENamespace, flLabelSelector) + } healthCriteria := healthCriteriaFromFlags(flMinRequestCount, flErrorRate, flLatencyP99, flLatencyP95, flLatencyP50) printHealthCriteria(logger, healthCriteria) strategy := config.NewStrategy(target, flSteps, flHealthOffset, flTimeBeweenRollouts, healthCriteria) @@ -197,14 +216,39 @@ func validateFlags() error { } } - for _, region := range flRegions { - if region == "" { - return errors.New("regions cannot be empty") + if flCLILoopInterval < 0 { + return errors.Errorf("cli run interval cannot be negative, got %s", flCLILoopInterval) + } + + if flPlatform != config.PlatformManaged && flPlatform != config.PlatformGKE { + return errors.Errorf("invalid platform %s", flPlatform) + } + + // GKE platform. + if flPlatform == "gke" { + if flGKECluster == "" { + return errors.Errorf("gke: cluster must be specified") + } + if flGKEClusterLocation == "" { + return errors.Errorf("gke: cluster location must be specified") } + if flGKENamespace == "" { + return errors.Errorf("gke: namespace cannot be empty") + } + return nil } - if flCLILoopInterval < 0 { - return errors.Errorf("cli run interval cannot be negative, got %s", flCLILoopInterval) + // Fully-managed platform. + for i, region := range flRegions { + if region == "" { + return errors.Errorf("region value at index %d cannot be empty", i) + } + } + if flGKECluster != "" { + return errors.New("cannot specify cluster for managed platform") + } + if flGKEClusterLocation != "" { + return errors.New("cannot specify cluster location for managed platform") } return nil } @@ -216,15 +260,25 @@ func flagsToString() string { } else { str += fmt.Sprintf("-http-addr=%s\n", flHTTPAddr) } - - regionsStr := "all" - if len(flRegions) != 0 { - regionsStr = fmt.Sprintf("%v", flRegions) + str += fmt.Sprintf("-project=%s\n-platform=%s\n", flProject, flPlatform) + + if flPlatform == config.PlatformGKE { + str += fmt.Sprintf("-cluster-location=%s\n"+ + "-cluster=%s\n"+ + "-namespace=%s\n", + flGKEClusterLocation, + flGKECluster, + flGKENamespace, + ) + } else { + regionsStr := "all" + if len(flRegions) != 0 { + regionsStr = fmt.Sprintf("%v", flRegions) + } + str += fmt.Sprintf("-regions=%s\n", regionsStr) } - str += fmt.Sprintf("-project=%s\n"+ - "-label=%s\n"+ - "-regions=%s\n"+ + str += fmt.Sprintf("-label=%s\n"+ "-steps=%s\n"+ "-healthcheck-offset=%s\n"+ "-min-wait=%s\n"+ @@ -233,9 +287,7 @@ func flagsToString() string { "-latency-p99=%.2f\n"+ "-latency-p95=%.2f\n"+ "-latency-p50=%.2f\n", - flProject, flLabelSelector, - regionsStr, flSteps, flHealthOffset, flTimeBeweenRollouts, diff --git a/cmd/operator/rollout.go b/cmd/operator/rollout.go index 9dfed50..7109f08 100644 --- a/cmd/operator/rollout.go +++ b/cmd/operator/rollout.go @@ -10,14 +10,15 @@ import ( "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/sheets" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/stackdriver" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/rollout" - runapi "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run" + "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) // runRollouts concurrently handles the rollout of the targeted services. func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Strategy) []error { - svcs, err := getTargetedServices(ctx, logger, strategy.Target) + ctx = util.ContextWithLogger(ctx, logrus.NewEntry(logger)) + svcs, err := getTargetedServices(ctx, strategy.Target) if err != nil { return []error{errors.Wrap(err, "failed to get targeted services")} } @@ -50,21 +51,13 @@ func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Str // handleRollout manages the rollout process for a single service. func handleRollout(ctx context.Context, logger *logrus.Logger, service *rollout.ServiceRecord, strategy config.Strategy) error { - lg := logger.WithFields(logrus.Fields{ - "project": service.Project, - "service": service.Metadata.Name, - "region": service.Region, - }) + lg := logger.WithFields(service.KProvider.LoggingFields()) - client, err := runapi.NewAPIClient(ctx, service.Region) - if err != nil { - return errors.Wrap(err, "failed to initialize Cloud Run API client") - } - metricsProvider, err := chooseMetricsProvider(ctx, lg, service.Project, service.Region, service.Metadata.Name) + metricsProvider, err := chooseMetricsProvider(ctx, lg, service.Location, service.Metadata.Name) if err != nil { return errors.Wrap(err, "failed to initialize metrics provider") } - roll := rollout.New(ctx, metricsProvider, service, strategy).WithClient(client).WithLogger(lg.Logger) + roll := rollout.New(ctx, metricsProvider, service, strategy).WithLogger(lg) changed, err := roll.Rollout() if err != nil { @@ -94,11 +87,20 @@ func rolloutErrsToString(errs []error) (errsStr string) { // chooseMetricsProvider checks the CLI flags and determine which metrics // provider should be used for the rollout. -func chooseMetricsProvider(ctx context.Context, logger *logrus.Entry, project, region, svcName string) (metrics.Provider, error) { +func chooseMetricsProvider(ctx context.Context, logger *logrus.Entry, location, svcName string) (metrics.Provider, error) { if flGoogleSheetsID != "" { logger.Debug("using Google Sheets as metrics provider") - return sheets.NewProvider(ctx, flGoogleSheetsID, "", region, svcName) + return sheets.NewProvider(ctx, flGoogleSheetsID, "", location, svcName) } + logger.Debug("using Cloud Monitoring (Stackdriver) as metrics provider") - return stackdriver.NewProvider(ctx, project, region, svcName) + provider, err := stackdriver.NewProvider(ctx, flProject, location, svcName) + if err != nil { + return nil, errors.Wrap(err, "failed to initialize stackdriver provider") + } + if flPlatform == config.PlatformGKE { + logger.Debug("pointing stackdriver to Cloud Run for Anthos") + provider = provider.WithGKEPlatform(flGKENamespace, flGKECluster) + } + return provider, nil } diff --git a/cmd/operator/service.go b/cmd/operator/service.go index db9fbf1..ed4eaa3 100644 --- a/cmd/operator/service.go +++ b/cmd/operator/service.go @@ -5,8 +5,9 @@ import ( "sync" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config" + "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/knative" + "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/knative/cloudrun" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/rollout" - runapi "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -15,7 +16,24 @@ import ( // getTargetedServices returned a list of service records that match the target // configuration. -func getTargetedServices(ctx context.Context, logger *logrus.Logger, target config.Target) ([]*rollout.ServiceRecord, error) { +func getTargetedServices(ctx context.Context, target config.Target) ([]*rollout.ServiceRecord, error) { + logger := util.LoggerFrom(ctx) + if target.Platform == config.PlatformManaged { + logger.Debug("getting services on Cloud Run fully managed") + return getManagedServices(ctx, target) + } + logger.Debug("getting services on Cloud Run for Anthos") + return getGKEServices(ctx, target) +} + +// getManagedServices returned a list of services from Cloud Run fully managed +// that match the target. +// +// It fetches the matches from each of the specified regions in the target +// configuration. If no regions are explicitly specified, it gets the list of +// Cloud Run regions and queries all of them. +func getManagedServices(ctx context.Context, target config.Target) ([]*rollout.ServiceRecord, error) { + logger := util.LoggerFrom(ctx) logger.Debug("querying Cloud Run API to get all targeted services") ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -27,7 +45,7 @@ func getTargetedServices(ctx context.Context, logger *logrus.Logger, target conf wg sync.WaitGroup ) - regions, err := determineRegions(ctx, logger, target) + regions, err := determineRegions(ctx, target) if err != nil { return nil, errors.Wrap(err, "cannot determine regions") } @@ -35,9 +53,16 @@ func getTargetedServices(ctx context.Context, logger *logrus.Logger, target conf for _, region := range regions { wg.Add(1) - go func(ctx context.Context, logger *logrus.Logger, region, labelSelector string) { + go func(ctx context.Context, logger *logrus.Entry, project, region, labelSelector string) { defer wg.Done() - svcs, err := getServicesByRegionAndLabel(ctx, logger, target.Project, region, target.LabelSelector) + + provider, err := cloudrun.NewFullyManagedProvider(ctx, project, region) + if err != nil { + retError = errors.Wrap(err, "failed to initialize Cloud Run fully managed client") + cancel() + return + } + svcs, err := getServicesByLabel(ctx, provider, project, labelSelector) if err != nil { retError = err cancel() @@ -46,34 +71,49 @@ func getTargetedServices(ctx context.Context, logger *logrus.Logger, target conf for _, svc := range svcs { mu.Lock() - retServices = append(retServices, newServiceRecord(svc, target.Project, region)) + retServices = append(retServices, newServiceRecord(svc, provider, project, region)) mu.Unlock() } - }(ctx, logger, region, target.LabelSelector) + }(ctx, logger, target.Project, region, target.LabelSelector) } wg.Wait() return retServices, retError } -// getServicesByRegionAndLabel returns all the service records that match the -// labelSelector in a specific region. -func getServicesByRegionAndLabel(ctx context.Context, logger *logrus.Logger, project, region, labelSelector string) ([]*run.Service, error) { +// getGKEServices get the services running on Cloud Run for Anthos. +func getGKEServices(ctx context.Context, target config.Target) ([]*rollout.ServiceRecord, error) { + logger := util.LoggerFrom(ctx) + provider, err := cloudrun.NewGKEProvider(ctx, target.Project, target.GKEClusterLocation, target.GKEClusterName) + if err != nil { + return nil, errors.Wrap(err, "failed to initialize Cloud Run for Anthos client") + } + + logger.Debugf("querying for services on GKE cluster, namespace %s", target.GKENamespace) + svcs, err := getServicesByLabel(ctx, provider, target.GKENamespace, target.LabelSelector) + + var services []*rollout.ServiceRecord + for _, svc := range svcs { + services = append(services, newServiceRecord(svc, provider, target.GKENamespace, target.GKEClusterLocation)) + } + return services, nil +} + +// getServicesByLabel returns all the service records that match the label +// selector. +// +// For Cloud Run fully managed, the namespace is the project ID. +func getServicesByLabel(ctx context.Context, provider knative.Provider, namespace, labelSelector string) ([]*run.Service, error) { + logger := util.LoggerFrom(ctx) lg := logger.WithFields(logrus.Fields{ - "region": region, "labelSelector": labelSelector, }) - lg.Debug("querying Cloud Run services") - runclient, err := runapi.NewAPIClient(ctx, region) + lg.Debug("querying for services in provider") + svcs, err := provider.ServicesWithLabelSelector(namespace, labelSelector) if err != nil { - return nil, errors.Wrap(err, "failed to initialize Cloud Run client") - } - - svcs, err := runclient.ServicesWithLabelSelector(project, labelSelector) - if err != nil { - return nil, errors.Wrapf(err, "failed to get services with label %q in region %q", labelSelector, region) + return nil, errors.Wrapf(err, "failed to get services with label %q", labelSelector) } lg.WithField("n", len(svcs)).Debug("finished retrieving services from the API") @@ -81,10 +121,12 @@ func getServicesByRegionAndLabel(ctx context.Context, logger *logrus.Logger, pro } // determineRegions gets the regions the label selector should be searched at. +// Used for Cloud Run fuly managed. // // If the target configuration does not specify any regions, the entire list of // regions is retrieved from API. -func determineRegions(ctx context.Context, logger *logrus.Logger, target config.Target) ([]string, error) { +func determineRegions(ctx context.Context, target config.Target) ([]string, error) { + logger := util.LoggerFrom(ctx) regions := target.Regions if len(regions) != 0 { logger.Debug("using predefined list of regions, skip querying from API") @@ -92,10 +134,7 @@ func determineRegions(ctx context.Context, logger *logrus.Logger, target config. } logger.Debug("retrieving all regions from the API") - - lg := logrus.NewEntry(logger) - ctx = util.ContextWithLogger(ctx, lg) - regions, err := runapi.Regions(ctx, target.Project) + regions, err := cloudrun.Regions(ctx, target.Project) if err != nil { return nil, errors.Wrap(err, "cannot get list of regions from Cloud Run API") } @@ -105,10 +144,11 @@ func determineRegions(ctx context.Context, logger *logrus.Logger, target config. } // newServiceRecord creates a new service record. -func newServiceRecord(svc *run.Service, project, region string) *rollout.ServiceRecord { +func newServiceRecord(svc *run.Service, provider knative.Provider, namespace, location string) *rollout.ServiceRecord { return &rollout.ServiceRecord{ - Service: svc, - Project: project, - Region: region, + Service: svc, + KProvider: provider, + Namespace: namespace, + Location: location, } } diff --git a/go.mod b/go.mod index 2c4bbdf..0f1a3b9 100644 --- a/go.mod +++ b/go.mod @@ -11,5 +11,6 @@ require ( github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.6.0 github.com/stretchr/testify v1.6.1 + golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d google.golang.org/api v0.28.0 ) diff --git a/internal/config/config.go b/internal/config/config.go index a83f84f..890ac29 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -16,6 +16,12 @@ const ( ErrorRateMetricsCheck MetricsCheck = "error-rate-percent" ) +// Supported Knative platforms. +const ( + PlatformManaged string = "managed" + PlatformGKE string = "gke" +) + // Target is the configuration to filter services. // // A target might have the following form @@ -27,8 +33,18 @@ const ( // } type Target struct { Project string - Regions []string LabelSelector string + + // Either "managed" or "gke". + Platform string + + // Regions are used when targeting Cloud Run fully-managed services. + Regions []string + + // Anthos target configuration. + GKEClusterLocation string + GKEClusterName string + GKENamespace string } // HealthCriterion is a metrics threshold that should be met to consider a @@ -53,15 +69,28 @@ type Config struct { Strategies []Strategy } -// NewTarget initializes a target to filter services by label. -func NewTarget(project string, regions []string, labelSelector string) Target { +// NewManagedTarget initializes a target for Cloud Run fully-managed. +func NewManagedTarget(project string, regions []string, labelSelector string) Target { return Target{ + Platform: PlatformManaged, Project: project, Regions: regions, LabelSelector: labelSelector, } } +// NewGKETarget initializes a target for Cloud Run on Anthos. +func NewGKETarget(project, clusterLocation, clusterName, namespace, label string) Target { + return Target{ + Platform: PlatformGKE, + Project: project, + GKEClusterLocation: clusterLocation, + GKEClusterName: clusterName, + GKENamespace: namespace, + LabelSelector: label, + } +} + // NewStrategy initializes a strategy. func NewStrategy(target Target, steps []int64, healthOffset, timeBetweenRollouts time.Duration, healthCriteria []HealthCriterion) Strategy { return Strategy{ @@ -143,5 +172,18 @@ func validateTarget(target Target) error { if target.LabelSelector == "" { return errors.Errorf("label must be specified") } + + if target.Platform == PlatformGKE { + if target.GKEClusterLocation == "" { + return errors.New("cluster location required for GKE platform") + } + if target.GKEClusterName == "" { + return errors.New("cluster name required for GKE platform") + } + if target.GKENamespace == "" { + return errors.New("namespace required for GKE platform") + } + } + return nil } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 2d43ca7..d04fd45 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -20,7 +20,7 @@ func TestStrategy_Validate(t *testing.T) { }{ { name: "correct config with label selector", - target: config.NewTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), + target: config.NewManagedTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), steps: []int64{5, 30, 60}, healthOffset: 10 * time.Minute, timeBetweenRollouts: 10 * time.Minute, @@ -32,7 +32,7 @@ func TestStrategy_Validate(t *testing.T) { }, { name: "missing project", - target: config.NewTarget("", []string{"us-east1", "us-west1"}, "team=backend"), + target: config.NewManagedTarget("", []string{"us-east1", "us-west1"}, "team=backend"), steps: []int64{5, 30, 60}, healthOffset: 10 * time.Minute, timeBetweenRollouts: 10 * time.Minute, @@ -41,7 +41,7 @@ func TestStrategy_Validate(t *testing.T) { }, { name: "missing steps", - target: config.NewTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), + target: config.NewManagedTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), steps: []int64{}, healthOffset: 10 * time.Minute, timeBetweenRollouts: 10 * time.Minute, @@ -49,7 +49,7 @@ func TestStrategy_Validate(t *testing.T) { }, { name: "steps not in order", - target: config.NewTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), + target: config.NewManagedTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), steps: []int64{30, 30, 60}, healthOffset: 10 * time.Minute, timeBetweenRollouts: 10 * time.Minute, @@ -57,7 +57,7 @@ func TestStrategy_Validate(t *testing.T) { }, { name: "step greater than 100", - target: config.NewTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), + target: config.NewManagedTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), steps: []int64{5, 30, 101}, healthOffset: 20, timeBetweenRollouts: 10 * time.Minute, @@ -65,7 +65,7 @@ func TestStrategy_Validate(t *testing.T) { }, { name: "non-positive health offset", - target: config.NewTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), + target: config.NewManagedTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), steps: []int64{5, 30, 60}, healthOffset: 0, timeBetweenRollouts: 10 * time.Minute, @@ -73,7 +73,7 @@ func TestStrategy_Validate(t *testing.T) { }, { name: "empty label selector", - target: config.NewTarget("myproject", []string{"us-east1", "us-west1"}, ""), + target: config.NewManagedTarget("myproject", []string{"us-east1", "us-west1"}, ""), steps: []int64{5, 30, 60}, healthOffset: 20, timeBetweenRollouts: 10 * time.Minute, @@ -81,7 +81,7 @@ func TestStrategy_Validate(t *testing.T) { }, { name: "invalid request count value", - target: config.NewTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), + target: config.NewManagedTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), steps: []int64{5, 30, 60}, healthOffset: 20, timeBetweenRollouts: 10 * time.Minute, @@ -92,7 +92,7 @@ func TestStrategy_Validate(t *testing.T) { }, { name: "invalid error rate in criteria", - target: config.NewTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), + target: config.NewManagedTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), steps: []int64{5, 30, 60}, healthOffset: 20, timeBetweenRollouts: 10 * time.Minute, @@ -103,7 +103,7 @@ func TestStrategy_Validate(t *testing.T) { }, { name: "invalid latency percentile", - target: config.NewTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), + target: config.NewManagedTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), steps: []int64{5, 30, 60}, healthOffset: 20, timeBetweenRollouts: 10 * time.Minute, @@ -114,7 +114,7 @@ func TestStrategy_Validate(t *testing.T) { }, { name: "invalid latency value", - target: config.NewTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), + target: config.NewManagedTarget("myproject", []string{"us-east1", "us-west1"}, "team=backend"), steps: []int64{5, 30, 60}, healthOffset: 20, timeBetweenRollouts: 10 * time.Minute, @@ -123,6 +123,42 @@ func TestStrategy_Validate(t *testing.T) { }, shouldErr: true, }, + { + name: "GKE platform requires cluster location", + target: config.NewGKETarget("myproject", "", "testcluster", "default", "team=backend"), + steps: []int64{5, 30, 60}, + healthOffset: 10 * time.Minute, + timeBetweenRollouts: 10 * time.Minute, + healthCriteria: []config.HealthCriterion{ + {Metric: config.LatencyMetricsCheck, Percentile: 99, Threshold: 750}, + {Metric: config.RequestCountMetricsCheck, Threshold: 1000}, + }, + shouldErr: true, + }, + { + name: "GKE platform requires cluster name", + target: config.NewGKETarget("myproject", "us-central1-a", "", "default", "team=backend"), + steps: []int64{5, 30, 60}, + healthOffset: 10 * time.Minute, + timeBetweenRollouts: 10 * time.Minute, + healthCriteria: []config.HealthCriterion{ + {Metric: config.LatencyMetricsCheck, Percentile: 99, Threshold: 750}, + {Metric: config.RequestCountMetricsCheck, Threshold: 1000}, + }, + shouldErr: true, + }, + { + name: "GKE platform requires namespace", + target: config.NewGKETarget("myproject", "us-central1-a", "testcluster", "", "team=backend"), + steps: []int64{5, 30, 60}, + healthOffset: 10 * time.Minute, + timeBetweenRollouts: 10 * time.Minute, + healthCriteria: []config.HealthCriterion{ + {Metric: config.LatencyMetricsCheck, Percentile: 99, Threshold: 750}, + {Metric: config.RequestCountMetricsCheck, Threshold: 1000}, + }, + shouldErr: true, + }, } for _, test := range tests { diff --git a/internal/knative/cloudrun/gke.go b/internal/knative/cloudrun/gke.go new file mode 100644 index 0000000..51a9de7 --- /dev/null +++ b/internal/knative/cloudrun/gke.go @@ -0,0 +1,81 @@ +package cloudrun + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/base64" + "fmt" + "net/http" + + "github.com/pkg/errors" + "golang.org/x/oauth2" + "golang.org/x/oauth2/google" + "google.golang.org/api/compute/v1" + "google.golang.org/api/container/v1" +) + +// ClusterClient is a GKE client. +type ClusterClient struct { + HTTPClient *http.Client + Endpoint string +} + +// newGKEClient initializes a client for GKE cluster. +func newGKEClient(ctx context.Context, project, zone, clusterName string) (*http.Client, string, error) { + tokenSource, err := google.DefaultTokenSource(ctx, compute.CloudPlatformScope) + if err != nil { + return nil, "", errors.Wrap(err, "failed to get token source") + } + + httpClient := oauth2.NewClient(ctx, tokenSource) + containerService, err := container.New(httpClient) + if err != nil { + return nil, "", fmt.Errorf("could not create client for Google Kubernetes Engine: %v", err) + } + + cluster, err := containerService.Projects.Zones.Clusters.Get(project, zone, clusterName).Context(ctx).Do() + if err != nil { + return nil, "", errors.Wrapf(err, "failed to get cluster %q in project %q, zone %q", clusterName, project, zone) + } + + hClient, err := newGKEHTTPClient(cluster, tokenSource) + if err != nil { + return nil, "", errors.Wrap(err, "failed to initialize HTTP client") + } + + return hClient, cluster.Endpoint, nil +} + +func newGKEHTTPClient(cluster *container.Cluster, tokenSource oauth2.TokenSource) (*http.Client, error) { + tlsCfg, err := gkeTLSConfig(cluster) + if err != nil { + return nil, errors.Wrap(err, "failed to get TLS configuration") + } + + hTransport := http.DefaultTransport.(*http.Transport).Clone() + hTransport.TLSClientConfig = tlsCfg + oauth2Transport := &oauth2.Transport{ + Base: hTransport, + Source: tokenSource, + } + return &http.Client{Transport: oauth2Transport}, nil +} + +func gkeTLSConfig(cluster *container.Cluster) (*tls.Config, error) { + caCert, err := base64.StdEncoding.DecodeString(cluster.MasterAuth.ClusterCaCertificate) + if err != nil { + return nil, errors.Wrap(err, "failed to decode cluster certificate") + } + + // CA Cert from kube master + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM([]byte(caCert)) + + // Setup TLS config + tlsConfig := &tls.Config{ + RootCAs: caCertPool, + } + + return tlsConfig, nil +} diff --git a/internal/knative/cloudrun/provider.go b/internal/knative/cloudrun/provider.go new file mode 100644 index 0000000..592e034 --- /dev/null +++ b/internal/knative/cloudrun/provider.go @@ -0,0 +1,142 @@ +package cloudrun + +import ( + "context" + "fmt" + + "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "google.golang.org/api/option" + "google.golang.org/api/run/v1" +) + +// Provider is a KServiceProvider for Cloud Run (gke or fully managed). +type Provider struct { + apiService *run.APIService + project string + platform string + + // Managed + region string + + // GKE + clusterLocation string + clusterName string +} + +// regions are the available regions. +// +// TODO: caching regions might be unnecessary if we are querying them once during +// the lifespan of the process. +var regions = []string{} + +// NewFullyManagedProvider returns a provider for Cloud Run fully managed. +func NewFullyManagedProvider(ctx context.Context, project, region string) (*Provider, error) { + regionalEndpoint := fmt.Sprintf("https://%s-run.googleapis.com/", region) + apiService, err := run.NewService(ctx, option.WithEndpoint(regionalEndpoint)) + if err != nil { + return nil, errors.Wrap(err, "could not initialize client for the Cloud Run API") + } + + return &Provider{ + apiService: apiService, + project: project, + platform: "managed", + region: region, + }, nil +} + +// NewGKEProvider returns a provider for Cloud Run for Anthos. +func NewGKEProvider(ctx context.Context, project, zone, clusterName string) (*Provider, error) { + hClient, endpoint, err := newGKEClient(ctx, project, zone, clusterName) + if err != nil { + return nil, errors.Wrapf(err, "failed to initialize client to GKE cluster %s, zone %s", clusterName, zone) + } + + apiService, err := run.NewService(ctx, + option.WithHTTPClient(hClient), + option.WithEndpoint(endpoint), + ) + if err != nil { + return nil, errors.Wrap(err, "could not initialize client for the Cloud Run API") + } + + return &Provider{ + apiService: apiService, + project: project, + platform: "gke", + clusterLocation: zone, + clusterName: clusterName, + }, nil +} + +// ReplaceService replaces an existing service. +func (p *Provider) ReplaceService(namespace, serviceID string, svc *run.Service) (*run.Service, error) { + serviceName := serviceName(namespace, serviceID) + return p.apiService.Namespaces.Services.ReplaceService(serviceName, svc).Do() +} + +// ServicesWithLabelSelector gets services filtered by a label selector. +func (p *Provider) ServicesWithLabelSelector(namespace string, labelSelector string) ([]*run.Service, error) { + parent := fmt.Sprintf("namespaces/%s", namespace) + servicesList, err := p.apiService.Namespaces.Services.List(parent).LabelSelector(labelSelector).Do() + if err != nil { + return nil, errors.Wrap(err, "failed to filter services by label selector") + } + + return servicesList.Items, nil +} + +// LoggingFields returns the logging fields related to this provider. +func (p *Provider) LoggingFields() logrus.Fields { + switch p.platform { + case "gke": + return logrus.Fields{ + "project": p.project, + "clusterLocation": p.clusterLocation, + "clusterName": p.clusterName, + } + case "managed": + return logrus.Fields{ + "project": p.project, + "region": p.region, + } + default: + return logrus.Fields{} + } +} + +// Regions gets the supported regions for the project (for Cloud Run fully +// managed). +func Regions(ctx context.Context, project string) ([]string, error) { + logger := util.LoggerFrom(ctx) + if len(regions) != 0 { + logger.Debug("using cached regions, skip querying from API") + return regions, nil + } + + client, err := run.NewService(ctx) + if err != nil { + return nil, errors.Wrap(err, "could not initialize client for the Cloud Run API") + } + + name := fmt.Sprintf("projects/%s", project) + resp, err := client.Projects.Locations.List(name).Do() + if err != nil { + return nil, errors.Wrap(err, "failed to get locations") + } + + for _, location := range resp.Locations { + regions = append(regions, location.LocationId) + } + return regions, nil +} + +// serviceName returns the name of the specified service. It returns the +// form namespaces/{namespace_id}/services/{service_id}. +// +// For Cloud Run (fully managed), the namespace is the project ID or number. +func serviceName(namespace, serviceID string) string { + return fmt.Sprintf("namespaces/%s/services/%s", namespace, serviceID) +} diff --git a/internal/knative/mock/provider.go b/internal/knative/mock/provider.go new file mode 100644 index 0000000..79316dc --- /dev/null +++ b/internal/knative/mock/provider.go @@ -0,0 +1,36 @@ +package mock + +import ( + "github.com/sirupsen/logrus" + "google.golang.org/api/run/v1" +) + +// Provider represents a mock implementation of knative.Provider. +type Provider struct { + ServicesWithLabelSelectorFn func(namespace, labelSelector string) ([]*run.Service, error) + ServicesWithLabelSelectorInvoked bool + + ReplaceServiceFn func(namespace, serviceID string, svc *run.Service) (*run.Service, error) + ReplaceServiceInvoked bool + + LoggingFieldsFn func() logrus.Fields + LoggingFieldsInvoked bool +} + +// ServicesWithLabelSelector invokes the mock implementation and marks the function as invoked. +func (p *Provider) ServicesWithLabelSelector(namespace, service string) ([]*run.Service, error) { + p.ServicesWithLabelSelectorInvoked = true + return p.ServicesWithLabelSelectorFn(namespace, service) +} + +// ReplaceService invokes the mock implementation and marks the function as invoked. +func (p *Provider) ReplaceService(namespace, serviceID string, svc *run.Service) (*run.Service, error) { + p.ReplaceServiceInvoked = true + return p.ReplaceServiceFn(namespace, serviceID, svc) +} + +// LoggingFields invokes the mock implementation and marks the function as invoked. +func (p *Provider) LoggingFields() logrus.Fields { + p.LoggingFieldsInvoked = true + return p.LoggingFieldsFn() +} diff --git a/internal/knative/provider.go b/internal/knative/provider.go new file mode 100644 index 0000000..8fc0c01 --- /dev/null +++ b/internal/knative/provider.go @@ -0,0 +1,15 @@ +package knative + +import ( + "github.com/sirupsen/logrus" + "google.golang.org/api/run/v1" +) + +// Provider represents a Knative client. +type Provider interface { + ServicesWithLabelSelector(namespace string, labelSelector string) ([]*run.Service, error) + ReplaceService(namespace, serviceID string, svc *run.Service) (*run.Service, error) + + // Returns the logging fields related to this provider. + LoggingFields() logrus.Fields +} diff --git a/internal/metrics/stackdriver/metrics.go b/internal/metrics/stackdriver/metrics.go new file mode 100644 index 0000000..7bdf975 --- /dev/null +++ b/internal/metrics/stackdriver/metrics.go @@ -0,0 +1,194 @@ +package stackdriver + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics" + "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + monitoring "google.golang.org/api/monitoring/v3" +) + +// requestCount count returns the number of requests for the given offset. +func requestCount(ctx context.Context, p Provider, q query, offset time.Duration) (int64, error) { + endTime := time.Now() + endTimeString := endTime.Format(time.RFC3339Nano) + startTime := endTime.Add(-1 * offset) + startTimeString := startTime.Format(time.RFC3339Nano) + offsetString := fmt.Sprintf("%fs", offset.Seconds()) + log.Println(q) + req := p.metricsClient.Projects.TimeSeries.List("projects/" + p.project). + Filter(string(q)). + IntervalStartTime(startTimeString). + IntervalEndTime(endTimeString). + AggregationAlignmentPeriod(offsetString). + AggregationPerSeriesAligner("ALIGN_DELTA"). + AggregationGroupByFields("resource.labels.service_name"). + AggregationCrossSeriesReducer("REDUCE_SUM") + + logger := util.LoggerFrom(ctx).WithFields(logrus.Fields{ + "intervalStartTime": startTimeString, + "intervalEndTime": endTimeString, + "metrics": "request-count", + }) + logger.Debug("querying Cloud Monitoring API") + timeSeries, err := makeRequestForTimeSeries(logger, req) + if err != nil { + return 0, errors.Wrap(err, "error when querying for time series") + } + + // This happens when no request was made during the given offset. + if len(timeSeries) == 0 { + return 0, nil + } + // The request count is aggregated for the entire service, so only one time + // series and a point is returned. There's no need for a loop. + series := timeSeries[0] + if len(series.Points) == 0 { + return 0, errors.New("no data point was retrieved") + } + return *(series.Points[0].Value.Int64Value), nil +} + +// latency returns the latency for the resource for the given offset. +// It returns 0 if no request was made during the interval. +func latency(ctx context.Context, p Provider, q query, offset time.Duration, alignReduceType metrics.AlignReduce) (float64, error) { + endTime := time.Now() + endTimeString := endTime.Format(time.RFC3339Nano) + startTime := endTime.Add(-1 * offset) + startTimeString := startTime.Format(time.RFC3339Nano) + aligner, reducer := alignerAndReducer(alignReduceType) + offsetString := fmt.Sprintf("%fs", offset.Seconds()) + log.Println(q) + req := p.metricsClient.Projects.TimeSeries.List("projects/" + p.project). + Filter(string(q)). + IntervalStartTime(startTimeString). + IntervalEndTime(endTimeString). + AggregationAlignmentPeriod(offsetString). + AggregationPerSeriesAligner(aligner). + AggregationGroupByFields("resource.labels.service_name"). + AggregationCrossSeriesReducer(reducer) + + logger := util.LoggerFrom(ctx).WithFields(logrus.Fields{ + "intervalStartTime": startTimeString, + "intervalEndTime": endTimeString, + "metrics": "latency", + "aligner": aligner, + "reducer": reducer, + }) + logger.Debug("querying Cloud Monitoring API") + timeSeries, err := makeRequestForTimeSeries(logger, req) + if err != nil { + return 0, errors.Wrap(err, "error when querying for time series") + } + + // This happens when no request was made during the given offset. + if len(timeSeries) == 0 { + return 0, nil + } + // The request count is aggregated for the entire service, so only one time + // series and a point is returned. There's no need for a loop. + series := timeSeries[0] + if len(series.Points) == 0 { + return 0, errors.New("no data point was retrieved") + } + return *(series.Points[0].Value.DoubleValue), nil +} + +// errorRate returns the rate of 5xx errors for the resource in the given +// offset. It returns 0 if no request was made during the interval. +func errorRate(ctx context.Context, p Provider, q query, offset time.Duration) (float64, error) { + endTime := time.Now() + endTimeString := endTime.Format(time.RFC3339Nano) + startTime := endTime.Add(-1 * offset) + startTimeString := startTime.Format(time.RFC3339Nano) + offsetString := fmt.Sprintf("%fs", offset.Seconds()) + log.Println(q) + req := p.metricsClient.Projects.TimeSeries.List("projects/" + p.project). + Filter(string(q)). + IntervalStartTime(startTimeString). + IntervalEndTime(endTimeString). + AggregationAlignmentPeriod(offsetString). + AggregationPerSeriesAligner("ALIGN_DELTA"). + AggregationGroupByFields("metric.labels.response_code_class"). + AggregationCrossSeriesReducer("REDUCE_SUM") + + logger := util.LoggerFrom(ctx).WithFields(logrus.Fields{ + "intervalStartTime": startTimeString, + "intervalEndTime": endTimeString, + "metrics": "error-rate", + }) + logger.Debug("querying Cloud Monitoring API") + timeSeries, err := makeRequestForTimeSeries(logger, req) + if err != nil { + return 0, errors.Wrap(err, "error when querying for time series") + } + + // This happens when no request was made during the given offset. + if len(timeSeries) == 0 { + return 0, nil + } + return calculateErrorResponseRate(timeSeries) +} + +func makeRequestForTimeSeries(logger *logrus.Entry, req *monitoring.ProjectsTimeSeriesListCall) ([]*monitoring.TimeSeries, error) { + resp, err := req.Do() + if err != nil { + return nil, errors.Wrap(err, "error when retrieving time series") + } + if len(resp.ExecutionErrors) != 0 { + for _, execError := range resp.ExecutionErrors { + logger.WithField("message", execError.Message).Warn("execution error occurred") + } + return nil, errors.Errorf("execution errors occurred") + } + + return resp.TimeSeries, nil +} + +// calculateErrorResponseRate calculates the percentage of 5xx error response. +// +// It gets all the server responses and calculates the error rate by performing +// the operation (5xx responses / all responses). Then, it divides the number of +// error responses by the total. +func calculateErrorResponseRate(timeSeries []*monitoring.TimeSeries) (float64, error) { + var errorResponseCount, totalResponses int64 + for _, series := range timeSeries { + // Because the interval and the series aligner are the same, only one + // point is returned per time series. + switch series.Metric.Labels["response_code_class"] { + case "5xx": + errorResponseCount += *(series.Points[0].Value.Int64Value) + default: + totalResponses += *(series.Points[0].Value.Int64Value) + } + } + + totalResponses += errorResponseCount + if totalResponses == 0 { + return 0, nil + } + + rate := float64(errorResponseCount) / float64(totalResponses) + return rate, nil +} + +func alignerAndReducer(alignReduceType metrics.AlignReduce) (aligner string, reducer string) { + switch alignReduceType { + case metrics.Align99Reduce99: + aligner = "ALIGN_PERCENTILE_99" + reducer = "REDUCE_PERCENTILE_99" + case metrics.Align95Reduce95: + aligner = "ALIGN_PERCENTILE_95" + reducer = "REDUCE_PERCENTILE_50" + case metrics.Align50Reduce50: + aligner = "ALIGN_PERCENTILE_50" + reducer = "REDUCE_PERCENTILE_50" + } + + return +} diff --git a/internal/metrics/stackdriver/platform.go b/internal/metrics/stackdriver/platform.go new file mode 100644 index 0000000..8f18e87 --- /dev/null +++ b/internal/metrics/stackdriver/platform.go @@ -0,0 +1,25 @@ +package stackdriver + +type platform interface { + RequestLatenciesMetricType() string + RequestCountMetricType() string +} + +type managed struct{} +type gke struct{} + +func (managed) RequestLatenciesMetricType() string { + return "run.googleapis.com/request_latencies" +} + +func (managed) RequestCountMetricType() string { + return "run.googleapis.com/request_count" +} + +func (gke) RequestLatenciesMetricType() string { + return "knative.dev/serving/revision/request_latencies" +} + +func (gke) RequestCountMetricType() string { + return "knative.dev/serving/revision/request_count" +} diff --git a/internal/metrics/stackdriver/stackdriver.go b/internal/metrics/stackdriver/stackdriver.go index b318006..7ceb89a 100644 --- a/internal/metrics/stackdriver/stackdriver.go +++ b/internal/metrics/stackdriver/stackdriver.go @@ -6,9 +6,7 @@ import ( "time" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics" - "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util" "github.com/pkg/errors" - "github.com/sirupsen/logrus" // TODO: Migrate to cloud.google.com/go/monitoring/apiv3/v2 once RPC for MQL // query is added (https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.timeSeries/query). @@ -23,224 +21,78 @@ type Provider struct { metricsClient *monitoring.Service project string + // The platform the Cloud Run service is running on. + platform + // query is used to filter the metrics for the wanted resource. + // + // TODO: Use a different data structure to keep track of the filters and + // build the query just before calling the API. query } // Metric types. const ( - requestLatencies = "run.googleapis.com/request_latencies" - requestCount = "run.googleapis.com/request_count" + cloudRunManagedRequestLatencies = "run.googleapis.com/request_latencies" + cloudRunManagedRequestCount = "run.googleapis.com/request_count" + gkeRequestLatencies = "knative.dev/serving/revision/request_latencies" + gkeRequestCount = "knative.dev/serving/revision/request_count" ) // NewProvider initializes the provider for Cloud Monitoring. -func NewProvider(ctx context.Context, project string, region string, serviceName string) (*Provider, error) { +func NewProvider(ctx context.Context, project string, location string, serviceName string) (Provider, error) { client, err := monitoring.NewService(ctx) if err != nil { - return nil, errors.Wrap(err, "could not initialize Cloud Metics client") + return Provider{}, errors.Wrap(err, "could not initialize Cloud Metics client") } - return &Provider{ + return Provider{ metricsClient: client, project: project, - query: newQuery(project, region, serviceName), + platform: managed{}, + query: newQuery(project, location, serviceName), }, nil } +// WithGKEPlatform updates the Cloud Run platform to GKE. +func (p Provider) WithGKEPlatform(namespace, clusterName string) Provider { + p.query = p.addFilter("resource.label.namespace_name", namespace). + addFilter("resource.label.cluster_name", clusterName) + p.platform = gke{} + return p +} + // SetCandidateRevision sets the candidate revision name for which the provider // should get metrics. -func (p *Provider) SetCandidateRevision(revisionName string) { +func (p Provider) SetCandidateRevision(revisionName string) { p.query = p.query.addFilter("resource.labels.revision_name", revisionName) } // RequestCount count returns the number of requests for the given offset. -func (p *Provider) RequestCount(ctx context.Context, offset time.Duration) (int64, error) { - query := p.addFilter("metric.type", requestCount) - endTime := time.Now() - endTimeString := endTime.Format(time.RFC3339Nano) - startTime := endTime.Add(-1 * offset) - startTimeString := startTime.Format(time.RFC3339Nano) - offsetString := fmt.Sprintf("%fs", offset.Seconds()) - - req := p.metricsClient.Projects.TimeSeries.List("projects/" + p.project). - Filter(string(query)). - IntervalStartTime(startTimeString). - IntervalEndTime(endTimeString). - AggregationAlignmentPeriod(offsetString). - AggregationPerSeriesAligner("ALIGN_DELTA"). - AggregationGroupByFields("resource.labels.service_name"). - AggregationCrossSeriesReducer("REDUCE_SUM") - - logger := util.LoggerFrom(ctx).WithFields(logrus.Fields{ - "intervalStartTime": startTimeString, - "intervalEndTime": endTimeString, - "metrics": "request-count", - }) - logger.Debug("querying Cloud Monitoring API") - timeSeries, err := makeRequestForTimeSeries(logger, req) - if err != nil { - return 0, errors.Wrap(err, "error when querying for time series") - } - - // This happens when no request was made during the given offset. - if len(timeSeries) == 0 { - return 0, nil - } - // The request count is aggregated for the entire service, so only one time - // series and a point is returned. There's no need for a loop. - series := timeSeries[0] - if len(series.Points) == 0 { - return 0, errors.New("no data point was retrieved") - } - return *(series.Points[0].Value.Int64Value), nil +func (p Provider) RequestCount(ctx context.Context, offset time.Duration) (int64, error) { + query := p.addFilter("metric.type", p.platform.RequestCountMetricType()) + return requestCount(ctx, p, query, offset) } // Latency returns the latency for the resource for the given offset. // It returns 0 if no request was made during the interval. -func (p *Provider) Latency(ctx context.Context, offset time.Duration, alignReduceType metrics.AlignReduce) (float64, error) { - query := p.query.addFilter("metric.type", requestLatencies) - endTime := time.Now() - endTimeString := endTime.Format(time.RFC3339Nano) - startTime := endTime.Add(-1 * offset) - startTimeString := startTime.Format(time.RFC3339Nano) - aligner, reducer := alignerAndReducer(alignReduceType) - offsetString := fmt.Sprintf("%fs", offset.Seconds()) - - req := p.metricsClient.Projects.TimeSeries.List("projects/" + p.project). - Filter(string(query)). - IntervalStartTime(startTimeString). - IntervalEndTime(endTimeString). - AggregationAlignmentPeriod(offsetString). - AggregationPerSeriesAligner(aligner). - AggregationGroupByFields("resource.labels.service_name"). - AggregationCrossSeriesReducer(reducer) - - logger := util.LoggerFrom(ctx).WithFields(logrus.Fields{ - "intervalStartTime": startTimeString, - "intervalEndTime": endTimeString, - "metrics": "latency", - "aligner": aligner, - "reducer": reducer, - }) - logger.Debug("querying Cloud Monitoring API") - timeSeries, err := makeRequestForTimeSeries(logger, req) - if err != nil { - return 0, errors.Wrap(err, "error when querying for time series") - } - - // This happens when no request was made during the given offset. - if len(timeSeries) == 0 { - return 0, nil - } - // The request count is aggregated for the entire service, so only one time - // series and a point is returned. There's no need for a loop. - series := timeSeries[0] - if len(series.Points) == 0 { - return 0, errors.New("no data point was retrieved") - } - return *(series.Points[0].Value.DoubleValue), nil +func (p Provider) Latency(ctx context.Context, offset time.Duration, alignReduceType metrics.AlignReduce) (float64, error) { + query := p.query.addFilter("metric.type", p.platform.RequestLatenciesMetricType()) + return latency(ctx, p, query, offset, alignReduceType) } // ErrorRate returns the rate of 5xx errors for the resource in the given offset. // It returns 0 if no request was made during the interval. -func (p *Provider) ErrorRate(ctx context.Context, offset time.Duration) (float64, error) { - query := p.query.addFilter("metric.type", requestCount) - endTime := time.Now() - endTimeString := endTime.Format(time.RFC3339Nano) - startTime := endTime.Add(-1 * offset) - startTimeString := startTime.Format(time.RFC3339Nano) - offsetString := fmt.Sprintf("%fs", offset.Seconds()) - - req := p.metricsClient.Projects.TimeSeries.List("projects/" + p.project). - Filter(string(query)). - IntervalStartTime(startTimeString). - IntervalEndTime(endTimeString). - AggregationAlignmentPeriod(offsetString). - AggregationPerSeriesAligner("ALIGN_DELTA"). - AggregationGroupByFields("metric.labels.response_code_class"). - AggregationCrossSeriesReducer("REDUCE_SUM") - - logger := util.LoggerFrom(ctx).WithFields(logrus.Fields{ - "intervalStartTime": startTimeString, - "intervalEndTime": endTimeString, - "metrics": "error-rate", - }) - logger.Debug("querying Cloud Monitoring API") - timeSeries, err := makeRequestForTimeSeries(logger, req) - if err != nil { - return 0, errors.Wrap(err, "error when querying for time series") - } - - // This happens when no request was made during the given offset. - if len(timeSeries) == 0 { - return 0, nil - } - return calculateErrorResponseRate(timeSeries) -} - -func makeRequestForTimeSeries(logger *logrus.Entry, req *monitoring.ProjectsTimeSeriesListCall) ([]*monitoring.TimeSeries, error) { - resp, err := req.Do() - if err != nil { - return nil, errors.Wrap(err, "error when retrieving time series") - } - if len(resp.ExecutionErrors) != 0 { - for _, execError := range resp.ExecutionErrors { - logger.WithField("message", execError.Message).Warn("execution error occurred") - } - return nil, errors.Errorf("execution errors occurred") - } - - return resp.TimeSeries, nil -} - -// calculateErrorResponseRate calculates the percentage of 5xx error response. -// -// It gets all the server responses and calculates the error rate by performing -// the operation (5xx responses / all responses). Then, it divides the number of -// error responses by the total. -func calculateErrorResponseRate(timeSeries []*monitoring.TimeSeries) (float64, error) { - var errorResponseCount, totalResponses int64 - for _, series := range timeSeries { - // Because the interval and the series aligner are the same, only one - // point is returned per time series. - switch series.Metric.Labels["response_code_class"] { - case "5xx": - errorResponseCount += *(series.Points[0].Value.Int64Value) - default: - totalResponses += *(series.Points[0].Value.Int64Value) - } - } - - totalResponses += errorResponseCount - if totalResponses == 0 { - return 0, nil - } - - rate := float64(errorResponseCount) / float64(totalResponses) - return rate, nil -} - -func alignerAndReducer(alignReduceType metrics.AlignReduce) (aligner string, reducer string) { - switch alignReduceType { - case metrics.Align99Reduce99: - aligner = "ALIGN_PERCENTILE_99" - reducer = "REDUCE_PERCENTILE_99" - case metrics.Align95Reduce95: - aligner = "ALIGN_PERCENTILE_95" - reducer = "REDUCE_PERCENTILE_50" - case metrics.Align50Reduce50: - aligner = "ALIGN_PERCENTILE_50" - reducer = "REDUCE_PERCENTILE_50" - } - - return +func (p Provider) ErrorRate(ctx context.Context, offset time.Duration) (float64, error) { + query := p.query.addFilter("metric.type", p.platform.RequestCountMetricType()) + return errorRate(ctx, p, query, offset) } // newQuery initializes a query. -func newQuery(project, region, serviceName string) query { +func newQuery(project, location, serviceName string) query { var q query return q.addFilter("resource.labels.project_id", project). - addFilter("resource.labels.location", region). + addFilter("resource.labels.location", location). addFilter("resource.labels.service_name", serviceName) } diff --git a/internal/rollout/rollout.go b/internal/rollout/rollout.go index fc80e29..4b8ffc9 100644 --- a/internal/rollout/rollout.go +++ b/internal/rollout/rollout.go @@ -8,8 +8,8 @@ import ( "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/health" + "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/knative" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics" - runapi "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util" "github.com/jonboulle/clockwork" "github.com/pkg/errors" @@ -29,8 +29,9 @@ const ( // ServiceRecord holds a service object and information about it. type ServiceRecord struct { *run.Service - Project string - Region string + KProvider knative.Provider + Namespace string + Location string } // Rollout is the rollout manager. @@ -38,11 +39,11 @@ type Rollout struct { ctx context.Context metricsProvider metrics.Provider service *run.Service + namespace string serviceName string - project string region string strategy config.Strategy - runClient runapi.Client + kProvider knative.Provider log *logrus.Entry time clockwork.Clock @@ -73,23 +74,23 @@ func New(ctx context.Context, metricsProvider metrics.Provider, svcRecord *Servi metricsProvider: metricsProvider, service: svcRecord.Service, serviceName: svcRecord.Metadata.Name, - project: svcRecord.Project, - region: svcRecord.Region, + namespace: svcRecord.Namespace, + kProvider: svcRecord.KProvider, strategy: strategy, log: logrus.NewEntry(logrus.New()), time: clockwork.NewRealClock(), } } -// WithClient updates the client in the rollout instance. -func (r *Rollout) WithClient(client runapi.Client) *Rollout { - r.runClient = client +// WithKnativeProvider updates the Knative provider in the rollout instance. +func (r *Rollout) WithKnativeProvider(provider knative.Provider) *Rollout { + r.kProvider = provider return r } // WithLogger updates the logger in the rollout instance. -func (r *Rollout) WithLogger(logger *logrus.Logger) *Rollout { - r.log = logger.WithField("project", r.project) +func (r *Rollout) WithLogger(logger *logrus.Entry) *Rollout { + r.log = logger return r } @@ -101,12 +102,6 @@ func (r *Rollout) WithClock(clock clockwork.Clock) *Rollout { // Rollout handles the gradual rollout. func (r *Rollout) Rollout() (bool, error) { - r.log = r.log.WithFields(logrus.Fields{ - "project": r.project, - "service": r.serviceName, - "region": r.region, - }) - _, trafficChanged, err := r.UpdateService(r.service) if err != nil { return false, errors.Wrapf(err, "failed to perform rollout") @@ -175,7 +170,7 @@ func (r *Rollout) UpdateService(svc *run.Service) (*run.Service, bool, error) { // replaceService updates the service object in Cloud Run. func (r *Rollout) replaceService(svc *run.Service) error { - _, err := r.runClient.ReplaceService(r.project, r.serviceName, svc) + _, err := r.kProvider.ReplaceService(r.namespace, r.serviceName, svc) return errors.Wrapf(err, "could not update service %q", r.serviceName) } diff --git a/internal/rollout/rollout_test.go b/internal/rollout/rollout_test.go index 586f1e5..83e8bfd 100644 --- a/internal/rollout/rollout_test.go +++ b/internal/rollout/rollout_test.go @@ -7,12 +7,11 @@ import ( "time" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config" + kmock "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/knative/mock" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics" metricsmock "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/mock" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/rollout" - runmock "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run/mock" "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "google.golang.org/api/run/v1" ) @@ -46,7 +45,7 @@ func makeLastRolloutAnnotation(clock clockwork.Clock, offsetFromNowMinute int) s } func TestUpdateService(t *testing.T) { - runclient := &runmock.RunAPI{} + kProvider := &kmock.Provider{} clockMock := clockwork.NewFakeClock() metricsMock := &metricsmock.Metrics{} metricsMock.RequestCountFn = func(ctx context.Context, offset time.Duration) (int64, error) { @@ -333,7 +332,7 @@ func TestUpdateService(t *testing.T) { } for _, test := range tests { - runclient.ReplaceServiceFn = func(namespace, serviceID string, svc *run.Service) (*run.Service, error) { + kProvider.ReplaceServiceFn = func(namespace, serviceID string, svc *run.Service) (*run.Service, error) { return svc, nil } @@ -347,9 +346,7 @@ func TestUpdateService(t *testing.T) { svcRecord := &rollout.ServiceRecord{Service: svc} strategy.HealthCriteria = test.healthCriteria - lg := logrus.New() - lg.SetLevel(logrus.DebugLevel) - r := rollout.New(context.TODO(), metricsMock, svcRecord, strategy).WithClient(runclient).WithLogger(lg).WithClock(clockMock) + r := rollout.New(context.TODO(), metricsMock, svcRecord, strategy).WithKnativeProvider(kProvider).WithClock(clockMock) t.Run(test.name, func(tt *testing.T) { retSvc, changedTraffic, err := r.UpdateService(svc) diff --git a/internal/rollout/traffic_test.go b/internal/rollout/traffic_test.go index cdf9678..d70f51f 100644 --- a/internal/rollout/traffic_test.go +++ b/internal/rollout/traffic_test.go @@ -5,14 +5,14 @@ import ( "testing" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config" + kmock "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/knative/mock" metricsmock "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/mock" - runmock "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run/mock" "github.com/stretchr/testify/assert" "google.golang.org/api/run/v1" ) func TestRollForwardTraffic(t *testing.T) { - runclient := &runmock.RunAPI{} + kProvider := &kmock.Provider{} metricsMock := &metricsmock.Metrics{} strategy := config.Strategy{ Steps: []int64{5, 30, 60}, @@ -129,7 +129,7 @@ func TestRollForwardTraffic(t *testing.T) { } svcRecord := &ServiceRecord{Service: svc} - r := New(context.TODO(), metricsMock, svcRecord, strategy).WithClient(runclient) + r := New(context.TODO(), metricsMock, svcRecord, strategy).WithKnativeProvider(kProvider) t.Run(test.name, func(tt *testing.T) { traffic := r.rollForwardTraffic(svc.Spec.Traffic, test.stable, test.candidate) diff --git a/internal/run/mock/runclient.go b/internal/run/mock/runclient.go deleted file mode 100644 index 57d409e..0000000 --- a/internal/run/mock/runclient.go +++ /dev/null @@ -1,24 +0,0 @@ -package mock - -import "google.golang.org/api/run/v1" - -// RunAPI represents a mock implementation of run.API. -type RunAPI struct { - ServiceFn func(namespace, serviceID string) (*run.Service, error) - ServiceInvoked bool - - ReplaceServiceFn func(namespace, serviceID string, svc *run.Service) (*run.Service, error) - ReplaceServiceInvoked bool -} - -// Service invokes the mock implementation and marks the function as invoked. -func (a *RunAPI) Service(namespace, service string) (*run.Service, error) { - a.ServiceInvoked = true - return a.ServiceFn(namespace, service) -} - -// ReplaceService invokes the mock implementation and marks the function as invoked. -func (a *RunAPI) ReplaceService(namespace, serviceID string, svc *run.Service) (*run.Service, error) { - a.ReplaceServiceInvoked = true - return a.ReplaceServiceFn(namespace, serviceID, svc) -} diff --git a/internal/run/wrapper.go b/internal/run/wrapper.go deleted file mode 100644 index 02f8368..0000000 --- a/internal/run/wrapper.go +++ /dev/null @@ -1,100 +0,0 @@ -package run - -import ( - "context" - "fmt" - - "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util" - "github.com/pkg/errors" - "google.golang.org/api/option" - "google.golang.org/api/run/v1" -) - -// Client represents a wrapper around the Cloud Run package. -type Client interface { - Service(namespace, serviceID string) (*run.Service, error) - ReplaceService(namespace, serviceID string, svc *run.Service) (*run.Service, error) -} - -// API is a wrapper for the Cloud Run package. -type API struct { - Client *run.APIService - Region string -} - -// regions are the available regions. -// -// TODO: caching regions might be unnecessary if we are querying them once during -// the lifespan of the process. -var regions = []string{} - -// NewAPIClient initializes an instance of APIService. -func NewAPIClient(ctx context.Context, region string) (*API, error) { - regionalEndpoint := fmt.Sprintf("https://%s-run.googleapis.com/", region) - client, err := run.NewService(ctx, option.WithEndpoint(regionalEndpoint)) - if err != nil { - return nil, errors.Wrap(err, "could not initialize client for the Cloud Run API") - } - - return &API{ - Client: client, - Region: region, - }, nil -} - -// Service retrieves information about a service. -func (a *API) Service(namespace, serviceID string) (*run.Service, error) { - serviceName := serviceName(namespace, serviceID) - return a.Client.Namespaces.Services.Get(serviceName).Do() -} - -// ReplaceService replaces an existing service. -func (a *API) ReplaceService(namespace, serviceID string, svc *run.Service) (*run.Service, error) { - serviceName := serviceName(namespace, serviceID) - return a.Client.Namespaces.Services.ReplaceService(serviceName, svc).Do() -} - -// ServicesWithLabelSelector gets services filtered by a label selector. -func (a *API) ServicesWithLabelSelector(namespace string, labelSelector string) ([]*run.Service, error) { - parent := fmt.Sprintf("namespaces/%s", namespace) - - servicesList, err := a.Client.Namespaces.Services.List(parent).LabelSelector(labelSelector).Do() - if err != nil { - return nil, errors.Wrap(err, "failed to filter services by label selector") - } - - return servicesList.Items, nil -} - -// Regions gets the supported regions for the project. -func Regions(ctx context.Context, project string) ([]string, error) { - logger := util.LoggerFrom(ctx) - if len(regions) != 0 { - logger.Debug("using cached regions, skip querying from API") - return regions, nil - } - - client, err := run.NewService(ctx) - if err != nil { - return nil, errors.Wrap(err, "could not initialize client for the Cloud Run API") - } - - name := fmt.Sprintf("projects/%s", project) - resp, err := client.Projects.Locations.List(name).Do() - if err != nil { - return nil, errors.Wrap(err, "failed to get locations") - } - - for _, location := range resp.Locations { - regions = append(regions, location.LocationId) - } - return regions, nil -} - -// generateServiceName returns the name of the specified service. It returns the -// form namespaces/{namespace_id}/services/{service_id}. -// -// For Cloud Run (fully managed), the namespace is the project ID or number. -func serviceName(namespace, serviceID string) string { - return fmt.Sprintf("namespaces/%s/services/%s", namespace, serviceID) -}