Skip to content

Commit

Permalink
[OSS] gRPC Blocking Queries (#17426)
Browse files Browse the repository at this point in the history
* feat: initial grpc blocking queries

* changelog and docs update
  • Loading branch information
DanStough authored and nickethier committed May 26, 2023
1 parent 7ae36e5 commit 45a8117
Show file tree
Hide file tree
Showing 42 changed files with 1,374 additions and 1,152 deletions.
5 changes: 5 additions & 0 deletions .changelog/17426.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
```release-note:improvement
peering: gRPC queries for TrustBundleList, TrustBundleRead, PeeringList, and PeeringRead now support blocking semantics,
reducing network and CPU demand.
The HTTP APIs for Peering List and Read have been updated to support blocking.
```
208 changes: 208 additions & 0 deletions agent/blockingquery/blockingquery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package blockingquery

import (
"context"
"errors"
"fmt"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"

"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/lib"
)

// Sentinel errors that must be used with blockingQuery
var (
ErrNotFound = fmt.Errorf("no data found for query")
ErrNotChanged = fmt.Errorf("data did not change for query")
)

// QueryFn is used to perform a query operation. See Server.blockingQuery for
// the requirements of this function.
type QueryFn func(memdb.WatchSet, *state.Store) error

// RequestOptions are options used by Server.blockingQuery to modify the
// behaviour of the query operation, or to populate response metadata.
type RequestOptions interface {
GetToken() string
GetMinQueryIndex() uint64
GetMaxQueryTime() (time.Duration, error)
GetRequireConsistent() bool
}

// ResponseMeta is an interface used to populate the response struct
// with metadata about the query and the state of the server.
type ResponseMeta interface {
SetLastContact(time.Duration)
SetKnownLeader(bool)
GetIndex() uint64
SetIndex(uint64)
SetResultsFilteredByACLs(bool)
}

// FSMServer is interface into the stateful components of a Consul server, such
// as memdb or raft leadership.
type FSMServer interface {
ConsistentRead() error
DecrementBlockingQueries() uint64
GetShutdownChannel() chan struct{}
GetState() *state.Store
IncrementBlockingQueries() uint64
RPCQueryTimeout(time.Duration) time.Duration
SetQueryMeta(ResponseMeta, string)
}

// Query performs a blocking query if opts.GetMinQueryIndex is
// greater than 0, otherwise performs a non-blocking query. Blocking queries will
// block until responseMeta.Index is greater than opts.GetMinQueryIndex,
// or opts.GetMaxQueryTime is reached. Non-blocking queries return immediately
// after performing the query.
//
// If opts.GetRequireConsistent is true, blockingQuery will first verify it is
// still the cluster leader before performing the query.
//
// The query function is expected to be a closure that has access to responseMeta
// so that it can set the Index. The actual result of the query is opaque to blockingQuery.
//
// The query function can return ErrNotFound, which is a sentinel error. Returning
// ErrNotFound indicates that the query found no results, which allows
// blockingQuery to keep blocking until the query returns a non-nil error.
// The query function must take care to set the actual result of the query to
// nil in these cases, otherwise when blockingQuery times out it may return
// a previous result. ErrNotFound will never be returned to the caller, it is
// converted to nil before returning.
//
// The query function can return ErrNotChanged, which is a sentinel error. This
// can only be returned on calls AFTER the first call, as it would not be
// possible to detect the absence of a change on the first call. Returning
// ErrNotChanged indicates that the query results are identical to the prior
// results which allows blockingQuery to keep blocking until the query returns
// a real changed result.
//
// The query function must take care to ensure the actual result of the query
// is either left unmodified or explicitly left in a good state before
// returning, otherwise when blockingQuery times out it may return an
// incomplete or unexpected result. ErrNotChanged will never be returned to the
// caller, it is converted to nil before returning.
//
// If query function returns any other error, the error is returned to the caller
// immediately.
//
// The query function must follow these rules:
//
// 1. to access data it must use the passed in state.Store.
// 2. it must set the responseMeta.Index to an index greater than
// opts.GetMinQueryIndex if the results return by the query have changed.
// 3. any channels added to the memdb.WatchSet must unblock when the results
// returned by the query have changed.
//
// To ensure optimal performance of the query, the query function should make a
// best-effort attempt to follow these guidelines:
//
// 1. only set responseMeta.Index to an index greater than
// opts.GetMinQueryIndex when the results returned by the query have changed.
// 2. any channels added to the memdb.WatchSet should only unblock when the
// results returned by the query have changed.
func Query(
fsmServer FSMServer,
requestOpts RequestOptions,
responseMeta ResponseMeta,
query QueryFn,
) error {
var ctx context.Context = &lib.StopChannelContext{StopCh: fsmServer.GetShutdownChannel()}

metrics.IncrCounter([]string{"rpc", "query"}, 1)

minQueryIndex := requestOpts.GetMinQueryIndex()
// Perform a non-blocking query
if minQueryIndex == 0 {
if requestOpts.GetRequireConsistent() {
if err := fsmServer.ConsistentRead(); err != nil {
return err
}
}

var ws memdb.WatchSet
err := query(ws, fsmServer.GetState())
fsmServer.SetQueryMeta(responseMeta, requestOpts.GetToken())
if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotChanged) {
return nil
}
return err
}

maxQueryTimeout, err := requestOpts.GetMaxQueryTime()
if err != nil {
return err
}
timeout := fsmServer.RPCQueryTimeout(maxQueryTimeout)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

count := fsmServer.IncrementBlockingQueries()
metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(count))
// decrement the count when the function returns.
defer fsmServer.DecrementBlockingQueries()

