From 8b7b032f98414a3b98c77b31575431746177837c Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Tue, 27 Jun 2023 13:48:01 +0300 Subject: [PATCH 1/2] feat: cluster name for events --- api/v1/testkube.yaml | 5 ++++- cmd/api-server/main.go | 16 +++++++++++++--- internal/config/config.go | 1 + pkg/agent/agent.go | 4 +++- pkg/agent/agent_test.go | 2 +- pkg/agent/events.go | 1 + pkg/api/v1/testkube/model_event.go | 4 +++- pkg/event/emitter.go | 27 +++++++++++++++------------ pkg/event/emitter_test.go | 10 +++++----- pkg/event/kind/slack/loader.go | 4 ++-- pkg/slack/slack.go | 8 ++++++-- 11 files changed, 54 insertions(+), 28 deletions(-) diff --git a/api/v1/testkube.yaml b/api/v1/testkube.yaml index 2484c47d897..5a0b8989b09 100644 --- a/api/v1/testkube.yaml +++ b/api/v1/testkube.yaml @@ -4746,7 +4746,7 @@ components: app: "backend" Event: - description: CRD based executor data + description: Event data type: object required: - type @@ -4768,6 +4768,9 @@ components: $ref: "#/components/schemas/Execution" testSuiteExecution: $ref: "#/components/schemas/TestSuiteExecution" + clusterName: + type: string + description: cluster name of event EventResource: type: string diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index 201ab81d862..53ac4e0cb64 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -286,7 +286,7 @@ func main() { log.DefaultLogger.Errorw("error creating NATS connection", "error", err) } eventBus := bus.NewNATSBus(nc) - eventsEmitter := event.NewEmitter(eventBus) + eventsEmitter := event.NewEmitter(eventBus, cfg.TestkubeClusterName) metrics := metrics.NewMetrics() @@ -401,7 +401,17 @@ func main() { if mode == common.ModeAgent { log.DefaultLogger.Info("starting agent service") - agentHandle, err := agent.NewAgent(log.DefaultLogger, api.Mux.Handler(), cfg.TestkubeCloudAPIKey, grpcClient, cfg.TestkubeCloudWorkerCount, cfg.TestkubeCloudLogStreamWorkerCount, api.GetLogsStream, clusterId) + agentHandle, err := agent.NewAgent( + log.DefaultLogger, + api.Mux.Handler(), + cfg.TestkubeCloudAPIKey, + grpcClient, + cfg.TestkubeCloudWorkerCount, + cfg.TestkubeCloudLogStreamWorkerCount, + api.GetLogsStream, + clusterId, + cfg.TestkubeClusterName, + ) if err != nil { ui.ExitOnError("Starting agent", err) } @@ -560,7 +570,7 @@ func newSlackLoader(cfg *config.Config) (*slack.SlackLoader, error) { return nil, err } - return slack.NewSlackLoader(slackTemplate, slackConfig, testkube.AllEventTypes), nil + return slack.NewSlackLoader(slackTemplate, slackConfig, cfg.TestkubeClusterName, testkube.AllEventTypes), nil } func loadFromBase64StringOrFile(base64Val string, configDir, filename, configType string) (raw string, err error) { diff --git a/internal/config/config.go b/internal/config/config.go index 0dd8dc8ce80..9c64dcb2ccc 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -61,6 +61,7 @@ type Config struct { CDEventsTarget string `envconfig:"CDEVENTS_TARGET" default:""` TestkubeDashboardURI string `envconfig:"TESTKUBE_DASHBOARD_URI" default:""` DisableReconciler bool `envconfig:"DISABLE_RECONCILER" default:"false"` + TestkubeClusterName string `envconfig:"TESTKUBE_CLUSTER_NAME" default:""` } func Get() (*Config, error) { diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 8c65884695c..2f5f04fc868 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -66,7 +66,8 @@ type Agent struct { receiveTimeout time.Duration healthcheckInterval time.Duration - clusterID string + clusterID string + clusterName string } func NewAgent(logger *zap.SugaredLogger, @@ -77,6 +78,7 @@ func NewAgent(logger *zap.SugaredLogger, logStreamWorkerCount int, logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error), clusterID string, + clusterName string, ) (*Agent, error) { return &Agent{ handler: handler, diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index 01597a0d8df..14fff844a89 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -56,7 +56,7 @@ func TestCommandExecution(t *testing.T) { var logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error) logger, _ := zap.NewDevelopment() - agent, err := agent.NewAgent(logger.Sugar(), m, "api-key", grpcClient, 5, 5, logStreamFunc, "") + agent, err := agent.NewAgent(logger.Sugar(), m, "api-key", grpcClient, 5, 5, logStreamFunc, "", "") if err != nil { t.Fatal(err) } diff --git a/pkg/agent/events.go b/pkg/agent/events.go index b62ddb7bf27..7e7a7354eba 100644 --- a/pkg/agent/events.go +++ b/pkg/agent/events.go @@ -48,6 +48,7 @@ func (ag *Agent) Metadata() map[string]string { } func (ag *Agent) Notify(event testkube.Event) (result testkube.EventResult) { + event.ClusterName = ag.clusterName // Non blocking send select { case ag.events <- event: diff --git a/pkg/api/v1/testkube/model_event.go b/pkg/api/v1/testkube/model_event.go index 880b287be65..8e762deed32 100644 --- a/pkg/api/v1/testkube/model_event.go +++ b/pkg/api/v1/testkube/model_event.go @@ -9,7 +9,7 @@ */ package testkube -// CRD based executor data +// Event data type Event struct { // UUID of event Id string `json:"id"` @@ -19,4 +19,6 @@ type Event struct { Type_ *EventType `json:"type"` TestExecution *Execution `json:"testExecution,omitempty"` TestSuiteExecution *TestSuiteExecution `json:"testSuiteExecution,omitempty"` + // cluster name of event + ClusterName string `json:"clusterName,omitempty"` } diff --git a/pkg/event/emitter.go b/pkg/event/emitter.go index 5cdb7bcf6b3..8d8f6856e32 100644 --- a/pkg/event/emitter.go +++ b/pkg/event/emitter.go @@ -20,24 +20,26 @@ const ( ) // NewEmitter returns new emitter instance -func NewEmitter(eventBus bus.Bus) *Emitter { +func NewEmitter(eventBus bus.Bus, clusterName string) *Emitter { return &Emitter{ - Results: make(chan testkube.EventResult, eventsBuffer), - Log: log.DefaultLogger, - Loader: NewLoader(), - Bus: eventBus, - Listeners: make(common.Listeners, 0), + Results: make(chan testkube.EventResult, eventsBuffer), + Log: log.DefaultLogger, + Loader: NewLoader(), + Bus: eventBus, + Listeners: make(common.Listeners, 0), + ClusterName: clusterName, } } // Emitter handles events emitting for webhooks type Emitter struct { - Results chan testkube.EventResult - Listeners common.Listeners - Loader *Loader - Log *zap.SugaredLogger - mutex sync.Mutex - Bus bus.Bus + Results chan testkube.EventResult + Listeners common.Listeners + Loader *Loader + Log *zap.SugaredLogger + mutex sync.Mutex + Bus bus.Bus + ClusterName string } // Register adds new listener @@ -124,6 +126,7 @@ func (e *Emitter) UpdateListeners(listeners common.Listeners) { // Notify notifies emitter with webhook func (e *Emitter) Notify(event testkube.Event) { + event.ClusterName = e.ClusterName err := e.Bus.PublishTopic(event.Topic(), event) e.Log.Infow("event published", append(event.Log(), "error", err)...) } diff --git a/pkg/event/emitter_test.go b/pkg/event/emitter_test.go index a82a93966f5..62882b40e92 100644 --- a/pkg/event/emitter_test.go +++ b/pkg/event/emitter_test.go @@ -25,7 +25,7 @@ func TestEmitter_Register(t *testing.T) { t.Parallel() // given eventBus := bus.NewEventBusMock() - emitter := NewEmitter(eventBus) + emitter := NewEmitter(eventBus, "") // when emitter.Register(&dummy.DummyListener{Id: "l1"}) @@ -43,7 +43,7 @@ func TestEmitter_Listen(t *testing.T) { t.Parallel() // given eventBus := bus.NewEventBusMock() - emitter := NewEmitter(eventBus) + emitter := NewEmitter(eventBus, "") // given listener with matching selector listener1 := &dummy.DummyListener{Id: "l1", SelectorString: "type=listener1"} // and listener with second matic selector @@ -97,7 +97,7 @@ func TestEmitter_Notify(t *testing.T) { t.Parallel() // given eventBus := bus.NewEventBusMock() - emitter := NewEmitter(eventBus) + emitter := NewEmitter(eventBus, "") // and 2 listeners subscribed to the same queue // * first on pod1 listener1 := &dummy.DummyListener{Id: "l3"} @@ -131,7 +131,7 @@ func TestEmitter_Reconcile(t *testing.T) { t.Parallel() // given first reconciler loop was done eventBus := bus.NewEventBusMock() - emitter := NewEmitter(eventBus) + emitter := NewEmitter(eventBus, "") emitter.Loader.Register(&dummy.DummyLoader{IdPrefix: "dummy1"}) emitter.Loader.Register(&dummy.DummyLoader{IdPrefix: "dummy2"}) @@ -185,7 +185,7 @@ func TestEmitter_UpdateListeners(t *testing.T) { t.Parallel() // given eventBus := bus.NewEventBusMock() - emitter := NewEmitter(eventBus) + emitter := NewEmitter(eventBus, "") // given listener with matching selector listener1 := &dummy.DummyListener{Id: "l1", SelectorString: "type=listener1"} // and listener with second matching selector diff --git a/pkg/event/kind/slack/loader.go b/pkg/event/kind/slack/loader.go index 70d87b99d96..1944a81a546 100644 --- a/pkg/event/kind/slack/loader.go +++ b/pkg/event/kind/slack/loader.go @@ -13,12 +13,12 @@ import ( var _ common.ListenerLoader = (*SlackLoader)(nil) -func NewSlackLoader(messageTemplate, configString string, events []testkube.EventType) *SlackLoader { +func NewSlackLoader(messageTemplate, configString, clusterName string, events []testkube.EventType) *SlackLoader { var config []slack.NotificationsConfig if err := json.Unmarshal([]byte(configString), &config); err != nil { log.DefaultLogger.Errorw("error unmarshalling slack config", "error", err) } - slackNotifier := slack.NewNotifier(messageTemplate, config) + slackNotifier := slack.NewNotifier(messageTemplate, clusterName, config) return &SlackLoader{ Log: log.DefaultLogger, events: events, diff --git a/pkg/slack/slack.go b/pkg/slack/slack.go index 6ae8f62a4ec..ffc24551ca8 100644 --- a/pkg/slack/slack.go +++ b/pkg/slack/slack.go @@ -26,6 +26,7 @@ type MessageArgs struct { StartTime string EndTime string Duration string + ClusterName string } type Notifier struct { @@ -33,11 +34,12 @@ type Notifier struct { timestamps map[string]string Ready bool messageTemplate string + clusterName string config *Config } -func NewNotifier(template string, config []NotificationsConfig) *Notifier { - notifier := Notifier{messageTemplate: template, config: NewConfig(config)} +func NewNotifier(template, clusterName string, config []NotificationsConfig) *Notifier { + notifier := Notifier{messageTemplate: template, clusterName: clusterName, config: NewConfig(config)} notifier.timestamps = make(map[string]string) if token, ok := os.LookupEnv("SLACK_TOKEN"); ok { log.DefaultLogger.Infow("initializing slack client", "SLACK_TOKEN", token) @@ -182,6 +184,7 @@ func (s *Notifier) composeTestsuiteMessage(execution *testkube.TestSuiteExecutio Duration: execution.Duration, TotalSteps: len(execution.ExecuteStepResults), FailedSteps: execution.FailedStepsCount(), + ClusterName: s.clusterName, } log.DefaultLogger.Infow("Execution changed", "status", execution.Status) @@ -216,6 +219,7 @@ func (s *Notifier) composeTestMessage(execution *testkube.Execution, eventType t Duration: execution.Duration, TotalSteps: len(execution.ExecutionResult.Steps), FailedSteps: execution.ExecutionResult.FailedStepsCount(), + ClusterName: s.clusterName, } log.DefaultLogger.Infow("Execution changed", "status", execution.ExecutionResult.Status) From 3a9c0fe9df0d13cb03424139c79334407499544b Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Tue, 27 Jun 2023 14:35:08 +0300 Subject: [PATCH 2/2] fix: unit tests --- docs/docs/articles/helm-chart.md | 1 + pkg/agent/events_test.go | 2 +- pkg/agent/logs_test.go | 2 +- pkg/event/emitter_integration_test.go | 2 +- pkg/event/kind/slack/loader_test.go | 2 +- pkg/triggers/executor_test.go | 2 +- pkg/triggers/service_test.go | 2 +- 7 files changed, 7 insertions(+), 6 deletions(-) diff --git a/docs/docs/articles/helm-chart.md b/docs/docs/articles/helm-chart.md index cf7acb59d41..9dbb3802d47 100644 --- a/docs/docs/articles/helm-chart.md +++ b/docs/docs/articles/helm-chart.md @@ -114,6 +114,7 @@ The following Helm defaults are used in the `testkube` chart: | testkube-api.logs.bucket | no | "testkube-logs" | | testkube-api.cdeventsTarget | yes | "" | | testkube-api.dashboardUri | yes | "" | +| testkube-api.clusterName | yes | "" | >For more configuration parameters of a `MongoDB` chart please visit: diff --git a/pkg/agent/events_test.go b/pkg/agent/events_test.go index d42f7855669..ae15f59bc49 100644 --- a/pkg/agent/events_test.go +++ b/pkg/agent/events_test.go @@ -52,7 +52,7 @@ func TestEventLoop(t *testing.T) { grpcClient := cloud.NewTestKubeCloudAPIClient(grpcConn) var logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error) - agent, err := agent.NewAgent(logger.Sugar(), nil, "api-key", grpcClient, 5, 5, logStreamFunc, "") + agent, err := agent.NewAgent(logger.Sugar(), nil, "api-key", grpcClient, 5, 5, logStreamFunc, "", "") assert.NoError(t, err) go func() { l, err := agent.Load() diff --git a/pkg/agent/logs_test.go b/pkg/agent/logs_test.go index f371d10f131..660dd4f4833 100644 --- a/pkg/agent/logs_test.go +++ b/pkg/agent/logs_test.go @@ -63,7 +63,7 @@ func TestLogStream(t *testing.T) { } logger, _ := zap.NewDevelopment() - agent, err := agent.NewAgent(logger.Sugar(), m, "api-key", grpcClient, 5, 5, logStreamFunc, "") + agent, err := agent.NewAgent(logger.Sugar(), m, "api-key", grpcClient, 5, 5, logStreamFunc, "", "") if err != nil { t.Fatal(err) } diff --git a/pkg/event/emitter_integration_test.go b/pkg/event/emitter_integration_test.go index de133faf264..c1750c624a1 100644 --- a/pkg/event/emitter_integration_test.go +++ b/pkg/event/emitter_integration_test.go @@ -24,7 +24,7 @@ func GetTestNATSEmitter() *Emitter { if err != nil { panic(err) } - return NewEmitter(bus.NewNATSBus(nc)) + return NewEmitter(bus.NewNATSBus(nc), "") } func TestEmitter_NATS_Register_Integration(t *testing.T) { diff --git a/pkg/event/kind/slack/loader_test.go b/pkg/event/kind/slack/loader_test.go index 477a85c39f3..5afb3bfd419 100644 --- a/pkg/event/kind/slack/loader_test.go +++ b/pkg/event/kind/slack/loader_test.go @@ -14,7 +14,7 @@ func TestSlackLoader_Load(t *testing.T) { t.Parallel() // given // default slack notifier is not ready by default - l := NewSlackLoader("", "", testkube.AllEventTypes) + l := NewSlackLoader("", "", "", testkube.AllEventTypes) // when listeners, err := l.Load() diff --git a/pkg/triggers/executor_test.go b/pkg/triggers/executor_test.go index 82e28b27e8a..be84fc9cb71 100644 --- a/pkg/triggers/executor_test.go +++ b/pkg/triggers/executor_test.go @@ -51,7 +51,7 @@ func TestExecute(t *testing.T) { mockExecutor := client.NewMockExecutor(mockCtrl) - mockEventEmitter := event.NewEmitter(bus.NewEventBusMock()) + mockEventEmitter := event.NewEmitter(bus.NewEventBusMock(), "") mockTest := testsv3.Test{ ObjectMeta: metav1.ObjectMeta{Namespace: "testkube", Name: "some-test"}, diff --git a/pkg/triggers/service_test.go b/pkg/triggers/service_test.go index df0ede2807a..512b13ea149 100644 --- a/pkg/triggers/service_test.go +++ b/pkg/triggers/service_test.go @@ -57,7 +57,7 @@ func TestService_Run(t *testing.T) { mockExecutor := client.NewMockExecutor(mockCtrl) - mockEventEmitter := event.NewEmitter(bus.NewEventBusMock()) + mockEventEmitter := event.NewEmitter(bus.NewEventBusMock(), "") mockTest := testsv3.Test{ ObjectMeta: metav1.ObjectMeta{Namespace: "testkube", Name: "some-test"},