From b9fb6e31eeb27a15c522995b0c0ea0f5536c0fe0 Mon Sep 17 00:00:00 2001 From: Honnix Date: Fri, 3 Jun 2022 00:29:51 +0200 Subject: [PATCH] Upgrade flyteidl, preload lb policy and add default service config for catalog client (#447) --- go.mod | 4 ++-- go.sum | 4 ++-- pkg/controller/controller.go | 1 + pkg/controller/nodes/task/catalog/config.go | 8 +++++++- pkg/controller/nodes/task/catalog/config_flags.go | 1 + .../nodes/task/catalog/config_flags_test.go | 14 ++++++++++++++ .../nodes/task/catalog/datacatalog/datacatalog.go | 6 +++++- propeller-config.yaml | 2 ++ 8 files changed, 34 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index e1b2f2023f..3b5dd5b117 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.10.0 - github.com/flyteorg/flyteidl v1.1.0 + github.com/flyteorg/flyteidl v1.1.5 github.com/flyteorg/flyteplugins v1.0.0 github.com/flyteorg/flytestdlib v1.0.0 github.com/ghodss/yaml v1.0.0 @@ -15,6 +15,7 @@ require ( github.com/golang/protobuf v1.4.3 github.com/google/uuid v1.2.0 github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 + github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/imdario/mergo v0.3.11 github.com/magiconair/properties v1.8.4 github.com/mitchellh/mapstructure v1.4.1 @@ -86,7 +87,6 @@ require ( github.com/google/gofuzz v1.2.0 // indirect github.com/googleapis/gax-go/v2 v2.0.5 // indirect github.com/googleapis/gnostic v0.5.1 // indirect - github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/hcl v1.0.0 // indirect diff --git a/go.sum b/go.sum index 26dab996cc..d39bcb7aff 100644 --- a/go.sum +++ b/go.sum @@ -243,8 +243,8 @@ github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/flyteorg/flyteidl v1.0.0/go.mod h1:JW0z1ZaHS9zWvDAwSMIyGhsf+V4zrzBBgh5IuqzMFCM= -github.com/flyteorg/flyteidl v1.1.0 h1:f8tdMXOuorS/d+4Ut2QarfDbdCOriK0S+EnlQzrwz9E= -github.com/flyteorg/flyteidl v1.1.0/go.mod h1:JW0z1ZaHS9zWvDAwSMIyGhsf+V4zrzBBgh5IuqzMFCM= +github.com/flyteorg/flyteidl v1.1.5 h1:awptNJfw2yESkdNOm1Pe9KPILAzVImkiViUFP1K7UPk= +github.com/flyteorg/flyteidl v1.1.5/go.mod h1:f1tvw5CDjqmrzNxKpRYr6BdAhHL8f7Wp1Duxl0ZOV4g= github.com/flyteorg/flyteplugins v1.0.0 h1:77hUJjiIxBmQ9rd3+cXjSGnzOVAFrSzCd59aIaYFB/8= github.com/flyteorg/flyteplugins v1.0.0/go.mod h1:4Cpn+9RfanIieTTh2XsuL6zPYXtsR5UDe8YaEmXONT4= github.com/flyteorg/flytestdlib v1.0.0 h1:gb99ignMsVcNTUmWzArtcIDdkRjyzQQVBkWNOQakiFg= diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 09c5337188..f29d8865b4 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -46,6 +46,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" + _ "google.golang.org/grpc/balancer/roundrobin" //nolint apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" diff --git a/pkg/controller/nodes/task/catalog/config.go b/pkg/controller/nodes/task/catalog/config.go index f4dfb31fff..b3ddd72be8 100644 --- a/pkg/controller/nodes/task/catalog/config.go +++ b/pkg/controller/nodes/task/catalog/config.go @@ -36,6 +36,12 @@ type Config struct { Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"` MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"` UseAdminAuth bool `json:"use-admin-auth" pflag:"false, Use the same gRPC credentials option as the flyteadmin client"` + + // Set the gRPC service config formatted as a json string https://github.com/grpc/grpc/blob/master/doc/service_config.md + // eg. {"loadBalancingConfig": [{"round_robin":{}}], "methodConfig": [{"name":[{"service": "foo", "method": "bar"}, {"service": "baz"}], "timeout": "1.000000001s"}]} + // find the full schema here https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto#L625 + // Note that required packages may need to be preloaded to support certain service config. For example "google.golang.org/grpc/balancer/roundrobin" should be preloaded to have round-robin policy supported. + DefaultServiceConfig string `json:"default-service-config" pflag:"\"\", Set the default service config for the catalog gRPC client"` } // Gets loaded config for Discovery @@ -48,7 +54,7 @@ func NewCatalogClient(ctx context.Context, authOpt grpc.DialOption) (catalog.Cli switch catalogConfig.Type { case DataCatalogType: - return datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure, catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, authOpt) + return datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure, catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, catalogConfig.DefaultServiceConfig, authOpt) case NoOpDiscoveryType, "": return NOOPCatalog{}, nil } diff --git a/pkg/controller/nodes/task/catalog/config_flags.go b/pkg/controller/nodes/task/catalog/config_flags.go index 004af8fe30..4bd40b20b4 100755 --- a/pkg/controller/nodes/task/catalog/config_flags.go +++ b/pkg/controller/nodes/task/catalog/config_flags.go @@ -55,5 +55,6 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "insecure"), defaultConfig.Insecure, " Use insecure grpc connection") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "max-cache-age"), defaultConfig.MaxCacheAge.String(), " Cache entries past this age will incur cache miss. 0 means cache never expires") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "use-admin-auth"), defaultConfig.UseAdminAuth, " Use the same gRPC credentials option as the flyteadmin client") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "default-service-config"), defaultConfig.DefaultServiceConfig, " Set the default service config for the catalog gRPC client") return cmdFlags } diff --git a/pkg/controller/nodes/task/catalog/config_flags_test.go b/pkg/controller/nodes/task/catalog/config_flags_test.go index 21bf7d253b..3b18a9282d 100755 --- a/pkg/controller/nodes/task/catalog/config_flags_test.go +++ b/pkg/controller/nodes/task/catalog/config_flags_test.go @@ -169,4 +169,18 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_default-service-config", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("default-service-config", testValue) + if vString, err := cmdFlags.GetString("default-service-config"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.DefaultServiceConfig) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) } diff --git a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go index 9b971b07b2..86504388cc 100644 --- a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go +++ b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go @@ -353,7 +353,7 @@ func (m *CatalogClient) ReleaseReservation(ctx context.Context, key catalog.Key, } // Create a new Datacatalog client for task execution caching -func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration, useAdminAuth bool, authOpt grpc.DialOption) (*CatalogClient, error) { +func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration, useAdminAuth bool, defaultServiceConfig string, authOpt grpc.DialOption) (*CatalogClient, error) { var opts []grpc.DialOption if useAdminAuth && authOpt != nil { opts = append(opts, authOpt) @@ -379,6 +379,10 @@ func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection boo opts = append(opts, grpc.WithTransportCredentials(creds)) } + if defaultServiceConfig != "" { + opts = append(opts, grpc.WithDefaultServiceConfig(defaultServiceConfig)) + } + retryInterceptor := grpcRetry.UnaryClientInterceptor(grpcOptions...) finalUnaryInterceptor := grpcMiddleware.ChainUnaryClient( diff --git a/propeller-config.yaml b/propeller-config.yaml index d524e0c03f..84ce877d66 100644 --- a/propeller-config.yaml +++ b/propeller-config.yaml @@ -99,10 +99,12 @@ event: admin: endpoint: localhost:30081 insecure: true + defaultServiceConfig: '{"loadBalancingConfig": [{"round_robin":{}}]}' catalog-cache: type: noop endpoint: datacatalog:8089 insecure: true + default-service-config: '{"loadBalancingConfig": [{"round_robin":{}}]}' logger: level: 5 show-source: true