Skip to content
This repository has been archived by the owner on Jan 11, 2023. It is now read-only.

stackdriver: Add stackdriver support for CRfA #108

Open
wants to merge 1 commit into
base: anthos-support
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 70 additions & 18 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,20 @@ var (
flCLI bool
flCLILoopInterval time.Duration
flHTTPAddr string
flProject string
flLabelSelector string

// Empty array means all regions.
// Generic target configuration
flPlatform string
flProject string
flLabelSelector string

// Anthos target configuration
flGKEClusterLocation string
flGKECluster string
// TODO(gvso): Support multiple namespaces or all namespaces if none is
// specified.
flGKENamespace string

// Fully-managed target config. Empty array means all regions.
flRegions []string
flRegionsString string

Expand Down Expand Up @@ -91,8 +101,12 @@ func init() {
flag.BoolVar(&flCLI, "cli", false, "run as CLI application to manage rollout in intervals")
flag.DurationVar(&flCLILoopInterval, "cli-run-interval", 60*time.Second, "the time between each rollout process (in seconds)")
flag.StringVar(&flHTTPAddr, "http-addr", defaultAddr, "address where to listen to http requests (e.g. :8080)")
flag.StringVar(&flPlatform, "platform", config.PlatformManaged, "target platform to query for Knative services (managed or gke)")
flag.StringVar(&flProject, "project", "", "project in which the service is deployed")
flag.StringVar(&flLabelSelector, "label", "rollout-strategy=gradual", "filter services based on a label (e.g. team=backend)")
flag.StringVar(&flGKEClusterLocation, "cluster-location", "", "zone in which the cluster is located")
flag.StringVar(&flGKECluster, "cluster", "", "ID of the cluster")
flag.StringVar(&flGKENamespace, "namespace", "default", "Kubernetes namespace where to look for Knative services")
flag.StringVar(&flRegionsString, "regions", "", "the Cloud Run regions where the services should be looked at")
flag.Var(&flSteps, "step", "a percentage in traffic the candidate should go through")
flag.StringVar(&flStepsString, "steps", "5,20,50,80", "define steps in one flag separated by commas (e.g. 5,30,60)")
Expand Down Expand Up @@ -150,7 +164,12 @@ func main() {
logger.Debug(flagsToString())

// Configuration.
target := config.NewTarget(flProject, flRegions, flLabelSelector)
var target config.Target
if flPlatform == config.PlatformManaged {
target = config.NewManagedTarget(flProject, flRegions, flLabelSelector)
} else {
target = config.NewGKETarget(flProject, flGKEClusterLocation, flGKECluster, flGKENamespace, flLabelSelector)
}
healthCriteria := healthCriteriaFromFlags(flMinRequestCount, flErrorRate, flLatencyP99, flLatencyP95, flLatencyP50)
printHealthCriteria(logger, healthCriteria)
strategy := config.NewStrategy(target, flSteps, flHealthOffset, flTimeBeweenRollouts, healthCriteria)
Expand Down Expand Up @@ -197,14 +216,39 @@ func validateFlags() error {
}
}

for _, region := range flRegions {
if region == "" {
return errors.New("regions cannot be empty")
if flCLILoopInterval < 0 {
return errors.Errorf("cli run interval cannot be negative, got %s", flCLILoopInterval)
}

if flPlatform != config.PlatformManaged && flPlatform != config.PlatformGKE {
return errors.Errorf("invalid platform %s", flPlatform)
}

// GKE platform.
if flPlatform == "gke" {
if flGKECluster == "" {
return errors.Errorf("gke: cluster must be specified")
}
if flGKEClusterLocation == "" {
return errors.Errorf("gke: cluster location must be specified")
}
if flGKENamespace == "" {
return errors.Errorf("gke: namespace cannot be empty")
}
return nil
}

if flCLILoopInterval < 0 {
return errors.Errorf("cli run interval cannot be negative, got %s", flCLILoopInterval)
// Fully-managed platform.
for i, region := range flRegions {
if region == "" {
return errors.Errorf("region value at index %d cannot be empty", i)
}
}
if flGKECluster != "" {
return errors.New("cannot specify cluster for managed platform")
}
if flGKEClusterLocation != "" {
return errors.New("cannot specify cluster location for managed platform")
}
return nil
}
Expand All @@ -216,15 +260,25 @@ func flagsToString() string {
} else {
str += fmt.Sprintf("-http-addr=%s\n", flHTTPAddr)
}

regionsStr := "all"
if len(flRegions) != 0 {
regionsStr = fmt.Sprintf("%v", flRegions)
str += fmt.Sprintf("-project=%s\n-platform=%s\n", flProject, flPlatform)

if flPlatform == config.PlatformGKE {
str += fmt.Sprintf("-cluster-location=%s\n"+
"-cluster=%s\n"+
"-namespace=%s\n",
flGKEClusterLocation,
flGKECluster,
flGKENamespace,
)
} else {
regionsStr := "all"
if len(flRegions) != 0 {
regionsStr = fmt.Sprintf("%v", flRegions)
}
str += fmt.Sprintf("-regions=%s\n", regionsStr)
}

str += fmt.Sprintf("-project=%s\n"+
"-label=%s\n"+
"-regions=%s\n"+
str += fmt.Sprintf("-label=%s\n"+
"-steps=%s\n"+
"-healthcheck-offset=%s\n"+
"-min-wait=%s\n"+
Expand All @@ -233,9 +287,7 @@ func flagsToString() string {
"-latency-p99=%.2f\n"+
"-latency-p95=%.2f\n"+
"-latency-p50=%.2f\n",
flProject,
flLabelSelector,
regionsStr,
flSteps,
flHealthOffset,
flTimeBeweenRollouts,
Expand Down
34 changes: 18 additions & 16 deletions cmd/operator/rollout.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import (
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/sheets"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/stackdriver"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/rollout"
runapi "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// runRollouts concurrently handles the rollout of the targeted services.
func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Strategy) []error {
svcs, err := getTargetedServices(ctx, logger, strategy.Target)
ctx = util.ContextWithLogger(ctx, logrus.NewEntry(logger))
svcs, err := getTargetedServices(ctx, strategy.Target)
if err != nil {
return []error{errors.Wrap(err, "failed to get targeted services")}
}
Expand Down Expand Up @@ -50,21 +51,13 @@ func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Str

// handleRollout manages the rollout process for a single service.
func handleRollout(ctx context.Context, logger *logrus.Logger, service *rollout.ServiceRecord, strategy config.Strategy) error {
lg := logger.WithFields(logrus.Fields{
"project": service.Project,
"service": service.Metadata.Name,
"region": service.Region,
})
lg := logger.WithFields(service.KProvider.LoggingFields())

client, err := runapi.NewAPIClient(ctx, service.Region)
if err != nil {
return errors.Wrap(err, "failed to initialize Cloud Run API client")
}
metricsProvider, err := chooseMetricsProvider(ctx, lg, service.Project, service.Region, service.Metadata.Name)
metricsProvider, err := chooseMetricsProvider(ctx, lg, service.Location, service.Metadata.Name)
if err != nil {
return errors.Wrap(err, "failed to initialize metrics provider")
}
roll := rollout.New(ctx, metricsProvider, service, strategy).WithClient(client).WithLogger(lg.Logger)
roll := rollout.New(ctx, metricsProvider, service, strategy).WithLogger(lg)

changed, err := roll.Rollout()
if err != nil {
Expand Down Expand Up @@ -94,11 +87,20 @@ func rolloutErrsToString(errs []error) (errsStr string) {

// chooseMetricsProvider checks the CLI flags and determine which metrics
// provider should be used for the rollout.
func chooseMetricsProvider(ctx context.Context, logger *logrus.Entry, project, region, svcName string) (metrics.Provider, error) {
func chooseMetricsProvider(ctx context.Context, logger *logrus.Entry, location, svcName string) (metrics.Provider, error) {
if flGoogleSheetsID != "" {
logger.Debug("using Google Sheets as metrics provider")
return sheets.NewProvider(ctx, flGoogleSheetsID, "", region, svcName)
return sheets.NewProvider(ctx, flGoogleSheetsID, "", location, svcName)
}

logger.Debug("using Cloud Monitoring (Stackdriver) as metrics provider")
return stackdriver.NewProvider(ctx, project, region, svcName)
provider, err := stackdriver.NewProvider(ctx, flProject, location, svcName)
if err != nil {
return nil, errors.Wrap(err, "failed to initialize stackdriver provider")
}
if flPlatform == config.PlatformGKE {
logger.Debug("pointing stackdriver to Cloud Run for Anthos")
provider = provider.WithGKEPlatform(flGKENamespace, flGKECluster)
}
return provider, nil
}
96 changes: 68 additions & 28 deletions cmd/operator/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
"sync"

"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/knative"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/knative/cloudrun"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/rollout"
runapi "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand All @@ -15,7 +16,24 @@ import (

// getTargetedServices returned a list of service records that match the target
// configuration.
func getTargetedServices(ctx context.Context, logger *logrus.Logger, target config.Target) ([]*rollout.ServiceRecord, error) {
func getTargetedServices(ctx context.Context, target config.Target) ([]*rollout.ServiceRecord, error) {
logger := util.LoggerFrom(ctx)
if target.Platform == config.PlatformManaged {
logger.Debug("getting services on Cloud Run fully managed")
return getManagedServices(ctx, target)
}
logger.Debug("getting services on Cloud Run for Anthos")
return getGKEServices(ctx, target)
}

// getManagedServices returned a list of services from Cloud Run fully managed
// that match the target.
//
// It fetches the matches from each of the specified regions in the target
// configuration. If no regions are explicitly specified, it gets the list of
// Cloud Run regions and queries all of them.
func getManagedServices(ctx context.Context, target config.Target) ([]*rollout.ServiceRecord, error) {
logger := util.LoggerFrom(ctx)
logger.Debug("querying Cloud Run API to get all targeted services")
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -27,17 +45,24 @@ func getTargetedServices(ctx context.Context, logger *logrus.Logger, target conf
wg sync.WaitGroup
)

regions, err := determineRegions(ctx, logger, target)
regions, err := determineRegions(ctx, target)
if err != nil {
return nil, errors.Wrap(err, "cannot determine regions")
}

for _, region := range regions {
wg.Add(1)

go func(ctx context.Context, logger *logrus.Logger, region, labelSelector string) {
go func(ctx context.Context, logger *logrus.Entry, project, region, labelSelector string) {
defer wg.Done()
svcs, err := getServicesByRegionAndLabel(ctx, logger, target.Project, region, target.LabelSelector)

provider, err := cloudrun.NewFullyManagedProvider(ctx, project, region)
if err != nil {
retError = errors.Wrap(err, "failed to initialize Cloud Run fully managed client")
cancel()
return
}
svcs, err := getServicesByLabel(ctx, provider, project, labelSelector)
if err != nil {
retError = err
cancel()
Expand All @@ -46,56 +71,70 @@ func getTargetedServices(ctx context.Context, logger *logrus.Logger, target conf

for _, svc := range svcs {
mu.Lock()
retServices = append(retServices, newServiceRecord(svc, target.Project, region))
retServices = append(retServices, newServiceRecord(svc, provider, project, region))
mu.Unlock()
}

}(ctx, logger, region, target.LabelSelector)
}(ctx, logger, target.Project, region, target.LabelSelector)
}

wg.Wait()
return retServices, retError
}

// getServicesByRegionAndLabel returns all the service records that match the
// labelSelector in a specific region.
func getServicesByRegionAndLabel(ctx context.Context, logger *logrus.Logger, project, region, labelSelector string) ([]*run.Service, error) {
// getGKEServices get the services running on Cloud Run for Anthos.
func getGKEServices(ctx context.Context, target config.Target) ([]*rollout.ServiceRecord, error) {
logger := util.LoggerFrom(ctx)
provider, err := cloudrun.NewGKEProvider(ctx, target.Project, target.GKEClusterLocation, target.GKEClusterName)
if err != nil {
return nil, errors.Wrap(err, "failed to initialize Cloud Run for Anthos client")
}

logger.Debugf("querying for services on GKE cluster, namespace %s", target.GKENamespace)
svcs, err := getServicesByLabel(ctx, provider, target.GKENamespace, target.LabelSelector)

var services []*rollout.ServiceRecord
for _, svc := range svcs {
services = append(services, newServiceRecord(svc, provider, target.GKENamespace, target.GKEClusterLocation))
}
return services, nil
}

// getServicesByLabel returns all the service records that match the label
// selector.
//
// For Cloud Run fully managed, the namespace is the project ID.
func getServicesByLabel(ctx context.Context, provider knative.Provider, namespace, labelSelector string) ([]*run.Service, error) {
logger := util.LoggerFrom(ctx)
lg := logger.WithFields(logrus.Fields{
"region": region,
"labelSelector": labelSelector,
})

lg.Debug("querying Cloud Run services")
runclient, err := runapi.NewAPIClient(ctx, region)
lg.Debug("querying for services in provider")
svcs, err := provider.ServicesWithLabelSelector(namespace, labelSelector)
if err != nil {
return nil, errors.Wrap(err, "failed to initialize Cloud Run client")
}

svcs, err := runclient.ServicesWithLabelSelector(project, labelSelector)
if err != nil {
return nil, errors.Wrapf(err, "failed to get services with label %q in region %q", labelSelector, region)
return nil, errors.Wrapf(err, "failed to get services with label %q", labelSelector)
}

lg.WithField("n", len(svcs)).Debug("finished retrieving services from the API")
return svcs, nil
}

// determineRegions gets the regions the label selector should be searched at.
// Used for Cloud Run fuly managed.
//
// If the target configuration does not specify any regions, the entire list of
// regions is retrieved from API.
func determineRegions(ctx context.Context, logger *logrus.Logger, target config.Target) ([]string, error) {
func determineRegions(ctx context.Context, target config.Target) ([]string, error) {
logger := util.LoggerFrom(ctx)
regions := target.Regions
if len(regions) != 0 {
logger.Debug("using predefined list of regions, skip querying from API")
return regions, nil
}

logger.Debug("retrieving all regions from the API")

lg := logrus.NewEntry(logger)
ctx = util.ContextWithLogger(ctx, lg)
regions, err := runapi.Regions(ctx, target.Project)
regions, err := cloudrun.Regions(ctx, target.Project)
if err != nil {
return nil, errors.Wrap(err, "cannot get list of regions from Cloud Run API")
}
Expand All @@ -105,10 +144,11 @@ func determineRegions(ctx context.Context, logger *logrus.Logger, target config.
}

// newServiceRecord creates a new service record.
func newServiceRecord(svc *run.Service, project, region string) *rollout.ServiceRecord {
func newServiceRecord(svc *run.Service, provider knative.Provider, namespace, location string) *rollout.ServiceRecord {
return &rollout.ServiceRecord{
Service: svc,
Project: project,
Region: region,
Service: svc,
KProvider: provider,
Namespace: namespace,
Location: location,
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ require (
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/testify v1.6.1
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
google.golang.org/api v0.28.0
)
Loading