diff --git a/app/common/telemetry.go b/app/common/telemetry.go index 98831706e..0879fb4ef 100644 --- a/app/common/telemetry.go +++ b/app/common/telemetry.go @@ -30,6 +30,13 @@ import ( "github.com/openmeterio/openmeter/pkg/gosundheit" ) +// Set the default logger to JSON for messages emitted before the "real" logger is initialized. +// +// We use JSON as a best-effort to make the logs machine-readable. +func init() { + slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, nil))) +} + const ( DefaultShutdownTimeout = 5 * time.Second ) diff --git a/app/common/wire.go b/app/common/wire.go index a7bba8967..9c4e79f1e 100644 --- a/app/common/wire.go +++ b/app/common/wire.go @@ -83,6 +83,8 @@ var KafkaTopic = wire.NewSet( var Telemetry = wire.NewSet( NewTelemetryResource, + NewLogger, + NewMeterProvider, wire.Bind(new(metric.MeterProvider), new(*sdkmetric.MeterProvider)), NewMeter, @@ -96,11 +98,6 @@ var Telemetry = wire.NewSet( NewTelemetryServer, ) -var Logger = wire.NewSet( - NewTelemetryResource, - NewLogger, -) - var OpenMeter = wire.NewSet( NewMeterRepository, wire.Bind(new(meter.Repository), new(*meter.InMemoryRepository)), diff --git a/cmd/balance-worker/main.go b/cmd/balance-worker/main.go index a182468a6..6ac4dc6c9 100644 --- a/cmd/balance-worker/main.go +++ b/cmd/balance-worker/main.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log/slog" "os" "github.com/spf13/pflag" @@ -56,17 +57,20 @@ func main() { os.Exit(0) } - logger := initializeLogger(conf) - - app, cleanup, err := initializeApplication(ctx, conf, logger) + app, cleanup, err := initializeApplication(ctx, conf) if err != nil { - logger.Error("failed to initialize application", "error", err) + slog.Error("failed to initialize application", "error", err) + + cleanup() + os.Exit(1) } defer cleanup() app.SetGlobals() + logger := app.Logger + // Validate service prerequisites if !conf.Events.Enabled { diff --git a/cmd/balance-worker/wire.go b/cmd/balance-worker/wire.go index 8363705a7..a85364ecb 100644 --- a/cmd/balance-worker/wire.go +++ b/cmd/balance-worker/wire.go @@ -17,9 +17,11 @@ type Application struct { common.GlobalInitializer common.Migrator common.Runner + + Logger *slog.Logger } -func initializeApplication(ctx context.Context, conf config.Configuration, logger *slog.Logger) (Application, func(), error) { +func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { wire.Build( metadata, common.Config, @@ -39,13 +41,6 @@ func initializeApplication(ctx context.Context, conf config.Configuration, logge return Application{}, nil, nil } -// TODO: is this necessary? Do we need a logger first? -func initializeLogger(conf config.Configuration) *slog.Logger { - wire.Build(metadata, common.Config, common.Logger) - - return new(slog.Logger) -} - func metadata(conf config.Configuration) common.Metadata { return common.Metadata{ ServiceName: "openmeter", diff --git a/cmd/balance-worker/wire_gen.go b/cmd/balance-worker/wire_gen.go index 8d81787d8..c572f76e3 100644 --- a/cmd/balance-worker/wire_gen.go +++ b/cmd/balance-worker/wire_gen.go @@ -18,11 +18,13 @@ import ( // Injectors from wire.go: -func initializeApplication(ctx context.Context, conf config.Configuration, logger *slog.Logger) (Application, func(), error) { +func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { telemetryConfig := conf.Telemetry - metricsTelemetryConfig := telemetryConfig.Metrics + logTelemetryConfig := telemetryConfig.Log commonMetadata := metadata(conf) resource := common.NewTelemetryResource(commonMetadata) + logger := common.NewLogger(logTelemetryConfig, resource) + metricsTelemetryConfig := telemetryConfig.Metrics meterProvider, cleanup, err := common.NewMeterProvider(ctx, metricsTelemetryConfig, resource, logger) if err != nil { return Application{}, nil, err @@ -61,7 +63,6 @@ func initializeApplication(ctx context.Context, conf config.Configuration, logge ingestConfiguration := conf.Ingest kafkaIngestConfiguration := ingestConfiguration.Kafka kafkaConfiguration := kafkaIngestConfiguration.KafkaConfiguration - logTelemetryConfig := telemetryConfig.Log brokerOptions := common.NewBrokerConfiguration(kafkaConfiguration, logTelemetryConfig, commonMetadata, logger, meter) subscriber, err := common.BalanceWorkerSubscriber(balanceWorkerConfiguration, brokerOptions) if err != nil { @@ -174,6 +175,7 @@ func initializeApplication(ctx context.Context, conf config.Configuration, logge GlobalInitializer: globalInitializer, Migrator: migrator, Runner: runner, + Logger: logger, } return application, func() { cleanup6() @@ -185,22 +187,14 @@ func initializeApplication(ctx context.Context, conf config.Configuration, logge }, nil } -// TODO: is this necessary? Do we need a logger first? -func initializeLogger(conf config.Configuration) *slog.Logger { - telemetryConfig := conf.Telemetry - logTelemetryConfig := telemetryConfig.Log - commonMetadata := metadata(conf) - resource := common.NewTelemetryResource(commonMetadata) - logger := common.NewLogger(logTelemetryConfig, resource) - return logger -} - // wire.go: type Application struct { common.GlobalInitializer common.Migrator common.Runner + + Logger *slog.Logger } func metadata(conf config.Configuration) common.Metadata { diff --git a/cmd/notification-service/main.go b/cmd/notification-service/main.go index 8f46a2df7..5c0c49f85 100644 --- a/cmd/notification-service/main.go +++ b/cmd/notification-service/main.go @@ -67,17 +67,20 @@ func main() { os.Exit(0) } - logger := initializeLogger(conf) - - app, cleanup, err := initializeApplication(ctx, conf, logger) + app, cleanup, err := initializeApplication(ctx, conf) if err != nil { - logger.Error("failed to initialize application", "error", err) + slog.Error("failed to initialize application", "error", err) + + cleanup() + os.Exit(1) } defer cleanup() app.SetGlobals() + logger := app.Logger + // Validate service prerequisites if !conf.Events.Enabled { diff --git a/cmd/notification-service/wire.go b/cmd/notification-service/wire.go index 3c9749318..7f9e7bb20 100644 --- a/cmd/notification-service/wire.go +++ b/cmd/notification-service/wire.go @@ -34,10 +34,11 @@ type Application struct { MessagePublisher message.Publisher EventPublisher eventbus.Publisher - Meter metric.Meter + Logger *slog.Logger + Meter metric.Meter } -func initializeApplication(ctx context.Context, conf config.Configuration, logger *slog.Logger) (Application, func(), error) { +func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { wire.Build( metadata, common.Config, @@ -55,13 +56,6 @@ func initializeApplication(ctx context.Context, conf config.Configuration, logge return Application{}, nil, nil } -// TODO: is this necessary? Do we need a logger first? -func initializeLogger(conf config.Configuration) *slog.Logger { - wire.Build(metadata, common.Config, common.Logger) - - return new(slog.Logger) -} - func metadata(conf config.Configuration) common.Metadata { return common.Metadata{ ServiceName: "openmeter", diff --git a/cmd/notification-service/wire_gen.go b/cmd/notification-service/wire_gen.go index 6aebba18e..d96175fee 100644 --- a/cmd/notification-service/wire_gen.go +++ b/cmd/notification-service/wire_gen.go @@ -22,11 +22,13 @@ import ( // Injectors from wire.go: -func initializeApplication(ctx context.Context, conf config.Configuration, logger *slog.Logger) (Application, func(), error) { +func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { telemetryConfig := conf.Telemetry - metricsTelemetryConfig := telemetryConfig.Metrics + logTelemetryConfig := telemetryConfig.Log commonMetadata := metadata(conf) resource := common.NewTelemetryResource(commonMetadata) + logger := common.NewLogger(logTelemetryConfig, resource) + metricsTelemetryConfig := telemetryConfig.Metrics meterProvider, cleanup, err := common.NewMeterProvider(ctx, metricsTelemetryConfig, resource, logger) if err != nil { return Application{}, nil, err @@ -86,7 +88,6 @@ func initializeApplication(ctx context.Context, conf config.Configuration, logge ingestConfiguration := conf.Ingest kafkaIngestConfiguration := ingestConfiguration.Kafka kafkaConfiguration := kafkaIngestConfiguration.KafkaConfiguration - logTelemetryConfig := telemetryConfig.Log brokerOptions := common.NewBrokerConfiguration(kafkaConfiguration, logTelemetryConfig, commonMetadata, logger, meter) notificationConfiguration := conf.Notification v4 := common.NotificationServiceProvisionTopics(notificationConfiguration) @@ -146,6 +147,7 @@ func initializeApplication(ctx context.Context, conf config.Configuration, logge BrokerOptions: brokerOptions, MessagePublisher: publisher, EventPublisher: eventbusPublisher, + Logger: logger, Meter: meter, } return application, func() { @@ -158,16 +160,6 @@ func initializeApplication(ctx context.Context, conf config.Configuration, logge }, nil } -// TODO: is this necessary? Do we need a logger first? -func initializeLogger(conf config.Configuration) *slog.Logger { - telemetryConfig := conf.Telemetry - logTelemetryConfig := telemetryConfig.Log - commonMetadata := metadata(conf) - resource := common.NewTelemetryResource(commonMetadata) - logger := common.NewLogger(logTelemetryConfig, resource) - return logger -} - // wire.go: type Application struct { @@ -184,7 +176,8 @@ type Application struct { MessagePublisher message.Publisher EventPublisher eventbus.Publisher - Meter metric.Meter + Logger *slog.Logger + Meter metric.Meter } func metadata(conf config.Configuration) common.Metadata { diff --git a/cmd/server/main.go b/cmd/server/main.go index 3906903ef..26479b4ea 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -89,17 +89,20 @@ func main() { os.Exit(0) } - logger := initializeLogger(conf) - - app, cleanup, err := initializeApplication(ctx, conf, logger) + app, cleanup, err := initializeApplication(ctx, conf) if err != nil { - logger.Error("failed to initialize application", "error", err) + slog.Error("failed to initialize application", "error", err) + + cleanup() + os.Exit(1) } defer cleanup() app.SetGlobals() + logger := app.Logger + logger.Info("starting OpenMeter server", "config", map[string]string{ "address": conf.Address, "telemetry.address": conf.Telemetry.Address, diff --git a/cmd/server/wire.go b/cmd/server/wire.go index 73847d47c..10ae7ac32 100644 --- a/cmd/server/wire.go +++ b/cmd/server/wire.go @@ -40,12 +40,13 @@ type Application struct { NamespaceHandlers []namespace.Handler NamespaceManager *namespace.Manager - Meter metric.Meter + Logger *slog.Logger + Meter metric.Meter RouterHook func(chi.Router) } -func initializeApplication(ctx context.Context, conf config.Configuration, logger *slog.Logger) (Application, func(), error) { +func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { wire.Build( metadata, common.Config, @@ -66,13 +67,6 @@ func initializeApplication(ctx context.Context, conf config.Configuration, logge return Application{}, nil, nil } -// TODO: is this necessary? Do we need a logger first? -func initializeLogger(conf config.Configuration) *slog.Logger { - wire.Build(metadata, common.Config, common.Logger) - - return new(slog.Logger) -} - func metadata(conf config.Configuration) common.Metadata { return common.Metadata{ ServiceName: "openmeter", diff --git a/cmd/server/wire_gen.go b/cmd/server/wire_gen.go index fd5c61684..558f5e4b3 100644 --- a/cmd/server/wire_gen.go +++ b/cmd/server/wire_gen.go @@ -26,11 +26,13 @@ import ( // Injectors from wire.go: -func initializeApplication(ctx context.Context, conf config.Configuration, logger *slog.Logger) (Application, func(), error) { +func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { telemetryConfig := conf.Telemetry - metricsTelemetryConfig := telemetryConfig.Metrics + logTelemetryConfig := telemetryConfig.Log commonMetadata := metadata(conf) resource := common.NewTelemetryResource(commonMetadata) + logger := common.NewLogger(logTelemetryConfig, resource) + metricsTelemetryConfig := telemetryConfig.Metrics meterProvider, cleanup, err := common.NewMeterProvider(ctx, metricsTelemetryConfig, resource, logger) if err != nil { return Application{}, nil, err @@ -109,7 +111,6 @@ func initializeApplication(ctx context.Context, conf config.Configuration, logge ingestConfiguration := conf.Ingest kafkaIngestConfiguration := ingestConfiguration.Kafka kafkaConfiguration := kafkaIngestConfiguration.KafkaConfiguration - logTelemetryConfig := telemetryConfig.Log brokerOptions := common.NewBrokerConfiguration(kafkaConfiguration, logTelemetryConfig, commonMetadata, logger, meter) v4 := common.ServerProvisionTopics(eventsConfiguration) adminClient, err := common.NewKafkaAdminClient(kafkaConfiguration) @@ -224,6 +225,7 @@ func initializeApplication(ctx context.Context, conf config.Configuration, logge IngestCollector: ingestCollector, NamespaceHandlers: v5, NamespaceManager: manager, + Logger: logger, Meter: meter, RouterHook: v6, } @@ -238,16 +240,6 @@ func initializeApplication(ctx context.Context, conf config.Configuration, logge }, nil } -// TODO: is this necessary? Do we need a logger first? -func initializeLogger(conf config.Configuration) *slog.Logger { - telemetryConfig := conf.Telemetry - logTelemetryConfig := telemetryConfig.Log - commonMetadata := metadata(conf) - resource := common.NewTelemetryResource(commonMetadata) - logger := common.NewLogger(logTelemetryConfig, resource) - return logger -} - // wire.go: type Application struct { @@ -267,7 +259,8 @@ type Application struct { NamespaceHandlers []namespace.Handler NamespaceManager *namespace.Manager - Meter metric.Meter + Logger *slog.Logger + Meter metric.Meter RouterHook func(chi.Router) } diff --git a/cmd/sink-worker/main.go b/cmd/sink-worker/main.go index 15a717b36..792f1d27a 100644 --- a/cmd/sink-worker/main.go +++ b/cmd/sink-worker/main.go @@ -74,17 +74,20 @@ func main() { os.Exit(0) } - logger := initializeLogger(conf) - - app, cleanup, err := initializeApplication(ctx, conf, logger) + app, cleanup, err := initializeApplication(ctx, conf) if err != nil { - logger.Error("failed to initialize application", "error", err) + slog.Error("failed to initialize application", "error", err) + + cleanup() + os.Exit(1) } defer cleanup() app.SetGlobals() + logger := app.Logger + logger.Info("starting OpenMeter sink worker", "config", map[string]string{ "telemetry.address": conf.Telemetry.Address, "ingest.kafka.broker": conf.Ingest.Kafka.Broker, diff --git a/cmd/sink-worker/wire.go b/cmd/sink-worker/wire.go index a7a512d52..928b4511c 100644 --- a/cmd/sink-worker/wire.go +++ b/cmd/sink-worker/wire.go @@ -32,11 +32,12 @@ type Application struct { FlushHandler flushhandler.FlushEventHandler TopicProvisioner pkgkafka.TopicProvisioner + Logger *slog.Logger Meter metric.Meter Tracer trace.Tracer } -func initializeApplication(ctx context.Context, conf config.Configuration, logger *slog.Logger) (Application, func(), error) { +func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { wire.Build( metadata, common.Config, @@ -54,13 +55,6 @@ func initializeApplication(ctx context.Context, conf config.Configuration, logge return Application{}, nil, nil } -// TODO: is this necessary? Do we need a logger first? -func initializeLogger(conf config.Configuration) *slog.Logger { - wire.Build(metadata, common.Config, common.Logger) - - return new(slog.Logger) -} - func metadata(conf config.Configuration) common.Metadata { return common.Metadata{ ServiceName: "openmeter", diff --git a/cmd/sink-worker/wire_gen.go b/cmd/sink-worker/wire_gen.go index 4a09790cc..fdfd0a845 100644 --- a/cmd/sink-worker/wire_gen.go +++ b/cmd/sink-worker/wire_gen.go @@ -25,11 +25,13 @@ import ( // Injectors from wire.go: -func initializeApplication(ctx context.Context, conf config.Configuration, logger *slog.Logger) (Application, func(), error) { +func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { telemetryConfig := conf.Telemetry - metricsTelemetryConfig := telemetryConfig.Metrics + logTelemetryConfig := telemetryConfig.Log commonMetadata := metadata(conf) resource := common.NewTelemetryResource(commonMetadata) + logger := common.NewLogger(logTelemetryConfig, resource) + metricsTelemetryConfig := telemetryConfig.Metrics meterProvider, cleanup, err := common.NewMeterProvider(ctx, metricsTelemetryConfig, resource, logger) if err != nil { return Application{}, nil, err @@ -57,7 +59,6 @@ func initializeApplication(ctx context.Context, conf config.Configuration, logge ingestConfiguration := conf.Ingest kafkaIngestConfiguration := ingestConfiguration.Kafka kafkaConfiguration := kafkaIngestConfiguration.KafkaConfiguration - logTelemetryConfig := telemetryConfig.Log meter := common.NewMeter(meterProvider, commonMetadata) brokerOptions := common.NewBrokerConfiguration(kafkaConfiguration, logTelemetryConfig, commonMetadata, logger, meter) v3 := common.SinkWorkerProvisionTopics(eventsConfiguration) @@ -113,6 +114,7 @@ func initializeApplication(ctx context.Context, conf config.Configuration, logge TelemetryServer: v2, FlushHandler: flushEventHandler, TopicProvisioner: topicProvisioner, + Logger: logger, Meter: meter, Tracer: tracer, } @@ -124,16 +126,6 @@ func initializeApplication(ctx context.Context, conf config.Configuration, logge }, nil } -// TODO: is this necessary? Do we need a logger first? -func initializeLogger(conf config.Configuration) *slog.Logger { - telemetryConfig := conf.Telemetry - logTelemetryConfig := telemetryConfig.Log - commonMetadata := metadata(conf) - resource := common.NewTelemetryResource(commonMetadata) - logger := common.NewLogger(logTelemetryConfig, resource) - return logger -} - // wire.go: type Application struct { @@ -146,6 +138,7 @@ type Application struct { FlushHandler flushhandler.FlushEventHandler TopicProvisioner kafka2.TopicProvisioner + Logger *slog.Logger Meter metric.Meter Tracer trace.Tracer }