diff --git a/receiver/mongodbatlasreceiver/alerts.go b/receiver/mongodbatlasreceiver/alerts.go index b014a4e2a384..a1fd7b697885 100644 --- a/receiver/mongodbatlasreceiver/alerts.go +++ b/receiver/mongodbatlasreceiver/alerts.go @@ -66,20 +66,20 @@ type alertsReceiver struct { tlsSettings *configtls.TLSServerSetting consumer consumer.Logs wg *sync.WaitGroup - logger *zap.Logger // only relevant in `poll` mode - projects []*ProjectConfig - client alertsClient - privateKey string - publicKey string - backoffConfig configretry.BackOffConfig - pollInterval time.Duration - record *alertRecord - pageSize int64 - maxPages int64 - doneChan chan bool - storageClient storage.Client + projects []*ProjectConfig + client alertsClient + privateKey string + publicKey string + backoffConfig configretry.BackOffConfig + pollInterval time.Duration + record *alertRecord + pageSize int64 + maxPages int64 + doneChan chan bool + storageClient storage.Client + telemetrySettings component.TelemetrySettings } func newAlertsReceiver(params rcvr.CreateSettings, baseConfig *Config, consumer consumer.Logs) (*alertsReceiver, error) { @@ -100,25 +100,25 @@ func newAlertsReceiver(params rcvr.CreateSettings, baseConfig *Config, consumer } recv := &alertsReceiver{ - addr: cfg.Endpoint, - secret: string(cfg.Secret), - tlsSettings: cfg.TLS, - consumer: consumer, - mode: cfg.Mode, - projects: cfg.Projects, - backoffConfig: baseConfig.BackOffConfig, - publicKey: baseConfig.PublicKey, - privateKey: string(baseConfig.PrivateKey), - wg: &sync.WaitGroup{}, - pollInterval: baseConfig.Alerts.PollInterval, - maxPages: baseConfig.Alerts.MaxPages, - pageSize: baseConfig.Alerts.PageSize, - doneChan: make(chan bool, 1), - logger: params.Logger, + addr: cfg.Endpoint, + secret: string(cfg.Secret), + tlsSettings: cfg.TLS, + consumer: consumer, + mode: cfg.Mode, + projects: cfg.Projects, + backoffConfig: baseConfig.BackOffConfig, + publicKey: baseConfig.PublicKey, + privateKey: string(baseConfig.PrivateKey), + wg: &sync.WaitGroup{}, + pollInterval: baseConfig.Alerts.PollInterval, + maxPages: baseConfig.Alerts.MaxPages, + pageSize: baseConfig.Alerts.PageSize, + doneChan: make(chan bool, 1), + telemetrySettings: params.TelemetrySettings, } if recv.mode == alertModePoll { - recv.client = internal.NewMongoDBAtlasClient(recv.publicKey, recv.privateKey, recv.backoffConfig, recv.logger) + recv.client = internal.NewMongoDBAtlasClient(recv.publicKey, recv.privateKey, recv.backoffConfig, recv.telemetrySettings.Logger) return recv, nil } s := &http.Server{ @@ -130,19 +130,19 @@ func newAlertsReceiver(params rcvr.CreateSettings, baseConfig *Config, consumer return recv, nil } -func (a *alertsReceiver) Start(ctx context.Context, host component.Host, storageClient storage.Client) error { +func (a *alertsReceiver) Start(ctx context.Context, _ component.Host, storageClient storage.Client) error { if a.mode == alertModePoll { return a.startPolling(ctx, storageClient) } - return a.startListening(ctx, host) + return a.startListening(ctx) } func (a *alertsReceiver) startPolling(ctx context.Context, storageClient storage.Client) error { - a.logger.Debug("starting alerts receiver in retrieval mode") + a.telemetrySettings.Logger.Debug("starting alerts receiver in retrieval mode") a.storageClient = storageClient err := a.syncPersistence(ctx) if err != nil { - a.logger.Error("there was an error syncing the receiver with checkpoint", zap.Error(err)) + a.telemetrySettings.Logger.Error("there was an error syncing the receiver with checkpoint", zap.Error(err)) } t := time.NewTicker(a.pollInterval) @@ -153,7 +153,7 @@ func (a *alertsReceiver) startPolling(ctx context.Context, storageClient storage select { case <-t.C: if err := a.retrieveAndProcessAlerts(ctx); err != nil { - a.logger.Error("unable to retrieve alerts", zap.Error(err)) + a.telemetrySettings.Logger.Error("unable to retrieve alerts", zap.Error(err)) } case <-a.doneChan: return @@ -170,7 +170,7 @@ func (a *alertsReceiver) retrieveAndProcessAlerts(ctx context.Context) error { for _, p := range a.projects { project, err := a.client.GetProject(ctx, p.Name) if err != nil { - a.logger.Error("error retrieving project "+p.Name+":", zap.Error(err)) + a.telemetrySettings.Logger.Error("error retrieving project "+p.Name+":", zap.Error(err)) continue } a.pollAndProcess(ctx, p, project) @@ -185,7 +185,7 @@ func (a *alertsReceiver) pollAndProcess(ctx context.Context, pc *ProjectConfig, PageSize: int(a.pageSize), }) if err != nil { - a.logger.Error("unable to get alerts for project", zap.Error(err)) + a.telemetrySettings.Logger.Error("unable to get alerts for project", zap.Error(err)) break } @@ -193,13 +193,13 @@ func (a *alertsReceiver) pollAndProcess(ctx context.Context, pc *ProjectConfig, now := pcommon.NewTimestampFromTime(time.Now()) logs, err := a.convertAlerts(now, filteredAlerts, project) if err != nil { - a.logger.Error("error processing alerts", zap.Error(err)) + a.telemetrySettings.Logger.Error("error processing alerts", zap.Error(err)) break } if logs.LogRecordCount() > 0 { if err = a.consumer.ConsumeLogs(ctx, logs); err != nil { - a.logger.Error("error consuming alerts", zap.Error(err)) + a.telemetrySettings.Logger.Error("error consuming alerts", zap.Error(err)) break } } @@ -209,8 +209,8 @@ func (a *alertsReceiver) pollAndProcess(ctx context.Context, pc *ProjectConfig, } } -func (a *alertsReceiver) startListening(ctx context.Context, host component.Host) error { - a.logger.Debug("starting alerts receiver in listening mode") +func (a *alertsReceiver) startListening(ctx context.Context) error { + a.telemetrySettings.Logger.Debug("starting alerts receiver in listening mode") // We use a.server.Serve* over a.server.ListenAndServe* // So that we can catch and return errors relating to binding to network interface on start. var lc net.ListenConfig @@ -225,33 +225,32 @@ func (a *alertsReceiver) startListening(ctx context.Context, host component.Host go func() { defer a.wg.Done() - a.logger.Debug("Starting ServeTLS", + a.telemetrySettings.Logger.Debug("Starting ServeTLS", zap.String("address", a.addr), zap.String("certfile", a.tlsSettings.CertFile), zap.String("keyfile", a.tlsSettings.KeyFile)) err := a.server.ServeTLS(l, a.tlsSettings.CertFile, a.tlsSettings.KeyFile) - a.logger.Debug("Serve TLS done") + a.telemetrySettings.Logger.Debug("Serve TLS done") if err != http.ErrServerClosed { - a.logger.Error("ServeTLS failed", zap.Error(err)) - host.ReportFatalError(err) + a.telemetrySettings.Logger.Error("ServeTLS failed", zap.Error(err)) + a.telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err)) } }() } else { go func() { defer a.wg.Done() - a.logger.Debug("Starting Serve", zap.String("address", a.addr)) + a.telemetrySettings.Logger.Debug("Starting Serve", zap.String("address", a.addr)) err := a.server.Serve(l) - a.logger.Debug("Serve done") + a.telemetrySettings.Logger.Debug("Serve done") if err != http.ErrServerClosed { - a.logger.Error("Serve failed", zap.Error(err)) - host.ReportFatalError(err) + a.telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err)) } }() } @@ -261,13 +260,13 @@ func (a *alertsReceiver) startListening(ctx context.Context, host component.Host func (a *alertsReceiver) handleRequest(rw http.ResponseWriter, req *http.Request) { if req.ContentLength < 0 { rw.WriteHeader(http.StatusLengthRequired) - a.logger.Debug("Got request with no Content-Length specified", zap.String("remote", req.RemoteAddr)) + a.telemetrySettings.Logger.Debug("Got request with no Content-Length specified", zap.String("remote", req.RemoteAddr)) return } if req.ContentLength > maxContentLength { rw.WriteHeader(http.StatusRequestEntityTooLarge) - a.logger.Debug("Got request with large Content-Length specified", + a.telemetrySettings.Logger.Debug("Got request with large Content-Length specified", zap.String("remote", req.RemoteAddr), zap.Int64("content-length", req.ContentLength), zap.Int64("max-content-length", maxContentLength)) @@ -277,7 +276,7 @@ func (a *alertsReceiver) handleRequest(rw http.ResponseWriter, req *http.Request payloadSigHeader := req.Header.Get(signatureHeaderName) if payloadSigHeader == "" { rw.WriteHeader(http.StatusBadRequest) - a.logger.Debug("Got payload with no HMAC signature, dropping...") + a.telemetrySettings.Logger.Debug("Got payload with no HMAC signature, dropping...") return } @@ -285,26 +284,26 @@ func (a *alertsReceiver) handleRequest(rw http.ResponseWriter, req *http.Request _, err := io.ReadFull(req.Body, payload) if err != nil { rw.WriteHeader(http.StatusBadRequest) - a.logger.Debug("Failed to read alerts payload", zap.Error(err), zap.String("remote", req.RemoteAddr)) + a.telemetrySettings.Logger.Debug("Failed to read alerts payload", zap.Error(err), zap.String("remote", req.RemoteAddr)) return } if err = verifyHMACSignature(a.secret, payload, payloadSigHeader); err != nil { rw.WriteHeader(http.StatusBadRequest) - a.logger.Debug("Got payload with invalid HMAC signature, dropping...", zap.Error(err), zap.String("remote", req.RemoteAddr)) + a.telemetrySettings.Logger.Debug("Got payload with invalid HMAC signature, dropping...", zap.Error(err), zap.String("remote", req.RemoteAddr)) return } logs, err := payloadToLogs(time.Now(), payload) if err != nil { rw.WriteHeader(http.StatusBadRequest) - a.logger.Error("Failed to convert log payload to log record", zap.Error(err)) + a.telemetrySettings.Logger.Error("Failed to convert log payload to log record", zap.Error(err)) return } if err := a.consumer.ConsumeLogs(req.Context(), logs); err != nil { rw.WriteHeader(http.StatusInternalServerError) - a.logger.Error("Failed to consumer alert as log", zap.Error(err)) + a.telemetrySettings.Logger.Error("Failed to consumer alert as log", zap.Error(err)) return } @@ -319,19 +318,19 @@ func (a *alertsReceiver) Shutdown(ctx context.Context) error { } func (a *alertsReceiver) shutdownListener(ctx context.Context) error { - a.logger.Debug("Shutting down server") + a.telemetrySettings.Logger.Debug("Shutting down server") err := a.server.Shutdown(ctx) if err != nil { return err } - a.logger.Debug("Waiting for shutdown to complete.") + a.telemetrySettings.Logger.Debug("Waiting for shutdown to complete.") a.wg.Wait() return nil } func (a *alertsReceiver) shutdownPoller(ctx context.Context) error { - a.logger.Debug("Shutting down client") + a.telemetrySettings.Logger.Debug("Shutting down client") close(a.doneChan) a.wg.Wait() return a.writeCheckpoint(ctx) @@ -356,7 +355,7 @@ func (a *alertsReceiver) convertAlerts(now pcommon.Timestamp, alerts []mongodbat ts, err := time.Parse(time.RFC3339, alert.Updated) if err != nil { - a.logger.Warn("unable to interpret updated time for alert, expecting a RFC3339 timestamp", zap.String("timestamp", alert.Updated)) + a.telemetrySettings.Logger.Warn("unable to interpret updated time for alert, expecting a RFC3339 timestamp", zap.String("timestamp", alert.Updated)) continue } @@ -367,7 +366,7 @@ func (a *alertsReceiver) convertAlerts(now pcommon.Timestamp, alerts []mongodbat // of unrecognized alerts to process. bodyBytes, err := json.Marshal(alert) if err != nil { - a.logger.Warn("unable to marshal alert into a body string") + a.telemetrySettings.Logger.Warn("unable to marshal alert into a body string") continue } @@ -537,7 +536,7 @@ func (a *alertsReceiver) syncPersistence(ctx context.Context) error { func (a *alertsReceiver) writeCheckpoint(ctx context.Context) error { if a.storageClient == nil { - a.logger.Error("unable to write checkpoint since no storage client was found") + a.telemetrySettings.Logger.Error("unable to write checkpoint since no storage client was found") return errors.New("missing non-nil storage client") } marshalBytes, err := json.Marshal(&a.record) @@ -560,7 +559,7 @@ func (a *alertsReceiver) applyFilters(pConf *ProjectConfig, alerts []mongodbatla for _, alert := range alerts { updatedTime, err := time.Parse(time.RFC3339, alert.Updated) if err != nil { - a.logger.Warn("unable to interpret updated time for alert, expecting a RFC3339 timestamp", zap.String("timestamp", alert.Updated)) + a.telemetrySettings.Logger.Warn("unable to interpret updated time for alert, expecting a RFC3339 timestamp", zap.String("timestamp", alert.Updated)) continue }