Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSI: resolve invalid claim states #11890

Merged
merged 2 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/11890.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
csi: Fixed a bug where garbage collected allocations could block new claims on a volume
```
4 changes: 4 additions & 0 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,10 @@ NEXT_VOLUME:
continue
}

// TODO(tgross): consider moving the TerminalStatus check into
// the denormalize volume logic so that we can just check the
// volume for past claims

// we only call the claim release RPC if the volume has claims
// that no longer have valid allocations. otherwise we'd send
// out a lot of do-nothing RPCs.
Expand Down
55 changes: 50 additions & 5 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2383,19 +2383,64 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
c := core.(*CoreScheduler)
require.NoError(c.csiVolumeClaimGC(gc))

// the volumewatcher will hit an error here because there's no
// path to the node. but we can't update the claim to bypass the
// client RPCs without triggering the volumewatcher's normal code
// path.
// TODO(tgross): the condition below means this test doesn't tell
// us much; ideally we should be intercepting the claim request
// and verifying that we send the expected claims but we don't
// have test infra in place to do that for server RPCs

// sending the GC claim will trigger the volumewatcher's normal
// code path. but the volumewatcher will hit an error here
// because there's no path to the node, so we shouldn't see
// the WriteClaims removed
require.Eventually(func() bool {
vol, _ := state.CSIVolumeByID(ws, ns, volID)
return len(vol.WriteClaims) == 1 &&
len(vol.WriteAllocs) == 1 &&
len(vol.PastClaims) == 0
len(vol.PastClaims) == 1
}, time.Second*1, 10*time.Millisecond, "claims were released unexpectedly")

}

func TestCoreScheduler_CSIBadState_ClaimGC(t *testing.T) {
t.Parallel()
require := require.New(t)

srv, shutdown := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})

defer shutdown()
testutil.WaitForLeader(t, srv.RPC)

err := state.TestBadCSIState(t, srv.State())
require.NoError(err)

snap, err := srv.State().Snapshot()
require.NoError(err)
core := NewCoreScheduler(srv, snap)

index, _ := srv.State().LatestIndex()
index++
gc := srv.coreJobEval(structs.CoreJobForceGC, index)
c := core.(*CoreScheduler)
require.NoError(c.csiVolumeClaimGC(gc))

require.Eventually(func() bool {
vol, _ := srv.State().CSIVolumeByID(nil,
structs.DefaultNamespace, "csi-volume-nfs0")
if len(vol.PastClaims) != 2 {
return false
}
for _, claim := range vol.PastClaims {
if claim.State != structs.CSIVolumeClaimStateUnpublishing {
return false
}
}
return true
}, time.Second*1, 10*time.Millisecond, "invalid claims should be marked for GC")

}

func TestCoreScheduler_FailLoop(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down
26 changes: 26 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2513,6 +2513,18 @@ func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *st
State: structs.CSIVolumeClaimStateTaken,
}
}
} else if _, ok := vol.PastClaims[id]; !ok {
// ensure that any allocs that have been GC'd since
// our last read are marked as past claims
vol.PastClaims[id] = &structs.CSIVolumeClaim{
AllocationID: id,
Mode: structs.CSIVolumeClaimRead,
State: structs.CSIVolumeClaimStateUnpublishing,
}
readClaim := vol.ReadClaims[id]
if readClaim != nil {
vol.PastClaims[id].NodeID = readClaim.NodeID
}
}
}

Expand All @@ -2531,6 +2543,20 @@ func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *st
State: structs.CSIVolumeClaimStateTaken,
}
}
} else if _, ok := vol.PastClaims[id]; !ok {
// ensure that any allocs that have been GC'd since
// our last read are marked as past claims

vol.PastClaims[id] = &structs.CSIVolumeClaim{
AllocationID: id,
Mode: structs.CSIVolumeClaimWrite,
State: structs.CSIVolumeClaimStateUnpublishing,
}
writeClaim := vol.WriteClaims[id]
if writeClaim != nil {
vol.PastClaims[id].NodeID = writeClaim.NodeID
}

}
}

Expand Down
187 changes: 187 additions & 0 deletions nomad/state/testing.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package state

import (
"math"
"testing"
"time"

"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
Expand Down Expand Up @@ -124,3 +126,188 @@ func createTestCSIPlugin(s *StateStore, id string, requiresController bool) func
s.DeleteNode(structs.MsgTypeTestSetup, index, ids)
}
}

func TestBadCSIState(t testing.TB, store *StateStore) error {

pluginID := "org.democratic-csi.nfs"

controllerInfo := func(isHealthy bool) map[string]*structs.CSIInfo {
desc := "healthy"
if !isHealthy {
desc = "failed fingerprinting with error"
}
return map[string]*structs.CSIInfo{
pluginID: {
PluginID: pluginID,
AllocID: uuid.Generate(),
Healthy: isHealthy,
HealthDescription: desc,
RequiresControllerPlugin: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsReadOnlyAttach: true,
SupportsAttachDetach: true,
},
},
}
}

nodeInfo := func(nodeName string, isHealthy bool) map[string]*structs.CSIInfo {
desc := "healthy"
if !isHealthy {
desc = "failed fingerprinting with error"
}
return map[string]*structs.CSIInfo{
pluginID: {
PluginID: pluginID,
AllocID: uuid.Generate(),
Healthy: isHealthy,
HealthDescription: desc,
RequiresControllerPlugin: true,
NodeInfo: &structs.CSINodeInfo{
ID: nodeName,
MaxVolumes: math.MaxInt64,
RequiresNodeStageVolume: true,
},
},
}
}

nodes := make([]*structs.Node, 3)
for i := range nodes {
n := mock.Node()
n.Attributes["nomad.version"] = "1.2.4"
nodes[i] = n
}

nodes[0].CSIControllerPlugins = controllerInfo(true)
nodes[0].CSINodePlugins = nodeInfo("nomad-client0", true)

drainID := uuid.Generate()

// drained node
nodes[1].CSIControllerPlugins = controllerInfo(false)
nodes[1].CSINodePlugins = nodeInfo("nomad-client1", false)

nodes[1].LastDrain = &structs.DrainMetadata{
StartedAt: time.Now().Add(-10 * time.Minute),
UpdatedAt: time.Now().Add(-30 * time.Second),
Status: structs.DrainStatusComplete,
AccessorID: drainID,
}
nodes[1].SchedulingEligibility = structs.NodeSchedulingIneligible

// previously drained but now eligible
nodes[2].CSIControllerPlugins = controllerInfo(true)
nodes[2].CSINodePlugins = nodeInfo("nomad-client2", true)
nodes[2].LastDrain = &structs.DrainMetadata{
StartedAt: time.Now().Add(-15 * time.Minute),
UpdatedAt: time.Now().Add(-5 * time.Minute),
Status: structs.DrainStatusComplete,
AccessorID: drainID,
}
nodes[2].SchedulingEligibility = structs.NodeSchedulingEligible

// Insert nodes into the state store
index := uint64(999)
for _, n := range nodes {
index++
err := store.UpsertNode(structs.MsgTypeTestSetup, index, n)
if err != nil {
return err
}
}

allocID0 := uuid.Generate() // nil alloc
allocID2 := uuid.Generate() // nil alloc

alloc1 := mock.Alloc()
alloc1.ClientStatus = "complete"
alloc1.DesiredStatus = "stop"

// Insert allocs into the state store
err := store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1})
if err != nil {
return err
}

vol := &structs.CSIVolume{
ID: "csi-volume-nfs0",
Name: "csi-volume-nfs0",
ExternalID: "csi-volume-nfs0",
Namespace: "default",
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
MountOptions: &structs.CSIMountOptions{
MountFlags: []string{"noatime"},
},
Context: map[string]string{
"node_attach_driver": "nfs",
"provisioner_driver": "nfs-client",
"server": "192.168.56.69",
},
Capacity: 0,
RequestedCapacityMin: 107374182,
RequestedCapacityMax: 107374182,
RequestedCapabilities: []*structs.CSIVolumeCapability{
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
},
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
},
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
},
},
WriteAllocs: map[string]*structs.Allocation{
allocID0: nil,
alloc1.ID: nil,
allocID2: nil,
},
WriteClaims: map[string]*structs.CSIVolumeClaim{
allocID0: {
AllocationID: allocID0,
NodeID: nodes[0].ID,
Mode: structs.CSIVolumeClaimWrite,
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
State: structs.CSIVolumeClaimStateTaken,
},
alloc1.ID: {
AllocationID: alloc1.ID,
NodeID: nodes[1].ID,
Mode: structs.CSIVolumeClaimWrite,
AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
State: structs.CSIVolumeClaimStateTaken,
},
allocID2: {
AllocationID: allocID2,
NodeID: nodes[2].ID,
Mode: structs.CSIVolumeClaimWrite,
AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
State: structs.CSIVolumeClaimStateTaken,
},
},
Schedulable: true,
PluginID: pluginID,
Provider: pluginID,
ProviderVersion: "1.4.3",
ControllerRequired: true,
ControllersHealthy: 2,
ControllersExpected: 2,
NodesHealthy: 2,
NodesExpected: 0,
}

err = store.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
if err != nil {
return err
}

return nil
}
29 changes: 29 additions & 0 deletions nomad/volumewatcher/volume_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,32 @@ func TestVolumeWatch_Reap(t *testing.T) {
require.NoError(err)
require.Len(vol.PastClaims, 2) // alloc claim + GC claim
}

func TestVolumeReapBadState(t *testing.T) {

store := state.TestStateStore(t)
err := state.TestBadCSIState(t, store)
require.NoError(t, err)
srv := &MockRPCServer{
state: store,
}

vol, err := srv.state.CSIVolumeByID(nil,
structs.DefaultNamespace, "csi-volume-nfs0")
require.NoError(t, err)
srv.state.CSIVolumeDenormalize(nil, vol)

ctx, exitFn := context.WithCancel(context.Background())
w := &volumeWatcher{
v: vol,
rpc: srv,
state: srv.State(),
ctx: ctx,
exitFn: exitFn,
logger: testlog.HCLogger(t),
}

err = w.volumeReapImpl(vol)
require.NoError(t, err)
require.Equal(t, 2, srv.countCSIUnpublish)
}
7 changes: 6 additions & 1 deletion nomad/volumewatcher/volumes_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,15 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
alloc.ClientStatus = structs.AllocClientStatusComplete
vol := testVolume(plugin, alloc, node.ID)

index++
err := srv.State().UpsertAllocs(structs.MsgTypeTestSetup, index,
[]*structs.Allocation{alloc})
require.NoError(err)

watcher.SetEnabled(true, srv.State(), "")

index++
err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol})
err = srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol})
require.NoError(err)

// we should get or start up a watcher when we get an update for
Expand Down
Loading