Improve HA behavior of database agents in leaf clusters
r0mant committed Mar 1, 2022
1 parent 3beb298 commit 0493d28
Showing 5 changed files with 199 additions and 24 deletions.
116 changes: 116 additions & 0 deletions integration/db_integration_test.go
@@ -31,6 +31,7 @@ import (
     "github.com/gravitational/teleport/lib/defaults"
     "github.com/gravitational/teleport/lib/events"
     "github.com/gravitational/teleport/lib/service"
+    "github.com/gravitational/teleport/lib/srv/db"
     "github.com/gravitational/teleport/lib/srv/db/common"
     "github.com/gravitational/teleport/lib/srv/db/mongodb"
     "github.com/gravitational/teleport/lib/srv/db/mysql"
@@ -387,6 +388,121 @@ func TestDatabaseAccessPostgresSeparateListener(t *testing.T) {
     require.NoError(t, err)
 }
 
+func init() {
+    // Override database agents' shuffle behavior to ensure they're always
+    // tried in the same order during tests. Used for HA tests.
+    db.SetShuffleFunc(db.ShuffleSort)
+}
+
+// TestDatabaseAccessHARootCluster verifies that the proxy falls back to a
+// healthy database agent in a root cluster when multiple agents are serving
+// the same database and one of them is down.
+func TestDatabaseAccessHARootCluster(t *testing.T) {
+    pack := setupDatabaseTest(t)
+
+    // Insert a database server entry not backed by an actual running agent
+    // to simulate a scenario where an agent is down but the resource hasn't
+    // expired from the backend yet.
+    dbServer, err := types.NewDatabaseServerV3(types.Metadata{
+        Name: pack.root.postgresService.Name,
+    }, types.DatabaseServerSpecV3{
+        Protocol: defaults.ProtocolPostgres,
+        URI:      pack.root.postgresAddr,
+        // To make sure the unhealthy server is always tried first in tests,
+        // give it a host ID that compares as "smaller", since the tests sort
+        // agents instead of shuffling them.
+        HostID:   "0000",
+        Hostname: "test",
+    })
+    require.NoError(t, err)
+
+    _, err = pack.root.cluster.Process.GetAuthServer().UpsertDatabaseServer(
+        context.Background(), dbServer)
+    require.NoError(t, err)
+
+    // Connect to the database service in root cluster.
+    client, err := postgres.MakeTestClient(context.Background(), common.TestClientConfig{
+        AuthClient: pack.root.cluster.GetSiteAPI(pack.root.cluster.Secrets.SiteName),
+        AuthServer: pack.root.cluster.Process.GetAuthServer(),
+        Address:    net.JoinHostPort(Loopback, pack.root.cluster.GetPortWeb()),
+        Cluster:    pack.root.cluster.Secrets.SiteName,
+        Username:   pack.root.user.GetName(),
+        RouteToDatabase: tlsca.RouteToDatabase{
+            ServiceName: pack.root.postgresService.Name,
+            Protocol:    pack.root.postgresService.Protocol,
+            Username:    "postgres",
+            Database:    "test",
+        },
+    })
+    require.NoError(t, err)
+
+    // Execute a query.
+    result, err := client.Exec(context.Background(), "select 1").ReadAll()
+    require.NoError(t, err)
+    require.Equal(t, []*pgconn.Result{postgres.TestQueryResponse}, result)
+    require.Equal(t, uint32(1), pack.root.postgres.QueryCount())
+    require.Equal(t, uint32(0), pack.leaf.postgres.QueryCount())
+
+    // Disconnect.
+    err = client.Close(context.Background())
+    require.NoError(t, err)
+}
+
+// TestDatabaseAccessHALeafCluster verifies that the proxy falls back to a
+// healthy database agent in a leaf cluster when multiple agents are serving
+// the same database and one of them is down.
+func TestDatabaseAccessHALeafCluster(t *testing.T) {
+    pack := setupDatabaseTest(t)
+    pack.waitForLeaf(t)
+
+    // Insert a database server entry not backed by an actual running agent
+    // to simulate a scenario where an agent is down but the resource hasn't
+    // expired from the backend yet.
+    dbServer, err := types.NewDatabaseServerV3(types.Metadata{
+        Name: pack.leaf.postgresService.Name,
+    }, types.DatabaseServerSpecV3{
+        Protocol: defaults.ProtocolPostgres,
+        URI:      pack.leaf.postgresAddr,
+        // To make sure the unhealthy server is always tried first in tests,
+        // give it a host ID that compares as "smaller", since the tests sort
+        // agents instead of shuffling them.
+        HostID:   "0000",
+        Hostname: "test",
+    })
+    require.NoError(t, err)
+
+    _, err = pack.leaf.cluster.Process.GetAuthServer().UpsertDatabaseServer(
+        context.Background(), dbServer)
+    require.NoError(t, err)
+
+    // Connect to the database service in leaf cluster via root cluster.
+    client, err := postgres.MakeTestClient(context.Background(), common.TestClientConfig{
+        AuthClient: pack.root.cluster.GetSiteAPI(pack.root.cluster.Secrets.SiteName),
+        AuthServer: pack.root.cluster.Process.GetAuthServer(),
+        Address:    net.JoinHostPort(Loopback, pack.root.cluster.GetPortWeb()), // Connecting via root cluster.
+        Cluster:    pack.leaf.cluster.Secrets.SiteName,
+        Username:   pack.root.user.GetName(),
+        RouteToDatabase: tlsca.RouteToDatabase{
+            ServiceName: pack.leaf.postgresService.Name,
+            Protocol:    pack.leaf.postgresService.Protocol,
+            Username:    "postgres",
+            Database:    "test",
+        },
+    })
+    require.NoError(t, err)
+
+    // Execute a query.
+    result, err := client.Exec(context.Background(), "select 1").ReadAll()
+    require.NoError(t, err)
+    require.Equal(t, []*pgconn.Result{postgres.TestQueryResponse}, result)
+    require.Equal(t, uint32(1), pack.leaf.postgres.QueryCount())
+    require.Equal(t, uint32(0), pack.root.postgres.QueryCount())
+
+    // Disconnect.
+    err = client.Close(context.Background())
+    require.NoError(t, err)
+}
+
 // TestDatabaseAccessMongoSeparateListener tests mongo proxy listener running on separate port.
 func TestDatabaseAccessMongoSeparateListener(t *testing.T) {
     pack := setupDatabaseTest(t,
15 changes: 15 additions & 0 deletions lib/reversetunnel/api.go
@@ -123,3 +123,18 @@ type Server interface {
     // Wait waits for server to close all outstanding operations
     Wait()
 }
+
+const (
+    // NoApplicationTunnel is the error message returned when the application
+    // reverse tunnel cannot be found.
+    //
+    // It usually happens when an app agent has shut down (or crashed) but
+    // hasn't expired from the backend yet.
+    NoApplicationTunnel = "could not find reverse tunnel, check that Application Service agent proxying this application is up and running"
+    // NoDatabaseTunnel is the error message returned when the database
+    // reverse tunnel cannot be found.
+    //
+    // It usually happens when a database agent has shut down (or crashed) but
+    // hasn't expired from the backend yet.
+    NoDatabaseTunnel = "could not find reverse tunnel, check that Database Service agent proxying this database is up and running"
+)
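
Since these sentinels are plain message strings rather than typed errors, code that receives the error after it has crossed a tunnel boundary detects it by matching the message. A minimal sketch of such a check, mirroring the isReverseTunnelDownError helper this commit adds to proxyserver.go below (the standalone function and its name here are illustrative, not part of the commit):

package example

import (
    "strings"

    "github.com/gravitational/teleport/lib/reversetunnel"
    "github.com/gravitational/trace"
)

// tunnelIsDown reports whether err indicates that a database agent's reverse
// tunnel is gone: either a typed connection problem, or the sentinel message
// in case the error was flattened to a string in transit between clusters.
func tunnelIsDown(err error) bool {
    return err != nil && (trace.IsConnectionProblem(err) ||
        strings.Contains(err.Error(), reversetunnel.NoDatabaseTunnel))
}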
10 changes: 7 additions & 3 deletions lib/reversetunnel/transport.go
@@ -375,9 +375,13 @@ func (p *transport) getConn(servers []string, r *sshutils.DialReq) (net.Conn, bo
         return nil, false, trace.Wrap(err)
     }
 
-    // Connections to applications should never occur over a direct dial, return right away.
-    if r.ConnType == types.AppTunnel {
-        return nil, false, trace.ConnectionProblem(err, "failed to connect to application")
+    // Connections to applications and databases should never occur over
+    // a direct dial, return right away.
+    switch r.ConnType {
+    case types.AppTunnel:
+        return nil, false, trace.ConnectionProblem(err, NoApplicationTunnel)
+    case types.DatabaseTunnel:
+        return nil, false, trace.ConnectionProblem(err, NoDatabaseTunnel)
     }
 
     errTun := err
12 changes: 6 additions & 6 deletions lib/srv/db/access_test.go
@@ -23,7 +23,6 @@ import (
     "fmt"
     "net"
     "os"
-    "sort"
     "sync"
     "testing"
     "time"
@@ -1518,6 +1517,12 @@ func (c *testContext) Close() error {
     return trace.NewAggregate(errors...)
 }
 
+func init() {
+    // Override database agents' shuffle behavior to ensure they're always
+    // tried in the same order during tests. Used for HA tests.
+    SetShuffleFunc(ShuffleSort)
+}
+
 func setupTestContext(ctx context.Context, t *testing.T, withDatabases ...withDatabaseOption) *testContext {
     testCtx := &testContext{
         clusterName: "root.example.com",
@@ -1630,11 +1635,6 @@ func setupTestContext(ctx context.Context, t *testing.T, withDatabases ...withDa
         Emitter:  testCtx.emitter,
         Clock:    testCtx.clock,
         ServerID: "proxy-server",
-        Shuffle: func(servers []types.DatabaseServer) []types.DatabaseServer {
-            // To ensure predictability in tests, sort servers instead of shuffling.
-            sort.Sort(types.DatabaseServers(servers))
-            return servers
-        },
         LockWatcher: proxyLockWatcher,
     })
     require.NoError(t, err)
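
With the per-config Shuffle override removed above, ordering is now controlled by the package-level hook added in proxyserver.go below. Any function with the matching signature can be installed; as a sketch, a hypothetical ShuffleFunc (shuffleByExpiry is not part of this commit) that tries agents with the freshest heartbeats, i.e. the latest expiry, first:

package db

import (
    "sort"

    "github.com/gravitational/teleport/api/types"
)

// shuffleByExpiry is a hypothetical ShuffleFunc that orders database agents
// by heartbeat freshness, trying the latest-expiring ones first.
func shuffleByExpiry(servers []types.DatabaseServer) []types.DatabaseServer {
    sort.Slice(servers, func(i, j int) bool {
        return servers[i].Expiry().After(servers[j].Expiry())
    })
    return servers
}

A test or specialized build would install it once, before the proxy starts dialing: SetShuffleFunc(shuffleByExpiry).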
70 changes: 55 additions & 15 deletions lib/srv/db/proxyserver.go
@@ -24,6 +24,9 @@ import (
     "io"
     "math/rand"
     "net"
+    "sort"
+    "strings"
+    "sync"
     "time"
 
     "github.com/gravitational/teleport"
@@ -85,12 +88,51 @@ type ProxyServerConfig struct {
     Clock clockwork.Clock
     // ServerID is the ID of the audit log server.
     ServerID string
-    // Shuffle allows to override shuffle logic in tests.
-    Shuffle func([]types.DatabaseServer) []types.DatabaseServer
     // LockWatcher is a lock watcher.
     LockWatcher *services.LockWatcher
 }
 
+// ShuffleFunc defines a function that shuffles a list of database servers.
+type ShuffleFunc func([]types.DatabaseServer) []types.DatabaseServer
+
+// ShuffleRandom is a ShuffleFunc that randomizes the order of database servers.
+// Used to provide load balancing behavior when proxying to multiple agents.
+func ShuffleRandom(servers []types.DatabaseServer) []types.DatabaseServer {
+    rand.New(rand.NewSource(time.Now().UnixNano())).Shuffle(
+        len(servers), func(i, j int) {
+            servers[i], servers[j] = servers[j], servers[i]
+        })
+    return servers
+}
+
+// ShuffleSort is a ShuffleFunc that sorts database servers by name and host ID.
+// Used to provide predictable behavior in tests.
+func ShuffleSort(servers []types.DatabaseServer) []types.DatabaseServer {
+    sort.Sort(types.DatabaseServers(servers))
+    return servers
+}
+
+var (
+    // mu protects access to the shuffleFunc global.
+    mu sync.RWMutex
+    // shuffleFunc provides the shuffle behavior for multiple database agents.
+    shuffleFunc ShuffleFunc = ShuffleRandom
+)
+
+// SetShuffleFunc sets the shuffle behavior used when proxying to multiple agents.
+func SetShuffleFunc(fn ShuffleFunc) {
+    mu.Lock()
+    defer mu.Unlock()
+    shuffleFunc = fn
+}
+
+// getShuffleFunc returns the configured function used to shuffle agents.
+func getShuffleFunc() ShuffleFunc {
+    mu.RLock()
+    defer mu.RUnlock()
+    return shuffleFunc
+}
+
 // CheckAndSetDefaults validates the config and sets default values.
 func (c *ProxyServerConfig) CheckAndSetDefaults() error {
     if c.AccessPoint == nil {
@@ -114,15 +156,6 @@ func (c *ProxyServerConfig) CheckAndSetDefaults() error {
     if c.ServerID == "" {
         return trace.BadParameter("missing ServerID")
     }
-    if c.Shuffle == nil {
-        c.Shuffle = func(servers []types.DatabaseServer) []types.DatabaseServer {
-            rand.New(rand.NewSource(c.Clock.Now().UnixNano())).Shuffle(
-                len(servers), func(i, j int) {
-                    servers[i], servers[j] = servers[j], servers[i]
-                })
-            return servers
-        }
-    }
     if c.LockWatcher == nil {
         return trace.BadParameter("missing LockWatcher")
     }
@@ -351,7 +384,7 @@ func (s *ProxyServer) Connect(ctx context.Context, proxyCtx *common.ProxyContext
     // There may be multiple database servers proxying the same database. If
     // we get a connection problem error trying to dial one of them, likely
     // the database server is down so try the next one.
-    for _, server := range s.cfg.Shuffle(proxyCtx.Servers) {
+    for _, server := range getShuffleFunc()(proxyCtx.Servers) {
         s.log.Debugf("Dialing to %v.", server)
         tlsConfig, err := s.getConfigForServer(ctx, proxyCtx.Identity, server)
         if err != nil {
@@ -364,9 +397,9 @@ func (s *ProxyServer) Connect(ctx context.Context, proxyCtx *common.ProxyContext
             ConnType: types.DatabaseTunnel,
         })
         if err != nil {
-            // Connection problem indicates reverse tunnel to this server is down.
-            if trace.IsConnectionProblem(err) {
-                s.log.WithError(err).Warnf("Failed to dial %v.", server)
+            // If an agent is down, we'll retry on the next one (if available).
+            if isReverseTunnelDownError(err) {
+                s.log.WithError(err).Warnf("Failed to dial database %v.", server)
                 continue
             }
             return nil, trace.Wrap(err)
Expand All @@ -380,6 +413,13 @@ func (s *ProxyServer) Connect(ctx context.Context, proxyCtx *common.ProxyContext
return nil, trace.BadParameter("failed to connect to any of the database servers")
}

// isReverseTunnelDownError returns true if the provided error indicates that
// the reverse tunnel connection is down e.g. because the agent is down.
func isReverseTunnelDownError(err error) bool {
return trace.IsConnectionProblem(err) ||
strings.Contains(err.Error(), reversetunnel.NoDatabaseTunnel)
}

// Proxy starts proxying all traffic received from database client between
// this proxy and Teleport database service over reverse tunnel.
//
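
The substring match in isReverseTunnelDownError is what makes the fallback work for leaf clusters: when the dial fails on the leaf side, the NoDatabaseTunnel error travels back to the root proxy over the tunnel and, presumably, can arrive stripped of its trace.ConnectionProblem type, so only the message survives. A sketch of a hypothetical unit test for the helper (not part of this commit) covering both paths:

package db

import (
    "errors"
    "testing"

    "github.com/gravitational/teleport/lib/reversetunnel"
    "github.com/gravitational/trace"
    "github.com/stretchr/testify/require"
)

func TestIsReverseTunnelDownError(t *testing.T) {
    // Typed connection problem, e.g. from a local dial failure.
    require.True(t, isReverseTunnelDownError(
        trace.ConnectionProblem(nil, "connection refused")))
    // Flattened remote error that only retains the sentinel message.
    require.True(t, isReverseTunnelDownError(
        errors.New(reversetunnel.NoDatabaseTunnel)))
    // Unrelated errors must not trigger the HA fallback.
    require.False(t, isReverseTunnelDownError(errors.New("access denied")))
}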
