Merge branch 'main' into docs/krastin/retitle
krastin authored May 9, 2023
2 parents 3aaf9c5 + 40eefab commit 3c56d01
Showing 39 changed files with 1,866 additions and 670 deletions.
3 changes: 3 additions & 0 deletions .changelog/17241.txt
@@ -0,0 +1,3 @@
```release-note:bug
connect: Fix multiple inefficient behaviors when querying service health.
```
13 changes: 13 additions & 0 deletions .github/workflows/backport-assistant.yml
@@ -28,3 +28,16 @@ jobs:
BACKPORT_LABEL_REGEXP: "backport/(?P<target>\\d+\\.\\d+)"
BACKPORT_TARGET_TEMPLATE: "release/{{.target}}.x"
GITHUB_TOKEN: ${{ secrets.ELEVATED_GITHUB_TOKEN }}
handle-failure:
needs:
- backport
if: always() && needs.backport.result == 'failure'
runs-on: ubuntu-latest
steps:
- name: Comment on PR
run: |
github_message="Backport failed @${{ github.event.sender.login }}. Run: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
curl -s -H "Authorization: token ${{ secrets.PR_COMMENT_TOKEN }}" \
-X POST \
-d "{ \"body\": \"${github_message}\"}" \
"https://api.github.com/repos/${GITHUB_REPOSITORY}/pull/${{ github.event.pull_request.number }}/comments"
45 changes: 43 additions & 2 deletions .github/workflows/test-integrations.yml
@@ -237,11 +237,47 @@ jobs:
name: ${{ env.TEST_RESULTS_ARTIFACT_NAME }}
path: ${{ env.TEST_RESULTS_DIR }}

generate-compatibility-job-matrices:
needs: [setup]
runs-on: ${{ fromJSON(needs.setup.outputs.compute-small) }}
name: Generate Compatibility Job Matrices
outputs:
compatibility-matrix: ${{ steps.set-matrix.outputs.compatibility-matrix }}
steps:
- uses: actions/checkout@24cb9080177205b6e8c946b17badbe402adc938f # v3.4.0
- name: Generate Compatibility Job Matrix
id: set-matrix
env:
TOTAL_RUNNERS: 6
JQ_SLICER: '[ inputs ] | [_nwise(length / $runnercount | floor)]'
run: |
cd ./test/integration/consul-container
NUM_RUNNERS=$TOTAL_RUNNERS
NUM_DIRS=$(find ./test -mindepth 1 -maxdepth 2 -type d | wc -l)
if [ "$NUM_DIRS" -lt "$NUM_RUNNERS" ]; then
echo "TOTAL_RUNNERS is larger than the number of tests/packages to split."
NUM_RUNNERS=$((NUM_DIRS-1))
fi
# fix issue where test splitting calculation generates 1 more split than TOTAL_RUNNERS.
NUM_RUNNERS=$((NUM_RUNNERS-1))
{
echo -n "compatibility-matrix="
find ./test -maxdepth 2 -type d -print0 | xargs -0 -n 1 \
| grep -v util | grep -v upgrade \
| jq --raw-input --argjson runnercount "$NUM_RUNNERS" "$JQ_SLICER" \
| jq --compact-output 'map(join(" "))'
} >> "$GITHUB_OUTPUT"
compatibility-integration-test:
runs-on: ${{ fromJSON(needs.setup.outputs.compute-xl) }}
needs:
- setup
- dev-build
- generate-compatibility-job-matrices
strategy:
fail-fast: false
matrix:
test-cases: ${{ fromJSON(needs.generate-compatibility-job-matrices.outputs.compatibility-matrix) }}
steps:
- uses: actions/checkout@24cb9080177205b6e8c946b17badbe402adc938f # v3.4.0
- uses: actions/setup-go@6edd4406fa81c3da01a34fa6f6343087c207a568 # v3.5.0
@@ -271,9 +307,12 @@ jobs:
mkdir -p "/tmp/test-results"
cd ./test/integration/consul-container
docker run --rm ${{ env.CONSUL_LATEST_IMAGE_NAME }}:local consul version
echo "Running $(sed 's,|, ,g' <<< "${{ matrix.test-cases }}" |wc -w) subtests"
# shellcheck disable=SC2001
sed 's, ,\n,g' <<< "${{ matrix.test-cases }}"
go run gotest.tools/gotestsum@v${{env.GOTESTSUM_VERSION}} \
--raw-command \
--format=standard-verbose \
--format=short-verbose \
--debug \
--rerun-fails=3 \
-- \
@@ -282,7 +321,7 @@
-tags "${{ env.GOTAGS }}" \
-timeout=30m \
-json \
`go list ./... | grep -v upgrade` \
${{ matrix.test-cases }} \
--target-image ${{ env.CONSUL_LATEST_IMAGE_NAME }} \
--target-version local \
--latest-image docker.mirror.hashicorp.services/${{ env.CONSUL_LATEST_IMAGE_NAME }} \
@@ -383,6 +422,7 @@ jobs:
--raw-command \
--format=short-verbose \
--debug \
--rerun-fails=3 \
--packages="./..." \
-- \
go test \
@@ -416,6 +456,7 @@ jobs:
- vault-integration-test
- generate-envoy-job-matrices
- envoy-integration-test
- generate-compatibility-job-matrices
- compatibility-integration-test
- generate-upgrade-job-matrices
- upgrade-integration-test
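The `JQ_SLICER` expression in the new `generate-compatibility-job-matrices` job is terse: it reads the discovered test directories and cuts them into consecutive chunks of `floor(length / $runnercount)` entries, which is also why the step decrements `NUM_RUNNERS` by one (floor-based slicing tends to yield one more chunk than requested). The Go sketch below is illustrative only, not part of the workflow, and the directory names in it are made up.

```go
package main

import "fmt"

// splitIntoGroups mirrors jq's `[ inputs ] | [_nwise(length / $runnercount | floor)]`:
// it cuts dirs into consecutive chunks of floor(len(dirs)/runners) entries.
func splitIntoGroups(dirs []string, runners int) [][]string {
	size := len(dirs) / runners
	if size < 1 {
		size = 1
	}
	var groups [][]string
	for start := 0; start < len(dirs); start += size {
		end := start + size
		if end > len(dirs) {
			end = len(dirs)
		}
		groups = append(groups, dirs[start:end])
	}
	return groups
}

func main() {
	// Hypothetical directory list; the workflow discovers these with `find`.
	dirs := []string{"./test/a", "./test/b", "./test/c", "./test/d", "./test/e"}
	// With 2 runners, chunks of floor(5/2)=2 produce 3 groups -- one more than
	// requested, which is the off-by-one the workflow compensates for.
	for i, group := range splitIntoGroups(dirs, 2) {
		fmt.Printf("matrix entry %d: %v\n", i, group)
	}
}
```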
12 changes: 5 additions & 7 deletions agent/consul/controller/queue/defer.go
@@ -41,7 +41,7 @@ type deferredRequest[T ItemType] struct {
// future processing
type deferQueue[T ItemType] struct {
heap *deferHeap[T]
entries map[T]*deferredRequest[T]
entries map[string]*deferredRequest[T]

addChannel chan *deferredRequest[T]
heartbeat *time.Ticker
@@ -55,7 +55,7 @@ func NewDeferQueue[T ItemType](tick time.Duration) DeferQueue[T] {

return &deferQueue[T]{
heap: dHeap,
entries: make(map[T]*deferredRequest[T]),
entries: make(map[string]*deferredRequest[T]),
addChannel: make(chan *deferredRequest[T]),
heartbeat: time.NewTicker(tick),
}
@@ -78,7 +78,7 @@ func (q *deferQueue[T]) Defer(ctx context.Context, item T, until time.Time) {

// deferEntry adds a deferred request to the priority queue
func (q *deferQueue[T]) deferEntry(entry *deferredRequest[T]) {
existing, exists := q.entries[entry.item]
existing, exists := q.entries[entry.item.Key()]
if exists {
// insert or update the item deferral time
if existing.enqueueAt.After(entry.enqueueAt) {
@@ -90,7 +90,7 @@ func (q *deferQueue[T]) deferEntry(entry *deferredRequest[T]) {
}

heap.Push(q.heap, entry)
q.entries[entry.item] = entry
q.entries[entry.item.Key()] = entry
}

// readyRequest returns a pointer to the next ready Request or
@@ -108,7 +108,7 @@ func (q *deferQueue[T]) readyRequest() *T {
}

entry = heap.Pop(q.heap).(*deferredRequest[T])
delete(q.entries, entry.item)
delete(q.entries, entry.item.Key())
return &entry.item
}

@@ -182,8 +182,6 @@ func (q *deferQueue[T]) Process(ctx context.Context, callback func(item T)) {
}
}

var _ heap.Interface = &deferHeap[string]{}

// deferHeap implements heap.Interface
type deferHeap[T ItemType] []*deferredRequest[T]

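The defer.go change above switches the internal maps from using the item value itself as the key to using the string returned by `Key()`. Below is a minimal sketch of an item type that would satisfy the new constraint; the type name is hypothetical and only illustrates the shape the queue package now expects.

```go
// resourceRef is a made-up item type: the only requirement the queue package
// now places on it is a Key() method used for de-duplication.
type resourceRef struct {
	Kind string
	Name string
}

// Key returns the de-duplication key. Two resourceRef values with the same
// Kind and Name collapse into one entry in the defer queue's entries map,
// even though the struct itself no longer has to be a comparable map key.
func (r resourceRef) Key() string {
	return r.Kind + "/" + r.Name
}
```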
27 changes: 15 additions & 12 deletions agent/consul/controller/queue/queue.go
@@ -13,7 +13,10 @@ import (
// https://github.com/kubernetes/client-go/blob/release-1.25/util/workqueue/queue.go

// ItemType is the type constraint for items in the WorkQueue.
type ItemType comparable
type ItemType interface {
// Key returns a string that will be used to de-duplicate items in the queue.
Key() string
}

// WorkQueue is an interface for a work queue with semantics to help with
// retries and rate limiting.
@@ -43,9 +46,9 @@ type queue[T ItemType] struct {

// dirty holds the working set of all Requests, whether they are being
// processed or not
dirty map[T]struct{}
dirty map[string]struct{}
// processing holds the set of current requests being processed
processing map[T]struct{}
processing map[string]struct{}

// deferred is an internal priority queue that tracks deferred
// Requests
@@ -66,8 +69,8 @@
func RunWorkQueue[T ItemType](ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue[T] {
q := &queue[T]{
ratelimiter: NewRateLimiter[T](baseBackoff, maxBackoff),
dirty: make(map[T]struct{}),
processing: make(map[T]struct{}),
dirty: make(map[string]struct{}),
processing: make(map[string]struct{}),
cond: sync.NewCond(&sync.Mutex{}),
deferred: NewDeferQueue[T](500 * time.Millisecond),
ctx: ctx,
@@ -115,8 +118,8 @@ func (q *queue[T]) Get() (item T, shutdown bool) {

item, q.queue = q.queue[0], q.queue[1:]

q.processing[item] = struct{}{}
delete(q.dirty, item)
q.processing[item.Key()] = struct{}{}
delete(q.dirty, item.Key())

return item, false
}
@@ -129,12 +132,12 @@ func (q *queue[T]) Add(item T) {
if q.shuttingDown() {
return
}
if _, ok := q.dirty[item]; ok {
if _, ok := q.dirty[item.Key()]; ok {
return
}

q.dirty[item] = struct{}{}
if _, ok := q.processing[item]; ok {
q.dirty[item.Key()] = struct{}{}
if _, ok := q.processing[item.Key()]; ok {
return
}

@@ -175,8 +178,8 @@ func (q *queue[T]) Done(item T) {
q.cond.L.Lock()
defer q.cond.L.Unlock()

delete(q.processing, item)
if _, ok := q.dirty[item]; ok {
delete(q.processing, item.Key())
if _, ok := q.dirty[item.Key()]; ok {
q.queue = append(q.queue, item)
q.cond.Signal()
}
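The queue.go diff keeps the exported surface shown here (`RunWorkQueue`, `Add`, `Get`, `Done`) and only moves the dirty/processing bookkeeping onto `Key()` strings. A rough usage sketch under that assumption, reusing the illustrative `resourceRef` type from the previous note; the backoff values are arbitrary.

```go
package controllerexample

import (
	"context"
	"time"

	"github.com/hashicorp/consul/agent/consul/controller/queue"
)

// runExample walks the Add/Get/Done cycle; resourceRef is the made-up
// ItemType from the earlier sketch.
func runExample(ctx context.Context) {
	wq := queue.RunWorkQueue[resourceRef](ctx, 5*time.Millisecond, 30*time.Second)

	wq.Add(resourceRef{Kind: "api-gateway", Name: "gw"})
	wq.Add(resourceRef{Kind: "api-gateway", Name: "gw"}) // same Key(): dropped while the first is still dirty

	for {
		item, shutdown := wq.Get() // blocks until an item is ready or the queue shuts down
		if shutdown {
			return
		}
		// ... reconcile item ...
		wq.Done(item) // requeues the item only if it was Added again while processing
	}
}
```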
12 changes: 5 additions & 7 deletions agent/consul/controller/queue/rate.go
@@ -22,10 +22,8 @@ type Limiter[T ItemType] interface {
Forget(request T)
}

var _ Limiter[string] = &ratelimiter[string]{}

type ratelimiter[T ItemType] struct {
failures map[T]int
failures map[string]int
base time.Duration
max time.Duration
mutex sync.RWMutex
@@ -35,7 +33,7 @@ type ratelimiter[T ItemType] struct {
// backoff.
func NewRateLimiter[T ItemType](base, max time.Duration) Limiter[T] {
return &ratelimiter[T]{
failures: make(map[T]int),
failures: make(map[string]int),
base: base,
max: max,
}
@@ -47,8 +45,8 @@ func (r *ratelimiter[T]) NextRetry(request T) time.Duration {
r.mutex.RLock()
defer r.mutex.RUnlock()

exponent := r.failures[request]
r.failures[request] = r.failures[request] + 1
exponent := r.failures[request.Key()]
r.failures[request.Key()] = r.failures[request.Key()] + 1

backoff := float64(r.base.Nanoseconds()) * math.Pow(2, float64(exponent))
// make sure we don't overflow time.Duration
@@ -69,5 +67,5 @@ func (r *ratelimiter[T]) Forget(request T) {
r.mutex.Lock()
defer r.mutex.Unlock()

delete(r.failures, request)
delete(r.failures, request.Key())
}
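For reference, `NextRetry` above grows the per-key delay as `base * 2^failures` and clamps it at `max`, while `Forget` resets the counter. A standalone sketch of that arithmetic; the base and max values are made up.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	base, max := 5*time.Millisecond, 1*time.Second // illustrative values only
	for failures := 0; failures <= 9; failures++ {
		backoff := float64(base.Nanoseconds()) * math.Pow(2, float64(failures))
		next := max
		if backoff < float64(max.Nanoseconds()) {
			next = time.Duration(backoff)
		}
		// Prints 5ms, 10ms, 20ms, ... 640ms, then stays clamped at 1s.
		fmt.Printf("failure %d -> next retry in %s\n", failures, next)
	}
}
```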
2 changes: 2 additions & 0 deletions agent/consul/controller/queue/rate_test.go
@@ -12,6 +12,8 @@ import (

type Request struct{ Kind string }

func (r Request) Key() string { return r.Kind }

func TestRateLimiter_Backoff(t *testing.T) {
t.Parallel()

2 changes: 0 additions & 2 deletions agent/consul/controller/queue_test.go
@@ -10,8 +10,6 @@ import (
"github.com/hashicorp/consul/agent/consul/controller/queue"
)

var _ queue.WorkQueue[string] = &countingWorkQueue[string]{}

type countingWorkQueue[T queue.ItemType] struct {
getCounter uint64
addCounter uint64
12 changes: 12 additions & 0 deletions agent/consul/controller/reconciler.go
@@ -20,6 +20,18 @@ type Request struct {
Meta *acl.EnterpriseMeta
}

// Key satisfies the queue.ItemType interface. It returns a string which will be
// used to de-duplicate requests in the queue.
func (r Request) Key() string {
return fmt.Sprintf(
`kind=%q,name=%q,part=%q,ns=%q`,
r.Kind,
r.Name,
r.Meta.PartitionOrDefault(),
r.Meta.NamespaceOrDefault(),
)
}

// RequeueAfterError is an error that allows a Reconciler to override the
// exponential backoff behavior of the Controller. Rather than applying
// the backoff algorithm, returning a RequeueAfterError will cause the
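The format string in `Request.Key()` makes partition and namespace part of the identity, so otherwise identical requests from different namespaces are not collapsed together. A standalone illustration of the resulting key; the values are hypothetical and the enterprise-meta lookups are replaced with literal defaults.

```go
package main

import "fmt"

func main() {
	kind, name, partition, namespace := "service-defaults", "web", "default", "default"
	key := fmt.Sprintf(`kind=%q,name=%q,part=%q,ns=%q`, kind, name, partition, namespace)
	fmt.Println(key) // kind="service-defaults",name="web",part="default",ns="default"
}
```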
37 changes: 19 additions & 18 deletions agent/consul/health_endpoint.go
@@ -214,28 +214,10 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
f = h.serviceNodesDefault
}

authzContext := acl.AuthorizerContext{
Peer: args.PeerName,
}
authz, err := h.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
if err != nil {
return err
}

if err := h.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}

// If we're doing a connect or ingress query, we need read access to the service
// we're trying to find proxies for, so check that.
if args.Connect || args.Ingress {
// TODO(acl-error-enhancements) Look for ways to percolate this information up to give any feedback to the user.
if authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
// Just return nil, which will return an empty response (tested)
return nil
}
}

filter, err := bexpr.CreateFilter(args.Filter, nil, reply.Nodes)
if err != nil {
return err
@@ -257,6 +239,25 @@
return err
}

authzContext := acl.AuthorizerContext{
Peer: args.PeerName,
}
authz, err := h.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
if err != nil {
return err
}

// If we're doing a connect or ingress query, we need read access to the service
// we're trying to find proxies for, so check that.
if args.Connect || args.Ingress {
// TODO(acl-error-enhancements) Look for ways to percolate this information up to give any feedback to the user.
if authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
// Return the index here so that the agent cache does not infinitely loop.
reply.Index = index
return nil
}
}

resolvedNodes := nodes
if args.MergeCentralConfig {
for _, node := range resolvedNodes {
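The relocated ACL check is the heart of the health_endpoint.go fix: authorization now runs after the blocking query, so the denied/empty path still reports the query index. The sketch below is not Consul's cache code; it is a simplified illustration, under that reading of the diff, of why a reply with index 0 makes a blocking-query consumer spin.

```go
package main

import "fmt"

// reply stands in for an indexed RPC response in this illustration.
type reply struct {
	Index uint64
	Nodes []string
}

func main() {
	// A blocking-query consumer only waits when it has a non-zero index to
	// wait past; otherwise it re-issues the request immediately.
	poll := func(r reply, lastIndex uint64) string {
		if r.Index == 0 || r.Index <= lastIndex {
			return "re-poll immediately (tight loop)"
		}
		return fmt.Sprintf("block until the data index passes %d", r.Index)
	}

	fmt.Println("before fix:", poll(reply{Index: 0}, 0))  // spins forever
	fmt.Println("after fix: ", poll(reply{Index: 42}, 0)) // can block normally
}
```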
1 change: 1 addition & 0 deletions agent/consul/health_endpoint_test.go
@@ -1127,6 +1127,7 @@ node "foo" {
var resp structs.IndexedCheckServiceNodes
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &resp))
assert.Len(t, resp.Nodes, 0)
assert.Greater(t, resp.Index, uint64(0))

// List w/ token. This should work since we're requesting "foo", but should
// also only contain the proxies with names that adhere to our ACL.
2 changes: 2 additions & 0 deletions agent/consul/server.go
@@ -73,6 +73,7 @@ import (
"github.com/hashicorp/consul/internal/mesh"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/demo"
"github.com/hashicorp/consul/internal/resource/reaper"
raftstorage "github.com/hashicorp/consul/internal/storage/raft"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/routine"
@@ -844,6 +845,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
func (s *Server) registerResources() {
catalog.RegisterTypes(s.typeRegistry)
mesh.RegisterTypes(s.typeRegistry)
reaper.RegisterControllers(s.controllerManager)

if s.config.DevMode {
demo.RegisterTypes(s.typeRegistry)