From 60a55181934ae09624d729ce4fd92f9dc266e91d Mon Sep 17 00:00:00 2001 From: Beraldo Leal Date: Wed, 26 Jun 2024 21:09:09 -0400 Subject: [PATCH] WiP: gcp: implementing provisioning interface This will create cluster and vpc within GCP. Signed-off-by: Beraldo Leal --- src/cloud-api-adaptor/go.mod | 3 +- .../test/provisioner/gcp/cluster.go | 269 ++++++++++++++++++ .../test/provisioner/gcp/provision.go | 15 + .../test/provisioner/gcp/provision_common.go | 77 +++++ .../test/provisioner/gcp/vpc.go | 169 +++++++++++ 5 files changed, 531 insertions(+), 2 deletions(-) create mode 100644 src/cloud-api-adaptor/test/provisioner/gcp/cluster.go create mode 100644 src/cloud-api-adaptor/test/provisioner/gcp/provision.go create mode 100644 src/cloud-api-adaptor/test/provisioner/gcp/provision_common.go create mode 100644 src/cloud-api-adaptor/test/provisioner/gcp/vpc.go diff --git a/src/cloud-api-adaptor/go.mod b/src/cloud-api-adaptor/go.mod index 9a460e3147..4965ed2811 100644 --- a/src/cloud-api-adaptor/go.mod +++ b/src/cloud-api-adaptor/go.mod @@ -58,6 +58,7 @@ require ( github.com/tj/assert v0.0.3 golang.org/x/crypto v0.23.0 golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 + google.golang.org/api v0.162.0 google.golang.org/protobuf v1.33.0 k8s.io/api v0.26.2 k8s.io/apimachinery v0.26.2 @@ -191,10 +192,8 @@ require ( golang.org/x/sync v0.6.0 // indirect golang.org/x/term v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect - golang.org/x/term v0.19.0 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.14.0 // indirect - google.golang.org/api v0.162.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240213162025-012b6fc9bca9 // indirect diff --git a/src/cloud-api-adaptor/test/provisioner/gcp/cluster.go b/src/cloud-api-adaptor/test/provisioner/gcp/cluster.go new file mode 100644 index 0000000000..b04424a225 --- /dev/null 
+++ b/src/cloud-api-adaptor/test/provisioner/gcp/cluster.go
@@ -0,0 +1,315 @@
+// (C) Copyright Confidential Containers Contributors
+// SPDX-License-Identifier: Apache-2.0
+
+package gcp
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net/http"
+	"strconv"
+	"time"
+
+	log "github.com/sirupsen/logrus"
+	"google.golang.org/api/container/v1"
+	"google.golang.org/api/googleapi"
+	"google.golang.org/api/option"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/client-go/util/retry"
+	kconf "sigs.k8s.io/e2e-framework/klient/conf"
+)
+
+// GKECluster implements the basic GKE Cluster client operations.
+type GKECluster struct {
+	clusterName    string
+	clusterVersion string
+	credentials    string
+	machineType    string
+	nodeCount      int64
+	projectID      string
+	zone           string
+}
+
+// NewGKECluster creates a new GKECluster from the given properties.
+//
+// Values in properties override the built-in defaults for cluster_name,
+// cluster_version, machine_type, node_count and zone. project_id and
+// credentials have no default and must be present and non-empty; node_count
+// must parse as a base-10 integer.
+func NewGKECluster(properties map[string]string) (*GKECluster, error) {
+	defaults := map[string]string{
+		"cluster_name":    "peer-pods",
+		"cluster_version": "1.27.11-gke.1062004",
+		"machine_type":    "n1-standard-1",
+		"node_count":      "2",
+		"zone":            "us-central1-a",
+	}
+
+	for key, value := range properties {
+		defaults[key] = value
+	}
+
+	// An empty value is as unusable as a missing key, so reject both.
+	requiredFields := []string{"project_id", "credentials"}
+	for _, field := range requiredFields {
+		if defaults[field] == "" {
+			return nil, fmt.Errorf("%s is required", field)
+		}
+	}
+
+	nodeCount, err := strconv.ParseInt(defaults["node_count"], 10, 64)
+	if err != nil {
+		return nil, fmt.Errorf("invalid node_count: %w", err)
+	}
+
+	return &GKECluster{
+		clusterName:    defaults["cluster_name"],
+		clusterVersion: defaults["cluster_version"],
+		credentials:    defaults["credentials"],
+		machineType:    defaults["machine_type"],
+		nodeCount:      nodeCount,
+		projectID:      defaults["project_id"],
+		zone:           defaults["zone"],
+	}, nil
+}
+
+// newService builds a GKE API client authenticated with g.credentials.
+func (g *GKECluster) newService(ctx context.Context) (*container.Service, error) {
+	srv, err := container.NewService(
+		ctx, option.WithCredentialsFile(g.credentials),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("GKE: container.NewService: %w", err)
+	}
+	return srv, nil
+}
+
+// CreateCluster creates the GKE cluster, waits for it to report the RUNNING
+// status and then labels its nodes as workers. The whole sequence is bounded
+// by a one-hour timeout.
+func (g *GKECluster) CreateCluster(ctx context.Context) error {
+	ctx, cancel := context.WithTimeout(ctx, time.Hour)
+	defer cancel()
+
+	srv, err := g.newService(ctx)
+	if err != nil {
+		return err
+	}
+
+	// NOTE(review): clusterVersion is stored on GKECluster but is not sent in
+	// this request - confirm whether InitialClusterVersion should be set.
+	cluster := &container.Cluster{
+		Name:             g.clusterName,
+		InitialNodeCount: g.nodeCount,
+		NodeConfig: &container.NodeConfig{
+			MachineType: g.machineType,
+		},
+	}
+
+	req := &container.CreateClusterRequest{
+		Cluster: cluster,
+	}
+
+	op, err := srv.Projects.Zones.Clusters.Create(
+		g.projectID, g.zone, req,
+	).Context(ctx).Do()
+	if err != nil {
+		return fmt.Errorf("GKE: Projects.Zones.Clusters.Create: %w", err)
+	}
+
+	log.Infof("GKE: Cluster creation operation: %v\n", op.Name)
+
+	err = g.WaitForClusterActive(ctx, 30*time.Minute)
+	if err != nil {
+		return fmt.Errorf("GKE: Error waiting for cluster to become active: %w", err)
+	}
+
+	err = g.ApplyNodeLabels(ctx)
+	if err != nil {
+		return fmt.Errorf("GKE: Error applying node labels: %w", err)
+	}
+	return nil
+}
+
+// DeleteCluster requests deletion of the GKE cluster and waits until the API
+// no longer knows it. The whole sequence is bounded by a one-hour timeout.
+func (g *GKECluster) DeleteCluster(ctx context.Context) error {
+	ctx, cancel := context.WithTimeout(ctx, time.Hour)
+	defer cancel()
+
+	srv, err := g.newService(ctx)
+	if err != nil {
+		return err
+	}
+
+	op, err := srv.Projects.Zones.Clusters.Delete(
+		g.projectID, g.zone, g.clusterName,
+	).Context(ctx).Do()
+	if err != nil {
+		return fmt.Errorf("GKE: Projects.Zones.Clusters.Delete: %w", err)
+	}
+
+	log.Infof("GKE: Cluster deletion operation: %v\n", op.Name)
+
+	err = g.WaitForClusterDeleted(ctx, 30*time.Minute)
+	if err != nil {
+		return fmt.Errorf("GKE: error waiting for cluster to be deleted: %w", err)
+	}
+	return nil
+}
+
+// WaitForClusterActive polls the cluster every five seconds until it reports
+// the RUNNING status. It returns an error if the cluster reports the ERROR
+// status, if a Get request fails, or if activationTimeout elapses first.
+func (g *GKECluster) WaitForClusterActive(
+	ctx context.Context, activationTimeout time.Duration,
+) error {
+	srv, err := g.newService(ctx)
+	if err != nil {
+		return err
+	}
+
+	timeoutCtx, cancel := context.WithTimeout(ctx, activationTimeout)
+	defer cancel()
+
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-timeoutCtx.Done():
+			return fmt.Errorf("GKE: timeout waiting for cluster to become active: %w", timeoutCtx.Err())
+		case <-ticker.C:
+			// Bind the request to timeoutCtx so a slow call cannot outlive
+			// the activation deadline.
+			cluster, err := srv.Projects.Zones.Clusters.Get(
+				g.projectID, g.zone, g.clusterName,
+			).Context(timeoutCtx).Do()
+			if err != nil {
+				return fmt.Errorf("GKE: Projects.Zones.Clusters.Get: %w", err)
+			}
+
+			switch cluster.Status {
+			case "RUNNING":
+				log.Info("GKE: Cluster is now active")
+				return nil
+			case "ERROR":
+				// Stop polling instead of waiting out the full timeout.
+				return fmt.Errorf("GKE: cluster reported ERROR status: %s", cluster.StatusMessage)
+			}
+
+			log.Info("GKE: Waiting for cluster to become active...")
+		}
+	}
+}
+
+// WaitForClusterDeleted polls the cluster every five seconds until a Get
+// request answers 404 Not Found. It returns an error on any other request
+// failure or if activationTimeout elapses first.
+func (g *GKECluster) WaitForClusterDeleted(
+	ctx context.Context, activationTimeout time.Duration,
+) error {
+	srv, err := g.newService(ctx)
+	if err != nil {
+		return err
+	}
+
+	timeoutCtx, cancel := context.WithTimeout(ctx, activationTimeout)
+	defer cancel()
+
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-timeoutCtx.Done():
+			return fmt.Errorf("GKE: timeout waiting for cluster deletion: %w", timeoutCtx.Err())
+		case <-ticker.C:
+			_, err := srv.Projects.Zones.Clusters.Get(
+				g.projectID, g.zone, g.clusterName,
+			).Context(timeoutCtx).Do()
+			if err != nil {
+				var gerr *googleapi.Error
+				if errors.As(err, &gerr) && gerr.Code == http.StatusNotFound {
+					log.Info("GKE: Cluster deleted successfully")
+					return nil
+				}
+				return fmt.Errorf("GKE: Projects.Zones.Clusters.Get: %w", err)
+			}
+
+			log.Info("GKE: Waiting for cluster to be deleted...")
+		}
+	}
+}
+
+// GetKubeconfigFile returns the kubeconfig path resolved by the e2e-framework
+// helper, or an error when that helper resolves to an empty path.
+func (g *GKECluster) GetKubeconfigFile() (string, error) {
+	kubeconfigPath := kconf.ResolveKubeConfigFile()
+	if kubeconfigPath == "" {
+		return "", fmt.Errorf("unable to find a kubeconfig file")
+	}
+
+	return kubeconfigPath, nil
+}
+
+// ApplyNodeLabels adds the worker labels to every node reachable through the
+// resolved kubeconfig. Every node is attempted; if any of them could not be
+// labeled, an error listing those nodes is returned.
+func (g *GKECluster) ApplyNodeLabels(ctx context.Context) error {
+	kubeconfigPath, err := g.GetKubeconfigFile()
+	if err != nil {
+		return err
+	}
+
+	config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
+	if err != nil {
+		return fmt.Errorf("failed to build kubeconfig: %w", err)
+	}
+
+	clientset, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		return fmt.Errorf("failed to create clientset: %w", err)
+	}
+
+	nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return fmt.Errorf("failed to list nodes: %w", err)
+	}
+
+	var failed []string
+	for _, node := range nodes.Items {
+		name := node.Name
+		err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+			// Fetch the node again on each attempt so that the update is
+			// based on its current state.
+			n, err := clientset.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
+			if err != nil {
+				return fmt.Errorf("failed to get node: %w", err)
+			}
+
+			// Writing into a nil map panics, so allocate it when absent.
+			if n.Labels == nil {
+				n.Labels = map[string]string{}
+			}
+			n.Labels["node.kubernetes.io/worker"] = ""
+			n.Labels["node-role.kubernetes.io/worker"] = ""
+			_, err = clientset.CoreV1().Nodes().Update(ctx, n, metav1.UpdateOptions{})
+			return err
+		})
+		if err != nil {
+			log.Errorf("GKE: Failed to label node %s: %v", name, err)
+			failed = append(failed, name)
+			continue
+		}
+		log.Infof("GKE: Successfully labeled node %s", name)
+	}
+
+	if len(failed) > 0 {
+		return fmt.Errorf("failed to label %d of %d nodes: %v", len(failed), len(nodes.Items), failed)
+	}
+	return nil
+}
diff --git a/src/cloud-api-adaptor/test/provisioner/gcp/provision.go b/src/cloud-api-adaptor/test/provisioner/gcp/provision.go
new file mode 100644
index 0000000000..ad4399de43
--- /dev/null
+++ b/src/cloud-api-adaptor/test/provisioner/gcp/provision.go
@@ -0,0 +1,17 @@
+//go:build gcp
+
+// (C) Copyright Confidential Containers Contributors
+// SPDX-License-Identifier: Apache-2.0
+
+package gcp
+
+import (
+	pv "github.com/confidential-containers/cloud-api-adaptor/src/cloud-api-adaptor/test/provisioner"
+)
+
+func init() {
+	pv.NewProvisionerFunctions["gcp"] = NewGCPProvisioner
+	// NOTE(review): NewGCPInstallOverlay is not defined anywhere in this
+	// patch - confirm it is provided elsewhere before building with -tags gcp.
+	pv.NewInstallOverlayFunctions["gcp"] = NewGCPInstallOverlay
+}
diff --git a/src/cloud-api-adaptor/test/provisioner/gcp/provision_common.go b/src/cloud-api-adaptor/test/provisioner/gcp/provision_common.go
new file mode 100644
index 0000000000..d575c444d6
--- /dev/null
+++ b/src/cloud-api-adaptor/test/provisioner/gcp/provision_common.go
@@ -0,0 +1,77 @@
+// (C) Copyright Confidential Containers Contributors
+// SPDX-License-Identifier: Apache-2.0
+
+package gcp
+
+import (
+	"context"
+
+	pv "github.com/confidential-containers/cloud-api-adaptor/src/cloud-api-adaptor/test/provisioner"
+	"sigs.k8s.io/e2e-framework/pkg/envconf"
+)
+
+var GCPProps = &GCPProvisioner{}
+
+// GCPProvisioner implements the CloudProvisioner interface.
+type GCPProvisioner struct {
+	GkeCluster *GKECluster
+	GcpVPC     *GCPVPC
+}
+
+// NewGCPProvisioner creates a new GCPProvisioner with the given properties.
+func NewGCPProvisioner(properties map[string]string) (pv.CloudProvisioner, error) {
+	gkeCluster, err := NewGKECluster(properties)
+	if err != nil {
+		return nil, err
+	}
+
+	gcpVPC, err := NewGCPVPC(properties)
+	if err != nil {
+		return nil, err
+	}
+
+	GCPProps = &GCPProvisioner{
+		GkeCluster: gkeCluster,
+		GcpVPC:     gcpVPC,
+	}
+	return GCPProps, nil
+}
+
+// CreateCluster creates a new GKE cluster.
+func (p *GCPProvisioner) CreateCluster(ctx context.Context, cfg *envconf.Config) error {
+	err := p.GkeCluster.CreateCluster(ctx)
+	if err != nil {
+		return err
+	}
+
+	kubeconfigPath, err := p.GkeCluster.GetKubeconfigFile()
+	if err != nil {
+		return err
+	}
+	*cfg = *envconf.NewWithKubeConfig(kubeconfigPath)
+
+	return nil
+}
+
+// CreateVPC creates a new VPC in Google Cloud.
+func (p *GCPProvisioner) CreateVPC(ctx context.Context, cfg *envconf.Config) error {
+	return p.GcpVPC.CreateVPC(ctx, cfg)
+}
+
+// DeleteCluster deletes the GKE cluster.
+func (p *GCPProvisioner) DeleteCluster(ctx context.Context, cfg *envconf.Config) error {
+	return p.GkeCluster.DeleteCluster(ctx)
+}
+
+// DeleteVPC deletes the VPC in Google Cloud.
+func (p *GCPProvisioner) DeleteVPC(ctx context.Context, cfg *envconf.Config) error {
+	return p.GcpVPC.DeleteVPC(ctx, cfg)
+}
+
+func (p *GCPProvisioner) GetProperties(ctx context.Context, cfg *envconf.Config) map[string]string {
+	return map[string]string{"TO BE IMPLEMENTED": "TO BE IMPLEMENTED"}
+}
+
+func (p *GCPProvisioner) UploadPodvm(imagePath string, ctx context.Context, cfg *envconf.Config) error {
+	return nil
+}
diff --git a/src/cloud-api-adaptor/test/provisioner/gcp/vpc.go b/src/cloud-api-adaptor/test/provisioner/gcp/vpc.go
new file mode 100644
index 0000000000..ee034cc329
--- /dev/null
+++ b/src/cloud-api-adaptor/test/provisioner/gcp/vpc.go
@@ -0,0 +1,198 @@
+// (C) Copyright Confidential Containers Contributors
+// SPDX-License-Identifier: Apache-2.0
+
+package gcp
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net/http"
+	"time"
+
+	log "github.com/sirupsen/logrus"
+	"google.golang.org/api/compute/v1"
+	"google.golang.org/api/googleapi"
+	"google.golang.org/api/option"
+	"sigs.k8s.io/e2e-framework/pkg/envconf"
+)
+
+// GCPVPC implements the Google Compute VPC interface.
+type GCPVPC struct {
+	vpcName     string
+	credentials string
+	projectID   string
+}
+
+// NewGCPVPC creates a new GCPVPC from the given properties.
+//
+// vpc_name defaults to "peer-pods-vpc" when not supplied. project_id and
+// credentials have no default and must be present and non-empty.
+func NewGCPVPC(properties map[string]string) (*GCPVPC, error) {
+	defaults := map[string]string{
+		"vpc_name": "peer-pods-vpc",
+	}
+
+	for key, value := range properties {
+		defaults[key] = value
+	}
+
+	// An empty value is as unusable as a missing key, so reject both.
+	requiredFields := []string{"project_id", "credentials"}
+	for _, field := range requiredFields {
+		if defaults[field] == "" {
+			return nil, fmt.Errorf("%s is required", field)
+		}
+	}
+
+	return &GCPVPC{
+		vpcName:     defaults["vpc_name"],
+		credentials: defaults["credentials"],
+		projectID:   defaults["project_id"],
+	}, nil
+}
+
+// newService builds a Compute API client authenticated with g.credentials.
+func (g *GCPVPC) newService(ctx context.Context) (*compute.Service, error) {
+	srv, err := compute.NewService(ctx, option.WithCredentialsFile(g.credentials))
+	if err != nil {
+		return nil, fmt.Errorf("GKE: compute.NewService: %w", err)
+	}
+	return srv, nil
+}
+
+// CreateVPC creates an auto-subnet VPC network in Google Cloud and waits
+// until it can be read back. The whole sequence is bounded by a one-hour
+// timeout. cfg is not used.
+func (g *GCPVPC) CreateVPC(
+	ctx context.Context, cfg *envconf.Config,
+) error {
+	ctx, cancel := context.WithTimeout(ctx, time.Hour)
+	defer cancel()
+
+	srv, err := g.newService(ctx)
+	if err != nil {
+		return err
+	}
+
+	network := &compute.Network{
+		Name:                  g.vpcName,
+		AutoCreateSubnetworks: true,
+	}
+
+	op, err := srv.Networks.Insert(g.projectID, network).Context(ctx).Do()
+	if err != nil {
+		return fmt.Errorf("GKE: Networks.Insert: %w", err)
+	}
+
+	log.Infof("GKE: VPC creation operation started: %v\n", op.Name)
+
+	err = g.WaitForVPCCreation(ctx, 30*time.Minute)
+	if err != nil {
+		return fmt.Errorf("GKE: Error waiting for VPC to be created: %w", err)
+	}
+	return nil
+}
+
+// DeleteVPC deletes the VPC in Google Cloud and waits until it can no longer
+// be read back. Like CreateVPC, the whole sequence is bounded by a one-hour
+// timeout. cfg is not used.
+func (g *GCPVPC) DeleteVPC(ctx context.Context, cfg *envconf.Config) error {
+	ctx, cancel := context.WithTimeout(ctx, time.Hour)
+	defer cancel()
+
+	srv, err := g.newService(ctx)
+	if err != nil {
+		return err
+	}
+
+	op, err := srv.Networks.Delete(g.projectID, g.vpcName).Context(ctx).Do()
+	if err != nil {
+		return fmt.Errorf("GKE: Networks.Delete: %w", err)
+	}
+
+	log.Infof("GKE: VPC deletion operation started: %v\n", op.Name)
+
+	err = g.WaitForVPCDeleted(ctx, 30*time.Minute)
+	if err != nil {
+		return fmt.Errorf("GKE: Error waiting for VPC to be deleted: %w", err)
+	}
+
+	return nil
+}
+
+// WaitForVPCCreation polls the VPC every five seconds until a Get request
+// returns it with a non-empty SelfLink. A 404 answer means the network is
+// not visible yet and polling continues; any other request failure, or ctx
+// ending before the network appears (timeout at the latest), is an error.
+func (g *GCPVPC) WaitForVPCCreation(
+	ctx context.Context, timeout time.Duration,
+) error {
+	ctx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	srv, err := g.newService(ctx)
+	if err != nil {
+		return err
+	}
+
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("timeout waiting for VPC creation: %w", ctx.Err())
+		case <-ticker.C:
+			network, err := srv.Networks.Get(g.projectID, g.vpcName).Context(ctx).Do()
+			if err != nil {
+				var apiErr *googleapi.Error
+				if errors.As(err, &apiErr) && apiErr.Code == http.StatusNotFound {
+					log.Info("Waiting for VPC to be created...")
+					continue
+				}
+				return fmt.Errorf("Networks.Get: %w", err)
+			}
+			if network.SelfLink != "" {
+				log.Info("VPC created successfully")
+				return nil
+			}
+		}
+	}
+}
+
+// WaitForVPCDeleted polls the VPC every five seconds until a Get request
+// answers 404 Not Found. Any other request failure, or ctx ending before
+// that (timeout at the latest), is an error.
+func (g *GCPVPC) WaitForVPCDeleted(
+	ctx context.Context, timeout time.Duration,
+) error {
+	ctx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	srv, err := g.newService(ctx)
+	if err != nil {
+		return err
+	}
+
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("GKE: timeout waiting for VPC deletion: %w", ctx.Err())
+		case <-ticker.C:
+			_, err := srv.Networks.Get(g.projectID, g.vpcName).Context(ctx).Do()
+			if err != nil {
+				var apiErr *googleapi.Error
+				if errors.As(err, &apiErr) && apiErr.Code == http.StatusNotFound {
+					log.Info("GKE: VPC deleted successfully")
+					return nil
+				}
+				return fmt.Errorf("GKE: Networks.Get: %w", err)
+			}
+			log.Info("GKE: Waiting for VPC to be deleted...")
+		}
+	}
+}