Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ROX-16615: Make the probe service create multiple centrals (per region / cloud provider) #1719

Merged
merged 1 commit into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions deploy/helm/probe/templates/01-operator-04-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ spec:
name: rhacs-probe-ocm-token
key: TOKEN
{{- end }}
- name: DATA_PLANE_REGION
value: {{ .Values.dataPlaneRegion | quote }}
- name: CENTRAL_SPECS
value: {{ printf `[{ "cloudProvider": "aws", "region": "%s" }]` .Values.dataPlaneRegion | quote }}
ports:
- name: monitoring
containerPort: 7070
Expand Down
2 changes: 1 addition & 1 deletion probe/cmd/probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func main() {
glog.Fatal(err)
}

if metricsServer := metrics.NewMetricsServer(config.MetricsAddress, config.DataPlaneRegion); metricsServer != nil {
if metricsServer := metrics.NewMetricsServer(config); metricsServer != nil {
defer metrics.CloseMetricsServer(metricsServer)
go metrics.ListenAndServe(metricsServer)
} else {
Expand Down
23 changes: 21 additions & 2 deletions probe/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/stackrox/rox/pkg/errorhelpers"
"sigs.k8s.io/yaml"

"github.com/caarlos0/env/v6"
"github.com/pkg/errors"
Expand All @@ -14,8 +15,7 @@ import (
// Config contains this application's runtime configuration.
type Config struct {
AuthType string `env:"AUTH_TYPE" envDefault:"RHSSO"`
DataCloudProvider string `env:"DATA_PLANE_CLOUD_PROVIDER" envDefault:"aws"`
DataPlaneRegion string `env:"DATA_PLANE_REGION" envDefault:"us-east-1"`
CentralSpecs CentralSpecs `env:"CENTRAL_SPECS" envDefault:"[{ \"cloudProvider\": \"aws\", \"region\": \"us-east-1\" }]"`
FleetManagerEndpoint string `env:"FLEET_MANAGER_ENDPOINT" envDefault:"http://127.0.0.1:8000"`
MetricsAddress string `env:"METRICS_ADDRESS" envDefault:":7070"`
RHSSOClientID string `env:"RHSSO_SERVICE_ACCOUNT_CLIENT_ID"`
Expand All @@ -30,6 +30,25 @@ type Config struct {
ProbeUsername string
}

// CentralSpecs container for the CentralSpec slice
type CentralSpecs []CentralSpec

// CentralSpec the desired central configuration
type CentralSpec struct {
CloudProvider string `json:"cloudProvider"`
Region string `json:"region"`
}

// UnmarshalText implements encoding.TextUnmarshaler so that CentralSpec can be parsed by env.Parse
func (s *CentralSpecs) UnmarshalText(text []byte) error {
var specs []CentralSpec
if err := yaml.Unmarshal(text, &specs); err != nil {
return fmt.Errorf("unmarshal central spec: %w", err)
}
*s = specs
return nil
}

// GetConfig retrieves the current runtime configuration from the environment and returns it.
func GetConfig() (*Config, error) {
// Default value if PROBE_NAME and HOSTNAME are not set.
Expand Down
51 changes: 51 additions & 0 deletions probe/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,54 @@ func TestGetConfig_Failure(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, cfg)
}

func TestGetConfig_CentralSpec(t *testing.T) {
t.Setenv("FLEET_MANAGER_ENDPOINT", "http://127.0.0.1:8888")
t.Setenv("AUTH_TYPE", "RHSSO")
t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_ID", "dummy")
t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_SECRET", "dummy")
t.Setenv("CENTRAL_SPECS", `[{ "cloudProvider": "aws", "region": "us-east-1" }, { "cloudProvider": "aws", "region": "eu-west-1" }]`)

cfg, err := GetConfig()

require.NoError(t, err)
assert.Equal(t, CentralSpecs{
{
CloudProvider: "aws",
Region: "us-east-1",
},
{
CloudProvider: "aws",
Region: "eu-west-1",
},
}, cfg.CentralSpecs)
}

func TestGetConfig_CentralSpecDefault(t *testing.T) {
t.Setenv("FLEET_MANAGER_ENDPOINT", "http://127.0.0.1:8888")
t.Setenv("AUTH_TYPE", "RHSSO")
t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_ID", "dummy")
t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_SECRET", "dummy")

cfg, err := GetConfig()

require.NoError(t, err)
assert.Equal(t, CentralSpecs{
{
CloudProvider: "aws",
Region: "us-east-1",
},
}, cfg.CentralSpecs)
}

func TestGetConfig_CentralSpecInvalidJson(t *testing.T) {
t.Setenv("FLEET_MANAGER_ENDPOINT", "http://127.0.0.1:8888")
t.Setenv("AUTH_TYPE", "RHSSO")
t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_ID", "dummy")
t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_SECRET", "dummy")
t.Setenv("CENTRAL_SPECS", `{ "cloudProvider": `)

_, err := GetConfig()

require.Error(t, err)
}
8 changes: 7 additions & 1 deletion probe/internal/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,20 @@ var testConfig = &config.Config{
ProbeRunTimeout: 100 * time.Millisecond,
ProbeName: "probe",
RHSSOClientID: "client",
CentralSpecs: []config.CentralSpec{
{
CloudProvider: "aws",
Region: "us-east-1",
},
},
}

func TestCLIInterrupt(t *testing.T) {
mockProbe := &probe.ProbeMock{
CleanUpFunc: func(ctx context.Context) error {
return nil
},
ExecuteFunc: func(ctx context.Context) error {
ExecuteFunc: func(ctx context.Context, spec config.CentralSpec) error {
process, err := os.FindProcess(os.Getpid())
require.NoError(t, err, "could not find current process ID")
process.Signal(os.Interrupt)
Expand Down
11 changes: 7 additions & 4 deletions probe/pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stackrox/acs-fleet-manager/probe/config"
)

const (
Expand Down Expand Up @@ -43,10 +44,12 @@ func (m *Metrics) Register(r prometheus.Registerer) {
}

// Init sets initial values for the gauge metrics.
func (m *Metrics) Init(region string) {
m.lastFailureTimestamp.With(prometheus.Labels{regionLabelName: region}).Set(0)
m.lastStartedTimestamp.With(prometheus.Labels{regionLabelName: region}).Set(0)
m.lastSuccessTimestamp.With(prometheus.Labels{regionLabelName: region}).Set(0)
func (m *Metrics) Init(config *config.Config) {
for _, spec := range config.CentralSpecs {
m.lastFailureTimestamp.With(prometheus.Labels{regionLabelName: spec.Region}).Set(0)
m.lastStartedTimestamp.With(prometheus.Labels{regionLabelName: spec.Region}).Set(0)
m.lastSuccessTimestamp.With(prometheus.Labels{regionLabelName: spec.Region}).Set(0)
}
}

// IncStartedRuns increments the metric counter for started probe runs.
Expand Down
19 changes: 15 additions & 4 deletions probe/pkg/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,22 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stackrox/acs-fleet-manager/probe/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var regionValue = "us-east-1"
var (
regionValue = "us-east-1"
cfg = &config.Config{
MetricsAddress: ":8081",
CentralSpecs: []config.CentralSpec{
{
Region: regionValue,
},
},
}
)

func getMetricSeries(t *testing.T, registry *prometheus.Registry, name string) *io_prometheus_client.Metric {
metrics := serveMetrics(t, registry)
Expand Down Expand Up @@ -52,7 +63,7 @@ func TestCounterIncrements(t *testing.T) {
tc := tc
t.Run(tc.metricName, func(t *testing.T) {
m := newMetrics()
registry := initPrometheus(m, regionValue)
registry := initPrometheus(m, cfg)
tc.callIncrementFunc(m)

targetSeries := getMetricSeries(t, registry, tc.metricName)
Expand Down Expand Up @@ -96,7 +107,7 @@ func TestTimestampGauges(t *testing.T) {
tc := tc
t.Run(tc.metricName, func(t *testing.T) {
m := newMetrics()
registry := initPrometheus(m, regionValue)
registry := initPrometheus(m, cfg)
lowerBound := time.Now().Unix()

targetSeries := getMetricSeries(t, registry, tc.metricName)
Expand Down Expand Up @@ -133,7 +144,7 @@ func TestHistograms(t *testing.T) {
tc := tc
t.Run(tc.metricName, func(t *testing.T) {
m := newMetrics()
registry := initPrometheus(m, regionValue)
registry := initPrometheus(m, cfg)
expectedCount := uint64(2)
expectedSum := 480.0
tc.callObserveFunc(m)
Expand Down
11 changes: 6 additions & 5 deletions probe/pkg/metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stackrox/acs-fleet-manager/probe/config"
"github.com/stackrox/rox/pkg/utils"
)

// NewMetricsServer returns the metrics server.
func NewMetricsServer(address string, region string) *http.Server {
registry := initPrometheus(MetricsInstance(), region)
return newMetricsServer(address, registry)
func NewMetricsServer(config *config.Config) *http.Server {
registry := initPrometheus(MetricsInstance(), config)
return newMetricsServer(config.MetricsAddress, registry)
}

// ListenAndServe listens for incoming requests and serves the metrics.
Expand All @@ -29,14 +30,14 @@ func CloseMetricsServer(server *http.Server) {
}
}

func initPrometheus(customMetrics *Metrics, region string) *prometheus.Registry {
func initPrometheus(customMetrics *Metrics, config *config.Config) *prometheus.Registry {
registry := prometheus.NewRegistry()
// Register default metrics to use a dedicated registry instead of prometheus.DefaultRegistry.
// This makes it easier to isolate metric state when unit testing this package.
registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
registry.MustRegister(prometheus.NewGoCollector())
customMetrics.Register(registry)
customMetrics.Init(region)
customMetrics.Init(config)
return registry
}

Expand Down
6 changes: 3 additions & 3 deletions probe/pkg/metrics/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import (
type metricResponse map[string]*io_prometheus_client.MetricFamily

func TestMetricsServerCorrectAddress(t *testing.T) {
server := NewMetricsServer(":8081", regionValue)
server := NewMetricsServer(cfg)
defer server.Close()
assert.Equal(t, ":8081", server.Addr)
}

func TestMetricsServerServesDefaultMetrics(t *testing.T) {
registry := initPrometheus(newMetrics(), regionValue)
registry := initPrometheus(newMetrics(), cfg)
metrics := serveMetrics(t, registry)
assert.Contains(t, metrics, "go_memstats_alloc_bytes", "expected metrics to contain go default metrics but it did not")
}
Expand All @@ -37,7 +37,7 @@ func TestMetricsServerServesCustomMetrics(t *testing.T) {
customMetrics.SetLastSuccessTimestamp(regionValue)
customMetrics.SetLastFailureTimestamp(regionValue)
customMetrics.ObserveTotalDuration(time.Minute, regionValue)
registry := initPrometheus(customMetrics, regionValue)
registry := initPrometheus(customMetrics, cfg)
metrics := serveMetrics(t, registry)

expectedKeys := []string{
Expand Down
18 changes: 9 additions & 9 deletions probe/pkg/probe/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
//
//go:generate moq -out probe_moq.go . Probe
type Probe interface {
Execute(ctx context.Context) error
Execute(ctx context.Context, spec config.CentralSpec) error
CleanUp(ctx context.Context) error
}

Expand Down Expand Up @@ -65,16 +65,16 @@ func (p *ProbeImpl) newCentralName() (string, error) {
}

// Execute the probe of the fleet manager API.
func (p *ProbeImpl) Execute(ctx context.Context) error {
func (p *ProbeImpl) Execute(ctx context.Context, spec config.CentralSpec) error {
glog.Infof("probe run has been started: fleetManagerEndpoint=%q, provider=%q, region=%q",
p.config.FleetManagerEndpoint,
p.config.DataCloudProvider,
p.config.DataPlaneRegion,
spec.CloudProvider,
spec.Region,
)
defer glog.Info("probe run has ended")
defer recordElapsedTime(time.Now(), p.config.DataPlaneRegion)
defer recordElapsedTime(time.Now(), spec.Region)

central, err := p.createCentral(ctx)
central, err := p.createCentral(ctx, spec)
if err != nil {
return err
}
Expand Down Expand Up @@ -133,16 +133,16 @@ func (p *ProbeImpl) cleanupFunc(ctx context.Context) error {
}

// Create a Central and verify that it transitioned to 'ready' state.
func (p *ProbeImpl) createCentral(ctx context.Context) (*public.CentralRequest, error) {
func (p *ProbeImpl) createCentral(ctx context.Context, spec config.CentralSpec) (*public.CentralRequest, error) {
centralName, err := p.newCentralName()
if err != nil {
return nil, errors.Wrap(err, "failed to create central name")
}
request := public.CentralRequestPayload{
Name: centralName,
MultiAz: true,
CloudProvider: p.config.DataCloudProvider,
Region: p.config.DataPlaneRegion,
CloudProvider: spec.CloudProvider,
Region: spec.Region,
}
central, resp, err := p.fleetManagerPublicAPI.CreateCentral(ctx, true, request)
defer utils.IgnoreError(closeBodyIfNonEmpty(resp))
Expand Down
Loading
Loading