Skip to content

Commit

Permalink
dynamic host volumes: create/register RPC validation
Browse files Browse the repository at this point in the history
Add several validation steps in the create/register RPCs for dynamic host
volumes. We first check that submitted volumes are self-consistent (ex. max
capacity is more than min capacity), then that any updates we've made are
valid. And we validate against state: preventing claimed volumes from being
updated and preventing placement requests for nodes that don't exist.

Ref: #15489
  • Loading branch information
tgross committed Dec 9, 2024
1 parent 51dd0b2 commit a33d82a
Show file tree
Hide file tree
Showing 9 changed files with 535 additions and 47 deletions.
2 changes: 1 addition & 1 deletion command/agent/host_volume_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestHostVolumeEndpoint_CRUD(t *testing.T) {

// Create a volume on the test node

vol := mock.HostVolumeRequest()
vol := mock.HostVolumeRequest(structs.DefaultNamespace)
reqBody := struct {
Volumes []*structs.HostVolume
}{Volumes: []*structs.HostVolume{vol}}
Expand Down
2 changes: 1 addition & 1 deletion command/volume_create_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ capability {
}
capability {
access_mode = "single-node-reader"
access_mode = "single-node-reader-only"
attachment_mode = "block-device"
}
Expand Down
4 changes: 4 additions & 0 deletions command/volume_delete_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type = "host"
plugin_id = "plugin_id"
node_id = "%s"
node_pool = "default"
capability {
access_mode = "single-node-reader-only"
attachment_mode = "file-system"
}
`, nodeID)

file, err := os.CreateTemp(t.TempDir(), "volume-test-*.hcl")
Expand Down
8 changes: 8 additions & 0 deletions command/volume_status_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ type = "host"
plugin_id = "plugin_id"
node_id = "%s"
node_pool = "default"
capability {
access_mode = "single-node-reader-only"
attachment_mode = "file-system"
}
`, vol.Namespace, vol.ID, nodeID)

