diff --git a/internal/provider/kubernetes/status_updater.go b/internal/provider/kubernetes/status_updater.go index 48a32c617a0..6e6cef37b8a 100644 --- a/internal/provider/kubernetes/status_updater.go +++ b/internal/provider/kubernetes/status_updater.go @@ -7,6 +7,7 @@ package kubernetes import ( "context" + "errors" "time" "github.com/go-logr/logr" @@ -56,14 +57,25 @@ func (m MutatorFunc) Mutate(old client.Object) client.Object { type UpdateHandler struct { log logr.Logger client client.Client + sendUpdates chan struct{} updateChannel chan Update + writer *UpdateWriter } func NewUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler { + sendUpdates := make(chan struct{}) + updateChannel := make(chan Update, 100) return &UpdateHandler{ log: log, client: client, - updateChannel: make(chan Update, 100), + sendUpdates: sendUpdates, + updateChannel: updateChannel, + writer: &UpdateWriter{ + log: log, + enabled: sendUpdates, + updateChannel: updateChannel, + eventsBeforeEnabled: make(chan Update, 1000), + }, } } @@ -127,6 +139,10 @@ func (u *UpdateHandler) Start(ctx context.Context) error { u.log.Info("started status update handler") defer u.log.Info("stopped status update handler") + // Enable Updaters to start sending updates to this handler. + close(u.sendUpdates) + u.writer.handleEventsReceivedBeforeEnabled() + for { select { case <-ctx.Done(): @@ -142,9 +158,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error { // Writer retrieves the interface that should be used to write to the UpdateHandler. func (u *UpdateHandler) Writer() Updater { - return &UpdateWriter{ - updateChannel: u.updateChannel, - } + return u.writer } // Updater describes an interface to send status updates somewhere. @@ -154,13 +168,40 @@ type Updater interface { // UpdateWriter takes status updates and sends these to the UpdateHandler via a channel. type UpdateWriter struct { + log logr.Logger + enabled <-chan struct{} updateChannel chan<- Update + // a temporary buffer to store events received before the Updater is enabled. + // These events will be sent to the update channel once the Updater is enabled. + eventsBeforeEnabled chan Update } // Send sends the given Update off to the update channel for writing by the UpdateHandler. func (u *UpdateWriter) Send(update Update) { // Non-blocking receive to see if we should pass along update. - u.updateChannel <- update + select { + case <-u.enabled: + u.updateChannel <- update + default: + if len(u.eventsBeforeEnabled) < cap(u.eventsBeforeEnabled) { + u.log.Info("received a status update while disabled, storing for later", "event", update.NamespacedName) + u.eventsBeforeEnabled <- update + } else { + // If the buffer is full, drop the event to avoid blocking the sender. + u.log.Error(errors.New("dropping status update, buffer full"), "event", update.NamespacedName) + } + } +} + +// handleEventsReceivedBeforeEnabled sends the events received before the Updater was enabled to the update channel. +func (u *UpdateWriter) handleEventsReceivedBeforeEnabled() { + go func() { + for e := range u.eventsBeforeEnabled { + u.log.Info("sending stored status update", "event", e.NamespacedName) + u.updateChannel <- e + } + close(u.eventsBeforeEnabled) + }() } // isStatusEqual checks if two objects have equivalent status.