From 0a39298388f03184e3ea2e7f0b839c730dea75b5 Mon Sep 17 00:00:00 2001 From: Edoardo Spadolini Date: Thu, 16 May 2024 12:31:14 +0200 Subject: [PATCH] Convert pgbk and pgevents to slog --- lib/backend/pgbk/atomicwrite.go | 7 ++-- lib/backend/pgbk/background.go | 38 ++++++++++----------- lib/backend/pgbk/common/azure.go | 2 +- lib/backend/pgbk/common/utils.go | 51 ++++++++++++++++------------ lib/backend/pgbk/pgbk.go | 15 ++++---- lib/events/pgevents/pgevents.go | 23 +++++-------- lib/events/pgevents/pgevents_test.go | 1 - 7 files changed, 69 insertions(+), 68 deletions(-) diff --git a/lib/backend/pgbk/atomicwrite.go b/lib/backend/pgbk/atomicwrite.go index 4e263c70ab07c..c2e2ce7e8b8fb 100644 --- a/lib/backend/pgbk/atomicwrite.go +++ b/lib/backend/pgbk/atomicwrite.go @@ -135,9 +135,10 @@ func (b *Backend) AtomicWrite(ctx context.Context, condacts []backend.Conditiona } if tries > 2 { - // if we retried more than once, txn experienced non-trivial conflict and we should warn about it. Infrequent warnings of this kind - // are nothing to be concerned about, but high volumes may indicate that an automatic process is creating excessive conflicts. - b.log.Warnf("AtomicWrite retried %d times due to postgres transaction contention. Some conflict is expected, but persistent conflict warnings may indicate an unhealthy state.", tries) + b.log.WarnContext(ctx, + "AtomicWrite was retried several times due to transaction contention. Some conflict is expected, but persistent conflict warnings may indicate an unhealthy state.", + "tries", tries, + ) } if err != nil { diff --git a/lib/backend/pgbk/background.go b/lib/backend/pgbk/background.go index 13a354f3860a9..5a0daebe564d9 100644 --- a/lib/backend/pgbk/background.go +++ b/lib/backend/pgbk/background.go @@ -22,13 +22,13 @@ import ( "context" "encoding/json" "fmt" + "log/slog" "time" "github.com/google/uuid" "github.com/gravitational/trace" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" - "github.com/sirupsen/logrus" "github.com/gravitational/teleport/lib/backend" pgcommon "github.com/gravitational/teleport/lib/backend/pgbk/common" @@ -36,7 +36,7 @@ import ( ) func (b *Backend) backgroundExpiry(ctx context.Context) { - defer b.log.Info("Exited expiry loop.") + defer b.log.InfoContext(ctx, "Exited expiry loop.") for ctx.Err() == nil { // "DELETE FROM kv WHERE expires <= now()" but more complicated: logical @@ -71,15 +71,15 @@ func (b *Backend) backgroundExpiry(ctx context.Context) { return tag.RowsAffected(), nil }) if err != nil { - b.log.WithError(err).Error("Failed to delete expired items.") + b.log.ErrorContext(ctx, "Failed to delete expired items.", "error", err) break } if deleted > 0 { - b.log.WithFields(logrus.Fields{ - "deleted": deleted, - "elapsed": time.Since(t0).String(), - }).Debug("Deleted expired items.") + b.log.DebugContext(ctx, "Deleted expired items.", + "deleted", deleted, + "elapsed", time.Since(t0), + ) } if deleted < int64(b.cfg.ExpiryBatchSize) { @@ -96,16 +96,16 @@ func (b *Backend) backgroundExpiry(ctx context.Context) { } func (b *Backend) backgroundChangeFeed(ctx context.Context) { - defer b.log.Info("Exited change feed loop.") + defer b.log.InfoContext(ctx, "Exited change feed loop.") defer b.buf.Close() for ctx.Err() == nil { - b.log.Info("Starting change feed stream.") + b.log.InfoContext(ctx, "Starting change feed stream.") err := b.runChangeFeed(ctx) if ctx.Err() != nil { break } - b.log.WithError(err).Error("Change feed stream lost.") + b.log.ErrorContext(ctx, "Change feed stream lost.", "error", err) select { case <-ctx.Done(): @@ -135,7 +135,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error { closeCtx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() if err := conn.Close(closeCtx); err != nil && closeCtx.Err() != nil { - b.log.WithError(err).Warn("Error closing change feed connection.") + b.log.WarnContext(ctx, "Error closing change feed connection.", "error", err) } }() if ac := b.feedConfig.AfterConnect; ac != nil { @@ -164,7 +164,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error { // permission issues, which would delete the temporary slot (it's deleted on // any error), so we have to do it before that if _, err := conn.Exec(ctx, "SET log_min_messages TO fatal", pgx.QueryExecModeExec); err != nil { - b.log.WithError(err).Debug("Failed to silence log messages for change feed session.") + b.log.DebugContext(ctx, "Failed to silence log messages for change feed session.", "error", err) } // this can be useful on Azure if we have azure_pg_admin permissions but not @@ -179,7 +179,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error { fmt.Sprintf("ALTER ROLE %v REPLICATION", pgx.Identifier{connConfig.User}.Sanitize()), pgx.QueryExecModeExec, ); err != nil { - b.log.WithError(err).Debug("Failed to enable replication for the current user.") + b.log.DebugContext(ctx, "Failed to enable replication for the current user.", "error", err) } } @@ -188,7 +188,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error { // https://github.com/postgres/postgres/blob/b0ec61c9c27fb932ae6524f92a18e0d1fadbc144/src/backend/replication/slot.c#L193-L194 slotName := fmt.Sprintf("teleport_%x", [16]byte(uuid.New())) - b.log.WithField("slot_name", slotName).Info("Setting up change feed.") + b.log.InfoContext(ctx, "Setting up change feed.", "slot_name", slotName) // be noisy about pg_create_logical_replication_slot taking too long, since // hanging here leaves the backend non-functional @@ -202,7 +202,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error { } cancel() - b.log.WithField("slot_name", slotName).Info("Change feed started.") + b.log.InfoContext(ctx, "Change feed started.", "slot_name", slotName) b.buf.SetInit() defer b.buf.Reset() @@ -260,10 +260,10 @@ func (b *Backend) pollChangeFeed(ctx context.Context, conn *pgx.Conn, addTables, messages := tag.RowsAffected() if messages > 0 { - b.log.WithFields(logrus.Fields{ - "messages": messages, - "elapsed": time.Since(t0).String(), - }).Debug("Fetched change feed events.") + b.log.LogAttrs(ctx, slog.LevelDebug, "Fetched change feed events.", + slog.Int64("messages", messages), + slog.Duration("elapsed", time.Since(t0)), + ) } return messages, nil diff --git a/lib/backend/pgbk/common/azure.go b/lib/backend/pgbk/common/azure.go index a1a4b1a4eeff5..6676820bae671 100644 --- a/lib/backend/pgbk/common/azure.go +++ b/lib/backend/pgbk/common/azure.go @@ -48,7 +48,7 @@ func AzureBeforeConnect(ctx context.Context, logger *slog.Logger) (func(ctx cont return trace.Wrap(err, "obtaining Azure authentication token") } - logger.With("ttl", time.Until(token.ExpiresOn).String()).DebugContext(ctx, "Acquired Azure access token.") + logger.DebugContext(ctx, "Acquired Azure access token.", "ttl", time.Until(token.ExpiresOn)) config.Password = token.Token return nil diff --git a/lib/backend/pgbk/common/utils.go b/lib/backend/pgbk/common/utils.go index 314b5a1b76a31..44d22fb1f02f5 100644 --- a/lib/backend/pgbk/common/utils.go +++ b/lib/backend/pgbk/common/utils.go @@ -22,6 +22,7 @@ import ( "context" "errors" "fmt" + "log/slog" "time" "github.com/gravitational/trace" @@ -29,7 +30,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" - "github.com/sirupsen/logrus" "github.com/gravitational/teleport/api/utils/retryutils" ) @@ -63,10 +63,10 @@ func ConnectPostgres(ctx context.Context, poolConfig *pgxpool.Config) (*pgx.Conn // TryEnsureDatabase will connect to the "postgres" database and attempt to // create the database named in the pool's configuration. -func TryEnsureDatabase(ctx context.Context, poolConfig *pgxpool.Config, log logrus.FieldLogger) { +func TryEnsureDatabase(ctx context.Context, poolConfig *pgxpool.Config, log *slog.Logger) { pgConn, err := ConnectPostgres(ctx, poolConfig) if err != nil { - log.WithError(err).Warn("Failed to connect to the \"postgres\" database.") + log.WarnContext(ctx, "Failed to connect to the \"postgres\" database.", "error", err) return } @@ -81,13 +81,13 @@ func TryEnsureDatabase(ctx context.Context, poolConfig *pgxpool.Config, log logr // will fail immediately if we can't connect, anyway, so we can log // permission errors at debug level here. if IsCode(err, pgerrcode.InsufficientPrivilege) { - log.WithError(err).Debug("Error creating database due to insufficient privileges.") + log.DebugContext(ctx, "Error creating database due to insufficient privileges.", "error", err) } else { - log.WithError(err).Warn("Error creating database.") + log.WarnContext(ctx, "Error creating database.", "error", err) } } if err := pgConn.Close(ctx); err != nil { - log.WithError(err).Warn("Error closing connection to the \"postgres\" database.") + log.WarnContext(ctx, "Error closing connection to the \"postgres\" database.", "error", err) } } @@ -97,7 +97,7 @@ func TryEnsureDatabase(ctx context.Context, poolConfig *pgxpool.Config, log logr // any data has been sent. It will retry unique constraint violation and // exclusion constraint violations, so the closure should not rely on those for // normal behavior. -func Retry[T any](ctx context.Context, log logrus.FieldLogger, f func() (T, error)) (T, error) { +func Retry[T any](ctx context.Context, log *slog.Logger, f func() (T, error)) (T, error) { const idempotent = false v, err := retry(ctx, log, idempotent, f) return v, trace.Wrap(err) @@ -108,13 +108,13 @@ func Retry[T any](ctx context.Context, log logrus.FieldLogger, f func() (T, erro // assumes that f is idempotent, so it will retry even in ambiguous situations. // It will retry unique constraint violation and exclusion constraint // violations, so the closure should not rely on those for normal behavior. -func RetryIdempotent[T any](ctx context.Context, log logrus.FieldLogger, f func() (T, error)) (T, error) { +func RetryIdempotent[T any](ctx context.Context, log *slog.Logger, f func() (T, error)) (T, error) { const idempotent = true v, err := retry(ctx, log, idempotent, f) return v, trace.Wrap(err) } -func retry[T any](ctx context.Context, log logrus.FieldLogger, isIdempotent bool, f func() (T, error)) (T, error) { +func retry[T any](ctx context.Context, log *slog.Logger, isIdempotent bool, f func() (T, error)) (T, error) { var v T var err error v, err = f() @@ -143,18 +143,22 @@ func retry[T any](ctx context.Context, log logrus.FieldLogger, isIdempotent bool _ = errors.As(err, &pgErr) if pgErr != nil && isSerializationErrorCode(pgErr.Code) { - log.WithError(err). - WithField("attempt", i). - Debug("Operation failed due to conflicts, retrying quickly.") + log.LogAttrs(ctx, slog.LevelDebug, + "Operation failed due to conflicts, retrying quickly.", + slog.Int("attempt", i), + slog.Any("error", err), + ) retry.Reset() // the very first attempt gets instant retry on serialization failure if i > 1 { retry.Inc() } } else if (isIdempotent && pgErr == nil) || pgconn.SafeToRetry(err) { - log.WithError(err). - WithField("attempt", i). - Debug("Operation failed, retrying.") + log.LogAttrs(ctx, slog.LevelDebug, + "Operation failed, retrying.", + slog.Int("attempt", i), + slog.Any("error", err), + ) retry.Inc() } else { // we either know we shouldn't retry (on a database error), or we @@ -207,7 +211,7 @@ func isSerializationErrorCode(code string) bool { // [pgx.BeginTxFunc]. func RetryTx( ctx context.Context, - log logrus.FieldLogger, + log *slog.Logger, db interface { BeginTx(context.Context, pgx.TxOptions) (pgx.Tx, error) }, @@ -233,7 +237,7 @@ func IsCode(err error, code string) bool { // the name of a table used to hold schema version numbers. func SetupAndMigrate( ctx context.Context, - log logrus.FieldLogger, + log *slog.Logger, db interface { BeginTx(context.Context, pgx.TxOptions) (pgx.Tx, error) Exec(context.Context, string, ...any) (pgconn.CommandTag, error) @@ -259,7 +263,10 @@ func SetupAndMigrate( }); err != nil { // the very first SELECT in the next transaction will fail, we don't // need anything higher than debug here - log.WithError(err).Debugf("Failed to confirm the existence of the %v table.", tableName) + log.DebugContext(ctx, "Failed to confirm the existence of the configured table.", + "table", tableName, + "error", err, + ) } const idempotent = true @@ -307,10 +314,10 @@ func SetupAndMigrate( } if int(version) != len(schemas) { - log.WithFields(logrus.Fields{ - "previous_version": version, - "current_version": len(schemas), - }).Info("Migrated database schema.") + log.InfoContext(ctx, "Migrated database schema.", + "previous_version", version, + "current_version", len(schemas), + ) } return nil diff --git a/lib/backend/pgbk/pgbk.go b/lib/backend/pgbk/pgbk.go index ace3c2023d4ac..e8cb20444e167 100644 --- a/lib/backend/pgbk/pgbk.go +++ b/lib/backend/pgbk/pgbk.go @@ -31,7 +31,6 @@ import ( "github.com/jackc/pgx/v5/pgtype/zeronull" "github.com/jackc/pgx/v5/pgxpool" "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/types" @@ -150,22 +149,22 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) { return nil, trace.Wrap(err) } - log := logrus.WithField(teleport.ComponentKey, componentName) - logger := slog.Default().With(teleport.ComponentKey, componentName) + log := slog.With(teleport.ComponentKey, componentName) - if err := cfg.AuthMode.ConfigurePoolConfigs(ctx, logger, poolConfig, feedConfig); err != nil { + if err := cfg.AuthMode.ConfigurePoolConfigs(ctx, log, poolConfig, feedConfig); err != nil { return nil, trace.Wrap(err) } const defaultTxIsoParamName = "default_transaction_isolation" if defaultTxIso := poolConfig.ConnConfig.RuntimeParams[defaultTxIsoParamName]; defaultTxIso != "" { - log.WithField(defaultTxIsoParamName, defaultTxIso). - Error("The " + defaultTxIsoParamName + " parameter was overridden in the connection string; proceeding with an unsupported configuration.") + const message = "The " + defaultTxIsoParamName + " parameter was overridden in the connection string; proceeding with an unsupported configuration." + log.ErrorContext(ctx, message, + defaultTxIsoParamName, defaultTxIso) } else { poolConfig.ConnConfig.RuntimeParams[defaultTxIsoParamName] = "serializable" } - log.Info("Setting up backend.") + log.InfoContext(ctx, "Setting up backend.") pgcommon.TryEnsureDatabase(ctx, poolConfig, log) @@ -212,7 +211,7 @@ type Backend struct { cfg Config feedConfig *pgxpool.Config - log logrus.FieldLogger + log *slog.Logger pool *pgxpool.Pool buf *backend.CircularBuffer diff --git a/lib/events/pgevents/pgevents.go b/lib/events/pgevents/pgevents.go index a97710cbce381..7adcce24e1e25 100644 --- a/lib/events/pgevents/pgevents.go +++ b/lib/events/pgevents/pgevents.go @@ -32,7 +32,6 @@ import ( "github.com/gravitational/trace" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "github.com/sirupsen/logrus" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/types" @@ -67,8 +66,7 @@ const ( // Config is the configuration struct to pass to New. type Config struct { - Log logrus.FieldLogger - Logger *slog.Logger + Log *slog.Logger PoolConfig *pgxpool.Config AuthMode pgcommon.AuthMode @@ -155,10 +153,7 @@ func (c *Config) CheckAndSetDefaults() error { } if c.Log == nil { - c.Log = logrus.WithField(teleport.ComponentKey, componentName) - } - if c.Logger == nil { - c.Logger = slog.Default().With(teleport.ComponentKey, componentName) + c.Log = slog.With(teleport.ComponentKey, componentName) } return nil @@ -171,11 +166,11 @@ func New(ctx context.Context, cfg Config) (*Log, error) { return nil, trace.Wrap(err) } - if err := cfg.AuthMode.ConfigurePoolConfigs(ctx, cfg.Logger, cfg.PoolConfig); err != nil { + if err := cfg.AuthMode.ConfigurePoolConfigs(ctx, cfg.Log, cfg.PoolConfig); err != nil { return nil, trace.Wrap(err) } - cfg.Log.Info("Setting up events backend.") + cfg.Log.InfoContext(ctx, "Setting up events backend.") pgcommon.TryEnsureDatabase(ctx, cfg.PoolConfig, cfg.Log) @@ -201,14 +196,14 @@ func New(ctx context.Context, cfg Config) (*Log, error) { go l.periodicCleanup(periodicCtx, cfg.CleanupInterval, cfg.RetentionPeriod) } - l.log.Info("Started events backend.") + l.log.InfoContext(ctx, "Started events backend.") return l, nil } // Log is an external [events.AuditLogger] backed by a PostgreSQL database. type Log struct { - log logrus.FieldLogger + log *slog.Logger pool *pgxpool.Pool cancel context.CancelFunc @@ -253,7 +248,7 @@ func (l *Log) periodicCleanup(ctx context.Context, cleanupInterval, retentionPer case <-tk.C: } - l.log.Debug("Executing periodic cleanup.") + l.log.DebugContext(ctx, "Executing periodic cleanup.") deleted, err := pgcommon.RetryIdempotent(ctx, l.log, func() (int64, error) { tag, err := l.pool.Exec(ctx, "DELETE FROM events WHERE creation_time < (now() - $1::interval)", @@ -266,9 +261,9 @@ func (l *Log) periodicCleanup(ctx context.Context, cleanupInterval, retentionPer return tag.RowsAffected(), nil }) if err != nil { - l.log.WithError(err).Error("Failed to execute periodic cleanup.") + l.log.ErrorContext(ctx, "Failed to execute periodic cleanup.", "error", err) } else { - l.log.WithField("deleted_rows", deleted).Debug("Executed periodic cleanup.") + l.log.DebugContext(ctx, "Executed periodic cleanup.", "deleted", deleted) } } } diff --git a/lib/events/pgevents/pgevents_test.go b/lib/events/pgevents/pgevents_test.go index bbedd6ca9258e..126b3f8b549c0 100644 --- a/lib/events/pgevents/pgevents_test.go +++ b/lib/events/pgevents/pgevents_test.go @@ -126,7 +126,6 @@ func TestConfig(t *testing.T) { require.NoError(t, actualConfig.CheckAndSetDefaults()) actualConfig.Log = nil - actualConfig.Logger = nil actualConfig.PoolConfig = nil require.Equal(t, expectedConfig, &actualConfig)