Skip to content

Commit

Permalink
ROX-16615: Make probe multiregion
Browse files Browse the repository at this point in the history
  • Loading branch information
kovayur committed Mar 19, 2024
1 parent 8c79788 commit c9c9ba5
Show file tree
Hide file tree
Showing 14 changed files with 191 additions and 61 deletions.
4 changes: 2 additions & 2 deletions deploy/helm/probe/templates/01-operator-04-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ spec:
name: rhacs-probe-ocm-token
key: TOKEN
{{- end }}
- name: DATA_PLANE_REGION
value: {{ .Values.dataPlaneRegion | quote }}
- name: CENTRAL_SPECS
value: {{ printf `[{ "cloudProvider": "aws", "region": "%s" }]` .Values.dataPlaneRegion | quote }}
ports:
- name: monitoring
containerPort: 7070
Expand Down
2 changes: 1 addition & 1 deletion probe/cmd/probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func main() {
glog.Fatal(err)
}

if metricsServer := metrics.NewMetricsServer(config.MetricsAddress, config.DataPlaneRegion); metricsServer != nil {
if metricsServer := metrics.NewMetricsServer(config); metricsServer != nil {
defer metrics.CloseMetricsServer(metricsServer)
go metrics.ListenAndServe(metricsServer)
} else {
Expand Down
23 changes: 21 additions & 2 deletions probe/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/stackrox/rox/pkg/errorhelpers"
"sigs.k8s.io/yaml"

"github.com/caarlos0/env/v6"
"github.com/pkg/errors"
Expand All @@ -14,8 +15,7 @@ import (
// Config contains this application's runtime configuration.
type Config struct {
AuthType string `env:"AUTH_TYPE" envDefault:"RHSSO"`
DataCloudProvider string `env:"DATA_PLANE_CLOUD_PROVIDER" envDefault:"aws"`
DataPlaneRegion string `env:"DATA_PLANE_REGION" envDefault:"us-east-1"`
CentralSpecs CentralSpecs `env:"CENTRAL_SPECS" envDefault:"[{ \"cloudProvider\": \"aws\", \"region\": \"us-east-1\" }]"`
FleetManagerEndpoint string `env:"FLEET_MANAGER_ENDPOINT" envDefault:"http://127.0.0.1:8000"`
MetricsAddress string `env:"METRICS_ADDRESS" envDefault:":7070"`
RHSSOClientID string `env:"RHSSO_SERVICE_ACCOUNT_CLIENT_ID"`
Expand All @@ -30,6 +30,25 @@ type Config struct {
ProbeUsername string
}

// CentralSpecs container for the CentralSpec slice
type CentralSpecs []CentralSpec

// CentralSpec the desired central configuration
type CentralSpec struct {
CloudProvider string `json:"cloudProvider"`
Region string `json:"region"`
}

// UnmarshalText implements encoding.TextUnmarshaler so that CentralSpec can be parsed by env.Parse
func (s *CentralSpecs) UnmarshalText(text []byte) error {
var specs []CentralSpec
if err := yaml.Unmarshal(text, &specs); err != nil {
return fmt.Errorf("unmarshal central spec: %w", err)
}
*s = specs
return nil
}

// GetConfig retrieves the current runtime configuration from the environment and returns it.
func GetConfig() (*Config, error) {
// Default value if PROBE_NAME and HOSTNAME are not set.
Expand Down
51 changes: 51 additions & 0 deletions probe/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,54 @@ func TestGetConfig_Failure(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, cfg)
}

func TestGetConfig_CentralSpec(t *testing.T) {
t.Setenv("FLEET_MANAGER_ENDPOINT", "http://127.0.0.1:8888")
t.Setenv("AUTH_TYPE", "RHSSO")
t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_ID", "dummy")
t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_SECRET", "dummy")
t.Setenv("CENTRAL_SPECS", `[{ "cloudProvider": "aws", "region": "us-east-1" }, { "cloudProvider": "aws", "region": "eu-west-1" }]`)

cfg, err := GetConfig()

require.NoError(t, err)
assert.Equal(t, CentralSpecs{
{
CloudProvider: "aws",
Region: "us-east-1",
},
{
CloudProvider: "aws",
Region: "eu-west-1",
},
}, cfg.CentralSpecs)
}

func TestGetConfig_CentralSpecDefault(t *testing.T) {
t.Setenv("FLEET_MANAGER_ENDPOINT", "http://127.0.0.1:8888")
t.Setenv("AUTH_TYPE", "RHSSO")
t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_ID", "dummy")
t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_SECRET", "dummy")

cfg, err := GetConfig()

require.NoError(t, err)
assert.Equal(t, CentralSpecs{
{
CloudProvider: "aws",
Region: "us-east-1",
},
}, cfg.CentralSpecs)
}

func TestGetConfig_CentralSpecInvalidJson(t *testing.T) {
t.Setenv("FLEET_MANAGER_ENDPOINT", "http://127.0.0.1:8888")
t.Setenv("AUTH_TYPE", "RHSSO")
t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_ID", "dummy")
t.Setenv("RHSSO_SERVICE_ACCOUNT_CLIENT_SECRET", "dummy")
t.Setenv("CENTRAL_SPECS", `{ "cloudProvider": `)

_, err := GetConfig()

require.Error(t, err)
}
8 changes: 7 additions & 1 deletion probe/internal/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,20 @@ var testConfig = &config.Config{
ProbeRunTimeout: 100 * time.Millisecond,
ProbeName: "probe",
RHSSOClientID: "client",
CentralSpecs: []config.CentralSpec{
{
CloudProvider: "aws",
Region: "us-east-1",
},
},
}

func TestCLIInterrupt(t *testing.T) {
mockProbe := &probe.ProbeMock{
CleanUpFunc: func(ctx context.Context) error {
return nil
},
ExecuteFunc: func(ctx context.Context) error {
ExecuteFunc: func(ctx context.Context, spec config.CentralSpec) error {
process, err := os.FindProcess(os.Getpid())
require.NoError(t, err, "could not find current process ID")
process.Signal(os.Interrupt)
Expand Down
11 changes: 7 additions & 4 deletions probe/pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stackrox/acs-fleet-manager/probe/config"
)

const (
Expand Down Expand Up @@ -43,10 +44,12 @@ func (m *Metrics) Register(r prometheus.Registerer) {
}

// Init sets initial values for the gauge metrics.
func (m *Metrics) Init(region string) {
m.lastFailureTimestamp.With(prometheus.Labels{regionLabelName: region}).Set(0)
m.lastStartedTimestamp.With(prometheus.Labels{regionLabelName: region}).Set(0)
m.lastSuccessTimestamp.With(prometheus.Labels{regionLabelName: region}).Set(0)
func (m *Metrics) Init(config *config.Config) {
for _, spec := range config.CentralSpecs {
m.lastFailureTimestamp.With(prometheus.Labels{regionLabelName: spec.Region}).Set(0)
m.lastStartedTimestamp.With(prometheus.Labels{regionLabelName: spec.Region}).Set(0)
m.lastSuccessTimestamp.With(prometheus.Labels{regionLabelName: spec.Region}).Set(0)
}
}

// IncStartedRuns increments the metric counter for started probe runs.
Expand Down
19 changes: 15 additions & 4 deletions probe/pkg/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,22 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stackrox/acs-fleet-manager/probe/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var regionValue = "us-east-1"
var (
regionValue = "us-east-1"
cfg = &config.Config{
MetricsAddress: ":8081",
CentralSpecs: []config.CentralSpec{
{
Region: regionValue,
},
},
}
)

func getMetricSeries(t *testing.T, registry *prometheus.Registry, name string) *io_prometheus_client.Metric {
metrics := serveMetrics(t, registry)
Expand Down Expand Up @@ -52,7 +63,7 @@ func TestCounterIncrements(t *testing.T) {
tc := tc
t.Run(tc.metricName, func(t *testing.T) {
m := newMetrics()
registry := initPrometheus(m, regionValue)
registry := initPrometheus(m, cfg)
tc.callIncrementFunc(m)

targetSeries := getMetricSeries(t, registry, tc.metricName)
Expand Down Expand Up @@ -96,7 +107,7 @@ func TestTimestampGauges(t *testing.T) {
tc := tc
t.Run(tc.metricName, func(t *testing.T) {
m := newMetrics()
registry := initPrometheus(m, regionValue)
registry := initPrometheus(m, cfg)
lowerBound := time.Now().Unix()

targetSeries := getMetricSeries(t, registry, tc.metricName)
Expand Down Expand Up @@ -133,7 +144,7 @@ func TestHistograms(t *testing.T) {
tc := tc
t.Run(tc.metricName, func(t *testing.T) {
m := newMetrics()
registry := initPrometheus(m, regionValue)
registry := initPrometheus(m, cfg)
expectedCount := uint64(2)
expectedSum := 480.0
tc.callObserveFunc(m)
Expand Down
11 changes: 6 additions & 5 deletions probe/pkg/metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stackrox/acs-fleet-manager/probe/config"
"github.com/stackrox/rox/pkg/utils"
)

// NewMetricsServer returns the metrics server.
func NewMetricsServer(address string, region string) *http.Server {
registry := initPrometheus(MetricsInstance(), region)
return newMetricsServer(address, registry)
func NewMetricsServer(config *config.Config) *http.Server {
registry := initPrometheus(MetricsInstance(), config)
return newMetricsServer(config.MetricsAddress, registry)
}

// ListenAndServe listens for incoming requests and serves the metrics.
Expand All @@ -29,14 +30,14 @@ func CloseMetricsServer(server *http.Server) {
}
}

func initPrometheus(customMetrics *Metrics, region string) *prometheus.Registry {
func initPrometheus(customMetrics *Metrics, config *config.Config) *prometheus.Registry {
registry := prometheus.NewRegistry()
// Register default metrics to use a dedicated registry instead of prometheus.DefaultRegistry.
// This makes it easier to isolate metric state when unit testing this package.
registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
registry.MustRegister(prometheus.NewGoCollector())
customMetrics.Register(registry)
customMetrics.Init(region)
customMetrics.Init(config)
return registry
}

Expand Down
6 changes: 3 additions & 3 deletions probe/pkg/metrics/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import (
type metricResponse map[string]*io_prometheus_client.MetricFamily

func TestMetricsServerCorrectAddress(t *testing.T) {
server := NewMetricsServer(":8081", regionValue)
server := NewMetricsServer(cfg)
defer server.Close()
assert.Equal(t, ":8081", server.Addr)
}

func TestMetricsServerServesDefaultMetrics(t *testing.T) {
registry := initPrometheus(newMetrics(), regionValue)
registry := initPrometheus(newMetrics(), cfg)
metrics := serveMetrics(t, registry)
assert.Contains(t, metrics, "go_memstats_alloc_bytes", "expected metrics to contain go default metrics but it did not")
}
Expand All @@ -37,7 +37,7 @@ func TestMetricsServerServesCustomMetrics(t *testing.T) {
customMetrics.SetLastSuccessTimestamp(regionValue)
customMetrics.SetLastFailureTimestamp(regionValue)
customMetrics.ObserveTotalDuration(time.Minute, regionValue)
registry := initPrometheus(customMetrics, regionValue)
registry := initPrometheus(customMetrics, cfg)
metrics := serveMetrics(t, registry)

expectedKeys := []string{
Expand Down
18 changes: 9 additions & 9 deletions probe/pkg/probe/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
//
//go:generate moq -out probe_moq.go . Probe
type Probe interface {
Execute(ctx context.Context) error
Execute(ctx context.Context, spec config.CentralSpec) error
CleanUp(ctx context.Context) error
}

Expand Down Expand Up @@ -65,16 +65,16 @@ func (p *ProbeImpl) newCentralName() (string, error) {
}

// Execute the probe of the fleet manager API.
func (p *ProbeImpl) Execute(ctx context.Context) error {
func (p *ProbeImpl) Execute(ctx context.Context, spec config.CentralSpec) error {
glog.Infof("probe run has been started: fleetManagerEndpoint=%q, provider=%q, region=%q",
p.config.FleetManagerEndpoint,
p.config.DataCloudProvider,
p.config.DataPlaneRegion,
spec.CloudProvider,
spec.Region,
)
defer glog.Info("probe run has ended")
defer recordElapsedTime(time.Now(), p.config.DataPlaneRegion)
defer recordElapsedTime(time.Now(), spec.Region)

central, err := p.createCentral(ctx)
central, err := p.createCentral(ctx, spec)
if err != nil {
return err
}
Expand Down Expand Up @@ -133,16 +133,16 @@ func (p *ProbeImpl) cleanupFunc(ctx context.Context) error {
}

// Create a Central and verify that it transitioned to 'ready' state.
func (p *ProbeImpl) createCentral(ctx context.Context) (*public.CentralRequest, error) {
func (p *ProbeImpl) createCentral(ctx context.Context, spec config.CentralSpec) (*public.CentralRequest, error) {
centralName, err := p.newCentralName()
if err != nil {
return nil, errors.Wrap(err, "failed to create central name")
}
request := public.CentralRequestPayload{
Name: centralName,
MultiAz: true,
CloudProvider: p.config.DataCloudProvider,
Region: p.config.DataPlaneRegion,
CloudProvider: spec.CloudProvider,
Region: spec.Region,
}
central, resp, err := p.fleetManagerPublicAPI.CreateCentral(ctx, true, request)
defer utils.IgnoreError(closeBodyIfNonEmpty(resp))
Expand Down
Loading

0 comments on commit c9c9ba5

Please sign in to comment.