Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: enhance e2e debuggability by capturing resource changes and storing them as artifacts #281

Merged
merged 10 commits into from
Sep 23, 2024
11 changes: 10 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,15 @@ jobs:
with:
name: controller-logs-${{matrix.datalossprevention}}
path: |
tests/e2e/output/
tests/e2e/output/controllers/
retention-days: 7

- name: Archive resource changes
uses: actions/upload-artifact@v4
if: failure()
with:
name: resource-changes-${{matrix.datalossprevention}}
path: |
tests/e2e/output/resources/
retention-days: 7

1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ test: codegen fmt vet envtest ## Run tests.
test-e2e: codegen fmt vet envtest ## Run e2e tests.
GOFLAGS="-count=1" go test -v ./tests/e2e/...



GOLANGCI_LINT = $(shell pwd)/bin/golangci-lint
GOLANGCI_LINT_VERSION ?= v1.58.0
Expand Down
19 changes: 17 additions & 2 deletions tests/e2e/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
"sync"
"time"

. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -39,19 +40,33 @@ var (
monoVertexRolloutClient planepkg.MonoVertexRolloutInterface
kubeClient clientgo.Interface

wg sync.WaitGroup
stopCh chan struct{}

dataLossPrevention string
)

const (
Namespace = "numaplane-system"

NumaplaneCtrlLogs = "output/numaplane-controller.log"
NumaflowCtrlLogs = "output/numaflow-controller.log"
ControllerOutputPath = "output/controllers"
ResourceChangesOutputPath = "output/resources"

NumaplaneAPIVersion = "numaplane.numaproj.io/v1alpha1"
NumaflowAPIVersion = "numaflow.numaproj.io/v1alpha1"

NumaplaneLabel = "app.kubernetes.io/part-of=numaplane"
NumaflowLabel = "app.kubernetes.io/part-of=numaflow, app.kubernetes.io/component=controller-manager"
)

type Output struct {
APIVersion string `json:"apiVersion"`
Kind string `json:"kind"`
Metadata metav1.ObjectMeta `json:"metadata"`
Spec interface{} `json:"spec"`
Status interface{} `json:"status,omitempty"`
}

