From ab5907c12f373ff94de193ac6d73b52328859c7a Mon Sep 17 00:00:00 2001 From: gfichtenholt Date: Tue, 6 Jul 2021 00:46:17 -0700 Subject: [PATCH] Michael's feedback #4 --- .../plugins/fluxv2/packages/v1alpha1/cache.go | 12 ++++++++++-- .../plugins/fluxv2/packages/v1alpha1/main.go | 1 - .../plugins/fluxv2/packages/v1alpha1/server.go | 3 ++- .../plugins/fluxv2/packages/v1alpha1/server_test.go | 7 +------ 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache.go b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache.go index 180b1322d0d..4a1120d19bd 100644 --- a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache.go +++ b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache.go @@ -228,13 +228,18 @@ func (c *ResourceWatcherCache) onAddOrModify(add bool, unstructuredObj map[strin var funcName string var value interface{} var setVal bool + // Define an actual type so you can use it in your interface earlier also, as well as below: + type CacheSetter func(string, map[string]interface{}) (interface{}, bool, error) + + var addOrModify CacheSetter if add { funcName = "OnAdd" - value, setVal, err = c.config.onAdd(*key, unstructuredObj) + addOrModify = c.config.onAdd } else { funcName = "OnModify" - value, setVal, err = c.config.onModify(*key, unstructuredObj) + addOrModify = c.config.onModify } + value, setVal, err = addOrModify(*key, unstructuredObj) if err != nil { log.Errorf("Invokation of [%s] for object %s\nfailed due to: %v", funcName, prettyPrintMap(unstructuredObj), err) return @@ -340,6 +345,7 @@ func (c *ResourceWatcherCache) fetchCachedObjects(requestItems []unstructured.Un wg.Add(1) go func() { for job := range requestChan { + // The following loop will only terminate when the request channel is closed (and there are no more items) result, err := c.fetchForOne(job.key) responseChan <- fetchValueJobResult{result, err} } @@ -364,6 +370,8 @@ func (c *ResourceWatcherCache) fetchCachedObjects(requestItems []unstructured.Un }() // Start receiving results + // The following loop will only terminate when the response channel is closed, i.e. + // after the all the requests have been processed for resp := range responseChan { if resp.err == nil { // resp.result may be nil when there is a cache miss diff --git a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/main.go b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/main.go index d0d6c43d74d..7a29df4e6aa 100644 --- a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/main.go +++ b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/main.go @@ -28,7 +28,6 @@ import ( // returning the server implementation. func RegisterWithGRPCServer(s grpc.ServiceRegistrar, clientGetter server.KubernetesClientGetter) (interface{}, error) { log.Infof("+fluxv2 RegisterWithGRPCServer") - // TODO (gfichtenholt) return an error when func signature is changed to allow for it svr, err := NewServer(clientGetter) if err != nil { return nil, err diff --git a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server.go b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server.go index d9288ec379c..d6b9748f31f 100644 --- a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server.go +++ b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server.go @@ -154,7 +154,8 @@ func (s *Server) GetPackageRepositories(ctx context.Context, request *v1alpha1.G func (s *Server) GetAvailablePackageSummaries(ctx context.Context, request *corev1.GetAvailablePackageSummariesRequest) (*corev1.GetAvailablePackageSummariesResponse, error) { log.Infof("+fluxv2 GetAvailablePackageSummaries(request: [%v])", request) - if request != nil && request.Context != nil && request.Context.Cluster != "" { + // grpc compiles in getters for you which automatically return a default (empty) struct if the pointer was nil + if request != nil && request.GetContext().GetCluster() != "" { return nil, status.Errorf( codes.Unimplemented, "Not supported yet: request.Context.Cluster: [%v]", diff --git a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server_test.go b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server_test.go index 10a4bdd0755..2fad35ae314 100644 --- a/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server_test.go +++ b/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server_test.go @@ -361,7 +361,6 @@ func TestGetAvailablePackageSummaries(t *testing.T) { for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { repos := []runtime.Object{} - httpServers := []*httptest.Server{} for _, rs := range tc.testRepos { indexYAMLBytes, err := ioutil.ReadFile(rs.index) @@ -373,7 +372,7 @@ func TestGetAvailablePackageSummaries(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, string(indexYAMLBytes)) })) - httpServers = append(httpServers, ts) + defer ts.Close() repoSpec := map[string]interface{}{ "url": rs.url, @@ -412,10 +411,6 @@ func TestGetAvailablePackageSummaries(t *testing.T) { if got, want := response.AvailablePackagesSummaries, tc.expectedPackages; !cmp.Equal(got, want, opt1, opt2) { t.Errorf("mismatch (-want +got):\n%s", cmp.Diff(want, got, opt1, opt2)) } - - for _, ts := range httpServers { - ts.Close() - } }) } }