Skip to content

Commit

Permalink
AKS: enable IsVnetManaged with rate limiting and 429 retry
Browse files Browse the repository at this point in the history
  • Loading branch information
jackfrancis committed Aug 10, 2022
1 parent 9db52be commit a2afb14
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 4 deletions.
9 changes: 8 additions & 1 deletion azure/scope/managedcontrolplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,14 @@ func (s *ManagedControlPlaneScope) IsIPv6Enabled() bool {

// IsVnetManaged returns true if the vnet is managed.
func (s *ManagedControlPlaneScope) IsVnetManaged() bool {
return true
ctx := context.Background()
ctx, log, done := tele.StartSpanWithLogger(ctx, "scope.ManagedControlPlaneScope.IsVnetManaged")
defer done()
isManaged, err := virtualnetworks.New(s).IsManaged(ctx)
if err != nil {
log.Error(err, "Unable to determine if ManagedControlPlaneScope VNET is managed by capz", "AzureManagedCluster", s.ClusterName())
}
return isManaged
}

// APIServerLBName returns the API Server LB spec.
Expand Down
89 changes: 89 additions & 0 deletions azure/services/ratelimiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package services

import (
"net/http"
"strconv"
"time"

"k8s.io/client-go/util/flowcontrol"
)

// RateLimitConfig indicates the rate limit config options.
type RateLimitConfig struct {
// Enable rate limiting
AzureServiceRateLimit bool `json:"azureServiceRateLimit,omitempty" yaml:"azureServiceRateLimit,omitempty"`
// Rate limit QPS (Read)
AzureServiceRateLimitQPS float32 `json:"azureServiceRateLimitQPS,omitempty" yaml:"azureServiceRateLimitQPS,omitempty"`
// Rate limit Bucket Size
AzureServiceRateLimitBucket int `json:"azureServiceRateLimitBucket,omitempty" yaml:"azureServiceRateLimitBucket,omitempty"`
// Rate limit QPS (Write)
AzureServiceRateLimitQPSWrite float32 `json:"azureServiceRateLimitQPSWrite,omitempty" yaml:"azureServiceRateLimitQPSWrite,omitempty"`
// Rate limit Bucket Size
AzureServiceRateLimitBucketWrite int `json:"azureServiceRateLimitBucketWrite,omitempty" yaml:"azureServiceRateLimitBucketWrite,omitempty"`
}

type RateLimiter struct {
Reader flowcontrol.RateLimiter
Writer flowcontrol.RateLimiter
}

type RetryAfter struct {
Reader time.Time
Writer time.Time
}

const (
AzureServiceRateLimit bool = true
AzureServiceRateLimitQPS float32 = 1.0
AzureServiceRateLimitBucket int = 1
AzureServiceRateLimitQPSWrite float32 = 1.0
AzureServiceRateLimitBucketWrite int = 1
)

// NewRateLimiter creates new read and write flowcontrol.RateLimiter from RateLimitConfig.
func NewRateLimiter(config *RateLimitConfig) (flowcontrol.RateLimiter, flowcontrol.RateLimiter) {
readLimiter := flowcontrol.NewFakeAlwaysRateLimiter()
writeLimiter := flowcontrol.NewFakeAlwaysRateLimiter()

if config != nil && config.AzureServiceRateLimit {
readLimiter = flowcontrol.NewTokenBucketRateLimiter(
config.AzureServiceRateLimitQPS,
config.AzureServiceRateLimitBucket)

writeLimiter = flowcontrol.NewTokenBucketRateLimiter(
config.AzureServiceRateLimitQPSWrite,
config.AzureServiceRateLimitBucketWrite)
}

return readLimiter, writeLimiter
}

func GetRetryAfterTime(resp *http.Response) time.Time {
ra := resp.Header.Get("Retry-After")
if ra != "" {
var dur time.Duration
if retryAfter, _ := strconv.Atoi(ra); retryAfter > 0 {
dur = time.Duration(retryAfter) * time.Second
} else if t, err := time.Parse(time.RFC1123, ra); err == nil {
dur = t.Sub(time.Now())
}
return time.Now().Add(dur)
}
return time.Now()
}
38 changes: 35 additions & 3 deletions azure/services/virtualnetworks/virtualnetworks.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ package virtualnetworks

import (
"context"
"net/http"
"time"

"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2021-02-01/network"
"github.com/Azure/go-autorest/autorest/to"
"github.com/pkg/errors"
infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1beta1"
"sigs.k8s.io/cluster-api-provider-azure/azure"
"sigs.k8s.io/cluster-api-provider-azure/azure/converters"
"sigs.k8s.io/cluster-api-provider-azure/azure/services"
"sigs.k8s.io/cluster-api-provider-azure/azure/services/async"
"sigs.k8s.io/cluster-api-provider-azure/util/reconciler"
"sigs.k8s.io/cluster-api-provider-azure/util/tele"
Expand All @@ -48,15 +51,29 @@ type Service struct {
Scope VNetScope
async.Reconciler
async.Getter
RateLimiter services.RateLimiter
RetryAfter services.RetryAfter
}

// New creates a new service.
func New(scope VNetScope) *Service {
client := newClient(scope)
reader, writer := services.NewRateLimiter(&services.RateLimitConfig{
AzureServiceRateLimit: services.AzureServiceRateLimit,
AzureServiceRateLimitQPS: services.AzureServiceRateLimitQPS,
AzureServiceRateLimitBucket: services.AzureServiceRateLimitBucket,
AzureServiceRateLimitQPSWrite: services.AzureServiceRateLimitQPSWrite,
AzureServiceRateLimitBucketWrite: services.AzureServiceRateLimitBucketWrite,
})
return &Service{
Scope: scope,
Getter: client,
Reconciler: async.New(scope, client, client),
RateLimiter: services.RateLimiter{
Reader: reader,
Writer: writer,
},
RetryAfter: services.RetryAfter{},
}
}

Expand Down Expand Up @@ -150,19 +167,34 @@ func (s *Service) IsManaged(ctx context.Context) (bool, error) {
ctx, _, done := tele.StartSpanWithLogger(ctx, "virtualnetworks.Service.IsManaged")
defer done()

// Report errors if the client is rate limited.
if !s.RateLimiter.Reader.TryAccept() {
return false, errors.Errorf("VirtualNetwork GET is currently rate limited")
}
// Report errors if the client is throttled.
if s.RetryAfter.Reader.After(time.Now()) {
return false, errors.Errorf("Retry-After has not yet expired")
}
spec := s.Scope.VNetSpec()
if spec == nil {
return false, errors.New("cannot get vnet to check if it is managed: spec is nil")
}

vnetIface, err := s.Get(ctx, spec)
if err != nil {
return false, err
}
vnet, ok := vnetIface.(network.VirtualNetwork)
if !ok {
return false, errors.Errorf("%T is not a network.VirtualNetwork", vnetIface)
}
if err != nil {
if vnet.StatusCode == http.StatusTooManyRequests {
resp := &http.Response{
StatusCode: vnet.StatusCode,
Header: vnet.Header,
}
s.RetryAfter.Reader = services.GetRetryAfterTime(resp)
return false, err
}
}
tags := converters.MapToTags(vnet.Tags)
return tags.HasOwned(s.Scope.ClusterName()), nil
}

0 comments on commit a2afb14

Please sign in to comment.