Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release/v1.45] Anexia Provider: Utilize Creating state instead of blocking Create call #1536

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions pkg/cloudprovider/provider/anexia/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ package anexia

import (
"encoding/json"
"net/http"
"testing"

"github.com/anexia-it/go-anxcloud/pkg/vsphere/search"
"github.com/gophercloud/gophercloud/testhelper"
"github.com/kubermatic/machine-controller/pkg/apis/cluster/v1alpha1"
anxtypes "github.com/kubermatic/machine-controller/pkg/cloudprovider/provider/anexia/types"
Expand Down Expand Up @@ -63,27 +61,6 @@ func getSpecsForValidationTest(t *testing.T, configCases []ConfigTestCase) []Val
return testCases
}

func createSearchHandler(t *testing.T, iterations int) http.HandlerFunc {
counter := 0
return func(writer http.ResponseWriter, request *http.Request) {
test := request.URL.Query().Get("name")
testhelper.AssertEquals(t, "%-TestMachine", test)
testhelper.TestMethod(t, request, http.MethodGet)
if iterations == counter {
encoder := json.NewEncoder(writer)
testhelper.AssertNoErr(t, encoder.Encode(map[string]interface{}{
"data": []search.VM{
{
Name: "543053-TestMachine",
Identifier: TestIdentifier,
},
},
}))
}
counter++
}
}

