diff --git a/cmd/operator/main.go b/cmd/operator/main.go index f99b4bf..b69ebe0 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -28,6 +28,8 @@ import ( "cloud.google.com/go/compute/metadata" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config" + ps "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/notification/pubsub" + "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util" sdlog "github.com/TV4/logrus-stackdriver-formatter" isatty "github.com/mattn/go-isatty" "github.com/pkg/errors" @@ -61,6 +63,7 @@ var ( flHTTPAddr string flProject string flLabelSelector string + flPubSubTopic string // Empty array means all regions. flRegions []string @@ -93,6 +96,7 @@ func init() { flag.StringVar(&flHTTPAddr, "http-addr", defaultAddr, "address where to listen to http requests (e.g. :8080)") flag.StringVar(&flProject, "project", "", "project in which the service is deployed") flag.StringVar(&flLabelSelector, "label", "rollout-strategy=gradual", "filter services based on a label (e.g. team=backend)") + flag.StringVar(&flPubSubTopic, "notify-pubsub", "", "publish rollout events to the given Google Cloud Pub/Sub topic") flag.StringVar(&flRegionsString, "regions", "", "the Cloud Run regions where the services should be looked at") flag.Var(&flSteps, "step", "a percentage in traffic the candidate should go through") flag.StringVar(&flStepsString, "steps", "5,20,50,80", "define steps in one flag separated by commas (e.g. 
5,30,60)") @@ -160,19 +164,29 @@ func main() { } ctx := context.Background() + var pubsub ps.Client + if flPubSubTopic != "" { + logger.WithField("topic", flPubSubTopic).Debug("will publish to Pub/Sub") + ctx = util.ContextWithLogger(ctx, logrus.NewEntry(logger)) + pubsub, err = ps.New(ctx, flProject, flPubSubTopic) + if err != nil { + logger.Fatalf("failed to initialize Pub/Sub client: %v", err) + } + } + if flCLI { - runDaemon(ctx, logger, cfg) + runDaemon(ctx, logger, cfg, pubsub) } else { - http.HandleFunc("/rollout", makeRolloutHandler(logger, cfg)) + http.HandleFunc("/rollout", makeRolloutHandler(logger, cfg, pubsub)) logger.WithField("addr", flHTTPAddr).Infof("starting server") logger.Fatal(http.ListenAndServe(flHTTPAddr, nil)) } } -func runDaemon(ctx context.Context, logger *logrus.Logger, cfg *config.Config) { +func runDaemon(ctx context.Context, logger *logrus.Logger, cfg *config.Config, pubsub ps.Client) { for { // TODO(gvso): Handle all the strategies. - errs := runRollouts(ctx, logger, cfg.Strategies[0]) + errs := runRollouts(ctx, logger, cfg.Strategies[0], pubsub) errsStr := rolloutErrsToString(errs) if len(errs) != 0 { logger.Warnf("there were %d errors: \n%s", len(errs), errsStr) diff --git a/cmd/operator/rollout.go b/cmd/operator/rollout.go index 9dfed50..0a48afa 100644 --- a/cmd/operator/rollout.go +++ b/cmd/operator/rollout.go @@ -9,6 +9,7 @@ import ( "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/sheets" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/stackdriver" + ps "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/notification/pubsub" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/rollout" runapi "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run" "github.com/pkg/errors" @@ -16,7 +17,7 @@ import ( ) // runRollouts concurrently handles the rollout of the 
targeted services. -func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Strategy) []error { +func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Strategy, pubsub ps.Client) []error { svcs, err := getTargetedServices(ctx, logger, strategy.Target) if err != nil { return []error{errors.Wrap(err, "failed to get targeted services")} @@ -32,16 +33,16 @@ func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Str ) for _, svc := range svcs { wg.Add(1) - go func(ctx context.Context, lg *logrus.Logger, svc *rollout.ServiceRecord, strategy config.Strategy) { + go func(ctx context.Context, lg *logrus.Logger, svc *rollout.ServiceRecord, strategy config.Strategy, pubsub ps.Client) { defer wg.Done() - err := handleRollout(ctx, lg, svc, strategy) + err := handleRollout(ctx, lg, svc, strategy, pubsub) if err != nil { lg.Debugf("rollout error for service %q: %+v", svc.Service.Metadata.Name, err) mu.Lock() errs = append(errs, err) mu.Unlock() } - }(ctx, logger, svc, strategy) + }(ctx, logger, svc, strategy, pubsub) } wg.Wait() @@ -49,7 +50,7 @@ func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Str } // handleRollout manages the rollout process for a single service. -func handleRollout(ctx context.Context, logger *logrus.Logger, service *rollout.ServiceRecord, strategy config.Strategy) error { +func handleRollout(ctx context.Context, logger *logrus.Logger, service *rollout.ServiceRecord, strategy config.Strategy, pubsub ps.Client) error { lg := logger.WithFields(logrus.Fields{ "project": service.Project, "service": service.Metadata.Name, @@ -64,7 +65,7 @@ func handleRollout(ctx context.Context, logger *logrus.Logger, service *rollout. 
if err != nil { return errors.Wrap(err, "failed to initialize metrics provider") } - roll := rollout.New(ctx, metricsProvider, service, strategy).WithClient(client).WithLogger(lg.Logger) + roll := rollout.New(ctx, metricsProvider, service, strategy).WithClient(client).WithLogger(lg.Logger).WithPubSub(pubsub) changed, err := roll.Rollout() if err != nil { diff --git a/cmd/operator/server.go b/cmd/operator/server.go index 5c1c136..0b36d28 100644 --- a/cmd/operator/server.go +++ b/cmd/operator/server.go @@ -5,15 +5,16 @@ import ( "net/http" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config" + ps "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/notification/pubsub" "github.com/sirupsen/logrus" ) // makeRolloutHandler creates a request handler to perform a rollout process. -func makeRolloutHandler(logger *logrus.Logger, cfg *config.Config) http.HandlerFunc { +func makeRolloutHandler(logger *logrus.Logger, cfg *config.Config, pubsub ps.Client) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { ctx := req.Context() // TODO(gvso): Handle all the strategies. 
- errs := runRollouts(ctx, logger, cfg.Strategies[0]) + errs := runRollouts(ctx, logger, cfg.Strategies[0], pubsub) errsStr := rolloutErrsToString(errs) if len(errs) != 0 { msg := fmt.Sprintf("there were %d errors: \n%s", len(errs), errsStr) diff --git a/internal/notification/pubsub/pubsub.go b/internal/notification/pubsub/pubsub.go index 820bdf2..3ab6ee8 100644 --- a/internal/notification/pubsub/pubsub.go +++ b/internal/notification/pubsub/pubsub.go @@ -100,20 +100,17 @@ func (ps PubSub) Publish(ctx context.Context, event RolloutEvent) error { } logger := util.LoggerFrom(ctx) - ps.topic.Publish(ctx, &cloudpubsub.Message{ - Data: data, - }) - logger.WithField("size", len(data)).Debug("event published to Pub/Sub") - return nil -} + result := ps.topic.Publish(ctx, &cloudpubsub.Message{Data: data}) + serverID, err := result.Get(ctx) + if err != nil { + return errors.Wrap(err, "failed to publish event to Pub/Sub") + } -// Stop is a wrapper around Cloud Run Pub/Sub package's Stop method on Topic. -// -// It sends all remaining published messages and stop goroutines created for -// handling publishing. Returns once all outstanding messages have been sent or -// have failed to be sent. 
-func (ps PubSub) Stop() { - ps.topic.Stop() + logger.WithFields(logrus.Fields{ + "size": len(data), + "serverID": serverID, + }).Debug("event published to Pub/Sub") + return nil } // findRevisionWithTag scans the service's traffic configuration and returns the diff --git a/internal/rollout/rollout.go b/internal/rollout/rollout.go index fc80e29..f4ebe83 100644 --- a/internal/rollout/rollout.go +++ b/internal/rollout/rollout.go @@ -9,6 +9,7 @@ import ( "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/health" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics" + "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/notification/pubsub" runapi "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run" "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util" "github.com/jonboulle/clockwork" @@ -45,6 +46,7 @@ type Rollout struct { runClient runapi.Client log *logrus.Entry time clockwork.Clock + pubsubClient pubsub.Client // Used to determine if candidate should become stable during update. promoteToStable bool @@ -99,6 +101,12 @@ func (r *Rollout) WithClock(clock clockwork.Clock) *Rollout { return r } +// WithPubSub updates the Pub/Sub client in the rollout instance. +func (r *Rollout) WithPubSub(psClient pubsub.Client) *Rollout { + r.pubsubClient = psClient + return r +} + // Rollout handles the gradual rollout. 
func (r *Rollout) Rollout() (bool, error) { r.log = r.log.WithFields(logrus.Fields{ @@ -145,7 +153,7 @@ func (r *Rollout) UpdateService(svc *run.Service) (*run.Service, bool, error) { svc = r.updateAnnotations(svc, stable, candidate) r.setHealthReportAnnotation(svc, "new candidate, no health report available yet") - err := r.replaceService(svc) + err := r.replaceServiceAndPublish(svc, true, health.Healthy) return svc, true, errors.Wrap(err, "failed to replace service") } @@ -169,13 +177,21 @@ func (r *Rollout) UpdateService(svc *run.Service) (*run.Service, bool, error) { report := health.StringReport(r.strategy.HealthCriteria, diagnosis, trafficChanged) r.setHealthReportAnnotation(svc, report) - err = r.replaceService(svc) + err = r.replaceServiceAndPublish(svc, trafficChanged, diagnosis.OverallResult) return svc, trafficChanged, errors.Wrap(err, "failed to replace service") } -// replaceService updates the service object in Cloud Run. -func (r *Rollout) replaceService(svc *run.Service) error { - _, err := r.runClient.ReplaceService(r.project, r.serviceName, svc) +// replaceServiceAndPublish updates the service object in Cloud Run. Then, it +// publishes to Google Cloud Pub/Sub. 
+func (r *Rollout) replaceServiceAndPublish(svc *run.Service, trafficChanged bool, diagnosis health.DiagnosisResult) error {
+	updated, err := r.runClient.ReplaceService(r.project, r.serviceName, svc)
+	// Publish only after a successful update (the result is unusable on error);
+	// a publish failure is logged, not returned, so it never fails the rollout.
+	if err == nil && trafficChanged {
+		if pubErr := r.publishEvent(updated, diagnosis); pubErr != nil {
+			r.log.Warnf("failed to publish rollout/rollback message: %v", pubErr)
+		}
+	}
 	return errors.Wrapf(err, "could not update service %q", r.serviceName)
 }
 
@@ -232,3 +248,20 @@ func (r *Rollout) diagnoseCandidate(candidate string, healthCriteria []config.He
 	d, err = health.Diagnose(ctx, healthCriteria, metricsValues)
 	return d, errors.Wrap(err, "failed to diagnose candidate's health")
 }
+
+// publishEvent builds a rollout event for svc and publishes it through the
+// rollout's Pub/Sub client. It does nothing and returns nil if no client is set.
+func (r *Rollout) publishEvent(svc *run.Service, diagnosis health.DiagnosisResult) error {
+	if r.pubsubClient == nil {
+		return nil
+	}
+	event, err := pubsub.NewRolloutEvent(svc, diagnosis, r.promoteToStable)
+	if err != nil {
+		return errors.Wrap(err, "failed to create rollout event message")
+	}
+	ctx := util.ContextWithLogger(r.ctx, r.log)
+	if err := r.pubsubClient.Publish(ctx, event); err != nil {
+		return errors.Wrap(err, "error when publishing rollout event")
+	}
+	return nil
+}