diff --git a/sdk/queue/README.md b/sdk/queue/README.md new file mode 100644 index 000000000000..345db195d573 --- /dev/null +++ b/sdk/queue/README.md @@ -0,0 +1,9 @@ +Vault SDK - Queue +================= + +The `queue` package provides Vault plugins with a Priority Queue. It can be used +as an in-memory list of `queue.Item` sorted by their `priority`, and offers +methods to find or remove items by their key. Internally it +uses `container/heap`; see [Example Priority +Queue](https://golang.org/pkg/container/heap/#example__priorityQueue) + diff --git a/sdk/queue/priority_queue.go b/sdk/queue/priority_queue.go new file mode 100644 index 000000000000..22a45e5d15b9 --- /dev/null +++ b/sdk/queue/priority_queue.go @@ -0,0 +1,193 @@ +// Package queue provides Vault plugins with a Priority Queue. It can be used +// as an in-memory list of queue.Item sorted by their priority, and offers +// methods to find or remove items by their key. Internally it uses +// container/heap; see Example Priority Queue: +// https://golang.org/pkg/container/heap/#example__priorityQueue +package queue + +import ( + "container/heap" + "errors" + "sync" + + "github.com/mitchellh/copystructure" +) + +// ErrEmpty is returned for queues with no items +var ErrEmpty = errors.New("queue is empty") + +// ErrDuplicateItem is returned when the queue attmepts to push an item to a key that +// already exists. The queue does not attempt to update, instead returns this +// error. If an Item needs to be updated or replaced, pop the item first. +var ErrDuplicateItem = errors.New("duplicate item") + +// New initializes the internal data structures and returns a new +// PriorityQueue +func New() *PriorityQueue { + pq := PriorityQueue{ + data: make(queue, 0), + dataMap: make(map[string]*Item), + } + heap.Init(&pq.data) + return &pq +} + +// PriorityQueue facilitates queue of Items, providing Push, Pop, and +// PopByKey convenience methods. The ordering (priority) is an int64 value +// with the smallest value is the highest priority. PriorityQueue maintains both +// an internal slice for the queue as well as a map of the same items with their +// keys as the index. This enables users to find specific items by key. The map +// must be kept in sync with the data slice. +// See https://golang.org/pkg/container/heap/#example__priorityQueue +type PriorityQueue struct { + // data is the internal structure that holds the queue, and is operated on by + // heap functions + data queue + + // dataMap represents all the items in the queue, with unique indexes, used + // for finding specific items. dataMap is kept in sync with the data slice + dataMap map[string]*Item + + // lock is a read/write mutex, and used to facilitate read/write locks on the + // data and dataMap fields + lock sync.RWMutex +} + +// queue is the internal data structure used to satisfy heap.Interface. This +// prevents users from calling Pop and Push heap methods directly +type queue []*Item + +// Item is something managed in the priority queue +type Item struct { + // Key is a unique string used to identify items in the internal data map + Key string + // Value is an unspecified type that implementations can use to store + // information + Value interface{} + + // Priority determines ordering in the queue, with the lowest value being the + // highest priority + Priority int64 + + // index is an internal value used by the heap package, and should not be + // modified by any consumer of the priority queue + index int +} + +// Len returns the count of items in the Priority Queue +func (pq *PriorityQueue) Len() int { + pq.lock.RLock() + defer pq.lock.RUnlock() + return pq.data.Len() +} + +// Pop pops the highest priority item from the queue. This is a +// wrapper/convenience method that calls heap.Pop, so consumers do not need to +// invoke heap functions directly +func (pq *PriorityQueue) Pop() (*Item, error) { + if pq.Len() == 0 { + return nil, ErrEmpty + } + + pq.lock.Lock() + defer pq.lock.Unlock() + + item := heap.Pop(&pq.data).(*Item) + delete(pq.dataMap, item.Key) + return item, nil +} + +// Push pushes an item on to the queue. This is a wrapper/convenience +// method that calls heap.Push, so consumers do not need to invoke heap +// functions directly. Items must have unique Keys, and Items in the queue +// cannot be updated. To modify an Item, users must first remove it and re-push +// it after modifications +func (pq *PriorityQueue) Push(i *Item) error { + if i == nil || i.Key == "" { + return errors.New("error adding item: Item Key is required") + } + + pq.lock.Lock() + defer pq.lock.Unlock() + + if _, ok := pq.dataMap[i.Key]; ok { + return ErrDuplicateItem + } + // copy the item value(s) so that modifications to the source item does not + // affect the item on the queue + clone, err := copystructure.Copy(i) + if err != nil { + return err + } + + pq.dataMap[i.Key] = clone.(*Item) + heap.Push(&pq.data, clone) + return nil +} + +// PopByKey searches the queue for an item with the given key and removes it +// from the queue if found. Returns ErrItemNotFound(key) if not found. This +// method must fix the queue after removal. +func (pq *PriorityQueue) PopByKey(key string) (*Item, error) { + pq.lock.Lock() + defer pq.lock.Unlock() + + item, ok := pq.dataMap[key] + if !ok { + return nil, nil + } + + // remove the item the heap and delete it from the dataMap + itemRaw := heap.Remove(&pq.data, item.index) + delete(pq.dataMap, key) + + if itemRaw != nil { + if i, ok := itemRaw.(*Item); ok { + return i, nil + } + } + + return nil, nil +} + +// Len returns the number of items in the queue data structure. Do not use this +// method directly on the queue, use PriorityQueue.Len() instead. +func (q queue) Len() int { return len(q) } + +// Less returns whether the Item with index i should sort before the Item with +// index j in the queue. This method is used by the queue to determine priority +// internally; the Item with the lower value wins. (priority zero is higher +// priority than 1). The priority of Items with equal values is undetermined. +func (q queue) Less(i, j int) bool { + return q[i].Priority < q[j].Priority +} + +// Swap swaps things in-place; part of sort.Interface +func (q queue) Swap(i, j int) { + q[i], q[j] = q[j], q[i] + q[i].index = i + q[j].index = j +} + +// Push is used by heap.Interface to push items onto the heap. This method is +// invoked by container/heap, and should not be used directly. +// See: https://golang.org/pkg/container/heap/#Interface +func (q *queue) Push(x interface{}) { + n := len(*q) + item := x.(*Item) + item.index = n + *q = append(*q, item) +} + +// Pop is used by heap.Interface to pop items off of the heap. This method is +// invoked by container/heap, and should not be used directly. +// See: https://golang.org/pkg/container/heap/#Interface +func (q *queue) Pop() interface{} { + old := *q + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + item.index = -1 // for safety + *q = old[0 : n-1] + return item +} diff --git a/sdk/queue/priority_queue_test.go b/sdk/queue/priority_queue_test.go new file mode 100644 index 000000000000..e570c1b0a732 --- /dev/null +++ b/sdk/queue/priority_queue_test.go @@ -0,0 +1,209 @@ +package queue + +import ( + "container/heap" + "fmt" + "testing" + "time" +) + +// Ensure we satisfy the heap.Interface +var _ heap.Interface = &queue{} + +// some tests rely on the ordering of items from this method +func testCases() (tc []*Item) { + // create a slice of items with priority / times offest by these seconds + for i, m := range []time.Duration{ + 5, + 183600, // 51 hours + 15, // 15 seconds + 45, // 45 seconds + 900, // 15 minutes + 300, // 5 minutes + 7200, // 2 hours + 183600, // 51 hours + 7201, // 2 hours, 1 second + 115200, // 32 hours + 1209600, // 2 weeks + } { + n := time.Now() + ft := n.Add(time.Second * m) + tc = append(tc, &Item{ + Key: fmt.Sprintf("item-%d", i), + Value: 1, + Priority: ft.Unix(), + }) + } + return +} + +func TestPriorityQueue_New(t *testing.T) { + pq := New() + + if len(pq.data) != len(pq.dataMap) || len(pq.data) != 0 { + t.Fatalf("error in queue/map size, expected data and map to be initialized, got (%d) and (%d)", len(pq.data), len(pq.dataMap)) + } + + if pq.Len() != 0 { + t.Fatalf("expected new queue to have zero size, got (%d)", pq.Len()) + } +} + +func TestPriorityQueue_Push(t *testing.T) { + pq := New() + + // don't allow nil pushing + if err := pq.Push(nil); err == nil { + t.Fatal("Expected error on pushing nil") + } + + tc := testCases() + tcl := len(tc) + for _, i := range tc { + if err := pq.Push(i); err != nil { + t.Fatal(err) + } + } + + if pq.Len() != tcl { + t.Fatalf("error adding items, expected (%d) items, got (%d)", tcl, pq.Len()) + } + + testValidateInternalData(t, pq, len(tc), false) + + item, err := pq.Pop() + if err != nil { + t.Fatalf("error popping item: %s", err) + } + if tc[0].Priority != item.Priority { + t.Fatalf("expected tc[0] and popped item to match, got (%q) and (%q)", tc[0], item.Priority) + } + if tc[0].Key != item.Key { + t.Fatalf("expected tc[0] and popped item to match, got (%q) and (%q)", tc[0], item.Priority) + } + + testValidateInternalData(t, pq, len(tc)-1, false) + + // push item with no key + dErr := pq.Push(tc[1]) + if dErr != ErrDuplicateItem { + t.Fatal(err) + } + // push item with no key + tc[2].Key = "" + kErr := pq.Push(tc[2]) + if kErr != nil && kErr.Error() != "error adding item: Item Key is required" { + t.Fatal(kErr) + } + + testValidateInternalData(t, pq, len(tc)-1, true) + + // check nil,nil error for not found + i, err := pq.PopByKey("empty") + if err != nil && i != nil { + t.Fatalf("expected nil error for PopByKey of non-existing key, got: %s", err) + } +} + +func TestPriorityQueue_Pop(t *testing.T) { + pq := New() + + tc := testCases() + for _, i := range tc { + if err := pq.Push(i); err != nil { + t.Fatal(err) + } + } + + topItem, err := pq.Pop() + if err != nil { + t.Fatalf("error calling pop: %s", err) + } + if tc[0].Priority != topItem.Priority { + t.Fatalf("expected tc[0] and popped item to match, got (%q) and (%q)", tc[0], topItem.Priority) + } + if tc[0].Key != topItem.Key { + t.Fatalf("expected tc[0] and popped item to match, got (%q) and (%q)", tc[0], topItem.Priority) + } + + var items []*Item + items = append(items, topItem) + // pop the remaining items, compare size of input and output + i, _ := pq.Pop() + for ; i != nil; i, _ = pq.Pop() { + items = append(items, i) + } + if len(items) != len(tc) { + t.Fatalf("expected popped item count to match test cases, got (%d)", len(items)) + } +} + +func TestPriorityQueue_PopByKey(t *testing.T) { + pq := New() + + tc := testCases() + for _, i := range tc { + if err := pq.Push(i); err != nil { + t.Fatal(err) + } + } + + // grab the top priority item, to capture it's value for checking later + item, _ := pq.Pop() + oldPriority := item.Priority + oldKey := item.Key + + // push the item back on, so it gets removed with PopByKey and we verify + // the top item has changed later + err := pq.Push(item) + if err != nil { + t.Fatalf("error re-pushing top item: %s", err) + } + + popKeys := []int{2, 4, 7, 1, 0} + for _, i := range popKeys { + item, err := pq.PopByKey(fmt.Sprintf("item-%d", i)) + if err != nil { + t.Fatalf("failed to pop item-%d, \n\terr: %s\n\titem: %#v", i, err, item) + } + } + + testValidateInternalData(t, pq, len(tc)-len(popKeys), false) + + // grab the top priority item again, to compare with the top item priority + // from above + item, _ = pq.Pop() + newPriority := item.Priority + newKey := item.Key + + if oldPriority == newPriority || oldKey == newKey { + t.Fatalf("expected old/new key and priority to differ, got (%s/%s) and (%d/%d)", oldKey, newKey, oldPriority, newPriority) + } + + testValidateInternalData(t, pq, len(tc)-len(popKeys)-1, true) +} + +// testValidateInternalData checks the internal data stucture of the PriorityQueue +// and verifies that items are in-sync. Use drain only at the end of a test, +// because it will mutate the input queue +func testValidateInternalData(t *testing.T, pq *PriorityQueue, expectedSize int, drain bool) { + actualSize := pq.Len() + if actualSize != expectedSize { + t.Fatalf("expected new queue size to be (%d), got (%d)", expectedSize, actualSize) + } + + if len(pq.data) != len(pq.dataMap) || len(pq.data) != expectedSize { + t.Fatalf("error in queue/map size, expected data and map to be (%d), got (%d) and (%d)", expectedSize, len(pq.data), len(pq.dataMap)) + } + + if drain && pq.Len() > 0 { + // pop all the items, verify lengths + i, _ := pq.Pop() + for ; i != nil; i, _ = pq.Pop() { + expectedSize-- + if len(pq.data) != len(pq.dataMap) || len(pq.data) != expectedSize { + t.Fatalf("error in queue/map size, expected data and map to be (%d), got (%d) and (%d)", expectedSize, len(pq.data), len(pq.dataMap)) + } + } + } +}