From c9c9ba5c1464d805e118adeeb5335a092293d0a4 Mon Sep 17 00:00:00 2001 From: Yury Kovalev Date: Fri, 15 Mar 2024 15:05:21 +0100 Subject: [PATCH] ROX-16615: Make probe multiregion --- .../templates/01-operator-04-deployment.yaml | 4 +- probe/cmd/probe/main.go | 2 +- probe/config/config.go | 23 +++++++- probe/config/config_test.go | 51 ++++++++++++++++++ probe/internal/cli/cli_test.go | 8 ++- probe/pkg/metrics/metrics.go | 11 ++-- probe/pkg/metrics/metrics_test.go | 19 +++++-- probe/pkg/metrics/server.go | 11 ++-- probe/pkg/metrics/server_test.go | 6 +-- probe/pkg/probe/probe.go | 18 +++---- probe/pkg/probe/probe_moq.go | 23 +++++--- probe/pkg/probe/probe_test.go | 7 ++- probe/pkg/runtime/runtime.go | 54 +++++++++++++------ probe/pkg/runtime/runtime_test.go | 15 ++++-- 14 files changed, 191 insertions(+), 61 deletions(-) diff --git a/deploy/helm/probe/templates/01-operator-04-deployment.yaml b/deploy/helm/probe/templates/01-operator-04-deployment.yaml index bae7f7e95e..785859cee8 100644 --- a/deploy/helm/probe/templates/01-operator-04-deployment.yaml +++ b/deploy/helm/probe/templates/01-operator-04-deployment.yaml @@ -50,8 +50,8 @@ spec: name: rhacs-probe-ocm-token key: TOKEN {{- end }} - - name: DATA_PLANE_REGION - value: {{ .Values.dataPlaneRegion | quote }} + - name: CENTRAL_SPECS + value: {{ printf `[{ "cloudProvider": "aws", "region": "%s" }]` .Values.dataPlaneRegion | quote }} ports: - name: monitoring containerPort: 7070 diff --git a/probe/cmd/probe/main.go b/probe/cmd/probe/main.go index a5a0ea515e..d534d2e862 100644 --- a/probe/cmd/probe/main.go +++ b/probe/cmd/probe/main.go @@ -28,7 +28,7 @@ func main() { glog.Fatal(err) } - if metricsServer := metrics.NewMetricsServer(config.MetricsAddress, config.DataPlaneRegion); metricsServer != nil { + if metricsServer := metrics.NewMetricsServer(config); metricsServer != nil { defer metrics.CloseMetricsServer(metricsServer) go metrics.ListenAndServe(metricsServer) } else { diff --git a/probe/config/config.go b/probe/config/config.go index dc0affb4c3..3a3947afc6 100644 --- a/probe/config/config.go +++ b/probe/config/config.go @@ -6,6 +6,7 @@ import ( "time" "github.com/stackrox/rox/pkg/errorhelpers" + "sigs.k8s.io/yaml" "github.com/caarlos0/env/v6" "github.com/pkg/errors" @@ -14,8 +15,7 @@ import ( // Config contains this application's runtime configuration. type Config struct { AuthType string `env:"AUTH_TYPE" envDefault:"RHSSO"` - DataCloudProvider string `env:"DATA_PLANE_CLOUD_PROVIDER" envDefault:"aws"` - DataPlaneRegion string `env:"DATA_PLANE_REGION" envDefault:"us-east-1"` + CentralSpecs CentralSpecs `env:"CENTRAL_SPECS" envDefault:"[{ \"cloudProvider\": \"aws\", \"region\": \"us-east-1\" }]"` FleetManagerEndpoint string `env:"FLEET_MANAGER_ENDPOINT" envDefault:"http://127.0.0.1:8000"` MetricsAddress string `env:"METRICS_ADDRESS" envDefault:":7070"` RHSSOClientID string `env:"RHSSO_SERVICE_ACCOUNT_CLIENT_ID"` @@ -30,6 +30,25 @@ type Config struct { ProbeUsername string } +// CentralSpecs container for the CentralSpec slice +type CentralSpecs []CentralSpec + +// CentralSpec the desired central configuration +type CentralSpec struct { + CloudProvider string `json:"cloudProvider"` + Region string `json:"region"` +} + +// UnmarshalText implements encoding.TextUnmarshaler so that CentralSpec can be parsed by env.Parse +func (s *CentralSpecs) UnmarshalText(text []byte) error { + var specs []CentralSpec + if err := yaml.Unmarshal(text, &specs); err != nil { + return fmt.Errorf("unmarshal central spec: %w", err) + } + *s = specs + return nil +} + // GetConfig retrieves the current runtime configuration from the environment and returns it. func GetConfig() (*Config, error) { // Default value if PROBE_NAME and HOSTNAME are not set. diff --git a/probe/config/config_test.go b/probe/config/config_test.go index 577408af29..a464f0bc37 100644 --- a/probe/config/config_test.go +++ b/probe/config/config_test.go @@ -31,3 +31,54 @@ func TestGetConfig_Failure(t *testing.T) { assert.Error(t, err) assert.Nil(t, cfg) } + +func TestGetConfig_CentralSpec(t *testing.T) { + t.Setenv("FLEET_MANAGER_ENDPOINT", "http://127.0.0.1:8888") + t.Setenv("AUTH_TYPE", "RHSSO") + t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_ID", "dummy") + t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_SECRET", "dummy") + t.Setenv("CENTRAL_SPECS", `[{ "cloudProvider": "aws", "region": "us-east-1" }, { "cloudProvider": "aws", "region": "eu-west-1" }]`) + + cfg, err := GetConfig() + + require.NoError(t, err) + assert.Equal(t, CentralSpecs{ + { + CloudProvider: "aws", + Region: "us-east-1", + }, + { + CloudProvider: "aws", + Region: "eu-west-1", + }, + }, cfg.CentralSpecs) +} + +func TestGetConfig_CentralSpecDefault(t *testing.T) { + t.Setenv("FLEET_MANAGER_ENDPOINT", "http://127.0.0.1:8888") + t.Setenv("AUTH_TYPE", "RHSSO") + t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_ID", "dummy") + t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_SECRET", "dummy") + + cfg, err := GetConfig() + + require.NoError(t, err) + assert.Equal(t, CentralSpecs{ + { + CloudProvider: "aws", + Region: "us-east-1", + }, + }, cfg.CentralSpecs) +} + +func TestGetConfig_CentralSpecInvalidJson(t *testing.T) { + t.Setenv("FLEET_MANAGER_ENDPOINT", "http://127.0.0.1:8888") + t.Setenv("AUTH_TYPE", "RHSSO") + t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_ID", "dummy") + t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_SECRET", "dummy") + t.Setenv("CENTRAL_SPECS", `{ "cloudProvider": `) + + _, err := GetConfig() + + require.Error(t, err) +} diff --git a/probe/internal/cli/cli_test.go b/probe/internal/cli/cli_test.go index 1cf96d7dcb..9fe274885b 100644 --- a/probe/internal/cli/cli_test.go +++ b/probe/internal/cli/cli_test.go @@ -19,6 +19,12 @@ var testConfig = &config.Config{ ProbeRunTimeout: 100 * time.Millisecond, ProbeName: "probe", RHSSOClientID: "client", + CentralSpecs: []config.CentralSpec{ + { + CloudProvider: "aws", + Region: "us-east-1", + }, + }, } func TestCLIInterrupt(t *testing.T) { @@ -26,7 +32,7 @@ func TestCLIInterrupt(t *testing.T) { CleanUpFunc: func(ctx context.Context) error { return nil }, - ExecuteFunc: func(ctx context.Context) error { + ExecuteFunc: func(ctx context.Context, spec config.CentralSpec) error { process, err := os.FindProcess(os.Getpid()) require.NoError(t, err, "could not find current process ID") process.Signal(os.Interrupt) diff --git a/probe/pkg/metrics/metrics.go b/probe/pkg/metrics/metrics.go index f6093c6f10..3062e8b767 100644 --- a/probe/pkg/metrics/metrics.go +++ b/probe/pkg/metrics/metrics.go @@ -6,6 +6,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/stackrox/acs-fleet-manager/probe/config" ) const ( @@ -43,10 +44,12 @@ func (m *Metrics) Register(r prometheus.Registerer) { } // Init sets initial values for the gauge metrics. -func (m *Metrics) Init(region string) { - m.lastFailureTimestamp.With(prometheus.Labels{regionLabelName: region}).Set(0) - m.lastStartedTimestamp.With(prometheus.Labels{regionLabelName: region}).Set(0) - m.lastSuccessTimestamp.With(prometheus.Labels{regionLabelName: region}).Set(0) +func (m *Metrics) Init(config *config.Config) { + for _, spec := range config.CentralSpecs { + m.lastFailureTimestamp.With(prometheus.Labels{regionLabelName: spec.Region}).Set(0) + m.lastStartedTimestamp.With(prometheus.Labels{regionLabelName: spec.Region}).Set(0) + m.lastSuccessTimestamp.With(prometheus.Labels{regionLabelName: spec.Region}).Set(0) + } } // IncStartedRuns increments the metric counter for started probe runs. diff --git a/probe/pkg/metrics/metrics_test.go b/probe/pkg/metrics/metrics_test.go index 07f50ffb35..402be294c5 100644 --- a/probe/pkg/metrics/metrics_test.go +++ b/probe/pkg/metrics/metrics_test.go @@ -7,11 +7,22 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/stackrox/acs-fleet-manager/probe/config" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -var regionValue = "us-east-1" +var ( + regionValue = "us-east-1" + cfg = &config.Config{ + MetricsAddress: ":8081", + CentralSpecs: []config.CentralSpec{ + { + Region: regionValue, + }, + }, + } +) func getMetricSeries(t *testing.T, registry *prometheus.Registry, name string) *io_prometheus_client.Metric { metrics := serveMetrics(t, registry) @@ -52,7 +63,7 @@ func TestCounterIncrements(t *testing.T) { tc := tc t.Run(tc.metricName, func(t *testing.T) { m := newMetrics() - registry := initPrometheus(m, regionValue) + registry := initPrometheus(m, cfg) tc.callIncrementFunc(m) targetSeries := getMetricSeries(t, registry, tc.metricName) @@ -96,7 +107,7 @@ func TestTimestampGauges(t *testing.T) { tc := tc t.Run(tc.metricName, func(t *testing.T) { m := newMetrics() - registry := initPrometheus(m, regionValue) + registry := initPrometheus(m, cfg) lowerBound := time.Now().Unix() targetSeries := getMetricSeries(t, registry, tc.metricName) @@ -133,7 +144,7 @@ func TestHistograms(t *testing.T) { tc := tc t.Run(tc.metricName, func(t *testing.T) { m := newMetrics() - registry := initPrometheus(m, regionValue) + registry := initPrometheus(m, cfg) expectedCount := uint64(2) expectedSum := 480.0 tc.callObserveFunc(m) diff --git a/probe/pkg/metrics/server.go b/probe/pkg/metrics/server.go index fbaeedab28..17c5fb5848 100644 --- a/probe/pkg/metrics/server.go +++ b/probe/pkg/metrics/server.go @@ -6,13 +6,14 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/stackrox/acs-fleet-manager/probe/config" "github.com/stackrox/rox/pkg/utils" ) // NewMetricsServer returns the metrics server. -func NewMetricsServer(address string, region string) *http.Server { - registry := initPrometheus(MetricsInstance(), region) - return newMetricsServer(address, registry) +func NewMetricsServer(config *config.Config) *http.Server { + registry := initPrometheus(MetricsInstance(), config) + return newMetricsServer(config.MetricsAddress, registry) } // ListenAndServe listens for incoming requests and serves the metrics. @@ -29,14 +30,14 @@ func CloseMetricsServer(server *http.Server) { } } -func initPrometheus(customMetrics *Metrics, region string) *prometheus.Registry { +func initPrometheus(customMetrics *Metrics, config *config.Config) *prometheus.Registry { registry := prometheus.NewRegistry() // Register default metrics to use a dedicated registry instead of prometheus.DefaultRegistry. // This makes it easier to isolate metric state when unit testing this package. registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) registry.MustRegister(prometheus.NewGoCollector()) customMetrics.Register(registry) - customMetrics.Init(region) + customMetrics.Init(config) return registry } diff --git a/probe/pkg/metrics/server_test.go b/probe/pkg/metrics/server_test.go index 70e6a9cce0..344cee2f74 100644 --- a/probe/pkg/metrics/server_test.go +++ b/probe/pkg/metrics/server_test.go @@ -16,13 +16,13 @@ import ( type metricResponse map[string]*io_prometheus_client.MetricFamily func TestMetricsServerCorrectAddress(t *testing.T) { - server := NewMetricsServer(":8081", regionValue) + server := NewMetricsServer(cfg) defer server.Close() assert.Equal(t, ":8081", server.Addr) } func TestMetricsServerServesDefaultMetrics(t *testing.T) { - registry := initPrometheus(newMetrics(), regionValue) + registry := initPrometheus(newMetrics(), cfg) metrics := serveMetrics(t, registry) assert.Contains(t, metrics, "go_memstats_alloc_bytes", "expected metrics to contain go default metrics but it did not") } @@ -37,7 +37,7 @@ func TestMetricsServerServesCustomMetrics(t *testing.T) { customMetrics.SetLastSuccessTimestamp(regionValue) customMetrics.SetLastFailureTimestamp(regionValue) customMetrics.ObserveTotalDuration(time.Minute, regionValue) - registry := initPrometheus(customMetrics, regionValue) + registry := initPrometheus(customMetrics, cfg) metrics := serveMetrics(t, registry) expectedKeys := []string{ diff --git a/probe/pkg/probe/probe.go b/probe/pkg/probe/probe.go index 6fc50591ee..944a8ec0fb 100644 --- a/probe/pkg/probe/probe.go +++ b/probe/pkg/probe/probe.go @@ -27,7 +27,7 @@ import ( // //go:generate moq -out probe_moq.go . Probe type Probe interface { - Execute(ctx context.Context) error + Execute(ctx context.Context, spec config.CentralSpec) error CleanUp(ctx context.Context) error } @@ -65,16 +65,16 @@ func (p *ProbeImpl) newCentralName() (string, error) { } // Execute the probe of the fleet manager API. -func (p *ProbeImpl) Execute(ctx context.Context) error { +func (p *ProbeImpl) Execute(ctx context.Context, spec config.CentralSpec) error { glog.Infof("probe run has been started: fleetManagerEndpoint=%q, provider=%q, region=%q", p.config.FleetManagerEndpoint, - p.config.DataCloudProvider, - p.config.DataPlaneRegion, + spec.CloudProvider, + spec.Region, ) defer glog.Info("probe run has ended") - defer recordElapsedTime(time.Now(), p.config.DataPlaneRegion) + defer recordElapsedTime(time.Now(), spec.Region) - central, err := p.createCentral(ctx) + central, err := p.createCentral(ctx, spec) if err != nil { return err } @@ -133,7 +133,7 @@ func (p *ProbeImpl) cleanupFunc(ctx context.Context) error { } // Create a Central and verify that it transitioned to 'ready' state. -func (p *ProbeImpl) createCentral(ctx context.Context) (*public.CentralRequest, error) { +func (p *ProbeImpl) createCentral(ctx context.Context, spec config.CentralSpec) (*public.CentralRequest, error) { centralName, err := p.newCentralName() if err != nil { return nil, errors.Wrap(err, "failed to create central name") @@ -141,8 +141,8 @@ func (p *ProbeImpl) createCentral(ctx context.Context) (*public.CentralRequest, request := public.CentralRequestPayload{ Name: centralName, MultiAz: true, - CloudProvider: p.config.DataCloudProvider, - Region: p.config.DataPlaneRegion, + CloudProvider: spec.CloudProvider, + Region: spec.Region, } central, resp, err := p.fleetManagerPublicAPI.CreateCentral(ctx, true, request) defer utils.IgnoreError(closeBodyIfNonEmpty(resp)) diff --git a/probe/pkg/probe/probe_moq.go b/probe/pkg/probe/probe_moq.go index 4242e4f8dd..16d34fe06a 100644 --- a/probe/pkg/probe/probe_moq.go +++ b/probe/pkg/probe/probe_moq.go @@ -5,6 +5,7 @@ package probe import ( "context" + "github.com/stackrox/acs-fleet-manager/probe/config" "sync" ) @@ -21,7 +22,7 @@ var _ Probe = &ProbeMock{} // CleanUpFunc: func(ctx context.Context) error { // panic("mock out the CleanUp method") // }, -// ExecuteFunc: func(ctx context.Context) error { +// ExecuteFunc: func(ctx context.Context, spec config.CentralSpec) error { // panic("mock out the Execute method") // }, // } @@ -35,7 +36,7 @@ type ProbeMock struct { CleanUpFunc func(ctx context.Context) error // ExecuteFunc mocks the Execute method. - ExecuteFunc func(ctx context.Context) error + ExecuteFunc func(ctx context.Context, spec config.CentralSpec) error // calls tracks calls to the methods. calls struct { @@ -48,6 +49,8 @@ type ProbeMock struct { Execute []struct { // Ctx is the ctx argument value. Ctx context.Context + // Spec is the spec argument value. + Spec config.CentralSpec } } lockCleanUp sync.RWMutex @@ -87,19 +90,21 @@ func (mock *ProbeMock) CleanUpCalls() []struct { } // Execute calls ExecuteFunc. -func (mock *ProbeMock) Execute(ctx context.Context) error { +func (mock *ProbeMock) Execute(ctx context.Context, spec config.CentralSpec) error { if mock.ExecuteFunc == nil { panic("ProbeMock.ExecuteFunc: method is nil but Probe.Execute was just called") } callInfo := struct { - Ctx context.Context + Ctx context.Context + Spec config.CentralSpec }{ - Ctx: ctx, + Ctx: ctx, + Spec: spec, } mock.lockExecute.Lock() mock.calls.Execute = append(mock.calls.Execute, callInfo) mock.lockExecute.Unlock() - return mock.ExecuteFunc(ctx) + return mock.ExecuteFunc(ctx, spec) } // ExecuteCalls gets all the calls that were made to Execute. @@ -107,10 +112,12 @@ func (mock *ProbeMock) Execute(ctx context.Context) error { // // len(mockedProbe.ExecuteCalls()) func (mock *ProbeMock) ExecuteCalls() []struct { - Ctx context.Context + Ctx context.Context + Spec config.CentralSpec } { var calls []struct { - Ctx context.Context + Ctx context.Context + Spec config.CentralSpec } mock.lockExecute.RLock() calls = mock.calls.Execute diff --git a/probe/pkg/probe/probe_test.go b/probe/pkg/probe/probe_test.go index 3463c9f95f..8d2fa44c81 100644 --- a/probe/pkg/probe/probe_test.go +++ b/probe/pkg/probe/probe_test.go @@ -30,6 +30,11 @@ var testConfig = &config.Config{ ProbeUsername: "service-account-client", } +var centralSpec = config.CentralSpec{ + CloudProvider: "aws", + Region: "us-east-1", +} + func makeHTTPResponse(statusCode int) *http.Response { response := &http.Response{ Body: io.NopCloser(bytes.NewBufferString(`{}`)), @@ -123,7 +128,7 @@ func TestCreateCentral(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), testConfig.ProbeRunTimeout) defer cancel() - central, err := probe.createCentral(ctx) + central, err := probe.createCentral(ctx, centralSpec) if tc.wantErr { assert.Error(t, err, "expected an error during probe run") diff --git a/probe/pkg/runtime/runtime.go b/probe/pkg/runtime/runtime.go index dea126b881..15f9a4d1f1 100644 --- a/probe/pkg/runtime/runtime.go +++ b/probe/pkg/runtime/runtime.go @@ -3,19 +3,17 @@ package runtime import ( "context" + "errors" + "fmt" + "sync" "time" "github.com/golang/glog" - "github.com/pkg/errors" "github.com/stackrox/acs-fleet-manager/probe/config" "github.com/stackrox/acs-fleet-manager/probe/pkg/metrics" "github.com/stackrox/acs-fleet-manager/probe/pkg/probe" ) -var ( - errCleanupFailed = errors.New("cleanup failed") -) - // Runtime orchestrates probe runs against fleet manager. type Runtime struct { Config *config.Config @@ -38,7 +36,7 @@ func (r *Runtime) RunLoop(ctx context.Context) error { for { select { case <-ctx.Done(): - return errors.Wrap(ctx.Err(), "probe context invalid") + return fmt.Errorf("probe context invalid: %w", ctx.Err()) case <-ticker.C: if err := r.RunSingle(ctx); err != nil { glog.Error(err) @@ -48,9 +46,31 @@ func (r *Runtime) RunLoop(ctx context.Context) error { } // RunSingle executes a single probe run. -func (r *Runtime) RunSingle(ctx context.Context) (errReturn error) { - metrics.MetricsInstance().IncStartedRuns(r.Config.DataPlaneRegion) - metrics.MetricsInstance().SetLastStartedTimestamp(r.Config.DataPlaneRegion) +func (r *Runtime) RunSingle(ctx context.Context) error { + var wg sync.WaitGroup + errCh := make(chan error, len(r.Config.CentralSpecs)) + + for _, spec := range r.Config.CentralSpecs { + wg.Add(1) + go func(spec config.CentralSpec) { + defer wg.Done() + errCh <- r.runWithSpec(ctx, spec) + }(spec) + } + + wg.Wait() + close(errCh) + + var result error + for err := range errCh { + result = errors.Join(result, err) + } + return result +} + +func (r *Runtime) runWithSpec(ctx context.Context, spec config.CentralSpec) (errReturn error) { + metrics.MetricsInstance().IncStartedRuns(spec.Region) + metrics.MetricsInstance().SetLastStartedTimestamp(spec.Region) probeRunCtx, cancel := context.WithTimeout(ctx, r.Config.ProbeRunTimeout) defer cancel() @@ -64,20 +84,20 @@ func (r *Runtime) RunSingle(ctx context.Context) (errReturn error) { // If ONLY the clean up failed, the context error is wrapped and // returned in `SingleRun`. if errReturn != nil { - errReturn = errors.Wrapf(errReturn, "%s: %s", errCleanupFailed, err) + errReturn = fmt.Errorf("cleanup failed: %w: %w", err, errReturn) } else { - errReturn = errors.Wrap(err, errCleanupFailed.Error()) + errReturn = fmt.Errorf("cleanup failed: %w", err) } } }() - if err := r.probe.Execute(probeRunCtx); err != nil { - metrics.MetricsInstance().IncFailedRuns(r.Config.DataPlaneRegion) - metrics.MetricsInstance().SetLastFailureTimestamp(r.Config.DataPlaneRegion) + if err := r.probe.Execute(probeRunCtx, spec); err != nil { + metrics.MetricsInstance().IncFailedRuns(spec.Region) + metrics.MetricsInstance().SetLastFailureTimestamp(spec.Region) glog.Error("probe run failed: ", err) - return errors.Wrap(err, "probe run failed") + return fmt.Errorf("probe run failed: %w", err) } - metrics.MetricsInstance().IncSucceededRuns(r.Config.DataPlaneRegion) - metrics.MetricsInstance().SetLastSuccessTimestamp(r.Config.DataPlaneRegion) + metrics.MetricsInstance().IncSucceededRuns(spec.Region) + metrics.MetricsInstance().SetLastSuccessTimestamp(spec.Region) return nil } diff --git a/probe/pkg/runtime/runtime_test.go b/probe/pkg/runtime/runtime_test.go index dd78d5a231..3703fd6116 100644 --- a/probe/pkg/runtime/runtime_test.go +++ b/probe/pkg/runtime/runtime_test.go @@ -19,6 +19,12 @@ var testConfig = &config.Config{ ProbeRunWaitPeriod: 10 * time.Millisecond, ProbeName: "probe", RHSSOClientID: "client", + CentralSpecs: []config.CentralSpec{ + { + CloudProvider: "aws", + Region: "us-east-1", + }, + }, } func TestRunSingle(t *testing.T) { @@ -32,7 +38,7 @@ func TestRunSingle(t *testing.T) { CleanUpFunc: func(ctx context.Context) error { return nil }, - ExecuteFunc: func(ctx context.Context) error { + ExecuteFunc: func(ctx context.Context, spec config.CentralSpec) error { concurrency.WaitWithTimeout(ctx, 2*testConfig.ProbeRunTimeout) return ctx.Err() }, @@ -45,7 +51,7 @@ func TestRunSingle(t *testing.T) { concurrency.WaitWithTimeout(ctx, 2*testConfig.ProbeCleanUpTimeout) return ctx.Err() }, - ExecuteFunc: func(ctx context.Context) error { + ExecuteFunc: func(ctx context.Context, spec config.CentralSpec) error { return nil }, }, @@ -54,6 +60,7 @@ func TestRunSingle(t *testing.T) { for _, tc := range tt { t.Run(tc.testName, func(t *testing.T) { + runtime, err := New(testConfig, tc.mockProbe) require.NoError(t, err, "failed to create runtime") ctx, cancel := context.WithTimeout(context.TODO(), testConfig.ProbeRunTimeout) @@ -78,7 +85,7 @@ func TestCanceledContextStillCleansUp(t *testing.T) { CleanUpFunc: func(ctx context.Context) error { return ctx.Err() }, - ExecuteFunc: func(ctx context.Context) error { + ExecuteFunc: func(ctx context.Context, spec config.CentralSpec) error { concurrency.WaitWithTimeout(ctx, 10*time.Millisecond) return ctx.Err() }, @@ -99,7 +106,7 @@ func TestCanceledContextStillCleansUp(t *testing.T) { }() err = runtime.RunSingle(ctx) - assert.NotContains(t, err.Error(), errCleanupFailed.Error()) + assert.NotContains(t, err.Error(), "cleanup failed") assert.Equal(t, 1, len(tc.mockProbe.CleanUpCalls()), "must clean up centrals") }) }