Skip to content

Commit

Permalink
Anexia Provider: Utilize Creating state instead of blocking `Create…
Browse files Browse the repository at this point in the history
…` call (#1510)

Signed-off-by: Mario Schäfer <[email protected]>

Signed-off-by: Mario Schäfer <[email protected]>
Co-authored-by: Mario Schäfer <[email protected]>
  • Loading branch information
kubermatic-bot and Mario Schäfer authored Jan 3, 2023
1 parent 27a5053 commit 87a0dbc
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 104 deletions.
23 changes: 0 additions & 23 deletions pkg/cloudprovider/provider/anexia/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ package anexia

import (
"encoding/json"
"net/http"
"testing"

"github.com/gophercloud/gophercloud/testhelper"
"go.anx.io/go-anxcloud/pkg/vsphere/search"

"github.com/kubermatic/machine-controller/pkg/apis/cluster/v1alpha1"
anxtypes "github.com/kubermatic/machine-controller/pkg/cloudprovider/provider/anexia/types"
Expand Down Expand Up @@ -64,27 +62,6 @@ func getSpecsForValidationTest(t *testing.T, configCases []ConfigTestCase) []Val
return testCases
}

func createSearchHandler(t *testing.T, iterations int) http.HandlerFunc {
counter := 0
return func(writer http.ResponseWriter, request *http.Request) {
test := request.URL.Query().Get("name")
testhelper.AssertEquals(t, "%-TestMachine", test)
testhelper.TestMethod(t, request, http.MethodGet)
if iterations == counter {
encoder := json.NewEncoder(writer)
testhelper.AssertNoErr(t, encoder.Encode(map[string]interface{}{
"data": []search.VM{
{
Name: "543053-TestMachine",
Identifier: TestIdentifier,
},
},
}))
}
counter++
}
}

func newConfigVarString(str string) types.ConfigVarString {
return types.ConfigVarString{
Value: str,
Expand Down
5 changes: 5 additions & 0 deletions pkg/cloudprovider/provider/anexia/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
)

type anexiaInstance struct {
isCreating bool
info *info.Info
reservedAddresses []string
}
Expand Down Expand Up @@ -85,6 +86,10 @@ func (ai *anexiaInstance) Addresses() map[string]v1.NodeAddressType {
}

func (ai *anexiaInstance) Status() instance.Status {
if ai.isCreating {
return instance.StatusCreating
}

if ai.info != nil {
if ai.info.Status == anxtypes.MachinePoweredOn {
return instance.StatusRunning
Expand Down
77 changes: 28 additions & 49 deletions pkg/cloudprovider/provider/anexia/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"sync"
"time"

anxclient "go.anx.io/go-anxcloud/pkg/client"
Expand All @@ -45,7 +47,6 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
)

Expand Down Expand Up @@ -115,16 +116,6 @@ func (p *provider) Create(ctx context.Context, machine *clusterv1alpha1.Machine,
retErr = anxtypes.NewMultiError(retErr, updateMachineStatus(machine, status, data.Update))
}()

// check whether machine is already provisioning
if isAlreadyProvisioning(ctx) && status.ProvisioningID == "" {
klog.Info("ongoing provisioning detected")
err := waitForVM(ctx, client)
if err != nil {
return nil, err
}
return p.Get(ctx, machine, data)
}

// provision machine
err = provisionVM(ctx, client)
if err != nil {
Expand All @@ -133,33 +124,6 @@ func (p *provider) Create(ctx context.Context, machine *clusterv1alpha1.Machine,
return p.Get(ctx, machine, data)
}

func waitForVM(ctx context.Context, client anxclient.Client) error {
reconcileContext := getReconcileContext(ctx)
api := vsphere.NewAPI(client)
var identifier string
err := wait.PollImmediate(5*time.Second, 1*time.Minute, func() (bool, error) {
klog.V(2).Info("checking for VM with name ", reconcileContext.Machine.Name)
vms, err := api.Search().ByName(ctx, fmt.Sprintf("%%-%s", reconcileContext.Machine.Name))
if err != nil {
return false, nil
}
if len(vms) < 1 {
return false, nil
}
if len(vms) > 1 {
return false, errors.New("too many VMs returned by search")
}
identifier = vms[0].Identifier
return true, nil
})
if err != nil {
return err
}

reconcileContext.Status.InstanceID = identifier
return updateMachineStatus(reconcileContext.Machine, *reconcileContext.Status, reconcileContext.ProviderData.Update)
}

func provisionVM(ctx context.Context, client anxclient.Client) error {
reconcileContext := getReconcileContext(ctx)
vmAPI := vsphere.NewAPI(client)
Expand Down Expand Up @@ -229,15 +193,6 @@ func provisionVM(ctx context.Context, client anxclient.Client) error {
klog.V(2).Info(fmt.Sprintf("Using provisionID from machine '%s' to await completion",
reconcileContext.Machine.Name))

instanceID, err := vmAPI.Provisioning().Progress().AwaitCompletion(ctx, status.ProvisioningID)
if err != nil {
klog.Errorf("failed to await machine completion '%s'", reconcileContext.Machine.Name)
// something went wrong remove provisioning ID, so we can start from scratch
status.ProvisioningID = ""
return newError(common.CreateMachineError, "instance provisioning failed: %v", err)
}

status.InstanceID = instanceID
meta.SetStatusCondition(&status.Conditions, v1.Condition{
Type: ProvisionedType,
Status: v1.ConditionTrue,
Expand All @@ -248,6 +203,8 @@ func provisionVM(ctx context.Context, client anxclient.Client) error {
return updateMachineStatus(reconcileContext.Machine, *status, reconcileContext.ProviderData.Update)
}

var _engsup3404mutex sync.Mutex

func getIPAddress(ctx context.Context, client anxclient.Client) (string, error) {
reconcileContext := getReconcileContext(ctx)
status := reconcileContext.Status
Expand All @@ -258,6 +215,9 @@ func getIPAddress(ctx context.Context, client anxclient.Client) (string, error)
return status.ReservedIP, nil
}

_engsup3404mutex.Lock()
defer _engsup3404mutex.Unlock()

klog.Info(fmt.Sprintf("Creating a new IP for machine %q", reconcileContext.Machine.Name))
addrAPI := anxaddr.NewAPI(client)
config := reconcileContext.Config
Expand Down Expand Up @@ -447,7 +407,7 @@ func (p *provider) Validate(_ context.Context, machinespec clusterv1alpha1.Machi
return nil
}

func (p *provider) Get(ctx context.Context, machine *clusterv1alpha1.Machine, _ *cloudprovidertypes.ProviderData) (instance.Instance, error) {
func (p *provider) Get(ctx context.Context, machine *clusterv1alpha1.Machine, pd *cloudprovidertypes.ProviderData) (instance.Instance, error) {
config, _, err := p.getConfig(machine.Spec.ProviderSpec)
if err != nil {
return nil, newError(common.InvalidConfigurationMachineError, "failed to retrieve config: %v", err)
Expand All @@ -464,10 +424,29 @@ func (p *provider) Get(ctx context.Context, machine *clusterv1alpha1.Machine, _
return nil, newError(common.InvalidConfigurationMachineError, "failed to get machine status: %v", err)
}

if status.InstanceID == "" {
if status.InstanceID == "" && status.ProvisioningID == "" {
return nil, cloudprovidererrors.ErrInstanceNotFound
}

if status.InstanceID == "" {
progress, err := vsphereAPI.Provisioning().Progress().Get(ctx, status.ProvisioningID)
if err != nil {
return nil, fmt.Errorf("failed to get provisioning progress: %w", err)
}
if len(progress.Errors) > 0 {
return nil, fmt.Errorf("vm provisioning had errors: %s", strings.Join(progress.Errors, ","))
}
if progress.Progress < 100 || progress.VMIdentifier == "" {
return &anexiaInstance{isCreating: true}, nil
}

status.InstanceID = progress.VMIdentifier

if err := updateMachineStatus(machine, status, pd.Update); err != nil {
return nil, fmt.Errorf("failed updating machine status: %w", err)
}
}

instance := anexiaInstance{}

if status.IPState == anxtypes.IPStateBound && status.ReservedIP != "" {
Expand Down
32 changes: 0 additions & 32 deletions pkg/cloudprovider/provider/anexia/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,38 +50,6 @@ func TestAnexiaProvider(t *testing.T) {
server.Close()
})

t.Run("Test waiting for VM", func(t *testing.T) {
t.Parallel()

waitUntilVMIsFound := 2
testhelper.Mux.HandleFunc("/api/vsphere/v1/search/by_name.json", createSearchHandler(t, waitUntilVMIsFound))

providerStatus := anxtypes.ProviderStatus{}
ctx := createReconcileContext(context.Background(), reconcileContext{
Machine: &v1alpha1.Machine{
ObjectMeta: metav1.ObjectMeta{Name: "TestMachine"},
},
Status: &providerStatus,
UserData: "",
Config: resolvedConfig{},

ProviderData: &cloudprovidertypes.ProviderData{
Update: func(m *clusterv1alpha1.Machine, mod ...cloudprovidertypes.MachineModifier) error {
return nil
},
},
})

err := waitForVM(ctx, client)
if err != nil {
t.Fatal("No error was expected", err)
}

if providerStatus.InstanceID != TestIdentifier {
t.Error("Expected InstanceID to be set")
}
})

t.Run("Test provision VM", func(t *testing.T) {
t.Parallel()
testhelper.Mux.HandleFunc("/api/ipam/v1/address/reserve/ip/count.json", func(writer http.ResponseWriter, request *http.Request) {
Expand Down

0 comments on commit 87a0dbc

Please sign in to comment.