Skip to content

Commit

Permalink
move vault token watcher inside runner
Browse files Browse the repository at this point in the history
The runner is the external API and adding the clients to the parameters
breaks that 1 API. This moves the code to create the clients and start
the vault token watcher inside the runner which preserves the API while
still keeping the desired encapsulation.
  • Loading branch information
eikenb committed Sep 29, 2022
1 parent 9f97fe1 commit 938aed7
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 143 deletions.
19 changes: 2 additions & 17 deletions cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/hashicorp/consul-template/service_os"
"github.com/hashicorp/consul-template/signals"
"github.com/hashicorp/consul-template/version"
"github.com/hashicorp/consul-template/watch"
)

// Exit codes are int values that represent an exit code for a particular error.
Expand Down Expand Up @@ -104,20 +103,8 @@ func (cli *CLI) Run(args []string) int {
return ExitCodeOK
}

// Create the clientset
clients, err := manager.NewClientSet(config)
if err != nil {
return logError(err, ExitCodeConfigError)
}

// vault token watcher
vtwatchErrCh := watch.VaultTokenWatcher(clients, config.Vault)
if err != nil {
return logError(err, ExitCodeRunnerError)
}

// Initial runner
runner, err := manager.NewRunner(clients, config, dry)
runner, err := manager.NewRunner(config, dry)
if err != nil {
return logError(err, ExitCodeRunnerError)
}
Expand All @@ -128,8 +115,6 @@ func (cli *CLI) Run(args []string) int {

for {
select {
case err := <-vtwatchErrCh:
return logError(err, ExitCodeRunnerError)
case err := <-runner.ErrCh:
// Check if the runner's error returned a specific exit status, and return
// that value. If no value was given, return a generic exit status.
Expand Down Expand Up @@ -165,7 +150,7 @@ func (cli *CLI) Run(args []string) int {
return logError(err, ExitCodeConfigError)
}

runner, err = manager.NewRunner(clients, config, dry)
runner, err = manager.NewRunner(config, dry)
if err != nil {
return logError(err, ExitCodeRunnerError)
}
Expand Down
35 changes: 29 additions & 6 deletions manager/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type Runner struct {
// dependenciesLock is a lock around touching the dependencies map.
dependenciesLock sync.Mutex

// token watcher
vaultTokenWatcher *watch.Watcher
// watcher is the watcher this runner is using.
watcher *watch.Watcher

Expand Down Expand Up @@ -172,7 +174,7 @@ type RenderEvent struct {

// NewRunner accepts a slice of TemplateConfigs and returns a pointer to the new
// Runner and any error that occurred during creation.
func NewRunner(clients *dep.ClientSet, config *config.Config, dry bool) (*Runner, error) {
func NewRunner(config *config.Config, dry bool) (*Runner, error) {
log.Printf("[INFO] (runner) creating new runner (dry: %v, once: %v)",
dry, config.Once)

Expand All @@ -181,10 +183,21 @@ func NewRunner(clients *dep.ClientSet, config *config.Config, dry bool) (*Runner
dry: dry,
}

if err := runner.init(clients); err != nil {
// Create the clientset
clients, err := NewClientSet(config)
if err != nil {
return nil, fmt.Errorf("runner: %w", err)
}
// needs to be run early to do initial token handling
runner.vaultTokenWatcher, err = watch.VaultTokenWatcher(
clients, config.Vault, runner.DoneCh)
if err != nil {
return nil, err
}

if err := runner.init(clients); err != nil {
return nil, err
}
return runner, nil
}

Expand Down Expand Up @@ -226,7 +239,7 @@ func (r *Runner) Start() {

if r.child != nil {
r.stopDedup()
r.stopWatcher()
r.stopWatchers()

log.Printf("[INFO] (runner) waiting for child process to exit")
select {
Expand Down Expand Up @@ -330,7 +343,7 @@ func (r *Runner) Start() {

if r.child != nil {
r.stopDedup()
r.stopWatcher()
r.stopWatchers()

log.Printf("[INFO] (runner) waiting for child process to exit")
select {
Expand Down Expand Up @@ -384,6 +397,12 @@ func (r *Runner) Start() {
r.ErrCh <- err
return

case err := <-r.vaultTokenWatcher.ErrCh():
// Push the error back up the stack
log.Printf("[ERR] (runner): %s", err)
r.ErrCh <- err
return

case tmpl := <-r.quiescenceCh:
// Remove the quiescence for this template from the map. This will force
// the upcoming Run call to actually evaluate and render the template.
Expand Down Expand Up @@ -455,7 +474,7 @@ func (r *Runner) internalStop(immediately bool) {

log.Printf("[INFO] (runner) stopping")
r.stopDedup()
r.stopWatcher()
r.stopWatchers()
r.stopChild(immediately)

if err := r.deletePid(); err != nil {
Expand All @@ -475,11 +494,15 @@ func (r *Runner) stopDedup() {
}
}

func (r *Runner) stopWatcher() {
func (r *Runner) stopWatchers() {
if r.watcher != nil {
log.Printf("[DEBUG] (runner) stopping watcher")
r.watcher.Stop()
}
if r.vaultTokenWatcher != nil {
log.Printf("[DEBUG] (runner) stopping vault token watcher")
r.vaultTokenWatcher.Stop()
}
}

func (r *Runner) stopChild(immediately bool) {
Expand Down
26 changes: 13 additions & 13 deletions manager/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestRunner_initTemplates(t *testing.T) {
},
})

r, err := NewRunner(testClients, c, true)
r, err := NewRunner(c, true)
if err != nil {
t.Fatal(err)
}
Expand All @@ -51,7 +51,7 @@ func TestRunner_initTemplates(t *testing.T) {

func TestRunner_Receive(t *testing.T) {
c := config.TestConfig(&config.Config{Once: true})
r, err := NewRunner(testClients, c, true)
r, err := NewRunner(c, true)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -476,7 +476,7 @@ func TestRunner_Run(t *testing.T) {
c.Once = true
c.Finalize()

r, err := NewRunner(testClients, c, true)
r, err := NewRunner(c, true)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -523,7 +523,7 @@ func TestRunner_Start(t *testing.T) {
})
c.Finalize()

r, err := NewRunner(testClients, c, false)
r, err := NewRunner(c, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -564,7 +564,7 @@ func TestRunner_Start(t *testing.T) {
})
c.Finalize()

r, err := NewRunner(testClients, c, false)
r, err := NewRunner(c, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -611,7 +611,7 @@ func TestRunner_Start(t *testing.T) {
})
c.Finalize()

r, err := NewRunner(testClients, c, false)
r, err := NewRunner(c, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -659,7 +659,7 @@ func TestRunner_Start(t *testing.T) {
})
c.Finalize()

r, err := NewRunner(testClients, c, false)
r, err := NewRunner(c, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -705,7 +705,7 @@ func TestRunner_Start(t *testing.T) {
})
c.Finalize()

r, err := NewRunner(testClients, c, false)
r, err := NewRunner(c, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -760,7 +760,7 @@ func TestRunner_Start(t *testing.T) {
})
c.Finalize()

r, err := NewRunner(testClients, c, false)
r, err := NewRunner(c, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -826,7 +826,7 @@ func TestRunner_Start(t *testing.T) {
})
c.Finalize()

r, err := NewRunner(testClients, c, false)
r, err := NewRunner(c, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -902,7 +902,7 @@ func TestRunner_Start(t *testing.T) {
})
c.Finalize()

r, err := NewRunner(testClients, c, false)
r, err := NewRunner(c, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -952,7 +952,7 @@ func TestRunner_Start(t *testing.T) {
})
c.Finalize()

r, err := NewRunner(testClients, c, true)
r, err := NewRunner(c, true)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1010,7 +1010,7 @@ func TestRunner_Start(t *testing.T) {
})
c.Finalize()

r, err := NewRunner(testClients, c, false)
r, err := NewRunner(c, false)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 938aed7

Please sign in to comment.