file, err := os.CreateTemp(t.TempDir(), "volume-test-*.hcl")
Expand Down Expand Up @@ -114,6 +118,10 @@ type = "host"
plugin_id = "plugin_id"
node_id = "%s"
node_pool = "default"
capability {
access_mode = "single-node-reader-only"
attachment_mode = "file-system"
}
`, nodeID)

file, err := os.CreateTemp(t.TempDir(), "volume-test-*.hcl")
Expand Down
87 changes: 70 additions & 17 deletions nomad/host_volume_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/hashicorp/nomad/acl"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -232,7 +231,7 @@ func (v *HostVolume) Create(args *structs.HostVolumeCreateRequest, reply *struct
// volumes
validVols, err := v.validateVolumeUpdates(args.Volumes)
if err != nil {
return err
return helper.FlattenMultierror(err)
}

// Attempt to create all the validated volumes and write only successfully
Expand Down Expand Up @@ -310,7 +309,7 @@ func (v *HostVolume) Register(args *structs.HostVolumeRegisterRequest, reply *st
// volumes
validVols, err := v.validateVolumeUpdates(args.Volumes)
if err != nil {
return err
return helper.FlattenMultierror(err)
}

raftArgs := &structs.HostVolumeRegisterRequest{
Expand All @@ -335,7 +334,7 @@ func (v *HostVolume) Register(args *structs.HostVolumeRegisterRequest, reply *st

func (v *HostVolume) validateVolumeUpdates(requested []*structs.HostVolume) ([]*structs.HostVolume, error) {

now := time.Now().UnixNano()
now := time.Now()
var vols []*structs.HostVolume

snap, err := v.srv.State().Snapshot()
Expand All @@ -345,31 +344,85 @@ func (v *HostVolume) validateVolumeUpdates(requested []*structs.HostVolume) ([]*

var mErr *multierror.Error
for _, vol := range requested {
vol.ModifyTime = now

if vol.ID == "" {
vol.ID = uuid.Generate()
vol.CreateTime = now
}

// if the volume already exists, we'll ensure we're validating the
// update
current, err := snap.HostVolumeByID(nil, vol.Namespace, vol.ID, false)
// validate the volume spec
err := vol.Validate()
if err != nil {
mErr = multierror.Append(mErr, err)
mErr = multierror.Append(mErr, fmt.Errorf("volume validation failed: %v", err))
continue
}
if err = vol.Validate(current); err != nil {
mErr = multierror.Append(mErr, err)

// validate any update we're making
var existing *structs.HostVolume
volID := vol.ID
if vol.ID != "" {
existing, err = snap.HostVolumeByID(nil, vol.Namespace, vol.ID, true)
if err != nil {
return nil, err // should never hit, bail out
}
if existing == nil {
mErr = multierror.Append(mErr,
fmt.Errorf("cannot update volume %q: volume does not exist", vol.ID))
continue
}
err = vol.ValidateUpdate(existing)
if err != nil {
mErr = multierror.Append(mErr,
fmt.Errorf("validating volume %q update failed: %v", vol.ID, err))
continue
}
} else {
// capture this for nicer error messages later
volID = vol.Name
}

// set zero values as needed, possibly from existing
vol.CanonicalizeForUpdate(existing, now)

// make sure any nodes or pools actually exist
err = v.validateVolumeForState(vol, snap)
if err != nil {
mErr = multierror.Append(mErr,
fmt.Errorf("validating volume %q against state failed: %v", volID, err))
continue
}

vols = append(vols, vol.Copy())
vols = append(vols, vol)
}

return vols, mErr.ErrorOrNil()
}

// validateVolumeForState ensures that any references to node IDs or node pools are valid
func (v *HostVolume) validateVolumeForState(vol *structs.HostVolume, snap *state.StateSnapshot) error {
var poolFromExistingNode string
if vol.NodeID != "" {
node, err := snap.NodeByID(nil, vol.NodeID)
if err != nil {
return err // should never hit, bail out
}
if node == nil {
return fmt.Errorf("node %q does not exist", vol.NodeID)
}
poolFromExistingNode = node.NodePool
}

if vol.NodePool != "" {
pool, err := snap.NodePoolByName(nil, vol.NodePool)
if err != nil {
return err // should never hit, bail out
}
if pool == nil {
return fmt.Errorf("node pool %q does not exist", vol.NodePool)
}
if poolFromExistingNode != "" && poolFromExistingNode != pool.Name {
return fmt.Errorf("node ID %q is not in pool %q", vol.NodeID, vol.NodePool)
}
}

return nil
}

func (v *HostVolume) createVolume(vol *structs.HostVolume) error {

// TODO(1.10.0): proper node selection based on constraints and node
Expand Down
144 changes: 125 additions & 19 deletions nomad/host_volume_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -74,13 +75,62 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) {

t.Run("invalid create", func(t *testing.T) {

// TODO(1.10.0): once validation logic for updating an existing volume is in
// place, fully test it here

req.Namespace = ns
var resp structs.HostVolumeCreateResponse
err := msgpackrpc.CallWithCodec(codec, "HostVolume.Create", req, &resp)
must.EqError(t, err, "missing volume definition")

req.Volumes = []*structs.HostVolume{
{}, // missing basic fields
{
Name: "example",
PluginID: "example_plugin",
Constraints: []*structs.Constraint{{
RTarget: "r1",
Operand: "=",
}},
RequestedCapacityMinBytes: 200000,
RequestedCapacityMaxBytes: 100000,
RequestedCapabilities: []*structs.HostVolumeCapability{
{
AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
AccessMode: structs.HostVolumeAccessModeSingleNodeWriter,
},
{
AttachmentMode: "bad",
AccessMode: "invalid",
},
},
}, // fails other field validations
}
err = msgpackrpc.CallWithCodec(codec, "HostVolume.Create", req, &resp)
// TODO(1.10.0): nested multierrors are really ugly, we could really use
// some helper functions to make these nicer everywhere they pop up
must.EqError(t, err, `2 errors occurred:
* volume validation failed: 2 errors occurred:
* missing name
* must include at least one capability block
* volume validation failed: 3 errors occurred:
* capacity_max (100000) must be larger than capacity_min (200000)
* invalid attachment mode: "bad"
* invalid constraint: 1 error occurred:
* No LTarget provided but is required by constraint
`)

invalidNode := &structs.Node{ID: uuid.Generate(), NodePool: "does-not-exist"}
volOnInvalidNode := mock.HostVolumeRequestForNode(ns, invalidNode)
req.Volumes = []*structs.HostVolume{volOnInvalidNode}
err = msgpackrpc.CallWithCodec(codec, "HostVolume.Create", req, &resp)
must.EqError(t, err, fmt.Sprintf(
`validating volume "example" against state failed: node %q does not exist`,
invalidNode.ID))
})

var vol1ID, vol2ID string
Expand All @@ -91,12 +141,10 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) {
CapacityBytes: 150000,
}, nil)

vol1 := mock.HostVolumeRequest()
vol1.Namespace = "apps"
vol1 := mock.HostVolumeRequest("apps")
vol1.Name = "example1"
vol1.NodePool = "prod"
vol2 := mock.HostVolumeRequest()
vol2.Namespace = "apps"
vol2 := mock.HostVolumeRequest("apps")
vol2.Name = "example2"
vol2.NodePool = "prod"
req.Volumes = []*structs.HostVolume{vol1, vol2}
Expand Down Expand Up @@ -136,6 +184,50 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) {
must.NotNil(t, getResp.Volume)
})

t.Run("invalid updates", func(t *testing.T) {

vol1, err := store.HostVolumeByID(nil, ns, vol1ID, false)
must.NoError(t, err)
must.NotNil(t, vol1)
invalidVol1 := vol1.Copy()
invalidVol2 := &structs.HostVolume{}

createReq := &structs.HostVolumeCreateRequest{
Volumes: []*structs.HostVolume{invalidVol1, invalidVol2},
WriteRequest: structs.WriteRequest{
Region: srv.Region(),
Namespace: ns,
AuthToken: token},
}
c1.setCreate(nil, errors.New("should not call this endpoint on invalid RPCs"))
var createResp structs.HostVolumeCreateResponse
err = msgpackrpc.CallWithCodec(codec, "HostVolume.Create", createReq, &createResp)
must.EqError(t, err, `volume validation failed: 2 errors occurred:
* missing name
* must include at least one capability block
`, must.Sprint("initial validation failures should exit early even if there's another valid vol"))

invalidVol1.NodeID = uuid.Generate()
invalidVol1.RequestedCapacityMinBytes = 100
invalidVol1.RequestedCapacityMaxBytes = 200
registerReq := &structs.HostVolumeRegisterRequest{
Volumes: []*structs.HostVolume{invalidVol1},
WriteRequest: structs.WriteRequest{
Region: srv.Region(),
Namespace: ns,
AuthToken: token},
}
var registerResp structs.HostVolumeRegisterResponse
err = msgpackrpc.CallWithCodec(codec, "HostVolume.Register", registerReq, &registerResp)
must.EqError(t, err, fmt.Sprintf(`validating volume %q update failed: 2 errors occurred:
* node ID cannot be updated
* capacity_max (200) cannot be less than existing provisioned capacity (150000)
`, invalidVol1.ID), must.Sprint("update validation checks should have failed"))

})

t.Run("blocking Get unblocks on write", func(t *testing.T) {
vol1, err := store.HostVolumeByID(nil, ns, vol1ID, false)
must.NoError(t, err)
Expand Down Expand Up @@ -308,25 +400,24 @@ func TestHostVolumeEndpoint_List(t *testing.T) {
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup,
index, nodes[2], state.NodeUpsertWithNodePool))

vol1, vol2 := mock.HostVolume(), mock.HostVolume()
vol1.NodeID = nodes[0].ID
vol1 := mock.HostVolumeRequestForNode(ns1, nodes[0])
vol1.Name = "foobar-example"
vol1.Namespace = ns1
vol2.NodeID = nodes[1].ID
vol1.Parameters = map[string]string{"mockID": "vol1"}

vol2 := mock.HostVolumeRequestForNode(ns1, nodes[1])
vol2.Name = "foobaz-example"
vol2.Namespace = ns1
vol2.Parameters = map[string]string{"mockID": "vol2"}

vol3, vol4 := mock.HostVolume(), mock.HostVolume()
vol3.NodeID = nodes[2].ID
vol3.NodePool = "prod"
vol3.Namespace = ns2
vol3 := mock.HostVolumeRequestForNode(ns2, nodes[2])
vol3.Name = "foobar-example"
vol4.Namespace = ns2
vol4.NodeID = nodes[1].ID
vol3.Parameters = map[string]string{"mockID": "vol3"}

vol4 := mock.HostVolumeRequestForNode(ns2, nodes[1])
vol4.Name = "foobaz-example"
vol4.Parameters = map[string]string{"mockID": "vol4"}

// we need to register these rather than upsert them so we have the correct
// indexes for unblocking later
// indexes for unblocking later.
registerReq := &structs.HostVolumeRegisterRequest{
Volumes: []*structs.HostVolume{vol1, vol2, vol3, vol4},
WriteRequest: structs.WriteRequest{
Expand All @@ -338,6 +429,21 @@ func TestHostVolumeEndpoint_List(t *testing.T) {
err := msgpackrpc.CallWithCodec(codec, "HostVolume.Register", registerReq, &registerResp)
must.NoError(t, err)

// IDs are generated by the server, so we need to read them back to figure
// out which mock got which ID
for _, vol := range registerResp.Volumes {
switch vol.Parameters["mockID"] {
case "vol1":
vol1 = vol
case "vol2":
vol2 = vol
case "vol3":
vol3 = vol
case "vol4":
vol4 = vol
}
}

testCases := []struct {
name string
req *structs.HostVolumeListRequest
Expand Down
Loading

0 comments on commit a33d82a

Please sign in to comment.