-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Aerospike storage backend #10131
Aerospike storage backend #10131
Changes from 3 commits
d7c2aae
404471e
08c898d
14fccc9
a8fc7a7
e234700
f7082b0
4f93cf6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -0,0 +1,155 @@ | ||||
package aerospike | ||||
|
||||
import ( | ||||
"context" | ||||
"fmt" | ||||
"hash/fnv" | ||||
"strconv" | ||||
"strings" | ||||
|
||||
aero "github.com/aerospike/aerospike-client-go" | ||||
log "github.com/hashicorp/go-hclog" | ||||
"github.com/hashicorp/vault/sdk/physical" | ||||
) | ||||
|
||||
const ( | ||||
keyBin = "keyBin" | ||||
valueBin = "valueBin" | ||||
) | ||||
|
||||
// AerospikeBackend is a physical backend that stores data in Aerospike. | ||||
type AerospikeBackend struct { | ||||
client *aero.Client | ||||
namespace string | ||||
set string | ||||
logger log.Logger | ||||
} | ||||
|
||||
// Verify AerospikeBackend satisfies the correct interface. | ||||
var _ physical.Backend = (*AerospikeBackend)(nil) | ||||
|
||||
// NewAerospikeBackend constructs an AerospikeBackend backend. | ||||
func NewAerospikeBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) { | ||||
namespace := conf["namespace"] | ||||
set := conf["set"] | ||||
|
||||
policy := aero.NewClientPolicy() | ||||
policy.User = conf["username"] | ||||
policy.Password = conf["password"] | ||||
|
||||
port, err := strconv.Atoi(conf["port"]) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If
|
||||
if err != nil { | ||||
return nil, err | ||||
} | ||||
|
||||
client, err := aero.NewClientWithPolicy(policy, conf["hostname"], port) | ||||
if err != nil { | ||||
return nil, err | ||||
} | ||||
|
||||
return &AerospikeBackend{ | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since (per the docs), There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like |
||||
client: client, | ||||
namespace: namespace, | ||||
set: set, | ||||
logger: logger, | ||||
}, nil | ||||
} | ||||
|
||||
func (a *AerospikeBackend) key(userKey string) (*aero.Key, error) { | ||||
return aero.NewKey(a.namespace, a.set, hash(userKey)) | ||||
} | ||||
|
||||
// Put is used to insert or update an entry. | ||||
func (a *AerospikeBackend) Put(ctx context.Context, entry *physical.Entry) error { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/ctx/_ on these unused |
||||
aeroKey, err := a.key(entry.Key) | ||||
if err != nil { | ||||
return err | ||||
} | ||||
|
||||
writePolicy := aero.NewWritePolicy(0, 0) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I comment or docs link that has some details about this would be helpful. |
||||
writePolicy.RecordExistsAction = aero.REPLACE | ||||
|
||||
binMap := make(aero.BinMap, 2) | ||||
binMap[keyBin] = entry.Key | ||||
binMap[valueBin] = entry.Value | ||||
|
||||
return a.client.Put(writePolicy, aeroKey, binMap) | ||||
} | ||||
|
||||
// Get is used to fetch an entry. | ||||
func (a *AerospikeBackend) Get(ctx context.Context, key string) (*physical.Entry, error) { | ||||
aeroKey, err := a.key(key) | ||||
if err != nil { | ||||
return nil, err | ||||
} | ||||
|
||||
record, err := a.client.Get(nil, aeroKey) | ||||
if err != nil { | ||||
if err.Error() == "Key not found" { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the compared-to error exported by chance? |
||||
return nil, nil | ||||
} | ||||
return nil, err | ||||
} | ||||
|
||||
return &physical.Entry{ | ||||
Key: key, | ||||
Value: record.Bins[valueBin].([]byte), | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Considering indexing against |
||||
}, nil | ||||
} | ||||
|
||||
// Delete is used to permanently delete an entry. | ||||
func (a *AerospikeBackend) Delete(ctx context.Context, key string) error { | ||||
aeroKey, err := a.key(key) | ||||
if err != nil { | ||||
return err | ||||
} | ||||
|
||||
_, err = a.client.Delete(nil, aeroKey) | ||||
return err | ||||
} | ||||
|
||||
// List is used to list all the keys under a given | ||||
// prefix, up to the next prefix. | ||||
func (a *AerospikeBackend) List(ctx context.Context, prefix string) ([]string, error) { | ||||
recordSet, err := a.client.ScanAll(nil, a.namespace, a.set) | ||||
calvn marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
if err != nil { | ||||
return nil, err | ||||
} | ||||
|
||||
var keyList []string | ||||
for res := range recordSet.Results() { | ||||
if res.Err != nil { | ||||
return nil, res.Err | ||||
} | ||||
recordKey := res.Record.Bins[keyBin].(string) | ||||
if strings.HasPrefix(recordKey, prefix) { | ||||
trimPrefix := strings.Replace(recordKey, prefix, "", 1) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||
keys := strings.Split(trimPrefix, "/") | ||||
if len(keys) == 1 { | ||||
keyList = append(keyList, keys[0]) | ||||
} else { | ||||
withSlash := keys[0] + "/" | ||||
if !listContains(keyList, withSlash) { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have a helper you can use: vault/sdk/helper/strutil/strutil.go Line 26 in 8af1f3b
|
||||
keyList = append(keyList, withSlash) | ||||
} | ||||
} | ||||
} | ||||
} | ||||
|
||||
return keyList, nil | ||||
} | ||||
|
||||
func hash(s string) string { | ||||
h := fnv.New32a() | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason this was chosen in particular? 32-bits is a fairly small hash size. We'd typically use sha256 to satisfy both irreversibility and collision avoidance. |
||||
h.Write([]byte(s)) | ||||
return fmt.Sprint(h.Sum32()) | ||||
} | ||||
|
||||
func listContains(list []string, s string) bool { | ||||
for _, i := range list { | ||||
if i == s { | ||||
return true | ||||
} | ||||
} | ||||
return false | ||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
package aerospike | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
aero "github.com/aerospike/aerospike-client-go" | ||
log "github.com/hashicorp/go-hclog" | ||
"github.com/hashicorp/vault/helper/testhelpers/docker" | ||
"github.com/hashicorp/vault/sdk/helper/logging" | ||
"github.com/hashicorp/vault/sdk/physical" | ||
) | ||
|
||
func TestAerospikeBackend(t *testing.T) { | ||
cleanup, config := prepareAerospikeContainer(t) | ||
defer cleanup() | ||
|
||
logger := logging.NewVaultLogger(log.Debug) | ||
|
||
b, err := NewAerospikeBackend(map[string]string{ | ||
"hostname": config.hostname, | ||
"port": config.port, | ||
"namespace": config.namespace, | ||
"set": config.set, | ||
}, logger) | ||
|
||
if err != nil { | ||
t.Fatalf("err: %s", err) | ||
} | ||
|
||
physical.ExerciseBackend(t, b) | ||
physical.ExerciseBackend_ListPrefix(t, b) | ||
} | ||
|
||
type aerospikeConfig struct { | ||
hostname string | ||
port string | ||
namespace string | ||
set string | ||
} | ||
|
||
func prepareAerospikeContainer(t *testing.T) (func(), *aerospikeConfig) { | ||
runner, err := docker.NewServiceRunner(docker.RunOptions{ | ||
ImageRepo: "aerospike/aerospike-server", | ||
ContainerName: "aerospikedb", | ||
ImageTag: "latest", | ||
Ports: []string{"3000/tcp", "3001/tcp", "3002/tcp", "3003/tcp"}, | ||
DoNotAutoRemove: true, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this strictly necessary? This leaves dangling containers on the host, and may result in errors if tests are ran multiple times due to colliding container name. |
||
}) | ||
if err != nil { | ||
t.Fatalf("Could not start local Aerospike: %s", err) | ||
} | ||
|
||
svc, err := runner.StartService(context.Background(), | ||
func(ctx context.Context, host string, port int) (docker.ServiceConfig, error) { | ||
cfg := docker.NewServiceHostPort(host, port) | ||
|
||
time.Sleep(time.Second) | ||
client, err := aero.NewClient(host, port) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
node, err := client.Cluster().GetRandomNode() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
_, err = node.RequestInfo(aero.NewInfoPolicy(), "namespaces") | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return cfg, nil | ||
}, | ||
) | ||
if err != nil { | ||
t.Fatalf("Could not start local Aerospike: %s", err) | ||
} | ||
|
||
return svc.Cleanup, &aerospikeConfig{ | ||
hostname: svc.Config.URL().Hostname(), | ||
port: svc.Config.URL().Port(), | ||
namespace: "test", | ||
set: "vault", | ||
} | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit curious as to why this particular pseudo-version was chosen since v3.1.1 is available.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting, the
+incompatible
label is actually how go/mod sees the release: https://pkg.go.dev/github.com/aerospike/[email protected]+incompatible?tab=versionsSince
v3.1.1+incompatible
has been released, we should consider bumping it to that version (unless there's a particular reason not to)