diff --git a/cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client.go b/cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client.go index 9120596d6c13..232023bc89a5 100644 --- a/cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client.go +++ b/cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client.go @@ -115,6 +115,9 @@ type AutoscalingGceClient interface { ResizeMig(GceRef, int64) error DeleteInstances(migRef GceRef, instances []GceRef) error CreateInstances(GceRef, string, int64, []string) error + + // extension point for new operations + WaitForOperation(operationName, operationType, project, zone string) error } type autoscalingGceClientV1 struct { @@ -123,39 +126,34 @@ type autoscalingGceClientV1 struct { projectId string domainUrl string - // These can be overridden, e.g. for testing. - operationWaitTimeout time.Duration - operationPollInterval time.Duration - operationDeletionPollInterval time.Duration + // Can be overridden, e.g. for testing. + operationWaitTimeout time.Duration } // NewAutoscalingGceClientV1WithTimeout creates a new client with custom timeouts // for communicating with GCE v1 API -func NewAutoscalingGceClientV1WithTimeout(client *http.Client, projectId string, userAgent string, - waitTimeout, pollInterval, deletionPollInterval time.Duration) (*autoscalingGceClientV1, error) { +func NewAutoscalingGceClientV1WithTimeout(client *http.Client, projectId string, userAgent string, waitTimeout time.Duration) (*autoscalingGceClientV1, error) { gceService, err := gce.New(client) if err != nil { return nil, err } gceService.UserAgent = userAgent + return &autoscalingGceClientV1{ - projectId: projectId, - gceService: gceService, - operationWaitTimeout: waitTimeout, - operationPollInterval: pollInterval, - operationDeletionPollInterval: deletionPollInterval, + projectId: projectId, + gceService: gceService, + operationWaitTimeout: waitTimeout, }, nil } // NewAutoscalingGceClientV1 creates a new client for communicating with GCE v1 API. func NewAutoscalingGceClientV1(client *http.Client, projectId string, userAgent string) (*autoscalingGceClientV1, error) { - return NewAutoscalingGceClientV1WithTimeout(client, projectId, userAgent, defaultOperationWaitTimeout, defaultOperationPollInterval, defaultOperationDeletionPollInterval) + return NewAutoscalingGceClientV1WithTimeout(client, projectId, userAgent, defaultOperationWaitTimeout) } // NewCustomAutoscalingGceClientV1 creates a new client using custom server url and timeouts // for communicating with GCE v1 API. -func NewCustomAutoscalingGceClientV1(client *http.Client, projectId, serverUrl, userAgent, domainUrl string, - waitTimeout, pollInterval time.Duration, deletionPollInterval time.Duration) (*autoscalingGceClientV1, error) { +func NewCustomAutoscalingGceClientV1(client *http.Client, projectId, serverUrl, userAgent, domainUrl string, waitTimeout time.Duration) (*autoscalingGceClientV1, error) { gceService, err := gce.New(client) if err != nil { return nil, err @@ -164,12 +162,10 @@ func NewCustomAutoscalingGceClientV1(client *http.Client, projectId, serverUrl, gceService.UserAgent = userAgent return &autoscalingGceClientV1{ - projectId: projectId, - gceService: gceService, - domainUrl: domainUrl, - operationWaitTimeout: waitTimeout, - operationPollInterval: pollInterval, - operationDeletionPollInterval: deletionPollInterval, + projectId: projectId, + gceService: gceService, + domainUrl: domainUrl, + operationWaitTimeout: waitTimeout, }, nil } @@ -240,7 +236,7 @@ func (client *autoscalingGceClientV1) ResizeMig(migRef GceRef, size int64) error if err != nil { return err } - return client.waitForOp(op, migRef.Project, migRef.Zone, false) + return client.WaitForOperation(op.Name, op.OperationType, migRef.Project, migRef.Zone) } func (client *autoscalingGceClientV1) CreateInstances(migRef GceRef, baseName string, delta int64, existingInstanceProviderIds []string) error { @@ -257,7 +253,7 @@ func (client *autoscalingGceClientV1) CreateInstances(migRef GceRef, baseName st if err != nil { return err } - return client.waitForOp(op, migRef.Project, migRef.Zone, false) + return client.WaitForOperation(op.Name, op.OperationType, migRef.Project, migRef.Zone) } func instanceIdsToNamesMap(instanceProviderIds []string) map[string]bool { @@ -274,32 +270,31 @@ func instanceIdsToNamesMap(instanceProviderIds []string) map[string]bool { return instanceNames } -func (client *autoscalingGceClientV1) waitForOp(operation *gce.Operation, project, zone string, isDeletion bool) error { - pollInterval := client.operationPollInterval - if isDeletion { - pollInterval = client.operationDeletionPollInterval - } - for start := time.Now(); time.Since(start) < client.operationWaitTimeout; time.Sleep(pollInterval) { - klog.V(4).Infof("Waiting for operation %s %s %s", project, zone, operation.Name) - registerRequest("zone_operations", "get") - if op, err := client.gceService.ZoneOperations.Get(project, zone, operation.Name).Do(); err == nil { - klog.V(4).Infof("Operation %s %s %s status: %s", project, zone, operation.Name, op.Status) - if op.Status == "DONE" { - if op.Error != nil { - errBytes, err := op.Error.MarshalJSON() - if err != nil { - errBytes = []byte(fmt.Sprintf("operation failed, but error couldn't be recovered: %v", err)) - } - return fmt.Errorf("error while getting operation %s on %s: %s", operation.Name, operation.TargetLink, errBytes) - } +func (client *autoscalingGceClientV1) WaitForOperation(operationName, operationType, project, zone string) error { + ctx, cancel := context.WithTimeout(context.TODO(), client.operationWaitTimeout) + defer cancel() + + for { + klog.V(4).Infof("Waiting for operation %s (%s/%s/%s)", operationName, project, zone, operationType) + registerRequest("zone_operations", "wait") + op, err := client.gceService.ZoneOperations.Wait(project, zone, operationName).Context(ctx).Do() + if err != nil { + return fmt.Errorf("error while waiting for operation %s: %w", operationName, err) + } - return nil + klog.V(4).Infof("Operation %s (%s/%s/%s) status: %s", operationName, project, zone, operationType, op.Status) + if op.Status == "DONE" { + if op.Error != nil { + errBytes, err := op.Error.MarshalJSON() + if err != nil { + errBytes = []byte(fmt.Sprintf("operation failed, but error couldn't be recovered: %v", err)) + } + return fmt.Errorf("error while waiting for operation %s: %s", op.Name, errBytes) } - } else { - klog.Warningf("Error while getting operation %s on %s: %v", operation.Name, operation.TargetLink, err) + + return nil } } - return fmt.Errorf("timeout while waiting for operation %s on %s to complete.", operation.Name, operation.TargetLink) } func (client *autoscalingGceClientV1) DeleteInstances(migRef GceRef, instances []GceRef) error { @@ -315,7 +310,7 @@ func (client *autoscalingGceClientV1) DeleteInstances(migRef GceRef, instances [ if err != nil { return err } - return client.waitForOp(op, migRef.Project, migRef.Zone, true) + return client.WaitForOperation(op.Name, op.OperationType, migRef.Project, migRef.Zone) } func (client *autoscalingGceClientV1) FetchMigInstances(migRef GceRef) ([]GceInstance, error) { diff --git a/cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client_test.go b/cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client_test.go index ad44678404c2..9b22500a2ffe 100644 --- a/cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client_test.go +++ b/cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client_test.go @@ -17,6 +17,7 @@ limitations under the License. package gce import ( + "context" "encoding/json" "fmt" "net/http" @@ -92,15 +93,11 @@ func TestWaitForOp(t *testing.T) { defer server.Close() g := newTestAutoscalingGceClient(t, "project1", server.URL, "") - g.operationPollInterval = 1 * time.Millisecond - g.operationWaitTimeout = 500 * time.Millisecond - - server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return(operationRunningResponse).Times(3) - server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return(operationDoneResponse).Once() - + server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return(operationRunningResponse).Times(3) + server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return(operationDoneResponse).Once() operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"} - err := g.waitForOp(operation, projectId, zoneB, false) + err := g.WaitForOperation(operation.Name, "TestWaitForOp", projectId, zoneB) assert.NoError(t, err) mock.AssertExpectationsForObjects(t, server) } @@ -110,11 +107,10 @@ func TestWaitForOpError(t *testing.T) { defer server.Close() g := newTestAutoscalingGceClient(t, "project1", server.URL, "") - server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return(operationDoneResponseError).Once() - + server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return(operationDoneResponseError).Once() operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"} - err := g.waitForOp(operation, projectId, zoneB, false) + err := g.WaitForOperation(operation.Name, "TestWaitForOpError", projectId, zoneB) assert.Error(t, err) } @@ -123,21 +119,28 @@ func TestWaitForOpTimeout(t *testing.T) { defer server.Close() g := newTestAutoscalingGceClient(t, "project1", server.URL, "") - // The values here are higher than in other tests since we're aiming for timeout. - // Lower values make this fragile and flakey. - g.operationPollInterval = 10 * time.Millisecond - g.operationWaitTimeout = 49 * time.Millisecond - - // Sometimes, only 3 calls are made, but it doesn't really matter, - // so let's not assert expectations for this mock, just check for timeout error. - server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return(operationRunningResponse).Times(5) + server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return(operationRunningResponse).Once() operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"} - err := g.waitForOp(operation, projectId, zoneB, false) + err := g.WaitForOperation(operation.Name, "TestWaitForOpTimeout", projectId, zoneB) assert.Error(t, err) } +func TestWaitForOpContextTimeout(t *testing.T) { + server := test_util.NewHttpServerMock() + defer server.Close() + g := newTestAutoscalingGceClient(t, "project1", server.URL, "") + + g.operationWaitTimeout = 1 * time.Millisecond + + server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").After(50 * time.Millisecond).Return(operationDoneResponse).Once() + operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"} + + err := g.WaitForOperation(operation.Name, "TestWaitForOpContextTimeout", projectId, zoneB) + assert.ErrorIs(t, err, context.DeadlineExceeded) +} + func TestErrors(t *testing.T) { const instanceUrl = "https://content.googleapis.com/compute/v1/projects/myprojid/zones/myzone/instances/myinst" server := test_util.NewHttpServerMock() @@ -553,12 +556,8 @@ func TestUserAgent(t *testing.T) { defer server.Close() g := newTestAutoscalingGceClient(t, "project1", server.URL, "testuseragent") - g.operationPollInterval = 10 * time.Millisecond - g.operationWaitTimeout = 49 * time.Millisecond - - server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197").Return("testuseragent", operationRunningResponse).Maybe() - + server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505728466148-d16f5197/wait").Return("testuseragent", operationRunningResponse).Maybe() operation := &gce_api.Operation{Name: "operation-1505728466148-d16f5197"} - g.waitForOp(operation, projectId, zoneB, false) + g.WaitForOperation(operation.Name, "TestUserAgent", projectId, zoneB) } diff --git a/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go b/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go index 953c9789bf28..886fad8d62b1 100644 --- a/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go +++ b/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go @@ -329,7 +329,6 @@ func newTestGceManager(t *testing.T, testServerURL string, regional bool) *gceMa // Override wait for op timeouts. gceService.operationWaitTimeout = 50 * time.Millisecond - gceService.operationPollInterval = 1 * time.Millisecond cache := &GceCache{ migs: make(map[GceRef]Mig), @@ -473,7 +472,7 @@ func TestDeleteInstances(t *testing.T) { server.On("handle", "/projects/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/listManagedInstances").Return(buildFourRunningInstancesOnDefaultMigManagedInstancesResponse(zoneB)).Once() server.On("handle", "/projects/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/deleteInstances").Return(deleteInstancesResponse).Once() - server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505802641136-55984ff86d980-a99e8c2b-0c8aaaaa").Return(deleteInstancesOperationResponse).Once() + server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505802641136-55984ff86d980-a99e8c2b-0c8aaaaa/wait").Return(deleteInstancesOperationResponse).Once() instances := []GceRef{ { @@ -583,7 +582,7 @@ func TestGetAndSetMigSize(t *testing.T) { // set target size for extraPoolMig; will require resize API call and API call for polling for resize operation server.On("handle", fmt.Sprintf("/projects/project1/zones/us-central1-b/instanceGroupManagers/%s/resize", extraPoolMigName)).Return(setMigSizeResponse).Once() - server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505739408819-5597646964339-eb839c88-28805931").Return(setMigSizeOperationResponse).Once() + server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1505739408819-5597646964339-eb839c88-28805931/wait").Return(setMigSizeOperationResponse).Once() err = g.SetMigSize(extraPoolMig, 4) assert.NoError(t, err) mock.AssertExpectationsForObjects(t, server) @@ -1509,7 +1508,7 @@ func TestAppendInstances(t *testing.T) { defaultPoolMig := setupTestDefaultPool(g, true) server.On("handle", "/projects/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/listManagedInstances").Return(buildFourRunningInstancesOnDefaultMigManagedInstancesResponse(zoneB)).Once() server.On("handle", fmt.Sprintf("/projects/project1/zones/us-central1-b/instanceGroupManagers/%v/createInstances", defaultPoolMig.gceRef.Name)).Return(createInstancesResponse).Once() - server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1624366531120-5c55a4e128c15-fc5daa90-e1ef6c32").Return(createInstancesOperationResponse).Once() + server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1624366531120-5c55a4e128c15-fc5daa90-e1ef6c32/wait").Return(createInstancesOperationResponse).Once() err := g.CreateInstances(defaultPoolMig, 2) assert.NoError(t, err) mock.AssertExpectationsForObjects(t, server) diff --git a/cluster-autoscaler/cloudprovider/gce/mig_info_provider_test.go b/cluster-autoscaler/cloudprovider/gce/mig_info_provider_test.go index 6c2ea6dc75e6..e6b36d75eba9 100644 --- a/cluster-autoscaler/cloudprovider/gce/mig_info_provider_test.go +++ b/cluster-autoscaler/cloudprovider/gce/mig_info_provider_test.go @@ -121,6 +121,10 @@ func (client *mockAutoscalingGceClient) CreateInstances(_ GceRef, _ string, _ in return nil } +func (client *mockAutoscalingGceClient) WaitForOperation(_, _, _, _ string) error { + return nil +} + func TestFillMigInstances(t *testing.T) { migRef := GceRef{Project: "test", Zone: "zone-A", Name: "some-mig"} oldInstances := []GceInstance{