Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2.5.11] Added typer to store #402

Merged
merged 1 commit into from
Oct 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func main() {
}

Schemas.MustImportAndCustomize(&version, Foo{}, func(schema *types.Schema) {
if err := crdFactory.AssignStores(context.Background(), types.DefaultStorageContext, schema); err != nil {
if err := crdFactory.AssignStores(context.Background(), types.DefaultStorageContext, nil, schema); err != nil {
panic(err)
}
})
Expand Down
48 changes: 0 additions & 48 deletions go.sum

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions store/crd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (f *Factory) BatchWait() error {
return f.eg.Wait()
}

func (f *Factory) BatchCreateCRDs(ctx context.Context, storageContext types.StorageContext, schemas *types.Schemas, version *types.APIVersion, schemaIDs ...string) {
func (f *Factory) BatchCreateCRDs(ctx context.Context, storageContext types.StorageContext, typer proxy.StoreTyper, schemas *types.Schemas, version *types.APIVersion, schemaIDs ...string) {
f.eg.Go(func() error {
var schemasToCreate []*types.Schema

Expand All @@ -56,7 +56,7 @@ func (f *Factory) BatchCreateCRDs(ctx context.Context, storageContext types.Stor
schemasToCreate = append(schemasToCreate, s)
}

err := f.AssignStores(ctx, storageContext, schemasToCreate...)
err := f.AssignStores(ctx, storageContext, typer, schemasToCreate...)
if err != nil {
return fmt.Errorf("creating CRD store %v", err)
}
Expand All @@ -65,7 +65,7 @@ func (f *Factory) BatchCreateCRDs(ctx context.Context, storageContext types.Stor
})
}

func (f *Factory) AssignStores(ctx context.Context, storageContext types.StorageContext, schemas ...*types.Schema) error {
func (f *Factory) AssignStores(ctx context.Context, storageContext types.StorageContext, typer proxy.StoreTyper, schemas ...*types.Schema) error {
schemaStatus, err := f.CreateCRDs(ctx, storageContext, schemas...)
if err != nil {
return err
Expand All @@ -79,6 +79,7 @@ func (f *Factory) AssignStores(ctx context.Context, storageContext types.Storage

schema.Store = proxy.NewProxyStore(ctx, f.ClientGetter,
storageContext,
typer,
[]string{"apis"},
crd.Spec.Group,
// Even if CRD is created as v1beta1, it's served as v1 with a single element in Versions
Expand Down
85 changes: 66 additions & 19 deletions store/proxy/proxy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ func (s *simpleClientGetter) APIExtClient(apiContext *types.APIContext, context
return s.apiExtClient, nil
}

type StoreTyper interface {
runtime.ObjectConvertor
runtime.ObjectCreater
}

type Store struct {
sync.Mutex

Expand All @@ -98,10 +103,17 @@ type Store struct {
authContext map[string]string
close context.Context
broadcasters map[rest.Interface]*broadcast.Broadcaster
typer StoreTyper
}

func NewProxyStore(ctx context.Context, clientGetter ClientGetter, storageContext types.StorageContext,
func NewProxyStore(ctx context.Context, clientGetter ClientGetter, storageContext types.StorageContext, typer StoreTyper,
prefix []string, group, version, kind, resourcePlural string) types.Store {

// Default to an empty scheme, all types will default to generic
if typer == nil {
typer = runtime.NewScheme()
}

return &errorStore{
Store: &Store{
clientGetter: clientGetter,
Expand All @@ -117,6 +129,7 @@ func NewProxyStore(ctx context.Context, clientGetter ClientGetter, storageContex
},
close: ctx,
broadcasters: map[rest.Interface]*broadcast.Broadcaster{},
typer: typer,
},
}
}
Expand Down Expand Up @@ -201,16 +214,20 @@ func (s *Store) Context() types.StorageContext {
}

func (s *Store) List(apiContext *types.APIContext, schema *types.Schema, opt *types.QueryOptions) ([]map[string]interface{}, error) {
var resultList unstructured.UnstructuredList
result := []map[string]interface{}{}

// if there are no namespaces field in options, a single request is made
if opt == nil || opt.Namespaces == nil {
ns := getNamespace(apiContext, opt)
list, err := s.retryList(ns, apiContext)
resultList := s.getListStruct()

err := s.retryList(ns, apiContext, resultList)
if err != nil {
return nil, err
}
resultList = *list

collectionResults, _ := s.collectionFromInternal(resultList, apiContext, schema)
result = append(result, collectionResults...)
} else {
var (
errGroup errgroup.Group
Expand All @@ -221,13 +238,16 @@ func (s *Store) List(apiContext *types.APIContext, schema *types.Schema, opt *ty
for _, ns := range allNS {
nsCopy := ns
errGroup.Go(func() error {
list, err := s.retryList(nsCopy, apiContext)
resultList := s.getListStruct()

err := s.retryList(nsCopy, apiContext, resultList)
if err != nil {
return err
}

mux.Lock()
resultList.Items = append(resultList.Items, list.Items...)
collectionResults, _ := s.collectionFromInternal(resultList, apiContext, schema)
result = append(result, collectionResults...)
mux.Unlock()

return nil
Expand All @@ -238,38 +258,30 @@ func (s *Store) List(apiContext *types.APIContext, schema *types.Schema, opt *ty
}
}

var result []map[string]interface{}

for _, obj := range resultList.Items {
result = append(result, s.fromInternal(apiContext, schema, obj.Object))
}

return apiContext.AccessControl.FilterList(apiContext, schema, result, s.authContext), nil
}

func (s *Store) retryList(namespace string, apiContext *types.APIContext) (*unstructured.UnstructuredList, error) {
var resultList *unstructured.UnstructuredList
func (s *Store) retryList(namespace string, apiContext *types.APIContext, resultList runtime.Object) error {
k8sClient, err := s.k8sClient(apiContext)
if err != nil {
return nil, err
return err
}

for i := 0; i < 3; i++ {
req := s.common(namespace, k8sClient.Get())
start := time.Now()
resultList = &unstructured.UnstructuredList{}
err = req.Do(apiContext.Request.Context()).Into(resultList)
logrus.Tracef("LIST: %v, %v", time.Now().Sub(start), s.resourcePlural)
if err != nil {
if i < 2 && strings.Contains(err.Error(), "Client.Timeout exceeded") {
logrus.Infof("Error on LIST %v: %v. Attempt: %v. Retrying", s.resourcePlural, err, i+1)
continue
}
return resultList, err
return err
}
return resultList, err
return err
}
return resultList, err
return err
}

func (s *Store) Watch(apiContext *types.APIContext, schema *types.Schema, opt *types.QueryOptions) (chan map[string]interface{}, error) {
Expand Down Expand Up @@ -568,3 +580,38 @@ func (s *Store) fromInternal(apiContext *types.APIContext, schema *types.Schema,

return data
}

// getListStruct returns a runtime object for storing results from list requests. If the Store's scheme does not return
// a type for the resource associated with the store, a generic type will be used.
func (s *Store) getListStruct() runtime.Object {
// try to find the list type for this store
obj, err := s.typer.New(schema.GroupVersionKind{
Group: s.group,
Version: s.version,
Kind: s.kind + "List",
})
// if we cannot get the specific type default to a generic parser
if err != nil {
logrus.Debugf("Falling back to generic list type for [%s]: %v", s.kind, err)
return new(unstructured.UnstructuredList)
}

return obj
}

// collectionFromInternal maps a collection runtime object to an array of maps.
func (s *Store) collectionFromInternal(list runtime.Object, apiContext *types.APIContext, schema *types.Schema) ([]map[string]interface{}, error) {
var ul unstructured.UnstructuredList

err := s.typer.Convert(list, &ul, nil)
if err != nil {
return nil, err
}

results := make([]map[string]interface{}, len(ul.Items))
for i := range ul.Items {
results[i] = s.fromInternal(apiContext, schema, ul.Items[i].Object)
}

return results, nil
}
175 changes: 172 additions & 3 deletions store/proxy/proxy_store_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
package proxy

import (
"net/http"
"testing"

"bytes"
"encoding/json"
"github.com/rancher/norman/authorization"
"github.com/rancher/norman/types"
"github.com/stretchr/testify/assert"
"io/ioutil"
v1 "k8s.io/api/core/v1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/rest"
"k8s.io/client-go/rest/fake"
"net/http"
"sync"
"testing"
)

func TestGetDeletionOptions(t *testing.T) {
Expand All @@ -29,3 +40,161 @@ func TestGetDeletionOptions(t *testing.T) {
assert.Empty(t, err)
assert.Equal(t, options, expected, "unexpected deletion options for query 'gracePeriodSeconds=0'")
}

func TestList(t *testing.T) {

var data = v1.ConfigMapList{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMapList",
APIVersion: "v1",
},
ListMeta: metav1.ListMeta{
ResourceVersion: "v1",
RemainingItemCount: new(int64),
},
Items: []v1.ConfigMap{
{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test1",
Namespace: "default",
},
Immutable: new(bool),
Data: map[string]string{
"a": "av",
"b": "bv",
"c": "cv",
},
},
{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test2",
Namespace: "default",
},
Immutable: new(bool),
Data: map[string]string{
"a2": "av",
"b2": "bv",
"c2": "cv",
},
},
{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test3",
Namespace: "default",
},
Immutable: new(bool),
Data: map[string]string{
"a3": "av",
"b3": "bv",
"c3": "cv",
},
},
},
}

clientGetter := mockClientGetter{
&fake.RESTClient{
NegotiatedSerializer: serializer.NewCodecFactory(runtime.NewScheme()),
},
}

typer := runtime.NewScheme()

var sut = &Store{
Mutex: sync.Mutex{},
clientGetter: &clientGetter,
group: "",
version: "v1",
kind: "ConfigMap",
resourcePlural: "configmaps",
typer: typer,
}

schema := types.Schema{
Mapper: types.Mappers{},
}

req, _ := http.NewRequest(http.MethodGet, "", nil)
apiContext := types.APIContext{
Request: req,
AccessControl: &authorization.AllAccess{},
}

// no results
{
body := data
body.Items = nil
var fakeResponse bytes.Buffer
_ = json.NewEncoder(&fakeResponse).Encode(body)
clientGetter.RESTClient.Resp = &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(&fakeResponse),
}

res, err := sut.List(&apiContext, &schema, &types.QueryOptions{})

assert.NoError(t, err)
assert.IsType(t, []map[string]interface{}{}, res)
assert.Len(t, res, 0)
}

// generic type
{
body := data
var fakeResponse bytes.Buffer
_ = json.NewEncoder(&fakeResponse).Encode(body)
clientGetter.RESTClient.Resp = &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(&fakeResponse),
}

res, err := sut.List(&apiContext, &schema, &types.QueryOptions{})

assert.NoError(t, err)
assert.IsType(t, []map[string]interface{}{}, res)
assert.Len(t, res, 3)
}

_ = v1.SchemeBuilder.AddToScheme(typer)

// specific type
{
body := data
var fakeResponse bytes.Buffer
_ = json.NewEncoder(&fakeResponse).Encode(body)
clientGetter.RESTClient.Resp = &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(&fakeResponse),
}

res, err := sut.List(&apiContext, &schema, &types.QueryOptions{})

assert.NoError(t, err)
assert.IsType(t, []map[string]interface{}{}, res)
assert.Len(t, res, 3)
}
}

type mockClientGetter struct {
*fake.RESTClient
}

func (m mockClientGetter) UnversionedClient(_ *types.APIContext, _ types.StorageContext) (rest.Interface, error) {
return m.RESTClient, nil
}

func (m mockClientGetter) APIExtClient(_ *types.APIContext, _ types.StorageContext) (clientset.Interface, error) {
return nil, nil
}
Loading