Skip to content

Commit

Permalink
Enable datacatalog client to use admin's auth option (flyteorg#389)
Browse files Browse the repository at this point in the history
* Enable datacatalog client to use admin's auth option
Signed-off-by: Sean Lin <[email protected]>

* goimport
Signed-off-by: Sean Lin <[email protected]>

* Update flyteidl version
Signed-off-by: Sean Lin <[email protected]>

* update idl version
Signed-off-by: Sean Lin <[email protected]>
  • Loading branch information
mayitbeegh authored Feb 1, 2022
1 parent 5047abf commit 3950f4a
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 18 deletions.
12 changes: 7 additions & 5 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"runtime/pprof"
"time"

"google.golang.org/grpc"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"
"github.com/flyteorg/flytestdlib/promutils/labeled"

Expand Down Expand Up @@ -285,21 +287,21 @@ func newControllerMetrics(scope promutils.Scope) *metrics {
}
}

func getAdminClient(ctx context.Context) (client service.AdminServiceClient, err error) {
func getAdminClient(ctx context.Context) (client service.AdminServiceClient, opt grpc.DialOption, err error) {
cfg := admin.GetConfig(ctx)
clients, err := admin.NewClientsetBuilder().WithConfig(cfg).Build(ctx)
if err != nil {
return nil, fmt.Errorf("failed to initialize clientset. Error: %w", err)
return nil, nil, fmt.Errorf("failed to initialize clientset. Error: %w", err)
}

return clients.AdminClient(), nil
return clients.AdminClient(), clients.AuthOpt(), nil
}

// NewController returns a new FlyteWorkflow controller
func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Interface, flytepropellerClientset clientset.Interface,
flyteworkflowInformerFactory informers.SharedInformerFactory, kubeClient executors.Client, scope promutils.Scope) (*Controller, error) {

adminClient, err := getAdminClient(ctx)
adminClient, authOpts, err := getAdminClient(ctx)
if err != nil {
logger.Errorf(ctx, "failed to initialize Admin client, err :%s", err.Error())
return nil, err
Expand Down Expand Up @@ -382,7 +384,7 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
}

logger.Info(ctx, "Setting up Catalog client.")
catalogClient, err := catalog.NewCatalogClient(ctx)
catalogClient, err := catalog.NewCatalogClient(ctx, authOpts)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create datacatalog client")
}
Expand Down
14 changes: 8 additions & 6 deletions flytepropeller/pkg/controller/nodes/task/catalog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog"
"github.com/flyteorg/flytestdlib/config"
"google.golang.org/grpc"

"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/catalog/datacatalog"
)
Expand All @@ -30,23 +31,24 @@ const (
)

type Config struct {
Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"`
Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"`
Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"`
MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"`
Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"`
Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"`
Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"`
MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"`
UseAdminAuth bool `json:"use-admin-auth" pflag:"false, Use the same gRPC credentials option as the flyteadmin client"`
}

// Gets loaded config for Discovery
func GetConfig() *Config {
return configSection.GetConfig().(*Config)
}

func NewCatalogClient(ctx context.Context) (catalog.Client, error) {
func NewCatalogClient(ctx context.Context, authOpt grpc.DialOption) (catalog.Client, error) {
catalogConfig := GetConfig()

switch catalogConfig.Type {
case DataCatalogType:
return datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure, catalogConfig.MaxCacheAge.Duration)
return datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure, catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, authOpt)
case NoOpDiscoveryType, "":
return NOOPCatalog{}, nil
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,11 @@ func (m *CatalogClient) ReleaseReservation(ctx context.Context, key catalog.Key,
}

// Create a new Datacatalog client for task execution caching
func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration) (*CatalogClient, error) {
func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration, useAdminAuth bool, authOpt grpc.DialOption) (*CatalogClient, error) {
var opts []grpc.DialOption
if useAdminAuth && authOpt != nil {
opts = append(opts, authOpt)
}

grpcOptions := []grpcRetry.CallOption{
grpcRetry.WithBackoff(grpcRetry.BackoffLinear(100 * time.Millisecond)),
Expand Down
12 changes: 6 additions & 6 deletions flytepropeller/pkg/controller/workflow/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Error(t *testing.T) {
enqueueWorkflow := func(workflowId v1alpha1.WorkflowID) {}

eventSink := eventMocks.NewMockEventSink()
catalogClient, err := catalog.NewCatalogClient(ctx)
catalogClient, err := catalog.NewCatalogClient(ctx, nil)
assert.NoError(t, err)
recoveryClient := &recoveryMocks.RecoveryClient{}

Expand Down Expand Up @@ -318,7 +318,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow(t *testing.T) {
enqueueWorkflow := func(workflowId v1alpha1.WorkflowID) {}

eventSink := eventMocks.NewMockEventSink()
catalogClient, err := catalog.NewCatalogClient(ctx)
catalogClient, err := catalog.NewCatalogClient(ctx, nil)
assert.NoError(t, err)
recoveryClient := &recoveryMocks.RecoveryClient{}

Expand Down Expand Up @@ -382,7 +382,7 @@ func BenchmarkWorkflowExecutor(b *testing.B) {
enqueueWorkflow := func(workflowId v1alpha1.WorkflowID) {}

eventSink := eventMocks.NewMockEventSink()
catalogClient, err := catalog.NewCatalogClient(ctx)
catalogClient, err := catalog.NewCatalogClient(ctx, nil)
assert.NoError(b, err)
recoveryClient := &recoveryMocks.RecoveryClient{}
adminClient := launchplan.NewFailFastLaunchPlanExecutor()
Expand Down Expand Up @@ -492,7 +492,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) {
}
return nil
}
catalogClient, err := catalog.NewCatalogClient(ctx)
catalogClient, err := catalog.NewCatalogClient(ctx, nil)
assert.NoError(t, err)
recoveryClient := &recoveryMocks.RecoveryClient{}
adminClient := launchplan.NewFailFastLaunchPlanExecutor()
Expand Down Expand Up @@ -588,7 +588,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Events(t *testing.T) {
}
return nil
}
catalogClient, err := catalog.NewCatalogClient(ctx)
catalogClient, err := catalog.NewCatalogClient(ctx, nil)
assert.NoError(t, err)
adminClient := launchplan.NewFailFastLaunchPlanExecutor()
recoveryClient := &recoveryMocks.RecoveryClient{}
Expand Down Expand Up @@ -645,7 +645,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) {
assert.NoError(t, err)

nodeEventSink := eventMocks.NewMockEventSink()
catalogClient, err := catalog.NewCatalogClient(ctx)
catalogClient, err := catalog.NewCatalogClient(ctx, nil)
assert.NoError(t, err)
recoveryClient := &recoveryMocks.RecoveryClient{}

Expand Down

0 comments on commit 3950f4a

Please sign in to comment.