Skip to content

Commit

Permalink
Rename master to aggregator (#847)
Browse files Browse the repository at this point in the history
Slightly more intuitive and useful name. Kept the old name
as an alias which we can decide to remove later if we want.

Did not update a few of the config values or script names
that are named with 'master' in order to avoid potentially
breaking changes.

Fixes #238

Signed-off-by: John Schnake <[email protected]>
  • Loading branch information
johnSchnake authored Aug 23, 2019
1 parent 58b331c commit 973e782
Show file tree
Hide file tree
Showing 14 changed files with 74 additions and 60 deletions.
51 changes: 33 additions & 18 deletions cmd/sonobuoy/app/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package app

import (
"fmt"
"os"

"github.com/heptio/sonobuoy/pkg/config"
Expand All @@ -27,47 +28,61 @@ import (
"github.com/spf13/cobra"
)

var noExit bool
var kubecfg Kubeconfig
type aggregatorInput struct {
noExit bool
kubecfg Kubeconfig
}

func NewCmdMaster() *cobra.Command {
// NewCmdAggregator returns the command that runs Sonobuoy as an aggregator. It will
// load the config, launch plugins, gather results, and query the cluster for data.
func NewCmdAggregator() *cobra.Command {
input := aggregatorInput{}
cmd := &cobra.Command{
Use: "master",
Short: "Runs the master/aggregator component (for internal use)",
Long: "Sonobuoy is an introspective kubernetes component that generates reports on cluster conformance, configuration, and more",
Run: runMaster,
Use: "aggregator",
Short: "Runs the aggregator component (for internal use)",
Long: "Sonobuoy is an introspective kubernetes component that generates reports on cluster conformance, configuration, and more",
Run: func(cmd *cobra.Command, args []string) {
if err := runAggregator(input); err != nil {
errlog.LogError(err)
os.Exit(1)
}
},
Hidden: true,
Args: cobra.ExactArgs(0),

// Original command but no longer used. Kept for backward compatibility.
Aliases: []string{"master"},
}
cmd.PersistentFlags().BoolVar(
&noExit, "no-exit", false,
&input.noExit, "no-exit", false,
"Use this if you want sonobuoy to block and not exit. Useful when you want to explicitly grab results.tar.gz",
)
AddKubeconfigFlag(&kubecfg, cmd.Flags())
AddKubeconfigFlag(&input.kubecfg, cmd.Flags())
return cmd
}

func runMaster(cmd *cobra.Command, args []string) {

func runAggregator(input aggregatorInput) error {
cfg, err := config.LoadConfig()
if err != nil {
errlog.LogError(errors.Wrap(err, "error loading sonobuoy configuration"))
os.Exit(1)
return errors.Wrap(err, "error loading sonobuoy configuration")
}

kcfg, err := kubecfg.Get()
kcfg, err := input.kubecfg.Get()
if err != nil {
errlog.LogError(err)
os.Exit(1)
return errors.Wrap(err, "getting kubeconfig")
}

// Run Discovery (gather API data, run plugins)
errcount := discovery.Run(kcfg, cfg)

if noExit {
if input.noExit {
logrus.Info("no-exit was specified, sonobuoy is now blocking")
select {}
}

os.Exit(errcount)
if errcount > 0 {
return fmt.Errorf("%v errors encountered during execution", errcount)
}

return nil
}
2 changes: 1 addition & 1 deletion cmd/sonobuoy/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewSonobuoyCommand() *cobra.Command {

cmds.ResetFlags()

cmds.AddCommand(NewCmdMaster())
cmds.AddCommand(NewCmdAggregator())
cmds.AddCommand(NewCmdDelete())
cmds.AddCommand(NewCmdE2E())

Expand Down
22 changes: 11 additions & 11 deletions cmd/sonobuoy/app/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ import (
"github.com/spf13/cobra"
)

// NewCmdWorker is the cobra command that acts as the entrypoint for Sonobuoy when running
// as a sidecar with a plugin. It will wait for a 'done' file then transmit the results to the
// aggregator pod.
func NewCmdWorker() *cobra.Command {

var workerCmd = &cobra.Command{
Use: "worker",
Short: "Gather and send data to the sonobuoy master instance (for internal use)",
Run: runGatherHelp,
Short: "Gather and send data to the sonobuoy aggregator instance (for internal use)",
Hidden: true,
Args: cobra.ExactArgs(0),
}
Expand All @@ -65,10 +66,6 @@ var singleNodeCmd = &cobra.Command{
Args: cobra.ExactArgs(0),
}

func runGatherHelp(cmd *cobra.Command, args []string) {
cmd.Help()
}

// sigHandler returns a channel that will receive a message after the timeout
// elapses after a SIGTERM is received.
func sigHandler(timeout time.Duration) <-chan struct{} {
Expand Down Expand Up @@ -138,12 +135,15 @@ func runGather(global bool) error {
return errors.Wrap(err, "getting HTTP client")
}

// A single-node results URL looks like:
// http://sonobuoy-master:8080/api/v1/results/by-node/node1/systemd_logs
url := cfg.MasterURL + "/" + cfg.NodeName + "/" + cfg.ResultType
url := ""
if global {
// http://sonobuoy-master:8080/api/v1/results/global/systemd_logs
// A global results URL looks like:
// http://sonobuoy-aggregator:8080/api/v1/results/global/systemd_logs
url = cfg.MasterURL + "/" + cfg.ResultType
} else {
// A single-node results URL looks like:
// http://sonobuoy-aggregator:8080/api/v1/results/by-node/node1/systemd_logs
url = cfg.MasterURL + "/" + cfg.NodeName + "/" + cfg.ResultType
}

err = worker.GatherResults(cfg.ResultsDir+"/done", url, client, sigHandler(plugin.GracefulShutdownPeriod*time.Second))
Expand Down
1 change: 0 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/heptio/sonobuoy/cmd/sonobuoy/app"
)

// main entry point of the program
func main() {
err := app.NewSonobuoyCommand().Execute()
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/client/retrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var tarCommand = []string{
"/usr/bin/env",
"bash",
"-c",
fmt.Sprintf("tar cf - %s/*.tar.gz", config.MasterResultsPath),
fmt.Sprintf("tar cf - %s/*.tar.gz", config.AggregatorResultsPath),
}

// RetrieveResults copies results from a sonobuoy run into a Reader in tar format.
Expand Down Expand Up @@ -71,9 +71,9 @@ func (c *SonobuoyClient) RetrieveResults(cfg *RetrieveConfig) (io.Reader, <-chan
Name(podName).
Namespace(cfg.Namespace).
SubResource("exec").
Param("container", config.MasterContainerName)
Param("container", config.AggregatorContainerName)
req.VersionedParams(&corev1.PodExecOptions{
Container: config.MasterContainerName,
Container: config.AggregatorContainerName,
Command: tarCommand,
Stdin: false,
Stdout: true,
Expand Down
16 changes: 8 additions & 8 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

const (
// DefaultNamespace is the namespace where the master and plugin workers will run (but not necessarily the pods created by the plugin workers).
// DefaultNamespace is the namespace where the aggregator and plugin workers will run (but not necessarily the pods created by the plugin workers).
DefaultNamespace = "heptio-sonobuoy"

// DefaultKubeConformanceImageURL is the URL of the docker image to run for the kube conformance tests.
Expand All @@ -44,12 +44,12 @@ const (
DefaultAggregationServerBindAddress = "0.0.0.0"
// DefaultAggregationServerTimeoutSeconds is the default amount of time the aggregation server will wait for all plugins to complete.
DefaultAggregationServerTimeoutSeconds = 10800 // 180 min
// MasterPodName is the name of the main pod that runs plugins and collects results.
MasterPodName = "sonobuoy"
// MasterContainerName is the name of the main container in the master pod.
MasterContainerName = "kube-sonobuoy"
// MasterResultsPath is the location in the main container of the master pod where results will be archived.
MasterResultsPath = "/tmp/sonobuoy"
// AggregatorPodName is the name of the main pod that runs plugins and collects results.
AggregatorPodName = "sonobuoy"
// AggregatorContainerName is the name of the main container in the aggregator pod.
AggregatorContainerName = "kube-sonobuoy"
// AggregatorResultsPath is the location in the main container of the aggregator pod where results will be archived.
AggregatorResultsPath = "/tmp/sonobuoy"
// DefaultSonobuoyPullPolicy is the default pull policy used in the Sonobuoy config.
DefaultSonobuoyPullPolicy = "IfNotPresent"
// DefaultQueryQPS is the number of queries per second Sonobuoy will make when gathering data.
Expand Down Expand Up @@ -332,7 +332,7 @@ func New() *Config {
cfgUuid, _ := uuid.NewV4()
cfg.UUID = cfgUuid.String()
cfg.Description = "DEFAULT"
cfg.ResultsDir = MasterResultsPath
cfg.ResultsDir = AggregatorResultsPath
cfg.Version = buildinfo.Version

cfg.Filters.Namespaces = ".*"
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugin/aggregation/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (a *Aggregator) HandleHTTPResult(result *plugin.Result, w http.ResponseWrit
// Since most plugins submit over HTTP, this method is currently only used to
// consume an error stream from each plugin's Monitor() function.
//
// If we support plugins that are just simple commands that the sonobuoy master
// If we support plugins that are just simple commands that the Sonobuoy aggregator
// runs, those plugins can submit results through the same channel.
func (a *Aggregator) IngestResults(ctx context.Context, resultsCh <-chan *plugin.Result) {
for {
Expand Down
10 changes: 5 additions & 5 deletions pkg/plugin/driver/daemonset/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
)

// Plugin is a plugin driver that dispatches containers to each node,
// expecting each pod to report to the master.
// expecting each pod to report to the aggregator.
type Plugin struct {
driver.Base
}
Expand All @@ -51,7 +51,7 @@ type Plugin struct {
var _ plugin.Interface = &Plugin{}

// NewPlugin creates a new DaemonSet plugin from the given Plugin Definition
// and sonobuoy master address.
// and sonobuoy aggregator address.
func NewPlugin(dfn manifest.Manifest, namespace, sonobuoyImage, imagePullPolicy, imagePullSecrets string, customAnnotations map[string]string) *Plugin {
return &Plugin{
driver.Base{
Expand Down Expand Up @@ -81,7 +81,7 @@ func (p *Plugin) ExpectedResults(nodes []v1.Node) []plugin.ExpectedResult {
return ret
}

func getMasterAddress(hostname string) string {
func getAggregatorAddress(hostname string) string {
return fmt.Sprintf("https://%s/api/v1/results/by-node", hostname)
}

Expand All @@ -107,7 +107,7 @@ func (p *Plugin) createDaemonSetDefinition(hostname string, cert *tls.Certificat
Labels: labels,
Annotations: annotations,
OwnerReferences: []metav1.OwnerReference{
metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: ownerPod.GetName(),
Expand Down Expand Up @@ -160,7 +160,7 @@ func (p *Plugin) createDaemonSetDefinition(hostname string, cert *tls.Certificat

// Run dispatches worker pods according to the DaemonSet's configuration.
func (p *Plugin) Run(kubeclient kubernetes.Interface, hostname string, cert *tls.Certificate, ownerPod *v1.Pod) error {
daemonSet := p.createDaemonSetDefinition(getMasterAddress(hostname), cert, ownerPod)
daemonSet := p.createDaemonSetDefinition(getAggregatorAddress(hostname), cert, ownerPod)

secret, err := p.MakeTLSSecret(cert)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/plugin/driver/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Plugin struct {
var _ plugin.Interface = &Plugin{}

// NewPlugin creates a new DaemonSet plugin from the given Plugin Definition
// and sonobuoy master address.
// and sonobuoy aggregator address.
func NewPlugin(dfn manifest.Manifest, namespace, sonobuoyImage, imagePullPolicy, imagePullSecrets string, customAnnotations map[string]string) *Plugin {
return &Plugin{
driver.Base{
Expand All @@ -74,7 +74,7 @@ func (p *Plugin) ExpectedResults(nodes []v1.Node) []plugin.ExpectedResult {
}
}

func getMasterAddress(hostname string) string {
func getAggregatorAddress(hostname string) string {
return fmt.Sprintf("https://%s/api/v1/results/%v", hostname, plugin.GlobalResult)
}

Expand All @@ -100,7 +100,7 @@ func (p *Plugin) createPodDefinition(hostname string, cert *tls.Certificate, own
Labels: labels,
Annotations: annotations,
OwnerReferences: []metav1.OwnerReference{
metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: ownerPod.GetName(),
Expand Down Expand Up @@ -144,7 +144,7 @@ func (p *Plugin) createPodDefinition(hostname string, cert *tls.Certificate, own

// Run dispatches worker pods according to the Job's configuration.
func (p *Plugin) Run(kubeclient kubernetes.Interface, hostname string, cert *tls.Certificate, ownerPod *v1.Pod) error {
job := p.createPodDefinition(getMasterAddress(hostname), cert, ownerPod)
job := p.createPodDefinition(getAggregatorAddress(hostname), cert, ownerPod)

secret, err := p.MakeTLSSecret(cert)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugin/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ type AggregationConfig struct {

// WorkerConfig is the file given to the sonobuoy worker to configure it to phone home.
type WorkerConfig struct {
// MasterURL is the URL we talk to for submitting results
// MasterURL is the URL we talk to the aggregator pod on for submitting results
MasterURL string `json:"masterurl,omitempty" mapstructure:"masterurl"`
// NodeName is the node name we should call ourselves when sending results
NodeName string `json:"nodename,omitempty" mapstructure:"nodename"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/worker/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ limitations under the License.
*/

// Package worker is responsible for the logic behind submitting results data
// back to a sonobuoy master. This is intended for plugins to leverage, to
// back to a Sonobuoy aggregator. This is intended for plugins to leverage, to
// avoid uploading data manually.
package worker
8 changes: 4 additions & 4 deletions pkg/worker/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ func DoRequest(url string, client *http.Client, callback func() (io.Reader, stri
err = fmt.Errorf("unexpected status code %d", resp.StatusCode)
}
if err != nil {
errlog.LogError(errors.Wrapf(err, "could not send error message to master URL (%s)", url))
errlog.LogError(errors.Wrapf(err, "could not send error message to aggregator URL (%s)", url))
}

return errors.WithStack(err)
}

req, err := http.NewRequest(http.MethodPut, url, input)
if err != nil {
return errors.Wrapf(err, "error constructing master request to %v", url)
return errors.Wrapf(err, "error constructing aggregator request to %v", url)
}
req.Header.Add("content-type", mimeType)
if len(filename) > 0 {
Expand All @@ -98,11 +98,11 @@ func DoRequest(url string, client *http.Client, callback func() (io.Reader, stri

resp, err := pesterClient.Do(req)
if err != nil {
return errors.Wrapf(err, "error encountered dialing master at %v", url)
return errors.Wrapf(err, "error encountered dialing aggregator at %v", url)
}
if resp.StatusCode != http.StatusOK {
// TODO: retry logic for something like a 429 or otherwise
return errors.Errorf("got a %v response when dialing master to %v", resp.StatusCode, url)
return errors.Errorf("got a %v response when dialing aggregator to %v", resp.StatusCode, url)
}
return nil
}
2 changes: 1 addition & 1 deletion pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func init() {
//
// 1. Output data will be placed into an agreed upon results directory.
// 2. The Job will wait for a done file
// 3. The done file contains a single string of the results to be sent to the master
// 3. The done file contains a single string of the results to be sent to the aggregator
func GatherResults(waitfile string, url string, client *http.Client, stopc <-chan struct{}) error {
logrus.WithField("waitfile", waitfile).Info("Waiting for waitfile")
ticker := time.Tick(1 * time.Second)
Expand Down
2 changes: 1 addition & 1 deletion site/docs/v0.13.0/enhancements/v2-worker-master.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

## How it works now

Sonobuoy uses a worker-master model, where a master delegates tasks to worker
Sonobuoy uses a worker-master model, where an aggregator delegates tasks to worker
pods. When those pods have finished, they need to report the results of their
work back to the master. Presently this is done over an ill-defined, ad hoc
REST-ish client/server model embedded in the server.
Expand Down

0 comments on commit 973e782

Please sign in to comment.