Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load balance support #65

Merged
merged 13 commits into from
Aug 26, 2024
4 changes: 3 additions & 1 deletion api/v1/azureappconfigurationprovider_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ type AzureAppConfigurationProviderSpec struct {
// +kubebuilder:validation:Format=uri
Endpoint *string `json:"endpoint,omitempty"`
// +kubebuilder:default=true
ReplicaDiscoveryEnabled bool `json:"replicaDiscoveryEnabled,omitempty"`
ReplicaDiscoveryEnabled bool `json:"replicaDiscoveryEnabled,omitempty"`
// +kubebuilder:default=false
LoadBalancingEnabled bool `json:"loadBalancingEnabled,omitempty"`
ConnectionStringReference *string `json:"connectionStringReference,omitempty"`
Target ConfigurationGenerationParameters `json:"target"`
Auth *AzureAppConfigurationProviderAuth `json:"auth,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ spec:
type: object
type: array
type: object
loadBalancingEnabled:
default: false
type: boolean
replicaDiscoveryEnabled:
default: true
type: boolean
Expand Down
152 changes: 151 additions & 1 deletion internal/loader/configuraiton_setting_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,7 +1159,7 @@ var _ = Describe("AppConfiguationProvider Get All Settings", func() {
Expect(err).Should(BeNil())
Expect(failedClient.FailedAttempts).Should(Equal(1))
Expect(failedClient.BackOffEndTime.IsZero()).Should(BeFalse())
Expect(succeededClient.FailedAttempts).Should(Equal(0))
Expect(succeededClient.FailedAttempts).Should(Equal(-1))
Expect(succeededClient.BackOffEndTime.IsZero()).Should(BeTrue())
Expect(len(allSettings.ConfigMapSettings)).Should(Equal(6))
Expect(allSettings.ConfigMapSettings["someKey1"]).Should(Equal("value1"))
Expand All @@ -1169,6 +1169,156 @@ var _ = Describe("AppConfiguationProvider Get All Settings", func() {
Expect(allSettings.ConfigMapSettings["app:test:some"]).Should(Equal("value5"))
Expect(allSettings.ConfigMapSettings["app:test:"]).Should(Equal("value6"))
})

It("Succeed to get settings when load balance enabled", func() {
By("By using all clients")
testSpec := acpv1.AzureAppConfigurationProviderSpec{
Endpoint: &EndpointName,
ReplicaDiscoveryEnabled: true,
LoadBalancingEnabled: true,
Target: acpv1.ConfigurationGenerationParameters{
ConfigMapName: ConfigMapName,
},
Configuration: acpv1.AzureAppConfigurationKeyValueOptions{
Refresh: &acpv1.DynamicConfigurationRefreshParameters{
Enabled: true,
Interval: "10s",
Monitoring: &acpv1.RefreshMonitoring{
Sentinels: []acpv1.Sentinel{
{
Key: "someKey1",
},
},
},
},
},
}
testProvider := acpv1.AzureAppConfigurationProvider{
TypeMeta: metav1.TypeMeta{
APIVersion: "azconfig.io/v1",
Kind: "AppConfigurationProvider",
},
ObjectMeta: metav1.ObjectMeta{
Name: "testName",
Namespace: "testNamespace",
},
Spec: testSpec,
}

settingsToReturn := mockConfigurationSettings()
firstClient := ConfigurationClientWrapper{
Client: nil,
Endpoint: endpointName,
BackOffEndTime: metav1.Time{},
FailedAttempts: 0,
}

secondClient := ConfigurationClientWrapper{
Client: nil,
Endpoint: endpointName,
BackOffEndTime: metav1.Time{},
FailedAttempts: 0,
}

etags := make(map[acpv1.Sentinel]*azcore.ETag)
firstSettingsResponse := &SettingsResponse{
Settings: settingsToReturn,
}
secondSettingsResponse := &SettingsResponse{}
mockSettingsClient.EXPECT().GetSettings(gomock.Any(), gomock.Any()).Return(firstSettingsResponse, nil).Times(1)
mockSettingsClient.EXPECT().GetSettings(gomock.Any(), gomock.Any()).Return(secondSettingsResponse, nil).Times(1)
mockCongiurationClientManager.EXPECT().GetClients(gomock.Any()).Return([]*ConfigurationClientWrapper{&firstClient, &secondClient}, nil).Times(2)
configurationProvider, _ := NewConfigurationSettingLoader(testProvider, mockCongiurationClientManager, mockSettingsClient)
allSettings, err := configurationProvider.CreateTargetSettings(context.Background(), mockResolveSecretReference)
sentinelChanged, _, _ := configurationProvider.CheckAndRefreshSentinels(context.Background(), &testProvider, etags)

Expect(err).Should(BeNil())
Expect(firstClient.FailedAttempts).Should(Equal(-1))
Expect(secondClient.FailedAttempts).Should(Equal(-1))
Expect(sentinelChanged).Should(BeFalse())
Expect(len(allSettings.ConfigMapSettings)).Should(Equal(6))
Expect(allSettings.ConfigMapSettings["someKey1"]).Should(Equal("value1"))
Expect(allSettings.ConfigMapSettings["app:"]).Should(Equal("value2"))
Expect(allSettings.ConfigMapSettings["test:"]).Should(Equal("value3"))
Expect(allSettings.ConfigMapSettings["app:someSubKey1:1"]).Should(Equal("value4"))
Expect(allSettings.ConfigMapSettings["app:test:some"]).Should(Equal("value5"))
Expect(allSettings.ConfigMapSettings["app:test:"]).Should(Equal("value6"))
})

It("Succeed to get settings when load balance not enabled", func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the test necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, test only single client used (line 1311) when load balance not enabled.

By("By using single client")
testSpec := acpv1.AzureAppConfigurationProviderSpec{
Endpoint: &EndpointName,
ReplicaDiscoveryEnabled: true,
LoadBalancingEnabled: false,
Target: acpv1.ConfigurationGenerationParameters{
ConfigMapName: ConfigMapName,
},
Configuration: acpv1.AzureAppConfigurationKeyValueOptions{
Refresh: &acpv1.DynamicConfigurationRefreshParameters{
Enabled: true,
Interval: "10s",
Monitoring: &acpv1.RefreshMonitoring{
Sentinels: []acpv1.Sentinel{
{
Key: "someKey1",
},
},
},
},
},
}
testProvider := acpv1.AzureAppConfigurationProvider{
TypeMeta: metav1.TypeMeta{
APIVersion: "azconfig.io/v1",
Kind: "AppConfigurationProvider",
},
ObjectMeta: metav1.ObjectMeta{
Name: "testName",
Namespace: "testNamespace",
},
Spec: testSpec,
}

settingsToReturn := mockConfigurationSettings()
firstClient := ConfigurationClientWrapper{
Client: nil,
Endpoint: endpointName,
BackOffEndTime: metav1.Time{},
FailedAttempts: 0,
}

secondClient := ConfigurationClientWrapper{
Client: nil,
Endpoint: endpointName,
BackOffEndTime: metav1.Time{},
FailedAttempts: 0,
}

etags := make(map[acpv1.Sentinel]*azcore.ETag)
firstSettingsResponse := &SettingsResponse{
Settings: settingsToReturn,
}
secondSettingsResponse := &SettingsResponse{}
mockSettingsClient.EXPECT().GetSettings(gomock.Any(), gomock.Any()).Return(firstSettingsResponse, nil).Times(1)
mockSettingsClient.EXPECT().GetSettings(gomock.Any(), gomock.Any()).Return(secondSettingsResponse, nil).Times(1)
mockCongiurationClientManager.EXPECT().GetClients(gomock.Any()).Return([]*ConfigurationClientWrapper{&firstClient, &secondClient}, nil).Times(2)
configurationProvider, _ := NewConfigurationSettingLoader(testProvider, mockCongiurationClientManager, mockSettingsClient)
allSettings, err := configurationProvider.CreateTargetSettings(context.Background(), mockResolveSecretReference)
sentinelChanged, _, _ := configurationProvider.CheckAndRefreshSentinels(context.Background(), &testProvider, etags)

Expect(err).Should(BeNil())
Expect(firstClient.FailedAttempts).Should(Equal(-2))
Expect(secondClient.FailedAttempts).Should(Equal(0))
Expect(sentinelChanged).Should(BeFalse())
Expect(len(allSettings.ConfigMapSettings)).Should(Equal(6))
Expect(allSettings.ConfigMapSettings["someKey1"]).Should(Equal("value1"))
Expect(allSettings.ConfigMapSettings["app:"]).Should(Equal("value2"))
Expect(allSettings.ConfigMapSettings["test:"]).Should(Equal("value3"))
Expect(allSettings.ConfigMapSettings["app:someSubKey1:1"]).Should(Equal("value4"))
Expect(allSettings.ConfigMapSettings["app:test:some"]).Should(Equal("value5"))
Expect(allSettings.ConfigMapSettings["app:test:"]).Should(Equal("value6"))
})
})
})

Expand Down
3 changes: 3 additions & 0 deletions internal/loader/configuration_client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (

type ConfigurationClientManager struct {
ReplicaDiscoveryEnabled bool
LoadBalancingEnabled bool
IsFailoverRequest bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's required to be a property in ConfigurationClientManager, we don't need to remember the state of IsFailoverRequest

StaticClientWrappers []*ConfigurationClientWrapper
DynamicClientWrappers []*ConfigurationClientWrapper
validDomain string
Expand Down Expand Up @@ -93,6 +95,7 @@ var (
func NewConfigurationClientManager(ctx context.Context, provider acpv1.AzureAppConfigurationProvider) (ClientManager, error) {
manager := &ConfigurationClientManager{
ReplicaDiscoveryEnabled: provider.Spec.ReplicaDiscoveryEnabled,
LoadBalancingEnabled: provider.Spec.LoadBalancingEnabled,
}

var err error
Expand Down
71 changes: 64 additions & 7 deletions internal/loader/configuration_setting_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ import (

type ConfigurationSettingLoader struct {
acpv1.AzureAppConfigurationProvider
ClientManager ClientManager
SettingsClient SettingsClient
ClientManager ClientManager
SettingsClient SettingsClient
lastSuccessfulEndpoint string
}

type TargetKeyValueSettings struct {
Expand Down Expand Up @@ -104,6 +105,7 @@ func NewConfigurationSettingLoader(provider acpv1.AzureAppConfigurationProvider,
AzureAppConfigurationProvider: provider,
ClientManager: clientManager,
SettingsClient: settingsClient,
lastSuccessfulEndpoint: "",
}, nil
}

Expand Down Expand Up @@ -502,27 +504,50 @@ func (csl *ConfigurationSettingLoader) ExecuteFailoverPolicy(ctx context.Context
return nil, fmt.Errorf("no client is available to connect to the target App Configuration store")
}

if value, ok := os.LookupEnv(RequestTracingEnabled); ok {
if enabled, _ := strconv.ParseBool(value); enabled {
ctx = policy.WithHTTPHeader(ctx, createCorrelationContextHeader(ctx, csl.AzureAppConfigurationProvider, csl.ClientManager))
if csl.AzureAppConfigurationProvider.Spec.LoadBalancingEnabled && csl.lastSuccessfulEndpoint != "" && len(clients) > 1 {
nextClientIndex := 0
for _, clientWrapper := range clients {
nextClientIndex++
if clientWrapper.Endpoint == csl.lastSuccessfulEndpoint {
break
}
}

// If we found the last successful client,we'll rotate the list so that the next client is at the beginning
if nextClientIndex < len(clients) {
rotate(clients, nextClientIndex)
}
}

errors := make([]error, 0)
var tracingEnabled bool
if value, ok := os.LookupEnv(RequestTracingEnabled); ok {
tracingEnabled, _ = strconv.ParseBool(value)
}
for _, clientWrapper := range clients {
successful := true
if tracingEnabled {
ctx = policy.WithHTTPHeader(ctx, createCorrelationContextHeader(ctx, csl.AzureAppConfigurationProvider, csl.ClientManager))
RichardChen820 marked this conversation as resolved.
Show resolved Hide resolved
}
settingsResponse, err := settingsClient.GetSettings(ctx, clientWrapper.Client)
successful := true
if err != nil {
successful = false
updateClientBackoffStatus(clientWrapper, successful)
if IsFailoverable(err) {
klog.Warningf("current client of '%s' failed to get settings: %s", clientWrapper.Endpoint, err.Error())
errors = append(errors, err)
if manager, ok := csl.ClientManager.(*ConfigurationClientManager); ok {
manager.IsFailoverRequest = true
}
continue
}
return nil, err
}

if manager, ok := csl.ClientManager.(*ConfigurationClientManager); ok {
manager.IsFailoverRequest = false
}
csl.lastSuccessfulEndpoint = clientWrapper.Endpoint
updateClientBackoffStatus(clientWrapper, successful)
return settingsResponse, nil
}
Expand All @@ -535,8 +560,17 @@ func (csl *ConfigurationSettingLoader) ExecuteFailoverPolicy(ctx context.Context
func updateClientBackoffStatus(clientWrapper *ConfigurationClientWrapper, successful bool) {
if successful {
clientWrapper.BackOffEndTime = metav1.Time{}
clientWrapper.FailedAttempts = 0
// Reset FailedAttempts when client succeeded
if clientWrapper.FailedAttempts > 0 {
clientWrapper.FailedAttempts = 0
}
// Use negative value to indicate that successful attempt
clientWrapper.FailedAttempts--
RichardChen820 marked this conversation as resolved.
Show resolved Hide resolved
} else {
//Reset FailedAttempts when client failed
if clientWrapper.FailedAttempts < 0 {
clientWrapper.FailedAttempts = 0
}
clientWrapper.FailedAttempts++
clientWrapper.BackOffEndTime = metav1.Time{Time: metav1.Now().Add(calculateBackoffDuration(clientWrapper.FailedAttempts))}
}
Expand Down Expand Up @@ -796,3 +830,26 @@ func MergeSecret(secret map[string]corev1.Secret, newSecret map[string]corev1.Se

return nil
}

// rotates the slice to the left by k positions
func rotate(clients []*ConfigurationClientWrapper, k int) {
n := len(clients)
k = k % n
if k == 0 {
return
}
// Reverse the entire slice
reverseClients(clients, 0, n-1)
// Reverse the first part
reverseClients(clients, 0, n-k-1)
// Reverse the second part
reverseClients(clients, n-k, n-1)
}

func reverseClients(clients []*ConfigurationClientWrapper, start, end int) {
for start < end {
clients[start], clients[end] = clients[end], clients[start]
start++
end--
}
}
Loading
Loading