Skip to content

Commit

Permalink
Update Azure Service Bus components to track2 SDK (#1702)
Browse files Browse the repository at this point in the history
* Updated to Go 1.18

Signed-off-by: Alessandro (Ale) Segala <[email protected]>
Signed-off-by: Alessandro Segala (ItalyPaleAle) <[email protected]>

* Updated Azure SDKs that are on track2
Includes some minor refactoring of auth code

Signed-off-by: Alessandro Segala (ItalyPaleAle) <[email protected]>

* Updated Service Bus components to track2 SDK

Co-authored-by: halspang <[email protected]>
Signed-off-by: Alessandro Segala (ItalyPaleAle) <[email protected]>

* Initial Certification test for eventhubs binding [incomplete] (#1670)

* certification test for eventhubs binding

Signed-off-by: tanvigour <[email protected]>

* modified go.mod and go.sum

Signed-off-by: tanvigour <[email protected]>

* Add connection string testing

Signed-off-by: tanvigour <[email protected]>

* iothub testing

Signed-off-by: tanvigour <[email protected]>

* address feedback and run test

Signed-off-by: tanvigour <[email protected]>

* Install Azure CLI IOT hub extension

Signed-off-by: Bernd Verst <[email protected]>

* make modtidy-all

Signed-off-by: Bernd Verst <[email protected]>

* covering all eventhubs test cases

Signed-off-by: tanvigour <[email protected]>

* dependency changes after go modtidy-all

Signed-off-by: tanvigour <[email protected]>

Co-authored-by: Bernd Verst <[email protected]>
Co-authored-by: Yaron Schneider <[email protected]>
Co-authored-by: Looong Dai <[email protected]>
Signed-off-by: Alessandro Segala (ItalyPaleAle) <[email protected]>

* Use revive instead of golint (#1685)

Signed-off-by: pigletfly <[email protected]>

Co-authored-by: Yaron Schneider <[email protected]>
Signed-off-by: Alessandro Segala (ItalyPaleAle) <[email protected]>

* Updated to Go 1.18 (#1697)

* Updated to Go 1.18

Signed-off-by: Alessandro (Ale) Segala <[email protected]>

* Added go.work file
With Go 1.18, this allows gopls (the Go language server used for example in VS Code) to work inside test apps too.
See: https://go.dev/doc/tutorial/workspaces

Signed-off-by: ItalyPaleAle <[email protected]>

Signed-off-by: ItalyPaleAle <[email protected]>

* Removed go.work

Signed-off-by: ItalyPaleAle <[email protected]>

* 💄

Signed-off-by: ItalyPaleAle <[email protected]>

Co-authored-by: Bernd Verst <[email protected]>
Signed-off-by: Alessandro Segala (ItalyPaleAle) <[email protected]>

* 💄 & 🧹

Signed-off-by: ItalyPaleAle <[email protected]>
Signed-off-by: Alessandro Segala (ItalyPaleAle) <[email protected]>

* Add metadata property to configure Batching in Pulsar (#1707)

* Add metadata property to configure BatchingMaxSize&batchingMaxMessages in Pulsar
Signed-off-by: saberwang <[email protected]>

* sort field
Signed-off-by: saberwang <[email protected]>

* [pubsub]fix unit test bug
Signed-off-by: saberwang <[email protected]>

* remove unrelated changes
Signed-off-by: saberwang <[email protected]>

* Delete hard coded Metadata
Signed-off-by: saberwang <[email protected]>

* remove  .history

Signed-off-by: saberwang <[email protected]>

* restore .gitignore

Signed-off-by: saberwang <[email protected]>

* Hard coding default values and adding 'BatchingMaxPublishDelay' metadata

Signed-off-by: saberwang <[email protected]>

* fix code format

Signed-off-by: saberwang <[email protected]>

* formatting code

Signed-off-by: saberwang <[email protected]>

Co-authored-by: Looong Dai <[email protected]>
Co-authored-by: Bernd Verst <[email protected]>
Signed-off-by: Alessandro Segala (ItalyPaleAle) <[email protected]>

* This test can't work with track2 SDKs
The methods to create a message with a body are not exported

Signed-off-by: Alessandro Segala (ItalyPaleAle) <[email protected]>

* 🧹

Signed-off-by: Alessandro Segala (ItalyPaleAle) <[email protected]>

* There's such thing as too much logging

Signed-off-by: ItalyPaleAle <[email protected]>

* Refactored subscription.go
This greatly simplifies certain parts of the code, reducing the number of goroutines and likely improving performance.
Performance for end-users improves too as there's no need anymore to pause for 2 seconds every time that we reach `maxActiveMessages`.
Additionally, with this change the config options `prefetchCount` and `maxActiveMessagesRecoveryInSec` are removed as unnecessary anymore.

Signed-off-by: ItalyPaleAle <[email protected]>

* 💄

Signed-off-by: ItalyPaleAle <[email protected]>

* Fixed pubsub tests

Signed-off-by: ItalyPaleAle <[email protected]>

* These packages should have never been upgraded

Signed-off-by: ItalyPaleAle <[email protected]>

* Ensuring we don't fetch 1 message more than max active

Signed-off-by: ItalyPaleAle <[email protected]>

* Adding configurable timeout for servicebusqueues operations

Signed-off-by: ItalyPaleAle <[email protected]>

* Persistent connection for invoking SB queues

Signed-off-by: ItalyPaleAle <[email protected]>

* Reverted Event Hub SDK update

Signed-off-by: ItalyPaleAle <[email protected]>

* Revert "Reverted Event Hub SDK update"

This reverts commit 212220a.

Signed-off-by: ItalyPaleAle <[email protected]>

* Fix Azure deploy for users with a first.last email

Signed-off-by: ItalyPaleAle <[email protected]>

* Added some sleep that should help reduce flakiness in eventhubs binding cert test

Signed-off-by: ItalyPaleAle <[email protected]>

* Changed servicebusqueue cert test
In case of a failure in the handler (ie. users' code), the message should be correctly re-enqueued, which means that messages will be re-delivered later and won't be in order.

Signed-off-by: ItalyPaleAle <[email protected]>

Co-authored-by: halspang <[email protected]>
Co-authored-by: tanvigour <[email protected]>
Co-authored-by: Bernd Verst <[email protected]>
Co-authored-by: Yaron Schneider <[email protected]>
Co-authored-by: Looong Dai <[email protected]>
Co-authored-by: Wang Bing <[email protected]>
Co-authored-by: saber-wang <[email protected]>
  • Loading branch information
8 people authored May 13, 2022
1 parent 785ed60 commit e5e9011
Show file tree
Hide file tree
Showing 31 changed files with 1,200 additions and 1,189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,11 @@ STORAGE_CONTAINER_VAR_NAME="AzureBlobStorageContainer"
STORAGE_QUEUE_VAR_NAME="AzureBlobStorageQueue"

# Derived variables
ADMIN_ID="$(az ad user list --filter "mail eq '${ADMIN_UPN}'" --query "[].objectId" --output tsv)"
ADMIN_ID="$(az ad user list --filter "userPrincipalName eq '${ADMIN_UPN}'" --query "[].objectId" --output tsv)"
if [[ -z "${ADMIN_ID}" ]]; then
echo "Could not find user with upn ${ADMIN_UPN}"
exit 1
fi
SUB_ID="$(az account show --query "id" --output tsv)"
TENANT_ID="$(az account show --query "tenantId" --output tsv)"
DEPLOY_NAME="${PREFIX}-azure-conf-test"
Expand Down
13 changes: 11 additions & 2 deletions authentication/azure/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io/ioutil"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/adal"
Expand Down Expand Up @@ -253,7 +254,11 @@ func (c CredentialsConfig) ServicePrincipalToken() (*adal.ServicePrincipalToken,
// GetTokenCredential returns the azcore.TokenCredential object from the credentials.
func (c CredentialsConfig) GetTokenCredential() (token azcore.TokenCredential, err error) {
return azidentity.NewClientSecretCredential(c.TenantID, c.ClientID, c.ClientSecret, &azidentity.ClientSecretCredentialOptions{
AuthorityHost: azidentity.AuthorityHost(c.AADEndpoint),
ClientOptions: azcore.ClientOptions{
Cloud: cloud.Configuration{
LoginEndpoint: c.AADEndpoint,
},
},
})
}

Expand Down Expand Up @@ -334,7 +339,11 @@ func (c CertConfig) GetTokenCredential() (token azcore.TokenCredential, err erro
// Create the azcore.TokenCredential object
certs := []*x509.Certificate{cert}
opts := &azidentity.ClientCertificateCredentialOptions{
AuthorityHost: azidentity.AuthorityHost(c.AADEndpoint),
ClientOptions: azcore.ClientOptions{
Cloud: cloud.Configuration{
LoginEndpoint: c.AADEndpoint,
},
},
}
return azidentity.NewClientCertificateCredential(c.TenantID, c.ClientID, certs, key, opts)
}
Expand Down
11 changes: 7 additions & 4 deletions authentication/azure/auth_amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,24 @@ limitations under the License.

package azure

import "github.com/Azure/azure-amqp-common-go/v3/aad"
import (
amqpaad "github.com/Azure/azure-amqp-common-go/v3/aad"
)

const (
AzureServiceBusResourceName string = "servicebus"
AzureEventHubsResourceName string = "eventhubs"
)

// GetTokenProvider creates a TokenProvider for AAD retrieved from, in order:
// GetAMQPTokenProvider creates a TokenProvider for AAD for AMQP retrieved from, in order:
// 1. Client credentials
// 2. Client certificate
// 3. MSI.
func (s EnvironmentSettings) GetAADTokenProvider() (*aad.TokenProvider, error) {
func (s EnvironmentSettings) GetAMQPTokenProvider() (*amqpaad.TokenProvider, error) {
spt, err := s.GetServicePrincipalToken()
if err != nil {
return nil, err
}

return aad.NewJWTProvider(aad.JWTProviderWithAADToken(spt), aad.JWTProviderWithAzureEnvironment(s.AzureEnvironment))
return amqpaad.NewJWTProvider(amqpaad.JWTProviderWithAADToken(spt), amqpaad.JWTProviderWithAzureEnvironment(s.AzureEnvironment))
}
4 changes: 2 additions & 2 deletions bindings/azure/eventhubs/eventhubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,12 @@ func (a *AzureEventHubs) Init(metadata bindings.Metadata) error {
a.hub = hub
} else {
// Connect via AAD.
settings, sErr := azauth.NewEnvironmentSettings("eventhubs", metadata.Properties)
settings, sErr := azauth.NewEnvironmentSettings(azauth.AzureEventHubsResourceName, metadata.Properties)
if sErr != nil {
return sErr
}
a.eventHubSettings = settings
tokenProvider, tokenErr := a.eventHubSettings.GetAADTokenProvider()
tokenProvider, tokenErr := a.eventHubSettings.GetAMQPTokenProvider()
if tokenErr != nil {
return fmt.Errorf("%s %w", hubConnectionInitErrorMsg, err)
}
Expand Down
Loading

0 comments on commit e5e9011

Please sign in to comment.