Skip to content

Commit

Permalink
Convert pgbk and pgevents to slog
Browse files Browse the repository at this point in the history
  • Loading branch information
espadolini committed May 16, 2024
1 parent 2840caa commit 0a39298
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 68 deletions.
7 changes: 4 additions & 3 deletions lib/backend/pgbk/atomicwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,10 @@ func (b *Backend) AtomicWrite(ctx context.Context, condacts []backend.Conditiona
}

if tries > 2 {
// if we retried more than once, txn experienced non-trivial conflict and we should warn about it. Infrequent warnings of this kind
// are nothing to be concerned about, but high volumes may indicate that an automatic process is creating excessive conflicts.
b.log.Warnf("AtomicWrite retried %d times due to postgres transaction contention. Some conflict is expected, but persistent conflict warnings may indicate an unhealthy state.", tries)
b.log.WarnContext(ctx,
"AtomicWrite was retried several times due to transaction contention. Some conflict is expected, but persistent conflict warnings may indicate an unhealthy state.",
"tries", tries,
)
}

if err != nil {
Expand Down
38 changes: 19 additions & 19 deletions lib/backend/pgbk/background.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,21 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"

"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/sirupsen/logrus"

"github.com/gravitational/teleport/lib/backend"
pgcommon "github.com/gravitational/teleport/lib/backend/pgbk/common"
"github.com/gravitational/teleport/lib/defaults"
)

