Skip to content

Commit

Permalink
Load balance support (#65)
Browse files Browse the repository at this point in the history
* load balance support

* update load balance tracing

* upgrade golangci version

* update

* skip cache for golangci lint

* update

* update

* update envtest version

* update envtest tool version

* add cancel context

* resolve comments

* fix lint error

* no need to update isFailoverRequest when succeed to get settings
  • Loading branch information
linglingye001 authored Aug 26, 2024
1 parent af334bc commit 807bf33
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 85 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
- name: Run Test
run: |
echo 'Install evntest tool'
ENVTEST_VERSION=release-0.16
ENVTEST_VERSION=release-0.18
GOBIN=$GITHUB_WORKSPACE go install sigs.k8s.io/controller-runtime/tools/setup-envtest@$ENVTEST_VERSION
echo 'Run tests'
KUBEBUILDER_ASSETS=$($GITHUB_WORKSPACE/setup-envtest use 1.19 -p path) CGO_ENABLED=0 go test ./... -coverprofile cover.out
KUBEBUILDER_ASSETS=$($GITHUB_WORKSPACE/setup-envtest use 1.30.0 -p path) CGO_ENABLED=0 go test ./... -coverprofile cover.out
4 changes: 2 additions & 2 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: stable
go-version: 1.22.6
- name: golangci-lint
uses: golangci/golangci-lint-action@v6
with:
version: v1.59
version: v1.59.1
args: --timeout 10m
4 changes: 3 additions & 1 deletion api/v1/azureappconfigurationprovider_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ type AzureAppConfigurationProviderSpec struct {
// +kubebuilder:validation:Format=uri
Endpoint *string `json:"endpoint,omitempty"`
// +kubebuilder:default=true
ReplicaDiscoveryEnabled bool `json:"replicaDiscoveryEnabled,omitempty"`
ReplicaDiscoveryEnabled bool `json:"replicaDiscoveryEnabled,omitempty"`
// +kubebuilder:default=false
LoadBalancingEnabled bool `json:"loadBalancingEnabled,omitempty"`
ConnectionStringReference *string `json:"connectionStringReference,omitempty"`
Target ConfigurationGenerationParameters `json:"target"`
Auth *AzureAppConfigurationProviderAuth `json:"auth,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ spec:
type: object
type: array
type: object
loadBalancingEnabled:
default: false
type: boolean
replicaDiscoveryEnabled:
default: true
type: boolean
Expand Down
12 changes: 11 additions & 1 deletion internal/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ limitations under the License.
package controller

import (
"context"
"path/filepath"
"testing"
"time"

"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -48,6 +50,8 @@ var k8sClient client.Client
var testEnv *envtest.Environment
var mockCtrl *gomock.Controller
var mockConfigurationSettings *mocks.MockConfigurationSettingsRetriever
var ctx context.Context
var cancel context.CancelFunc

func TestAPIs(t *testing.T) {
RegisterFailHandler(Fail)
Expand All @@ -58,6 +62,8 @@ func TestAPIs(t *testing.T) {
var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

ctx, cancel = context.WithCancel(context.TODO())

By("bootstrapping test environment")
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")},
Expand Down Expand Up @@ -103,8 +109,12 @@ var _ = BeforeSuite(func() {

var _ = AfterSuite(func() {
By("tearing down the test environment")
cancel()
mockCtrl.Finish()
err := testEnv.Stop()

if err != nil {
time.Sleep(1 * time.Minute)
}
err = testEnv.Stop()
Expect(err).NotTo(HaveOccurred())
})
2 changes: 1 addition & 1 deletion internal/loader/configuraiton_setting_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,7 +1159,7 @@ var _ = Describe("AppConfiguationProvider Get All Settings", func() {
Expect(err).Should(BeNil())
Expect(failedClient.FailedAttempts).Should(Equal(1))
Expect(failedClient.BackOffEndTime.IsZero()).Should(BeFalse())
Expect(succeededClient.FailedAttempts).Should(Equal(0))
Expect(succeededClient.FailedAttempts).Should(Equal(-1))
Expect(succeededClient.BackOffEndTime.IsZero()).Should(BeTrue())
Expect(len(allSettings.ConfigMapSettings)).Should(Equal(6))
Expect(allSettings.ConfigMapSettings["someKey1"]).Should(Equal("value1"))
Expand Down
4 changes: 4 additions & 0 deletions internal/loader/configuration_client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

type ConfigurationClientManager struct {
ReplicaDiscoveryEnabled bool
LoadBalancingEnabled bool
StaticClientWrappers []*ConfigurationClientWrapper
DynamicClientWrappers []*ConfigurationClientWrapper
validDomain string
Expand All @@ -46,6 +47,7 @@ type ConfigurationClientManager struct {
id string
lastFallbackClientAttempt metav1.Time
lastFallbackClientRefresh metav1.Time
lastSuccessfulEndpoint string
}

type ConfigurationClientWrapper struct {
Expand Down Expand Up @@ -93,6 +95,8 @@ var (
func NewConfigurationClientManager(ctx context.Context, provider acpv1.AzureAppConfigurationProvider) (ClientManager, error) {
manager := &ConfigurationClientManager{
ReplicaDiscoveryEnabled: provider.Spec.ReplicaDiscoveryEnabled,
LoadBalancingEnabled: provider.Spec.LoadBalancingEnabled,
lastSuccessfulEndpoint: "",
}

var err error
Expand Down
63 changes: 58 additions & 5 deletions internal/loader/configuration_setting_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,27 +502,48 @@ func (csl *ConfigurationSettingLoader) ExecuteFailoverPolicy(ctx context.Context
return nil, fmt.Errorf("no client is available to connect to the target App Configuration store")
}

if value, ok := os.LookupEnv(RequestTracingEnabled); ok {
if enabled, _ := strconv.ParseBool(value); enabled {
ctx = policy.WithHTTPHeader(ctx, createCorrelationContextHeader(ctx, csl.AzureAppConfigurationProvider, csl.ClientManager))
manager, ok := csl.ClientManager.(*ConfigurationClientManager)
if csl.AzureAppConfigurationProvider.Spec.LoadBalancingEnabled && ok && manager.lastSuccessfulEndpoint != "" && len(clients) > 1 {
nextClientIndex := 0
for _, clientWrapper := range clients {
nextClientIndex++
if clientWrapper.Endpoint == manager.lastSuccessfulEndpoint {
break
}
}

// If we found the last successful client,we'll rotate the list so that the next client is at the beginning
if nextClientIndex < len(clients) {
rotate(clients, nextClientIndex)
}
}

errors := make([]error, 0)
var tracingEnabled, isFailoverRequest bool
if value, ok := os.LookupEnv(RequestTracingEnabled); ok {
tracingEnabled, _ = strconv.ParseBool(value)
}
for _, clientWrapper := range clients {
successful := true
if tracingEnabled {
ctx = policy.WithHTTPHeader(ctx, createCorrelationContextHeader(ctx, csl.AzureAppConfigurationProvider, csl.ClientManager, isFailoverRequest))
}
settingsResponse, err := settingsClient.GetSettings(ctx, clientWrapper.Client)
successful := true
if err != nil {
successful = false
updateClientBackoffStatus(clientWrapper, successful)
if IsFailoverable(err) {
klog.Warningf("current client of '%s' failed to get settings: %s", clientWrapper.Endpoint, err.Error())
errors = append(errors, err)
isFailoverRequest = true
continue
}
return nil, err
}

if manager, ok := csl.ClientManager.(*ConfigurationClientManager); ok {
manager.lastSuccessfulEndpoint = clientWrapper.Endpoint
}
updateClientBackoffStatus(clientWrapper, successful)
return settingsResponse, nil
}
Expand All @@ -535,8 +556,17 @@ func (csl *ConfigurationSettingLoader) ExecuteFailoverPolicy(ctx context.Context
func updateClientBackoffStatus(clientWrapper *ConfigurationClientWrapper, successful bool) {
if successful {
clientWrapper.BackOffEndTime = metav1.Time{}
clientWrapper.FailedAttempts = 0
// Reset FailedAttempts when client succeeded
if clientWrapper.FailedAttempts > 0 {
clientWrapper.FailedAttempts = 0
}
// Use negative value to indicate that successful attempt
clientWrapper.FailedAttempts--
} else {
//Reset FailedAttempts when client failed
if clientWrapper.FailedAttempts < 0 {
clientWrapper.FailedAttempts = 0
}
clientWrapper.FailedAttempts++
clientWrapper.BackOffEndTime = metav1.Time{Time: metav1.Now().Add(calculateBackoffDuration(clientWrapper.FailedAttempts))}
}
Expand Down Expand Up @@ -796,3 +826,26 @@ func MergeSecret(secret map[string]corev1.Secret, newSecret map[string]corev1.Se

return nil
}

// rotates the slice to the left by k positions
func rotate(clients []*ConfigurationClientWrapper, k int) {
n := len(clients)
k = k % n
if k == 0 {
return
}
// Reverse the entire slice
reverseClients(clients, 0, n-1)
// Reverse the first part
reverseClients(clients, 0, n-k-1)
// Reverse the second part
reverseClients(clients, n-k, n-1)
}

func reverseClients(clients []*ConfigurationClientWrapper, start, end int) {
for start < end {
clients[start], clients[end] = clients[end], clients[start]
start++
end--
}
}
154 changes: 81 additions & 73 deletions internal/loader/request_tracing.go
Original file line number Diff line number Diff line change
@@ -1,73 +1,81 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package loader

import (
acpv1 "azappconfig/provider/api/v1"
"context"
"fmt"
"net/http"
"os"
"strings"
)

type TracingKey string

type RequestTracing struct {
IsStartUp bool
}

const (
RequestTracingKey TracingKey = TracingKey("tracing")
AzureExtensionContext string = "AZURE_EXTENSION_CONTEXT"
)

func createCorrelationContextHeader(ctx context.Context, provider acpv1.AzureAppConfigurationProvider, clientManager ClientManager) http.Header {
header := http.Header{}
output := make([]string, 0)

output = append(output, "Host=Kubernetes")

if tracing := ctx.Value(RequestTracingKey); tracing != nil {
if tracing.(RequestTracing).IsStartUp {
output = append(output, "RequestType=StartUp")
} else {
output = append(output, "RequestType=Watch")
}
}

if provider.Spec.Secret != nil {
output = append(output, "UsesKeyVault")

if provider.Spec.Secret.Refresh != nil &&
provider.Spec.Secret.Refresh.Enabled {
output = append(output, "RefreshesKeyVault")
}
}

if provider.Spec.FeatureFlag != nil {
output = append(output, "UsesFeatureFlag")
}

if provider.Spec.ReplicaDiscoveryEnabled {
if manager, ok := clientManager.(*ConfigurationClientManager); ok {
replicaCount := 0
if manager.DynamicClientWrappers != nil {
replicaCount = len(manager.DynamicClientWrappers)
}

output = append(output, fmt.Sprintf("ReplicaCount=%d", replicaCount))
}
}

if _, ok := os.LookupEnv(AzureExtensionContext); ok {
output = append(output, "InstalledBy=Extension")
} else {
output = append(output, "InstalledBy=Helm")
}

header.Add("Correlation-Context", strings.Join(output, ","))

return header
}
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package loader

import (
acpv1 "azappconfig/provider/api/v1"
"context"
"fmt"
"net/http"
"os"
"strings"
)

type TracingKey string

type RequestTracing struct {
IsStartUp bool
}

const (
RequestTracingKey TracingKey = TracingKey("tracing")
AzureExtensionContext string = "AZURE_EXTENSION_CONTEXT"
)

func createCorrelationContextHeader(ctx context.Context, provider acpv1.AzureAppConfigurationProvider, clientManager ClientManager, isFailoverRequest bool) http.Header {
header := http.Header{}
output := make([]string, 0)

output = append(output, "Host=Kubernetes")

if tracing := ctx.Value(RequestTracingKey); tracing != nil {
if tracing.(RequestTracing).IsStartUp {
output = append(output, "RequestType=StartUp")
} else {
output = append(output, "RequestType=Watch")
}
}

if provider.Spec.Secret != nil {
output = append(output, "UsesKeyVault")

if provider.Spec.Secret.Refresh != nil &&
provider.Spec.Secret.Refresh.Enabled {
output = append(output, "RefreshesKeyVault")
}
}

if provider.Spec.FeatureFlag != nil {
output = append(output, "UsesFeatureFlag")
}

if provider.Spec.ReplicaDiscoveryEnabled {
if manager, ok := clientManager.(*ConfigurationClientManager); ok {
replicaCount := 0
if manager.DynamicClientWrappers != nil {
replicaCount = len(manager.DynamicClientWrappers)
}

output = append(output, fmt.Sprintf("ReplicaCount=%d", replicaCount))
}

if isFailoverRequest {
output = append(output, "FailoverRequest")
}
}

if provider.Spec.LoadBalancingEnabled {
output = append(output, "Features=LB")
}

if _, ok := os.LookupEnv(AzureExtensionContext); ok {
output = append(output, "InstalledBy=Extension")
} else {
output = append(output, "InstalledBy=Helm")
}

header.Add("Correlation-Context", strings.Join(output, ","))

return header
}

0 comments on commit 807bf33

Please sign in to comment.