From 73682f57fa5b4eff41cd6839149abb148aaf8d73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Getulio=20Valentin=20S=C3=A1nchez?= Date: Wed, 26 Aug 2020 09:51:29 -0400 Subject: [PATCH 1/4] rollout: Publish rollout events to PubSub MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This integrates the rollout package with the pubsub package to publish events after rollouts/rollbacks occur. It also adds a new flag `--notify-pubsub` to specify the PubSub topic. Signed-off-by: Getulio Valentin Sánchez --- cmd/operator/main.go | 22 +++++++++--- cmd/operator/rollout.go | 13 +++---- cmd/operator/server.go | 5 +-- internal/notification/pubsub/pubsub.go | 1 + internal/rollout/rollout.go | 48 ++++++++++++++++++++++---- 5 files changed, 71 insertions(+), 18 deletions(-) diff --git a/cmd/operator/main.go b/cmd/operator/main.go index f99b4bf..87f87de 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -28,6 +28,8 @@ import ( "cloud.google.com/go/compute/metadata" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config" + ps "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/notification/pubsub" + "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util" sdlog "github.com/TV4/logrus-stackdriver-formatter" isatty "github.com/mattn/go-isatty" "github.com/pkg/errors" @@ -61,6 +63,7 @@ var ( flHTTPAddr string flProject string flLabelSelector string + flPubSubTopic string // Empty array means all regions. flRegions []string @@ -93,6 +96,7 @@ func init() { flag.StringVar(&flHTTPAddr, "http-addr", defaultAddr, "address where to listen to http requests (e.g. :8080)") flag.StringVar(&flProject, "project", "", "project in which the service is deployed") flag.StringVar(&flLabelSelector, "label", "rollout-strategy=gradual", "filter services based on a label (e.g. team=backend)") + flag.StringVar(&flPubSubTopic, "notify-pubsub", "", "publish rollout events to the given Google Cloud Pub/Sub topic") flag.StringVar(&flRegionsString, "regions", "", "the Cloud Run regions where the services should be looked at") flag.Var(&flSteps, "step", "a percentage in traffic the candidate should go through") flag.StringVar(&flStepsString, "steps", "5,20,50,80", "define steps in one flag separated by commas (e.g. 5,30,60)") @@ -160,19 +164,29 @@ func main() { } ctx := context.Background() + var pubsub ps.Client + if flPubSubTopic != "" { + ctx = util.ContextWithLogger(ctx, logrus.NewEntry(logger)) + pubsub, err = ps.New(ctx, flProject, flPubSubTopic) + if err != nil { + logger.Fatalf("failed to initialize Pub/Sub client: %v", err) + } + + } + if flCLI { - runDaemon(ctx, logger, cfg) + runDaemon(ctx, logger, cfg, pubsub) } else { - http.HandleFunc("/rollout", makeRolloutHandler(logger, cfg)) + http.HandleFunc("/rollout", makeRolloutHandler(logger, cfg, pubsub)) logger.WithField("addr", flHTTPAddr).Infof("starting server") logger.Fatal(http.ListenAndServe(flHTTPAddr, nil)) } } -func runDaemon(ctx context.Context, logger *logrus.Logger, cfg *config.Config) { +func runDaemon(ctx context.Context, logger *logrus.Logger, cfg *config.Config, pubsub ps.Client) { for { // TODO(gvso): Handle all the strategies. - errs := runRollouts(ctx, logger, cfg.Strategies[0]) + errs := runRollouts(ctx, logger, cfg.Strategies[0], pubsub) errsStr := rolloutErrsToString(errs) if len(errs) != 0 { logger.Warnf("there were %d errors: \n%s", len(errs), errsStr) diff --git a/cmd/operator/rollout.go b/cmd/operator/rollout.go index 9dfed50..0a48afa 100644 --- a/cmd/operator/rollout.go +++ b/cmd/operator/rollout.go @@ -9,6 +9,7 @@ import ( "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/sheets" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/stackdriver" + ps "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/notification/pubsub" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/rollout" runapi "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run" "github.com/pkg/errors" @@ -16,7 +17,7 @@ import ( ) // runRollouts concurrently handles the rollout of the targeted services. -func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Strategy) []error { +func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Strategy, pubsub ps.Client) []error { svcs, err := getTargetedServices(ctx, logger, strategy.Target) if err != nil { return []error{errors.Wrap(err, "failed to get targeted services")} @@ -32,16 +33,16 @@ func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Str ) for _, svc := range svcs { wg.Add(1) - go func(ctx context.Context, lg *logrus.Logger, svc *rollout.ServiceRecord, strategy config.Strategy) { + go func(ctx context.Context, lg *logrus.Logger, svc *rollout.ServiceRecord, strategy config.Strategy, pubsub ps.Client) { defer wg.Done() - err := handleRollout(ctx, lg, svc, strategy) + err := handleRollout(ctx, lg, svc, strategy, pubsub) if err != nil { lg.Debugf("rollout error for service %q: %+v", svc.Service.Metadata.Name, err) mu.Lock() errs = append(errs, err) mu.Unlock() } - }(ctx, logger, svc, strategy) + }(ctx, logger, svc, strategy, pubsub) } wg.Wait() @@ -49,7 +50,7 @@ func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Str } // handleRollout manages the rollout process for a single service. -func handleRollout(ctx context.Context, logger *logrus.Logger, service *rollout.ServiceRecord, strategy config.Strategy) error { +func handleRollout(ctx context.Context, logger *logrus.Logger, service *rollout.ServiceRecord, strategy config.Strategy, pubsub ps.Client) error { lg := logger.WithFields(logrus.Fields{ "project": service.Project, "service": service.Metadata.Name, @@ -64,7 +65,7 @@ func handleRollout(ctx context.Context, logger *logrus.Logger, service *rollout. if err != nil { return errors.Wrap(err, "failed to initialize metrics provider") } - roll := rollout.New(ctx, metricsProvider, service, strategy).WithClient(client).WithLogger(lg.Logger) + roll := rollout.New(ctx, metricsProvider, service, strategy).WithClient(client).WithLogger(lg.Logger).WithPubSub(pubsub) changed, err := roll.Rollout() if err != nil { diff --git a/cmd/operator/server.go b/cmd/operator/server.go index 5c1c136..0b36d28 100644 --- a/cmd/operator/server.go +++ b/cmd/operator/server.go @@ -5,15 +5,16 @@ import ( "net/http" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config" + ps "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/notification/pubsub" "github.com/sirupsen/logrus" ) // makeRolloutHandler creates a request handler to perform a rollout process. -func makeRolloutHandler(logger *logrus.Logger, cfg *config.Config) http.HandlerFunc { +func makeRolloutHandler(logger *logrus.Logger, cfg *config.Config, pubsub ps.Client) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { ctx := req.Context() // TODO(gvso): Handle all the strategies. - errs := runRollouts(ctx, logger, cfg.Strategies[0]) + errs := runRollouts(ctx, logger, cfg.Strategies[0], pubsub) errsStr := rolloutErrsToString(errs) if len(errs) != 0 { msg := fmt.Sprintf("there were %d errors: \n%s", len(errs), errsStr) diff --git a/internal/notification/pubsub/pubsub.go b/internal/notification/pubsub/pubsub.go index 820bdf2..b1961c2 100644 --- a/internal/notification/pubsub/pubsub.go +++ b/internal/notification/pubsub/pubsub.go @@ -23,6 +23,7 @@ const ( // Client represents a client to Google Cloud Pub/Sub. type Client interface { Publish(ctx context.Context, event RolloutEvent) error + Stop() } // PubSub is a Google Cloud Pub/Sub client to publish messages. diff --git a/internal/rollout/rollout.go b/internal/rollout/rollout.go index fc80e29..f2cb51b 100644 --- a/internal/rollout/rollout.go +++ b/internal/rollout/rollout.go @@ -9,6 +9,7 @@ import ( "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/health" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics" + "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/notification/pubsub" runapi "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util" "github.com/jonboulle/clockwork" @@ -45,6 +46,7 @@ type Rollout struct { runClient runapi.Client log *logrus.Entry time clockwork.Clock + pubsubClient pubsub.Client // Used to determine if candidate should become stable during update. promoteToStable bool @@ -99,6 +101,12 @@ func (r *Rollout) WithClock(clock clockwork.Clock) *Rollout { return r } +// WithPubSub updates the Pub/Sub client in the rollout instance. +func (r *Rollout) WithPubSub(psClient pubsub.Client) *Rollout { + r.pubsubClient = psClient + return r +} + // Rollout handles the gradual rollout. func (r *Rollout) Rollout() (bool, error) { r.log = r.log.WithFields(logrus.Fields{ @@ -145,7 +153,7 @@ func (r *Rollout) UpdateService(svc *run.Service) (*run.Service, bool, error) { svc = r.updateAnnotations(svc, stable, candidate) r.setHealthReportAnnotation(svc, "new candidate, no health report available yet") - err := r.replaceService(svc) + err := r.replaceServiceAndPublish(svc, true, health.Healthy) return svc, true, errors.Wrap(err, "failed to replace service") } @@ -169,13 +177,21 @@ func (r *Rollout) UpdateService(svc *run.Service) (*run.Service, bool, error) { report := health.StringReport(r.strategy.HealthCriteria, diagnosis, trafficChanged) r.setHealthReportAnnotation(svc, report) - err = r.replaceService(svc) - return svc, trafficChanged, errors.Wrap(err, "failed to replace service") + err = r.replaceServiceAndPublish(svc, trafficChanged, diagnosis.OverallResult) + return svc, true, errors.Wrap(err, "failed to replace service") } -// replaceService updates the service object in Cloud Run. -func (r *Rollout) replaceService(svc *run.Service) error { - _, err := r.runClient.ReplaceService(r.project, r.serviceName, svc) +// replaceServiceAndPublish updates the service object in Cloud Run. Then, it +// publishes to Google Cloud Pub/Sub. +func (r *Rollout) replaceServiceAndPublish(svc *run.Service, trafficChanged bool, diagnosis health.DiagnosisResult) error { + svc, err := r.runClient.ReplaceService(r.project, r.serviceName, svc) + + if trafficChanged { + pubErr := r.publish(svc, diagnosis) + if pubErr != nil { + r.log.Warnf("failed to publish rollout/rollback message: %v", pubErr) + } + } return errors.Wrapf(err, "could not update service %q", r.serviceName) } @@ -232,3 +248,23 @@ func (r *Rollout) diagnoseCandidate(candidate string, healthCriteria []config.He d, err = health.Diagnose(ctx, healthCriteria, metricsValues) return d, errors.Wrap(err, "failed to diagnose candidate's health") } + +func (r *Rollout) publish(svc *run.Service, diagnosis health.DiagnosisResult) error { + if r.pubsubClient == nil { + return nil + } + + event, err := pubsub.NewRolloutEvent(svc, diagnosis, r.promoteToStable) + if err != nil { + return errors.Wrap(err, "failed to create message") + } + ctx := util.ContextWithLogger(r.ctx, r.log) + err = r.pubsubClient.Publish(ctx, event) + if err != nil { + return errors.Wrap(err, "error when publishing message") + } + + // Wait for all messages to be sent (or to fail). + r.pubsubClient.Stop() + return nil +} From 24c6f478d1de26dc5b6950b752039cb1f3406250 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Getulio=20Valentin=20S=C3=A1nchez?= Date: Wed, 26 Aug 2020 10:05:59 -0400 Subject: [PATCH 2/4] Return trafficChanged value in UpdateService MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Getulio Valentin Sánchez --- cmd/operator/main.go | 1 - internal/rollout/rollout.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/operator/main.go b/cmd/operator/main.go index 87f87de..97804e5 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -171,7 +171,6 @@ func main() { if err != nil { logger.Fatalf("failed to initialize Pub/Sub client: %v", err) } - } if flCLI { diff --git a/internal/rollout/rollout.go b/internal/rollout/rollout.go index f2cb51b..a1d3c51 100644 --- a/internal/rollout/rollout.go +++ b/internal/rollout/rollout.go @@ -178,7 +178,7 @@ func (r *Rollout) UpdateService(svc *run.Service) (*run.Service, bool, error) { r.setHealthReportAnnotation(svc, report) err = r.replaceServiceAndPublish(svc, trafficChanged, diagnosis.OverallResult) - return svc, true, errors.Wrap(err, "failed to replace service") + return svc, trafficChanged, errors.Wrap(err, "failed to replace service") } // replaceServiceAndPublish updates the service object in Cloud Run. Then, it From 68a50302195033f346732d6d04b485e25ec13da7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Getulio=20Valentin=20S=C3=A1nchez?= Date: Wed, 26 Aug 2020 15:13:15 -0400 Subject: [PATCH 3/4] Wait for messages to be published MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Getulio Valentin Sánchez --- cmd/operator/main.go | 1 + internal/notification/pubsub/pubsub.go | 24 ++++++++++-------------- internal/rollout/rollout.go | 7 ++----- 3 files changed, 13 insertions(+), 19 deletions(-) diff --git a/cmd/operator/main.go b/cmd/operator/main.go index 97804e5..b69ebe0 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -166,6 +166,7 @@ func main() { ctx := context.Background() var pubsub ps.Client if flPubSubTopic != "" { + logger.WithField("topic", flPubSubTopic).Debug("will publish to Pub/Sub") ctx = util.ContextWithLogger(ctx, logrus.NewEntry(logger)) pubsub, err = ps.New(ctx, flProject, flPubSubTopic) if err != nil { diff --git a/internal/notification/pubsub/pubsub.go b/internal/notification/pubsub/pubsub.go index b1961c2..3ab6ee8 100644 --- a/internal/notification/pubsub/pubsub.go +++ b/internal/notification/pubsub/pubsub.go @@ -23,7 +23,6 @@ const ( // Client represents a client to Google Cloud Pub/Sub. type Client interface { Publish(ctx context.Context, event RolloutEvent) error - Stop() } // PubSub is a Google Cloud Pub/Sub client to publish messages. @@ -101,20 +100,17 @@ func (ps PubSub) Publish(ctx context.Context, event RolloutEvent) error { } logger := util.LoggerFrom(ctx) - ps.topic.Publish(ctx, &cloudpubsub.Message{ - Data: data, - }) - logger.WithField("size", len(data)).Debug("event published to Pub/Sub") - return nil -} + result := ps.topic.Publish(ctx, &cloudpubsub.Message{Data: data}) + serverID, err := result.Get(ctx) + if err != nil { + return errors.Wrap(err, "failed to publish event to Pub/Sub") + } -// Stop is a wrapper around Cloud Run Pub/Sub package's Stop method on Topic. -// -// It sends all remaining published messages and stop goroutines created for -// handling publishing. Returns once all outstanding messages have been sent or -// have failed to be sent. -func (ps PubSub) Stop() { - ps.topic.Stop() + logger.WithFields(logrus.Fields{ + "size": len(data), + "serverID": serverID, + }).Debug("event published to Pub/Sub") + return nil } // findRevisionWithTag scans the service's traffic configuration and returns the diff --git a/internal/rollout/rollout.go b/internal/rollout/rollout.go index a1d3c51..37e98a1 100644 --- a/internal/rollout/rollout.go +++ b/internal/rollout/rollout.go @@ -187,7 +187,7 @@ func (r *Rollout) replaceServiceAndPublish(svc *run.Service, trafficChanged bool svc, err := r.runClient.ReplaceService(r.project, r.serviceName, svc) if trafficChanged { - pubErr := r.publish(svc, diagnosis) + pubErr := r.publishEvent(svc, diagnosis) if pubErr != nil { r.log.Warnf("failed to publish rollout/rollback message: %v", pubErr) } @@ -249,7 +249,7 @@ func (r *Rollout) diagnoseCandidate(candidate string, healthCriteria []config.He return d, errors.Wrap(err, "failed to diagnose candidate's health") } -func (r *Rollout) publish(svc *run.Service, diagnosis health.DiagnosisResult) error { +func (r *Rollout) publishEvent(svc *run.Service, diagnosis health.DiagnosisResult) error { if r.pubsubClient == nil { return nil } @@ -263,8 +263,5 @@ func (r *Rollout) publish(svc *run.Service, diagnosis health.DiagnosisResult) er if err != nil { return errors.Wrap(err, "error when publishing message") } - - // Wait for all messages to be sent (or to fail). - r.pubsubClient.Stop() return nil } From 623c2175eb55e6fda62f007295c6d85c70d64d14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Getulio=20Valentin=20S=C3=A1nchez?= Date: Wed, 26 Aug 2020 15:15:47 -0400 Subject: [PATCH 4/4] Update errors when publishing fails MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Getulio Valentin Sánchez --- internal/rollout/rollout.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/rollout/rollout.go b/internal/rollout/rollout.go index 37e98a1..f4ebe83 100644 --- a/internal/rollout/rollout.go +++ b/internal/rollout/rollout.go @@ -256,12 +256,12 @@ func (r *Rollout) publishEvent(svc *run.Service, diagnosis health.DiagnosisResul event, err := pubsub.NewRolloutEvent(svc, diagnosis, r.promoteToStable) if err != nil { - return errors.Wrap(err, "failed to create message") + return errors.Wrap(err, "failed to create rollout event message") } ctx := util.ContextWithLogger(r.ctx, r.log) err = r.pubsubClient.Publish(ctx, event) if err != nil { - return errors.Wrap(err, "error when publishing message") + return errors.Wrap(err, "error when publishing rollout event") } return nil }