diff --git a/e2e/csi/csi.go b/e2e/csi/csi.go index 761bbe242ec..d500b6d2c06 100644 --- a/e2e/csi/csi.go +++ b/e2e/csi/csi.go @@ -13,29 +13,20 @@ import ( "strings" "time" - "github.com/stretchr/testify/require" - "github.com/hashicorp/nomad/api" e2e "github.com/hashicorp/nomad/e2e/e2eutil" "github.com/hashicorp/nomad/e2e/framework" - "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/testutil" ) -type CSIVolumesTest struct { - framework.TC - testJobIDs []string - volumeIDs []string - pluginJobIDs []string -} - func init() { framework.AddSuites(&framework.TestSuite{ Component: "CSI", CanRunLocal: true, Consul: false, Cases: []framework.TestCase{ - new(CSIVolumesTest), + new(CSIControllerPluginEBSTest), // see ebs.go + new(CSINodeOnlyPluginEFSTest), // see efs.go }, }) } @@ -45,269 +36,27 @@ const ns = "" var pluginWait = &e2e.WaitConfig{Interval: 5 * time.Second, Retries: 36} // 3min var reapWait = &e2e.WaitConfig{Interval: 5 * time.Second, Retries: 36} // 3min -func (tc *CSIVolumesTest) BeforeAll(f *framework.F) { - t := f.T() - - _, err := os.Stat("csi/input/volume-ebs.hcl") - if err != nil { - t.Skip("skipping CSI test because EBS volume spec file missing:", err) - } - - _, err = os.Stat("csi/input/volume-efs.hcl") +// assertNoErrorElseDump calls a non-halting assert on the error and dumps the +// plugin logs if it fails. +func assertNoErrorElseDump(f *framework.F, err error, msg string, pluginJobIDs []string) { if err != nil { - t.Skip("skipping CSI test because EFS volume spec file missing:", err) + dumpLogs(pluginJobIDs) + f.Assert().NoError(err, fmt.Sprintf("%v: %v", msg, err)) } - - // Ensure cluster has leader and at least two client - // nodes in a ready state before running tests - e2e.WaitForLeader(t, tc.Nomad()) - e2e.WaitForNodesReady(t, tc.Nomad(), 2) -} - -// TestEBSVolumeClaim launches AWS EBS plugins and registers an EBS volume -// as a Nomad CSI volume. We then deploy a job that writes to the volume, -// stop that job, and reuse the volume for another job which should be able -// to read the data written by the first job. 
-func (tc *CSIVolumesTest) TestEBSVolumeClaim(f *framework.F) { - t := f.T() - require := require.New(t) - nomadClient := tc.Nomad() - uuid := uuid.Generate() - pluginID := "aws-ebs0" - - // deploy the controller plugin job - controllerJobID := "aws-ebs-plugin-controller-" + uuid[0:8] - f.NoError(e2e.Register(controllerJobID, "csi/input/plugin-aws-ebs-controller.nomad")) - tc.pluginJobIDs = append(tc.pluginJobIDs, controllerJobID) - expected := []string{"running", "running"} - f.NoError( - e2e.WaitForAllocStatusExpected(controllerJobID, ns, expected), - "job should be running") - - // deploy the node plugins job - nodesJobID := "aws-ebs-plugin-nodes-" + uuid[0:8] - f.NoError(e2e.Register(nodesJobID, "csi/input/plugin-aws-ebs-nodes.nomad")) - tc.pluginJobIDs = append(tc.pluginJobIDs, nodesJobID) - - f.NoError(e2e.WaitForAllocStatusComparison( - func() ([]string, error) { return e2e.AllocStatuses(nodesJobID, ns) }, - func(got []string) bool { - for _, status := range got { - if status != "running" { - return false - } - } - return true - }, nil, - )) - - f.NoError(waitForPluginStatusControllerCount(pluginID, 2, pluginWait), - "aws-ebs0 controller plugins did not become healthy") - f.NoError(waitForPluginStatusMinNodeCount(pluginID, 2, pluginWait), - "aws-ebs0 node plugins did not become healthy") - - // register a volume - volID := "ebs-vol[0]" - err := volumeRegister(volID, "csi/input/volume-ebs.hcl") - require.NoError(err) - tc.volumeIDs = append(tc.volumeIDs, volID) - - // deploy a job that writes to the volume - writeJobID := "write-ebs-" + uuid[0:8] - f.NoError(e2e.Register(writeJobID, "csi/input/use-ebs-volume.nomad")) - f.NoError( - e2e.WaitForAllocStatusExpected(writeJobID, ns, []string{"running"}), - "job should be running") - - allocs, err := e2e.AllocsForJob(writeJobID, ns) - f.NoError(err, "could not get allocs for write job") - f.Len(allocs, 1, "could not get allocs for write job") - writeAllocID := allocs[0]["ID"] - - // read data from volume and assert the writer wrote a file to it - expectedPath := "/task/test/" + writeAllocID - _, err = readFile(nomadClient, writeAllocID, expectedPath) - require.NoError(err) - - // Shutdown (and purge) the writer so we can run a reader. - // we could mount the EBS volume with multi-attach, but we - // want this test to exercise the unpublish workflow. - _, err = e2e.Command("nomad", "job", "stop", "-purge", writeJobID) - require.NoError(err) - - // wait for the volume unpublish workflow to complete - require.NoError(waitForVolumeClaimRelease(volID, reapWait), - "write-ebs alloc claim was not released") - - // deploy a job so we can read from the volume - readJobID := "read-ebs-" + uuid[0:8] - tc.testJobIDs = append(tc.testJobIDs, readJobID) // ensure failed tests clean up - f.NoError(e2e.Register(readJobID, "csi/input/use-ebs-volume.nomad")) - f.NoError( - e2e.WaitForAllocStatusExpected(readJobID, ns, []string{"running"}), - "job should be running") - - allocs, err = e2e.AllocsForJob(readJobID, ns) - f.NoError(err, "could not get allocs for read job") - f.Len(allocs, 1, "could not get allocs for read job") - readAllocID := allocs[0]["ID"] - - // read data from volume and assert we can read the file the writer wrote - expectedPath = "/task/test/" + readAllocID - _, err = readFile(nomadClient, readAllocID, expectedPath) - require.NoError(err) - } -// TestEFSVolumeClaim launches AWS EFS plugins and registers an EFS volume -// as a Nomad CSI volume. 
We then deploy a job that writes to the volume, -// and share the volume with another job which should be able to read the -// data written by the first job. -func (tc *CSIVolumesTest) TestEFSVolumeClaim(f *framework.F) { - t := f.T() - require := require.New(t) - nomadClient := tc.Nomad() - uuid := uuid.Generate() - pluginID := "aws-efs0" - - // deploy the node plugins job (no need for a controller for EFS) - nodesJobID := "aws-efs-plugin-nodes-" + uuid[0:8] - f.NoError(e2e.Register(nodesJobID, "csi/input/plugin-aws-efs-nodes.nomad")) - tc.pluginJobIDs = append(tc.pluginJobIDs, nodesJobID) - - f.NoError(e2e.WaitForAllocStatusComparison( - func() ([]string, error) { return e2e.AllocStatuses(nodesJobID, ns) }, - func(got []string) bool { - for _, status := range got { - if status != "running" { - return false - } - } - return true - }, nil, - )) - - f.NoError(waitForPluginStatusMinNodeCount(pluginID, 2, pluginWait), - "aws-efs0 node plugins did not become healthy") - - // register a volume - volID := "efs-vol0" - err := volumeRegister(volID, "csi/input/volume-efs.hcl") - require.NoError(err) - tc.volumeIDs = append(tc.volumeIDs, volID) - - // deploy a job that writes to the volume - writeJobID := "write-efs-" + uuid[0:8] - tc.testJobIDs = append(tc.testJobIDs, writeJobID) // ensure failed tests clean up - f.NoError(e2e.Register(writeJobID, "csi/input/use-efs-volume-write.nomad")) - f.NoError( - e2e.WaitForAllocStatusExpected(writeJobID, ns, []string{"running"}), - "job should be running") - - allocs, err := e2e.AllocsForJob(writeJobID, ns) - f.NoError(err, "could not get allocs for write job") - f.Len(allocs, 1, "could not get allocs for write job") - writeAllocID := allocs[0]["ID"] - - // read data from volume and assert the writer wrote a file to it - expectedPath := "/task/test/" + writeAllocID - _, err = readFile(nomadClient, writeAllocID, expectedPath) - require.NoError(err) - - // Shutdown the writer so we can run a reader. - // although EFS should support multiple readers, the plugin - // does not. 
- _, err = e2e.Command("nomad", "job", "stop", writeJobID) - require.NoError(err) - - // wait for the volume unpublish workflow to complete - require.NoError(waitForVolumeClaimRelease(volID, reapWait), - "write-efs alloc claim was not released") - - // deploy a job that reads from the volume - readJobID := "read-efs-" + uuid[0:8] - tc.testJobIDs = append(tc.testJobIDs, readJobID) // ensure failed tests clean up - f.NoError(e2e.Register(readJobID, "csi/input/use-efs-volume-read.nomad")) - f.NoError( - e2e.WaitForAllocStatusExpected(readJobID, ns, []string{"running"}), - "job should be running") - - allocs, err = e2e.AllocsForJob(readJobID, ns) - f.NoError(err, "could not get allocs for read job") - f.Len(allocs, 1, "could not get allocs for read job") - readAllocID := allocs[0]["ID"] - - // read data from volume and assert the writer wrote a file to it - require.NoError(err) - _, err = readFile(nomadClient, readAllocID, expectedPath) - require.NoError(err) -} - -func (tc *CSIVolumesTest) AfterEach(f *framework.F) { - - // Stop all jobs in test - for _, id := range tc.testJobIDs { - out, err := e2e.Command("nomad", "job", "stop", "-purge", id) - f.Assert().NoError(err, out) - } - tc.testJobIDs = []string{} - - // Deregister all volumes in test - for _, id := range tc.volumeIDs { - // make sure all the test jobs have finished unpublishing claims - err := waitForVolumeClaimRelease(id, reapWait) - f.Assert().NoError(err, "volume claims were not released") - - out, err := e2e.Command("nomad", "volume", "deregister", id) - if err != nil { - fmt.Println("could not deregister volume, dumping allocation logs") - f.Assert().NoError(tc.dumpLogs()) - } - f.Assert().NoError(err, out) - } - tc.volumeIDs = []string{} - - // Deregister all plugin jobs in test - for _, id := range tc.pluginJobIDs { - out, err := e2e.Command("nomad", "job", "stop", "-purge", id) - f.Assert().NoError(err, out) +// requireNoErrorElseDump calls a halting assert on the error and dumps the +// plugin logs if it fails. +func requireNoErrorElseDump(f *framework.F, err error, msg string, pluginJobIDs []string) { + if err != nil { + dumpLogs(pluginJobIDs) + f.NoError(err, fmt.Sprintf("%v: %v", msg, err)) } - tc.pluginJobIDs = []string{} - - // Garbage collect - out, err := e2e.Command("nomad", "system", "gc") - f.Assert().NoError(err, out) -} - -// waitForVolumeClaimRelease makes sure we don't try to re-claim a volume -// that's in the process of being unpublished. 
we can't just wait for allocs -// to stop, but need to wait for their claims to be released -func waitForVolumeClaimRelease(volID string, wc *e2e.WaitConfig) error { - var out string - var err error - testutil.WaitForResultRetries(wc.Retries, func() (bool, error) { - time.Sleep(wc.Interval) - out, err = e2e.Command("nomad", "volume", "status", volID) - if err != nil { - return false, err - } - section, err := e2e.GetSection(out, "Allocations") - if err != nil { - return false, err - } - return strings.Contains(section, "No allocations placed"), nil - }, func(e error) { - if e == nil { - err = nil - } - err = fmt.Errorf("alloc claim was not released: %v\n%s", e, out) - }) - return err } -func (tc *CSIVolumesTest) dumpLogs() error { +func dumpLogs(pluginIDs []string) error { - for _, id := range tc.pluginJobIDs { + for _, id := range pluginIDs { allocs, err := e2e.AllocsForJob(id, ns) if err != nil { return fmt.Errorf("could not find allocs for plugin: %v", err) @@ -340,6 +89,32 @@ func (tc *CSIVolumesTest) dumpLogs() error { return nil } +// waitForVolumeClaimRelease makes sure we don't try to re-claim a volume +// that's in the process of being unpublished. we can't just wait for allocs +// to stop, but need to wait for their claims to be released +func waitForVolumeClaimRelease(volID string, wc *e2e.WaitConfig) error { + var out string + var err error + testutil.WaitForResultRetries(wc.Retries, func() (bool, error) { + time.Sleep(wc.Interval) + out, err = e2e.Command("nomad", "volume", "status", volID) + if err != nil { + return false, err + } + section, err := e2e.GetSection(out, "Allocations") + if err != nil { + return false, err + } + return strings.Contains(section, "No allocations placed"), nil + }, func(e error) { + if e == nil { + err = nil + } + err = fmt.Errorf("alloc claim was not released: %v\n%s", e, out) + }) + return err +} + // TODO(tgross): replace this w/ AllocFS().Stat() after // https://github.com/hashicorp/nomad/issues/7365 is fixed func readFile(client *api.Client, allocID string, path string) (bytes.Buffer, error) { @@ -434,11 +209,12 @@ func waitForPluginStatusCompare(pluginID string, compare func(got string) (bool, return err } -// VolumeRegister registers a jobspec from a file but with a unique ID. -// The caller is responsible for recording that ID for later cleanup. -func volumeRegister(volID, volFilePath string) error { +// volumeRegister creates or registers a volume spec from a file but with a +// unique ID. The caller is responsible for recording that ID for later +// cleanup. +func volumeRegister(volID, volFilePath, createOrRegister string) error { - cmd := exec.Command("nomad", "volume", "register", "-") + cmd := exec.Command("nomad", "volume", createOrRegister, "-") stdin, err := cmd.StdinPipe() if err != nil { return fmt.Errorf("could not open stdin?: %w", err) diff --git a/e2e/csi/ebs.go b/e2e/csi/ebs.go new file mode 100644 index 00000000000..d158a38e6f4 --- /dev/null +++ b/e2e/csi/ebs.go @@ -0,0 +1,175 @@ +package csi + +import ( + "fmt" + + e2e "github.com/hashicorp/nomad/e2e/e2eutil" + "github.com/hashicorp/nomad/e2e/framework" + "github.com/hashicorp/nomad/helper/uuid" +) + +// CSIControllerPluginEBSTest exercises the AWS EBS plugin, which is an +// example of a plugin that supports most of the CSI Controller RPCs. 
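+//
+// The fixture creates two volumes with `nomad volume create` in BeforeAll,
+// exercises the claim and snapshot workflows against them in the test cases
+// below, and then deletes the volumes and purges the plugin jobs in AfterAll.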
+type CSIControllerPluginEBSTest struct {
+	framework.TC
+	uuid         string
+	testJobIDs   []string
+	volumeIDs    []string
+	pluginJobIDs []string
+}
+
+const ebsPluginID = "aws-ebs0"
+
+// BeforeAll waits for the cluster to be ready, deploys the CSI plugins, and
+// creates two EBS volumes for use in the test.
+func (tc *CSIControllerPluginEBSTest) BeforeAll(f *framework.F) {
+	e2e.WaitForLeader(f.T(), tc.Nomad())
+	e2e.WaitForNodesReady(f.T(), tc.Nomad(), 2)
+
+	tc.uuid = uuid.Generate()[0:8]
+
+	// deploy the controller plugin job
+	controllerJobID := "aws-ebs-plugin-controller-" + tc.uuid
+	f.NoError(e2e.Register(controllerJobID, "csi/input/plugin-aws-ebs-controller.nomad"))
+	tc.pluginJobIDs = append(tc.pluginJobIDs, controllerJobID)
+	expected := []string{"running", "running"}
+	f.NoError(
+		e2e.WaitForAllocStatusExpected(controllerJobID, ns, expected),
+		"job should be running")
+
+	// deploy the node plugins job
+	nodesJobID := "aws-ebs-plugin-nodes-" + tc.uuid
+	f.NoError(e2e.Register(nodesJobID, "csi/input/plugin-aws-ebs-nodes.nomad"))
+	tc.pluginJobIDs = append(tc.pluginJobIDs, nodesJobID)
+
+	f.NoError(e2e.WaitForAllocStatusComparison(
+		func() ([]string, error) { return e2e.AllocStatuses(nodesJobID, ns) },
+		func(got []string) bool {
+			for _, status := range got {
+				if status != "running" {
+					return false
+				}
+			}
+			return true
+		}, nil,
+	))
+
+	f.NoError(waitForPluginStatusControllerCount(ebsPluginID, 2, pluginWait),
+		"aws-ebs0 controller plugins did not become healthy")
+	f.NoError(waitForPluginStatusMinNodeCount(ebsPluginID, 2, pluginWait),
+		"aws-ebs0 node plugins did not become healthy")
+
+	// Ideally we'd verify that these volumes are ready here by checking
+	// `nomad volume status -verbose`, but the plugin doesn't support the
+	// CSI ListVolumes RPC.
+	volID := "ebs-vol[0]"
+	err := volumeRegister(volID, "csi/input/ebs-volume0.hcl", "create")
+	requireNoErrorElseDump(f, err, "could not create volume", tc.pluginJobIDs)
+	tc.volumeIDs = append(tc.volumeIDs, volID)
+
+	volID = "ebs-vol[1]"
+	err = volumeRegister(volID, "csi/input/ebs-volume1.hcl", "create")
+	requireNoErrorElseDump(f, err, "could not create volume", tc.pluginJobIDs)
+	tc.volumeIDs = append(tc.volumeIDs, volID)
+}
+
+// AfterAll cleans up the test jobs, volumes, and plugin jobs created by the
+// test.
+func (tc *CSIControllerPluginEBSTest) AfterAll(f *framework.F) {
+
+	// Stop the test jobs first so that their volume claims are released
+	// and the volumes can be deleted.
+	for _, id := range tc.testJobIDs {
+		out, err := e2e.Command("nomad", "job", "stop", "-purge", id)
+		f.Assert().NoError(err, out)
+	}
+	tc.testJobIDs = []string{}
+
+	// Delete all volumes in test
+	for _, volID := range tc.volumeIDs {
+		err := waitForVolumeClaimRelease(volID, reapWait)
+		f.Assert().NoError(err, "volume claims were not released")
+
+		out, err := e2e.Command("nomad", "volume", "delete", volID)
+		assertNoErrorElseDump(f, err,
+			fmt.Sprintf("could not delete volume:\n%v", out), tc.pluginJobIDs)
+	}
+
+	// Deregister all plugin jobs in test
+	for _, id := range tc.pluginJobIDs {
+		out, err := e2e.Command("nomad", "job", "stop", "-purge", id)
+		f.Assert().NoError(err, out)
+	}
+	tc.pluginJobIDs = []string{}
+
+	// Garbage collect
+	out, err := e2e.Command("nomad", "system", "gc")
+	f.Assert().NoError(err, out)
+}
+
+// TestVolumeClaim exercises the volume publish/unpublish workflows for the
+// EBS plugin.
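+//
+// A writer job mounts the volume and writes a file to it, then is stopped
+// and purged so that its claim is released; a second job then claims and
+// mounts the same volume, exercising the controller unpublish/publish
+// workflow rather than relying on EBS multi-attach.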
+func (tc *CSIControllerPluginEBSTest) TestVolumeClaim(f *framework.F) {
+	nomadClient := tc.Nomad()
+
+	// deploy a job that writes to the volume
+	writeJobID := "write-ebs-" + tc.uuid
+	f.NoError(e2e.Register(writeJobID, "csi/input/use-ebs-volume.nomad"))
+	f.NoError(
+		e2e.WaitForAllocStatusExpected(writeJobID, ns, []string{"running"}),
+		"job should be running")
+
+	allocs, err := e2e.AllocsForJob(writeJobID, ns)
+	f.NoError(err, "could not get allocs for write job")
+	f.Len(allocs, 1, "could not get allocs for write job")
+	writeAllocID := allocs[0]["ID"]
+
+	// read data from volume and assert the writer wrote a file to it
+	expectedPath := "/task/test/" + writeAllocID
+	_, err = readFile(nomadClient, writeAllocID, expectedPath)
+	f.NoError(err)
+
+	// Shut down (and purge) the writer so we can run a reader.
+	// We could mount the EBS volume with multi-attach, but we
+	// want this test to exercise the unpublish workflow.
+	_, err = e2e.Command("nomad", "job", "stop", "-purge", writeJobID)
+	f.NoError(err)
+
+	// wait for the volume unpublish workflow to complete
+	for _, volID := range tc.volumeIDs {
+		err := waitForVolumeClaimRelease(volID, reapWait)
+		f.NoError(err, "volume claims were not released")
+	}
+
+	// deploy a job so we can read from the volume
+	readJobID := "read-ebs-" + tc.uuid
+	tc.testJobIDs = append(tc.testJobIDs, readJobID) // ensure failed tests clean up
+	f.NoError(e2e.Register(readJobID, "csi/input/use-ebs-volume.nomad"))
+	f.NoError(
+		e2e.WaitForAllocStatusExpected(readJobID, ns, []string{"running"}),
+		"job should be running")
+
+	allocs, err = e2e.AllocsForJob(readJobID, ns)
+	f.NoError(err, "could not get allocs for read job")
+	f.Len(allocs, 1, "could not get allocs for read job")
+	readAllocID := allocs[0]["ID"]
+
+	// read data from volume and assert we can read the file the writer wrote
+	expectedPath = "/task/test/" + readAllocID
+	_, err = readFile(nomadClient, readAllocID, expectedPath)
+	f.NoError(err)
+}
+
+// TestSnapshot exercises the snapshot commands.
+func (tc *CSIControllerPluginEBSTest) TestSnapshot(f *framework.F) {
+
+	out, err := e2e.Command("nomad", "volume", "snapshot", "create",
+		tc.volumeIDs[0], "snap-"+tc.uuid)
+	requireNoErrorElseDump(f, err, "could not create volume snapshot", tc.pluginJobIDs)
+
+	snaps, err := e2e.ParseColumns(out)
+
+	defer func() {
+		// guard against a failed parse above so this deferred cleanup
+		// doesn't panic on an empty slice
+		if len(snaps) == 0 {
+			return
+		}
+		_, err := e2e.Command("nomad", "volume", "snapshot", "delete",
+			ebsPluginID, snaps[0]["External ID"])
+		requireNoErrorElseDump(f, err, "could not delete volume snapshot", tc.pluginJobIDs)
+	}()
+
+	f.NoError(err, fmt.Sprintf("could not parse output:\n%v", out))
+	f.Len(snaps, 1, fmt.Sprintf("could not parse output:\n%v", out))
+
+	out, err = e2e.Command("nomad", "volume", "snapshot", "list")
+	requireNoErrorElseDump(f, err, "could not list volume snapshots", tc.pluginJobIDs)
+	f.Contains(out, snaps[0]["ID"],
+		fmt.Sprintf("volume snapshot list did not include expected snapshot:\n%v", out))
+}
diff --git a/e2e/csi/efs.go b/e2e/csi/efs.go
new file mode 100644
index 00000000000..8a82cee70c1
--- /dev/null
+++ b/e2e/csi/efs.go
@@ -0,0 +1,153 @@
+package csi
+
+import (
+	"fmt"
+	"os"
+
+	e2e "github.com/hashicorp/nomad/e2e/e2eutil"
+	"github.com/hashicorp/nomad/e2e/framework"
+	"github.com/hashicorp/nomad/helper/uuid"
+	"github.com/stretchr/testify/require"
+)
+
+// CSINodeOnlyPluginEFSTest exercises the AWS EFS plugin, which is an
+// example of a plugin that can run in Node-only mode.
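+//
+// Because the plugin runs without a controller, the EFS volume is
+// provisioned out of band (see e2e/terraform/volumes.tf) and is registered
+// with `nomad volume register` rather than created by Nomad.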
+type CSINodeOnlyPluginEFSTest struct { + framework.TC + uuid string + testJobIDs []string + volumeIDs []string + pluginJobIDs []string +} + +const efsPluginID = "aws-efs0" + +func (tc *CSINodeOnlyPluginEFSTest) BeforeAll(f *framework.F) { + t := f.T() + + _, err := os.Stat("csi/input/volume-efs.hcl") + if err != nil { + t.Skip("skipping CSI test because EFS volume spec file missing:", err) + } + + // Ensure cluster has leader and at least two client + // nodes in a ready state before running tests + e2e.WaitForLeader(t, tc.Nomad()) + e2e.WaitForNodesReady(t, tc.Nomad(), 2) +} + +// TestEFSVolumeClaim launches AWS EFS plugins and registers an EFS volume +// as a Nomad CSI volume. We then deploy a job that writes to the volume, +// and share the volume with another job which should be able to read the +// data written by the first job. +func (tc *CSINodeOnlyPluginEFSTest) TestEFSVolumeClaim(f *framework.F) { + t := f.T() + require := require.New(t) + nomadClient := tc.Nomad() + tc.uuid = uuid.Generate()[0:8] + + // deploy the node plugins job (no need for a controller for EFS) + nodesJobID := "aws-efs-plugin-nodes-" + tc.uuid + f.NoError(e2e.Register(nodesJobID, "csi/input/plugin-aws-efs-nodes.nomad")) + tc.pluginJobIDs = append(tc.pluginJobIDs, nodesJobID) + + f.NoError(e2e.WaitForAllocStatusComparison( + func() ([]string, error) { return e2e.AllocStatuses(nodesJobID, ns) }, + func(got []string) bool { + for _, status := range got { + if status != "running" { + return false + } + } + return true + }, nil, + )) + + f.NoError(waitForPluginStatusMinNodeCount(efsPluginID, 2, pluginWait), + "aws-efs0 node plugins did not become healthy") + + // register a volume + volID := "efs-vol0" + err := volumeRegister(volID, "csi/input/volume-efs.hcl", "register") + require.NoError(err) + tc.volumeIDs = append(tc.volumeIDs, volID) + + // deploy a job that writes to the volume + writeJobID := "write-efs-" + tc.uuid + tc.testJobIDs = append(tc.testJobIDs, writeJobID) // ensure failed tests clean up + f.NoError(e2e.Register(writeJobID, "csi/input/use-efs-volume-write.nomad")) + f.NoError( + e2e.WaitForAllocStatusExpected(writeJobID, ns, []string{"running"}), + "job should be running") + + allocs, err := e2e.AllocsForJob(writeJobID, ns) + f.NoError(err, "could not get allocs for write job") + f.Len(allocs, 1, "could not get allocs for write job") + writeAllocID := allocs[0]["ID"] + + // read data from volume and assert the writer wrote a file to it + expectedPath := "/task/test/" + writeAllocID + _, err = readFile(nomadClient, writeAllocID, expectedPath) + require.NoError(err) + + // Shutdown the writer so we can run a reader. + // although EFS should support multiple readers, the plugin + // does not. 
+	_, err = e2e.Command("nomad", "job", "stop", writeJobID)
+	require.NoError(err)
+
+	// wait for the volume unpublish workflow to complete
+	require.NoError(waitForVolumeClaimRelease(volID, reapWait),
+		"write-efs alloc claim was not released")
+
+	// deploy a job that reads from the volume
+	readJobID := "read-efs-" + tc.uuid
+	tc.testJobIDs = append(tc.testJobIDs, readJobID) // ensure failed tests clean up
+	f.NoError(e2e.Register(readJobID, "csi/input/use-efs-volume-read.nomad"))
+	f.NoError(
+		e2e.WaitForAllocStatusExpected(readJobID, ns, []string{"running"}),
+		"job should be running")
+
+	allocs, err = e2e.AllocsForJob(readJobID, ns)
+	f.NoError(err, "could not get allocs for read job")
+	f.Len(allocs, 1, "could not get allocs for read job")
+	readAllocID := allocs[0]["ID"]
+
+	// read data from volume and assert we can read the file the writer wrote
+	_, err = readFile(nomadClient, readAllocID, expectedPath)
+	require.NoError(err)
+}
+
+func (tc *CSINodeOnlyPluginEFSTest) AfterEach(f *framework.F) {
+
+	// Stop all jobs in test
+	for _, id := range tc.testJobIDs {
+		out, err := e2e.Command("nomad", "job", "stop", "-purge", id)
+		f.Assert().NoError(err, out)
+	}
+	tc.testJobIDs = []string{}
+
+	// Deregister all volumes in test
+	for _, id := range tc.volumeIDs {
+		// make sure all the test jobs have finished unpublishing claims
+		err := waitForVolumeClaimRelease(id, reapWait)
+		f.Assert().NoError(err, "volume claims were not released")
+
+		out, err := e2e.Command("nomad", "volume", "deregister", id)
+		assertNoErrorElseDump(f, err,
+			fmt.Sprintf("could not deregister volume:\n%v", out), tc.pluginJobIDs)
+	}
+	tc.volumeIDs = []string{}
+
+	// Deregister all plugin jobs in test
+	for _, id := range tc.pluginJobIDs {
+		out, err := e2e.Command("nomad", "job", "stop", "-purge", id)
+		f.Assert().NoError(err, out)
+	}
+	tc.pluginJobIDs = []string{}
+
+	// Garbage collect
+	out, err := e2e.Command("nomad", "system", "gc")
+	f.Assert().NoError(err, out)
+}
diff --git a/e2e/csi/input/ebs-volume0.hcl b/e2e/csi/input/ebs-volume0.hcl
new file mode 100644
index 00000000000..bf961efedd6
--- /dev/null
+++ b/e2e/csi/input/ebs-volume0.hcl
@@ -0,0 +1,21 @@
+id        = "ebs-vol[0]"
+name      = "this-is-a-test-0" # CSIVolumeName tag
+type      = "csi"
+plugin_id = "aws-ebs0"
+
+capacity_min = "10GiB"
+capacity_max = "20GiB"
+
+capability {
+  access_mode     = "single-node-writer"
+  attachment_mode = "file-system"
+}
+
+capability {
+  access_mode     = "single-node-writer"
+  attachment_mode = "block-device"
+}
+
+parameters {
+  type = "gp2"
+}
diff --git a/e2e/csi/input/ebs-volume1.hcl b/e2e/csi/input/ebs-volume1.hcl
new file mode 100644
index 00000000000..df38b903472
--- /dev/null
+++ b/e2e/csi/input/ebs-volume1.hcl
@@ -0,0 +1,21 @@
+id        = "ebs-vol[1]"
+name      = "this-is-a-test-1" # CSIVolumeName tag
+type      = "csi"
+plugin_id = "aws-ebs0"
+
+capacity_min = "10GiB"
+capacity_max = "20GiB"
+
+capability {
+  access_mode     = "single-node-writer"
+  attachment_mode = "file-system"
+}
+
+capability {
+  access_mode     = "single-node-writer"
+  attachment_mode = "block-device"
+}
+
+parameters {
+  type = "gp2"
+}
diff --git a/e2e/csi/input/use-ebs-volume.nomad b/e2e/csi/input/use-ebs-volume.nomad
index 1f30f6f6efe..586a5f8dbf1 100644
--- a/e2e/csi/input/use-ebs-volume.nomad
+++ b/e2e/csi/input/use-ebs-volume.nomad
@@ -9,9 +9,11 @@ job "use-ebs-volume" {
   group "group" {
     volume "test" {
-      type      = "csi"
-      source    = "ebs-vol"
-      per_alloc = true
+      type            = "csi"
+      source          = "ebs-vol"
+      attachment_mode = "file-system"
+      access_mode     = 
"single-node-writer" + per_alloc = true } task "task" { diff --git a/e2e/csi/input/use-efs-volume-read.nomad b/e2e/csi/input/use-efs-volume-read.nomad index d8210d84dde..a4bd78bbb32 100644 --- a/e2e/csi/input/use-efs-volume-read.nomad +++ b/e2e/csi/input/use-efs-volume-read.nomad @@ -9,9 +9,12 @@ job "use-efs-volume" { } group "group" { + volume "test" { - type = "csi" - source = "efs-vol0" + type = "csi" + source = "efs-vol0" + attachment_mode = "file-system" + access_mode = "single-node-writer" } task "task" { diff --git a/e2e/csi/input/use-efs-volume-write.nomad b/e2e/csi/input/use-efs-volume-write.nomad index b9e4a55df9d..10df5ee9186 100644 --- a/e2e/csi/input/use-efs-volume-write.nomad +++ b/e2e/csi/input/use-efs-volume-write.nomad @@ -8,9 +8,12 @@ job "use-efs-volume" { } group "group" { + volume "test" { - type = "csi" - source = "efs-vol0" + type = "csi" + source = "efs-vol0" + attachment_mode = "file-system" + access_mode = "single-node-writer" } task "task" { diff --git a/e2e/terraform/volumes.tf b/e2e/terraform/volumes.tf index 572241c9c93..519765c2f26 100644 --- a/e2e/terraform/volumes.tf +++ b/e2e/terraform/volumes.tf @@ -15,30 +15,6 @@ resource "aws_efs_mount_target" "csi" { security_groups = [aws_security_group.nfs[0].id] } -resource "aws_ebs_volume" "csi" { - count = var.volumes ? 1 : 0 - availability_zone = var.availability_zone - size = 40 - - tags = { - Name = "${local.random_name}-ebs" - User = data.aws_caller_identity.current.arn - } -} - -data "template_file" "ebs_volume_hcl" { - count = var.volumes ? 1 : 0 - template = <