func newConfigVarString(str string) types.ConfigVarString {
return types.ConfigVarString{
Value: str,
Expand Down
5 changes: 5 additions & 0 deletions pkg/cloudprovider/provider/anexia/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
)

type anexiaInstance struct {
isCreating bool
info *info.Info
reservedAddresses []string
}
Expand Down Expand Up @@ -81,6 +82,10 @@ func (ai *anexiaInstance) Addresses() map[string]v1.NodeAddressType {
}

func (ai *anexiaInstance) Status() instance.Status {
if ai.isCreating {
return instance.StatusCreating
}

if ai.info != nil {
if ai.info.Status == anxtypes.MachinePoweredOn {
return instance.StatusRunning
Expand Down
85 changes: 32 additions & 53 deletions pkg/cloudprovider/provider/anexia/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/anexia-it/go-anxcloud/pkg/vsphere/provisioning/progress"
"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"

anxclient "github.com/anexia-it/go-anxcloud/pkg/client"
anxaddr "github.com/anexia-it/go-anxcloud/pkg/ipam/address"
Expand All @@ -48,6 +48,7 @@ import (

"k8s.io/apimachinery/pkg/runtime"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
)

const (
Expand Down Expand Up @@ -116,16 +117,6 @@ func (p *provider) Create(machine *clusterv1alpha1.Machine, data *cloudprovidert
retErr = anxtypes.NewMultiError(retErr, updateMachineStatus(machine, status, data.Update))
}()

// check whether machine is already provisioning
if isAlreadyProvisioning(ctx) && status.ProvisioningID == "" {
klog.Info("ongoing provisioning detected")
err := waitForVM(ctx, client)
if err != nil {
return nil, err
}
return p.Get(machine, data)
}

// provision machine
err = provisionVM(ctx, client)
if err != nil {
Expand All @@ -134,33 +125,6 @@ func (p *provider) Create(machine *clusterv1alpha1.Machine, data *cloudprovidert
return p.Get(machine, data)
}

func waitForVM(ctx context.Context, client anxclient.Client) error {
reconcileContext := getReconcileContext(ctx)
api := vsphere.NewAPI(client)
var identifier string
err := wait.PollImmediate(5*time.Second, 1*time.Minute, func() (bool, error) {
klog.V(2).Info("checking for VM with name ", reconcileContext.Machine.Name)
vms, err := api.Search().ByName(ctx, fmt.Sprintf("%%-%s", reconcileContext.Machine.Name))
if err != nil {
return false, nil
}
if len(vms) < 1 {
return false, nil
}
if len(vms) > 1 {
return false, errors.New("too many VMs returned by search")
}
identifier = vms[0].Identifier
return true, nil
})
if err != nil {
return err
}

reconcileContext.Status.InstanceID = identifier
return updateMachineStatus(reconcileContext.Machine, *reconcileContext.Status, reconcileContext.ProviderData.Update)
}

func provisionVM(ctx context.Context, client anxclient.Client) error {
reconcileContext := getReconcileContext(ctx)
vmAPI := vsphere.NewAPI(client)
Expand Down Expand Up @@ -230,15 +194,6 @@ func provisionVM(ctx context.Context, client anxclient.Client) error {
klog.V(2).Info(fmt.Sprintf("Using provisionID from machine '%s' to await completion",
reconcileContext.Machine.Name))

instanceID, err := vmAPI.Provisioning().Progress().AwaitCompletion(ctx, status.ProvisioningID)
if err != nil {
klog.Errorf("failed to await machine completion '%s'", reconcileContext.Machine.Name)
// something went wrong remove provisioning ID, so we can start from scratch
status.ProvisioningID = ""
return newError(common.CreateMachineError, "instance provisioning failed: %v", err)
}

status.InstanceID = instanceID
meta.SetStatusCondition(&status.Conditions, v1.Condition{
Type: ProvisionedType,
Status: v1.ConditionTrue,
Expand All @@ -249,6 +204,8 @@ func provisionVM(ctx context.Context, client anxclient.Client) error {
return updateMachineStatus(reconcileContext.Machine, *status, reconcileContext.ProviderData.Update)
}

var _engsup3404mutex sync.Mutex

func getIPAddress(ctx context.Context, client anxclient.Client) (string, error) {
reconcileContext := getReconcileContext(ctx)
status := reconcileContext.Status
Expand All @@ -259,6 +216,9 @@ func getIPAddress(ctx context.Context, client anxclient.Client) (string, error)
return status.ReservedIP, nil
}

_engsup3404mutex.Lock()
defer _engsup3404mutex.Unlock()

klog.Info(fmt.Sprintf("Creating a new IP for machine %q", reconcileContext.Machine.Name))
addrAPI := anxaddr.NewAPI(client)
config := reconcileContext.Config
Expand Down Expand Up @@ -448,7 +408,7 @@ func (p *provider) Validate(machinespec clusterv1alpha1.MachineSpec) error {
return nil
}

func (p *provider) Get(machine *clusterv1alpha1.Machine, _ *cloudprovidertypes.ProviderData) (instance.Instance, error) {
func (p *provider) Get(machine *clusterv1alpha1.Machine, pd *cloudprovidertypes.ProviderData) (instance.Instance, error) {
config, _, err := p.getConfig(machine.Spec.ProviderSpec)
if err != nil {
return nil, newError(common.InvalidConfigurationMachineError, "failed to retrieve config: %v", err)
Expand All @@ -465,19 +425,38 @@ func (p *provider) Get(machine *clusterv1alpha1.Machine, _ *cloudprovidertypes.P
return nil, newError(common.InvalidConfigurationMachineError, "failed to get machine status: %v", err)
}

if status.InstanceID == "" {
if status.InstanceID == "" && status.ProvisioningID == "" {
return nil, cloudprovidererrors.ErrInstanceNotFound
}

ctx, cancel := context.WithTimeout(context.Background(), anxtypes.GetRequestTimeout)
defer cancel()

if status.InstanceID == "" {
progress, err := vsphereAPI.Provisioning().Progress().Get(ctx, status.ProvisioningID)
if err != nil {
return nil, fmt.Errorf("failed to get provisioning progress: %w", err)
}
if len(progress.Errors) > 0 {
return nil, fmt.Errorf("vm provisioning had errors: %s", strings.Join(progress.Errors, ","))
}
if progress.Progress < 100 || progress.VMIdentifier == "" {
return &anexiaInstance{isCreating: true}, nil
}

status.InstanceID = progress.VMIdentifier

if err := updateMachineStatus(machine, status, pd.Update); err != nil {
return nil, fmt.Errorf("failed updating machine status: %w", err)
}
}

instance := anexiaInstance{}

if status.IPState == anxtypes.IPStateBound && status.ReservedIP != "" {
instance.reservedAddresses = []string{status.ReservedIP}
}

ctx, cancel := context.WithTimeout(context.Background(), anxtypes.GetRequestTimeout)
defer cancel()

info, err := vsphereAPI.Info().Get(ctx, status.InstanceID)
if err != nil {
return nil, fmt.Errorf("failed get machine info: %w", err)
Expand Down
33 changes: 0 additions & 33 deletions pkg/cloudprovider/provider/anexia/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,39 +47,6 @@ func TestAnexiaProvider(t *testing.T) {
server.Close()
})

t.Run("Test waiting for VM", func(t *testing.T) {
t.Parallel()

waitUntilVMIsFound := 2
testhelper.Mux.HandleFunc("/api/vsphere/v1/search/by_name.json", createSearchHandler(t, waitUntilVMIsFound))

providerStatus := anxtypes.ProviderStatus{}
ctx := createReconcileContext(reconcileContext{
Machine: &v1alpha1.Machine{
ObjectMeta: metav1.ObjectMeta{Name: "TestMachine"},
},
Status: &providerStatus,
UserData: "",
Config: resolvedConfig{},

ProviderData: &cloudprovidertypes.ProviderData{
Update: func(m *clusterv1alpha1.Machine, mod ...cloudprovidertypes.MachineModifier) error {
return nil
},
},
})

err := waitForVM(ctx, client)
if err != nil {
t.Fatal("No error was expected", err)

}

if providerStatus.InstanceID != TestIdentifier {
t.Errorf("Excpected InstanceID to be set")
}
})

t.Run("Test provision VM", func(t *testing.T) {
t.Parallel()
testhelper.Mux.HandleFunc("/api/ipam/v1/address/reserve/ip/count.json", func(writer http.ResponseWriter, request *http.Request) {
Expand Down