Skip to content
This repository has been archived by the owner on Jan 11, 2023. It is now read-only.

rollout: Publish rollout events to PubSub #109

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (

"cloud.google.com/go/compute/metadata"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config"
ps "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/notification/pubsub"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util"
sdlog "github.com/TV4/logrus-stackdriver-formatter"
isatty "github.com/mattn/go-isatty"
"github.com/pkg/errors"
Expand Down Expand Up @@ -61,6 +63,7 @@ var (
flHTTPAddr string
flProject string
flLabelSelector string
flPubSubTopic string

// Empty array means all regions.
flRegions []string
Expand Down Expand Up @@ -93,6 +96,7 @@ func init() {
flag.StringVar(&flHTTPAddr, "http-addr", defaultAddr, "address where to listen to http requests (e.g. :8080)")
flag.StringVar(&flProject, "project", "", "project in which the service is deployed")
flag.StringVar(&flLabelSelector, "label", "rollout-strategy=gradual", "filter services based on a label (e.g. team=backend)")
flag.StringVar(&flPubSubTopic, "notify-pubsub", "", "publish rollout events to the given Google Cloud Pub/Sub topic")
flag.StringVar(&flRegionsString, "regions", "", "the Cloud Run regions where the services should be looked at")
flag.Var(&flSteps, "step", "a percentage in traffic the candidate should go through")
flag.StringVar(&flStepsString, "steps", "5,20,50,80", "define steps in one flag separated by commas (e.g. 5,30,60)")
Expand Down Expand Up @@ -160,19 +164,29 @@ func main() {
}

ctx := context.Background()
var pubsub ps.Client
if flPubSubTopic != "" {
ctx = util.ContextWithLogger(ctx, logrus.NewEntry(logger))
pubsub, err = ps.New(ctx, flProject, flPubSubTopic)
if err != nil {
logger.Fatalf("failed to initialize Pub/Sub client: %v", err)
}

}

if flCLI {
runDaemon(ctx, logger, cfg)
runDaemon(ctx, logger, cfg, pubsub)
} else {
http.HandleFunc("/rollout", makeRolloutHandler(logger, cfg))
http.HandleFunc("/rollout", makeRolloutHandler(logger, cfg, pubsub))
logger.WithField("addr", flHTTPAddr).Infof("starting server")
logger.Fatal(http.ListenAndServe(flHTTPAddr, nil))
}
}

func runDaemon(ctx context.Context, logger *logrus.Logger, cfg *config.Config) {
func runDaemon(ctx context.Context, logger *logrus.Logger, cfg *config.Config, pubsub ps.Client) {
for {
// TODO(gvso): Handle all the strategies.
errs := runRollouts(ctx, logger, cfg.Strategies[0])
errs := runRollouts(ctx, logger, cfg.Strategies[0], pubsub)
errsStr := rolloutErrsToString(errs)
if len(errs) != 0 {
logger.Warnf("there were %d errors: \n%s", len(errs), errsStr)
Expand Down
13 changes: 7 additions & 6 deletions cmd/operator/rollout.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import (
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/sheets"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/stackdriver"
ps "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/notification/pubsub"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/rollout"
runapi "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// runRollouts concurrently handles the rollout of the targeted services.
func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Strategy) []error {
func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Strategy, pubsub ps.Client) []error {
svcs, err := getTargetedServices(ctx, logger, strategy.Target)
if err != nil {
return []error{errors.Wrap(err, "failed to get targeted services")}
Expand All @@ -32,24 +33,24 @@ func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Str
)
for _, svc := range svcs {
wg.Add(1)
go func(ctx context.Context, lg *logrus.Logger, svc *rollout.ServiceRecord, strategy config.Strategy) {
go func(ctx context.Context, lg *logrus.Logger, svc *rollout.ServiceRecord, strategy config.Strategy, pubsub ps.Client) {
defer wg.Done()
err := handleRollout(ctx, lg, svc, strategy)
err := handleRollout(ctx, lg, svc, strategy, pubsub)
if err != nil {
lg.Debugf("rollout error for service %q: %+v", svc.Service.Metadata.Name, err)
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}(ctx, logger, svc, strategy)
}(ctx, logger, svc, strategy, pubsub)
}
wg.Wait()

return errs
}

// handleRollout manages the rollout process for a single service.
func handleRollout(ctx context.Context, logger *logrus.Logger, service *rollout.ServiceRecord, strategy config.Strategy) error {
func handleRollout(ctx context.Context, logger *logrus.Logger, service *rollout.ServiceRecord, strategy config.Strategy, pubsub ps.Client) error {
lg := logger.WithFields(logrus.Fields{
"project": service.Project,
"service": service.Metadata.Name,
Expand All @@ -64,7 +65,7 @@ func handleRollout(ctx context.Context, logger *logrus.Logger, service *rollout.
if err != nil {
return errors.Wrap(err, "failed to initialize metrics provider")
}
roll := rollout.New(ctx, metricsProvider, service, strategy).WithClient(client).WithLogger(lg.Logger)
roll := rollout.New(ctx, metricsProvider, service, strategy).WithClient(client).WithLogger(lg.Logger).WithPubSub(pubsub)

changed, err := roll.Rollout()
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions cmd/operator/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"net/http"

"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config"
ps "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/notification/pubsub"
"github.com/sirupsen/logrus"
)

// makeRolloutHandler creates a request handler to perform a rollout process.
func makeRolloutHandler(logger *logrus.Logger, cfg *config.Config) http.HandlerFunc {
func makeRolloutHandler(logger *logrus.Logger, cfg *config.Config, pubsub ps.Client) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
// TODO(gvso): Handle all the strategies.
errs := runRollouts(ctx, logger, cfg.Strategies[0])
errs := runRollouts(ctx, logger, cfg.Strategies[0], pubsub)
errsStr := rolloutErrsToString(errs)
if len(errs) != 0 {
msg := fmt.Sprintf("there were %d errors: \n%s", len(errs), errsStr)
Expand Down
1 change: 1 addition & 0 deletions internal/notification/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
// Client represents a client to Google Cloud Pub/Sub.
type Client interface {
Publish(ctx context.Context, event RolloutEvent) error
Stop()
}

// PubSub is a Google Cloud Pub/Sub client to publish messages.
Expand Down
48 changes: 42 additions & 6 deletions internal/rollout/rollout.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/health"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/notification/pubsub"
runapi "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util"
"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -45,6 +46,7 @@ type Rollout struct {
runClient runapi.Client
log *logrus.Entry
time clockwork.Clock
pubsubClient pubsub.Client

// Used to determine if candidate should become stable during update.
promoteToStable bool
Expand Down Expand Up @@ -99,6 +101,12 @@ func (r *Rollout) WithClock(clock clockwork.Clock) *Rollout {
return r
}

// WithPubSub updates the Pub/Sub client in the rollout instance.
func (r *Rollout) WithPubSub(psClient pubsub.Client) *Rollout {
r.pubsubClient = psClient
return r
}

// Rollout handles the gradual rollout.
func (r *Rollout) Rollout() (bool, error) {
r.log = r.log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -145,7 +153,7 @@ func (r *Rollout) UpdateService(svc *run.Service) (*run.Service, bool, error) {
svc = r.updateAnnotations(svc, stable, candidate)
r.setHealthReportAnnotation(svc, "new candidate, no health report available yet")

err := r.replaceService(svc)
err := r.replaceServiceAndPublish(svc, true, health.Healthy)
return svc, true, errors.Wrap(err, "failed to replace service")
}

Expand All @@ -169,13 +177,21 @@ func (r *Rollout) UpdateService(svc *run.Service) (*run.Service, bool, error) {
report := health.StringReport(r.strategy.HealthCriteria, diagnosis, trafficChanged)
r.setHealthReportAnnotation(svc, report)

err = r.replaceService(svc)
return svc, trafficChanged, errors.Wrap(err, "failed to replace service")
err = r.replaceServiceAndPublish(svc, trafficChanged, diagnosis.OverallResult)
return svc, true, errors.Wrap(err, "failed to replace service")
}

// replaceService updates the service object in Cloud Run.
func (r *Rollout) replaceService(svc *run.Service) error {
_, err := r.runClient.ReplaceService(r.project, r.serviceName, svc)
// replaceServiceAndPublish updates the service object in Cloud Run. Then, it
// publishes to Google Cloud Pub/Sub.
func (r *Rollout) replaceServiceAndPublish(svc *run.Service, trafficChanged bool, diagnosis health.DiagnosisResult) error {
svc, err := r.runClient.ReplaceService(r.project, r.serviceName, svc)

if trafficChanged {
pubErr := r.publish(svc, diagnosis)
if pubErr != nil {
r.log.Warnf("failed to publish rollout/rollback message: %v", pubErr)
}
}
return errors.Wrapf(err, "could not update service %q", r.serviceName)
}

Expand Down Expand Up @@ -232,3 +248,23 @@ func (r *Rollout) diagnoseCandidate(candidate string, healthCriteria []config.He
d, err = health.Diagnose(ctx, healthCriteria, metricsValues)
return d, errors.Wrap(err, "failed to diagnose candidate's health")
}

func (r *Rollout) publish(svc *run.Service, diagnosis health.DiagnosisResult) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better name like publishEvent

if r.pubsubClient == nil {
return nil
}

event, err := pubsub.NewRolloutEvent(svc, diagnosis, r.promoteToStable)
if err != nil {
return errors.Wrap(err, "failed to create message")
}
ctx := util.ContextWithLogger(r.ctx, r.log)
err = r.pubsubClient.Publish(ctx, event)
if err != nil {
return errors.Wrap(err, "error when publishing message")
}

// Wait for all messages to be sent (or to fail).
r.pubsubClient.Stop()
Copy link
Contributor Author

@gvso gvso Aug 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're publishing one event per go routine (in each rollout process), we might just wait for the event to be published using PublishResult.Get. In this way, we can abstract the synchronous publishing from the user of the pubsub package

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that's a decent idea. I think that could be done with

waitFn, err := PublishMessage()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets move Get into Publish

return nil
}