func (b *Backend) backgroundExpiry(ctx context.Context) {
defer b.log.Info("Exited expiry loop.")
defer b.log.InfoContext(ctx, "Exited expiry loop.")

for ctx.Err() == nil {
// "DELETE FROM kv WHERE expires <= now()" but more complicated: logical
Expand Down Expand Up @@ -71,15 +71,15 @@ func (b *Backend) backgroundExpiry(ctx context.Context) {
return tag.RowsAffected(), nil
})
if err != nil {
b.log.WithError(err).Error("Failed to delete expired items.")
b.log.ErrorContext(ctx, "Failed to delete expired items.", "error", err)
break
}

if deleted > 0 {
b.log.WithFields(logrus.Fields{
"deleted": deleted,
"elapsed": time.Since(t0).String(),
}).Debug("Deleted expired items.")
b.log.DebugContext(ctx, "Deleted expired items.",
"deleted", deleted,
"elapsed", time.Since(t0),
)
}

if deleted < int64(b.cfg.ExpiryBatchSize) {
Expand All @@ -96,16 +96,16 @@ func (b *Backend) backgroundExpiry(ctx context.Context) {
}

func (b *Backend) backgroundChangeFeed(ctx context.Context) {
defer b.log.Info("Exited change feed loop.")
defer b.log.InfoContext(ctx, "Exited change feed loop.")
defer b.buf.Close()

for ctx.Err() == nil {
b.log.Info("Starting change feed stream.")
b.log.InfoContext(ctx, "Starting change feed stream.")
err := b.runChangeFeed(ctx)
if ctx.Err() != nil {
break
}
b.log.WithError(err).Error("Change feed stream lost.")
b.log.ErrorContext(ctx, "Change feed stream lost.", "error", err)

select {
case <-ctx.Done():
Expand Down Expand Up @@ -135,7 +135,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error {
closeCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
if err := conn.Close(closeCtx); err != nil && closeCtx.Err() != nil {
b.log.WithError(err).Warn("Error closing change feed connection.")
b.log.WarnContext(ctx, "Error closing change feed connection.", "error", err)
}
}()
if ac := b.feedConfig.AfterConnect; ac != nil {
Expand Down Expand Up @@ -164,7 +164,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error {
// permission issues, which would delete the temporary slot (it's deleted on
// any error), so we have to do it before that
if _, err := conn.Exec(ctx, "SET log_min_messages TO fatal", pgx.QueryExecModeExec); err != nil {
b.log.WithError(err).Debug("Failed to silence log messages for change feed session.")
b.log.DebugContext(ctx, "Failed to silence log messages for change feed session.", "error", err)
}

// this can be useful on Azure if we have azure_pg_admin permissions but not
Expand All @@ -179,7 +179,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error {
fmt.Sprintf("ALTER ROLE %v REPLICATION", pgx.Identifier{connConfig.User}.Sanitize()),
pgx.QueryExecModeExec,
); err != nil {
b.log.WithError(err).Debug("Failed to enable replication for the current user.")
b.log.DebugContext(ctx, "Failed to enable replication for the current user.", "error", err)
}
}

Expand All @@ -188,7 +188,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error {
// https://github.com/postgres/postgres/blob/b0ec61c9c27fb932ae6524f92a18e0d1fadbc144/src/backend/replication/slot.c#L193-L194
slotName := fmt.Sprintf("teleport_%x", [16]byte(uuid.New()))

b.log.WithField("slot_name", slotName).Info("Setting up change feed.")
b.log.InfoContext(ctx, "Setting up change feed.", "slot_name", slotName)

// be noisy about pg_create_logical_replication_slot taking too long, since
// hanging here leaves the backend non-functional
Expand All @@ -202,7 +202,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error {
}
cancel()

b.log.WithField("slot_name", slotName).Info("Change feed started.")
b.log.InfoContext(ctx, "Change feed started.", "slot_name", slotName)
b.buf.SetInit()
defer b.buf.Reset()

Expand Down Expand Up @@ -260,10 +260,10 @@ func (b *Backend) pollChangeFeed(ctx context.Context, conn *pgx.Conn, addTables,

messages := tag.RowsAffected()
if messages > 0 {
b.log.WithFields(logrus.Fields{
"messages": messages,
"elapsed": time.Since(t0).String(),
}).Debug("Fetched change feed events.")
b.log.LogAttrs(ctx, slog.LevelDebug, "Fetched change feed events.",
slog.Int64("messages", messages),
slog.Duration("elapsed", time.Since(t0)),
)
}

return messages, nil
Expand Down
2 changes: 1 addition & 1 deletion lib/backend/pgbk/common/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func AzureBeforeConnect(ctx context.Context, logger *slog.Logger) (func(ctx cont
return trace.Wrap(err, "obtaining Azure authentication token")
}

logger.With("ttl", time.Until(token.ExpiresOn).String()).DebugContext(ctx, "Acquired Azure access token.")
logger.DebugContext(ctx, "Acquired Azure access token.", "ttl", time.Until(token.ExpiresOn))
config.Password = token.Token

return nil
Expand Down
51 changes: 29 additions & 22 deletions lib/backend/pgbk/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"time"

"github.com/gravitational/trace"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/sirupsen/logrus"

"github.com/gravitational/teleport/api/utils/retryutils"
)
Expand Down Expand Up @@ -63,10 +63,10 @@ func ConnectPostgres(ctx context.Context, poolConfig *pgxpool.Config) (*pgx.Conn

// TryEnsureDatabase will connect to the "postgres" database and attempt to
// create the database named in the pool's configuration.
func TryEnsureDatabase(ctx context.Context, poolConfig *pgxpool.Config, log logrus.FieldLogger) {
func TryEnsureDatabase(ctx context.Context, poolConfig *pgxpool.Config, log *slog.Logger) {
pgConn, err := ConnectPostgres(ctx, poolConfig)
if err != nil {
log.WithError(err).Warn("Failed to connect to the \"postgres\" database.")
log.WarnContext(ctx, "Failed to connect to the \"postgres\" database.", "error", err)
return
}

Expand All @@ -81,13 +81,13 @@ func TryEnsureDatabase(ctx context.Context, poolConfig *pgxpool.Config, log logr
// will fail immediately if we can't connect, anyway, so we can log
// permission errors at debug level here.
if IsCode(err, pgerrcode.InsufficientPrivilege) {
log.WithError(err).Debug("Error creating database due to insufficient privileges.")
log.DebugContext(ctx, "Error creating database due to insufficient privileges.", "error", err)
} else {
log.WithError(err).Warn("Error creating database.")
log.WarnContext(ctx, "Error creating database.", "error", err)
}
}
if err := pgConn.Close(ctx); err != nil {
log.WithError(err).Warn("Error closing connection to the \"postgres\" database.")
log.WarnContext(ctx, "Error closing connection to the \"postgres\" database.", "error", err)
}
}

Expand All @@ -97,7 +97,7 @@ func TryEnsureDatabase(ctx context.Context, poolConfig *pgxpool.Config, log logr
// any data has been sent. It will retry unique constraint violation and
// exclusion constraint violations, so the closure should not rely on those for
// normal behavior.
func Retry[T any](ctx context.Context, log logrus.FieldLogger, f func() (T, error)) (T, error) {
func Retry[T any](ctx context.Context, log *slog.Logger, f func() (T, error)) (T, error) {
const idempotent = false
v, err := retry(ctx, log, idempotent, f)
return v, trace.Wrap(err)
Expand All @@ -108,13 +108,13 @@ func Retry[T any](ctx context.Context, log logrus.FieldLogger, f func() (T, erro
// assumes that f is idempotent, so it will retry even in ambiguous situations.
// It will retry unique constraint violation and exclusion constraint
// violations, so the closure should not rely on those for normal behavior.
func RetryIdempotent[T any](ctx context.Context, log logrus.FieldLogger, f func() (T, error)) (T, error) {
func RetryIdempotent[T any](ctx context.Context, log *slog.Logger, f func() (T, error)) (T, error) {
const idempotent = true
v, err := retry(ctx, log, idempotent, f)
return v, trace.Wrap(err)
}

func retry[T any](ctx context.Context, log logrus.FieldLogger, isIdempotent bool, f func() (T, error)) (T, error) {
func retry[T any](ctx context.Context, log *slog.Logger, isIdempotent bool, f func() (T, error)) (T, error) {
var v T
var err error
v, err = f()
Expand Down Expand Up @@ -143,18 +143,22 @@ func retry[T any](ctx context.Context, log logrus.FieldLogger, isIdempotent bool
_ = errors.As(err, &pgErr)

if pgErr != nil && isSerializationErrorCode(pgErr.Code) {
log.WithError(err).
WithField("attempt", i).
Debug("Operation failed due to conflicts, retrying quickly.")
log.LogAttrs(ctx, slog.LevelDebug,
"Operation failed due to conflicts, retrying quickly.",
slog.Int("attempt", i),
slog.Any("error", err),
)
retry.Reset()
// the very first attempt gets instant retry on serialization failure
if i > 1 {
retry.Inc()
}
} else if (isIdempotent && pgErr == nil) || pgconn.SafeToRetry(err) {
log.WithError(err).
WithField("attempt", i).
Debug("Operation failed, retrying.")
log.LogAttrs(ctx, slog.LevelDebug,
"Operation failed, retrying.",
slog.Int("attempt", i),
slog.Any("error", err),
)
retry.Inc()
} else {
// we either know we shouldn't retry (on a database error), or we
Expand Down Expand Up @@ -207,7 +211,7 @@ func isSerializationErrorCode(code string) bool {
// [pgx.BeginTxFunc].
func RetryTx(
ctx context.Context,
log logrus.FieldLogger,
log *slog.Logger,
db interface {
BeginTx(context.Context, pgx.TxOptions) (pgx.Tx, error)
},
Expand All @@ -233,7 +237,7 @@ func IsCode(err error, code string) bool {
// the name of a table used to hold schema version numbers.
func SetupAndMigrate(
ctx context.Context,
log logrus.FieldLogger,
log *slog.Logger,
db interface {
BeginTx(context.Context, pgx.TxOptions) (pgx.Tx, error)
Exec(context.Context, string, ...any) (pgconn.CommandTag, error)
Expand All @@ -259,7 +263,10 @@ func SetupAndMigrate(
}); err != nil {
// the very first SELECT in the next transaction will fail, we don't
// need anything higher than debug here
log.WithError(err).Debugf("Failed to confirm the existence of the %v table.", tableName)
log.DebugContext(ctx, "Failed to confirm the existence of the configured table.",
"table", tableName,
"error", err,
)
}

const idempotent = true
Expand Down Expand Up @@ -307,10 +314,10 @@ func SetupAndMigrate(
}

if int(version) != len(schemas) {
log.WithFields(logrus.Fields{
"previous_version": version,
"current_version": len(schemas),
}).Info("Migrated database schema.")
log.InfoContext(ctx, "Migrated database schema.",
"previous_version", version,
"current_version", len(schemas),
)
}

return nil
Expand Down
15 changes: 7 additions & 8 deletions lib/backend/pgbk/pgbk.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/jackc/pgx/v5/pgtype/zeronull"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/types"
Expand Down Expand Up @@ -150,22 +149,22 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) {
return nil, trace.Wrap(err)
}

log := logrus.WithField(teleport.ComponentKey, componentName)
logger := slog.Default().With(teleport.ComponentKey, componentName)
log := slog.With(teleport.ComponentKey, componentName)

if err := cfg.AuthMode.ConfigurePoolConfigs(ctx, logger, poolConfig, feedConfig); err != nil {
if err := cfg.AuthMode.ConfigurePoolConfigs(ctx, log, poolConfig, feedConfig); err != nil {
return nil, trace.Wrap(err)
}

const defaultTxIsoParamName = "default_transaction_isolation"
if defaultTxIso := poolConfig.ConnConfig.RuntimeParams[defaultTxIsoParamName]; defaultTxIso != "" {
log.WithField(defaultTxIsoParamName, defaultTxIso).
Error("The " + defaultTxIsoParamName + " parameter was overridden in the connection string; proceeding with an unsupported configuration.")
const message = "The " + defaultTxIsoParamName + " parameter was overridden in the connection string; proceeding with an unsupported configuration."
log.ErrorContext(ctx, message,
defaultTxIsoParamName, defaultTxIso)
} else {
poolConfig.ConnConfig.RuntimeParams[defaultTxIsoParamName] = "serializable"
}

log.Info("Setting up backend.")
log.InfoContext(ctx, "Setting up backend.")

pgcommon.TryEnsureDatabase(ctx, poolConfig, log)

Expand Down Expand Up @@ -212,7 +211,7 @@ type Backend struct {
cfg Config
feedConfig *pgxpool.Config

log logrus.FieldLogger
log *slog.Logger
pool *pgxpool.Pool
buf *backend.CircularBuffer

Expand Down
Loading

0 comments on commit 0a39298

Please sign in to comment.