From 3629f572de47ea763a287acacd5750ad5f0361a0 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 3 Mar 2022 21:24:39 +0000 Subject: [PATCH 1/4] backport of commit d725970cb0fb7ec6576c2ef0b4cf044b9f3d368e --- nomad/csi_endpoint_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index 0d06a85910a..bb6156ff5f6 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -735,6 +735,22 @@ func TestCSIVolumeEndpoint_ListAllNamespaces(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(1001), resp.Index) require.Len(t, resp.Volumes, len(vols)) + + // Lookup volumes in all namespaces with prefix + get = &structs.CSIVolumeListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Prefix: id0[:4], + Namespace: "*", + }, + } + var resp2 structs.CSIVolumeListResponse + err = msgpackrpc.CallWithCodec(codec, "CSIVolume.List", get, &resp2) + require.NoError(t, err) + require.Equal(t, uint64(1001), resp.Index) + require.Len(t, resp2.Volumes, 1) + require.Equal(t, vols[0].ID, resp2.Volumes[0].ID) + require.Equal(t, structs.DefaultNamespace, resp2.Volumes[0].Namespace) } func TestCSIVolumeEndpoint_Create(t *testing.T) { From 07b71a98a6568ca6285154bdbfb4a1d8c001a5a6 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 3 Mar 2022 21:25:18 +0000 Subject: [PATCH 2/4] backport of commit dfda28a11acc5966d6f8246e9dc91ad2c454c274 --- nomad/csi_endpoint.go | 8 +++++--- nomad/state/state_store.go | 31 ++++++++++++++++++++++++++++++- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 8885ff68905..552d65a6eab 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -129,10 +129,12 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV iter, err = snap.CSIVolumesByNodeID(ws, prefix, args.NodeID) } else if args.PluginID != "" { iter, err = snap.CSIVolumesByPluginID(ws, ns, prefix, args.PluginID) - } else if ns == structs.AllNamespacesSentinel { - iter, err = snap.CSIVolumes(ws) - } else { + } else if prefix != "" { + iter, err = snap.CSIVolumesByIDPrefix(ws, ns, prefix) + } else if ns != structs.AllNamespacesSentinel { iter, err = snap.CSIVolumesByNamespace(ws, ns, prefix) + } else { + iter, err = snap.CSIVolumes(ws) } if err != nil { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index fd28594cc26..edae5ad7662 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2225,8 +2225,13 @@ func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, prefix, } // CSIVolumesByIDPrefix supports search. Caller should snapshot if it wants to -// also denormalize the plugins. +// also denormalize the plugins. If using a prefix, the results will not use an +// index. func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID string) (memdb.ResultIterator, error) { + if namespace == structs.AllNamespacesSentinel { + return s.csiVolumeByIDPrefixAllNamespaces(ws, volumeID) + } + txn := s.db.ReadTxn() iter, err := txn.Get("csi_volumes", "id_prefix", namespace, volumeID) @@ -2239,6 +2244,30 @@ func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID return iter, nil } +func (s *StateStore) csiVolumeByIDPrefixAllNamespaces(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + // Walk the entire csi_volumes table + iter, err := txn.Get("csi_volumes", "id") + + if err != nil { + return nil, err + } + + ws.Add(iter.WatchCh()) + + // Filter the iterator by ID prefix + f := func(raw interface{}) bool { + v, ok := raw.(*structs.CSIVolume) + if !ok { + return false + } + return !strings.HasPrefix(v.ID, prefix) + } + wrap := memdb.NewFilterIterator(iter, f) + return wrap, nil +} + // CSIVolumesByNodeID looks up CSIVolumes in use on a node. Caller should // snapshot if it wants to also denormalize the plugins. func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, prefix, nodeID string) (memdb.ResultIterator, error) { From c0ca29a0d47fbdfb0b8da95ba33959e24db69bd8 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 3 Mar 2022 21:31:09 +0000 Subject: [PATCH 3/4] backport of commit e75906306c0772108b4b59065ad08f90509ad85b --- .changelog/12184.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/12184.txt diff --git a/.changelog/12184.txt b/.changelog/12184.txt new file mode 100644 index 00000000000..e885a9ad572 --- /dev/null +++ b/.changelog/12184.txt @@ -0,0 +1,3 @@ +```release-note:bug +api: Apply prefix filter when querying CSI volumes in all namespaces +``` From 507afd4bfec964ffc6c8ec2ca75a9a0f5132f9a5 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 3 Mar 2022 22:14:36 +0000 Subject: [PATCH 4/4] backport of commit f7bd72b09c4881c31ffe66ac66dc975335a26ac8 --- nomad/state/state_store.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index edae5ad7662..2b0f9023e01 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2225,8 +2225,8 @@ func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, prefix, } // CSIVolumesByIDPrefix supports search. Caller should snapshot if it wants to -// also denormalize the plugins. If using a prefix, the results will not use an -// index. +// also denormalize the plugins. If using a prefix with the wildcard namespace, +// the results will not use the index prefix. func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID string) (memdb.ResultIterator, error) { if namespace == structs.AllNamespacesSentinel { return s.csiVolumeByIDPrefixAllNamespaces(ws, volumeID)