Skip to content

Commit

Permalink
back out blocking query in lieu of limiter and simplify code by remov…
Browse files Browse the repository at this point in the history
…ing ERR_WAIT block
  • Loading branch information
tgross committed Nov 16, 2022
1 parent b3f8ff5 commit 82fce0a
Showing 1 changed file with 11 additions and 40 deletions.
51 changes: 11 additions & 40 deletions nomad/encrypter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ import (
log "github.com/hashicorp/go-hclog"
kms "github.com/hashicorp/go-kms-wrapping/v2"
"github.com/hashicorp/go-kms-wrapping/v2/aead"
memdb "github.com/hashicorp/go-memdb"
"golang.org/x/time/rate"

"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/crypto"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand Down Expand Up @@ -432,46 +431,32 @@ func (krr *KeyringReplicator) stop() {
krr.stopFn()
}

const keyringReplicationRate = 10
const keyringReplicationRate = 5

func (krr *KeyringReplicator) run(ctx context.Context) {
krr.logger.Debug("starting encryption key replication")
defer krr.logger.Debug("exiting key replication")

store := krr.srv.fsm.State()
abandonCh := store.AbandonCh()
minIndex := uint64(0)
limiter := rate.NewLimiter(keyringReplicationRate, keyringReplicationRate)

for {
select {
case <-krr.srv.shutdownCtx.Done():
return
case <-ctx.Done():
return
case <-abandonCh:
// reset the blocking query to make sure we get all the keys from a
// restored snapshot
store = krr.srv.fsm.State()
abandonCh = store.AbandonCh()
minIndex = uint64(0)
default:
// ensure that we abandon the blocking query periodically to give
// other contexts a chance to fire
queryCtx, queryCancel := context.WithTimeout(ctx, time.Second)
defer queryCancel()

var rawIter any
var err error
rawIter, minIndex, err = store.BlockingQuery(getRootKeyMetas, minIndex, queryCtx)
err := limiter.Wait(ctx)
if err != nil {
if queryCtx.Err() == nil {
// we get errors for closed context but don't want to log on that
krr.logger.Error("failed to fetch keyring", "error", err)
}
continue
continue // rate limit exceeded
}
iter := rawIter.(memdb.ResultIterator)

store := krr.srv.fsm.State()
iter, err := store.RootKeyMetas(nil)
if err != nil {
krr.logger.Error("failed to fetch keyring", "error", err)
continue
}
for {
raw := iter.Next()
if raw == nil {
Expand All @@ -498,20 +483,6 @@ func (krr *KeyringReplicator) run(ctx context.Context) {

}

// getRootKeyMetas implements state.QueryFn and is run as a blocking query to
// detect new key metadata
func getRootKeyMetas(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) {
iter, err := store.RootKeyMetas(ws)
if err != nil {
return nil, 0, err
}
index, err := store.Index(state.TableRootKeyMeta)
if err != nil {
return nil, 0, err
}
return iter, index, nil
}

// replicateKey replicates a single key from peer servers that was present in
// the state store but missing from the keyring. Returns an error only if no
// peers have this key.
Expand Down

0 comments on commit 82fce0a

Please sign in to comment.