Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cluster name for events #4091

Merged
merged 2 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion api/v1/testkube.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4746,7 +4746,7 @@ components:
app: "backend"

Event:
description: CRD based executor data
description: Event data
type: object
required:
- type
Expand All @@ -4768,6 +4768,9 @@ components:
$ref: "#/components/schemas/Execution"
testSuiteExecution:
$ref: "#/components/schemas/TestSuiteExecution"
clusterName:
type: string
description: cluster name of event

EventResource:
type: string
Expand Down
16 changes: 13 additions & 3 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func main() {
log.DefaultLogger.Errorw("error creating NATS connection", "error", err)
}
eventBus := bus.NewNATSBus(nc)
eventsEmitter := event.NewEmitter(eventBus)
eventsEmitter := event.NewEmitter(eventBus, cfg.TestkubeClusterName)

metrics := metrics.NewMetrics()

Expand Down Expand Up @@ -401,7 +401,17 @@ func main() {
if mode == common.ModeAgent {
log.DefaultLogger.Info("starting agent service")

agentHandle, err := agent.NewAgent(log.DefaultLogger, api.Mux.Handler(), cfg.TestkubeCloudAPIKey, grpcClient, cfg.TestkubeCloudWorkerCount, cfg.TestkubeCloudLogStreamWorkerCount, api.GetLogsStream, clusterId)
agentHandle, err := agent.NewAgent(
log.DefaultLogger,
api.Mux.Handler(),
cfg.TestkubeCloudAPIKey,
grpcClient,
cfg.TestkubeCloudWorkerCount,
cfg.TestkubeCloudLogStreamWorkerCount,
api.GetLogsStream,
clusterId,
cfg.TestkubeClusterName,
)
if err != nil {
ui.ExitOnError("Starting agent", err)
}
Expand Down Expand Up @@ -560,7 +570,7 @@ func newSlackLoader(cfg *config.Config) (*slack.SlackLoader, error) {
return nil, err
}

return slack.NewSlackLoader(slackTemplate, slackConfig, testkube.AllEventTypes), nil
return slack.NewSlackLoader(slackTemplate, slackConfig, cfg.TestkubeClusterName, testkube.AllEventTypes), nil
}

func loadFromBase64StringOrFile(base64Val string, configDir, filename, configType string) (raw string, err error) {
Expand Down
1 change: 1 addition & 0 deletions docs/docs/articles/helm-chart.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ The following Helm defaults are used in the `testkube` chart:
| testkube-api.logs.bucket | no | "testkube-logs" |
| testkube-api.cdeventsTarget | yes | "" |
| testkube-api.dashboardUri | yes | "" |
| testkube-api.clusterName | yes | "" |

>For more configuration parameters of a `MongoDB` chart please visit:
<https://github.com/bitnami/charts/tree/master/bitnami/mongodb#parameters>
Expand Down
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Config struct {
CDEventsTarget string `envconfig:"CDEVENTS_TARGET" default:""`
TestkubeDashboardURI string `envconfig:"TESTKUBE_DASHBOARD_URI" default:""`
DisableReconciler bool `envconfig:"DISABLE_RECONCILER" default:"false"`
TestkubeClusterName string `envconfig:"TESTKUBE_CLUSTER_NAME" default:""`
}

func Get() (*Config, error) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ type Agent struct {
receiveTimeout time.Duration
healthcheckInterval time.Duration

clusterID string
clusterID string
clusterName string
}

func NewAgent(logger *zap.SugaredLogger,
Expand All @@ -77,6 +78,7 @@ func NewAgent(logger *zap.SugaredLogger,
logStreamWorkerCount int,
logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error),
clusterID string,
clusterName string,
) (*Agent, error) {
return &Agent{
handler: handler,
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestCommandExecution(t *testing.T) {
var logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error)

logger, _ := zap.NewDevelopment()
agent, err := agent.NewAgent(logger.Sugar(), m, "api-key", grpcClient, 5, 5, logStreamFunc, "")
agent, err := agent.NewAgent(logger.Sugar(), m, "api-key", grpcClient, 5, 5, logStreamFunc, "", "")
if err != nil {
t.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (ag *Agent) Metadata() map[string]string {
}

func (ag *Agent) Notify(event testkube.Event) (result testkube.EventResult) {
event.ClusterName = ag.clusterName
// Non blocking send
select {
case ag.events <- event:
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestEventLoop(t *testing.T) {
grpcClient := cloud.NewTestKubeCloudAPIClient(grpcConn)

var logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error)
agent, err := agent.NewAgent(logger.Sugar(), nil, "api-key", grpcClient, 5, 5, logStreamFunc, "")
agent, err := agent.NewAgent(logger.Sugar(), nil, "api-key", grpcClient, 5, 5, logStreamFunc, "", "")
assert.NoError(t, err)
go func() {
l, err := agent.Load()
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestLogStream(t *testing.T) {
}

logger, _ := zap.NewDevelopment()
agent, err := agent.NewAgent(logger.Sugar(), m, "api-key", grpcClient, 5, 5, logStreamFunc, "")
agent, err := agent.NewAgent(logger.Sugar(), m, "api-key", grpcClient, 5, 5, logStreamFunc, "", "")
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/api/v1/testkube/model_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
*/
package testkube

// CRD based executor data
// Event data
type Event struct {
// UUID of event
Id string `json:"id"`
Expand All @@ -19,4 +19,6 @@ type Event struct {
Type_ *EventType `json:"type"`
TestExecution *Execution `json:"testExecution,omitempty"`
TestSuiteExecution *TestSuiteExecution `json:"testSuiteExecution,omitempty"`
// cluster name of event
ClusterName string `json:"clusterName,omitempty"`
}
27 changes: 15 additions & 12 deletions pkg/event/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,26 @@ const (
)

// NewEmitter returns new emitter instance
func NewEmitter(eventBus bus.Bus) *Emitter {
func NewEmitter(eventBus bus.Bus, clusterName string) *Emitter {
return &Emitter{
Results: make(chan testkube.EventResult, eventsBuffer),
Log: log.DefaultLogger,
Loader: NewLoader(),
Bus: eventBus,
Listeners: make(common.Listeners, 0),
Results: make(chan testkube.EventResult, eventsBuffer),
Log: log.DefaultLogger,
Loader: NewLoader(),
Bus: eventBus,
Listeners: make(common.Listeners, 0),
ClusterName: clusterName,
}
}

// Emitter handles events emitting for webhooks
type Emitter struct {
Results chan testkube.EventResult
Listeners common.Listeners
Loader *Loader
Log *zap.SugaredLogger
mutex sync.Mutex
Bus bus.Bus
Results chan testkube.EventResult
Listeners common.Listeners
Loader *Loader
Log *zap.SugaredLogger
mutex sync.Mutex
Bus bus.Bus
ClusterName string
}

// Register adds new listener
Expand Down Expand Up @@ -124,6 +126,7 @@ func (e *Emitter) UpdateListeners(listeners common.Listeners) {

// Notify notifies emitter with webhook
func (e *Emitter) Notify(event testkube.Event) {
event.ClusterName = e.ClusterName
err := e.Bus.PublishTopic(event.Topic(), event)
e.Log.Infow("event published", append(event.Log(), "error", err)...)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/event/emitter_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func GetTestNATSEmitter() *Emitter {
if err != nil {
panic(err)
}
return NewEmitter(bus.NewNATSBus(nc))
return NewEmitter(bus.NewNATSBus(nc), "")
}

func TestEmitter_NATS_Register_Integration(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions pkg/event/emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestEmitter_Register(t *testing.T) {
t.Parallel()
// given
eventBus := bus.NewEventBusMock()
emitter := NewEmitter(eventBus)
emitter := NewEmitter(eventBus, "")
// when
emitter.Register(&dummy.DummyListener{Id: "l1"})

Expand All @@ -43,7 +43,7 @@ func TestEmitter_Listen(t *testing.T) {
t.Parallel()
// given
eventBus := bus.NewEventBusMock()
emitter := NewEmitter(eventBus)
emitter := NewEmitter(eventBus, "")
// given listener with matching selector
listener1 := &dummy.DummyListener{Id: "l1", SelectorString: "type=listener1"}
// and listener with second matic selector
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestEmitter_Notify(t *testing.T) {
t.Parallel()
// given
eventBus := bus.NewEventBusMock()
emitter := NewEmitter(eventBus)
emitter := NewEmitter(eventBus, "")
// and 2 listeners subscribed to the same queue
// * first on pod1
listener1 := &dummy.DummyListener{Id: "l3"}
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestEmitter_Reconcile(t *testing.T) {
t.Parallel()
// given first reconciler loop was done
eventBus := bus.NewEventBusMock()
emitter := NewEmitter(eventBus)
emitter := NewEmitter(eventBus, "")
emitter.Loader.Register(&dummy.DummyLoader{IdPrefix: "dummy1"})
emitter.Loader.Register(&dummy.DummyLoader{IdPrefix: "dummy2"})

Expand Down Expand Up @@ -185,7 +185,7 @@ func TestEmitter_UpdateListeners(t *testing.T) {
t.Parallel()
// given
eventBus := bus.NewEventBusMock()
emitter := NewEmitter(eventBus)
emitter := NewEmitter(eventBus, "")
// given listener with matching selector
listener1 := &dummy.DummyListener{Id: "l1", SelectorString: "type=listener1"}
// and listener with second matching selector
Expand Down
4 changes: 2 additions & 2 deletions pkg/event/kind/slack/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (

var _ common.ListenerLoader = (*SlackLoader)(nil)

func NewSlackLoader(messageTemplate, configString string, events []testkube.EventType) *SlackLoader {
func NewSlackLoader(messageTemplate, configString, clusterName string, events []testkube.EventType) *SlackLoader {
var config []slack.NotificationsConfig
if err := json.Unmarshal([]byte(configString), &config); err != nil {
log.DefaultLogger.Errorw("error unmarshalling slack config", "error", err)
}
slackNotifier := slack.NewNotifier(messageTemplate, config)
slackNotifier := slack.NewNotifier(messageTemplate, clusterName, config)
return &SlackLoader{
Log: log.DefaultLogger,
events: events,
Expand Down
2 changes: 1 addition & 1 deletion pkg/event/kind/slack/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestSlackLoader_Load(t *testing.T) {
t.Parallel()
// given
// default slack notifier is not ready by default
l := NewSlackLoader("", "", testkube.AllEventTypes)
l := NewSlackLoader("", "", "", testkube.AllEventTypes)

// when
listeners, err := l.Load()
Expand Down
8 changes: 6 additions & 2 deletions pkg/slack/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,20 @@ type MessageArgs struct {
StartTime string
EndTime string
Duration string
ClusterName string
}

type Notifier struct {
client *slack.Client
timestamps map[string]string
Ready bool
messageTemplate string
clusterName string
config *Config
}

func NewNotifier(template string, config []NotificationsConfig) *Notifier {
notifier := Notifier{messageTemplate: template, config: NewConfig(config)}
func NewNotifier(template, clusterName string, config []NotificationsConfig) *Notifier {
notifier := Notifier{messageTemplate: template, clusterName: clusterName, config: NewConfig(config)}
notifier.timestamps = make(map[string]string)
if token, ok := os.LookupEnv("SLACK_TOKEN"); ok {
log.DefaultLogger.Infow("initializing slack client", "SLACK_TOKEN", token)
Expand Down Expand Up @@ -182,6 +184,7 @@ func (s *Notifier) composeTestsuiteMessage(execution *testkube.TestSuiteExecutio
Duration: execution.Duration,
TotalSteps: len(execution.ExecuteStepResults),
FailedSteps: execution.FailedStepsCount(),
ClusterName: s.clusterName,
}

log.DefaultLogger.Infow("Execution changed", "status", execution.Status)
Expand Down Expand Up @@ -216,6 +219,7 @@ func (s *Notifier) composeTestMessage(execution *testkube.Execution, eventType t
Duration: execution.Duration,
TotalSteps: len(execution.ExecutionResult.Steps),
FailedSteps: execution.ExecutionResult.FailedStepsCount(),
ClusterName: s.clusterName,
}

log.DefaultLogger.Infow("Execution changed", "status", execution.ExecutionResult.Status)
Expand Down
2 changes: 1 addition & 1 deletion pkg/triggers/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestExecute(t *testing.T) {

mockExecutor := client.NewMockExecutor(mockCtrl)

mockEventEmitter := event.NewEmitter(bus.NewEventBusMock())
mockEventEmitter := event.NewEmitter(bus.NewEventBusMock(), "")

mockTest := testsv3.Test{
ObjectMeta: metav1.ObjectMeta{Namespace: "testkube", Name: "some-test"},
Expand Down
2 changes: 1 addition & 1 deletion pkg/triggers/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestService_Run(t *testing.T) {

mockExecutor := client.NewMockExecutor(mockCtrl)

mockEventEmitter := event.NewEmitter(bus.NewEventBusMock())
mockEventEmitter := event.NewEmitter(bus.NewEventBusMock(), "")

mockTest := testsv3.Test{
ObjectMeta: metav1.ObjectMeta{Namespace: "testkube", Name: "some-test"},
Expand Down