Skip to content
This repository has been archived by the owner on Jan 11, 2023. It is now read-only.

rollout: Publish rollout events to PubSub #109

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (

"cloud.google.com/go/compute/metadata"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config"
ps "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/notification/pubsub"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util"
sdlog "github.com/TV4/logrus-stackdriver-formatter"
isatty "github.com/mattn/go-isatty"
"github.com/pkg/errors"
Expand Down Expand Up @@ -61,6 +63,7 @@ var (
flHTTPAddr string
flProject string
flLabelSelector string
flPubSubTopic string

// Empty array means all regions.
flRegions []string
Expand Down Expand Up @@ -93,6 +96,7 @@ func init() {
flag.StringVar(&flHTTPAddr, "http-addr", defaultAddr, "address where to listen to http requests (e.g. :8080)")
flag.StringVar(&flProject, "project", "", "project in which the service is deployed")
flag.StringVar(&flLabelSelector, "label", "rollout-strategy=gradual", "filter services based on a label (e.g. team=backend)")
flag.StringVar(&flPubSubTopic, "notify-pubsub", "", "publish rollout events to the given Google Cloud Pub/Sub topic")
flag.StringVar(&flRegionsString, "regions", "", "the Cloud Run regions where the services should be looked at")
flag.Var(&flSteps, "step", "a percentage in traffic the candidate should go through")
flag.StringVar(&flStepsString, "steps", "5,20,50,80", "define steps in one flag separated by commas (e.g. 5,30,60)")
Expand Down Expand Up @@ -160,19 +164,29 @@ func main() {
}

ctx := context.Background()
var pubsub ps.Client
if flPubSubTopic != "" {
logger.WithField("topic", flPubSubTopic).Debug("will publish to Pub/Sub")
ctx = util.ContextWithLogger(ctx, logrus.NewEntry(logger))
pubsub, err = ps.New(ctx, flProject, flPubSubTopic)
if err != nil {
logger.Fatalf("failed to initialize Pub/Sub client: %v", err)
}
}

if flCLI {
runDaemon(ctx, logger, cfg)
runDaemon(ctx, logger, cfg, pubsub)
} else {
http.HandleFunc("/rollout", makeRolloutHandler(logger, cfg))
http.HandleFunc("/rollout", makeRolloutHandler(logger, cfg, pubsub))
logger.WithField("addr", flHTTPAddr).Infof("starting server")
logger.Fatal(http.ListenAndServe(flHTTPAddr, nil))
}
}

func runDaemon(ctx context.Context, logger *logrus.Logger, cfg *config.Config) {
func runDaemon(ctx context.Context, logger *logrus.Logger, cfg *config.Config, pubsub ps.Client) {
for {
// TODO(gvso): Handle all the strategies.
errs := runRollouts(ctx, logger, cfg.Strategies[0])
errs := runRollouts(ctx, logger, cfg.Strategies[0], pubsub)
errsStr := rolloutErrsToString(errs)
if len(errs) != 0 {
logger.Warnf("there were %d errors: \n%s", len(errs), errsStr)
Expand Down
13 changes: 7 additions & 6 deletions cmd/operator/rollout.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import (
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/sheets"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/stackdriver"
ps "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/notification/pubsub"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/rollout"
runapi "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// runRollouts concurrently handles the rollout of the targeted services.
func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Strategy) []error {
func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Strategy, pubsub ps.Client) []error {
svcs, err := getTargetedServices(ctx, logger, strategy.Target)
if err != nil {
return []error{errors.Wrap(err, "failed to get targeted services")}
Expand All @@ -32,24 +33,24 @@ func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Str
)
for _, svc := range svcs {
wg.Add(1)
go func(ctx context.Context, lg *logrus.Logger, svc *rollout.ServiceRecord, strategy config.Strategy) {
go func(ctx context.Context, lg *logrus.Logger, svc *rollout.ServiceRecord, strategy config.Strategy, pubsub ps.Client) {
defer wg.Done()
err := handleRollout(ctx, lg, svc, strategy)
err := handleRollout(ctx, lg, svc, strategy, pubsub)
if err != nil {
lg.Debugf("rollout error for service %q: %+v", svc.Service.Metadata.Name, err)
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}(ctx, logger, svc, strategy)
}(ctx, logger, svc, strategy, pubsub)
}
wg.Wait()

return errs
}

// handleRollout manages the rollout process for a single service.
func handleRollout(ctx context.Context, logger *logrus.Logger, service *rollout.ServiceRecord, strategy config.Strategy) error {
func handleRollout(ctx context.Context, logger *logrus.Logger, service *rollout.ServiceRecord, strategy config.Strategy, pubsub ps.Client) error {
lg := logger.WithFields(logrus.Fields{
"project": service.Project,
"service": service.Metadata.Name,
Expand All @@ -64,7 +65,7 @@ func handleRollout(ctx context.Context, logger *logrus.Logger, service *rollout.
if err != nil {
return errors.Wrap(err, "failed to initialize metrics provider")
}
roll := rollout.New(ctx, metricsProvider, service, strategy).WithClient(client).WithLogger(lg.Logger)
roll := rollout.New(ctx, metricsProvider, service, strategy).WithClient(client).WithLogger(lg.Logger).WithPubSub(pubsub)

changed, err := roll.Rollout()
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions cmd/operator/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"net/http"

"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config"
ps "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/notification/pubsub"
"github.com/sirupsen/logrus"
)

// makeRolloutHandler creates a request handler to perform a rollout process.
func makeRolloutHandler(logger *logrus.Logger, cfg *config.Config) http.HandlerFunc {
func makeRolloutHandler(logger *logrus.Logger, cfg *config.Config, pubsub ps.Client) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
// TODO(gvso): Handle all the strategies.
errs := runRollouts(ctx, logger, cfg.Strategies[0])
errs := runRollouts(ctx, logger, cfg.Strategies[0], pubsub)
errsStr := rolloutErrsToString(errs)
if len(errs) != 0 {
msg := fmt.Sprintf("there were %d errors: \n%s", len(errs), errsStr)
Expand Down
23 changes: 10 additions & 13 deletions internal/notification/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,17 @@ func (ps PubSub) Publish(ctx context.Context, event RolloutEvent) error {
}

logger := util.LoggerFrom(ctx)
ps.topic.Publish(ctx, &cloudpubsub.Message{
Data: data,
})
logger.WithField("size", len(data)).Debug("event published to Pub/Sub")
return nil
}
result := ps.topic.Publish(ctx, &cloudpubsub.Message{Data: data})
serverID, err := result.Get(ctx)
if err != nil {
return errors.Wrap(err, "failed to publish event to Pub/Sub")
}

// Stop is a wrapper around Cloud Run Pub/Sub package's Stop method on Topic.
//
// It sends all remaining published messages and stop goroutines created for
// handling publishing. Returns once all outstanding messages have been sent or
// have failed to be sent.
func (ps PubSub) Stop() {
ps.topic.Stop()
logger.WithFields(logrus.Fields{
"size": len(data),
"serverID": serverID,
}).Debug("event published to Pub/Sub")
return nil
}

// findRevisionWithTag scans the service's traffic configuration and returns the
Expand Down
43 changes: 38 additions & 5 deletions internal/rollout/rollout.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/health"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/notification/pubsub"
runapi "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util"
"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -45,6 +46,7 @@ type Rollout struct {
runClient runapi.Client
log *logrus.Entry
time clockwork.Clock
pubsubClient pubsub.Client

// Used to determine if candidate should become stable during update.
promoteToStable bool
Expand Down Expand Up @@ -99,6 +101,12 @@ func (r *Rollout) WithClock(clock clockwork.Clock) *Rollout {
return r
}

// WithPubSub updates the Pub/Sub client in the rollout instance.
func (r *Rollout) WithPubSub(psClient pubsub.Client) *Rollout {
r.pubsubClient = psClient
return r
}

// Rollout handles the gradual rollout.
func (r *Rollout) Rollout() (bool, error) {
r.log = r.log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -145,7 +153,7 @@ func (r *Rollout) UpdateService(svc *run.Service) (*run.Service, bool, error) {
svc = r.updateAnnotations(svc, stable, candidate)
r.setHealthReportAnnotation(svc, "new candidate, no health report available yet")

err := r.replaceService(svc)
err := r.replaceServiceAndPublish(svc, true, health.Healthy)
return svc, true, errors.Wrap(err, "failed to replace service")
}

Expand All @@ -169,13 +177,21 @@ func (r *Rollout) UpdateService(svc *run.Service) (*run.Service, bool, error) {
report := health.StringReport(r.strategy.HealthCriteria, diagnosis, trafficChanged)
r.setHealthReportAnnotation(svc, report)

err = r.replaceService(svc)
err = r.replaceServiceAndPublish(svc, trafficChanged, diagnosis.OverallResult)
return svc, trafficChanged, errors.Wrap(err, "failed to replace service")
}

// replaceService updates the service object in Cloud Run.
func (r *Rollout) replaceService(svc *run.Service) error {
_, err := r.runClient.ReplaceService(r.project, r.serviceName, svc)
// replaceServiceAndPublish updates the service object in Cloud Run. Then, it
// publishes to Google Cloud Pub/Sub.
func (r *Rollout) replaceServiceAndPublish(svc *run.Service, trafficChanged bool, diagnosis health.DiagnosisResult) error {
svc, err := r.runClient.ReplaceService(r.project, r.serviceName, svc)

if trafficChanged {
pubErr := r.publishEvent(svc, diagnosis)
if pubErr != nil {
r.log.Warnf("failed to publish rollout/rollback message: %v", pubErr)
}
}
return errors.Wrapf(err, "could not update service %q", r.serviceName)
}

Expand Down Expand Up @@ -232,3 +248,20 @@ func (r *Rollout) diagnoseCandidate(candidate string, healthCriteria []config.He
d, err = health.Diagnose(ctx, healthCriteria, metricsValues)
return d, errors.Wrap(err, "failed to diagnose candidate's health")
}

func (r *Rollout) publishEvent(svc *run.Service, diagnosis health.DiagnosisResult) error {
if r.pubsubClient == nil {
return nil
}

event, err := pubsub.NewRolloutEvent(svc, diagnosis, r.promoteToStable)
if err != nil {
return errors.Wrap(err, "failed to create rollout event message")
}
ctx := util.ContextWithLogger(r.ctx, r.log)
err = r.pubsubClient.Publish(ctx, event)
if err != nil {
return errors.Wrap(err, "error when publishing rollout event")
}
return nil
}