Skip to content

Commit

Permalink
run autoupdate_agent_rollout controller
Browse files Browse the repository at this point in the history
  • Loading branch information
hugoShaka committed Nov 15, 2024
1 parent 3c3b0b9 commit 1042ef7
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 22 deletions.
2 changes: 1 addition & 1 deletion lib/autoupdate/rolloutcontroller/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

// mockClient is a mock implementation if the Client interface for testing purposes.
// This is used to precisely check which calls are made by the Reconciler during tests.
// This is used to precisely check which calls are made by the reconciler during tests.
// Use newMockClient to create one from stubs. Once the test is over, you must call
// mockClient.checkIfEmpty to validate all expected calls were made.
type mockClient struct {
Expand Down
81 changes: 81 additions & 0 deletions lib/autoupdate/rolloutcontroller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Teleport
* Copyright (C) 2024 Gravitational, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package rolloutcontroller

import (
"context"
"log/slog"
"time"

"github.com/jonboulle/clockwork"

"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/utils/interval"
)

const (
reconcilerPeriod = time.Minute
)

// Controller wakes up every minute to reconcile the autoupdate_agent_rollout resource.
// See the reconciler godoc for more details about the reconciliation process.
// We currently wake up every minute, in the future we might decide to also watch for events
// (from autoupdate_config and autoupdate_version changefeed) to react faster.
type Controller struct {
reconciler reconciler
clock clockwork.Clock
log *slog.Logger
}

// New creates a new Controller for the autoupdate_agent_rollout kind.
func New(client Client, log *slog.Logger, clock clockwork.Clock) *Controller {
return &Controller{
clock: clock,
log: log,
reconciler: reconciler{
clt: client,
log: log,
},
}
}

// Run the autoupdate_agent_rollout controller. This function returns only when its context is cancelled.
func (c *Controller) Run(ctx context.Context) error {
config := interval.Config{
Duration: reconcilerPeriod,
FirstDuration: reconcilerPeriod,
Jitter: retryutils.SeventhJitter,
Clock: c.clock,
}
ticker := interval.New(config)

for {
select {
case <-ctx.Done():
c.log.InfoContext(ctx, "Stopping autoupdate_agent_rollout controller", "reason", ctx.Err())
return ctx.Err()
case <-ticker.Next():
c.log.DebugContext(ctx, "Reconciling autoupdate_agent_rollout")
err := c.reconciler.reconcile(ctx)
if err != nil {
c.log.ErrorContext(ctx, "Failed to reconcile autoudpate_agent_controller", "error", err)
}
}
}
}
12 changes: 6 additions & 6 deletions lib/autoupdate/rolloutcontroller/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,21 @@ const (
maxConflictRetry = 3
)

// Reconciler reconciles the AutoUpdateAgentRollout singleton based on the content of the AutoUpdateVersion and
// reconciler reconciles the AutoUpdateAgentRollout singleton based on the content of the AutoUpdateVersion and
// AutoUpdateConfig singletons. This reconciler is not based on the services.GenericReconciler because:
// - we reconcile 2 resources with one
// - both input and output are singletons, we don't need the multi resource logic nor stream/paginated APIs
type Reconciler struct {
type reconciler struct {
clt Client
log *slog.Logger

// mutex ensures we only run one reconciliation at a time
mutex sync.Mutex
}

// Reconcile the AutoUpdateAgentRollout singleton. The reconciliation can fail because of a conflict (multiple auths
// reconcile the AutoUpdateAgentRollout singleton. The reconciliation can fail because of a conflict (multiple auths
// are racing), in this case we retry the reconciliation immediately.
func (r *Reconciler) Reconcile(ctx context.Context) error {
func (r *reconciler) reconcile(ctx context.Context) error {
r.mutex.Lock()
defer r.mutex.Unlock()

Expand Down Expand Up @@ -88,7 +88,7 @@ func (r *Reconciler) Reconcile(ctx context.Context) error {
// The creation/update/deletion can fail with a trace.CompareFailedError or trace.NotFoundError
// if the resource change while we were computing it.
// The caller must handle those error and retry the reconciliation.
func (r *Reconciler) tryReconcile(ctx context.Context) error {
func (r *reconciler) tryReconcile(ctx context.Context) error {
// get autoupdate_config
var config *autoupdate.AutoUpdateConfig
if c, err := r.clt.GetAutoUpdateConfig(ctx); err == nil {
Expand Down Expand Up @@ -171,7 +171,7 @@ func (r *Reconciler) tryReconcile(ctx context.Context) error {
return trace.Wrap(err, "updating rollout")
}

func (r *Reconciler) buildRolloutSpec(config *autoupdate.AutoUpdateConfigSpecAgents, version *autoupdate.AutoUpdateVersionSpecAgents) (*autoupdate.AutoUpdateAgentRolloutSpec, error) {
func (r *reconciler) buildRolloutSpec(config *autoupdate.AutoUpdateConfigSpecAgents, version *autoupdate.AutoUpdateVersionSpecAgents) (*autoupdate.AutoUpdateAgentRolloutSpec, error) {
// reconcile mode
mode, err := getMode(config.GetMode(), version.GetMode())
if err != nil {
Expand Down
30 changes: 15 additions & 15 deletions lib/autoupdate/rolloutcontroller/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func TestTryReconcile(t *testing.T) {

// Test execution: Running the reconciliation

reconciler := &Reconciler{
reconciler := &reconciler{
clt: client,
log: log,
}
Expand Down Expand Up @@ -375,13 +375,13 @@ func TestReconciler_Reconcile(t *testing.T) {
}

client := newMockClient(t, stubs)
reconciler := &Reconciler{
reconciler := &reconciler{
clt: client,
log: log,
}

// Test execution: run the reconciliation loop
require.NoError(t, reconciler.Reconcile(ctx))
require.NoError(t, reconciler.reconcile(ctx))

// Test validation: check that all the expected calls were received
client.checkIfEmpty(t)
Expand All @@ -397,13 +397,13 @@ func TestReconciler_Reconcile(t *testing.T) {
}

client := newMockClient(t, stubs)
reconciler := &Reconciler{
reconciler := &reconciler{
clt: client,
log: log,
}

// Test execution: run the reconciliation loop
require.NoError(t, reconciler.Reconcile(ctx))
require.NoError(t, reconciler.reconcile(ctx))

// Test validation: check that all the expected calls were received
client.checkIfEmpty(t)
Expand All @@ -421,13 +421,13 @@ func TestReconciler_Reconcile(t *testing.T) {
}

client := newMockClient(t, stubs)
reconciler := &Reconciler{
reconciler := &reconciler{
clt: client,
log: log,
}

// Test execution: run the reconciliation loop
require.NoError(t, reconciler.Reconcile(ctx))
require.NoError(t, reconciler.reconcile(ctx))

// Test validation: check that all the expected calls were received
client.checkIfEmpty(t)
Expand Down Expand Up @@ -461,13 +461,13 @@ func TestReconciler_Reconcile(t *testing.T) {
}

client := newMockClient(t, stubs)
reconciler := &Reconciler{
reconciler := &reconciler{
clt: client,
log: log,
}

// Test execution: run the reconciliation loop
require.NoError(t, reconciler.Reconcile(ctx))
require.NoError(t, reconciler.reconcile(ctx))

// Test validation: check that all the expected calls were received
client.checkIfEmpty(t)
Expand Down Expand Up @@ -499,13 +499,13 @@ func TestReconciler_Reconcile(t *testing.T) {
}

client := newMockClient(t, stubs)
reconciler := &Reconciler{
reconciler := &reconciler{
clt: client,
log: log,
}

// Test execution: run the reconciliation loop
require.NoError(t, reconciler.Reconcile(ctx))
require.NoError(t, reconciler.reconcile(ctx))

// Test validation: check that all the expected calls were received
client.checkIfEmpty(t)
Expand All @@ -523,13 +523,13 @@ func TestReconciler_Reconcile(t *testing.T) {
}

client := newMockClient(t, stubs)
reconciler := &Reconciler{
reconciler := &reconciler{
clt: client,
log: log,
}

// Test execution: run the reconciliation loop
require.ErrorContains(t, reconciler.Reconcile(ctx), "the DB fell on the floor")
require.ErrorContains(t, reconciler.reconcile(ctx), "the DB fell on the floor")

// Test validation: check that all the expected calls were received
client.checkIfEmpty(t)
Expand All @@ -553,13 +553,13 @@ func TestReconciler_Reconcile(t *testing.T) {
}

client := newMockClient(t, stubs)
reconciler := &Reconciler{
reconciler := &reconciler{
clt: client,
log: log,
}

// Test execution: run the reconciliation loop
require.ErrorContains(t, reconciler.Reconcile(cancelableCtx), "canceled")
require.ErrorContains(t, reconciler.reconcile(cancelableCtx), "canceled")

// Test validation: check that all the expected calls were received
client.checkIfEmpty(t)
Expand Down
6 changes: 6 additions & 0 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ import (
"github.com/gravitational/teleport/lib/auth/storage"
"github.com/gravitational/teleport/lib/authz"
"github.com/gravitational/teleport/lib/automaticupgrades"
"github.com/gravitational/teleport/lib/autoupdate/rolloutcontroller"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/dynamo"
_ "github.com/gravitational/teleport/lib/backend/etcdbk"
Expand Down Expand Up @@ -2430,6 +2431,11 @@ func (process *TeleportProcess) initAuthService() error {
return trace.Wrap(spiffeFedSyncer.Run(process.GracefulExitContext()), "running SPIFFEFederation Syncer")
})

agentRolloutController := rolloutcontroller.New(authServer, logger, process.Clock)
process.RegisterFunc("auth.autoupdate_agent_rollout_controller", func() error {
return trace.Wrap(agentRolloutController.Run(process.GracefulExitContext()), "running autoupdate_agent_rollout controller")
})

process.RegisterFunc("auth.server_info", func() error {
return trace.Wrap(auth.ReconcileServerInfos(process.GracefulExitContext(), authServer))
})
Expand Down
60 changes: 60 additions & 0 deletions lib/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ import (

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/breaker"
autoupdatepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/autoupdate/v1"
"github.com/gravitational/teleport/api/types"
autoupdate "github.com/gravitational/teleport/api/types/autoupdate"
apiutils "github.com/gravitational/teleport/api/utils"
"github.com/gravitational/teleport/entitlements"
"github.com/gravitational/teleport/lib"
Expand Down Expand Up @@ -1845,3 +1847,61 @@ func TestInitDatabaseService(t *testing.T) {
})
}
}

// TestAgentRolloutController validates that the agent rollout controller is started
// when we run the Auth Service. It does so by creating a dummy autoupdate_version resource
// and checking that the corresponding autoupdate_agent_rollout resource is created by the auth.
// If you want to test the reconciliation logic, add tests to the rolloutcontroller package instead.
func TestAgentRolloutController(t *testing.T) {
t.Parallel()

// Test setup: create a Teleport Auth config
fakeClock := clockwork.NewFakeClock()
ctx := context.Background()

var err error
dataDir := t.TempDir()

cfg := servicecfg.MakeDefaultConfig()
cfg.Clock = fakeClock
cfg.DataDir = dataDir
cfg.SetAuthServerAddress(utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"})
cfg.Auth.Enabled = true
cfg.Auth.StorageConfig.Params["path"] = dataDir
cfg.Auth.ListenAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}
cfg.SSH.Enabled = false
cfg.CircuitBreakerConfig = breaker.NoopBreakerConfig()

process, err := NewTeleport(cfg)
require.NoError(t, err)

// Test setup: start the Teleport auth and wait for it to beocme ready
require.NoError(t, process.Start())
t.Cleanup(func() { require.NoError(t, process.Close()) })

// Test execution: create the autoupdate_version resource
authServer := process.GetAuthServer()
version, err := autoupdate.NewAutoUpdateVersion(&autoupdatepb.AutoUpdateVersionSpec{
Agents: &autoupdatepb.AutoUpdateVersionSpecAgents{
StartVersion: "1.2.3",
TargetVersion: "1.2.4",
Schedule: autoupdate.AgentsScheduleImmediate,
Mode: autoupdate.AgentsUpdateModeEnabled,
},
})
require.NoError(t, err)
version, err = authServer.CreateAutoUpdateVersion(ctx, version)
require.NoError(t, err)

// Test execution: advance clock to trigger a reonciliation
fakeClock.Advance(2 * time.Minute)

// Test validation: check that a new autoupdate_agent_rollout config was created
require.Eventually(t, func() bool {
rollout, err := authServer.GetAutoUpdateAgentRollout(ctx)
if err != nil {
return false
}
return rollout.Spec.GetTargetVersion() == version.Spec.GetAgents().GetTargetVersion()
}, time.Second, 10*time.Millisecond)
}

0 comments on commit 1042ef7

Please sign in to comment.