Skip to content

Commit

Permalink
E2E: extend CSI test to cover create and snapshot workflows
Browse files Browse the repository at this point in the history
Split the EBS and EFS tests out into their own test cases:
* EBS exercises the Controller RPCs, including the create/snapshot workflow.
* EFS exercises only the Node RPCs, and assumes we have an existing volume
that gets registered, rather than created.
  • Loading branch information
tgross committed Apr 8, 2021
1 parent 83a559f commit 90929dd
Show file tree
Hide file tree
Showing 9 changed files with 441 additions and 309 deletions.
316 changes: 46 additions & 270 deletions e2e/csi/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,20 @@ import (
"strings"
"time"

"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/api"
e2e "github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/testutil"
)

type CSIVolumesTest struct {
framework.TC
testJobIDs []string
volumeIDs []string
pluginJobIDs []string
}

func init() {
framework.AddSuites(&framework.TestSuite{
Component: "CSI",
CanRunLocal: true,
Consul: false,
Cases: []framework.TestCase{
new(CSIVolumesTest),
new(CSIControllerPluginEBSTest), // see ebs.go
new(CSINodeOnlyPluginEFSTest), // see efs.go
},
})
}
Expand All @@ -45,269 +36,27 @@ const ns = ""
var pluginWait = &e2e.WaitConfig{Interval: 5 * time.Second, Retries: 36} // 3min
var reapWait = &e2e.WaitConfig{Interval: 5 * time.Second, Retries: 36} // 3min

