Skip to content
This repository has been archived by the owner on Jan 11, 2023. It is now read-only.

knative: Add knative package #107

Open
wants to merge 9 commits into
base: anthos-support
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 6 additions & 13 deletions cmd/operator/rollout.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import (
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/sheets"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/stackdriver"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/rollout"
runapi "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// runRollouts concurrently handles the rollout of the targeted services.
func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Strategy) []error {
svcs, err := getTargetedServices(ctx, logger, strategy.Target)
ctx = util.ContextWithLogger(ctx, logrus.NewEntry(logger))
svcs, err := getTargetedServices(ctx, strategy.Target)
if err != nil {
return []error{errors.Wrap(err, "failed to get targeted services")}
}
Expand Down Expand Up @@ -50,21 +51,13 @@ func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Str

// handleRollout manages the rollout process for a single service.
func handleRollout(ctx context.Context, logger *logrus.Logger, service *rollout.ServiceRecord, strategy config.Strategy) error {
lg := logger.WithFields(logrus.Fields{
"project": service.Project,
"service": service.Metadata.Name,
"region": service.Region,
})
lg := logger.WithFields(service.KProvider.LoggingFields()).WithField("service", service.Metadata.Name)

client, err := runapi.NewAPIClient(ctx, service.Region)
if err != nil {
return errors.Wrap(err, "failed to initialize Cloud Run API client")
}
metricsProvider, err := chooseMetricsProvider(ctx, lg, service.Project, service.Region, service.Metadata.Name)
metricsProvider, err := chooseMetricsProvider(ctx, lg, service.Namespace, service.Location, service.Metadata.Name)
if err != nil {
return errors.Wrap(err, "failed to initialize metrics provider")
}
roll := rollout.New(ctx, metricsProvider, service, strategy).WithClient(client).WithLogger(lg.Logger)
roll := rollout.New(ctx, metricsProvider, service, strategy).WithLogger(lg)

changed, err := roll.Rollout()
if err != nil {
Expand Down
61 changes: 32 additions & 29 deletions cmd/operator/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
"sync"

"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/knative"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/knative/cloudrun"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/rollout"
runapi "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand All @@ -15,7 +16,8 @@ import (

// getTargetedServices returned a list of service records that match the target
// configuration.
func getTargetedServices(ctx context.Context, logger *logrus.Logger, target config.Target) ([]*rollout.ServiceRecord, error) {
func getTargetedServices(ctx context.Context, target config.Target) ([]*rollout.ServiceRecord, error) {
logger := util.LoggerFrom(ctx)
logger.Debug("querying Cloud Run API to get all targeted services")
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -27,17 +29,25 @@ func getTargetedServices(ctx context.Context, logger *logrus.Logger, target conf
wg sync.WaitGroup
)

regions, err := determineRegions(ctx, logger, target)
regions, err := determineRegions(ctx, target)
if err != nil {
return nil, errors.Wrap(err, "cannot determine regions")
}

for _, region := range regions {
wg.Add(1)

go func(ctx context.Context, logger *logrus.Logger, region, labelSelector string) {
go func(ctx context.Context, logger *logrus.Entry, project, region, labelSelector string) {
defer wg.Done()
svcs, err := getServicesByRegionAndLabel(ctx, logger, target.Project, region, target.LabelSelector)

provider, err := cloudrun.NewFullyManagedProvider(ctx, project, region)
if err != nil {
retError = errors.Wrap(err, "failed to initialize Cloud Run fully managed client")
cancel()
return
}

svcs, err := getServicesByLabel(ctx, provider, project, labelSelector)
if err != nil {
retError = err
cancel()
Expand All @@ -46,34 +56,28 @@ func getTargetedServices(ctx context.Context, logger *logrus.Logger, target conf

for _, svc := range svcs {
mu.Lock()
retServices = append(retServices, newServiceRecord(svc, target.Project, region))
retServices = append(retServices, newServiceRecord(svc, provider, project, region))
mu.Unlock()
}

}(ctx, logger, region, target.LabelSelector)
}(ctx, logger, target.Project, region, target.LabelSelector)
}

wg.Wait()
return retServices, retError
}

// getServicesByRegionAndLabel returns all the service records that match the
// labelSelector in a specific region.
func getServicesByRegionAndLabel(ctx context.Context, logger *logrus.Logger, project, region, labelSelector string) ([]*run.Service, error) {
// getServicesByLabel returns all the service records that match the label
// selector.
func getServicesByLabel(ctx context.Context, provider knative.Provider, namespace, labelSelector string) ([]*run.Service, error) {
logger := util.LoggerFrom(ctx)
lg := logger.WithFields(logrus.Fields{
"region": region,
"labelSelector": labelSelector,
})

lg.Debug("querying Cloud Run services")
runclient, err := runapi.NewAPIClient(ctx, region)
lg.Debug("querying for services in provider")
svcs, err := provider.ListServices(namespace, labelSelector)
if err != nil {
return nil, errors.Wrap(err, "failed to initialize Cloud Run client")
}

svcs, err := runclient.ServicesWithLabelSelector(project, labelSelector)
if err != nil {
return nil, errors.Wrapf(err, "failed to get services with label %q in region %q", labelSelector, region)
return nil, errors.Wrapf(err, "failed to get services with label %q", labelSelector)
}

lg.WithField("n", len(svcs)).Debug("finished retrieving services from the API")
Expand All @@ -84,18 +88,16 @@ func getServicesByRegionAndLabel(ctx context.Context, logger *logrus.Logger, pro
//
// If the target configuration does not specify any regions, the entire list of
// regions is retrieved from API.
func determineRegions(ctx context.Context, logger *logrus.Logger, target config.Target) ([]string, error) {
func determineRegions(ctx context.Context, target config.Target) ([]string, error) {
logger := util.LoggerFrom(ctx)
regions := target.Regions
if len(regions) != 0 {
logger.Debug("using predefined list of regions, skip querying from API")
return regions, nil
}

logger.Debug("retrieving all regions from the API")

lg := logrus.NewEntry(logger)
ctx = util.ContextWithLogger(ctx, lg)
regions, err := runapi.Regions(ctx, target.Project)
regions, err := cloudrun.FullyManagedRegions(ctx, target.Project)
if err != nil {
return nil, errors.Wrap(err, "cannot get list of regions from Cloud Run API")
}
Expand All @@ -105,10 +107,11 @@ func determineRegions(ctx context.Context, logger *logrus.Logger, target config.
}

// newServiceRecord creates a new service record.
func newServiceRecord(svc *run.Service, project, region string) *rollout.ServiceRecord {
func newServiceRecord(svc *run.Service, provider knative.Provider, namespace, location string) *rollout.ServiceRecord {
return &rollout.ServiceRecord{
Service: svc,
Project: project,
Region: region,
Service: svc,
KProvider: provider,
Namespace: namespace,
Location: location,
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ require (
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/testify v1.6.1
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
google.golang.org/api v0.28.0
)
75 changes: 75 additions & 0 deletions internal/knative/cloudrun/gke.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package cloudrun

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"fmt"
"net/http"

"github.com/pkg/errors"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/compute/v1"
"google.golang.org/api/container/v1"
)

// newGKEClient initializes a client for GKE cluster.
func newGKEClient(ctx context.Context, project, zone, clusterName string) (*http.Client, string, error) {
tokenSource, err := google.DefaultTokenSource(ctx, compute.CloudPlatformScope)
if err != nil {
return nil, "", errors.Wrap(err, "failed to get token source")
}

httpClient := oauth2.NewClient(ctx, tokenSource)
containerService, err := container.New(httpClient)
if err != nil {
return nil, "", fmt.Errorf("could not create client for Google Kubernetes Engine: %v", err)
}

// TODO: handle regional clusters
cluster, err := containerService.Projects.Zones.Clusters.Get(project, zone, clusterName).Context(ctx).Do()
ahmetb marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, "", errors.Wrapf(err, "failed to get cluster %q in project %q, zone %q", clusterName, project, zone)
}

hClient, err := newGKEHTTPClient(cluster, tokenSource)
if err != nil {
return nil, "", errors.Wrap(err, "failed to initialize HTTP client")
}
return hClient, cluster.Endpoint, nil
}

func newGKEHTTPClient(cluster *container.Cluster, tokenSource oauth2.TokenSource) (*http.Client, error) {
tlsCfg, err := gkeTLSConfig(cluster)
if err != nil {
return nil, errors.Wrap(err, "failed to get TLS configuration")
}

hTransport := http.DefaultTransport.(*http.Transport).Clone()
hTransport.TLSClientConfig = tlsCfg
oauth2Transport := &oauth2.Transport{
Base: hTransport,
Source: tokenSource,
}
return &http.Client{Transport: oauth2Transport}, nil
}

func gkeTLSConfig(cluster *container.Cluster) (*tls.Config, error) {
caCert, err := base64.StdEncoding.DecodeString(cluster.MasterAuth.ClusterCaCertificate)
if err != nil {
return nil, errors.Wrap(err, "failed to decode cluster certificate")
}

// CA Cert from kube master
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(caCert))

// Setup TLS config
tlsConfig := &tls.Config{
RootCAs: caCertPool,
}

return tlsConfig, nil
}
Loading