Skip to content

Commit

Permalink
Fix RDS Redshift dynamic resources registration logic (#11868)
Browse files Browse the repository at this point in the history
  • Loading branch information
smallinsky committed May 6, 2022
1 parent 70d81a5 commit 3334e64
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 75 deletions.
2 changes: 1 addition & 1 deletion api/types/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ const (
// that the resource originates from.
OriginLabel = TeleportNamespace + "/origin"

// OriginConfigFile is an origin value indicating that the resource was
// OriginDefaults is an origin value indicating that the resource was
// constructed as a default value.
OriginDefaults = "defaults"

Expand Down
8 changes: 7 additions & 1 deletion lib/srv/db/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,14 @@ func (s *Server) matcher(resource types.ResourceWithLabels) bool {
if !ok {
return false
}
if database.IsRDS() || database.IsRedshift() {

// In the case of CloudOrigin CloudHosted resources the matchers should be skipped.
if cloudOrigin(resource) && database.IsCloudHosted() {
return true // Cloud fetchers return only matching databases.
}
return services.MatchResourceLabels(s.cfg.ResourceMatchers, database)
}

func cloudOrigin(r types.ResourceWithLabels) bool {
return r.Origin() == types.OriginCloud
}
183 changes: 110 additions & 73 deletions lib/srv/db/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,7 @@ func TestWatcher(t *testing.T) {
})

// Only db0 should be registered initially.
select {
case d := <-reconcileCh:
sort.Sort(d)
require.Empty(t, cmp.Diff(types.Databases{db0}, d,
cmpopts.IgnoreFields(types.Metadata{}, "ID"),
))
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}
assertReconciledResource(t, reconcileCh, types.Databases{db0})

// Create database with label group=a.
db1, err := makeDynamicDatabase("db1", map[string]string{"group": "a"})
Expand All @@ -77,15 +69,7 @@ func TestWatcher(t *testing.T) {
require.NoError(t, err)

// It should be registered.
select {
case d := <-reconcileCh:
sort.Sort(d)
require.Empty(t, cmp.Diff(types.Databases{db0, db1}, d,
cmpopts.IgnoreFields(types.Metadata{}, "ID"),
))
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}
assertReconciledResource(t, reconcileCh, types.Databases{db0, db1})

// Try to update db0 which is registered statically.
db0Updated, err := makeDynamicDatabase("db0", map[string]string{"group": "a", types.OriginLabel: types.OriginDynamic})
Expand All @@ -94,15 +78,7 @@ func TestWatcher(t *testing.T) {
require.NoError(t, err)

// It should not be registered, old db0 should remain.
select {
case d := <-reconcileCh:
sort.Sort(d)
require.Empty(t, cmp.Diff(types.Databases{db0, db1}, d,
cmpopts.IgnoreFields(types.Metadata{}, "ID"),
))
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}
assertReconciledResource(t, reconcileCh, types.Databases{db0, db1})

// Create database with label group=b.
db2, err := makeDynamicDatabase("db2", map[string]string{"group": "b"})
Expand All @@ -111,103 +87,164 @@ func TestWatcher(t *testing.T) {
require.NoError(t, err)

// It shouldn't be registered.
select {
case d := <-reconcileCh:
sort.Sort(d)
require.Empty(t, cmp.Diff(types.Databases{db0, db1}, d,
cmpopts.IgnoreFields(types.Metadata{}, "ID"),
))
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}
assertReconciledResource(t, reconcileCh, types.Databases{db0, db1})

// Update db2 labels so it matches.
db2.SetStaticLabels(map[string]string{"group": "a", types.OriginLabel: types.OriginDynamic})
err = testCtx.authServer.UpdateDatabase(ctx, db2)
require.NoError(t, err)

// Both should be registered now.
select {
case d := <-reconcileCh:
sort.Sort(d)
require.Empty(t, cmp.Diff(types.Databases{db0, db1, db2}, d,
cmpopts.IgnoreFields(types.Metadata{}, "ID"),
))
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}
assertReconciledResource(t, reconcileCh, types.Databases{db0, db1, db2})

// Update db2 URI so it gets re-registered.
db2.SetURI("localhost:2345")
err = testCtx.authServer.UpdateDatabase(ctx, db2)
require.NoError(t, err)

// db2 should get updated.
select {
case d := <-reconcileCh:
sort.Sort(d)
require.Empty(t, cmp.Diff(types.Databases{db0, db1, db2}, d,
cmpopts.IgnoreFields(types.Metadata{}, "ID"),
))
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}
assertReconciledResource(t, reconcileCh, types.Databases{db0, db1, db2})

// Update db1 labels so it doesn't match.
db1.SetStaticLabels(map[string]string{"group": "c", types.OriginLabel: types.OriginDynamic})
err = testCtx.authServer.UpdateDatabase(ctx, db1)
require.NoError(t, err)

// Only db0 and db2 should remain registered.
select {
case d := <-reconcileCh:
sort.Sort(d)
require.Empty(t, cmp.Diff(types.Databases{db0, db2}, d,
cmpopts.IgnoreFields(types.Metadata{}, "ID"),
))
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}

assertReconciledResource(t, reconcileCh, types.Databases{db0, db2})

// Remove db2.
err = testCtx.authServer.DeleteDatabase(ctx, db2.GetName())
require.NoError(t, err)

// Only static database should remain.
assertReconciledResource(t, reconcileCh, types.Databases{db0})
}

// TestWatcherRDSDynamicResource RDS dynamic resource registration where the ResourceMatchers should be always
// evaluated for the dynamic registered resources.
func TestWatcherCloudDynamicResource(t *testing.T) {
var db1, db2, db3 *types.DatabaseV3
ctx := context.Background()
testCtx := setupTestContext(ctx, t)

db0, err := makeStaticDatabase("db0", nil)
require.NoError(t, err)

reconcileCh := make(chan types.Databases)
testCtx.setupDatabaseServer(ctx, t, agentParams{
Databases: []types.Database{db0},
ResourceMatchers: []services.ResourceMatcher{
{Labels: types.Labels{
"group": []string{"a"},
}},
},
OnReconcile: func(d types.Databases) {
reconcileCh <- d
},
})
assertReconciledResource(t, reconcileCh, types.Databases{db0})

withRDSURL := func(v3 *types.DatabaseSpecV3) {
v3.URI = "mypostgresql.c6c8mwvfdgv0.us-west-2.rds.amazonaws.com:5432"
}

t.Run("dynamic resource - no match", func(t *testing.T) {
// Created an RDS db dynamic resource that doesn't match any db service ResourceMatchers.
db1, err = makeDynamicDatabase("db1", map[string]string{"group": "z"}, withRDSURL)
require.NoError(t, err)
require.True(t, db1.IsRDS())
err = testCtx.authServer.CreateDatabase(ctx, db1)
require.NoError(t, err)
// The db1 should not be registered by the agent due to ResourceMatchers mismatch:
assertReconciledResource(t, reconcileCh, types.Databases{db0})
})

t.Run("dynamic resource - match", func(t *testing.T) {
// Create an RDS dynamic resource with labels that matches ResourceMatchers.
db2, err = makeDynamicDatabase("db2", map[string]string{"group": "a"}, withRDSURL)
require.NoError(t, err)
require.True(t, db2.IsRDS())

err = testCtx.authServer.CreateDatabase(ctx, db2)
require.NoError(t, err)
// The db2 service should be properly registered by the agent.
assertReconciledResource(t, reconcileCh, types.Databases{db0, db2})
})

t.Run("cloud resource - no match", func(t *testing.T) {
// Create an RDS Cloud resource with a label that doesn't match resource matcher.
db3, err = makeCloudDatabase("db3", map[string]string{"group": "z"})
require.NoError(t, err)
require.True(t, db3.IsRDS())

// The db3 DB RDS Cloud origin resource should properly register by the agent even if DB labels don't match
// any ResourceMatchers. The RDS Cloud origin databases relays could fetchers that return only matching databases.
err = testCtx.authServer.CreateDatabase(ctx, db3)
require.NoError(t, err)
assertReconciledResource(t, reconcileCh, types.Databases{db0, db2, db3})
})
}

func assertReconciledResource(t *testing.T, ch chan types.Databases, databases types.Databases) {
t.Helper()
select {
case d := <-reconcileCh:
require.Empty(t, cmp.Diff(types.Databases{db0}, d,
case d := <-ch:
sort.Sort(d)
require.Equal(t, len(d), len(databases))
require.Empty(t, cmp.Diff(databases, d,
cmpopts.IgnoreFields(types.Metadata{}, "ID"),
cmpopts.IgnoreFields(types.DatabaseStatusV3{}, "CACert"),
))
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}

}

func makeStaticDatabase(name string, labels map[string]string) (*types.DatabaseV3, error) {
func makeStaticDatabase(name string, labels map[string]string, opts ...makeDatabaseOpt) (*types.DatabaseV3, error) {
return makeDatabase(name, labels, map[string]string{
types.OriginLabel: types.OriginConfigFile,
})
}, opts...)
}

func makeDynamicDatabase(name string, labels map[string]string) (*types.DatabaseV3, error) {
func makeDynamicDatabase(name string, labels map[string]string, opts ...makeDatabaseOpt) (*types.DatabaseV3, error) {
return makeDatabase(name, labels, map[string]string{
types.OriginLabel: types.OriginDynamic,
}, opts...)
}

func makeCloudDatabase(name string, labels map[string]string) (*types.DatabaseV3, error) {
return makeDatabase(name, labels, map[string]string{
types.OriginLabel: types.OriginCloud,
}, func(v3 *types.DatabaseSpecV3) {
v3.URI = "mypostgresql.c6c8mwvfdgv0.us-west-2.rds.amazonaws.com:5432"
})
}

func makeDatabase(name string, labels map[string]string, additionalLabels map[string]string) (*types.DatabaseV3, error) {
type makeDatabaseOpt func(*types.DatabaseSpecV3)

func makeDatabase(name string, labels map[string]string, additionalLabels map[string]string, opts ...makeDatabaseOpt) (*types.DatabaseV3, error) {
if labels == nil {
labels = make(map[string]string)
}

for k, v := range additionalLabels {
labels[k] = v
}

ds := types.DatabaseSpecV3{
Protocol: defaults.ProtocolPostgres,
URI: "localhost:5432",
}

for _, o := range opts {
o(&ds)
}

return types.NewDatabaseV3(types.Metadata{
Name: name,
Labels: labels,
}, types.DatabaseSpecV3{
Protocol: defaults.ProtocolPostgres,
URI: "localhost:5432",
})
}, ds)
}

0 comments on commit 3334e64

Please sign in to comment.