[chore] mongodbatlas receiver ReportFatalError -> ReportStatus #30630

Merged (1 commit, Jan 17, 2024)
121 changes: 60 additions & 61 deletions receiver/mongodbatlasreceiver/alerts.go
@@ -66,20 +66,20 @@ type alertsReceiver struct {
tlsSettings *configtls.TLSServerSetting
consumer consumer.Logs
wg *sync.WaitGroup
logger *zap.Logger

// only relevant in `poll` mode
projects []*ProjectConfig
client alertsClient
privateKey string
publicKey string
backoffConfig configretry.BackOffConfig
pollInterval time.Duration
record *alertRecord
pageSize int64
maxPages int64
doneChan chan bool
storageClient storage.Client
projects []*ProjectConfig
client alertsClient
privateKey string
publicKey string
backoffConfig configretry.BackOffConfig
pollInterval time.Duration
record *alertRecord
pageSize int64
maxPages int64
doneChan chan bool
storageClient storage.Client
telemetrySettings component.TelemetrySettings
}

func newAlertsReceiver(params rcvr.CreateSettings, baseConfig *Config, consumer consumer.Logs) (*alertsReceiver, error) {
@@ -100,25 +100,25 @@ func newAlertsReceiver(params rcvr.CreateSettings, baseConfig *Config, consumer
}

recv := &alertsReceiver{
addr: cfg.Endpoint,
secret: string(cfg.Secret),
tlsSettings: cfg.TLS,
consumer: consumer,
mode: cfg.Mode,
projects: cfg.Projects,
backoffConfig: baseConfig.BackOffConfig,
publicKey: baseConfig.PublicKey,
privateKey: string(baseConfig.PrivateKey),
wg: &sync.WaitGroup{},
pollInterval: baseConfig.Alerts.PollInterval,
maxPages: baseConfig.Alerts.MaxPages,
pageSize: baseConfig.Alerts.PageSize,
doneChan: make(chan bool, 1),
logger: params.Logger,
addr: cfg.Endpoint,
secret: string(cfg.Secret),
tlsSettings: cfg.TLS,
consumer: consumer,
mode: cfg.Mode,
projects: cfg.Projects,
backoffConfig: baseConfig.BackOffConfig,
publicKey: baseConfig.PublicKey,
privateKey: string(baseConfig.PrivateKey),
wg: &sync.WaitGroup{},
pollInterval: baseConfig.Alerts.PollInterval,
maxPages: baseConfig.Alerts.MaxPages,
pageSize: baseConfig.Alerts.PageSize,
doneChan: make(chan bool, 1),
telemetrySettings: params.TelemetrySettings,
}

if recv.mode == alertModePoll {
recv.client = internal.NewMongoDBAtlasClient(recv.publicKey, recv.privateKey, recv.backoffConfig, recv.logger)
recv.client = internal.NewMongoDBAtlasClient(recv.publicKey, recv.privateKey, recv.backoffConfig, recv.telemetrySettings.Logger)
return recv, nil
}
s := &http.Server{
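
A minimal sketch of the constructor pattern adopted above: the dedicated *zap.Logger field is gone, the full component.TelemetrySettings from the create settings is stored instead, and the logger is reached as telemetrySettings.Logger. The pollReceiver and newPollReceiver names below are invented for illustration; the collector APIs are the ones this diff already uses and may differ in other collector versions.

package example

import (
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/receiver"
)

// pollReceiver is a stand-in for alertsReceiver; it only shows how the
// telemetry settings replace the standalone logger field.
type pollReceiver struct {
	telemetrySettings component.TelemetrySettings
}

// newPollReceiver mirrors newAlertsReceiver: it captures the whole
// TelemetrySettings from the create settings rather than params.Logger.
func newPollReceiver(params receiver.CreateSettings) *pollReceiver {
	return &pollReceiver{
		// Before this change: logger: params.Logger
		telemetrySettings: params.TelemetrySettings,
	}
}

func (r *pollReceiver) logStart() {
	// Every former a.logger call becomes a.telemetrySettings.Logger.
	r.telemetrySettings.Logger.Debug("starting receiver")
}
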
@@ -130,19 +130,19 @@ func newAlertsReceiver(params rcvr.CreateSettings, baseConfig *Config, consumer
return recv, nil
}

func (a *alertsReceiver) Start(ctx context.Context, host component.Host, storageClient storage.Client) error {
func (a *alertsReceiver) Start(ctx context.Context, _ component.Host, storageClient storage.Client) error {
if a.mode == alertModePoll {
return a.startPolling(ctx, storageClient)
}
return a.startListening(ctx, host)
return a.startListening(ctx)
}

func (a *alertsReceiver) startPolling(ctx context.Context, storageClient storage.Client) error {
a.logger.Debug("starting alerts receiver in retrieval mode")
a.telemetrySettings.Logger.Debug("starting alerts receiver in retrieval mode")
a.storageClient = storageClient
err := a.syncPersistence(ctx)
if err != nil {
a.logger.Error("there was an error syncing the receiver with checkpoint", zap.Error(err))
a.telemetrySettings.Logger.Error("there was an error syncing the receiver with checkpoint", zap.Error(err))
}

t := time.NewTicker(a.pollInterval)
@@ -153,7 +153,7 @@ func (a *alertsReceiver) startPolling(ctx context.Context, storageClient storage
select {
case <-t.C:
if err := a.retrieveAndProcessAlerts(ctx); err != nil {
a.logger.Error("unable to retrieve alerts", zap.Error(err))
a.telemetrySettings.Logger.Error("unable to retrieve alerts", zap.Error(err))
}
case <-a.doneChan:
return
@@ -170,7 +170,7 @@ func (a *alertsReceiver) retrieveAndProcessAlerts(ctx context.Context) error {
for _, p := range a.projects {
project, err := a.client.GetProject(ctx, p.Name)
if err != nil {
a.logger.Error("error retrieving project "+p.Name+":", zap.Error(err))
a.telemetrySettings.Logger.Error("error retrieving project "+p.Name+":", zap.Error(err))
continue
}
a.pollAndProcess(ctx, p, project)
@@ -185,21 +185,21 @@ func (a *alertsReceiver) pollAndProcess(ctx context.Context, pc *ProjectConfig,
PageSize: int(a.pageSize),
})
if err != nil {
a.logger.Error("unable to get alerts for project", zap.Error(err))
a.telemetrySettings.Logger.Error("unable to get alerts for project", zap.Error(err))
break
}

filteredAlerts := a.applyFilters(pc, projectAlerts)
now := pcommon.NewTimestampFromTime(time.Now())
logs, err := a.convertAlerts(now, filteredAlerts, project)
if err != nil {
a.logger.Error("error processing alerts", zap.Error(err))
a.telemetrySettings.Logger.Error("error processing alerts", zap.Error(err))
break
}

if logs.LogRecordCount() > 0 {
if err = a.consumer.ConsumeLogs(ctx, logs); err != nil {
a.logger.Error("error consuming alerts", zap.Error(err))
a.telemetrySettings.Logger.Error("error consuming alerts", zap.Error(err))
break
}
}
@@ -209,8 +209,8 @@ func (a *alertsReceiver) pollAndProcess(ctx context.Context, pc *ProjectConfig,
}
}

func (a *alertsReceiver) startListening(ctx context.Context, host component.Host) error {
a.logger.Debug("starting alerts receiver in listening mode")
func (a *alertsReceiver) startListening(ctx context.Context) error {
a.telemetrySettings.Logger.Debug("starting alerts receiver in listening mode")
// We use a.server.Serve* over a.server.ListenAndServe*
// So that we can catch and return errors relating to binding to network interface on start.
var lc net.ListenConfig
@@ -225,33 +225,32 @@ func (a *alertsReceiver) startListening(ctx context.Context, host component.Host
go func() {
defer a.wg.Done()

a.logger.Debug("Starting ServeTLS",
a.telemetrySettings.Logger.Debug("Starting ServeTLS",
zap.String("address", a.addr),
zap.String("certfile", a.tlsSettings.CertFile),
zap.String("keyfile", a.tlsSettings.KeyFile))

err := a.server.ServeTLS(l, a.tlsSettings.CertFile, a.tlsSettings.KeyFile)

a.logger.Debug("Serve TLS done")
a.telemetrySettings.Logger.Debug("Serve TLS done")

if err != http.ErrServerClosed {
a.logger.Error("ServeTLS failed", zap.Error(err))
host.ReportFatalError(err)
a.telemetrySettings.Logger.Error("ServeTLS failed", zap.Error(err))
a.telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err))
}
}()
} else {
go func() {
defer a.wg.Done()

a.logger.Debug("Starting Serve", zap.String("address", a.addr))
a.telemetrySettings.Logger.Debug("Starting Serve", zap.String("address", a.addr))

err := a.server.Serve(l)

a.logger.Debug("Serve done")
a.telemetrySettings.Logger.Debug("Serve done")

if err != http.ErrServerClosed {
a.logger.Error("Serve failed", zap.Error(err))
host.ReportFatalError(err)
a.telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err))
}
}()
}
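
The two goroutines above carry the substantive change of this PR: when ServeTLS or Serve exits with anything other than http.ErrServerClosed, the receiver now reports a fatal status event through its TelemetrySettings instead of calling host.ReportFatalError, which is why startListening drops its component.Host parameter and Start ignores the one it receives. A self-contained sketch of the pattern follows (webhookReceiver is an invented name; the component calls are the ones used in this diff and may differ in other collector versions).

package example

import (
	"errors"
	"net"
	"net/http"

	"go.opentelemetry.io/collector/component"
	"go.uber.org/zap"
)

// webhookReceiver is a stand-in for alertsReceiver in listening mode.
type webhookReceiver struct {
	telemetrySettings component.TelemetrySettings
	server            *http.Server
}

// serve runs the HTTP server and reports a fatal status if it stops for any
// reason other than a clean shutdown. Before this PR the equivalent branch
// called host.ReportFatalError(err).
func (r *webhookReceiver) serve(l net.Listener) {
	err := r.server.Serve(l)
	if !errors.Is(err, http.ErrServerClosed) {
		r.telemetrySettings.Logger.Error("Serve failed", zap.Error(err))
		r.telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err))
	}
}
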
@@ -261,13 +260,13 @@ func (a *alertsReceiver) startListening(ctx context.Context, host component.Host
func (a *alertsReceiver) handleRequest(rw http.ResponseWriter, req *http.Request) {
if req.ContentLength < 0 {
rw.WriteHeader(http.StatusLengthRequired)
a.logger.Debug("Got request with no Content-Length specified", zap.String("remote", req.RemoteAddr))
a.telemetrySettings.Logger.Debug("Got request with no Content-Length specified", zap.String("remote", req.RemoteAddr))
return
}

if req.ContentLength > maxContentLength {
rw.WriteHeader(http.StatusRequestEntityTooLarge)
a.logger.Debug("Got request with large Content-Length specified",
a.telemetrySettings.Logger.Debug("Got request with large Content-Length specified",
zap.String("remote", req.RemoteAddr),
zap.Int64("content-length", req.ContentLength),
zap.Int64("max-content-length", maxContentLength))
@@ -277,34 +276,34 @@ func (a *alertsReceiver) handleRequest(rw http.ResponseWriter, req *http.Request
payloadSigHeader := req.Header.Get(signatureHeaderName)
if payloadSigHeader == "" {
rw.WriteHeader(http.StatusBadRequest)
a.logger.Debug("Got payload with no HMAC signature, dropping...")
a.telemetrySettings.Logger.Debug("Got payload with no HMAC signature, dropping...")
return
}

payload := make([]byte, req.ContentLength)
_, err := io.ReadFull(req.Body, payload)
if err != nil {
rw.WriteHeader(http.StatusBadRequest)
a.logger.Debug("Failed to read alerts payload", zap.Error(err), zap.String("remote", req.RemoteAddr))
a.telemetrySettings.Logger.Debug("Failed to read alerts payload", zap.Error(err), zap.String("remote", req.RemoteAddr))
return
}

if err = verifyHMACSignature(a.secret, payload, payloadSigHeader); err != nil {
rw.WriteHeader(http.StatusBadRequest)
a.logger.Debug("Got payload with invalid HMAC signature, dropping...", zap.Error(err), zap.String("remote", req.RemoteAddr))
a.telemetrySettings.Logger.Debug("Got payload with invalid HMAC signature, dropping...", zap.Error(err), zap.String("remote", req.RemoteAddr))
return
}

logs, err := payloadToLogs(time.Now(), payload)
if err != nil {
rw.WriteHeader(http.StatusBadRequest)
a.logger.Error("Failed to convert log payload to log record", zap.Error(err))
a.telemetrySettings.Logger.Error("Failed to convert log payload to log record", zap.Error(err))
return
}

if err := a.consumer.ConsumeLogs(req.Context(), logs); err != nil {
rw.WriteHeader(http.StatusInternalServerError)
a.logger.Error("Failed to consumer alert as log", zap.Error(err))
a.telemetrySettings.Logger.Error("Failed to consumer alert as log", zap.Error(err))
return
}

@@ -319,19 +318,19 @@ func (a *alertsReceiver) Shutdown(ctx context.Context) error {
}

func (a *alertsReceiver) shutdownListener(ctx context.Context) error {
a.logger.Debug("Shutting down server")
a.telemetrySettings.Logger.Debug("Shutting down server")
err := a.server.Shutdown(ctx)
if err != nil {
return err
}

a.logger.Debug("Waiting for shutdown to complete.")
a.telemetrySettings.Logger.Debug("Waiting for shutdown to complete.")
a.wg.Wait()
return nil
}

func (a *alertsReceiver) shutdownPoller(ctx context.Context) error {
a.logger.Debug("Shutting down client")
a.telemetrySettings.Logger.Debug("Shutting down client")
close(a.doneChan)
a.wg.Wait()
return a.writeCheckpoint(ctx)
@@ -356,7 +355,7 @@ func (a *alertsReceiver) convertAlerts(now pcommon.Timestamp, alerts []mongodbat

ts, err := time.Parse(time.RFC3339, alert.Updated)
if err != nil {
a.logger.Warn("unable to interpret updated time for alert, expecting a RFC3339 timestamp", zap.String("timestamp", alert.Updated))
a.telemetrySettings.Logger.Warn("unable to interpret updated time for alert, expecting a RFC3339 timestamp", zap.String("timestamp", alert.Updated))
continue
}

@@ -367,7 +366,7 @@ func (a *alertsReceiver) convertAlerts(now pcommon.Timestamp, alerts []mongodbat
// of unrecognized alerts to process.
bodyBytes, err := json.Marshal(alert)
if err != nil {
a.logger.Warn("unable to marshal alert into a body string")
a.telemetrySettings.Logger.Warn("unable to marshal alert into a body string")
continue
}

@@ -537,7 +536,7 @@ func (a *alertsReceiver) syncPersistence(ctx context.Context) error {

func (a *alertsReceiver) writeCheckpoint(ctx context.Context) error {
if a.storageClient == nil {
a.logger.Error("unable to write checkpoint since no storage client was found")
a.telemetrySettings.Logger.Error("unable to write checkpoint since no storage client was found")
return errors.New("missing non-nil storage client")
}
marshalBytes, err := json.Marshal(&a.record)
@@ -560,7 +559,7 @@ func (a *alertsReceiver) applyFilters(pConf *ProjectConfig, alerts []mongodbatla
for _, alert := range alerts {
updatedTime, err := time.Parse(time.RFC3339, alert.Updated)
if err != nil {
a.logger.Warn("unable to interpret updated time for alert, expecting a RFC3339 timestamp", zap.String("timestamp", alert.Updated))
a.telemetrySettings.Logger.Warn("unable to interpret updated time for alert, expecting a RFC3339 timestamp", zap.String("timestamp", alert.Updated))
continue
}
