Skip to content

Commit

Permalink
added typer to store
Browse files Browse the repository at this point in the history
  • Loading branch information
paynejacob committed Oct 22, 2021
1 parent afd06f5 commit aaca61a
Show file tree
Hide file tree
Showing 7 changed files with 363 additions and 72 deletions.
2 changes: 1 addition & 1 deletion example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func main() {
}

Schemas.MustImportAndCustomize(&version, Foo{}, func(schema *types.Schema) {
if err := crdFactory.AssignStores(context.Background(), types.DefaultStorageContext, schema); err != nil {
if err := crdFactory.AssignStores(context.Background(), types.DefaultStorageContext, nil, schema); err != nil {
panic(err)
}
})
Expand Down
48 changes: 0 additions & 48 deletions go.sum

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions store/crd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (f *Factory) BatchWait() error {
return f.eg.Wait()
}

func (f *Factory) BatchCreateCRDs(ctx context.Context, storageContext types.StorageContext, schemas *types.Schemas, version *types.APIVersion, schemaIDs ...string) {
func (f *Factory) BatchCreateCRDs(ctx context.Context, storageContext types.StorageContext, typer proxy.StoreTyper, schemas *types.Schemas, version *types.APIVersion, schemaIDs ...string) {
f.eg.Go(func() error {
var schemasToCreate []*types.Schema

Expand All @@ -56,7 +56,7 @@ func (f *Factory) BatchCreateCRDs(ctx context.Context, storageContext types.Stor
schemasToCreate = append(schemasToCreate, s)
}

err := f.AssignStores(ctx, storageContext, schemasToCreate...)
err := f.AssignStores(ctx, storageContext, typer, schemasToCreate...)
if err != nil {
return fmt.Errorf("creating CRD store %v", err)
}
Expand All @@ -65,7 +65,7 @@ func (f *Factory) BatchCreateCRDs(ctx context.Context, storageContext types.Stor
})
}

func (f *Factory) AssignStores(ctx context.Context, storageContext types.StorageContext, schemas ...*types.Schema) error {
func (f *Factory) AssignStores(ctx context.Context, storageContext types.StorageContext, typer proxy.StoreTyper, schemas ...*types.Schema) error {
schemaStatus, err := f.CreateCRDs(ctx, storageContext, schemas...)
if err != nil {
return err
Expand All @@ -79,6 +79,7 @@ func (f *Factory) AssignStores(ctx context.Context, storageContext types.Storage

schema.Store = proxy.NewProxyStore(ctx, f.ClientGetter,
storageContext,
typer,
[]string{"apis"},
crd.Spec.Group,
// Even if CRD is created as v1beta1, it's served as v1 with a single element in Versions
Expand Down
87 changes: 68 additions & 19 deletions store/proxy/proxy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ func (s *simpleClientGetter) APIExtClient(apiContext *types.APIContext, context
return s.apiExtClient, nil
}

type StoreTyper interface {
runtime.ObjectConvertor
runtime.ObjectCreater
}

type Store struct {
sync.Mutex

Expand All @@ -98,10 +103,17 @@ type Store struct {
authContext map[string]string
close context.Context
broadcasters map[rest.Interface]*broadcast.Broadcaster
typer StoreTyper
}

func NewProxyStore(ctx context.Context, clientGetter ClientGetter, storageContext types.StorageContext,
func NewProxyStore(ctx context.Context, clientGetter ClientGetter, storageContext types.StorageContext, typer StoreTyper,
prefix []string, group, version, kind, resourcePlural string) types.Store {

// Default to an empty scheme, all types will default to generic
if typer == nil {
typer = runtime.NewScheme()
}

return &errorStore{
Store: &Store{
clientGetter: clientGetter,
Expand All @@ -117,6 +129,7 @@ func NewProxyStore(ctx context.Context, clientGetter ClientGetter, storageContex
},
close: ctx,
broadcasters: map[rest.Interface]*broadcast.Broadcaster{},
typer: typer,
},
}
}
Expand Down Expand Up @@ -201,16 +214,19 @@ func (s *Store) Context() types.StorageContext {
}

func (s *Store) List(apiContext *types.APIContext, schema *types.Schema, opt *types.QueryOptions) ([]map[string]interface{}, error) {
var resultList unstructured.UnstructuredList
result := []map[string]interface{}{}

// if there are no namespaces field in options, a single request is made
if opt == nil || opt.Namespaces == nil {
ns := getNamespace(apiContext, opt)
list, err := s.retryList(ns, apiContext)
resultList := s.getListStruct()

err := s.retryList(ns, apiContext, resultList)
if err != nil {
return nil, err
}
resultList = *list

result = append(result, s.collectionFromInternal(resultList, apiContext, schema)...)
} else {
var (
errGroup errgroup.Group
Expand All @@ -221,13 +237,15 @@ func (s *Store) List(apiContext *types.APIContext, schema *types.Schema, opt *ty
for _, ns := range allNS {
nsCopy := ns
errGroup.Go(func() error {
list, err := s.retryList(nsCopy, apiContext)
resultList := s.getListStruct()

err := s.retryList(nsCopy, apiContext, resultList)
if err != nil {
return err
}

mux.Lock()
resultList.Items = append(resultList.Items, list.Items...)
result = append(result, s.collectionFromInternal(resultList, apiContext, schema)...)
mux.Unlock()

return nil
Expand All @@ -238,38 +256,30 @@ func (s *Store) List(apiContext *types.APIContext, schema *types.Schema, opt *ty
}
}

var result []map[string]interface{}

for _, obj := range resultList.Items {
result = append(result, s.fromInternal(apiContext, schema, obj.Object))
}

return apiContext.AccessControl.FilterList(apiContext, schema, result, s.authContext), nil
}

func (s *Store) retryList(namespace string, apiContext *types.APIContext) (*unstructured.UnstructuredList, error) {
var resultList *unstructured.UnstructuredList
func (s *Store) retryList(namespace string, apiContext *types.APIContext, resultList runtime.Object) error {
k8sClient, err := s.k8sClient(apiContext)
if err != nil {
return nil, err
return err
}

for i := 0; i < 3; i++ {
req := s.common(namespace, k8sClient.Get())
start := time.Now()
resultList = &unstructured.UnstructuredList{}
err = req.Do(apiContext.Request.Context()).Into(resultList)
logrus.Tracef("LIST: %v, %v", time.Now().Sub(start), s.resourcePlural)
if err != nil {
if i < 2 && strings.Contains(err.Error(), "Client.Timeout exceeded") {
logrus.Infof("Error on LIST %v: %v. Attempt: %v. Retrying", s.resourcePlural, err, i+1)
continue
}
return resultList, err
return err
}
return resultList, err
return err
}
return resultList, err
return err
}

func (s *Store) Watch(apiContext *types.APIContext, schema *types.Schema, opt *types.QueryOptions) (chan map[string]interface{}, error) {
Expand Down Expand Up @@ -568,3 +578,42 @@ func (s *Store) fromInternal(apiContext *types.APIContext, schema *types.Schema,

return data
}

// getListStruct returns a runtime object for storing results from list requests. If the Store's scheme does not return
// a type for the resource associated with the store, a generic type will be used.
func (s *Store) getListStruct() runtime.Object {
// try to find the list type for this store
obj, err := s.typer.New(schema.GroupVersionKind{
Group: s.group,
Version: s.version,
Kind: s.kind + "List",
})
// if we cannot get the specific type default to a generic parser
if err != nil {
logrus.Debugf("Falling back to generic list type for [%s]", s.kind)
return new(unstructured.UnstructuredList)
}

return obj
}

// collectionFromInternal maps a collection runtime object to an array of maps.
func (s *Store) collectionFromInternal(list runtime.Object, apiContext *types.APIContext, schema *types.Schema) []map[string]interface{} {
var ul unstructured.UnstructuredList

// Since we are converting to the generic version of a collection any collection type will be convertable. If a non
// collection type is passed to this method the line below will return an error and the method will return an empty
// result set. There is no way to handle this error returned because it is only returned if this method is used
// improperly. We swallow the error here to prevent having to check it in the caller.
err := s.typer.Convert(list, &ul, nil)
if err != nil {
logrus.Errorf("Failed to covert collection. Did you use getListStruct? %v", err)
}

results := make([]map[string]interface{}, len(ul.Items))
for i := range ul.Items {
results[i] = s.fromInternal(apiContext, schema, ul.Items[i].Object)
}

return results
}
172 changes: 171 additions & 1 deletion store/proxy/proxy_store_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
package proxy

import (
"bytes"
"encoding/json"
"github.com/rancher/norman/authorization"
"github.com/rancher/norman/types"
"io/ioutil"
v1 "k8s.io/api/core/v1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/rest"
"k8s.io/client-go/rest/fake"
"net/http"
"sync"
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestGetDeletionOptions(t *testing.T) {
Expand All @@ -29,3 +41,161 @@ func TestGetDeletionOptions(t *testing.T) {
assert.Empty(t, err)
assert.Equal(t, options, expected, "unexpected deletion options for query 'gracePeriodSeconds=0'")
}

func TestList(t *testing.T) {

var data = v1.ConfigMapList{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMapList",
APIVersion: "v1",
},
ListMeta: metav1.ListMeta{
ResourceVersion: "v1",
RemainingItemCount: new(int64),
},
Items: []v1.ConfigMap{
{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test1",
Namespace: "default",
},
Immutable: new(bool),
Data: map[string]string{
"a": "av",
"b": "bv",
"c": "cv",
},
},
{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test2",
Namespace: "default",
},
Immutable: new(bool),
Data: map[string]string{
"a2": "av",
"b2": "bv",
"c2": "cv",
},
},
{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test3",
Namespace: "default",
},
Immutable: new(bool),
Data: map[string]string{
"a3": "av",
"b3": "bv",
"c3": "cv",
},
},
},
}

clientGetter := mockClientGetter{
&fake.RESTClient{
NegotiatedSerializer: serializer.NewCodecFactory(runtime.NewScheme()),
},
}

typer := runtime.NewScheme()

var sut = &Store{
Mutex: sync.Mutex{},
clientGetter: &clientGetter,
group: "",
version: "v1",
kind: "ConfigMap",
resourcePlural: "configmaps",
typer: typer,
}

schema := types.Schema{
Mapper: types.Mappers{},
}

req, _ := http.NewRequest(http.MethodGet, "", nil)
apiContext := types.APIContext{
Request: req,
AccessControl: &authorization.AllAccess{},
}

// no results
{
body := data
body.Items = nil
var fakeResponse bytes.Buffer
_ = json.NewEncoder(&fakeResponse).Encode(body)
clientGetter.RESTClient.Resp = &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(&fakeResponse),
}

res, err := sut.List(&apiContext, &schema, &types.QueryOptions{})

assert.NoError(t, err)
assert.IsType(t, []map[string]interface{}{}, res)
assert.Len(t, res, 0)
}

// generic type
{
body := data
var fakeResponse bytes.Buffer
_ = json.NewEncoder(&fakeResponse).Encode(body)
clientGetter.RESTClient.Resp = &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(&fakeResponse),
}

res, err := sut.List(&apiContext, &schema, &types.QueryOptions{})

assert.NoError(t, err)
assert.IsType(t, []map[string]interface{}{}, res)
assert.Len(t, res, 3)
}

_ = v1.SchemeBuilder.AddToScheme(typer)

// specific type
{
body := data
var fakeResponse bytes.Buffer
_ = json.NewEncoder(&fakeResponse).Encode(body)
clientGetter.RESTClient.Resp = &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(&fakeResponse),
}

res, err := sut.List(&apiContext, &schema, &types.QueryOptions{})

assert.NoError(t, err)
assert.IsType(t, []map[string]interface{}{}, res)
assert.Len(t, res, 3)
}
}

type mockClientGetter struct {
*fake.RESTClient
}

func (m mockClientGetter) UnversionedClient(_ *types.APIContext, _ types.StorageContext) (rest.Interface, error) {
return m.RESTClient, nil
}

func (m mockClientGetter) APIExtClient(_ *types.APIContext, _ types.StorageContext) (clientset.Interface, error) {
return nil, nil
}
Loading

0 comments on commit aaca61a

Please sign in to comment.