Skip to content

Commit

Permalink
fix conflicts for getting node by local storage in yurthub filters
Browse files Browse the repository at this point in the history
  • Loading branch information
rambohe-ch committed Jun 18, 2023
1 parent c3cf90b commit 1ef7e11
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 399 deletions.
2 changes: 1 addition & 1 deletion cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
}
tenantNs := util.ParseTenantNsFromOrgs(options.YurtHubCertOrganizations)
registerInformers(options, sharedFactory, yurtSharedFactory, workingMode, tenantNs)
filterManager, err := manager.NewFilterManager(options, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, us[0].Host)
filterManager, err := manager.NewFilterManager(options, sharedFactory, yurtSharedFactory, proxiedClient, serializerManager, us[0].Host)
if err != nil {
klog.Errorf("could not create filter manager, %v", err)
return nil, err
Expand Down
11 changes: 4 additions & 7 deletions pkg/yurthub/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"net/http"
"strings"
"sync"

"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -274,15 +275,11 @@ func CreateFilterChain(objFilters []ObjectFilter) ObjectFilter {
}

func (chain filterChain) Name() string {
var name string
var names []string
for i := range chain {
if len(name) == 0 {
name = chain[i].Name()
} else {
name = "," + chain[i].Name()
}
names = append(names, chain[i].Name())
}
return name
return strings.Join(names, ",")
}

func (chain filterChain) SupportedResourceAndVerbs() map[string]sets.String {
Expand Down
36 changes: 11 additions & 25 deletions pkg/yurthub/filter/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ package initializer

import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"

"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions"
)

Expand All @@ -45,59 +44,46 @@ type WantsNodePoolName interface {
SetNodePoolName(nodePoolName string) error
}

// WantsStorageWrapper is an interface for setting StorageWrapper
type WantsStorageWrapper interface {
SetStorageWrapper(s cachemanager.StorageWrapper) error
}

// WantsMasterServiceAddr is an interface for setting mutated master service address
type WantsMasterServiceAddr interface {
SetMasterServiceHost(host string) error
SetMasterServicePort(port string) error
}

