
Commit

[chore] mongodbatlas receiver ReportFatalError -> ReportStatus (open-telemetry#30630)

Remove use of deprecated host.ReportFatalError

Linked to open-telemetry#30501

Fixes open-telemetry#30595

Signed-off-by: Alex Boten <[email protected]>
Alex Boten authored and mfyuce committed Jan 18, 2024
1 parent 7f7fecf commit 0a432d9
Showing 1 changed file with 60 additions and 61 deletions.
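
The change itself is mechanical: the receiver drops its dedicated *zap.Logger field, keeps the full component.TelemetrySettings it receives via CreateSettings, and reports fatal serve errors through telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err)) instead of the deprecated host.ReportFatalError(err), which also lets Start ignore the component.Host argument. A minimal sketch of that pattern follows, ahead of the full diff; the myReceiver/newMyReceiver names and the localhost:0 listener are illustrative stand-ins, and only the TelemetrySettings/ReportStatus usage mirrors the committed code.

// Sketch of the migration this commit applies (hypothetical component;
// only the TelemetrySettings/ReportStatus usage mirrors the diff).
package sample

import (
	"context"
	"errors"
	"net/http"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/receiver"
	"go.uber.org/zap"
)

type myReceiver struct {
	// Before: a plain *zap.Logger field. After: the full TelemetrySettings,
	// which carries the logger plus the status-reporting hook.
	telemetrySettings component.TelemetrySettings
	server            *http.Server
}

func newMyReceiver(params receiver.CreateSettings) *myReceiver {
	return &myReceiver{
		telemetrySettings: params.TelemetrySettings,
		server:            &http.Server{Addr: "localhost:0"}, // illustrative listener
	}
}

// Start no longer needs the component.Host argument once
// host.ReportFatalError is gone.
func (r *myReceiver) Start(_ context.Context, _ component.Host) error {
	go func() {
		err := r.server.ListenAndServe()
		if !errors.Is(err, http.ErrServerClosed) {
			r.telemetrySettings.Logger.Error("Serve failed", zap.Error(err))
			// Replacement for the deprecated host.ReportFatalError(err):
			r.telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err))
		}
	}()
	return nil
}

func (r *myReceiver) Shutdown(ctx context.Context) error {
	return r.server.Shutdown(ctx)
}

As with the old ReportFatalError path, a fatal status event is the component's signal to the collector service that it cannot continue, so operator-visible behavior should be unchanged.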
121 changes: 60 additions & 61 deletions receiver/mongodbatlasreceiver/alerts.go
@@ -66,20 +66,20 @@ type alertsReceiver struct {
tlsSettings *configtls.TLSServerSetting
consumer consumer.Logs
wg *sync.WaitGroup
logger *zap.Logger

// only relevant in `poll` mode
projects []*ProjectConfig
client alertsClient
privateKey string
publicKey string
backoffConfig configretry.BackOffConfig
pollInterval time.Duration
record *alertRecord
pageSize int64
maxPages int64
doneChan chan bool
storageClient storage.Client
projects []*ProjectConfig
client alertsClient
privateKey string
publicKey string
backoffConfig configretry.BackOffConfig
pollInterval time.Duration
record *alertRecord
pageSize int64
maxPages int64
doneChan chan bool
storageClient storage.Client
telemetrySettings component.TelemetrySettings
}

func newAlertsReceiver(params rcvr.CreateSettings, baseConfig *Config, consumer consumer.Logs) (*alertsReceiver, error) {
@@ -100,25 +100,25 @@ func newAlertsReceiver(params rcvr.CreateSettings, baseConfig *Config, consumer
}

recv := &alertsReceiver{
addr: cfg.Endpoint,
secret: string(cfg.Secret),
tlsSettings: cfg.TLS,
consumer: consumer,
mode: cfg.Mode,
projects: cfg.Projects,
backoffConfig: baseConfig.BackOffConfig,
publicKey: baseConfig.PublicKey,
privateKey: string(baseConfig.PrivateKey),
wg: &sync.WaitGroup{},
pollInterval: baseConfig.Alerts.PollInterval,
maxPages: baseConfig.Alerts.MaxPages,
pageSize: baseConfig.Alerts.PageSize,
doneChan: make(chan bool, 1),
logger: params.Logger,
addr: cfg.Endpoint,
secret: string(cfg.Secret),
tlsSettings: cfg.TLS,
consumer: consumer,
mode: cfg.Mode,
projects: cfg.Projects,
backoffConfig: baseConfig.BackOffConfig,
publicKey: baseConfig.PublicKey,
privateKey: string(baseConfig.PrivateKey),
wg: &sync.WaitGroup{},
pollInterval: baseConfig.Alerts.PollInterval,
maxPages: baseConfig.Alerts.MaxPages,
pageSize: baseConfig.Alerts.PageSize,
doneChan: make(chan bool, 1),
telemetrySettings: params.TelemetrySettings,
}

if recv.mode == alertModePoll {
recv.client = internal.NewMongoDBAtlasClient(recv.publicKey, recv.privateKey, recv.backoffConfig, recv.logger)
recv.client = internal.NewMongoDBAtlasClient(recv.publicKey, recv.privateKey, recv.backoffConfig, recv.telemetrySettings.Logger)
return recv, nil
}
s := &http.Server{
@@ -130,19 +130,19 @@ func newAlertsReceiver(params rcvr.CreateSettings, baseConfig *Config, consumer
return recv, nil
}

func (a *alertsReceiver) Start(ctx context.Context, host component.Host, storageClient storage.Client) error {
func (a *alertsReceiver) Start(ctx context.Context, _ component.Host, storageClient storage.Client) error {
if a.mode == alertModePoll {
return a.startPolling(ctx, storageClient)
}
return a.startListening(ctx, host)
return a.startListening(ctx)
}

func (a *alertsReceiver) startPolling(ctx context.Context, storageClient storage.Client) error {
a.logger.Debug("starting alerts receiver in retrieval mode")
a.telemetrySettings.Logger.Debug("starting alerts receiver in retrieval mode")
a.storageClient = storageClient
err := a.syncPersistence(ctx)
if err != nil {
a.logger.Error("there was an error syncing the receiver with checkpoint", zap.Error(err))
a.telemetrySettings.Logger.Error("there was an error syncing the receiver with checkpoint", zap.Error(err))
}

t := time.NewTicker(a.pollInterval)
@@ -153,7 +153,7 @@ func (a *alertsReceiver) startPolling(ctx context.Context, storageClient storage
select {
case <-t.C:
if err := a.retrieveAndProcessAlerts(ctx); err != nil {
a.logger.Error("unable to retrieve alerts", zap.Error(err))
a.telemetrySettings.Logger.Error("unable to retrieve alerts", zap.Error(err))
}
case <-a.doneChan:
return
@@ -170,7 +170,7 @@ func (a *alertsReceiver) retrieveAndProcessAlerts(ctx context.Context) error {
for _, p := range a.projects {
project, err := a.client.GetProject(ctx, p.Name)
if err != nil {
a.logger.Error("error retrieving project "+p.Name+":", zap.Error(err))
a.telemetrySettings.Logger.Error("error retrieving project "+p.Name+":", zap.Error(err))
continue
}
a.pollAndProcess(ctx, p, project)
@@ -185,21 +185,21 @@ func (a *alertsReceiver) pollAndProcess(ctx context.Context, pc *ProjectConfig,
PageSize: int(a.pageSize),
})
if err != nil {
a.logger.Error("unable to get alerts for project", zap.Error(err))
a.telemetrySettings.Logger.Error("unable to get alerts for project", zap.Error(err))
break
}

filteredAlerts := a.applyFilters(pc, projectAlerts)
now := pcommon.NewTimestampFromTime(time.Now())
logs, err := a.convertAlerts(now, filteredAlerts, project)
if err != nil {
a.logger.Error("error processing alerts", zap.Error(err))
a.telemetrySettings.Logger.Error("error processing alerts", zap.Error(err))
break
}

if logs.LogRecordCount() > 0 {
if err = a.consumer.ConsumeLogs(ctx, logs); err != nil {
a.logger.Error("error consuming alerts", zap.Error(err))
a.telemetrySettings.Logger.Error("error consuming alerts", zap.Error(err))
break
}
}
@@ -209,8 +209,8 @@ }
}
}

func (a *alertsReceiver) startListening(ctx context.Context, host component.Host) error {
a.logger.Debug("starting alerts receiver in listening mode")
func (a *alertsReceiver) startListening(ctx context.Context) error {
a.telemetrySettings.Logger.Debug("starting alerts receiver in listening mode")
// We use a.server.Serve* over a.server.ListenAndServe*
// So that we can catch and return errors relating to binding to network interface on start.
var lc net.ListenConfig
@@ -225,33 +225,32 @@ func (a *alertsReceiver) startListening(ctx context.Context, host component.Host
go func() {
defer a.wg.Done()

a.logger.Debug("Starting ServeTLS",
a.telemetrySettings.Logger.Debug("Starting ServeTLS",
zap.String("address", a.addr),
zap.String("certfile", a.tlsSettings.CertFile),
zap.String("keyfile", a.tlsSettings.KeyFile))

err := a.server.ServeTLS(l, a.tlsSettings.CertFile, a.tlsSettings.KeyFile)

a.logger.Debug("Serve TLS done")
a.telemetrySettings.Logger.Debug("Serve TLS done")

if err != http.ErrServerClosed {
a.logger.Error("ServeTLS failed", zap.Error(err))
host.ReportFatalError(err)
a.telemetrySettings.Logger.Error("ServeTLS failed", zap.Error(err))
a.telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err))
}
}()
} else {
go func() {
defer a.wg.Done()

a.logger.Debug("Starting Serve", zap.String("address", a.addr))
a.telemetrySettings.Logger.Debug("Starting Serve", zap.String("address", a.addr))

err := a.server.Serve(l)

a.logger.Debug("Serve done")
a.telemetrySettings.Logger.Debug("Serve done")

if err != http.ErrServerClosed {
a.logger.Error("Serve failed", zap.Error(err))
host.ReportFatalError(err)
a.telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err))
}
}()
}
@@ -261,13 +260,13 @@ func (a *alertsReceiver) startListening(ctx context.Context, host component.Host
func (a *alertsReceiver) handleRequest(rw http.ResponseWriter, req *http.Request) {
if req.ContentLength < 0 {
rw.WriteHeader(http.StatusLengthRequired)
a.logger.Debug("Got request with no Content-Length specified", zap.String("remote", req.RemoteAddr))
a.telemetrySettings.Logger.Debug("Got request with no Content-Length specified", zap.String("remote", req.RemoteAddr))
return
}

if req.ContentLength > maxContentLength {
rw.WriteHeader(http.StatusRequestEntityTooLarge)
a.logger.Debug("Got request with large Content-Length specified",
a.telemetrySettings.Logger.Debug("Got request with large Content-Length specified",
zap.String("remote", req.RemoteAddr),
zap.Int64("content-length", req.ContentLength),
zap.Int64("max-content-length", maxContentLength))
@@ -277,34 +276,34 @@ func (a *alertsReceiver) handleRequest(rw http.ResponseWriter, req *http.Request
payloadSigHeader := req.Header.Get(signatureHeaderName)
if payloadSigHeader == "" {
rw.WriteHeader(http.StatusBadRequest)
a.logger.Debug("Got payload with no HMAC signature, dropping...")
a.telemetrySettings.Logger.Debug("Got payload with no HMAC signature, dropping...")
return
}

payload := make([]byte, req.ContentLength)
_, err := io.ReadFull(req.Body, payload)
if err != nil {
rw.WriteHeader(http.StatusBadRequest)
a.logger.Debug("Failed to read alerts payload", zap.Error(err), zap.String("remote", req.RemoteAddr))
a.telemetrySettings.Logger.Debug("Failed to read alerts payload", zap.Error(err), zap.String("remote", req.RemoteAddr))
return
}

if err = verifyHMACSignature(a.secret, payload, payloadSigHeader); err != nil {
rw.WriteHeader(http.StatusBadRequest)
a.logger.Debug("Got payload with invalid HMAC signature, dropping...", zap.Error(err), zap.String("remote", req.RemoteAddr))
a.telemetrySettings.Logger.Debug("Got payload with invalid HMAC signature, dropping...", zap.Error(err), zap.String("remote", req.RemoteAddr))
return
}

logs, err := payloadToLogs(time.Now(), payload)
if err != nil {
rw.WriteHeader(http.StatusBadRequest)
a.logger.Error("Failed to convert log payload to log record", zap.Error(err))
a.telemetrySettings.Logger.Error("Failed to convert log payload to log record", zap.Error(err))
return
}

if err := a.consumer.ConsumeLogs(req.Context(), logs); err != nil {
rw.WriteHeader(http.StatusInternalServerError)
a.logger.Error("Failed to consumer alert as log", zap.Error(err))
a.telemetrySettings.Logger.Error("Failed to consumer alert as log", zap.Error(err))
return
}

@@ -319,19 +318,19 @@ func (a *alertsReceiver) Shutdown(ctx context.Context) error {
}

func (a *alertsReceiver) shutdownListener(ctx context.Context) error {
a.logger.Debug("Shutting down server")
a.telemetrySettings.Logger.Debug("Shutting down server")
err := a.server.Shutdown(ctx)
if err != nil {
return err
}

a.logger.Debug("Waiting for shutdown to complete.")
a.telemetrySettings.Logger.Debug("Waiting for shutdown to complete.")
a.wg.Wait()
return nil
}

func (a *alertsReceiver) shutdownPoller(ctx context.Context) error {
a.logger.Debug("Shutting down client")
a.telemetrySettings.Logger.Debug("Shutting down client")
close(a.doneChan)
a.wg.Wait()
return a.writeCheckpoint(ctx)
@@ -356,7 +355,7 @@ func (a *alertsReceiver) convertAlerts(now pcommon.Timestamp, alerts []mongodbat

ts, err := time.Parse(time.RFC3339, alert.Updated)
if err != nil {
a.logger.Warn("unable to interpret updated time for alert, expecting a RFC3339 timestamp", zap.String("timestamp", alert.Updated))
a.telemetrySettings.Logger.Warn("unable to interpret updated time for alert, expecting a RFC3339 timestamp", zap.String("timestamp", alert.Updated))
continue
}

Expand All @@ -367,7 +366,7 @@ func (a *alertsReceiver) convertAlerts(now pcommon.Timestamp, alerts []mongodbat
// of unrecognized alerts to process.
bodyBytes, err := json.Marshal(alert)
if err != nil {
a.logger.Warn("unable to marshal alert into a body string")
a.telemetrySettings.Logger.Warn("unable to marshal alert into a body string")
continue
}

@@ -537,7 +536,7 @@ func (a *alertsReceiver) syncPersistence(ctx context.Context) error {

func (a *alertsReceiver) writeCheckpoint(ctx context.Context) error {
if a.storageClient == nil {
a.logger.Error("unable to write checkpoint since no storage client was found")
a.telemetrySettings.Logger.Error("unable to write checkpoint since no storage client was found")
return errors.New("missing non-nil storage client")
}
marshalBytes, err := json.Marshal(&a.record)
@@ -560,7 +559,7 @@ func (a *alertsReceiver) applyFilters(pConf *ProjectConfig, alerts []mongodbatla
for _, alert := range alerts {
updatedTime, err := time.Parse(time.RFC3339, alert.Updated)
if err != nil {
a.logger.Warn("unable to interpret updated time for alert, expecting a RFC3339 timestamp", zap.String("timestamp", alert.Updated))
a.telemetrySettings.Logger.Warn("unable to interpret updated time for alert, expecting a RFC3339 timestamp", zap.String("timestamp", alert.Updated))
continue
}

