Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Ensure token is refreshed on Unauthenticated" #5404

Merged
merged 1 commit into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions flytectl/cmd/core/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ func generateCommandFunc(cmdEntry CommandEntry) func(cmd *cobra.Command, args []
cmdCtx := NewCommandContextNoClient(cmd.OutOrStdout())
if !cmdEntry.DisableFlyteClient {
clientSet, err := admin.ClientSetBuilder().WithConfig(admin.GetConfig(ctx)).
WithTokenCache(pkce.NewTokenCacheKeyringProvider(
pkce.KeyRingServiceName,
fmt.Sprintf("%s:%s", adminCfg.Endpoint.String(), pkce.KeyRingServiceUser),
)).Build(ctx)
WithTokenCache(pkce.TokenCacheKeyringProvider{
ServiceUser: fmt.Sprintf("%s:%s", adminCfg.Endpoint.String(), pkce.KeyRingServiceUser),
ServiceName: pkce.KeyRingServiceName,
}).Build(ctx)
if err != nil {
return err
}
Expand Down
66 changes: 6 additions & 60 deletions flytectl/pkg/pkce/token_cache_keyring.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,68 +3,23 @@ package pkce
import (
"encoding/json"
"fmt"
"sync"

"github.com/flyteorg/flyte/flyteidl/clients/go/admin/cache"

"github.com/zalando/go-keyring"
"golang.org/x/oauth2"
)

const (
KeyRingServiceUser = "flytectl-user"
KeyRingServiceName = "flytectl"
)

// TokenCacheKeyringProvider wraps the logic to save and retrieve tokens from the OS's keyring implementation.
type TokenCacheKeyringProvider struct {
ServiceName string
ServiceUser string
mu *sync.Mutex
cond *sync.Cond
}

func (t *TokenCacheKeyringProvider) PurgeIfEquals(existing *oauth2.Token) (bool, error) {
if existingBytes, err := json.Marshal(existing); err != nil {
return false, fmt.Errorf("unable to marshal token to save in cache due to %w", err)
} else if tokenJSON, err := keyring.Get(t.ServiceName, t.ServiceUser); err != nil {
if err.Error() == "secret not found in keyring" {
return false, fmt.Errorf("unable to read token from cache. Error: %w", cache.ErrNotFound)
}

return false, fmt.Errorf("unable to read token from cache. Error: %w", err)
} else if tokenJSON != string(existingBytes) {
return false, nil
}

_ = keyring.Delete(t.ServiceName, t.ServiceUser)
return true, nil
}

func (t *TokenCacheKeyringProvider) Lock() {
t.mu.Lock()
}

func (t *TokenCacheKeyringProvider) Unlock() {
t.mu.Unlock()
}

// TryLock the cache.
func (t *TokenCacheKeyringProvider) TryLock() bool {
return t.mu.TryLock()
}

// CondWait waits for the condition to be true.
func (t *TokenCacheKeyringProvider) CondWait() {
t.cond.Wait()
}

// CondBroadcast signals the condition.
func (t *TokenCacheKeyringProvider) CondBroadcast() {
t.cond.Broadcast()
}
const (
KeyRingServiceUser = "flytectl-user"
KeyRingServiceName = "flytectl"
)

func (t *TokenCacheKeyringProvider) SaveToken(token *oauth2.Token) error {
func (t TokenCacheKeyringProvider) SaveToken(token *oauth2.Token) error {
var tokenBytes []byte
if token.AccessToken == "" {
return fmt.Errorf("cannot save empty token with expiration %v", token.Expiry)
Expand All @@ -83,7 +38,7 @@ func (t *TokenCacheKeyringProvider) SaveToken(token *oauth2.Token) error {
return nil
}

func (t *TokenCacheKeyringProvider) GetToken() (*oauth2.Token, error) {
func (t TokenCacheKeyringProvider) GetToken() (*oauth2.Token, error) {
// get saved token
tokenJSON, err := keyring.Get(t.ServiceName, t.ServiceUser)
if len(tokenJSON) == 0 {
Expand All @@ -101,12 +56,3 @@ func (t *TokenCacheKeyringProvider) GetToken() (*oauth2.Token, error) {

return &token, nil
}

func NewTokenCacheKeyringProvider(serviceName, serviceUser string) *TokenCacheKeyringProvider {
return &TokenCacheKeyringProvider{
mu: &sync.Mutex{},
cond: sync.NewCond(&sync.Mutex{}),
ServiceName: serviceName,
ServiceUser: serviceUser,
}
}
52 changes: 7 additions & 45 deletions flyteidl/clients/go/admin/auth_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@

// MaterializeCredentials will attempt to build a TokenSource given the anonymously available information exposed by the server.
// Once established, it'll invoke PerRPCCredentialsFuture.Store() on perRPCCredentials to populate it with the appropriate values.
func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache,
perRPCCredentials *PerRPCCredentialsFuture, proxyCredentialsFuture *PerRPCCredentialsFuture) error {
func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, perRPCCredentials *PerRPCCredentialsFuture, proxyCredentialsFuture *PerRPCCredentialsFuture) error {
authMetadataClient, err := InitializeAuthMetadataClient(ctx, cfg, proxyCredentialsFuture)
if err != nil {
return fmt.Errorf("failed to initialized Auth Metadata Client. Error: %w", err)
Expand All @@ -43,17 +42,11 @@

tokenSource, err := tokenSourceProvider.GetTokenSource(ctx)
if err != nil {
return fmt.Errorf("failed to get token source. Error: %w", err)
}

_, err = tokenSource.Token()
if err != nil {
return fmt.Errorf("failed to issue token. Error: %w", err)
return err

Check warning on line 45 in flyteidl/clients/go/admin/auth_interceptor.go

View check run for this annotation

Codecov / codecov/patch

flyteidl/clients/go/admin/auth_interceptor.go#L45

Added line #L45 was not covered by tests
}

wrappedTokenSource := NewCustomHeaderTokenSource(tokenSource, cfg.UseInsecureConnection, authorizationMetadataKey)
perRPCCredentials.Store(wrappedTokenSource)

return nil
}

Expand Down Expand Up @@ -141,50 +134,19 @@
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx = setHTTPClientContext(ctx, cfg, proxyCredentialsFuture)

// If there is already a token in the cache (e.g. key-ring), we should use it immediately...
t, _ := tokenCache.GetToken()
if t != nil {
err := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture)
if err != nil {
return fmt.Errorf("failed to materialize credentials. Error: %v", err)
}
}

err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
logger.Debugf(ctx, "Request failed due to [%v]. If it's an unauthenticated error, we will attempt to establish an authenticated context.", err)

if st, ok := status.FromError(err); ok {
// If the error we receive from executing the request expects
if shouldAttemptToAuthenticate(st.Code()) {
err = func() error {
if !tokenCache.TryLock() {
tokenCache.CondWait()
return nil
}

defer tokenCache.Unlock()
_, err := tokenCache.PurgeIfEquals(t)
if err != nil && !errors.Is(err, cache.ErrNotFound) {
logger.Errorf(ctx, "Failed to purge cache. Error [%v]", err)
return fmt.Errorf("failed to purge cache. Error: %w", err)
}

logger.Debugf(ctx, "Request failed due to [%v]. Attempting to establish an authenticated connection and trying again.", st.Code())
newErr := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture)
if newErr != nil {
errString := fmt.Sprintf("authentication error! Original Error: %v, Auth Error: %v", err, newErr)
logger.Errorf(ctx, errString)
return fmt.Errorf(errString)
}

tokenCache.CondBroadcast()
return nil
}()

if err != nil {
return err
logger.Debugf(ctx, "Request failed due to [%v]. Attempting to establish an authenticated connection and trying again.", st.Code())
newErr := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture)
if newErr != nil {
return fmt.Errorf("authentication error! Original Error: %v, Auth Error: %w", err, newErr)

Check warning on line 147 in flyteidl/clients/go/admin/auth_interceptor.go

View check run for this annotation

Codecov / codecov/patch

flyteidl/clients/go/admin/auth_interceptor.go#L147

Added line #L147 was not covered by tests
}

return invoker(ctx, method, req, reply, cc, opts...)
}
}
Expand Down
Loading
Loading