Skip to content

Commit

Permalink
Cherry-Pick Retry commit in master (#452)
Browse files Browse the repository at this point in the history
* fix: retry on unauthorized error when retrieving resources by gvk (#449)

* fix: retry on unauthorized when retrieving resources by gvk

Signed-off-by: Leonardo Luz Almeida <[email protected]>

* add test case to validate retry is just invoked if error is Unauthorized

Signed-off-by: Leonardo Luz Almeida <[email protected]>

Signed-off-by: Leonardo Luz Almeida <[email protected]>

* Fix merge conflict

Signed-off-by: Leonardo Luz Almeida <[email protected]>

* Fix lint

Signed-off-by: Leonardo Luz Almeida <[email protected]>

* Fix lint

Signed-off-by: Leonardo Luz Almeida <[email protected]>

Signed-off-by: Leonardo Luz Almeida <[email protected]>
  • Loading branch information
leoluz authored Aug 19, 2022
1 parent a56a803 commit 9970fab
Show file tree
Hide file tree
Showing 2 changed files with 214 additions and 1 deletion.
26 changes: 25 additions & 1 deletion pkg/sync/sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2/klogr"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/util/openapi"
Expand Down Expand Up @@ -697,9 +699,31 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
task.liveObj = sc.liveObj(task.targetObj)
}

isRetryable := func(err error) bool {
return apierr.IsUnauthorized(err)
}

serverResCache := make(map[schema.GroupVersionKind]*metav1.APIResource)

// check permissions
for _, task := range tasks {
serverRes, err := kube.ServerResourceForGroupVersionKind(sc.disco, task.groupVersionKind(), "get")

var serverRes *metav1.APIResource
var err error

if val, ok := serverResCache[task.groupVersionKind()]; ok {
serverRes = val
err = nil
} else {
err = retry.OnError(retry.DefaultRetry, isRetryable, func() error {
serverRes, err = kube.ServerResourceForGroupVersionKind(sc.disco, task.groupVersionKind(), "get")
return err
})
if serverRes != nil {
serverResCache[task.groupVersionKind()] = serverRes
}
}

if err != nil {
// Special case for custom resources: if CRD is not yet known by the K8s API server,
// and the CRD is part of this sync or the resource is annotated with SkipDryRunOnMissingResource=true,
Expand Down
189 changes: 189 additions & 0 deletions pkg/sync/sync_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@ import (
"fmt"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"net/http"
"net/http/httptest"
"reflect"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/discovery"
fakedisco "k8s.io/client-go/discovery/fake"
"k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/rest"
Expand All @@ -24,6 +28,7 @@ import (

"github.com/argoproj/gitops-engine/pkg/diff"
"github.com/argoproj/gitops-engine/pkg/health"
"github.com/argoproj/gitops-engine/pkg/sync/common"
synccommon "github.com/argoproj/gitops-engine/pkg/sync/common"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
"github.com/argoproj/gitops-engine/pkg/utils/kube/kubetest"
Expand Down Expand Up @@ -463,6 +468,190 @@ func TestSyncPruneFailure(t *testing.T) {
assert.Equal(t, "foo", result.Message)
}

type APIServerMock struct {
calls int
errorStatus int
errorBody []byte
}

func (s *APIServerMock) newHttpServer(t *testing.T, apiFailuresCount int) *httptest.Server {
t.Helper()
stable := metav1.APIResourceList{
GroupVersion: "v1",
APIResources: []metav1.APIResource{
{Name: "pods", Namespaced: true, Kind: "Pod"},
{Name: "services", Namespaced: true, Kind: "Service", Verbs: v1.Verbs{"get"}},
{Name: "namespaces", Namespaced: false, Kind: "Namespace"},
},
}

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
s.calls++
if s.calls <= apiFailuresCount {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(s.errorStatus)
w.Write(s.errorBody) // nolint:errcheck
return
}
var list interface{}
switch req.URL.Path {
case "/api/v1":
list = &stable
case "/apis/v1":
list = &stable
default:
t.Logf("unexpected request: %s", req.URL.Path)
w.WriteHeader(http.StatusNotFound)
return
}

output, err := json.Marshal(list)
if err != nil {
t.Errorf("unexpected encoding error: %v", err)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(output) // nolint:errcheck
}))
return server
}

func TestServerResourcesRetry(t *testing.T) {
type fixture struct {
apiServerMock *APIServerMock
httpServer *httptest.Server
syncCtx *syncContext
}
setup := func(t *testing.T, apiFailuresCount int) *fixture {
t.Helper()
syncCtx := newTestSyncCtx(nil, WithOperationSettings(false, false, false, true))

unauthorizedStatus := &metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusUnauthorized,
Reason: metav1.StatusReasonUnauthorized,
Message: "some error",
}
unauthorizedJSON, err := json.Marshal(unauthorizedStatus)
if err != nil {
t.Errorf("unexpected encoding error while marshaling unauthorizedStatus: %v", err)
return nil
}
server := &APIServerMock{
errorStatus: http.StatusUnauthorized,
errorBody: unauthorizedJSON,
}
httpServer := server.newHttpServer(t, apiFailuresCount)

syncCtx.disco = discovery.NewDiscoveryClientForConfigOrDie(&rest.Config{Host: httpServer.URL})
testSvc := NewService()
testSvc.SetName("test-service")
testSvc.SetNamespace(FakeArgoCDNamespace)
syncCtx.resources = groupResources(ReconciliationResult{
Live: []*unstructured.Unstructured{testSvc, testSvc, testSvc, testSvc},
Target: []*unstructured.Unstructured{testSvc, testSvc, testSvc, testSvc},
})
return &fixture{
apiServerMock: server,
httpServer: httpServer,
syncCtx: syncCtx,
}

}
type testCase struct {
desc string
apiFailureCount int
apiErrorHTTPStatus int
expectedAPICalls int
expectedResources int
expectedPhase synccommon.OperationPhase
expectedMessage string
}
testCases := []testCase{
{
desc: "will return success when no api failure",
apiFailureCount: 0,
expectedAPICalls: 1,
expectedResources: 1,
expectedPhase: common.OperationSucceeded,
expectedMessage: "success",
},
{
desc: "will return success after 1 api failure attempt",
apiFailureCount: 1,
expectedAPICalls: 2,
expectedResources: 1,
expectedPhase: common.OperationSucceeded,
expectedMessage: "success",
},
{
desc: "will return success after 2 api failure attempt",
apiFailureCount: 2,
expectedAPICalls: 3,
expectedResources: 1,
expectedPhase: common.OperationSucceeded,
expectedMessage: "success",
},
{
desc: "will return success after 3 api failure attempt",
apiFailureCount: 3,
expectedAPICalls: 4,
expectedResources: 1,
expectedPhase: common.OperationSucceeded,
expectedMessage: "success",
},
{
desc: "will return success after 4 api failure attempt",
apiFailureCount: 4,
expectedAPICalls: 5,
expectedResources: 1,
expectedPhase: common.OperationSucceeded,
expectedMessage: "success",
},
{
desc: "will fail after 5 api failure attempt",
apiFailureCount: 5,
expectedAPICalls: 5,
expectedResources: 1,
expectedPhase: common.OperationFailed,
expectedMessage: "not valid",
},
{
desc: "will not retry if returned error is different than Unauthorized",
apiErrorHTTPStatus: http.StatusConflict,
apiFailureCount: 1,
expectedAPICalls: 1,
expectedResources: 1,
expectedPhase: common.OperationFailed,
expectedMessage: "not valid",
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
// Given
t.Parallel()
fixture := setup(t, tc.apiFailureCount)
defer fixture.httpServer.Close()
if tc.apiErrorHTTPStatus != 0 {
fixture.apiServerMock.errorStatus = tc.apiErrorHTTPStatus
}

// When
fixture.syncCtx.Sync()
phase, msg, resources := fixture.syncCtx.GetState()

// Then
assert.Equal(t, tc.expectedAPICalls, fixture.apiServerMock.calls, "api calls mismatch")
assert.Len(t, resources, tc.expectedResources, "resources len mismatch")
assert.Contains(t, msg, tc.expectedMessage, "expected message mismatch")
require.Equal(t, tc.expectedPhase, phase, "expected phase mismatch")
require.Len(t, fixture.syncCtx.syncRes, 1, "sync result len mismatch")
})
}
}

func TestDoNotSyncOrPruneHooks(t *testing.T) {
syncCtx := newTestSyncCtx(nil, WithOperationSettings(false, false, false, true))
targetPod := NewPod()
Expand Down

0 comments on commit 9970fab

Please sign in to comment.