// document for Ginkgo framework and print to console
func document(testName string) {
snapshotCluster(testName)
Expand Down
21 changes: 21 additions & 0 deletions tests/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ var _ = Describe("Functional e2e", Serial, func() {
return err
}, testTimeout, testPollingInterval).Should(Succeed())

wg.Add(1)
go watchNumaflowControllerRollout()

verifyNumaflowControllerRolloutReady()

verifyNumaflowControllerReady(Namespace)
Expand All @@ -201,6 +204,12 @@ var _ = Describe("Functional e2e", Serial, func() {
return err
}, testTimeout, testPollingInterval).Should(Succeed())

wg.Add(1)
go watchISBServiceRollout()

wg.Add(1)
go watchISBService()

verifyISBSvcRolloutReady(isbServiceRolloutName)

verifyISBSvcReady(Namespace, isbServiceRolloutName, 3)
Expand All @@ -219,12 +228,18 @@ var _ = Describe("Functional e2e", Serial, func() {
return err
}, testTimeout, testPollingInterval).Should(Succeed())

wg.Add(1)
go watchPipelineRollout()

document("Verifying that the Pipeline was created")
verifyPipelineSpec(Namespace, pipelineRolloutName, func(retrievedPipelineSpec numaflowv1.PipelineSpec) bool {
return len(pipelineSpec.Vertices) == 2 // TODO: make less kludgey
//return reflect.DeepEqual(pipelineSpec, retrievedPipelineSpec) // this may have had some false negatives due to "lifecycle" field maybe, or null values in one
})

wg.Add(1)
go watchPipeline()

verifyPipelineRolloutReady(pipelineRolloutName)

verifyPipelineReady(Namespace, pipelineRolloutName, 2)
Expand All @@ -248,6 +263,12 @@ var _ = Describe("Functional e2e", Serial, func() {
return monoVertexSpec.Source != nil
})

wg.Add(1)
go watchMonoVertexRollout()

wg.Add(1)
go watchMonoVertex()

verifyMonoVertexRolloutReady(monoVertexRolloutName)

verifyMonoVertexReady(Namespace, monoVertexRolloutName)
Expand Down
102 changes: 102 additions & 0 deletions tests/e2e/isbservice.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package e2e

import (
"context"
"fmt"
"os"
"path/filepath"
"time"

. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/yaml"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/util/retry"

numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
Expand Down Expand Up @@ -126,3 +131,100 @@ func updateISBServiceRolloutInK8S(name string, f func(apiv1.ISBServiceRollout) (
})
Expect(err).ShouldNot(HaveOccurred())
}

func watchISBServiceRollout() {

defer wg.Done()
watcher, err := isbServiceRolloutClient.Watch(context.Background(), metav1.ListOptions{})
if err != nil {
fmt.Printf("Failed to start watcher: %v\n", err)
return
}
defer watcher.Stop()

file, err := os.OpenFile(filepath.Join(ResourceChangesOutputPath, "isbservice_rollout.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Printf("Failed to open log file: %v\n", err)
return
}
defer file.Close()

for {
select {
case event := <-watcher.ResultChan():
if event.Type == watch.Modified {
if rollout, ok := event.Object.(*apiv1.ISBServiceRollout); ok {
rollout.ManagedFields = nil
rl := Output{
APIVersion: NumaplaneAPIVersion,
Kind: "ISBServiceRollout",
Metadata: rollout.ObjectMeta,
Spec: rollout.Spec,
Status: rollout.Status,
}
bytes, _ := yaml.Marshal(rl)
updateLog := fmt.Sprintf("ISBServiceRollout update time: %v\n%s\n", time.Now().Format(time.RFC3339), string(bytes))
_, err = file.WriteString(updateLog)
if err != nil {
fmt.Printf("Failed to write to log file: %v\n", err)
return
}
}
}
case <-stopCh:
return
}
}
}

func watchISBService() {

defer wg.Done()
watcher, err := dynamicClient.Resource(getGVRForISBService()).Namespace(Namespace).Watch(context.Background(), metav1.ListOptions{})
if err != nil {
fmt.Printf("Failed to start watcher: %v\n", err)
return
}
defer watcher.Stop()

file, err := os.OpenFile(filepath.Join(ResourceChangesOutputPath, "isbservice.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Printf("Failed to open log file: %v\n", err)
return
}
defer file.Close()

for {
select {
case event := <-watcher.ResultChan():
if event.Type == watch.Modified {
if obj, ok := event.Object.(*unstructured.Unstructured); ok {
isbsvc := numaflowv1.InterStepBufferService{}
err = util.StructToStruct(&obj, &isbsvc)
if err != nil {
fmt.Printf("Failed to convert unstruct: %v\n", err)
return
}
isbsvc.ManagedFields = nil
output := Output{
APIVersion: NumaflowAPIVersion,
Kind: "InterStepBufferService",
Metadata: isbsvc.ObjectMeta,
Spec: isbsvc.Spec,
Status: isbsvc.Status,
}
bytes, _ := yaml.Marshal(output)
updateLog := fmt.Sprintf("ISBService update time: %v\nSpec: %s\n", time.Now().Format(time.RFC3339), string(bytes))
_, err = file.WriteString(updateLog)
if err != nil {
fmt.Printf("Failed to write to log file: %v\n", err)
return
}
}
}
case <-stopCh:
return
}
}

}
103 changes: 103 additions & 0 deletions tests/e2e/monovertex.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package e2e

import (
"context"
"fmt"
"os"
"path/filepath"
"time"

. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/yaml"

numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaplane/internal/util"
Expand Down Expand Up @@ -125,3 +131,100 @@ func updateMonoVertexRolloutInK8S(name string, f func(apiv1.MonoVertexRollout) (
})
Expect(err).ShouldNot(HaveOccurred())
}

func watchMonoVertexRollout() {

defer wg.Done()
watcher, err := monoVertexRolloutClient.Watch(context.Background(), metav1.ListOptions{})
if err != nil {
fmt.Printf("Failed to start watcher: %v\n", err)
return
}
defer watcher.Stop()

file, err := os.OpenFile(filepath.Join(ResourceChangesOutputPath, "monovertex_rollout.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Printf("Failed to open log file: %v\n", err)
return
}
defer file.Close()

for {
select {
case event := <-watcher.ResultChan():
if event.Type == watch.Modified {
if rollout, ok := event.Object.(*apiv1.MonoVertexRollout); ok {
rollout.ManagedFields = nil
rl := Output{
APIVersion: NumaplaneAPIVersion,
Kind: "MonoVertexRollout",
Metadata: rollout.ObjectMeta,
Spec: rollout.Spec,
Status: rollout.Status,
}
bytes, _ := yaml.Marshal(rl)
updateLog := fmt.Sprintf("MonoVertexRollout update time: %v\n%s\n", time.Now().Format(time.RFC3339), string(bytes))
_, err = file.WriteString(updateLog)
if err != nil {
fmt.Printf("Failed to write to log file: %v\n", err)
return
}
}
}
case <-stopCh:
return
}
}
}

func watchMonoVertex() {

defer wg.Done()
watcher, err := dynamicClient.Resource(getGVRForMonoVertex()).Namespace(Namespace).Watch(context.Background(), metav1.ListOptions{})
if err != nil {
fmt.Printf("Failed to start watcher: %v\n", err)
return
}
defer watcher.Stop()

file, err := os.OpenFile(filepath.Join(ResourceChangesOutputPath, "monovertex.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Printf("Failed to open log file: %v\n", err)
return
}
defer file.Close()

for {
select {
case event := <-watcher.ResultChan():
if event.Type == watch.Modified {
if obj, ok := event.Object.(*unstructured.Unstructured); ok {
mvtx := numaflowv1.MonoVertex{}
err = util.StructToStruct(&obj, &mvtx)
if err != nil {
fmt.Printf("Failed to convert unstruct: %v\n", err)
return
}
mvtx.ManagedFields = nil
output := Output{
APIVersion: NumaflowAPIVersion,
Kind: "MonoVertex",
Metadata: mvtx.ObjectMeta,
Spec: mvtx.Spec,
Status: mvtx.Status,
}
bytes, _ := yaml.Marshal(output)
updateLog := fmt.Sprintf("MonoVertex update time: %v\nSpec: %s\n", time.Now().Format(time.RFC3339), string(bytes))
_, err = file.WriteString(updateLog)
if err != nil {
fmt.Printf("Failed to write to log file: %v\n", err)
return
}
}
}
case <-stopCh:
return
}
}

}
Loading
Loading