var (
notFound bool
ranOnce bool
)

for {
if requestOpts.GetRequireConsistent() {
if err := fsmServer.ConsistentRead(); err != nil {
return err
}
}

// Operate on a consistent set of state. This makes sure that the
// abandon channel goes with the state that the caller is using to
// build watches.
store := fsmServer.GetState()

ws := memdb.NewWatchSet()
// This channel will be closed if a snapshot is restored and the
// whole state store is abandoned.
ws.Add(store.AbandonCh())

err := query(ws, store)
fsmServer.SetQueryMeta(responseMeta, requestOpts.GetToken())

switch {
case errors.Is(err, ErrNotFound):
if notFound {
// query result has not changed
minQueryIndex = responseMeta.GetIndex()
}
notFound = true
case errors.Is(err, ErrNotChanged):
if ranOnce {
// query result has not changed
minQueryIndex = responseMeta.GetIndex()
}
case err != nil:
return err
}
ranOnce = true

if responseMeta.GetIndex() > minQueryIndex {
return nil
}

// block until something changes, or the timeout
if err := ws.WatchCtx(ctx); err != nil {
// exit if we've reached the timeout, or other cancellation
return nil
}

// exit if the state store has been abandoned
select {
case <-store.AbandonCh():
return nil
default:
}
}
}
4 changes: 4 additions & 0 deletions agent/blockingquery/blockingquery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package blockingquery

// TODO: move tests from the consul package, rpc_test.go, TestServer_blockingQuery
// here using mock for FSMServer w/ structs.QueryOptions and structs.QueryOptions
54 changes: 36 additions & 18 deletions agent/cache-types/peerings.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,24 @@ import (
"context"
"fmt"
"strconv"
"time"

external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/proto/private/pbpeering"
"github.com/mitchellh/hashstructure"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/hashicorp/consul/agent/cache"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/private/pbpeering"
)

// PeeringListName is the recommended name for registration.
const PeeringListName = "peers"

// PeeringListRequest represents the combination of request payload
// and options that would normally be sent over headers.
type PeeringListRequest struct {
Request *pbpeering.PeeringListRequest
structs.QueryOptions
Expand All @@ -32,13 +34,10 @@ func (r *PeeringListRequest) CacheInfo() cache.RequestInfo {
info := cache.RequestInfo{
Token: r.Token,
Datacenter: "",
MinIndex: 0,
Timeout: 0,
MustRevalidate: false,

// OPTIMIZE(peering): Cache.notifyPollingQuery polls at this interval. We need to revisit how that polling works.
// Using an exponential backoff when the result hasn't changed may be preferable.
MaxAge: 1 * time.Second,
MinIndex: r.MinQueryIndex,
Timeout: r.MaxQueryTime,
MaxAge: r.MaxAge,
MustRevalidate: r.MustRevalidate,
}

v, err := hashstructure.Hash([]interface{}{
Expand All @@ -56,7 +55,7 @@ func (r *PeeringListRequest) CacheInfo() cache.RequestInfo {

// Peerings supports fetching the list of peers for a given partition or wildcard-specifier.
type Peerings struct {
RegisterOptionsNoRefresh
RegisterOptionsBlockingRefresh
Client PeeringLister
}

Expand All @@ -67,7 +66,7 @@ type PeeringLister interface {
) (*pbpeering.PeeringListResponse, error)
}

func (t *Peerings) Fetch(_ cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
func (t *Peerings) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult

// The request should be a PeeringListRequest.
Expand All @@ -79,10 +78,17 @@ func (t *Peerings) Fetch(_ cache.FetchOptions, req cache.Request) (cache.FetchRe
"Internal cache failure: request wrong type: %T", req)
}

// Always allow stale - there's no point in hitting leader if the request is
// going to be served from cache and end up arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all
// servers too.
// Lightweight copy this object so that manipulating QueryOptions doesn't race.
dup := *reqReal
reqReal = &dup

// Set the minimum query index to our current index, so we block
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
reqReal.QueryOptions.MaxQueryTime = opts.Timeout

// We allow stale queries here to spread out the RPC load, but peerstream information, including the STATUS,
// will not be returned. Right now this is fine for the watch in proxycfg/mesh_gateway.go,
// but it could be a problem for a future consumer.
reqReal.QueryOptions.SetAllowStale(true)

ctx, err := external.ContextWithQueryOptions(context.Background(), reqReal.QueryOptions)
Expand All @@ -91,7 +97,8 @@ func (t *Peerings) Fetch(_ cache.FetchOptions, req cache.Request) (cache.FetchRe
}

// Fetch
reply, err := t.Client.PeeringList(ctx, reqReal.Request)
var header metadata.MD
reply, err := t.Client.PeeringList(ctx, reqReal.Request, grpc.Header(&header))
if err != nil {
// Return an empty result if the error is due to peering being disabled.
// This allows mesh gateways to receive an update and confirm that the watch is set.
Expand All @@ -103,8 +110,19 @@ func (t *Peerings) Fetch(_ cache.FetchOptions, req cache.Request) (cache.FetchRe
return result, err
}

// This first case is using the legacy index field
// It should be removed in a future version in favor of the index from QueryMeta
if reply.OBSOLETE_Index != 0 {
result.Index = reply.OBSOLETE_Index
} else {
meta, err := external.QueryMetaFromGRPCMeta(header)
if err != nil {
return result, fmt.Errorf("could not convert gRPC metadata to query meta: %w", err)
}
result.Index = meta.GetIndex()
}

result.Value = reply
result.Index = reply.Index

return result, nil
}
Loading

0 comments on commit 45a8117

Please sign in to comment.