Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v8] Fix RDS Redshift dynamic resources registration logic (#11868) #12452

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/types/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ const (
// that the resource originates from.
OriginLabel = "teleport.dev/origin"

// OriginConfigFile is an origin value indicating that the resource was
// OriginDefaults is an origin value indicating that the resource was
// constructed as a default value.
OriginDefaults = "defaults"

Expand Down
8 changes: 7 additions & 1 deletion lib/srv/db/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,14 @@ func (s *Server) matcher(resource types.ResourceWithLabels) bool {
if !ok {
return false
}
if database.IsRDS() || database.IsRedshift() {

// In the case of CloudOrigin CloudHosted resources the matchers should be skipped.
if cloudOrigin(resource) && database.IsCloudHosted() {
return true // Cloud fetchers return only matching databases.
}
return services.MatchResourceLabels(s.cfg.ResourceMatchers, database)
}

func cloudOrigin(r types.ResourceWithLabels) bool {
return r.Origin() == types.OriginCloud
}
183 changes: 110 additions & 73 deletions lib/srv/db/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,7 @@ func TestWatcher(t *testing.T) {
})

// Only db0 should be registered initially.
select {
case d := <-reconcileCh:
sort.Sort(d)
require.Empty(t, cmp.Diff(types.Databases{db0}, d,
cmpopts.IgnoreFields(types.Metadata{}, "ID"),
))
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}
assertReconciledResource(t, reconcileCh, types.Databases{db0})

// Create database with label group=a.
db1, err := makeDynamicDatabase("db1", map[string]string{"group": "a"})
Expand All @@ -77,15 +69,7 @@ func TestWatcher(t *testing.T) {
require.NoError(t, err)

// It should be registered.
select {
case d := <-reconcileCh:
sort.Sort(d)
require.Empty(t, cmp.Diff(types.Databases{db0, db1}, d,
cmpopts.IgnoreFields(types.Metadata{}, "ID"),
))
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}
assertReconciledResource(t, reconcileCh, types.Databases{db0, db1})

// Try to update db0 which is registered statically.
db0Updated, err := makeDynamicDatabase("db0", map[string]string{"group": "a", types.OriginLabel: types.OriginDynamic})
Expand All @@ -94,15 +78,7 @@ func TestWatcher(t *testing.T) {
require.NoError(t, err)

// It should not be registered, old db0 should remain.
select {
case d := <-reconcileCh:
sort.Sort(d)
require.Empty(t, cmp.Diff(types.Databases{db0, db1}, d,
cmpopts.IgnoreFields(types.Metadata{}, "ID"),
))
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}
assertReconciledResource(t, reconcileCh, types.Databases{db0, db1})

// Create database with label group=b.
db2, err := makeDynamicDatabase("db2", map[string]string{"group": "b"})
Expand All @@ -111,103 +87,164 @@ func TestWatcher(t *testing.T) {
require.NoError(t, err)

// It shouldn't be registered.
select {
case d := <-reconcileCh:
sort.Sort(d)
require.Empty(t, cmp.Diff(types.Databases{db0, db1}, d,
cmpopts.IgnoreFields(types.Metadata{}, "ID"),
))
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}
assertReconciledResource(t, reconcileCh, types.Databases{db0, db1})

// Update db2 labels so it matches.
db2.SetStaticLabels(map[string]string{"group": "a", types.OriginLabel: types.OriginDynamic})
err = testCtx.authServer.UpdateDatabase(ctx, db2)
require.NoError(t, err)

// Both should be registered now.
select {
case d := <-reconcileCh:
sort.Sort(d)
require.Empty(t, cmp.Diff(types.Databases{db0, db1, db2}, d,
cmpopts.IgnoreFields(types.Metadata{}, "ID"),
))
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}
assertReconciledResource(t, reconcileCh, types.Databases{db0, db1, db2})

// Update db2 URI so it gets re-registered.
db2.SetURI("localhost:2345")
err = testCtx.authServer.UpdateDatabase(ctx, db2)
require.NoError(t, err)

// db2 should get updated.
select {
case d := <-reconcileCh:
sort.Sort(d)
require.Empty(t, cmp.Diff(types.Databases{db0, db1, db2}, d,
cmpopts.IgnoreFields(types.Metadata{}, "ID"),
))
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}
assertReconciledResource(t, reconcileCh, types.Databases{db0, db1, db2})

// Update db1 labels so it doesn't match.
db1.SetStaticLabels(map[string]string{"group": "c", types.OriginLabel: types.OriginDynamic})
err = testCtx.authServer.UpdateDatabase(ctx, db1)
require.NoError(t, err)

// Only db0 and db2 should remain registered.
select {
case d := <-reconcileCh:
sort.Sort(d)
require.Empty(t, cmp.Diff(types.Databases{db0, db2}, d,
cmpopts.IgnoreFields(types.Metadata{}, "ID"),
))
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}

assertReconciledResource(t, reconcileCh, types.Databases{db0, db2})

// Remove db2.
err = testCtx.authServer.DeleteDatabase(ctx, db2.GetName())
require.NoError(t, err)

// Only static database should remain.
assertReconciledResource(t, reconcileCh, types.Databases{db0})
}

// TestWatcherRDSDynamicResource RDS dynamic resource registration where the ResourceMatchers should be always
// evaluated for the dynamic registered resources.
func TestWatcherCloudDynamicResource(t *testing.T) {
var db1, db2, db3 *types.DatabaseV3
ctx := context.Background()
testCtx := setupTestContext(ctx, t)

db0, err := makeStaticDatabase("db0", nil)
require.NoError(t, err)

reconcileCh := make(chan types.Databases)
testCtx.setupDatabaseServer(ctx, t, agentParams{
Databases: []types.Database{db0},
ResourceMatchers: []services.ResourceMatcher{
{Labels: types.Labels{
"group": []string{"a"},
}},
},
OnReconcile: func(d types.Databases) {
reconcileCh <- d
},
})
assertReconciledResource(t, reconcileCh, types.Databases{db0})

withRDSURL := func(v3 *types.DatabaseSpecV3) {
v3.URI = "mypostgresql.c6c8mwvfdgv0.us-west-2.rds.amazonaws.com:5432"
}

t.Run("dynamic resource - no match", func(t *testing.T) {
// Created an RDS db dynamic resource that doesn't match any db service ResourceMatchers.
db1, err = makeDynamicDatabase("db1", map[string]string{"group": "z"}, withRDSURL)
require.NoError(t, err)
require.True(t, db1.IsRDS())
err = testCtx.authServer.CreateDatabase(ctx, db1)
require.NoError(t, err)
// The db1 should not be registered by the agent due to ResourceMatchers mismatch:
assertReconciledResource(t, reconcileCh, types.Databases{db0})
})

t.Run("dynamic resource - match", func(t *testing.T) {
// Create an RDS dynamic resource with labels that matches ResourceMatchers.
db2, err = makeDynamicDatabase("db2", map[string]string{"group": "a"}, withRDSURL)
require.NoError(t, err)
require.True(t, db2.IsRDS())

err = testCtx.authServer.CreateDatabase(ctx, db2)
require.NoError(t, err)
// The db2 service should be properly registered by the agent.
assertReconciledResource(t, reconcileCh, types.Databases{db0, db2})
})

t.Run("cloud resource - no match", func(t *testing.T) {
// Create an RDS Cloud resource with a label that doesn't match resource matcher.
db3, err = makeCloudDatabase("db3", map[string]string{"group": "z"})
require.NoError(t, err)
require.True(t, db3.IsRDS())

// The db3 DB RDS Cloud origin resource should properly register by the agent even if DB labels don't match
// any ResourceMatchers. The RDS Cloud origin databases relays could fetchers that return only matching databases.
err = testCtx.authServer.CreateDatabase(ctx, db3)
require.NoError(t, err)
assertReconciledResource(t, reconcileCh, types.Databases{db0, db2, db3})
})
}

func assertReconciledResource(t *testing.T, ch chan types.Databases, databases types.Databases) {
t.Helper()
select {
case d := <-reconcileCh:
require.Empty(t, cmp.Diff(types.Databases{db0}, d,
case d := <-ch:
sort.Sort(d)
require.Equal(t, len(d), len(databases))
require.Empty(t, cmp.Diff(databases, d,
cmpopts.IgnoreFields(types.Metadata{}, "ID"),
cmpopts.IgnoreFields(types.DatabaseStatusV3{}, "CACert"),
))
case <-time.After(time.Second):
t.Fatal("Didn't receive reconcile event after 1s.")
}

}

func makeStaticDatabase(name string, labels map[string]string) (*types.DatabaseV3, error) {
func makeStaticDatabase(name string, labels map[string]string, opts ...makeDatabaseOpt) (*types.DatabaseV3, error) {
return makeDatabase(name, labels, map[string]string{
types.OriginLabel: types.OriginConfigFile,
})
}, opts...)
}

func makeDynamicDatabase(name string, labels map[string]string) (*types.DatabaseV3, error) {
func makeDynamicDatabase(name string, labels map[string]string, opts ...makeDatabaseOpt) (*types.DatabaseV3, error) {
return makeDatabase(name, labels, map[string]string{
types.OriginLabel: types.OriginDynamic,
}, opts...)
}

func makeCloudDatabase(name string, labels map[string]string) (*types.DatabaseV3, error) {
return makeDatabase(name, labels, map[string]string{
types.OriginLabel: types.OriginCloud,
}, func(v3 *types.DatabaseSpecV3) {
v3.URI = "mypostgresql.c6c8mwvfdgv0.us-west-2.rds.amazonaws.com:5432"
})
}

func makeDatabase(name string, labels map[string]string, additionalLabels map[string]string) (*types.DatabaseV3, error) {
type makeDatabaseOpt func(*types.DatabaseSpecV3)

func makeDatabase(name string, labels map[string]string, additionalLabels map[string]string, opts ...makeDatabaseOpt) (*types.DatabaseV3, error) {
if labels == nil {
labels = make(map[string]string)
}

for k, v := range additionalLabels {
labels[k] = v
}

ds := types.DatabaseSpecV3{
Protocol: defaults.ProtocolPostgres,
URI: "localhost:5432",
}

for _, o := range opts {
o(&ds)
}

return types.NewDatabaseV3(types.Metadata{
Name: name,
Labels: labels,
}, types.DatabaseSpecV3{
Protocol: defaults.ProtocolPostgres,
URI: "localhost:5432",
})
}, ds)
}