-
Notifications
You must be signed in to change notification settings - Fork 4.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Priority Queue library to sdk (#6664)
* Add priority queue to sdk * fix issue of storing pointers and now copy * update to use copy structure * Remove file, put Item struct def. into other file * add link * clean up docs * refactor internal data structure to hide heap method implementations. Other cleanup after feedback * rename PushItem and PopItem to just Push/Pop, after encapsulating the heap methods * updates after feedback * refactoring/renaming * guard against pushing a nil item * minor updates after feedback * Add read lock to the Len() method and move the interface check into the test file * fix a deadlock * make the mutex a RWMutex, and make it private again * nil check itemRaw before trying to type cast it
- Loading branch information
Showing
3 changed files
with
411 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
Vault SDK - Queue | ||
================= | ||
|
||
The `queue` package provides Vault plugins with a Priority Queue. It can be used | ||
as an in-memory list of `queue.Item` sorted by their `priority`, and offers | ||
methods to find or remove items by their key. Internally it | ||
uses `container/heap`; see [Example Priority | ||
Queue](https://golang.org/pkg/container/heap/#example__priorityQueue) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,193 @@ | ||
// Package queue provides Vault plugins with a Priority Queue. It can be used | ||
// as an in-memory list of queue.Item sorted by their priority, and offers | ||
// methods to find or remove items by their key. Internally it uses | ||
// container/heap; see Example Priority Queue: | ||
// https://golang.org/pkg/container/heap/#example__priorityQueue | ||
package queue | ||
|
||
import ( | ||
"container/heap" | ||
"errors" | ||
"sync" | ||
|
||
"github.com/mitchellh/copystructure" | ||
) | ||
|
||
// ErrEmpty is returned for queues with no items | ||
var ErrEmpty = errors.New("queue is empty") | ||
|
||
// ErrDuplicateItem is returned when the queue attmepts to push an item to a key that | ||
// already exists. The queue does not attempt to update, instead returns this | ||
// error. If an Item needs to be updated or replaced, pop the item first. | ||
var ErrDuplicateItem = errors.New("duplicate item") | ||
|
||
// New initializes the internal data structures and returns a new | ||
// PriorityQueue | ||
func New() *PriorityQueue { | ||
pq := PriorityQueue{ | ||
data: make(queue, 0), | ||
dataMap: make(map[string]*Item), | ||
} | ||
heap.Init(&pq.data) | ||
return &pq | ||
} | ||
|
||
// PriorityQueue facilitates queue of Items, providing Push, Pop, and | ||
// PopByKey convenience methods. The ordering (priority) is an int64 value | ||
// with the smallest value is the highest priority. PriorityQueue maintains both | ||
// an internal slice for the queue as well as a map of the same items with their | ||
// keys as the index. This enables users to find specific items by key. The map | ||
// must be kept in sync with the data slice. | ||
// See https://golang.org/pkg/container/heap/#example__priorityQueue | ||
type PriorityQueue struct { | ||
// data is the internal structure that holds the queue, and is operated on by | ||
// heap functions | ||
data queue | ||
|
||
// dataMap represents all the items in the queue, with unique indexes, used | ||
// for finding specific items. dataMap is kept in sync with the data slice | ||
dataMap map[string]*Item | ||
|
||
// lock is a read/write mutex, and used to facilitate read/write locks on the | ||
// data and dataMap fields | ||
lock sync.RWMutex | ||
} | ||
|
||
// queue is the internal data structure used to satisfy heap.Interface. This | ||
// prevents users from calling Pop and Push heap methods directly | ||
type queue []*Item | ||
|
||
// Item is something managed in the priority queue | ||
type Item struct { | ||
// Key is a unique string used to identify items in the internal data map | ||
Key string | ||
// Value is an unspecified type that implementations can use to store | ||
// information | ||
Value interface{} | ||
|
||
// Priority determines ordering in the queue, with the lowest value being the | ||
// highest priority | ||
Priority int64 | ||
|
||
// index is an internal value used by the heap package, and should not be | ||
// modified by any consumer of the priority queue | ||
index int | ||
} | ||
|
||
// Len returns the count of items in the Priority Queue | ||
func (pq *PriorityQueue) Len() int { | ||
pq.lock.RLock() | ||
defer pq.lock.RUnlock() | ||
return pq.data.Len() | ||
} | ||
|
||
// Pop pops the highest priority item from the queue. This is a | ||
// wrapper/convenience method that calls heap.Pop, so consumers do not need to | ||
// invoke heap functions directly | ||
func (pq *PriorityQueue) Pop() (*Item, error) { | ||
if pq.Len() == 0 { | ||
return nil, ErrEmpty | ||
} | ||
|
||
pq.lock.Lock() | ||
defer pq.lock.Unlock() | ||
|
||
item := heap.Pop(&pq.data).(*Item) | ||
delete(pq.dataMap, item.Key) | ||
return item, nil | ||
} | ||
|
||
// Push pushes an item on to the queue. This is a wrapper/convenience | ||
// method that calls heap.Push, so consumers do not need to invoke heap | ||
// functions directly. Items must have unique Keys, and Items in the queue | ||
// cannot be updated. To modify an Item, users must first remove it and re-push | ||
// it after modifications | ||
func (pq *PriorityQueue) Push(i *Item) error { | ||
if i == nil || i.Key == "" { | ||
return errors.New("error adding item: Item Key is required") | ||
} | ||
|
||
pq.lock.Lock() | ||
defer pq.lock.Unlock() | ||
|
||
if _, ok := pq.dataMap[i.Key]; ok { | ||
return ErrDuplicateItem | ||
} | ||
// copy the item value(s) so that modifications to the source item does not | ||
// affect the item on the queue | ||
clone, err := copystructure.Copy(i) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
pq.dataMap[i.Key] = clone.(*Item) | ||
heap.Push(&pq.data, clone) | ||
return nil | ||
} | ||
|
||
// PopByKey searches the queue for an item with the given key and removes it | ||
// from the queue if found. Returns ErrItemNotFound(key) if not found. This | ||
// method must fix the queue after removal. | ||
func (pq *PriorityQueue) PopByKey(key string) (*Item, error) { | ||
pq.lock.Lock() | ||
defer pq.lock.Unlock() | ||
|
||
item, ok := pq.dataMap[key] | ||
if !ok { | ||
return nil, nil | ||
} | ||
|
||
// remove the item the heap and delete it from the dataMap | ||
itemRaw := heap.Remove(&pq.data, item.index) | ||
delete(pq.dataMap, key) | ||
|
||
if itemRaw != nil { | ||
if i, ok := itemRaw.(*Item); ok { | ||
return i, nil | ||
} | ||
} | ||
|
||
return nil, nil | ||
} | ||
|
||
// Len returns the number of items in the queue data structure. Do not use this | ||
// method directly on the queue, use PriorityQueue.Len() instead. | ||
func (q queue) Len() int { return len(q) } | ||
|
||
// Less returns whether the Item with index i should sort before the Item with | ||
// index j in the queue. This method is used by the queue to determine priority | ||
// internally; the Item with the lower value wins. (priority zero is higher | ||
// priority than 1). The priority of Items with equal values is undetermined. | ||
func (q queue) Less(i, j int) bool { | ||
return q[i].Priority < q[j].Priority | ||
} | ||
|
||
// Swap swaps things in-place; part of sort.Interface | ||
func (q queue) Swap(i, j int) { | ||
q[i], q[j] = q[j], q[i] | ||
q[i].index = i | ||
q[j].index = j | ||
} | ||
|
||
// Push is used by heap.Interface to push items onto the heap. This method is | ||
// invoked by container/heap, and should not be used directly. | ||
// See: https://golang.org/pkg/container/heap/#Interface | ||
func (q *queue) Push(x interface{}) { | ||
n := len(*q) | ||
item := x.(*Item) | ||
item.index = n | ||
*q = append(*q, item) | ||
} | ||
|
||
// Pop is used by heap.Interface to pop items off of the heap. This method is | ||
// invoked by container/heap, and should not be used directly. | ||
// See: https://golang.org/pkg/container/heap/#Interface | ||
func (q *queue) Pop() interface{} { | ||
old := *q | ||
n := len(old) | ||
item := old[n-1] | ||
old[n-1] = nil // avoid memory leak | ||
item.index = -1 // for safety | ||
*q = old[0 : n-1] | ||
return item | ||
} |
Oops, something went wrong.