Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Update flyteidl version #467

Merged
merged 10 commits into from
Sep 30, 2022
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.13.0
github.com/flyteorg/flyteidl v1.1.10
github.com/flyteorg/flyteidl v1.1.19
github.com/flyteorg/flyteplugins v1.0.15
github.com/flyteorg/flytestdlib v1.0.5
github.com/ghodss/yaml v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.1.10 h1:Bus/JUto0oBTjAS4EBN7EITeuZNS4naq+uFpj+ydaW4=
github.com/flyteorg/flyteidl v1.1.10/go.mod h1:f1tvw5CDjqmrzNxKpRYr6BdAhHL8f7Wp1Duxl0ZOV4g=
github.com/flyteorg/flyteidl v1.1.19 h1:1CtSbuFhFHwUbKdv66PqbcER01iacAJU+snh0eTsXc4=
github.com/flyteorg/flyteidl v1.1.19/go.mod h1:SLTYz2JgIKvM5MbPVlMP7uILb65fnuuZQZFHHIEYh2U=
github.com/flyteorg/flyteplugins v1.0.15 h1:LewZIw2qSyGy34OoghYeuc7N/KazeVZvD0gNYXt/ZcM=
github.com/flyteorg/flyteplugins v1.0.15/go.mod h1:GfbmRByI/rSatm/Epoj3bNyrXwIQ9NOXTVwLS6Z0p84=
github.com/flyteorg/flytestdlib v1.0.0/go.mod h1:QSVN5wIM1lM9d60eAEbX7NwweQXW96t5x4jbyftn89c=
Expand Down
12 changes: 9 additions & 3 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,20 @@ func newControllerMetrics(scope promutils.Scope) *metrics {
}
}

func getAdminClient(ctx context.Context) (client service.AdminServiceClient, opt grpc.DialOption, err error) {
func getAdminClient(ctx context.Context) (client service.AdminServiceClient, opt []grpc.DialOption, err error) {
cfg := admin.GetConfig(ctx)
clients, err := admin.NewClientsetBuilder().WithConfig(cfg).Build(ctx)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize clientset. Error: %w", err)
}

return clients.AdminClient(), clients.AuthOpt(), nil
credentialsFuture := admin.NewPerRPCCredentialsFuture()
opts := []grpc.DialOption{
grpc.WithChainUnaryInterceptor(admin.NewAuthInterceptor(cfg, nil, credentialsFuture)),
grpc.WithPerRPCCredentials(credentialsFuture),
}

return clients.AdminClient(), opts, nil
}

// New returns a new FlyteWorkflow controller
Expand Down Expand Up @@ -413,7 +419,7 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
}

logger.Info(ctx, "Setting up Catalog client.")
catalogClient, err := catalog.NewCatalogClient(ctx, authOpts)
catalogClient, err := catalog.NewCatalogClient(ctx, authOpts...)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create datacatalog client")
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/controller/nodes/task/catalog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,19 @@ type Config struct {
DefaultServiceConfig string `json:"default-service-config" pflag:"\"\", Set the default service config for the catalog gRPC client"`
}

// Gets loaded config for Discovery
// GetConfig gets loaded config for Discovery
func GetConfig() *Config {
return configSection.GetConfig().(*Config)
}

func NewCatalogClient(ctx context.Context, authOpt grpc.DialOption) (catalog.Client, error) {
func NewCatalogClient(ctx context.Context, authOpt ...grpc.DialOption) (catalog.Client, error) {
catalogConfig := GetConfig()

switch catalogConfig.Type {
case DataCatalogType:
return datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure, catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, catalogConfig.DefaultServiceConfig, authOpt)
return datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure,
catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, catalogConfig.DefaultServiceConfig,
authOpt...)
case NoOpDiscoveryType, "":
return NOOPCatalog{}, nil
}
Expand Down
15 changes: 5 additions & 10 deletions pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils"
grpcMiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
Expand Down Expand Up @@ -352,11 +351,11 @@ func (m *CatalogClient) ReleaseReservation(ctx context.Context, key catalog.Key,
return nil
}

// Create a new Datacatalog client for task execution caching
func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration, useAdminAuth bool, defaultServiceConfig string, authOpt grpc.DialOption) (*CatalogClient, error) {
// NewDataCatalog creates a new Datacatalog client for task execution caching
func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration, useAdminAuth bool, defaultServiceConfig string, authOpt ...grpc.DialOption) (*CatalogClient, error) {
var opts []grpc.DialOption
if useAdminAuth && authOpt != nil {
opts = append(opts, authOpt)
opts = append(opts, authOpt...)
}

grpcOptions := []grpcRetry.CallOption{
Expand Down Expand Up @@ -385,12 +384,8 @@ func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection boo

retryInterceptor := grpcRetry.UnaryClientInterceptor(grpcOptions...)

finalUnaryInterceptor := grpcMiddleware.ChainUnaryClient(
grpcPrometheus.UnaryClientInterceptor,
retryInterceptor,
)

opts = append(opts, grpc.WithUnaryInterceptor(finalUnaryInterceptor))
opts = append(opts, grpc.WithChainUnaryInterceptor(grpcPrometheus.UnaryClientInterceptor,
retryInterceptor))
clientConn, err := grpc.Dial(endpoint, opts...)
if err != nil {
return nil, err
Expand Down