From f2f710e806ecd37ea90747bfc7408248a16f32a6 Mon Sep 17 00:00:00 2001 From: Maksim Paskal Date: Thu, 21 Mar 2024 08:35:34 +0000 Subject: [PATCH] refactor Signed-off-by: Maksim Paskal --- Makefile | 5 -- codecov.yml | 1 - e2e/main_test.go | 73 +++++++++++++---- e2e/testdata/config_test.yaml | 6 +- internal/internal.go | 101 +++++++++++++++++++++-- pkg/api/api.go | 13 ++- pkg/config/config.go | 4 + pkg/events/events.go | 143 ++++++++++++++------------------ pkg/events/events_test.go | 149 ++++++++++++++++++++++++++++++++++ pkg/utils/utils.go | 4 + 10 files changed, 382 insertions(+), 117 deletions(-) create mode 100644 pkg/events/events_test.go diff --git a/Makefile b/Makefile index e8db42b..80b96ce 100644 --- a/Makefile +++ b/Makefile @@ -69,11 +69,6 @@ test: e2e: go test -v -race ./e2e \ -kubeconfig=$(KUBECONFIG) \ - -log.level=INFO \ - -log.pretty \ - -taint.node \ - -taint.effect=NoExecute \ - -podGracePeriodSeconds=30 \ -node=${node} \ -telegram.token=${telegramToken} \ -telegram.chatID=${telegramChatID} diff --git a/codecov.yml b/codecov.yml index dd8e2ce..297387e 100644 --- a/codecov.yml +++ b/codecov.yml @@ -6,5 +6,4 @@ ignore: # ignore because to test need active connection to the kubernetes cluster - "pkg/web/web.go" - "pkg/api/api.go" -- "pkg/events/events.go" - "pkg/client/client.go" \ No newline at end of file diff --git a/e2e/main_test.go b/e2e/main_test.go index 3c8f646..6e1d108 100644 --- a/e2e/main_test.go +++ b/e2e/main_test.go @@ -14,51 +14,88 @@ package main_test import ( "context" + "encoding/json" "flag" + "net/http" + "net/http/httptest" "testing" - "github.com/maksim-paskal/aks-node-termination-handler/pkg/alert" + "github.com/maksim-paskal/aks-node-termination-handler/internal" "github.com/maksim-paskal/aks-node-termination-handler/pkg/api" "github.com/maksim-paskal/aks-node-termination-handler/pkg/client" "github.com/maksim-paskal/aks-node-termination-handler/pkg/config" - "github.com/maksim-paskal/aks-node-termination-handler/pkg/template" "github.com/maksim-paskal/aks-node-termination-handler/pkg/types" + log "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -var ctx = context.TODO() +const testAzureName = "test-e2e-resource" -func TestDrain(t *testing.T) { +func TestDrain(t *testing.T) { //nolint:funlen t.Parallel() + log.SetLevel(log.DebugLevel) + log.SetReportCaller(true) + + eventID := "test-event-id" + + handler := http.NewServeMux() + handler.HandleFunc("/document", func(w http.ResponseWriter, _ *http.Request) { + message, _ := json.Marshal(types.ScheduledEventsType{ + DocumentIncarnation: 1, + Events: []types.ScheduledEventsEvent{ + { + EventId: eventID, + EventType: types.EventTypePreempt, + ResourceType: "resourceType", + Resources: []string{testAzureName}, + }, + }, + }) + + w.WriteHeader(http.StatusOK) + _, _ = w.Write(message) + }) + + testServer := httptest.NewServer(handler) + _ = flag.Set("config", "./testdata/config_test.yaml") + _ = flag.Set("endpoint", testServer.URL+"/document") + _ = flag.Set("resource.name", testAzureName) flag.Parse() - if err := config.Load(); err != nil { + ctx := context.TODO() + + if err := internal.Run(ctx); err != nil { t.Fatal(err) } - if err := client.Init(); err != nil { + node, err := client.GetKubernetesClient().CoreV1().Nodes().Get(ctx, *config.Get().NodeName, metav1.GetOptions{}) + if err != nil { t.Fatal(err) } - if err := alert.Init(); err != nil { - t.Fatal(err) + if !node.Spec.Unschedulable { + t.Fatal("node must be unschedulable") } - if err := alert.SendTelegram(&template.MessageType{Template: "e2e"}); err != nil { - t.Fatal(err) + if len(node.Spec.Taints) == 0 { + t.Fatal("node must have taints") } - if err := api.DrainNode(ctx, *config.Get().NodeName, "Preempt", "manual"); err != nil { - t.Fatal(err) + taintFound := false + + for _, taint := range node.Spec.Taints { + if taint.Key == api.GetTaintKey(testAzureName) && taint.Value == string(corev1.TaintEffectNoSchedule) { + taintFound = true + + break + } } - if err := api.AddNodeEvent(ctx, &types.EventMessage{ - Type: "Info", - Reason: "TestDrain", - Message: "TestDrain", - }); err != nil { - t.Fatal(err) + if !taintFound { + t.Fatal("taint not found") } } diff --git a/e2e/testdata/config_test.yaml b/e2e/testdata/config_test.yaml index 6d30dc0..5c9da2c 100644 --- a/e2e/testdata/config_test.yaml +++ b/e2e/testdata/config_test.yaml @@ -1,2 +1,4 @@ -taintNode: true -taintEffect: NoSchedule \ No newline at end of file +taintnode: true +tainteffect: NoSchedule +podgraceperiodseconds: 30 +exitafternodedrain: true \ No newline at end of file diff --git a/internal/internal.go b/internal/internal.go index 504a2ee..c6a11cc 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -21,20 +21,23 @@ import ( "github.com/maksim-paskal/aks-node-termination-handler/pkg/client" "github.com/maksim-paskal/aks-node-termination-handler/pkg/config" "github.com/maksim-paskal/aks-node-termination-handler/pkg/events" + "github.com/maksim-paskal/aks-node-termination-handler/pkg/template" + "github.com/maksim-paskal/aks-node-termination-handler/pkg/types" "github.com/maksim-paskal/aks-node-termination-handler/pkg/web" + "github.com/maksim-paskal/aks-node-termination-handler/pkg/webhook" "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) func Run(ctx context.Context) error { - err := config.Check() + err := config.Load() if err != nil { - return errors.Wrap(err, "error in config check") + return errors.Wrap(err, "error in config load") } - err = config.Load() + err = config.Check() if err != nil { - return errors.Wrap(err, "error in config load") + return errors.Wrap(err, "error in config check") } log.Debugf("using config: %s", config.Get().String()) @@ -49,14 +52,98 @@ func Run(ctx context.Context) error { return errors.Wrap(err, "error in init api") } + go cache.SheduleCleaning(ctx) + go web.Start(ctx) + + if err := startReadingEvents(ctx); err != nil { + return errors.Wrap(err, "error in startReadingEvents") + } + + return nil +} + +func startReadingEvents(ctx context.Context) error { //nolint:funlen azureResource, err := api.GetAzureResourceName(ctx, *config.Get().NodeName) if err != nil { return errors.Wrap(err, "error in getting azure resource name") } - go cache.SheduleCleaning(ctx) - go events.ReadEvents(ctx, azureResource) - go web.Start(ctx) + eventReader := events.NewReader() + eventReader.AzureResource = azureResource + eventReader.Period = *config.Get().Period + eventReader.Endpoint = *config.Get().Endpoint + eventReader.RequestTimeout = *config.Get().RequestTimeout + eventReader.NodeName = *config.Get().NodeName + eventReader.BeforeReading = func(ctx context.Context) error { + nodeEvent := types.EventMessage{ + Type: "Info", + Reason: "ReadEvents", + Message: "Start to listen events from Azure API", + } + if err := api.AddNodeEvent(ctx, &nodeEvent); err != nil { + return errors.Wrap(err, "error in add node event") + } + + return nil + } + + eventReader.EventReceived = func(ctx context.Context, event types.ScheduledEventsEvent) (bool, error) { + nodeEvent := types.EventMessage{ + Type: "Warning", + Reason: string(event.EventType), + Message: "Azure API sended schedule event for this node", + } + if err := api.AddNodeEvent(ctx, &nodeEvent); err != nil { + return false, errors.Wrap(err, "error in add node event") + } + + if config.Get().IsExcludedEvent(event.EventType) { + log.Infof("Excluded event %s by user config", event.EventType) + + return false, nil + } + + // send event in separate goroutine + go func() { + if err := sendEvent(ctx, event); err != nil { + log.WithError(err).Error("error in sendEvent") + } + }() + + if err := api.DrainNode(ctx, *config.Get().NodeName, string(event.EventType), event.EventId); err != nil { + return false, errors.Wrap(err, "error in DrainNode") + } + + return true, nil + } + + // run reading events in separate goroutine + if *config.Get().ExitAfterNodeDrain { + eventReader.ReadEvents(ctx) + } else { + go eventReader.ReadEvents(ctx) + } + + return nil +} + +func sendEvent(ctx context.Context, event types.ScheduledEventsEvent) error { + message, err := template.NewMessageType(ctx, *config.Get().NodeName, event) + if err != nil { + return errors.Wrap(err, "error in template.NewMessageType") + } + + log.Infof("Message: %+v", message) + + message.Template = *config.Get().AlertMessage + + if err := alert.SendTelegram(message); err != nil { + log.WithError(err).Error("error in alert.SendTelegram") + } + + if err := webhook.SendWebHook(ctx, message); err != nil { + log.WithError(err).Error("error in webhook.SendWebHook") + } return nil } diff --git a/pkg/api/api.go b/pkg/api/api.go index 10941e9..91cb169 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -35,6 +35,11 @@ import ( const taintKeyPrefix = "aks-node-termination-handler" func GetAzureResourceName(ctx context.Context, nodeName string) (string, error) { + // return user defined resource name + if len(*config.Get().ResourceName) > 0 { + return *config.Get().ResourceName, nil + } + node, err := client.GetKubernetesClient().CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) if err != nil { return "", errors.Wrap(err, "error in Clientset.CoreV1().Nodes().Get") @@ -64,7 +69,7 @@ func DrainNode(ctx context.Context, nodeName string, eventType string, eventID s // taint node before draining if effect is NoSchedule or TaintEffectPreferNoSchedule if *config.Get().TaintNode && *config.Get().TaintEffect != string(corev1.TaintEffectNoExecute) { - err = addTaint(ctx, node, getTaintKey(eventType), eventID) + err = addTaint(ctx, node, GetTaintKey(eventType), eventID) if err != nil { return errors.Wrap(err, "failed to taint node") } @@ -98,7 +103,7 @@ func DrainNode(ctx context.Context, nodeName string, eventType string, eventID s // taint node after draining if effect is TaintEffectNoExecute // this NoExecute taint effect will stop all daemonsents on the node that can not handle this effect if *config.Get().TaintNode && *config.Get().TaintEffect == string(corev1.TaintEffectNoExecute) { - err = addTaint(ctx, node, getTaintKey(eventType), eventID) + err = addTaint(ctx, node, GetTaintKey(eventType), eventID) if err != nil { return errors.Wrap(err, "failed to taint node") } @@ -107,11 +112,13 @@ func DrainNode(ctx context.Context, nodeName string, eventType string, eventID s return nil } -func getTaintKey(eventType string) string { +func GetTaintKey(eventType string) string { return fmt.Sprintf("%s/%s", taintKeyPrefix, strings.ToLower(eventType)) } func addTaint(ctx context.Context, node *corev1.Node, taintKey string, taintValue string) error { + log.Infof("Adding taint %s=%s on node %s", taintKey, taintValue, node.Name) + freshNode := node.DeepCopy() var err error diff --git a/pkg/config/config.go b/pkg/config/config.go index 76ebe9f..c6c921e 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -69,6 +69,8 @@ type Type struct { NodeGracePeriodSeconds *int GracePeriodSeconds *int DrainOnFreezeEvent *bool + ResourceName *string + ExitAfterNodeDrain *bool } var config = Type{ @@ -97,6 +99,8 @@ var config = Type{ NodeGracePeriodSeconds: flag.Int("nodeGracePeriodSeconds", defaultNodeGracePeriodSeconds, "maximum time in seconds to drain the node"), //nolint:lll GracePeriodSeconds: flag.Int("gracePeriodSeconds", defaultGracePeriodSecond, "grace period is seconds for application termination"), //nolint:lll DrainOnFreezeEvent: flag.Bool("drainOnFreezeEvent", false, "drain node on freeze event"), + ResourceName: flag.String("resource.name", "", "Azure resource name to drain"), + ExitAfterNodeDrain: flag.Bool("exitAfterNodeDrain", false, "process will exit after node drain"), } func (t *Type) GracePeriod() time.Duration { diff --git a/pkg/events/events.go b/pkg/events/events.go index 13d89ff..28d7044 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -19,41 +19,57 @@ import ( "net/http" "time" - "github.com/maksim-paskal/aks-node-termination-handler/pkg/alert" - "github.com/maksim-paskal/aks-node-termination-handler/pkg/api" "github.com/maksim-paskal/aks-node-termination-handler/pkg/cache" - "github.com/maksim-paskal/aks-node-termination-handler/pkg/config" "github.com/maksim-paskal/aks-node-termination-handler/pkg/metrics" - "github.com/maksim-paskal/aks-node-termination-handler/pkg/template" "github.com/maksim-paskal/aks-node-termination-handler/pkg/types" "github.com/maksim-paskal/aks-node-termination-handler/pkg/utils" - "github.com/maksim-paskal/aks-node-termination-handler/pkg/webhook" "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) -const eventCacheTTL = 10 * time.Minute +const ( + requestTimeout = 10 * time.Second + readInterval = 5 * time.Second + eventCacheTTL = 10 * time.Minute +) var httpClient = &http.Client{ Transport: metrics.NewInstrumenter("events", false).InstrumentedRoundTripper(), } -func ReadEvents(ctx context.Context, azureResource string) { - log.Infof("Watching for resource in events %s", azureResource) +type Reader struct { + Method string + Endpoint string + RequestTimeout time.Duration + Period time.Duration + NodeName string + AzureResource string + BeforeReading func(ctx context.Context) error `json:"-"` + EventReceived func(ctx context.Context, event types.ScheduledEventsEvent) (bool, error) `json:"-"` +} - nodeEvent := types.EventMessage{ - Type: "Info", - Reason: "ReadEvents", - Message: "Start to listen events from Azure API", +func NewReader() *Reader { + return &Reader{ + Method: http.MethodGet, + Endpoint: "http://169.254.169.254/metadata/scheduledevents?api-version=2020-07-01", + RequestTimeout: requestTimeout, + Period: readInterval, } - if err := api.AddNodeEvent(ctx, &nodeEvent); err != nil { - log.WithError(err).Error() +} + +func (r *Reader) ReadEvents(ctx context.Context) { + log.Infof("Start reading events %s", r.String()) + + if r.BeforeReading != nil { + if err := r.BeforeReading(ctx); err != nil { + log.WithError(err).Error("Error in BeforeReading") + } } for ctx.Err() == nil { - stopReadingEvents, err := readEndpoint(ctx, azureResource) + stopReadingEvents, err := r.ReadEndpoint(ctx) if err != nil { - metrics.ErrorReadingEndpoint.WithLabelValues(getsharedMetricsLabels(azureResource)...).Inc() + metrics.ErrorReadingEndpoint.WithLabelValues(r.getMetricsLabels()...).Inc() log.WithError(err).Error() } @@ -64,20 +80,17 @@ func ReadEvents(ctx context.Context, azureResource string) { return } - log.Debugf("Sleep %s", *config.Get().Period) - utils.SleepWithContext(ctx, *config.Get().Period) + utils.SleepWithContext(ctx, r.Period) } } -func readEndpoint(ctx context.Context, azureResource string) (bool, error) { //nolint:cyclop,funlen,gocognit - reqCtx, cancel := context.WithTimeout(ctx, *config.Get().RequestTimeout) +func (r *Reader) getScheduledEvents(ctx context.Context) (*types.ScheduledEventsType, error) { + ctx, cancel := context.WithTimeout(ctx, r.RequestTimeout) defer cancel() - log.Debugf("read %s", *config.Get().Endpoint) - - req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, *config.Get().Endpoint, nil) + req, err := http.NewRequestWithContext(ctx, r.Method, r.Endpoint, nil) if err != nil { - return false, errors.Wrap(err, "error in http.NewRequestWithContext") + return nil, errors.Wrap(err, "error in http.NewRequestWithContext") } req.Header.Add("Metadata", "true") @@ -90,7 +103,7 @@ func readEndpoint(ctx context.Context, azureResource string) (bool, error) { //n resp, err := httpClient.Do(req) if err != nil { - return false, errors.Wrap(err, "error in client.Do(req)") + return nil, errors.Wrap(err, "error in client.Do(req)") } defer resp.Body.Close() @@ -99,7 +112,7 @@ func readEndpoint(ctx context.Context, azureResource string) (bool, error) { //n body, err := io.ReadAll(resp.Body) if err != nil { - return false, errors.Wrap(err, "error in io.ReadAll") + return nil, errors.Wrap(err, "error in io.ReadAll") } log.Debugf("response body: %s", string(body)) @@ -107,20 +120,28 @@ func readEndpoint(ctx context.Context, azureResource string) (bool, error) { //n if len(body) == 0 { log.Warn("Events response is empty") - return false, nil + return &types.ScheduledEventsType{}, nil } message := types.ScheduledEventsType{} - err = json.Unmarshal(body, &message) + if err := json.Unmarshal(body, &message); err != nil { + return nil, errors.Wrap(err, "error in json.Unmarshal") + } + + return &message, nil +} + +func (r *Reader) ReadEndpoint(ctx context.Context) (bool, error) { + message, err := r.getScheduledEvents(ctx) if err != nil { - return false, errors.Wrap(err, "error in json.Unmarshal") + return false, errors.Wrap(err, "error in getScheduledEvents") } for _, event := range message.Events { - for _, r := range event.Resources { - if r == azureResource { //nolint:nestif - log.Info(string(body)) + for _, resource := range event.Resources { + if resource == r.AzureResource { + log.Infof("%+v", message) if cache.HasKey(event.EventId) { log.Infof("Event %s already processed", event.EventId) @@ -131,36 +152,11 @@ func readEndpoint(ctx context.Context, azureResource string) (bool, error) { //n // add to cache, ignore similar events for 10 minutes cache.Add(event.EventId, eventCacheTTL) - metrics.ScheduledEventsTotal.WithLabelValues(append(getsharedMetricsLabels(azureResource), string(event.EventType))...).Inc() //nolint:lll - - nodeEvent := types.EventMessage{ - Type: "Warning", - Reason: string(event.EventType), - Message: "Azure API sended schedule event for this node", - } - if err := api.AddNodeEvent(ctx, &nodeEvent); err != nil { - log.WithError(err).Error() - } - - if config.Get().IsExcludedEvent(event.EventType) { - log.Infof("Excluded event %s by user config", event.EventType) + metrics.ScheduledEventsTotal.WithLabelValues(append(r.getMetricsLabels(), string(event.EventType))...).Inc() //nolint:lll - continue + if r.EventReceived != nil { + return r.EventReceived(ctx, event) } - - // send event in separate goroutine - go func() { - if err := sendEvent(ctx, event); err != nil { - log.WithError(err).Error("error in sendEvent") - } - }() - - err = api.DrainNode(ctx, *config.Get().NodeName, string(event.EventType), event.EventId) - if err != nil { - return false, errors.Wrap(err, "error in DrainNode") - } - - return true, nil } } } @@ -168,30 +164,15 @@ func readEndpoint(ctx context.Context, azureResource string) (bool, error) { //n return false, nil } -func getsharedMetricsLabels(resourceName string) []string { +func (r *Reader) getMetricsLabels() []string { return []string{ - *config.Get().NodeName, - resourceName, + r.NodeName, + r.AzureResource, } } -func sendEvent(ctx context.Context, event types.ScheduledEventsEvent) error { - message, err := template.NewMessageType(ctx, *config.Get().NodeName, event) - if err != nil { - return errors.Wrap(err, "error in template.NewMessageType") - } - - log.Infof("Message: %+v", message) - - message.Template = *config.Get().AlertMessage - - if err := alert.SendTelegram(message); err != nil { - log.WithError(err).Error("error in alert.SendTelegram") - } - - if err := webhook.SendWebHook(ctx, message); err != nil { - log.WithError(err).Error("error in webhook.SendWebHook") - } +func (r *Reader) String() string { + b, _ := json.Marshal(r) //nolint:errchkjson - return nil + return string(b) } diff --git a/pkg/events/events_test.go b/pkg/events/events_test.go new file mode 100644 index 0000000..9da35c1 --- /dev/null +++ b/pkg/events/events_test.go @@ -0,0 +1,149 @@ +/* +Copyright paskal.maksim@gmail.com +Licensed under the Apache License, Version 2.0 (the "License") +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package events_test + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/maksim-paskal/aks-node-termination-handler/pkg/events" + "github.com/maksim-paskal/aks-node-termination-handler/pkg/types" + "github.com/maksim-paskal/aks-node-termination-handler/pkg/utils" + log "github.com/sirupsen/logrus" +) + +func TestReadingEvents(t *testing.T) { //nolint:funlen + t.Parallel() + + log.SetLevel(log.DebugLevel) + + ctx := context.TODO() + + handler := http.NewServeMux() + handler.HandleFunc("/badjson", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`!!!{"DocumentIncarnation":1,"Events":[]}`)) + }) + handler.HandleFunc("/emptyjson", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(``)) + }) + handler.HandleFunc("/timeout", func(w http.ResponseWriter, r *http.Request) { + utils.SleepWithContext(r.Context(), 5*time.Second) + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(``)) + }) + handler.HandleFunc("/document", func(w http.ResponseWriter, _ *http.Request) { + message, _ := json.Marshal(types.ScheduledEventsType{ + DocumentIncarnation: 1, + Events: []types.ScheduledEventsEvent{ + { + EventId: time.Now().String(), + EventType: types.EventTypeFreeze, + ResourceType: "resourceType", + Resources: []string{"resource1", "resource2"}, + }, + }, + }) + + w.WriteHeader(http.StatusOK) + _, _ = w.Write(message) + }) + + testServer := httptest.NewServer(handler) + + t.Run("badjson", func(t *testing.T) { + t.Parallel() + + eventReader := events.NewReader() + eventReader.Endpoint = testServer.URL + "/badjson" + + if _, err := eventReader.ReadEndpoint(ctx); err == nil { + t.Error("expected error") + } + }) + + t.Run("badhttp", func(t *testing.T) { + t.Parallel() + + eventReader := events.NewReader() + eventReader.Method = "bad method" + eventReader.Endpoint = "fake://fake" + + if _, err := eventReader.ReadEndpoint(ctx); err == nil { + t.Error("expected error") + } + + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + eventReader.ReadEvents(ctx) + }) + + t.Run("emptyjson", func(t *testing.T) { + t.Parallel() + + eventReader := events.NewReader() + eventReader.Endpoint = testServer.URL + "/emptyjson" + + if _, err := eventReader.ReadEndpoint(ctx); err != nil { + t.Error(err) + } + }) + + t.Run("timeout", func(t *testing.T) { + t.Parallel() + + eventReader := events.NewReader() + eventReader.Endpoint = testServer.URL + "/timeout" + eventReader.RequestTimeout = 1 * time.Second + + if _, err := eventReader.ReadEndpoint(ctx); !errors.Is(err, context.DeadlineExceeded) { + t.Error(err) + } + }) + + t.Run("document", func(t *testing.T) { + t.Parallel() + + receivedDocument := types.ScheduledEventsEvent{} + + eventReader := events.NewReader() + eventReader.Endpoint = testServer.URL + "/document" + eventReader.AzureResource = "resource1" + eventReader.BeforeReading = func(_ context.Context) error { + return errors.New("error in BeforeReading") //nolint:goerr113 + } + eventReader.EventReceived = func(_ context.Context, event types.ScheduledEventsEvent) (bool, error) { + receivedDocument = event + + return true, nil + } + + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + eventReader.ReadEvents(ctx) + + t.Logf("%+v", receivedDocument) + + if receivedDocument.EventId == "" { + t.Error("unexpected event id") + } + }) +} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 16d3649..0419b08 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -15,9 +15,13 @@ package utils import ( "context" "time" + + log "github.com/sirupsen/logrus" ) func SleepWithContext(ctx context.Context, d time.Duration) { + log.Debugf("Sleep %s", d) + select { case <-ctx.Done(): return