Store less data in Cassandra prefix buckets (#7199)
* Store less data in Cassandra prefix buckets

The Cassandra physical backend relies on storing data for sys/foo/bar
under sys, sys/foo, and sys/foo/bar. This is necessary so that we
can list the sys bucket, get a list of all child keys, and then trim
this down to find child 'folders', e.g. foo. Right now, however, we store
the full value of every storage entry in all three buckets. This is
unnecessary, as the value will only ever be read out of the leaf bucket,
i.e. sys/foo/bar. We use the intermediary buckets only for listing keys.
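
The bucket layout and the listing step described above can be sketched as follows. This is a minimal illustration that follows the commit message's description, not the backend's code: `prefixBuckets` and `listChildren` are hypothetical helpers, and the real `c.buckets` implementation is not shown in this diff and may differ.

```go
package main

import (
	"sort"
	"strings"
)

// prefixBuckets is a hypothetical helper. For "sys/foo/bar" it returns
// ["sys", "sys/foo", "sys/foo/bar"], matching the commit message's description.
// The last element is the leaf bucket, the only one that needs the full value.
func prefixBuckets(key string) []string {
	parts := strings.Split(key, "/")
	buckets := make([]string, 0, len(parts))
	for i := range parts {
		buckets = append(buckets, strings.Join(parts[:i+1], "/"))
	}
	return buckets
}

// listChildren (also hypothetical) trims the keys stored under one bucket
// down to its direct children. Deeper keys collapse into a "folder" name
// with a trailing slash. Only keys are needed here, never values.
func listChildren(bucket string, keys []string) []string {
	seen := map[string]bool{}
	for _, key := range keys {
		rest := strings.TrimPrefix(key, bucket+"/")
		if rest == key || rest == "" {
			continue // key is not below this bucket
		}
		if i := strings.Index(rest, "/"); i >= 0 {
			rest = rest[:i+1] // keep only the first path segment as a folder
		}
		seen[rest] = true
	}
	children := make([]string, 0, len(seen))
	for child := range seen {
		children = append(children, child)
	}
	sort.Strings(children)
	return children
}
```

Because `listChildren` only ever looks at keys, an intermediary bucket can hold empty values without affecting list results, which is the property this change relies on.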

We have seen some issues around compaction where certain buckets,
particularly intermediary buckets that are exclusively for listing,
get really clogged up with data to the point of not being listable.
Buckets like sys/expire/id are huge, combining lease expiry data for
all auth methods, and need to be listed for Vault to successfully
become leader. This PR tries to cut down on the amount of data stored
in intermediary buckets.

* Avoid goroutine leak by buffering results channel up to the bucket count
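
A minimal sketch of why the buffer size matters, assuming the receive loop returns on the first error (the loop body is truncated in this diff). `firstError` and `errWriteFailed` are hypothetical names used only for illustration.

```go
package main

import "errors"

// errWriteFailed is a hypothetical error value for demonstration.
var errWriteFailed = errors.New("write failed")

// firstError starts one goroutine per task and returns the first non-nil
// error. The channel is buffered to len(tasks), so goroutines whose results
// are never read can still send and exit. With an unbuffered channel, an
// early return would leave the remaining senders blocked forever.
func firstError(tasks []func() error) error {
	results := make(chan error, len(tasks))
	for _, task := range tasks {
		go func(task func() error) {
			results <- task()
		}(task)
	}
	for range tasks {
		if err := <-results; err != nil {
			return err // early return: remaining sends land in the buffer
		}
	}
	return nil
}
```

Sizing the buffer to the number of senders means no goroutine depends on the receiver still being present, at the cost of one channel slot per bucket.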
jackkleeman authored and Jim Kalafut committed Aug 19, 2019
1 parent 595bc39 commit 292e10d
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions physical/cassandra/cassandra.go
@@ -256,12 +256,18 @@ func (c *CassandraBackend) Put(ctx context.Context, entry *physical.Entry) error

 	// Execute inserts to each key prefix simultaneously
 	stmt := fmt.Sprintf(`INSERT INTO "%s" (bucket, key, value) VALUES (?, ?, ?)`, c.table)
-	results := make(chan error)
 	buckets := c.buckets(entry.Key)
-	for _, _bucket := range buckets {
-		go func(bucket string) {
-			results <- c.sess.Query(stmt, bucket, entry.Key, entry.Value).Exec()
-		}(_bucket)
+	results := make(chan error, len(buckets))
+	for i, _bucket := range buckets {
+		go func(i int, bucket string) {
+			var value []byte
+			if i == len(buckets)-1 {
+				// Only store the full value if this is the leaf bucket where the entry will actually be read
+				// otherwise this write is just to allow for list operations
+				value = entry.Value
+			}
+			results <- c.sess.Query(stmt, bucket, entry.Key, value).Exec()
+		}(i, _bucket)
 	}
 	for i := 0; i < len(buckets); i++ {
 		if err := <-results; err != nil {
@@ -296,8 +302,8 @@ func (c *CassandraBackend) Delete(ctx context.Context, key string) error {
 	defer metrics.MeasureSince([]string{"cassandra", "delete"}, time.Now())

 	stmt := fmt.Sprintf(`DELETE FROM "%s" WHERE bucket = ? AND key = ?`, c.table)
-	results := make(chan error)
 	buckets := c.buckets(key)
+	results := make(chan error, len(buckets))

 	for _, bucket := range buckets {
 		go func(bucket string) {
