Skip to content

Commit

Permalink
Provide contexts to kibana client methods (#1713)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoriano authored Mar 26, 2024
1 parent caf7758 commit 62fe1c9
Show file tree
Hide file tree
Showing 28 changed files with 217 additions and 183 deletions.
4 changes: 0 additions & 4 deletions cmd/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,6 @@ func pipelineCommandAction(cmd *cobra.Command, args []string) error {
results = append(results, r)
}

if err != nil {
return fmt.Errorf("error running package pipeline benchmarks: %w", err)
}

for _, report := range results {
if err := reporters.WriteReportable(reporters.Output(reportOutput), report); err != nil {
return fmt.Errorf("error writing benchmark report: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func dumpInstalledObjectsCmdAction(cmd *cobra.Command, args []string) error {
if err != nil {
return fmt.Errorf("failed to initialize Kibana client: %w", err)
}
installedPackage, err := kibanaClient.GetPackage(packageName)
installedPackage, err := kibanaClient.GetPackage(cmd.Context(), packageName)
if err != nil {
return fmt.Errorf("failed to get package status: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func editDashboardsCmd(cmd *cobra.Command, args []string) error {
}

if len(dashboardIDs) == 0 {
dashboardIDs, err = promptDashboardIDs(kibanaClient)
dashboardIDs, err = promptDashboardIDs(cmd.Context(), kibanaClient)
if err != nil {
return fmt.Errorf("prompt for dashboard selection failed: %w", err)
}
Expand All @@ -107,7 +107,7 @@ func editDashboardsCmd(cmd *cobra.Command, args []string) error {
updatedDashboardIDs := make([]string, 0, len(dashboardIDs))
failedDashboardUpdates := make(map[string]error, len(dashboardIDs))
for _, dashboardID := range dashboardIDs {
err = kibanaClient.SetManagedSavedObject("dashboard", dashboardID, false)
err = kibanaClient.SetManagedSavedObject(cmd.Context(), "dashboard", dashboardID, false)
if err != nil {
failedDashboardUpdates[dashboardID] = err
} else {
Expand Down
9 changes: 5 additions & 4 deletions cmd/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package cmd

import (
"context"
"fmt"

"github.com/AlecAivazis/survey/v2"
Expand Down Expand Up @@ -93,7 +94,7 @@ func exportDashboardsCmd(cmd *cobra.Command, args []string) error {
}

if len(dashboardIDs) == 0 {
dashboardIDs, err = promptDashboardIDs(kibanaClient)
dashboardIDs, err = promptDashboardIDs(cmd.Context(), kibanaClient)
if err != nil {
return fmt.Errorf("prompt for dashboard selection failed: %w", err)
}
Expand All @@ -104,7 +105,7 @@ func exportDashboardsCmd(cmd *cobra.Command, args []string) error {
}
}

err = export.Dashboards(kibanaClient, dashboardIDs)
err = export.Dashboards(cmd.Context(), kibanaClient, dashboardIDs)
if err != nil {
return fmt.Errorf("dashboards export failed: %w", err)
}
Expand All @@ -113,8 +114,8 @@ func exportDashboardsCmd(cmd *cobra.Command, args []string) error {
return nil
}

func promptDashboardIDs(kibanaClient *kibana.Client) ([]string, error) {
savedDashboards, err := kibanaClient.FindDashboards()
func promptDashboardIDs(ctx context.Context, kibanaClient *kibana.Client) ([]string, error) {
savedDashboards, err := kibanaClient.FindDashboards(ctx)
if err != nil {
return nil, fmt.Errorf("finding dashboards failed: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func installCommandAction(cmd *cobra.Command, _ []string) error {
return fmt.Errorf("can't process check-condition flag: %w", err)
}
if len(keyValuePairs) > 0 {
manifest, err := installer.Manifest()
manifest, err := installer.Manifest(cmd.Context())
if err != nil {
return err
}
Expand All @@ -105,6 +105,6 @@ func installCommandAction(cmd *cobra.Command, _ []string) error {
return nil
}

_, err = installer.Install()
_, err = installer.Install(cmd.Context())
return err
}
2 changes: 1 addition & 1 deletion cmd/uninstall.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func uninstallCommandAction(cmd *cobra.Command, args []string) error {

// Uninstall the package
cmd.Println("Uninstall the package")
err = packageInstaller.Uninstall()
err = packageInstaller.Uninstall(cmd.Context())
if err != nil {
return fmt.Errorf("can't uninstall the package: %w", err)
}
Expand Down
20 changes: 10 additions & 10 deletions internal/benchrunner/runners/rally/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (r *runner) setUp(ctx context.Context) error {
}
r.scenario = scenario

if err = r.installPackage(); err != nil {
if err = r.installPackage(ctx); err != nil {
return fmt.Errorf("error installing package: %w", err)
}

Expand Down Expand Up @@ -450,28 +450,28 @@ func (r *runner) run(ctx context.Context) (report reporters.Reportable, err erro
return createReport(r.options.BenchName, r.corpusFile, r.scenario, msum, rallyStats)
}

func (r *runner) installPackage() error {
func (r *runner) installPackage(ctx context.Context) error {
if len(r.options.PackageVersion) > 0 {
r.scenario.Package = r.options.PackageName
r.scenario.Version = r.options.PackageVersion
return r.installPackageFromRegistry(r.options.PackageName, r.options.PackageVersion)
return r.installPackageFromRegistry(ctx, r.options.PackageName, r.options.PackageVersion)
}

return r.installPackageFromPackageRoot()
return r.installPackageFromPackageRoot(ctx)
}

func (r *runner) installPackageFromRegistry(packageName, packageVersion string) error {
func (r *runner) installPackageFromRegistry(ctx context.Context, packageName, packageVersion string) error {
// POST /epm/packages/{pkgName}/{pkgVersion}
// Configure package (single data stream) via Ingest Manager APIs.
logger.Debug("installing package...")
_, err := r.options.KibanaClient.InstallPackage(packageName, packageVersion)
_, err := r.options.KibanaClient.InstallPackage(ctx, packageName, packageVersion)
if err != nil {
return fmt.Errorf("cannot install package %s@%s: %w", packageName, packageVersion, err)
}

r.removePackageHandler = func(ctx context.Context) error {
logger.Debug("removing benchmark package...")
if _, err := r.options.KibanaClient.RemovePackage(packageName, packageVersion); err != nil {
if _, err := r.options.KibanaClient.RemovePackage(ctx, packageName, packageVersion); err != nil {
return fmt.Errorf("error removing benchmark package: %w", err)
}
return nil
Expand All @@ -480,7 +480,7 @@ func (r *runner) installPackageFromRegistry(packageName, packageVersion string)
return nil
}

func (r *runner) installPackageFromPackageRoot() error {
func (r *runner) installPackageFromPackageRoot(ctx context.Context) error {
logger.Debug("Installing package...")
installer, err := installer.NewForPackage(installer.Options{
Kibana: r.options.KibanaClient,
Expand All @@ -492,13 +492,13 @@ func (r *runner) installPackageFromPackageRoot() error {
return fmt.Errorf("failed to initialize package installer: %w", err)
}

_, err = installer.Install()
_, err = installer.Install(ctx)
if err != nil {
return fmt.Errorf("failed to install package: %w", err)
}

r.removePackageHandler = func(ctx context.Context) error {
if err := installer.Uninstall(); err != nil {
if err := installer.Uninstall(ctx); err != nil {
return fmt.Errorf("error removing benchmark package: %w", err)
}

Expand Down
12 changes: 6 additions & 6 deletions internal/benchrunner/runners/stream/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (r *runner) setUp(ctx context.Context) error {
}
r.scenarios = scenarios

if err = r.installPackage(); err != nil {
if err = r.installPackage(ctx); err != nil {
return fmt.Errorf("error installing package: %w", err)
}

Expand Down Expand Up @@ -192,11 +192,11 @@ func (r *runner) wipeDataStreamsOnSetup() error {
return nil
}

func (r *runner) installPackage() error {
return r.installPackageFromPackageRoot()
func (r *runner) installPackage(ctx context.Context) error {
return r.installPackageFromPackageRoot(ctx)
}

func (r *runner) installPackageFromPackageRoot() error {
func (r *runner) installPackageFromPackageRoot(ctx context.Context) error {
logger.Debug("Installing package...")
installer, err := installer.NewForPackage(installer.Options{
Kibana: r.options.KibanaClient,
Expand All @@ -208,13 +208,13 @@ func (r *runner) installPackageFromPackageRoot() error {
return fmt.Errorf("failed to initialize package installer: %w", err)
}

_, err = installer.Install()
_, err = installer.Install(ctx)
if err != nil {
return fmt.Errorf("failed to install package: %w", err)
}

r.removePackageHandler = func(ctx context.Context) error {
if err := installer.Uninstall(); err != nil {
if err := installer.Uninstall(ctx); err != nil {
return fmt.Errorf("error removing benchmark package: %w", err)
}

Expand Down
20 changes: 10 additions & 10 deletions internal/benchrunner/runners/system/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (r *runner) setUp(ctx context.Context) error {
return fmt.Errorf("reading package manifest failed: %w", err)
}

policy, err := r.createBenchmarkPolicy(pkgManifest)
policy, err := r.createBenchmarkPolicy(ctx, pkgManifest)
if err != nil {
return err
}
Expand Down Expand Up @@ -353,7 +353,7 @@ func (r *runner) deleteDataStreamDocs(dataStream string) error {
return nil
}

func (r *runner) createBenchmarkPolicy(pkgManifest *packages.PackageManifest) (*kibana.Policy, error) {
func (r *runner) createBenchmarkPolicy(ctx context.Context, pkgManifest *packages.PackageManifest) (*kibana.Policy, error) {
// Configure package (single data stream) via Ingest Manager APIs.
logger.Debug("creating benchmark policy...")
benchTime := time.Now().Format("20060102T15:04:05Z")
Expand All @@ -369,12 +369,12 @@ func (r *runner) createBenchmarkPolicy(pkgManifest *packages.PackageManifest) (*
p.DataOutputID = "fleet-logstash-output"
}

policy, err := r.options.KibanaClient.CreatePolicy(p)
policy, err := r.options.KibanaClient.CreatePolicy(ctx, p)
if err != nil {
return nil, err
}

packagePolicy, err := r.createPackagePolicy(pkgManifest, policy)
packagePolicy, err := r.createPackagePolicy(ctx, pkgManifest, policy)
if err != nil {
return nil, err
}
Expand All @@ -383,12 +383,12 @@ func (r *runner) createBenchmarkPolicy(pkgManifest *packages.PackageManifest) (*
var merr multierror.Error

logger.Debug("deleting benchmark package policy...")
if err := r.options.KibanaClient.DeletePackagePolicy(*packagePolicy); err != nil {
if err := r.options.KibanaClient.DeletePackagePolicy(ctx, *packagePolicy); err != nil {
merr = append(merr, fmt.Errorf("error cleaning up benchmark package policy: %w", err))
}

logger.Debug("deleting benchmark policy...")
if err := r.options.KibanaClient.DeletePolicy(*policy); err != nil {
if err := r.options.KibanaClient.DeletePolicy(ctx, *policy); err != nil {
merr = append(merr, fmt.Errorf("error cleaning up benchmark policy: %w", err))
}

Expand All @@ -402,7 +402,7 @@ func (r *runner) createBenchmarkPolicy(pkgManifest *packages.PackageManifest) (*
return policy, nil
}

func (r *runner) createPackagePolicy(pkgManifest *packages.PackageManifest, p *kibana.Policy) (*kibana.PackagePolicy, error) {
func (r *runner) createPackagePolicy(ctx context.Context, pkgManifest *packages.PackageManifest, p *kibana.Policy) (*kibana.PackagePolicy, error) {
logger.Debug("creating package policy...")

if r.scenario.Version == "" {
Expand Down Expand Up @@ -437,7 +437,7 @@ func (r *runner) createPackagePolicy(pkgManifest *packages.PackageManifest, p *k
pp.Package.Name = pkgManifest.Name
pp.Package.Version = r.scenario.Version

policy, err := r.options.KibanaClient.CreatePackagePolicy(pp)
policy, err := r.options.KibanaClient.CreatePackagePolicy(ctx, pp)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -620,7 +620,7 @@ func (r *runner) runGenerator(destDir string) error {
func (r *runner) checkEnrolledAgents(ctx context.Context) ([]kibana.Agent, error) {
var agents []kibana.Agent
enrolled, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) {
allAgents, err := r.options.KibanaClient.ListAgents()
allAgents, err := r.options.KibanaClient.ListAgents(ctx)
if err != nil {
return false, fmt.Errorf("could not list agents: %w", err)
}
Expand Down Expand Up @@ -696,7 +696,7 @@ func (r *runner) enrollAgents(ctx context.Context) error {
return nil
}

policyWithDataStream, err := r.options.KibanaClient.GetPolicy(r.benchPolicy.ID)
policyWithDataStream, err := r.options.KibanaClient.GetPolicy(ctx, r.benchPolicy.ID)
if err != nil {
return fmt.Errorf("could not read the policy with data stream: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/dump/agentpolicies.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewAgentPoliciesDumper(client *kibana.Client) *AgentPoliciesDumper {
}

func (d *AgentPoliciesDumper) getAgentPolicy(ctx context.Context, name string) (*AgentPolicy, error) {
policy, err := d.client.GetRawPolicy(name)
policy, err := d.client.GetRawPolicy(ctx, name)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -86,7 +86,7 @@ func getPackagesUsingAgentPolicy(packagePolicies []packagePolicy) []string {
}

func (d *AgentPoliciesDumper) getAgentPoliciesFilteredByPackage(ctx context.Context, packageName string) ([]AgentPolicy, error) {
rawPolicies, err := d.client.ListRawPolicies()
rawPolicies, err := d.client.ListRawPolicies(ctx)

if err != nil {
return nil, err
Expand Down
9 changes: 5 additions & 4 deletions internal/export/dashboards.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package export

import (
"context"
"encoding/json"
"fmt"
"os"
Expand All @@ -20,7 +21,7 @@ import (

// Dashboards method exports selected dashboards with references objects. All Kibana objects are saved to local files
// in appropriate directories.
func Dashboards(kibanaClient *kibana.Client, dashboardsIDs []string) error {
func Dashboards(ctx context.Context, kibanaClient *kibana.Client, dashboardsIDs []string) error {
packageRoot, err := packages.MustFindPackageRoot()
if err != nil {
return fmt.Errorf("locating package root failed: %w", err)
Expand All @@ -40,16 +41,16 @@ func Dashboards(kibanaClient *kibana.Client, dashboardsIDs []string) error {
return fmt.Errorf("cannot import from this Kibana version: %w", err)
}

objects, err := kibanaClient.Export(dashboardsIDs)
objects, err := kibanaClient.Export(ctx, dashboardsIDs)
if err != nil {
return fmt.Errorf("exporting dashboards using Kibana client failed: %w", err)
}

ctx := &transformationContext{
transformContext := &transformationContext{
packageName: m.Name,
}

objects, err = applyTransformations(ctx, objects)
objects, err = applyTransformations(transformContext, objects)
if err != nil {
return fmt.Errorf("can't transform Kibana objects: %w", err)
}
Expand Down
12 changes: 6 additions & 6 deletions internal/kibana/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func (a *Agent) String() string {
}

// ListAgents returns the list of agents enrolled with Fleet.
func (c *Client) ListAgents() ([]Agent, error) {
statusCode, respBody, err := c.get(fmt.Sprintf("%s/agents", FleetAPI))
func (c *Client) ListAgents(ctx context.Context) ([]Agent, error) {
statusCode, respBody, err := c.get(ctx, fmt.Sprintf("%s/agents", FleetAPI))
if err != nil {
return nil, fmt.Errorf("could not list agents: %w", err)
}
Expand All @@ -73,7 +73,7 @@ func (c *Client) AssignPolicyToAgent(ctx context.Context, a Agent, p Policy) err
reqBody := `{ "policy_id": "` + p.ID + `" }`

path := fmt.Sprintf("%s/agents/%s/reassign", FleetAPI, a.ID)
statusCode, respBody, err := c.put(path, []byte(reqBody))
statusCode, respBody, err := c.put(ctx, path, []byte(reqBody))
if err != nil {
return fmt.Errorf("could not assign policy to agent: %w", err)
}
Expand All @@ -96,7 +96,7 @@ func (c *Client) waitUntilPolicyAssigned(ctx context.Context, a Agent, p Policy)
defer ticker.Stop()

for {
agent, err := c.getAgent(a.ID)
agent, err := c.getAgent(ctx, a.ID)
if err != nil {
return fmt.Errorf("can't get the agent: %w", err)
}
Expand All @@ -122,8 +122,8 @@ func (c *Client) waitUntilPolicyAssigned(ctx context.Context, a Agent, p Policy)
return nil
}

func (c *Client) getAgent(agentID string) (*Agent, error) {
statusCode, respBody, err := c.get(fmt.Sprintf("%s/agents/%s", FleetAPI, agentID))
func (c *Client) getAgent(ctx context.Context, agentID string) (*Agent, error) {
statusCode, respBody, err := c.get(ctx, fmt.Sprintf("%s/agents/%s", FleetAPI, agentID))
if err != nil {
return nil, fmt.Errorf("could not list agents: %w", err)
}
Expand Down
Loading

0 comments on commit 62fe1c9

Please sign in to comment.