func (tc *CSIVolumesTest) BeforeAll(f *framework.F) {
t := f.T()

_, err := os.Stat("csi/input/volume-ebs.hcl")
if err != nil {
t.Skip("skipping CSI test because EBS volume spec file missing:", err)
}

_, err = os.Stat("csi/input/volume-efs.hcl")
// assertNoErrorElseDump calls a non-halting assert on the error and dumps the
// plugin logs if it fails.
func assertNoErrorElseDump(f *framework.F, err error, msg string, pluginJobIDs []string) {
if err != nil {
t.Skip("skipping CSI test because EFS volume spec file missing:", err)
dumpLogs(pluginJobIDs)
f.Assert().NoError(err, fmt.Sprintf("%v: %v", msg, err))
}

// Ensure cluster has leader and at least two client
// nodes in a ready state before running tests
e2e.WaitForLeader(t, tc.Nomad())
e2e.WaitForNodesReady(t, tc.Nomad(), 2)
}

// TestEBSVolumeClaim launches AWS EBS plugins and registers an EBS volume
// as a Nomad CSI volume. We then deploy a job that writes to the volume,
// stop that job, and reuse the volume for another job which should be able
// to read the data written by the first job.
func (tc *CSIVolumesTest) TestEBSVolumeClaim(f *framework.F) {
t := f.T()
require := require.New(t)
nomadClient := tc.Nomad()
uuid := uuid.Generate()
pluginID := "aws-ebs0"

// deploy the controller plugin job
controllerJobID := "aws-ebs-plugin-controller-" + uuid[0:8]
f.NoError(e2e.Register(controllerJobID, "csi/input/plugin-aws-ebs-controller.nomad"))
tc.pluginJobIDs = append(tc.pluginJobIDs, controllerJobID)
expected := []string{"running", "running"}
f.NoError(
e2e.WaitForAllocStatusExpected(controllerJobID, ns, expected),
"job should be running")

// deploy the node plugins job
nodesJobID := "aws-ebs-plugin-nodes-" + uuid[0:8]
f.NoError(e2e.Register(nodesJobID, "csi/input/plugin-aws-ebs-nodes.nomad"))
tc.pluginJobIDs = append(tc.pluginJobIDs, nodesJobID)

f.NoError(e2e.WaitForAllocStatusComparison(
func() ([]string, error) { return e2e.AllocStatuses(nodesJobID, ns) },
func(got []string) bool {
for _, status := range got {
if status != "running" {
return false
}
}
return true
}, nil,
))

f.NoError(waitForPluginStatusControllerCount(pluginID, 2, pluginWait),
"aws-ebs0 controller plugins did not become healthy")
f.NoError(waitForPluginStatusMinNodeCount(pluginID, 2, pluginWait),
"aws-ebs0 node plugins did not become healthy")

// register a volume
volID := "ebs-vol[0]"
err := volumeRegister(volID, "csi/input/volume-ebs.hcl")
require.NoError(err)
tc.volumeIDs = append(tc.volumeIDs, volID)

// deploy a job that writes to the volume
writeJobID := "write-ebs-" + uuid[0:8]
f.NoError(e2e.Register(writeJobID, "csi/input/use-ebs-volume.nomad"))
f.NoError(
e2e.WaitForAllocStatusExpected(writeJobID, ns, []string{"running"}),
"job should be running")

allocs, err := e2e.AllocsForJob(writeJobID, ns)
f.NoError(err, "could not get allocs for write job")
f.Len(allocs, 1, "could not get allocs for write job")
writeAllocID := allocs[0]["ID"]

// read data from volume and assert the writer wrote a file to it
expectedPath := "/task/test/" + writeAllocID
_, err = readFile(nomadClient, writeAllocID, expectedPath)
require.NoError(err)

// Shutdown (and purge) the writer so we can run a reader.
// we could mount the EBS volume with multi-attach, but we
// want this test to exercise the unpublish workflow.
_, err = e2e.Command("nomad", "job", "stop", "-purge", writeJobID)
require.NoError(err)

// wait for the volume unpublish workflow to complete
require.NoError(waitForVolumeClaimRelease(volID, reapWait),
"write-ebs alloc claim was not released")

// deploy a job so we can read from the volume
readJobID := "read-ebs-" + uuid[0:8]
tc.testJobIDs = append(tc.testJobIDs, readJobID) // ensure failed tests clean up
f.NoError(e2e.Register(readJobID, "csi/input/use-ebs-volume.nomad"))
f.NoError(
e2e.WaitForAllocStatusExpected(readJobID, ns, []string{"running"}),
"job should be running")

allocs, err = e2e.AllocsForJob(readJobID, ns)
f.NoError(err, "could not get allocs for read job")
f.Len(allocs, 1, "could not get allocs for read job")
readAllocID := allocs[0]["ID"]

// read data from volume and assert we can read the file the writer wrote
expectedPath = "/task/test/" + readAllocID
_, err = readFile(nomadClient, readAllocID, expectedPath)
require.NoError(err)

}

// TestEFSVolumeClaim launches AWS EFS plugins and registers an EFS volume
// as a Nomad CSI volume. We then deploy a job that writes to the volume,
// and share the volume with another job which should be able to read the
// data written by the first job.
func (tc *CSIVolumesTest) TestEFSVolumeClaim(f *framework.F) {
t := f.T()
require := require.New(t)
nomadClient := tc.Nomad()
uuid := uuid.Generate()
pluginID := "aws-efs0"

// deploy the node plugins job (no need for a controller for EFS)
nodesJobID := "aws-efs-plugin-nodes-" + uuid[0:8]
f.NoError(e2e.Register(nodesJobID, "csi/input/plugin-aws-efs-nodes.nomad"))
tc.pluginJobIDs = append(tc.pluginJobIDs, nodesJobID)

f.NoError(e2e.WaitForAllocStatusComparison(
func() ([]string, error) { return e2e.AllocStatuses(nodesJobID, ns) },
func(got []string) bool {
for _, status := range got {
if status != "running" {
return false
}
}
return true
}, nil,
))

f.NoError(waitForPluginStatusMinNodeCount(pluginID, 2, pluginWait),
"aws-efs0 node plugins did not become healthy")

// register a volume
volID := "efs-vol0"
err := volumeRegister(volID, "csi/input/volume-efs.hcl")
require.NoError(err)
tc.volumeIDs = append(tc.volumeIDs, volID)

// deploy a job that writes to the volume
writeJobID := "write-efs-" + uuid[0:8]
tc.testJobIDs = append(tc.testJobIDs, writeJobID) // ensure failed tests clean up
f.NoError(e2e.Register(writeJobID, "csi/input/use-efs-volume-write.nomad"))
f.NoError(
e2e.WaitForAllocStatusExpected(writeJobID, ns, []string{"running"}),
"job should be running")

allocs, err := e2e.AllocsForJob(writeJobID, ns)
f.NoError(err, "could not get allocs for write job")
f.Len(allocs, 1, "could not get allocs for write job")
writeAllocID := allocs[0]["ID"]

// read data from volume and assert the writer wrote a file to it
expectedPath := "/task/test/" + writeAllocID
_, err = readFile(nomadClient, writeAllocID, expectedPath)
require.NoError(err)

// Shutdown the writer so we can run a reader.
// although EFS should support multiple readers, the plugin
// does not.
_, err = e2e.Command("nomad", "job", "stop", writeJobID)
require.NoError(err)

// wait for the volume unpublish workflow to complete
require.NoError(waitForVolumeClaimRelease(volID, reapWait),
"write-efs alloc claim was not released")

// deploy a job that reads from the volume
readJobID := "read-efs-" + uuid[0:8]
tc.testJobIDs = append(tc.testJobIDs, readJobID) // ensure failed tests clean up
f.NoError(e2e.Register(readJobID, "csi/input/use-efs-volume-read.nomad"))
f.NoError(
e2e.WaitForAllocStatusExpected(readJobID, ns, []string{"running"}),
"job should be running")

allocs, err = e2e.AllocsForJob(readJobID, ns)
f.NoError(err, "could not get allocs for read job")
f.Len(allocs, 1, "could not get allocs for read job")
readAllocID := allocs[0]["ID"]

// read data from volume and assert the writer wrote a file to it
require.NoError(err)
_, err = readFile(nomadClient, readAllocID, expectedPath)
require.NoError(err)
}

func (tc *CSIVolumesTest) AfterEach(f *framework.F) {

// Stop all jobs in test
for _, id := range tc.testJobIDs {
out, err := e2e.Command("nomad", "job", "stop", "-purge", id)
f.Assert().NoError(err, out)
}
tc.testJobIDs = []string{}

// Deregister all volumes in test
for _, id := range tc.volumeIDs {
// make sure all the test jobs have finished unpublishing claims
err := waitForVolumeClaimRelease(id, reapWait)
f.Assert().NoError(err, "volume claims were not released")

out, err := e2e.Command("nomad", "volume", "deregister", id)
if err != nil {
fmt.Println("could not deregister volume, dumping allocation logs")
f.Assert().NoError(tc.dumpLogs())
}
f.Assert().NoError(err, out)
}
tc.volumeIDs = []string{}

// Deregister all plugin jobs in test
for _, id := range tc.pluginJobIDs {
out, err := e2e.Command("nomad", "job", "stop", "-purge", id)
f.Assert().NoError(err, out)
// requireNoErrorElseDump calls a halting assert on the error and dumps the
// plugin logs if it fails.
func requireNoErrorElseDump(f *framework.F, err error, msg string, pluginJobIDs []string) {
if err != nil {
dumpLogs(pluginJobIDs)
f.NoError(err, fmt.Sprintf("%v: %v", msg, err))
}
tc.pluginJobIDs = []string{}

// Garbage collect
out, err := e2e.Command("nomad", "system", "gc")
f.Assert().NoError(err, out)
}

// waitForVolumeClaimRelease makes sure we don't try to re-claim a volume
// that's in the process of being unpublished. we can't just wait for allocs
// to stop, but need to wait for their claims to be released
func waitForVolumeClaimRelease(volID string, wc *e2e.WaitConfig) error {
var out string
var err error
testutil.WaitForResultRetries(wc.Retries, func() (bool, error) {
time.Sleep(wc.Interval)
out, err = e2e.Command("nomad", "volume", "status", volID)
if err != nil {
return false, err
}
section, err := e2e.GetSection(out, "Allocations")
if err != nil {
return false, err
}
return strings.Contains(section, "No allocations placed"), nil
}, func(e error) {
if e == nil {
err = nil
}
err = fmt.Errorf("alloc claim was not released: %v\n%s", e, out)
})
return err
}

func (tc *CSIVolumesTest) dumpLogs() error {
func dumpLogs(pluginIDs []string) error {

for _, id := range tc.pluginJobIDs {
for _, id := range pluginIDs {
allocs, err := e2e.AllocsForJob(id, ns)
if err != nil {
return fmt.Errorf("could not find allocs for plugin: %v", err)
Expand Down Expand Up @@ -340,6 +89,32 @@ func (tc *CSIVolumesTest) dumpLogs() error {
return nil
}

// waitForVolumeClaimRelease makes sure we don't try to re-claim a volume
// that's in the process of being unpublished. we can't just wait for allocs
// to stop, but need to wait for their claims to be released
func waitForVolumeClaimRelease(volID string, wc *e2e.WaitConfig) error {
var out string
var err error
testutil.WaitForResultRetries(wc.Retries, func() (bool, error) {
time.Sleep(wc.Interval)
out, err = e2e.Command("nomad", "volume", "status", volID)
if err != nil {
return false, err
}
section, err := e2e.GetSection(out, "Allocations")
if err != nil {
return false, err
}
return strings.Contains(section, "No allocations placed"), nil
}, func(e error) {
if e == nil {
err = nil
}
err = fmt.Errorf("alloc claim was not released: %v\n%s", e, out)
})
return err
}

// TODO(tgross): replace this w/ AllocFS().Stat() after
// https://github.com/hashicorp/nomad/issues/7365 is fixed
func readFile(client *api.Client, allocID string, path string) (bytes.Buffer, error) {
Expand Down Expand Up @@ -434,11 +209,12 @@ func waitForPluginStatusCompare(pluginID string, compare func(got string) (bool,
return err
}

// VolumeRegister registers a jobspec from a file but with a unique ID.
// The caller is responsible for recording that ID for later cleanup.
func volumeRegister(volID, volFilePath string) error {
// volumeRegister creates or registers a volume spec from a file but with a
// unique ID. The caller is responsible for recording that ID for later
// cleanup.
func volumeRegister(volID, volFilePath, createOrRegister string) error {

cmd := exec.Command("nomad", "volume", "register", "-")
cmd := exec.Command("nomad", "volume", createOrRegister, "-")
stdin, err := cmd.StdinPipe()
if err != nil {
return fmt.Errorf("could not open stdin?: %w", err)
Expand Down
Loading

0 comments on commit 90929dd

Please sign in to comment.