Skip to content

Commit

Permalink
💚 cluster should have healthy time synchronization
Browse files Browse the repository at this point in the history
  • Loading branch information
mboersma committed Oct 28, 2020
1 parent 7aae523 commit 14a716d
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 3 deletions.
3 changes: 2 additions & 1 deletion test/e2e/azure_logcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ func (k AzureLogCollector) CollectMachineLog(ctx context.Context,
}
controlPlaneEndpoint := cluster.Spec.ControlPlaneEndpoint.Host
hostname := m.Spec.InfrastructureRef.Name
port := e2eConfig.GetVariable(VMSSHPort)
execToPathFn := func(outputFileName, command string, args ...string) func() error {
return func() error {
f, err := fileOnHost(filepath.Join(outputPath, outputFileName))
if err != nil {
return err
}
defer f.Close()
return execOnHost(controlPlaneEndpoint, hostname, f, command, args...)
return execOnHost(controlPlaneEndpoint, hostname, port, f, command, args...)
}
}

Expand Down
40 changes: 40 additions & 0 deletions test/e2e/azure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ var _ = Describe("Workload cluster creation", func() {
WaitForMachineDeployments: e2eConfig.GetIntervals(specName, "wait-worker-nodes"),
})
cluster = result.Cluster

Context("Validating time synchronization", func() {
AzureTimeSyncSpec(ctx, func() AzureTimeSyncSpecInput {
return AzureTimeSyncSpecInput{
BootstrapClusterProxy: bootstrapClusterProxy,
Namespace: namespace,
ClusterName: clusterName,
}
})
})
})
})

Expand All @@ -121,6 +131,16 @@ var _ = Describe("Workload cluster creation", func() {
})
cluster = result.Cluster

Context("Validating time synchronization", func() {
AzureTimeSyncSpec(ctx, func() AzureTimeSyncSpecInput {
return AzureTimeSyncSpecInput{
BootstrapClusterProxy: bootstrapClusterProxy,
Namespace: namespace,
ClusterName: clusterName,
}
})
})

