diff --git a/cmd/sonobuoy/app/master.go b/cmd/sonobuoy/app/master.go index dd6c6c346..72207f34f 100644 --- a/cmd/sonobuoy/app/master.go +++ b/cmd/sonobuoy/app/master.go @@ -17,6 +17,7 @@ limitations under the License. package app import ( + "fmt" "os" "github.com/heptio/sonobuoy/pkg/config" @@ -27,47 +28,61 @@ import ( "github.com/spf13/cobra" ) -var noExit bool -var kubecfg Kubeconfig +type aggregatorInput struct { + noExit bool + kubecfg Kubeconfig +} -func NewCmdMaster() *cobra.Command { +// NewCmdAggregator returns the command that runs Sonobuoy as an aggregator. It will +// load the config, launch plugins, gather results, and query the cluster for data. +func NewCmdAggregator() *cobra.Command { + input := aggregatorInput{} cmd := &cobra.Command{ - Use: "master", - Short: "Runs the master/aggregator component (for internal use)", - Long: "Sonobuoy is an introspective kubernetes component that generates reports on cluster conformance, configuration, and more", - Run: runMaster, + Use: "aggregator", + Short: "Runs the aggregator component (for internal use)", + Long: "Sonobuoy is an introspective kubernetes component that generates reports on cluster conformance, configuration, and more", + Run: func(cmd *cobra.Command, args []string) { + if err := runAggregator(input); err != nil { + errlog.LogError(err) + os.Exit(1) + } + }, Hidden: true, Args: cobra.ExactArgs(0), + + // Original command but no longer used. Kept for backward compatibility. + Aliases: []string{"master"}, } cmd.PersistentFlags().BoolVar( - &noExit, "no-exit", false, + &input.noExit, "no-exit", false, "Use this if you want sonobuoy to block and not exit. 
Useful when you want to explicitly grab results.tar.gz", ) - AddKubeconfigFlag(&kubecfg, cmd.Flags()) + AddKubeconfigFlag(&input.kubecfg, cmd.Flags()) return cmd } -func runMaster(cmd *cobra.Command, args []string) { - +func runAggregator(input aggregatorInput) error { cfg, err := config.LoadConfig() if err != nil { - errlog.LogError(errors.Wrap(err, "error loading sonobuoy configuration")) - os.Exit(1) + return errors.Wrap(err, "error loading sonobuoy configuration") } - kcfg, err := kubecfg.Get() + kcfg, err := input.kubecfg.Get() if err != nil { - errlog.LogError(err) - os.Exit(1) + return errors.Wrap(err, "getting kubeconfig") } // Run Discovery (gather API data, run plugins) errcount := discovery.Run(kcfg, cfg) - if noExit { + if input.noExit { logrus.Info("no-exit was specified, sonobuoy is now blocking") select {} } - os.Exit(errcount) + if errcount > 0 { + return fmt.Errorf("%v errors encountered during execution", errcount) + } + + return nil } diff --git a/cmd/sonobuoy/app/root.go b/cmd/sonobuoy/app/root.go index 9d78ca22b..f441d76fb 100644 --- a/cmd/sonobuoy/app/root.go +++ b/cmd/sonobuoy/app/root.go @@ -35,7 +35,7 @@ func NewSonobuoyCommand() *cobra.Command { cmds.ResetFlags() - cmds.AddCommand(NewCmdMaster()) + cmds.AddCommand(NewCmdAggregator()) cmds.AddCommand(NewCmdDelete()) cmds.AddCommand(NewCmdE2E()) diff --git a/cmd/sonobuoy/app/worker.go b/cmd/sonobuoy/app/worker.go index 454bb3457..01aa77188 100644 --- a/cmd/sonobuoy/app/worker.go +++ b/cmd/sonobuoy/app/worker.go @@ -35,12 +35,13 @@ import ( "github.com/spf13/cobra" ) +// NewCmdWorker is the cobra command that acts as the entrypoint for Sonobuoy when running +// as a sidecar with a plugin. It will wait for a 'done' file then transmit the results to the +// aggregator pod. 
func NewCmdWorker() *cobra.Command { - var workerCmd = &cobra.Command{ Use: "worker", - Short: "Gather and send data to the sonobuoy master instance (for internal use)", - Run: runGatherHelp, + Short: "Gather and send data to the sonobuoy aggregator instance (for internal use)", Hidden: true, Args: cobra.ExactArgs(0), } @@ -65,10 +66,6 @@ var singleNodeCmd = &cobra.Command{ Args: cobra.ExactArgs(0), } -func runGatherHelp(cmd *cobra.Command, args []string) { - cmd.Help() -} - // sigHandler returns a channel that will receive a message after the timeout // elapses after a SIGTERM is received. func sigHandler(timeout time.Duration) <-chan struct{} { @@ -138,12 +135,15 @@ func runGather(global bool) error { return errors.Wrap(err, "getting HTTP client") } - // A single-node results URL looks like: - // http://sonobuoy-master:8080/api/v1/results/by-node/node1/systemd_logs - url := cfg.MasterURL + "/" + cfg.NodeName + "/" + cfg.ResultType + url := "" if global { - // http://sonobuoy-master:8080/api/v1/results/global/systemd_logs + // A global results URL looks like: + // http://sonobuoy-aggregator:8080/api/v1/results/global/systemd_logs url = cfg.MasterURL + "/" + cfg.ResultType + } else { + // A single-node results URL looks like: + // http://sonobuoy-aggregator:8080/api/v1/results/by-node/node1/systemd_logs + url = cfg.MasterURL + "/" + cfg.NodeName + "/" + cfg.ResultType } err = worker.GatherResults(cfg.ResultsDir+"/done", url, client, sigHandler(plugin.GracefulShutdownPeriod*time.Second)) diff --git a/main.go b/main.go index 2fb23467b..52e89ede7 100644 --- a/main.go +++ b/main.go @@ -22,7 +22,6 @@ import ( "github.com/heptio/sonobuoy/cmd/sonobuoy/app" ) -// main entry point of the program func main() { err := app.NewSonobuoyCommand().Execute() if err != nil { diff --git a/pkg/client/retrieve.go b/pkg/client/retrieve.go index 837058138..30ae12ef3 100644 --- a/pkg/client/retrieve.go +++ b/pkg/client/retrieve.go @@ -38,7 +38,7 @@ var tarCommand = []string{ 
"/usr/bin/env", "bash", "-c", - fmt.Sprintf("tar cf - %s/*.tar.gz", config.MasterResultsPath), + fmt.Sprintf("tar cf - %s/*.tar.gz", config.AggregatorResultsPath), } // RetrieveResults copies results from a sonobuoy run into a Reader in tar format. @@ -71,9 +71,9 @@ func (c *SonobuoyClient) RetrieveResults(cfg *RetrieveConfig) (io.Reader, <-chan Name(podName). Namespace(cfg.Namespace). SubResource("exec"). - Param("container", config.MasterContainerName) + Param("container", config.AggregatorContainerName) req.VersionedParams(&corev1.PodExecOptions{ - Container: config.MasterContainerName, + Container: config.AggregatorContainerName, Command: tarCommand, Stdin: false, Stdout: true, diff --git a/pkg/config/config.go b/pkg/config/config.go index 637ca027e..59a7f5d03 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -28,7 +28,7 @@ import ( ) const ( - // DefaultNamespace is the namespace where the master and plugin workers will run (but not necessarily the pods created by the plugin workers). + // DefaultNamespace is the namespace where the aggregator and plugin workers will run (but not necessarily the pods created by the plugin workers). DefaultNamespace = "heptio-sonobuoy" // DefaultKubeConformanceImageURL is the URL of the docker image to run for the kube conformance tests. @@ -44,12 +44,12 @@ const ( DefaultAggregationServerBindAddress = "0.0.0.0" // DefaultAggregationServerTimeoutSeconds is the default amount of time the aggregation server will wait for all plugins to complete. DefaultAggregationServerTimeoutSeconds = 10800 // 180 min - // MasterPodName is the name of the main pod that runs plugins and collects results. - MasterPodName = "sonobuoy" - // MasterContainerName is the name of the main container in the master pod. - MasterContainerName = "kube-sonobuoy" - // MasterResultsPath is the location in the main container of the master pod where results will be archived. 
- MasterResultsPath = "/tmp/sonobuoy" + // AggregatorPodName is the name of the main pod that runs plugins and collects results. + AggregatorPodName = "sonobuoy" + // AggregatorContainerName is the name of the main container in the aggregator pod. + AggregatorContainerName = "kube-sonobuoy" + // AggregatorResultsPath is the location in the main container of the aggregator pod where results will be archived. + AggregatorResultsPath = "/tmp/sonobuoy" // DefaultSonobuoyPullPolicy is the default pull policy used in the Sonobuoy config. DefaultSonobuoyPullPolicy = "IfNotPresent" // DefaultQueryQPS is the number of queries per second Sonobuoy will make when gathering data. @@ -332,7 +332,7 @@ func New() *Config { cfgUuid, _ := uuid.NewV4() cfg.UUID = cfgUuid.String() cfg.Description = "DEFAULT" - cfg.ResultsDir = MasterResultsPath + cfg.ResultsDir = AggregatorResultsPath cfg.Version = buildinfo.Version cfg.Filters.Namespaces = ".*" diff --git a/pkg/plugin/aggregation/aggregator.go b/pkg/plugin/aggregation/aggregator.go index b82078fc4..6a635f062 100644 --- a/pkg/plugin/aggregation/aggregator.go +++ b/pkg/plugin/aggregation/aggregator.go @@ -248,7 +248,7 @@ func (a *Aggregator) HandleHTTPResult(result *plugin.Result, w http.ResponseWrit // Since most plugins submit over HTTP, this method is currently only used to // consume an error stream from each plugin's Monitor() function. // -// If we support plugins that are just simple commands that the sonobuoy master +// If we support plugins that are just simple commands that the Sonobuoy aggregator // runs, those plugins can submit results through the same channel. 
func (a *Aggregator) IngestResults(ctx context.Context, resultsCh <-chan *plugin.Result) { for { diff --git a/pkg/plugin/driver/daemonset/daemonset.go b/pkg/plugin/driver/daemonset/daemonset.go index d6d2d2659..adffb7dc8 100644 --- a/pkg/plugin/driver/daemonset/daemonset.go +++ b/pkg/plugin/driver/daemonset/daemonset.go @@ -42,7 +42,7 @@ const ( ) // Plugin is a plugin driver that dispatches containers to each node, -// expecting each pod to report to the master. +// expecting each pod to report to the aggregator. type Plugin struct { driver.Base } @@ -51,7 +51,7 @@ type Plugin struct { var _ plugin.Interface = &Plugin{} // NewPlugin creates a new DaemonSet plugin from the given Plugin Definition -// and sonobuoy master address. +// and sonobuoy aggregator address. func NewPlugin(dfn manifest.Manifest, namespace, sonobuoyImage, imagePullPolicy, imagePullSecrets string, customAnnotations map[string]string) *Plugin { return &Plugin{ driver.Base{ @@ -81,7 +81,7 @@ func (p *Plugin) ExpectedResults(nodes []v1.Node) []plugin.ExpectedResult { return ret } -func getMasterAddress(hostname string) string { +func getAggregatorAddress(hostname string) string { return fmt.Sprintf("https://%s/api/v1/results/by-node", hostname) } @@ -107,7 +107,7 @@ func (p *Plugin) createDaemonSetDefinition(hostname string, cert *tls.Certificat Labels: labels, Annotations: annotations, OwnerReferences: []metav1.OwnerReference{ - metav1.OwnerReference{ + { APIVersion: "v1", Kind: "Pod", Name: ownerPod.GetName(), @@ -160,7 +160,7 @@ func (p *Plugin) createDaemonSetDefinition(hostname string, cert *tls.Certificat // Run dispatches worker pods according to the DaemonSet's configuration. 
func (p *Plugin) Run(kubeclient kubernetes.Interface, hostname string, cert *tls.Certificate, ownerPod *v1.Pod) error { - daemonSet := p.createDaemonSetDefinition(getMasterAddress(hostname), cert, ownerPod) + daemonSet := p.createDaemonSetDefinition(getAggregatorAddress(hostname), cert, ownerPod) secret, err := p.MakeTLSSecret(cert) if err != nil { diff --git a/pkg/plugin/driver/job/job.go b/pkg/plugin/driver/job/job.go index 810b9c978..9fc280be9 100644 --- a/pkg/plugin/driver/job/job.go +++ b/pkg/plugin/driver/job/job.go @@ -50,7 +50,7 @@ type Plugin struct { var _ plugin.Interface = &Plugin{} // NewPlugin creates a new DaemonSet plugin from the given Plugin Definition -// and sonobuoy master address. +// and sonobuoy aggregator address. func NewPlugin(dfn manifest.Manifest, namespace, sonobuoyImage, imagePullPolicy, imagePullSecrets string, customAnnotations map[string]string) *Plugin { return &Plugin{ driver.Base{ @@ -74,7 +74,7 @@ func (p *Plugin) ExpectedResults(nodes []v1.Node) []plugin.ExpectedResult { } } -func getMasterAddress(hostname string) string { +func getAggregatorAddress(hostname string) string { return fmt.Sprintf("https://%s/api/v1/results/%v", hostname, plugin.GlobalResult) } @@ -100,7 +100,7 @@ func (p *Plugin) createPodDefinition(hostname string, cert *tls.Certificate, own Labels: labels, Annotations: annotations, OwnerReferences: []metav1.OwnerReference{ - metav1.OwnerReference{ + { APIVersion: "v1", Kind: "Pod", Name: ownerPod.GetName(), @@ -144,7 +144,7 @@ func (p *Plugin) createPodDefinition(hostname string, cert *tls.Certificate, own // Run dispatches worker pods according to the Job's configuration. 
func (p *Plugin) Run(kubeclient kubernetes.Interface, hostname string, cert *tls.Certificate, ownerPod *v1.Pod) error { - job := p.createPodDefinition(getMasterAddress(hostname), cert, ownerPod) + job := p.createPodDefinition(getAggregatorAddress(hostname), cert, ownerPod) secret, err := p.MakeTLSSecret(cert) if err != nil { diff --git a/pkg/plugin/interface.go b/pkg/plugin/interface.go index 57b3e1150..1378d562e 100644 --- a/pkg/plugin/interface.go +++ b/pkg/plugin/interface.go @@ -116,7 +116,7 @@ type AggregationConfig struct { // WorkerConfig is the file given to the sonobuoy worker to configure it to phone home. type WorkerConfig struct { - // MasterURL is the URL we talk to for submitting results + // MasterURL is the URL we talk to the aggregator pod on for submitting results MasterURL string `json:"masterurl,omitempty" mapstructure:"masterurl"` // NodeName is the node name we should call ourselves when sending results NodeName string `json:"nodename,omitempty" mapstructure:"nodename"` diff --git a/pkg/worker/doc.go b/pkg/worker/doc.go index 8704449d0..6a0468378 100644 --- a/pkg/worker/doc.go +++ b/pkg/worker/doc.go @@ -15,6 +15,6 @@ limitations under the License. */ // Package worker is responsible for the logic behind submitting results data -// back to a sonobuoy master. This is intended for plugins to leverage, to +// back to a Sonobuoy aggregator. This is intended for plugins to leverage, to // avoid uploading data manually. 
package worker diff --git a/pkg/worker/request.go b/pkg/worker/request.go index c7306e5d5..4f56cf60f 100644 --- a/pkg/worker/request.go +++ b/pkg/worker/request.go @@ -79,7 +79,7 @@ func DoRequest(url string, client *http.Client, callback func() (io.Reader, stri err = fmt.Errorf("unexpected status code %d", resp.StatusCode) } if err != nil { - errlog.LogError(errors.Wrapf(err, "could not send error message to master URL (%s)", url)) + errlog.LogError(errors.Wrapf(err, "could not send error message to aggregator URL (%s)", url)) } return errors.WithStack(err) @@ -87,7 +87,7 @@ func DoRequest(url string, client *http.Client, callback func() (io.Reader, stri req, err := http.NewRequest(http.MethodPut, url, input) if err != nil { - return errors.Wrapf(err, "error constructing master request to %v", url) + return errors.Wrapf(err, "error constructing aggregator request to %v", url) } req.Header.Add("content-type", mimeType) if len(filename) > 0 { @@ -98,11 +98,11 @@ func DoRequest(url string, client *http.Client, callback func() (io.Reader, stri resp, err := pesterClient.Do(req) if err != nil { - return errors.Wrapf(err, "error encountered dialing master at %v", url) + return errors.Wrapf(err, "error encountered dialing aggregator at %v", url) } if resp.StatusCode != http.StatusOK { // TODO: retry logic for something like a 429 or otherwise - return errors.Errorf("got a %v response when dialing master to %v", resp.StatusCode, url) + return errors.Errorf("got a %v response when dialing aggregator to %v", resp.StatusCode, url) } return nil } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index e939d8f05..e0b68a70c 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -38,7 +38,7 @@ func init() { // // 1. Output data will be placed into an agreed upon results directory. // 2. The Job will wait for a done file -// 3. The done file contains a single string of the results to be sent to the master +// 3. 
The done file contains a single string of the results to be sent to the aggregator
 func GatherResults(waitfile string, url string, client *http.Client, stopc <-chan struct{}) error {
 	logrus.WithField("waitfile", waitfile).Info("Waiting for waitfile")
 	ticker := time.Tick(1 * time.Second)
diff --git a/site/docs/v0.13.0/enhancements/v2-worker-master.md b/site/docs/v0.13.0/enhancements/v2-worker-master.md
index 5faeab66a..9c2bd02b6 100755
--- a/site/docs/v0.13.0/enhancements/v2-worker-master.md
+++ b/site/docs/v0.13.0/enhancements/v2-worker-master.md
@@ -17,7 +17,7 @@

 ## How it works now

-Sonobuoy uses a worker-master model, where a master delegates tasks to worker
+Sonobuoy uses a worker-aggregator model, where an aggregator delegates tasks to worker
 pods. When those pods have finished, they need to report the results of their
 work back to the master. Presently this is done over an ill-defined, ad hoc
 REST-ish client/server model embedded in the server.