// WantsWorkingMode is an interface for setting working mode
type WantsWorkingMode interface {
SetWorkingMode(mode util.WorkingMode) error
// WantsKubeClient is an interface for setting kube client
type WantsKubeClient interface {
SetKubeClient(client kubernetes.Interface) error
}

// genericFilterInitializer is responsible for initializing generic filter
type genericFilterInitializer struct {
factory informers.SharedInformerFactory
yurtFactory yurtinformers.SharedInformerFactory
storageWrapper cachemanager.StorageWrapper
nodeName string
nodePoolName string
masterServiceHost string
masterServicePort string
workingMode util.WorkingMode
client kubernetes.Interface
}

// New creates an filterInitializer object
func New(factory informers.SharedInformerFactory,
yurtFactory yurtinformers.SharedInformerFactory,
sw cachemanager.StorageWrapper,
nodeName, nodePoolName, masterServiceHost, masterServicePort string,
workingMode util.WorkingMode) *genericFilterInitializer {
kubeClient kubernetes.Interface,
nodeName, nodePoolName, masterServiceHost, masterServicePort string) *genericFilterInitializer {
return &genericFilterInitializer{
factory: factory,
yurtFactory: yurtFactory,
storageWrapper: sw,
nodeName: nodeName,
nodePoolName: nodePoolName,
masterServiceHost: masterServiceHost,
masterServicePort: masterServicePort,
workingMode: workingMode,
client: kubeClient,
}
}

// Initialize used for executing filter initialization
func (fi *genericFilterInitializer) Initialize(ins filter.ObjectFilter) error {
if wants, ok := ins.(WantsWorkingMode); ok {
if err := wants.SetWorkingMode(fi.workingMode); err != nil {
return err
}
}

if wants, ok := ins.(WantsNodeName); ok {
if err := wants.SetNodeName(fi.nodeName); err != nil {
return err
Expand Down Expand Up @@ -132,8 +118,8 @@ func (fi *genericFilterInitializer) Initialize(ins filter.ObjectFilter) error {
}
}

if wants, ok := ins.(WantsStorageWrapper); ok {
if err := wants.SetStorageWrapper(fi.storageWrapper); err != nil {
if wants, ok := ins.(WantsKubeClient); ok {
if err := wants.SetKubeClient(fi.client); err != nil {
return err
}
}
Expand Down
11 changes: 5 additions & 6 deletions pkg/yurthub/filter/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"

"github.com/openyurtio/openyurt/cmd/yurthub/app/options"
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/discardcloudservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/inclusterconfig"
Expand All @@ -47,8 +47,8 @@ type Manager struct {
func NewFilterManager(options *options.YurtHubOptions,
sharedFactory informers.SharedInformerFactory,
yurtSharedFactory yurtinformers.SharedInformerFactory,
proxiedClient kubernetes.Interface,
serializerManager *serializer.SerializerManager,
storageWrapper cachemanager.StorageWrapper,
apiserverAddr string) (*Manager, error) {
if !options.EnableResourceFilter {
return nil, nil
Expand All @@ -70,7 +70,7 @@ func NewFilterManager(options *options.YurtHubOptions,
}
}

objFilters, err := createObjectFilters(filters, sharedFactory, yurtSharedFactory, storageWrapper, util.WorkingMode(options.WorkingMode), options.NodeName, options.NodePoolName, mutatedMasterServiceHost, mutatedMasterServicePort)
objFilters, err := createObjectFilters(filters, sharedFactory, yurtSharedFactory, proxiedClient, options.NodeName, options.NodePoolName, mutatedMasterServiceHost, mutatedMasterServicePort)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,14 +114,13 @@ func (m *Manager) FindResponseFilter(req *http.Request) (filter.ResponseFilter,
func createObjectFilters(filters *filter.Filters,
sharedFactory informers.SharedInformerFactory,
yurtSharedFactory yurtinformers.SharedInformerFactory,
storageWrapper cachemanager.StorageWrapper,
workingMode util.WorkingMode,
proxiedClient kubernetes.Interface,
nodeName, nodePoolName, mutatedMasterServiceHost, mutatedMasterServicePort string) ([]filter.ObjectFilter, error) {
if filters == nil {
return nil, nil
}

genericInitializer := initializer.New(sharedFactory, yurtSharedFactory, storageWrapper, nodeName, nodePoolName, mutatedMasterServiceHost, mutatedMasterServicePort, workingMode)
genericInitializer := initializer.New(sharedFactory, yurtSharedFactory, proxiedClient, nodeName, nodePoolName, mutatedMasterServiceHost, mutatedMasterServicePort)
initializerChain := filter.Initializers{}
initializerChain = append(initializerChain, genericInitializer)
return filters.NewFromFilters(initializerChain)
Expand Down
54 changes: 20 additions & 34 deletions pkg/yurthub/filter/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,18 @@ import (
"k8s.io/client-go/kubernetes/fake"

"github.com/openyurtio/openyurt/cmd/yurthub/app/options"
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/proxy/util"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
yurtfake "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/clientset/versioned/fake"
yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions"
)

func TestFindResponseFilter(t *testing.T) {
fakeClient := &fake.Clientset{}
fakeYurtClient := &yurtfake.Clientset{}
sharedFactory, yurtSharedFactory := informers.NewSharedInformerFactory(fakeClient, 24*time.Hour),
yurtinformers.NewSharedInformerFactory(fakeYurtClient, 24*time.Hour)
serializerManager := serializer.NewSerializerManager()
storageManager, err := disk.NewDiskStorage("/tmp/filter_manager")
if err != nil {
t.Fatalf("could not create storage manager, %v", err)
}
storageWrapper := cachemanager.NewStorageWrapper(storageManager)
apiserverAddr := "127.0.0.1:6443"
stopper := make(chan struct{})
defer close(stopper)
sharedFactory.Start(stopper)
yurtSharedFactory.Start(stopper)

testcases := map[string]struct {
enableResourceFilter bool
Expand All @@ -67,7 +54,7 @@ func TestFindResponseFilter(t *testing.T) {
path string
mgrIsNil bool
isFound bool
names []string
names sets.String
}{
"disable resource filter": {
enableResourceFilter: false,
Expand All @@ -81,17 +68,17 @@ func TestFindResponseFilter(t *testing.T) {
verb: "GET",
path: "/api/v1/services",
isFound: true,
names: []string{"masterservice"},
names: sets.NewString("masterservice"),
},
"get discard cloud service filter": {
"get discard cloud service and node port isolation filter": {
enableResourceFilter: true,
accessServerThroughHub: true,
enableDummyIf: true,
userAgent: "kube-proxy",
verb: "GET",
path: "/api/v1/services",
isFound: true,
names: []string{"discardcloudservice"},
names: sets.NewString("discardcloudservice", "nodeportisolation"),
},
"get service topology filter": {
enableResourceFilter: true,
Expand All @@ -101,7 +88,7 @@ func TestFindResponseFilter(t *testing.T) {
verb: "GET",
path: "/api/v1/endpoints",
isFound: true,
names: []string{"servicetopology"},
names: sets.NewString("servicetopology"),
},
"disable service topology filter": {
enableResourceFilter: true,
Expand All @@ -120,7 +107,8 @@ func TestFindResponseFilter(t *testing.T) {
userAgent: "kube-proxy",
verb: "GET",
path: "/api/v1/services",
isFound: false,
isFound: true,
names: sets.NewString("nodeportisolation"),
},
}

Expand All @@ -140,14 +128,19 @@ func TestFindResponseFilter(t *testing.T) {
}
options.DisabledResourceFilters = append(options.DisabledResourceFilters, tt.disabledResourceFilters...)

mgr, _ := NewFilterManager(options, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, apiserverAddr)
if tt.mgrIsNil && mgr != nil {
t.Errorf("expect manager is nil, but got not nil: %v", mgr)
} else {
// mgr is nil, complete this test case
sharedFactory, yurtSharedFactory := informers.NewSharedInformerFactory(fakeClient, 24*time.Hour),
yurtinformers.NewSharedInformerFactory(fakeYurtClient, 24*time.Hour)
stopper := make(chan struct{})
defer close(stopper)

mgr, _ := NewFilterManager(options, sharedFactory, yurtSharedFactory, fakeClient, serializerManager, apiserverAddr)
if tt.mgrIsNil && mgr == nil {
return
}

sharedFactory.Start(stopper)
yurtSharedFactory.Start(stopper)

req, err := http.NewRequest(tt.verb, tt.path, nil)
if err != nil {
t.Errorf("failed to create request, %v", err)
Expand All @@ -168,20 +161,13 @@ func TestFindResponseFilter(t *testing.T) {
handler = filters.WithRequestInfo(handler, resolver)
handler.ServeHTTP(httptest.NewRecorder(), req)

if isFound != tt.isFound {
t.Errorf("expect isFound %v, but got %v", tt.isFound, isFound)
if !tt.isFound && isFound == tt.isFound {
return
}

names := strings.Split(responseFilter.Name(), ",")
if len(tt.names) != len(names) {
t.Errorf("expect filter names %v, but got %v", tt.names, names)
}

for i := range tt.names {
if tt.names[i] != names[i] {
t.Errorf("expect filter names %v, but got %v", tt.names, names)
}
if !tt.names.Equal(sets.NewString(names...)) {
t.Errorf("expect filter names %v, but got %v", tt.names.List(), names)
}
})
}
Expand Down
Loading

0 comments on commit 1ef7e11

Please sign in to comment.