Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert pgbk and pgevents to log/slog #41636

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions lib/backend/pgbk/atomicwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ func (b *Backend) AtomicWrite(ctx context.Context, condacts []backend.Conditiona
return trace.Wrap(row.Scan(&success))
}

var tries int
var attempts int
err = pgcommon.RetryTx(ctx, b.log, b.pool, pgx.TxOptions{}, false, func(tx pgx.Tx) error {
tries++
attempts++

var condBatch, actBatch pgx.Batch
for _, bi := range condBatchItems {
Expand All @@ -130,14 +130,15 @@ func (b *Backend) AtomicWrite(ctx context.Context, condacts []backend.Conditiona
return nil
})

if tries > 1 {
backend.AtomicWriteContention.WithLabelValues(b.GetName()).Add(float64(tries - 1))
if attempts > 1 {
backend.AtomicWriteContention.WithLabelValues(b.GetName()).Add(float64(attempts - 1))
}

if tries > 2 {
// if we retried more than once, txn experienced non-trivial conflict and we should warn about it. Infrequent warnings of this kind
// are nothing to be concerned about, but high volumes may indicate that an automatic process is creating excessive conflicts.
b.log.Warnf("AtomicWrite retried %d times due to postgres transaction contention. Some conflict is expected, but persistent conflict warnings may indicate an unhealthy state.", tries)
if attempts > 2 {
b.log.WarnContext(ctx,
"AtomicWrite was retried several times due to transaction contention. Some conflict is expected, but persistent conflict warnings may indicate an unhealthy state.",
"attempts", attempts,
)
}

if err != nil {
Expand Down
40 changes: 20 additions & 20 deletions lib/backend/pgbk/background.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,21 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"

"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/sirupsen/logrus"

"github.com/gravitational/teleport/lib/backend"
pgcommon "github.com/gravitational/teleport/lib/backend/pgbk/common"
"github.com/gravitational/teleport/lib/defaults"
)

func (b *Backend) backgroundExpiry(ctx context.Context) {
defer b.log.Info("Exited expiry loop.")
defer b.log.InfoContext(ctx, "Exited expiry loop.")

for ctx.Err() == nil {
// "DELETE FROM kv WHERE expires <= now()" but more complicated: logical
Expand Down Expand Up @@ -71,15 +71,15 @@ func (b *Backend) backgroundExpiry(ctx context.Context) {
return tag.RowsAffected(), nil
})
if err != nil {
b.log.WithError(err).Error("Failed to delete expired items.")
b.log.ErrorContext(ctx, "Failed to delete expired items.", "error", err)
break
}

if deleted > 0 {
b.log.WithFields(logrus.Fields{
"deleted": deleted,
"elapsed": time.Since(t0).String(),
}).Debug("Deleted expired items.")
b.log.DebugContext(ctx, "Deleted expired items.",
"deleted", deleted,
"elapsed", time.Since(t0),
)
}

if deleted < int64(b.cfg.ExpiryBatchSize) {
Expand All @@ -96,16 +96,16 @@ func (b *Backend) backgroundExpiry(ctx context.Context) {
}

func (b *Backend) backgroundChangeFeed(ctx context.Context) {
defer b.log.Info("Exited change feed loop.")
defer b.log.InfoContext(ctx, "Exited change feed loop.")
defer b.buf.Close()

for ctx.Err() == nil {
b.log.Info("Starting change feed stream.")
b.log.InfoContext(ctx, "Starting change feed stream.")
err := b.runChangeFeed(ctx)
if ctx.Err() != nil {
break
}
b.log.WithError(err).Error("Change feed stream lost.")
b.log.ErrorContext(ctx, "Change feed stream lost.", "error", err)

select {
case <-ctx.Done():
Expand Down Expand Up @@ -135,7 +135,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error {
closeCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
if err := conn.Close(closeCtx); err != nil && closeCtx.Err() != nil {
b.log.WithError(err).Warn("Error closing change feed connection.")
b.log.WarnContext(ctx, "Error closing change feed connection.", "error", err)
}
}()
if ac := b.feedConfig.AfterConnect; ac != nil {
Expand Down Expand Up @@ -164,7 +164,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error {
// permission issues, which would delete the temporary slot (it's deleted on
// any error), so we have to do it before that
if _, err := conn.Exec(ctx, "SET log_min_messages TO fatal", pgx.QueryExecModeExec); err != nil {
b.log.WithError(err).Debug("Failed to silence log messages for change feed session.")
b.log.DebugContext(ctx, "Failed to silence log messages for change feed session.", "error", err)
}

// this can be useful on Azure if we have azure_pg_admin permissions but not
Expand All @@ -174,12 +174,12 @@ func (b *Backend) runChangeFeed(ctx context.Context) error {
//
// HACK(espadolini): ALTER ROLE CURRENT_USER crashes Postgres on Azure, so
// we have to use an explicit username
if b.cfg.AuthMode == AzureADAuth && connConfig.User != "" {
if b.cfg.AuthMode == pgcommon.AzureADAuth && connConfig.User != "" {
if _, err := conn.Exec(ctx,
fmt.Sprintf("ALTER ROLE %v REPLICATION", pgx.Identifier{connConfig.User}.Sanitize()),
pgx.QueryExecModeExec,
); err != nil {
b.log.WithError(err).Debug("Failed to enable replication for the current user.")
b.log.DebugContext(ctx, "Failed to enable replication for the current user.", "error", err)
}
}

Expand All @@ -188,7 +188,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error {
// https://github.com/postgres/postgres/blob/b0ec61c9c27fb932ae6524f92a18e0d1fadbc144/src/backend/replication/slot.c#L193-L194
slotName := fmt.Sprintf("teleport_%x", [16]byte(uuid.New()))

b.log.WithField("slot_name", slotName).Info("Setting up change feed.")
b.log.InfoContext(ctx, "Setting up change feed.", "slot_name", slotName)

// be noisy about pg_create_logical_replication_slot taking too long, since
// hanging here leaves the backend non-functional
Expand All @@ -202,7 +202,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error {
}
cancel()

b.log.WithField("slot_name", slotName).Info("Change feed started.")
b.log.InfoContext(ctx, "Change feed started.", "slot_name", slotName)
b.buf.SetInit()
defer b.buf.Reset()

Expand Down Expand Up @@ -260,10 +260,10 @@ func (b *Backend) pollChangeFeed(ctx context.Context, conn *pgx.Conn, addTables,

messages := tag.RowsAffected()
if messages > 0 {
b.log.WithFields(logrus.Fields{
"messages": messages,
"elapsed": time.Since(t0).String(),
}).Debug("Fetched change feed events.")
b.log.LogAttrs(ctx, slog.LevelDebug, "Fetched change feed events.",
slog.Int64("messages", messages),
slog.Duration("elapsed", time.Since(t0)),
)
}

return messages, nil
Expand Down
98 changes: 98 additions & 0 deletions lib/backend/pgbk/common/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Teleport
* Copyright (C) 2024 Gravitational, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package pgcommon

import (
"context"
"fmt"
"log/slog"
"slices"
"strings"

"github.com/gravitational/trace"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)

// AuthMode determines if we should use some environment-specific authentication
// mechanism or credentials.
type AuthMode string

const (
// StaticAuth uses the static credentials as defined in the connection
// string.
StaticAuth AuthMode = ""
// AzureADAuth gets a connection token from Azure and uses it as the
// password when connecting.
AzureADAuth AuthMode = "azure"
// GCPSQLIAMAuth fetches an access token and uses it as password when
// connecting to GCP SQL PostgreSQL.
GCPSQLIAMAuth AuthMode = "gcp-sql"
// GCPAlloyDBIAMAuth fetches an access token and uses it as password when
// connecting to GCP AlloyDB (PostgreSQL-compatible).
GCPAlloyDBIAMAuth AuthMode = "gcp-alloydb"
)

var supportedAuthModes = []AuthMode{
StaticAuth,
AzureADAuth,
GCPSQLIAMAuth,
GCPAlloyDBIAMAuth,
}

// Check returns an error if the AuthMode is invalid.
func (a AuthMode) Check() error {
if slices.Contains(supportedAuthModes, a) {
return nil
}

quotedModes := make([]string, 0, len(supportedAuthModes))
for _, mode := range supportedAuthModes {
quotedModes = append(quotedModes, fmt.Sprintf("%q", mode))
}

return trace.BadParameter("invalid authentication mode %q, should be one of %s", a, strings.Join(quotedModes, ", "))
}

// ConfigurePoolConfigs configures pgxpool.Config based on the authMode.
func (a AuthMode) ConfigurePoolConfigs(ctx context.Context, logger *slog.Logger, configs ...*pgxpool.Config) error {
if bc, err := a.getBeforeConnect(ctx, logger); err != nil {
return trace.Wrap(err)
} else if bc != nil {
for _, config := range configs {
config.BeforeConnect = bc
}
}
return nil
}

func (a AuthMode) getBeforeConnect(ctx context.Context, logger *slog.Logger) (func(context.Context, *pgx.ConnConfig) error, error) {
switch a {
case AzureADAuth:
bc, err := AzureBeforeConnect(ctx, logger)
return bc, trace.Wrap(err)
case GCPSQLIAMAuth:
bc, err := GCPSQLBeforeConnect(ctx, logger)
return bc, trace.Wrap(err)
case GCPAlloyDBIAMAuth:
bc, err := GCPAlloyDBBeforeConnect(ctx, logger)
return bc, trace.Wrap(err)
}
return nil, nil
}
116 changes: 116 additions & 0 deletions lib/backend/pgbk/common/auth_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Teleport
* Copyright (C) 2024 Gravitational, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package pgcommon

import (
"context"
"log/slog"
"os"
"testing"

"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport/lib/utils"
)

func TestMain(m *testing.M) {
utils.InitLoggerForTests()
os.Exit(m.Run())
}

func TestAuthMode(t *testing.T) {
mustSetGoogleApplicationCredentialsEnv(t)
mustSetAzureEnvironmentCredential(t)

verifyBeforeConnectIsSet := func(t *testing.T, config *pgxpool.Config) {
t.Helper()
require.NotNil(t, config.BeforeConnect)
}
verifyNothingIsSet := func(t *testing.T, config *pgxpool.Config) {
t.Helper()
require.NotNil(t, config)
require.Equal(t, pgxpool.Config{}, *config)
}

tests := []struct {
authMode AuthMode
requireCheckError require.ErrorAssertionFunc
verifyPoolConfigAfterConfigure func(*testing.T, *pgxpool.Config)
}{
{
authMode: AuthMode("unknown-mode"),
requireCheckError: require.Error,
},
{
authMode: StaticAuth,
requireCheckError: require.NoError,
verifyPoolConfigAfterConfigure: verifyNothingIsSet,
},
{
authMode: AzureADAuth,
requireCheckError: require.NoError,
verifyPoolConfigAfterConfigure: verifyBeforeConnectIsSet,
},
{
authMode: GCPSQLIAMAuth,
requireCheckError: require.NoError,
verifyPoolConfigAfterConfigure: verifyBeforeConnectIsSet,
},
{
authMode: GCPAlloyDBIAMAuth,
requireCheckError: require.NoError,
verifyPoolConfigAfterConfigure: verifyBeforeConnectIsSet,
},
}

ctx := context.Background()
logger := slog.Default()
for _, tc := range tests {
t.Run(string(tc.authMode), func(t *testing.T) {
err := tc.authMode.Check()
if err != nil {
// Just checking out how the error message looks like.
t.Log(err)
}
tc.requireCheckError(t, err)

if tc.verifyPoolConfigAfterConfigure != nil {
configs := []*pgxpool.Config{
&pgxpool.Config{},
&pgxpool.Config{},
}

err := tc.authMode.ConfigurePoolConfigs(ctx, logger, configs...)
require.NoError(t, err)

for _, config := range configs {
tc.verifyPoolConfigAfterConfigure(t, config)
}
}
})
}
}

func mustSetAzureEnvironmentCredential(t *testing.T) {
t.Helper()
t.Setenv("AZURE_TENANT_ID", "teleport-test-tenant-id")
t.Setenv("AZURE_CLIENT_ID", "teleport-test-client-id")
t.Setenv("AZURE_CLIENT_SECRET", "teleport-test-client-secret")
}
Loading
Loading