Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add event processing histogram metric #1134

Merged
merged 5 commits into from
Oct 12, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Add batch ID and inject loggers
kate-osborn committed Oct 12, 2023
commit 6288b83d120e883d2d35af6c9a49387bac0dabd1
23 changes: 13 additions & 10 deletions internal/framework/events/eventsfakes/fake_event_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion internal/framework/events/handler.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,8 @@ package events

import (
"context"

"github.com/go-logr/logr"
)

//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . EventHandler
@@ -10,5 +12,5 @@ import (
type EventHandler interface {
// HandleEventBatch handles a batch of events.
// EventBatch can include duplicated events.
HandleEventBatch(ctx context.Context, batch EventBatch)
HandleEventBatch(ctx context.Context, logger logr.Logger, batch EventBatch)
}
12 changes: 9 additions & 3 deletions internal/framework/events/loop.go
Original file line number Diff line number Diff line change
@@ -34,6 +34,9 @@ type EventLoop struct {
// The batches are swapped before starting the handler goroutine.
currentBatch EventBatch
nextBatch EventBatch

// the ID of the current batch
currentBatchID int
}

// NewEventLoop creates a new EventLoop.
@@ -63,11 +66,14 @@ func (el *EventLoop) Start(ctx context.Context) error {

handleBatch := func() {
go func(batch EventBatch) {
el.logger.Info("Handling events from the batch", "total", len(batch))
el.currentBatchID++
batchLogger := el.logger.WithName("eventHandler").WithValues("batchID", el.currentBatchID)

batchLogger.Info("Handling events from the batch", "total", len(batch))

el.handler.HandleEventBatch(ctx, batch)
el.handler.HandleEventBatch(ctx, batchLogger, batch)

el.logger.Info("Finished handling the batch")
batchLogger.Info("Finished handling the batch")
handlingDone <- struct{}{}
}(el.currentBatch)
}
11 changes: 6 additions & 5 deletions internal/framework/events/loop_test.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"context"
"errors"

"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
@@ -47,7 +48,7 @@ var _ = Describe("EventLoop", func() {

// Ensure the first batch is handled
Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(1))
_, batch = fakeHandler.HandleEventBatchArgsForCall(0)
_, _, batch = fakeHandler.HandleEventBatchArgsForCall(0)

var expectedBatch events.EventBatch = []interface{}{"event0"}
Expect(batch).Should(Equal(expectedBatch))
@@ -70,7 +71,7 @@ var _ = Describe("EventLoop", func() {
eventCh <- e

Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(2))
_, batch := fakeHandler.HandleEventBatchArgsForCall(1)
_, _, batch := fakeHandler.HandleEventBatchArgsForCall(1)

var expectedBatch events.EventBatch = []interface{}{e}
Expect(batch).Should(Equal(expectedBatch))
@@ -82,7 +83,7 @@ var _ = Describe("EventLoop", func() {

// The func below will pause the handler goroutine while it is processing the batch with e1 until
// sentSecondAndThirdEvents is closed. This way we can add e2 and e3 to the current batch in the meantime.
fakeHandler.HandleEventBatchCalls(func(ctx context.Context, batch events.EventBatch) {
fakeHandler.HandleEventBatchCalls(func(ctx context.Context, logger logr.Logger, batch events.EventBatch) {
close(firstHandleEventBatchCallInProgress)
<-sentSecondAndThirdEvents
})
@@ -106,14 +107,14 @@ var _ = Describe("EventLoop", func() {
close(sentSecondAndThirdEvents)

Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(3))
_, batch := fakeHandler.HandleEventBatchArgsForCall(1)
_, _, batch := fakeHandler.HandleEventBatchArgsForCall(1)

var expectedBatch events.EventBatch = []interface{}{e1}

// the first HandleEventBatch() call must have handled a batch with e1
Expect(batch).Should(Equal(expectedBatch))

_, batch = fakeHandler.HandleEventBatchArgsForCall(2)
_, _, batch = fakeHandler.HandleEventBatchArgsForCall(2)

expectedBatch = []interface{}{e2, e3}
// the second HandleEventBatch() call must have handled a batch with e2 and e3
13 changes: 5 additions & 8 deletions internal/mode/provisioner/handler.go
Original file line number Diff line number Diff line change
@@ -27,7 +27,6 @@ type eventHandler struct {

statusUpdater status.Updater
k8sClient client.Client
logger logr.Logger

staticModeDeploymentYAML []byte

@@ -38,7 +37,6 @@ func newEventHandler(
gcName string,
statusUpdater status.Updater,
k8sClient client.Client,
logger logr.Logger,
staticModeDeploymentYAML []byte,
) *eventHandler {
return &eventHandler{
@@ -47,7 +45,6 @@ func newEventHandler(
statusUpdater: statusUpdater,
gcName: gcName,
k8sClient: k8sClient,
logger: logger,
staticModeDeploymentYAML: staticModeDeploymentYAML,
gatewayNextID: 1,
}
@@ -80,7 +77,7 @@ func (h *eventHandler) setGatewayClassStatuses(ctx context.Context) {
h.statusUpdater.Update(ctx, statuses)
}

func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context) {
func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context, logger logr.Logger) {
var gwsWithoutDeps, removedGwsWithDeps []types.NamespacedName

for nsname, gw := range h.store.gateways {
@@ -116,7 +113,7 @@ func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context) {

h.provisions[nsname] = deployment

h.logger.Info(
logger.Info(
"Created deployment",
"deployment", client.ObjectKeyFromObject(deployment),
"gateway", nsname,
@@ -134,18 +131,18 @@ func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context) {

delete(h.provisions, nsname)

h.logger.Info(
logger.Info(
"Deleted deployment",
"deployment", client.ObjectKeyFromObject(deployment),
"gateway", nsname,
)
}
}

func (h *eventHandler) HandleEventBatch(ctx context.Context, batch events.EventBatch) {
func (h *eventHandler) HandleEventBatch(ctx context.Context, logger logr.Logger, batch events.EventBatch) {
h.store.update(batch)
h.setGatewayClassStatuses(ctx)
h.ensureDeploymentsMatchGateways(ctx)
h.ensureDeploymentsMatchGateways(ctx, logger)
}

func (h *eventHandler) generateDeploymentID() string {
23 changes: 10 additions & 13 deletions internal/mode/provisioner/handler_test.go
Original file line number Diff line number Diff line change
@@ -96,7 +96,7 @@ var _ = Describe("handler", func() {
Resource: gc,
},
}
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

// Ensure GatewayClass is accepted

@@ -126,7 +126,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

depNsName := types.NamespacedName{
Namespace: "nginx-gateway",
@@ -156,7 +156,7 @@ var _ = Describe("handler", func() {
}

handle := func() {
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
}

Expect(handle).Should(Panic())
@@ -179,7 +179,6 @@ var _ = Describe("handler", func() {
gcName,
statusUpdater,
k8sclient,
zap.New(),
embeddedfiles.StaticModeDeploymentYAML,
)
})
@@ -217,7 +216,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
deps := &v1.DeploymentList{}

err := k8sclient.List(context.Background(), deps)
@@ -237,7 +236,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

deps := &v1.DeploymentList{}

@@ -266,7 +265,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

deps := &v1.DeploymentList{}
err := k8sclient.List(context.Background(), deps)
@@ -295,7 +294,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

unknownGC := &v1beta1.GatewayClass{}
err = k8sclient.Get(context.Background(), client.ObjectKeyFromObject(gc), unknownGC)
@@ -330,7 +329,6 @@ var _ = Describe("handler", func() {
gcName,
statusUpdater,
k8sclient,
zap.New(),
embeddedfiles.StaticModeDeploymentYAML,
)
})
@@ -340,7 +338,7 @@ var _ = Describe("handler", func() {
batch := []interface{}{e}

handle := func() {
handler.HandleEventBatch(context.TODO(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
}

Expect(handle).Should(Panic())
@@ -408,7 +406,7 @@ var _ = Describe("handler", func() {
}

handle := func() {
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
}

Expect(handle).Should(Panic())
@@ -429,7 +427,7 @@ var _ = Describe("handler", func() {
}

handle := func() {
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
}

Expect(handle).Should(Panic())
@@ -442,7 +440,6 @@ var _ = Describe("handler", func() {
gcName,
statusUpdater,
k8sclient,
zap.New(),
[]byte("broken YAML"),
)

1 change: 0 additions & 1 deletion internal/mode/provisioner/manager.go
Original file line number Diff line number Diff line change
@@ -107,7 +107,6 @@ func StartManager(cfg Config) error {
cfg.GatewayClassName,
statusUpdater,
mgr.GetClient(),
cfg.Logger.WithName("eventHandler"),
embeddedfiles.StaticModeDeploymentYAML,
)

30 changes: 16 additions & 14 deletions internal/mode/static/handler.go
Original file line number Diff line number Diff line change
@@ -52,8 +52,6 @@ type eventHandlerConfig struct {
controlConfigNSName types.NamespacedName
// metricsCollector collects metrics for this controller.
metricsCollector handlerMetricsCollector
// logger is the logger to be used by the EventHandler.
logger logr.Logger
// version is the current version number of the nginx config.
version int
}
@@ -74,13 +72,13 @@ func newEventHandlerImpl(cfg eventHandlerConfig) *eventHandlerImpl {
}
}

func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, batch events.EventBatch) {
func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Logger, batch events.EventBatch) {
start := time.Now()
h.cfg.logger.V(1).Info("Started processing event batch")
logger.V(1).Info("Started processing event batch")

defer func() {
duration := time.Since(start)
h.cfg.logger.V(1).Info(
logger.V(1).Info(
"Finished processing event batch",
"duration", duration.String(),
)
@@ -91,13 +89,13 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, batch events.Ev
switch e := event.(type) {
case *events.UpsertEvent:
if cfg, ok := e.Resource.(*ngfAPI.NginxGateway); ok {
h.updateControlPlaneAndSetStatus(ctx, cfg)
h.updateControlPlaneAndSetStatus(ctx, logger, cfg)
} else {
h.cfg.processor.CaptureUpsertChange(e.Resource)
}
case *events.DeleteEvent:
if _, ok := e.Type.(*ngfAPI.NginxGateway); ok {
h.updateControlPlaneAndSetStatus(ctx, nil)
h.updateControlPlaneAndSetStatus(ctx, logger, nil)
} else {
h.cfg.processor.CaptureDeleteChange(e.Type, e.NamespacedName)
}
@@ -108,7 +106,7 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, batch events.Ev

changed, graph := h.cfg.processor.Process()
if !changed {
h.cfg.logger.Info("Handling events didn't result into NGINX configuration changes")
logger.Info("Handling events didn't result into NGINX configuration changes")
if !h.cfg.healthChecker.ready && h.cfg.healthChecker.firstBatchError == nil {
h.cfg.healthChecker.setAsReady()
}
@@ -121,13 +119,13 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, batch events.Ev
ctx,
dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version),
); err != nil {
h.cfg.logger.Error(err, "Failed to update NGINX configuration")
logger.Error(err, "Failed to update NGINX configuration")
nginxReloadRes.error = err
if !h.cfg.healthChecker.ready {
h.cfg.healthChecker.firstBatchError = err
}
} else {
h.cfg.logger.Info("NGINX configuration was successfully updated")
logger.Info("NGINX configuration was successfully updated")
if !h.cfg.healthChecker.ready {
h.cfg.healthChecker.setAsReady()
}
@@ -152,17 +150,21 @@ func (h *eventHandlerImpl) updateNginx(ctx context.Context, conf dataplane.Confi

// updateControlPlaneAndSetStatus updates the control plane configuration and then sets the status
// based on the outcome
func (h *eventHandlerImpl) updateControlPlaneAndSetStatus(ctx context.Context, cfg *ngfAPI.NginxGateway) {
func (h *eventHandlerImpl) updateControlPlaneAndSetStatus(
ctx context.Context,
logger logr.Logger,
cfg *ngfAPI.NginxGateway,
) {
var cond []conditions.Condition
if err := updateControlPlane(
cfg,
h.cfg.logger,
logger,
h.cfg.eventRecorder,
h.cfg.controlConfigNSName,
h.cfg.logLevelSetter,
); err != nil {
msg := "Failed to update control plane configuration"
h.cfg.logger.Error(err, msg)
logger.Error(err, msg)
h.cfg.eventRecorder.Eventf(
cfg,
apiv1.EventTypeWarning,
@@ -183,6 +185,6 @@ func (h *eventHandlerImpl) updateControlPlaneAndSetStatus(ctx context.Context, c
}

h.cfg.statusUpdater.Update(ctx, nginxGatewayStatus)
h.cfg.logger.Info("Reconfigured control plane.")
logger.Info("Reconfigured control plane.")
}
}
27 changes: 13 additions & 14 deletions internal/mode/static/handler_test.go
Original file line number Diff line number Diff line change
@@ -69,7 +69,6 @@ var _ = Describe("eventHandler", func() {
handler = newEventHandlerImpl(eventHandlerConfig{
processor: fakeProcessor,
generator: fakeGenerator,
logger: ctlrZap.New(),
logLevelSetter: newZapLogLevelSetter(zap.NewAtomicLevel()),
nginxFileMgr: fakeNginxFileMgr,
nginxRuntimeMgr: fakeNginxRuntimeMgr,
@@ -117,7 +116,7 @@ var _ = Describe("eventHandler", func() {
e := &events.UpsertEvent{Resource: &v1beta1.HTTPRoute{}}
batch := []interface{}{e}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

checkUpsertEventExpectations(e)
expectReconfig(dataplane.Configuration{Version: 1}, fakeCfgFiles)
@@ -130,7 +129,7 @@ var _ = Describe("eventHandler", func() {
}
batch := []interface{}{e}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

checkDeleteEventExpectations(e)
expectReconfig(dataplane.Configuration{Version: 1}, fakeCfgFiles)
@@ -146,12 +145,12 @@ var _ = Describe("eventHandler", func() {
}
batch := []interface{}{upsertEvent, deleteEvent}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

checkUpsertEventExpectations(upsertEvent)
checkDeleteEventExpectations(deleteEvent)

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
})
})
})
@@ -184,7 +183,7 @@ var _ = Describe("eventHandler", func() {

It("handles a valid config", func() {
batch := []interface{}{&events.UpsertEvent{Resource: cfg(ngfAPI.ControllerLogLevelError)}}
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

Expect(fakeStatusUpdater.UpdateCallCount()).Should(Equal(1))
_, statuses := fakeStatusUpdater.UpdateArgsForCall(0)
@@ -195,7 +194,7 @@ var _ = Describe("eventHandler", func() {

It("handles an invalid config", func() {
batch := []interface{}{&events.UpsertEvent{Resource: cfg(ngfAPI.ControllerLogLevel("invalid"))}}
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

Expect(fakeStatusUpdater.UpdateCallCount()).Should(Equal(1))
_, statuses := fakeStatusUpdater.UpdateArgsForCall(0)
@@ -214,7 +213,7 @@ var _ = Describe("eventHandler", func() {

It("handles a deleted config", func() {
batch := []interface{}{&events.DeleteEvent{Type: &ngfAPI.NginxGateway{}}}
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
Expect(len(fakeEventRecorder.Events)).To(Equal(1))
event := <-fakeEventRecorder.Events
Expect(event).To(Equal("Warning ResourceDeleted NginxGateway configuration was deleted; using defaults"))
@@ -229,7 +228,7 @@ var _ = Describe("eventHandler", func() {
fakeProcessor.ProcessReturns(true, &graph.Graph{})

Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed())
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
Expect(handler.cfg.healthChecker.readyCheck(nil)).To(Succeed())
})

@@ -238,7 +237,7 @@ var _ = Describe("eventHandler", func() {
batch := []interface{}{e}

Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed())
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
Expect(handler.cfg.healthChecker.readyCheck(nil)).To(Succeed())
})

@@ -249,22 +248,22 @@ var _ = Describe("eventHandler", func() {
fakeProcessor.ProcessReturns(true, &graph.Graph{})
fakeNginxRuntimeMgr.ReloadReturns(errors.New("reload error"))

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed())

// now send an update with no changes; should still return an error
fakeProcessor.ProcessReturns(false, &graph.Graph{})

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed())

// error goes away
fakeProcessor.ProcessReturns(true, &graph.Graph{})
fakeNginxRuntimeMgr.ReloadReturns(nil)

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

Expect(handler.cfg.healthChecker.readyCheck(nil)).To(Succeed())
})
@@ -274,7 +273,7 @@ var _ = Describe("eventHandler", func() {

handle := func() {
batch := []interface{}{e}
handler.HandleEventBatch(context.TODO(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
}

Expect(handle).Should(Panic())
1 change: 0 additions & 1 deletion internal/mode/static/manager.go
Original file line number Diff line number Diff line change
@@ -168,7 +168,6 @@ func StartManager(cfg config.Config) error {
processor: processor,
serviceResolver: resolver.NewServiceResolverImpl(mgr.GetClient()),
generator: ngxcfg.NewGeneratorImpl(),
logger: cfg.Logger.WithName("eventHandler"),
logLevelSetter: logLevelSetter,
nginxFileMgr: file.NewManagerImpl(
cfg.Logger.WithName("nginxFileManager"),