diff --git a/controllers/eventbus/installer/installer.go b/controllers/eventbus/installer/installer.go index 6ed8a444d9..0f932c71eb 100644 --- a/controllers/eventbus/installer/installer.go +++ b/controllers/eventbus/installer/installer.go @@ -25,12 +25,12 @@ type Installer interface { func Install(ctx context.Context, eventBus *v1alpha1.EventBus, client client.Client, natsStreamingImage, natsMetricsImage string, logger *zap.SugaredLogger) error { installer, err := getInstaller(eventBus, client, natsStreamingImage, natsMetricsImage, logger) if err != nil { - logger.Desugar().Error("failed to an installer", zap.Error(err)) + logger.Errorw("failed to an installer", zap.Error(err)) return err } busConfig, err := installer.Install(ctx) if err != nil { - logger.Desugar().Error("installation error", zap.Error(err)) + logger.Errorw("installation error", zap.Error(err)) return err } eventBus.Status.Config = *busConfig @@ -86,7 +86,7 @@ func Uninstall(ctx context.Context, eventBus *v1alpha1.EventBus, client client.C installer, err := getInstaller(eventBus, client, natsStreamingImage, natsMetricsImage, logger) if err != nil { - logger.Desugar().Error("failed to get an installer", zap.Error(err)) + logger.Errorw("failed to get an installer", zap.Error(err)) return err } return installer.Uninstall(ctx) diff --git a/eventbus/driver/nats.go b/eventbus/driver/nats.go index 74d8ccdd99..1f76787cdf 100644 --- a/eventbus/driver/nats.go +++ b/eventbus/driver/nats.go @@ -74,14 +74,14 @@ func NewNATSStreaming(url, clusterID, subject, clientID string, auth *Auth, logg } func (n *natsStreaming) Connect() (Connection, error) { - log := n.logger.With("clientID", n.clientID).Desugar() + log := n.logger.With("clientID", n.clientID) conn := &natsStreamingConnection{} opts := []nats.Option{ // Do not reconnect here but handle reconnction outside nats.NoReconnect(), nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { conn.natsConnected = false - log.Error("NATS connection lost", zap.Error(err)) + log.Errorw("NATS connection lost", zap.Error(err)) }), nats.ReconnectHandler(func(nnc *nats.Conn) { conn.natsConnected = true @@ -99,7 +99,7 @@ func (n *natsStreaming) Connect() (Connection, error) { } nc, err := nats.Connect(n.url, opts...) if err != nil { - log.Error("Failed to connect to NATS server", zap.Error(err)) + log.Errorw("Failed to connect to NATS server", zap.Error(err)) return nil, err } log.Info("Connected to NATS server.") @@ -109,10 +109,10 @@ func (n *natsStreaming) Connect() (Connection, error) { sc, err := stan.Connect(n.clusterID, n.clientID, stan.NatsConn(nc), stan.Pings(5, 60), stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) { conn.stanConnected = false - log.Error("NATS streaming connection lost", zap.Error(err)) + log.Errorw("NATS streaming connection lost", zap.Error(err)) })) if err != nil { - log.Error("Failed to connect to NATS streaming server", zap.Error(err)) + log.Errorw("Failed to connect to NATS streaming server", zap.Error(err)) return nil, err } log.Info("Connected to NATS streaming server.") diff --git a/eventsources/cmd/main.go b/eventsources/cmd/main.go index 50232338a0..722a3a8ce2 100644 --- a/eventsources/cmd/main.go +++ b/eventsources/cmd/main.go @@ -26,11 +26,11 @@ func main() { } eventSourceSpec, err := base64.StdEncoding.DecodeString(encodedEventSourceSpec) if err != nil { - logger.Desugar().Fatal("failed to decode eventsource string", zap.Error(err)) + logger.Fatalw("failed to decode eventsource string", zap.Error(err)) } eventSource := &v1alpha1.EventSource{} if err = json.Unmarshal(eventSourceSpec, eventSource); err != nil { - logger.Desugar().Fatal("failed to unmarshal eventsource object", zap.Error(err)) + logger.Fatalw("failed to unmarshal eventsource object", zap.Error(err)) } busConfig := &eventbusv1alpha1.BusConfig{} @@ -38,10 +38,10 @@ func main() { if len(encodedBusConfigSpec) > 0 { busConfigSpec, err := base64.StdEncoding.DecodeString(encodedBusConfigSpec) if err != nil { - logger.Desugar().Fatal("failed to decode bus config string", zap.Error(err)) + logger.Fatalw("failed to decode bus config string", zap.Error(err)) } if err = json.Unmarshal(busConfigSpec, busConfig); err != nil { - logger.Desugar().Fatal("failed to unmarshal bus config object", zap.Error(err)) + logger.Fatalw("failed to unmarshal bus config object", zap.Error(err)) } } diff --git a/eventsources/common/webhook/webhook.go b/eventsources/common/webhook/webhook.go index 95444e9b24..065de1af86 100644 --- a/eventsources/common/webhook/webhook.go +++ b/eventsources/common/webhook/webhook.go @@ -102,19 +102,19 @@ func startServer(router Router, controller *Controller) { } err = server.ListenAndServeTLS(certPath, keyPath) if err != nil { - route.Logger.With("port", route.Context.Port).Desugar().Error("failed to listen and serve with TLS configured", zap.Error(err)) + route.Logger.With("port", route.Context.Port).Errorw("failed to listen and serve with TLS configured", zap.Error(err)) } case route.Context.DeprecatedServerCertPath != "" && route.Context.DeprecatedServerKeyPath != "": // DEPRECATED. route.Logger.Warn("ServerCertPath and ServerKeyPath are deprecated, please use ServerCertSecret and ServerKeySecret") err := server.ListenAndServeTLS(route.Context.DeprecatedServerCertPath, route.Context.DeprecatedServerKeyPath) if err != nil { - route.Logger.With("port", route.Context.Port).Desugar().Error("failed to listen and serve with TLS configured", zap.Error(err)) + route.Logger.With("port", route.Context.Port).Errorw("failed to listen and serve with TLS configured", zap.Error(err)) } default: err := server.ListenAndServe() if err != nil { - route.Logger.With("port", route.Context.Port).Desugar().Error("failed to listen and serve", zap.Error(err)) + route.Logger.With("port", route.Context.Port).Errorw("failed to listen and serve", zap.Error(err)) } } }() @@ -182,13 +182,13 @@ func activateRoute(router Router, controller *Controller) { // manageRouteChannels consumes data from route's data channel and stops the processing when the event source is stopped/removed func manageRouteChannels(router Router, dispatch func([]byte) error) { route := router.GetRoute() - logger := route.Logger.Desugar() + logger := route.Logger for { select { case data := <-route.DataCh: logger.Info("new event received, dispatching it...") if err := dispatch(data); err != nil { - logger.Error("failed to send event", zap.Error(err)) + logger.Errorw("failed to send event", zap.Error(err)) route.Metrics.EventProcessingFailed(route.EventSourceName, route.EventName) continue } @@ -204,7 +204,7 @@ func manageRouteChannels(router Router, dispatch func([]byte) error) { func ManageRoute(ctx context.Context, router Router, controller *Controller, dispatch func([]byte) error) error { route := router.GetRoute() - logger := route.Logger.Desugar() + logger := route.Logger // in order to process a route, it needs to go through // 1. validation - basic configuration checks @@ -231,7 +231,7 @@ func ManageRoute(ctx context.Context, router Router, controller *Controller, dis logger.Info("running operations post route activation...") if err := router.PostActivate(); err != nil { - logger.Error("error occurred while performing post route activation operations", zap.Error(err)) + logger.Errorw("error occurred while performing post route activation operations", zap.Error(err)) return err } @@ -243,7 +243,7 @@ func ManageRoute(ctx context.Context, router Router, controller *Controller, dis logger.Info("running operations post route inactivation...") if err := router.PostInactivate(); err != nil { - logger.Error("error occurred while running operations post route inactivation", zap.Error(err)) + logger.Errorw("error occurred while running operations post route inactivation", zap.Error(err)) } return nil diff --git a/eventsources/sources/gcppubsub/start.go b/eventsources/sources/gcppubsub/start.go index 016fad281c..84b0f2f0a0 100644 --- a/eventsources/sources/gcppubsub/start.go +++ b/eventsources/sources/gcppubsub/start.go @@ -89,7 +89,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt "topicProjectID", pubsubEventSource.TopicProjectID, "projectID", pubsubEventSource.ProjectID, "subscriptionID", pubsubEventSource.SubscriptionID, - ).Desugar() + ) if pubsubEventSource.JSONBody { log.Info("assuming all events have a json body...") @@ -120,7 +120,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } eventBytes, err := json.Marshal(eventData) if err != nil { - log.Error("failed to marshal the event data", zap.Error(err)) + log.Errorw("failed to marshal the event data", zap.Error(err)) el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName()) m.Nack() return @@ -128,7 +128,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt log.Info("dispatching event...") if err = dispatch(eventBytes); err != nil { - log.Error("failed to dispatch GCP PubSub event", zap.Error(err)) + log.Errorw("failed to dispatch GCP PubSub event", zap.Error(err)) el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName()) m.Nack() return @@ -146,13 +146,13 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt if pubsubEventSource.DeleteSubscriptionOnFinish { log.Info("deleting PubSub subscription...") if err = subscription.Delete(context.Background()); err != nil { - log.Error("failed to delete the PubSub subscription", zap.Error(err)) + log.Errorw("failed to delete the PubSub subscription", zap.Error(err)) } } log.Info("closing PubSub client...") if err = client.Close(); err != nil { - log.Error("failed to close the PubSub client", zap.Error(err)) + log.Errorw("failed to close the PubSub client", zap.Error(err)) } return nil @@ -198,7 +198,7 @@ func (el *EventListener) hash() (string, error) { return common.Hasher(el.GetEventName() + string(body)), nil } -func (el *EventListener) prepareSubscription(ctx context.Context, logger *zap.Logger) (*pubsub.Client, *pubsub.Subscription, error) { +func (el *EventListener) prepareSubscription(ctx context.Context, logger *zap.SugaredLogger) (*pubsub.Client, *pubsub.Subscription, error) { pubsubEventSource := &el.PubSubEventSource opts := make([]option.ClientOption, 0, 1) diff --git a/sensors/cmd/main.go b/sensors/cmd/main.go index 077b7a45d9..3484162ce2 100644 --- a/sensors/cmd/main.go +++ b/sensors/cmd/main.go @@ -41,7 +41,7 @@ func main() { kubeConfig, _ := os.LookupEnv(common.EnvVarKubeConfig) restConfig, err := common.GetClientConfig(kubeConfig) if err != nil { - logger.Desugar().Fatal("failed to get kubeconfig", zap.Error(err)) + logger.Fatalw("failed to get kubeconfig", zap.Error(err)) } kubeClient := kubernetes.NewForConfigOrDie(restConfig) encodedSensorSpec, defined := os.LookupEnv(common.EnvVarSensorObject) @@ -50,11 +50,11 @@ func main() { } sensorSpec, err := base64.StdEncoding.DecodeString(encodedSensorSpec) if err != nil { - logger.Desugar().Fatal("failed to decode sensor string", zap.Error(err)) + logger.Fatalw("failed to decode sensor string", zap.Error(err)) } sensor := &v1alpha1.Sensor{} if err = json.Unmarshal(sensorSpec, sensor); err != nil { - logger.Desugar().Fatal("failed to unmarshal sensor object", zap.Error(err)) + logger.Fatalw("failed to unmarshal sensor object", zap.Error(err)) } busConfig := &eventbusv1alpha1.BusConfig{} @@ -62,10 +62,10 @@ func main() { if len(encodedBusConfigSpec) > 0 { busConfigSpec, err := base64.StdEncoding.DecodeString(encodedBusConfigSpec) if err != nil { - logger.Desugar().Fatal("failed to decode bus config string", zap.Error(err)) + logger.Fatalw("failed to decode bus config string", zap.Error(err)) } if err = json.Unmarshal(busConfigSpec, busConfig); err != nil { - logger.Desugar().Fatal("failed to unmarshal bus config object", zap.Error(err)) + logger.Fatalw("failed to unmarshal bus config object", zap.Error(err)) } } diff --git a/sensors/listener.go b/sensors/listener.go index 1ac1e4a6b2..d1ccdb4a99 100644 --- a/sensors/listener.go +++ b/sensors/listener.go @@ -368,7 +368,7 @@ func (sensorCtx *SensorContext) triggerOne(ctx context.Context, sensor *v1alpha1 } func (sensorCtx *SensorContext) getDependencyExpression(ctx context.Context, trigger v1alpha1.Trigger) (string, error) { - logger := logging.FromContext(ctx).Desugar() + logger := logging.FromContext(ctx) // Translate original expression which might contain group names // to an expression only contains dependency names @@ -381,12 +381,12 @@ func (sensorCtx *SensorContext) getDependencyExpression(ctx context.Context, tri program, err := expr.Compile(originalExpr, expr.Env(parameters)) if err != nil { - logger.Error("Failed to compile original dependency expression", zap.Error(err)) + logger.Errorw("Failed to compile original dependency expression", zap.Error(err)) return "", err } result, err := expr.Run(program, parameters) if err != nil { - logger.Error("Failed to parse original dependency expression", zap.Error(err)) + logger.Errorw("Failed to parse original dependency expression", zap.Error(err)) return "", err } newExpr := fmt.Sprintf("%v", result) @@ -445,14 +445,14 @@ func (sensorCtx *SensorContext) getDependencyExpression(ctx context.Context, tri } depExpression = strings.Join(deps, "&&") } - logger.Sugar().Infof("Dependency expression for trigger %s before simplification: %s", trigger.Template.Name, depExpression) + logger.Infof("Dependency expression for trigger %s before simplification: %s", trigger.Template.Name, depExpression) boolSimplifier, err := common.NewBoolExpression(depExpression) if err != nil { - logger.Error("Invalid dependency expression", zap.Error(err)) + logger.Errorw("Invalid dependency expression", zap.Error(err)) return "", err } result := boolSimplifier.GetExpression() - logger.Sugar().Infof("Dependency expression for trigger %s after simplification: %s", trigger.Template.Name, result) + logger.Infof("Dependency expression for trigger %s after simplification: %s", trigger.Template.Name, result) return result, nil }