Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Priority Queue library to sdk #6664

Merged
merged 17 commits into from
May 16, 2019
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions sdk/queue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Vault SDK - Queue
=================

The `queue` package provides Vault plugins with a Priority Queue. It can be used
catsby marked this conversation as resolved.
Show resolved Hide resolved
as an in-memory list of `queue.Item` sorted by their `priority`, and offers
methods to find or remove items by their key. Internally it
uses `container/heap`; see [Example Priority
Queue](https://golang.org/pkg/container/heap/#example__priorityQueue)
catsby marked this conversation as resolved.
Show resolved Hide resolved

191 changes: 191 additions & 0 deletions sdk/queue/priority_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// Package queue provides Vault plugins with a Priority Queue. It can be used
// as an in-memory list of queue.Item sorted by their priority, and offers
// methods to find or remove items by their key. Internally it uses
// container/heap; see Example Priority Queue:
// https://golang.org/pkg/container/heap/#example__priorityQueue
package queue

import (
"container/heap"
"errors"
"sync"

"github.com/mitchellh/copystructure"
)

// ErrEmpty is returned for queues with no items
var ErrEmpty = errors.New("queue is empty")

// ErrDuplicateItem is returned when the queue attmepts to push an item to a key that
// already exists. The queue does not attempt to update, instead returns this
// error. If an Item needs to be updated or replaced, pop the item first.
var ErrDuplicateItem = errors.New("duplicate item")

// New initializes the internal data structures and returns a new
// PriorityQueue
func New() *PriorityQueue {
pq := PriorityQueue{
data: make(queue, 0),
dataMap: make(map[string]*Item),
}
heap.Init(&pq.data)
return &pq
}

// PriorityQueue facilitates queue of Items, providing Push, Pop, and
// PopByKey convenience methods. The ordering (priority) is an int64 value
// with the smallest value is the highest priority. PriorityQueue maintains both
// an internal slice for the queue as well as a map of the same items with their
// keys as the index. This enables users to find specific items by key. The map
// must be kept in sync with the data slice.
// See https://golang.org/pkg/container/heap/#example__priorityQueue
type PriorityQueue struct {
// data is the internal structure that holds the queue, and is operated on by
// heap functions
data queue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a pointer to the queue instead of the queue object? I see other places where you just pass the reference and that could just be stored here.

Copy link
Contributor Author

@catsby catsby May 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that I understand what you're asking, are the other places heap methods like heap.Init(&pq.data) and heap.Pop(&pq.data).(*Item), and you're asking if we make data a *queue instead of queue, so those calls are heap.Init(pq.data) et. al?

Assuming that's what you're asking, is there an advantage to doing that? I'm not opposed to it I'm just curious of the benefits.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine either way, it just seems that you are always using the reference to it in all calls instead of just storing the reference and just using it directly. It is really just a stylistic thing. If I see a bunch of & for the same thing, I start digging into the implementation.


// dataMap represents all the items in the queue, with unique indexes, used
// for finding specific items. dataMap is kept in sync with the data slice
dataMap map[string]*Item

// lock is a read/write mutex, and used to facilitate read/write locks on the
// data and dataMap fields
lock sync.RWMutex
}

// queue is the internal data structure used to satisfy heap.Interface. This
// prevents users from calling Pop and Push heap methods directly
type queue []*Item

// Item is something managed in the priority queue
type Item struct {
// Key is a unique string used to identify items in the internal data map
Key string
// Value is an unspecified type that implementations can use to store
// information
Value interface{}

// Priority determines ordering in the queue, with the lowest value being the
// highest priority
kalafut marked this conversation as resolved.
Show resolved Hide resolved
Priority int64
catsby marked this conversation as resolved.
Show resolved Hide resolved

// index is an internal value used by the heap package, and should not be
// modified by any consumer of the priority queue
index int
}

// Len returns the count of items in the Priority Queue
func (pq *PriorityQueue) Len() int {
pq.lock.RLock()
defer pq.lock.RUnlock()
return pq.data.Len()
}

// Pop pops the highest priority item from the queue. This is a
// wrapper/convenience method that calls heap.Pop, so consumers do not need to
// invoke heap functions directly
func (pq *PriorityQueue) Pop() (*Item, error) {
if pq.Len() == 0 {
return nil, ErrEmpty
}

pq.lock.Lock()
defer pq.lock.Unlock()

item := heap.Pop(&pq.data).(*Item)
delete(pq.dataMap, item.Key)
return item, nil
}

// Push pushes an item on to the queue. This is a wrapper/convenience
// method that calls heap.Push, so consumers do not need to invoke heap
// functions directly. Items must have unique Keys, and Items in the queue
// cannot be updated. To modify an Item, users must first remove it and re-push
// it after modifications
func (pq *PriorityQueue) Push(i *Item) error {
kalafut marked this conversation as resolved.
Show resolved Hide resolved
if i == nil || i.Key == "" {
return errors.New("error adding item: Item Key is required")
}

pq.lock.Lock()
defer pq.lock.Unlock()

if _, ok := pq.dataMap[i.Key]; ok {
return ErrDuplicateItem
briankassouf marked this conversation as resolved.
Show resolved Hide resolved
}
// copy the item value(s) so that modifications to the source item does not
// affect the item on the queue
clone, err := copystructure.Copy(i)
if err != nil {
return err
}

pq.dataMap[i.Key] = clone.(*Item)
heap.Push(&pq.data, clone)
return nil
}

// PopByKey searches the queue for an item with the given key and removes it
// from the queue if found. Returns ErrItemNotFound(key) if not found. This
// method must fix the queue after removal.
func (pq *PriorityQueue) PopByKey(key string) (*Item, error) {
pq.lock.Lock()
defer pq.lock.Unlock()

item, ok := pq.dataMap[key]
if !ok {
return nil, nil
}

// remove the item the heap and delete it from the dataMap
itemRaw := heap.Remove(&pq.data, item.index)
delete(pq.dataMap, key)

if i, ok := itemRaw.(*Item); ok {
catsby marked this conversation as resolved.
Show resolved Hide resolved
return i, nil
}

return nil, nil
}

// Len returns the number of items in the queue data structure. Do not use this
// method directly on the queue, use PriorityQueue.Len() instead.
func (q queue) Len() int { return len(q) }

// Less returns whether the Item with index i should sort before the Item with
// index j in the queue. This method is used by the queue to determine priority
// internally; the Item with the lower value wins. (priority zero is higher
// priority than 1). The priority of Items with equal values is undetermined.
func (q queue) Less(i, j int) bool {
return q[i].Priority < q[j].Priority
}

// Swap swaps things in-place; part of sort.Interface
func (q queue) Swap(i, j int) {
q[i], q[j] = q[j], q[i]
q[i].index = i
q[j].index = j
}

// Push is used by heap.Interface to push items onto the heap. This method is
// invoked by container/heap, and should not be used directly.
// See: https://golang.org/pkg/container/heap/#Interface
func (q *queue) Push(x interface{}) {
n := len(*q)
item := x.(*Item)
item.index = n
*q = append(*q, item)
}

// Pop is used by heap.Interface to pop items off of the heap. This method is
// invoked by container/heap, and should not be used directly.
// See: https://golang.org/pkg/container/heap/#Interface
func (q *queue) Pop() interface{} {
old := *q
n := len(old)
item := old[n-1]
catsby marked this conversation as resolved.
Show resolved Hide resolved
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*q = old[0 : n-1]
return item
}
Loading