Context("Validating failure domains", func() {
AzureFailureDomainsSpec(ctx, func() AzureFailureDomainsSpecInput {
return AzureFailureDomainsSpecInput{
Expand Down Expand Up @@ -186,6 +206,16 @@ var _ = Describe("Workload cluster creation", func() {
})
cluster = result.Cluster

Context("Validating time synchronization", func() {
AzureTimeSyncSpec(ctx, func() AzureTimeSyncSpecInput {
return AzureTimeSyncSpecInput{
BootstrapClusterProxy: bootstrapClusterProxy,
Namespace: namespace,
ClusterName: clusterName,
}
})
})

Context("Creating an accessible ipv6 load balancer", func() {
AzureLBSpec(ctx, func() AzureLBSpecInput {
return AzureLBSpecInput{
Expand Down Expand Up @@ -222,6 +252,16 @@ var _ = Describe("Workload cluster creation", func() {
})
cluster = result.Cluster

Context("Validating time synchronization", func() {
AzureTimeSyncSpec(ctx, func() AzureTimeSyncSpecInput {
return AzureTimeSyncSpecInput{
BootstrapClusterProxy: bootstrapClusterProxy,
Namespace: namespace,
ClusterName: clusterName,
}
})
})

Context("Creating an accessible load balancer", func() {
AzureLBSpec(ctx, func() AzureLBSpecInput {
return AzureLBSpecInput{
Expand Down
89 changes: 89 additions & 0 deletions test/e2e/azure_timesync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// +build e2e

/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
"context"
"fmt"
"strings"

. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/cluster-api/test/framework"
kinderrors "sigs.k8s.io/kind/pkg/errors"
)

// AzureTimeSyncSpecInput is the input for AzureTimeSyncSpec.
type AzureTimeSyncSpecInput struct {
	BootstrapClusterProxy framework.ClusterProxy // proxy for the management (bootstrap) cluster; used to reach node SSH info.
	Namespace             *corev1.Namespace      // namespace in the management cluster that holds the workload cluster's objects.
	ClusterName           string                 // name of the workload cluster whose nodes are checked.
}

// AzureTimeSyncSpec implements a test that verifies time synchronization is healthy for
// the nodes in a cluster. For every node reachable over SSH it checks that chronyd is
// active and that chrony is tracking a reference clock.
func AzureTimeSyncSpec(ctx context.Context, inputGetter func() AzureTimeSyncSpecInput) {
	var (
		specName = "azure-timesync"
		input    AzureTimeSyncSpecInput
	)

	input = inputGetter()
	Expect(input.BootstrapClusterProxy).NotTo(BeNil(), "Invalid argument. input.BootstrapClusterProxy can't be nil when calling %s spec", specName)

	namespace, name := input.Namespace.Name, input.ClusterName
	managementClusterClient := input.BootstrapClusterProxy.GetClient()

	sshInfo, err := getClusterSSHInfo(ctx, managementClusterClient, namespace, name)
	Expect(err).NotTo(HaveOccurred())
	Expect(len(sshInfo)).To(BeNumerically(">", 0))

	testfuncs := []func() error{}
	for _, s := range sshInfo {
		// Shadow the loop variable: the closures built below are executed only after
		// the loop completes (inside AggregateConcurrent), so without this every
		// closure would see the final element of sshInfo instead of its own.
		s := s
		Byf("checking that time synchronization is healthy on %s", s.Hostname)

		execToStringFn := func(expected, command string, args ...string) func() error {
			// don't assert in this test func, just return errors
			return func() error {
				f := &strings.Builder{}
				if err := execOnHost(s.Endpoint, s.Hostname, s.Port, f, command, args...); err != nil {
					return err
				}
				if !strings.Contains(f.String(), expected) {
					return fmt.Errorf("expected \"%s\" in command output:\n%s", expected, f.String())
				}
				return nil
			}
		}

		testfuncs = append(testfuncs,
			execToStringFn(
				"✓ chronyd is active",
				"systemctl", "is-active", "chronyd", "&&",
				"echo", "✓ chronyd is active",
			),
			execToStringFn(
				"Reference ID",
				"chronyc", "tracking",
			),
		)
	}

	// Run all per-node checks concurrently and fail the spec on any aggregated error.
	Expect(kinderrors.AggregateConcurrent(testfuncs)).To(Succeed())
}
1 change: 1 addition & 0 deletions test/e2e/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
AzureVNetName = "AZURE_VNET_NAME"
CNIPathIPv6 = "CNI_IPV6"
CNIResourcesIPv6 = "CNI_RESOURCES_IPV6"
VMSSHPort = "VM_SSH_PORT"
)

func Byf(format string, a ...interface{}) {
Expand Down
1 change: 1 addition & 0 deletions test/e2e/config/azure-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ variables:
CONFORMANCE_CI_ARTIFACTS_KUBERNETES_VERSION: "v1.18.8"
CONFORMANCE_WORKER_MACHINE_COUNT: "5"
CONFORMANCE_CONTROL_PLANE_MACHINE_COUNT: "1"
VM_SSH_PORT: "22"

intervals:
default/wait-controllers: ["3m", "10s"]
Expand Down
94 changes: 92 additions & 2 deletions test/e2e/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ import (
typedbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
azure "sigs.k8s.io/cluster-api-provider-azure/cloud"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
clusterv1exp "sigs.k8s.io/cluster-api/exp/api/v1alpha3"
"sigs.k8s.io/cluster-api/test/framework"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -284,15 +287,102 @@ func logCheckpoint(specTimes map[string]time.Time) {
}
}

// nodeSSHInfo provides information to establish an SSH connection to a VM or VMSS instance.
// Connections are proxied: the client first dials Endpoint (a control plane host), then
// hops to Hostname from there.
type nodeSSHInfo struct {
	Endpoint string // Endpoint is the control plane hostname or IP address for initial connection.
	Hostname string // Hostname is the name or IP address of the destination VM or VMSS instance.
	Port     string // Port is the TCP port used for the SSH connection.
}

// getClusterSSHInfo returns the information needed to establish a SSH connection through a
// control plane endpoint to each node in the cluster. It covers both standalone VMs
// (Machines) and VMSS instances (MachinePool node refs).
func getClusterSSHInfo(ctx context.Context, c client.Client, namespace, name string) ([]nodeSSHInfo, error) {
	sshInfo := []nodeSSHInfo{}

	// The SSH port is identical for every node; resolve it once instead of per node.
	port := e2eConfig.GetVariable(VMSSHPort)

	// Collect the info for each VM / Machine.
	machines, err := getMachinesInCluster(ctx, c, namespace, name)
	if err != nil {
		return nil, err
	}
	for i := range machines.Items {
		m := &machines.Items[i]
		cluster, err := util.GetClusterFromMetadata(ctx, c, m.ObjectMeta)
		if err != nil {
			return nil, err
		}
		sshInfo = append(sshInfo, nodeSSHInfo{
			Endpoint: cluster.Spec.ControlPlaneEndpoint.Host,
			Hostname: m.Spec.InfrastructureRef.Name,
			Port:     port,
		})
	}

	// Collect the info for each instance in a VMSS / MachinePool.
	machinePools, err := getMachinePoolsInCluster(ctx, c, namespace, name)
	if err != nil {
		return nil, err
	}
	for i := range machinePools.Items {
		p := &machinePools.Items[i]
		cluster, err := util.GetClusterFromMetadata(ctx, c, p.ObjectMeta)
		if err != nil {
			return nil, err
		}
		for j := range p.Status.NodeRefs {
			n := p.Status.NodeRefs[j]
			sshInfo = append(sshInfo, nodeSSHInfo{
				Endpoint: cluster.Spec.ControlPlaneEndpoint.Host,
				Hostname: n.Name,
				Port:     port,
			})
		}
	}

	return sshInfo, nil
}

// getMachinesInCluster returns a list of all machines in the given cluster.
// This is adapted from CAPI's test/framework/cluster_proxy.go.
// NOTE(review): an empty cluster name yields a nil list with a nil error; callers
// must guard against dereferencing the nil result.
func getMachinesInCluster(ctx context.Context, c framework.Lister, namespace, name string) (*clusterv1.MachineList, error) {
	if name == "" {
		return nil, nil
	}

	list := &clusterv1.MachineList{}
	byCluster := client.MatchingLabels(map[string]string{clusterv1.ClusterLabelName: name})
	if err := c.List(ctx, list, client.InNamespace(namespace), byCluster); err != nil {
		return nil, err
	}
	return list, nil
}

// getMachinePoolsInCluster returns a list of all machine pools in the given cluster.
// NOTE(review): an empty cluster name yields a nil list with a nil error; callers
// must guard against dereferencing the nil result.
func getMachinePoolsInCluster(ctx context.Context, c framework.Lister, namespace, name string) (*clusterv1exp.MachinePoolList, error) {
	if name == "" {
		return nil, nil
	}

	list := &clusterv1exp.MachinePoolList{}
	byCluster := client.MatchingLabels(map[string]string{clusterv1.ClusterLabelName: name})
	if err := c.List(ctx, list, client.InNamespace(namespace), byCluster); err != nil {
		return nil, err
	}
	return list, nil
}

// execOnHost runs the specified command directly on a node's host, using an SSH connection
// proxied through a control plane host.
func execOnHost(controlPlaneEndpoint, hostname string, f io.StringWriter, command string,
func execOnHost(controlPlaneEndpoint, hostname, port string, f io.StringWriter, command string,
args ...string) error {
config, err := newSSHConfig()
if err != nil {
return err
}
port := "22" // Need to use port 50001 for VMSS when MachinePools are supported here.

// Init a client connection to a control plane node via the public load balancer
lbClient, err := ssh.Dial("tcp", fmt.Sprintf("%s:%s", controlPlaneEndpoint, port), config)
Expand Down

0 comments on commit 14a716d

Please sign in to comment.