Skip to content

Commit

Permalink
[loki] Azure auth using Service Principal (grafana#7179)
Browse files Browse the repository at this point in the history
This PR adds the ability to use Service Principal credentials to
authenticate through Azure OAuth. It might be needed when we want to
access Azure Blob storage from another tenant.
Previously we could use only StorageAccount key that can not be used
cross-tenant.
  • Loading branch information
vlad-diachenko authored and lxwzy committed Nov 7, 2022
1 parent 35965c8 commit 4335448
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#### Loki

##### Enhancements
* [7179](https://github.com/grafana/loki/pull/7179) **vlad-diachenko**: Add ability to use Azure Service Principals credentials to authenticate to Azure Blob Storage.
* [7101](https://github.com/grafana/loki/pull/7101) **liguozhong**: Promtail: Add support for max stream limit.
* [7063](https://github.com/grafana/loki/pull/7063) **kavirajk**: Add additional `push` mode to Loki canary that can directly push logs to given Loki URL.
* [7069](https://github.com/grafana/loki/pull/7069) **periklis**: Add support for custom internal server listener for readiness probes.
Expand Down
76 changes: 55 additions & 21 deletions pkg/storage/chunk/client/azure/blob_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/Azure/azure-pipeline-go/pipeline"
Expand Down Expand Up @@ -72,21 +73,25 @@ var (

// BlobStorageConfig defines the configurable flags that can be defined when using azure blob storage.
type BlobStorageConfig struct {
Environment string `yaml:"environment"`
StorageAccountName string `yaml:"account_name"`
StorageAccountKey flagext.Secret `yaml:"account_key"`
ContainerName string `yaml:"container_name"`
Endpoint string `yaml:"endpoint_suffix"`
UseManagedIdentity bool `yaml:"use_managed_identity"`
UserAssignedID string `yaml:"user_assigned_id"`
ChunkDelimiter string `yaml:"chunk_delimiter"`
DownloadBufferSize int `yaml:"download_buffer_size"`
UploadBufferSize int `yaml:"upload_buffer_size"`
UploadBufferCount int `yaml:"upload_buffer_count"`
RequestTimeout time.Duration `yaml:"request_timeout"`
MaxRetries int `yaml:"max_retries"`
MinRetryDelay time.Duration `yaml:"min_retry_delay"`
MaxRetryDelay time.Duration `yaml:"max_retry_delay"`
Environment string `yaml:"environment"`
StorageAccountName string `yaml:"account_name"`
StorageAccountKey flagext.Secret `yaml:"account_key"`
ContainerName string `yaml:"container_name"`
Endpoint string `yaml:"endpoint_suffix"`
UseManagedIdentity bool `yaml:"use_managed_identity"`
UserAssignedID string `yaml:"user_assigned_id"`
UseServicePrincipal bool `yaml:"use_service_principal"`
ClientID string `yaml:"client_id"`
ClientSecret flagext.Secret `yaml:"client_secret"`
TenantID string `yaml:"tenant_id"`
ChunkDelimiter string `yaml:"chunk_delimiter"`
DownloadBufferSize int `yaml:"download_buffer_size"`
UploadBufferSize int `yaml:"upload_buffer_size"`
UploadBufferCount int `yaml:"upload_buffer_count"`
RequestTimeout time.Duration `yaml:"request_timeout"`
MaxRetries int `yaml:"max_retries"`
MinRetryDelay time.Duration `yaml:"min_retry_delay"`
MaxRetryDelay time.Duration `yaml:"max_retry_delay"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -111,6 +116,10 @@ func (c *BlobStorageConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagS
f.IntVar(&c.MaxRetries, prefix+"azure.max-retries", 5, "Number of retries for a request which times out.")
f.DurationVar(&c.MinRetryDelay, prefix+"azure.min-retry-delay", 10*time.Millisecond, "Minimum time to wait before retrying a request.")
f.DurationVar(&c.MaxRetryDelay, prefix+"azure.max-retry-delay", 500*time.Millisecond, "Maximum time to wait before retrying a request.")
f.BoolVar(&c.UseServicePrincipal, prefix+"azure.use-service-principal", false, "Use Service Principal to authenticate through Azure OAuth.")
f.StringVar(&c.TenantID, prefix+"azure.tenant-id", "", "Azure Tenant ID is used to authenticate through Azure OAuth.")
f.StringVar(&c.ClientID, prefix+"azure.client-id", "", "Azure Service Principal ID(GUID).")
f.Var(&c.ClientSecret, prefix+"azure.client-id", "Azure Service Principal secret key.")
}

type BlobStorageMetrics struct {
Expand Down Expand Up @@ -159,6 +168,8 @@ type BlobStorage struct {

pipeline pipeline.Pipeline
hedgingPipeline pipeline.Pipeline
tc azblob.TokenCredential
lock sync.Mutex
}

// NewBlobStorage creates a new instance of the BlobStorage struct.
Expand Down Expand Up @@ -305,7 +316,7 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe
})
}

if !b.cfg.UseManagedIdentity && b.cfg.UserAssignedID == "" {
if !b.cfg.UseManagedIdentity && !b.cfg.UseServicePrincipal && b.cfg.UserAssignedID == "" {
credential, err := azblob.NewSharedKeyCredential(b.cfg.StorageAccountName, b.cfg.StorageAccountKey.String())
if err != nil {
return nil, err
Expand All @@ -319,10 +330,17 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe
return nil, err
}

return azblob.NewPipeline(*tokenCredential, opts), nil
return azblob.NewPipeline(tokenCredential, opts), nil
}

func (b *BlobStorage) getOAuthToken() (*azblob.TokenCredential, error) {
func (b *BlobStorage) getOAuthToken() (azblob.TokenCredential, error) {
b.lock.Lock()
defer b.lock.Unlock()

// this method is called a few times when we create each Pipeline, so we need to re-use TokenCredentials.
if b.tc != nil {
return b.tc, nil
}
spt, err := b.getServicePrincipalToken()
if err != nil {
return nil, err
Expand All @@ -334,7 +352,7 @@ func (b *BlobStorage) getOAuthToken() (*azblob.TokenCredential, error) {
return nil, err
}

tc := azblob.NewTokenCredential(spt.Token().AccessToken, func(tc azblob.TokenCredential) time.Duration {
b.tc = azblob.NewTokenCredential(spt.Token().AccessToken, func(tc azblob.TokenCredential) time.Duration {
err := spt.Refresh()
if err != nil {
// something went wrong, prevent the refresher from being triggered again
Expand All @@ -347,8 +365,7 @@ func (b *BlobStorage) getOAuthToken() (*azblob.TokenCredential, error) {
// get the next token slightly before the current one expires
return time.Until(spt.Token().Expires()) - 10*time.Second
})

return &tc, nil
return b.tc, nil
}

func (b *BlobStorage) getServicePrincipalToken() (*adal.ServicePrincipalToken, error) {
Expand All @@ -361,6 +378,12 @@ func (b *BlobStorage) getServicePrincipalToken() (*adal.ServicePrincipalToken, e

resource := fmt.Sprintf("https://%s.%s", b.cfg.StorageAccountName, endpoint)

if b.cfg.UseServicePrincipal {
config := auth.NewClientCredentialsConfig(b.cfg.ClientID, b.cfg.ClientSecret.String(), b.cfg.TenantID)
config.Resource = resource
return config.ServicePrincipalToken()
}

msiConfig := auth.MSIConfig{
Resource: resource,
}
Expand Down Expand Up @@ -431,6 +454,17 @@ func (c *BlobStorageConfig) Validate() error {
if !util.StringsContain(supportedEnvironments, c.Environment) {
return fmt.Errorf("unsupported Azure blob storage environment: %s, please select one of: %s ", c.Environment, strings.Join(supportedEnvironments, ", "))
}
if c.UseServicePrincipal {
if strings.TrimSpace(c.TenantID) == "" {
return fmt.Errorf("tenant_id is required if authentication using Service Principal is enabled")
}
if strings.TrimSpace(c.ClientID) == "" {
return fmt.Errorf("client_id is required if authentication using Service Principal is enabled")
}
if strings.TrimSpace(c.ClientSecret.String()) == "" {
return fmt.Errorf("client_secret is required if authentication using Service Principal is enabled")
}
}
return nil
}

Expand Down
49 changes: 49 additions & 0 deletions pkg/storage/chunk/client/azure/blob_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/grafana/dskit/flagext"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

Expand Down Expand Up @@ -143,3 +144,51 @@ func Test_EndpointSuffixWithBlob(t *testing.T) {
require.NoError(t, err)
require.Equal(t, *expect, bloburl.URL())
}

func Test_ConfigValidation(t *testing.T) {
t.Run("expected validation error if environment is not supported", func(t *testing.T) {
cfg := &BlobStorageConfig{
Environment: "",
}

require.EqualError(t, cfg.Validate(), "unsupported Azure blob storage environment: , please select one of: AzureGlobal, AzureChinaCloud, AzureGermanCloud, AzureUSGovernment ")
})
t.Run("expected validation error if tenant_id is empty and UseServicePrincipal is enabled", func(t *testing.T) {
cfg := createServicePrincipalStorageConfig("", "", "")

require.EqualError(t, cfg.Validate(), "tenant_id is required if authentication using Service Principal is enabled")
})
t.Run("expected validation error if client_id is empty and UseServicePrincipal is enabled", func(t *testing.T) {
cfg := createServicePrincipalStorageConfig("fake_tenant", "", "")

require.EqualError(t, cfg.Validate(), "client_id is required if authentication using Service Principal is enabled")
})
t.Run("expected validation error if client_secret is empty and UseServicePrincipal is enabled", func(t *testing.T) {
cfg := createServicePrincipalStorageConfig("fake_tenant", "fake_client", "")

require.EqualError(t, cfg.Validate(), "client_secret is required if authentication using Service Principal is enabled")
})
t.Run("expected no errors if UseServicePrincipal is enabled and required fields are set", func(t *testing.T) {
cfg := createServicePrincipalStorageConfig("fake_tenant", "fake_client", "fake_secret")

require.NoError(t, cfg.Validate())
})
t.Run("expected no errors if UseServicePrincipal is disabled and fields are empty", func(t *testing.T) {
cfg := &BlobStorageConfig{
Environment: azureGlobal,
UseServicePrincipal: false,
}

require.NoError(t, cfg.Validate())
})
}

func createServicePrincipalStorageConfig(tenantID string, clientID string, clientSecret string) *BlobStorageConfig {
return &BlobStorageConfig{
Environment: azureGlobal,
UseServicePrincipal: true,
TenantID: tenantID,
ClientID: clientID,
ClientSecret: flagext.SecretWithValue(clientSecret),
}
}

0 comments on commit 4335448

Please sign in to comment.