From 4525b4a850a7245c082b2f74f85165923becefdb Mon Sep 17 00:00:00 2001 From: lu4nm3 Date: Thu, 2 Apr 2020 19:09:24 -0700 Subject: [PATCH 1/8] Add rate limiter configs --- go/tasks/plugins/presto/config/config.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/go/tasks/plugins/presto/config/config.go b/go/tasks/plugins/presto/config/config.go index 21f2ff917..7c5f4304d 100644 --- a/go/tasks/plugins/presto/config/config.go +++ b/go/tasks/plugins/presto/config/config.go @@ -40,6 +40,11 @@ type RefreshCacheConfig struct { LruCacheSize int `json:"lruCacheSize" pflag:",Size of the cache"` } +type RateLimiterConfig struct { + Rate int64 `json:"rate" pflag:",Allowed rate of calls per second."` + Burst int `json:"burst" pflag:",Allowed burst rate of calls."` +} + var ( defaultConfig = Config{ Environment: URLMustParse(""), @@ -52,6 +57,10 @@ var ( Workers: 15, LruCacheSize: 10000, }, + RateLimiterConfig: RateLimiterConfig{ + Rate: 15, + Burst: 20, + }, } prestoConfigSection = pluginsConfig.MustRegisterSubSection(prestoConfigSectionKey, &defaultConfig) @@ -64,6 +73,7 @@ type Config struct { DefaultUser string `json:"defaultUser" pflag:",Default Presto user"` RoutingGroupConfigs []RoutingGroupConfig `json:"routingGroupConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"` RefreshCacheConfig RefreshCacheConfig `json:"refreshCacheConfig" pflag:"Rate limiter config"` + RateLimiterConfig RateLimiterConfig `json:"rateLimiterConfig" pflag:"Rate limiter config"` } // Retrieves the current config value or default. From 19260a0f1d88d7144bbbea6e3d7dd7f2b56290e9 Mon Sep 17 00:00:00 2001 From: lu4nm3 Date: Thu, 2 Apr 2020 19:49:16 -0700 Subject: [PATCH 2/8] add extern storage location --- go/tasks/plugins/presto/execution_state.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/go/tasks/plugins/presto/execution_state.go b/go/tasks/plugins/presto/execution_state.go index e9bcb4e00..09cc69343 100644 --- a/go/tasks/plugins/presto/execution_state.go +++ b/go/tasks/plugins/presto/execution_state.go @@ -314,7 +314,14 @@ func GetNextQuery( case 1: // TODO - externalLocation := getExternalLocation("s3://lyft-modelbuilder/{}/", 2) + //externalLocation := getExternalLocation("s3://lyft-modelbuilder/{}/", 2) + externalLocation, err := tCtx.DataStore().ConstructReference(ctx, tCtx.OutputWriter().GetRawOutputPrefix(), "") + //println(dataStore.String()) + if err != nil { + return Query{}, err + } + + //externalLocation := tCtx.OutputWriter().GetRawOutputPrefix().String() statement := fmt.Sprintf(` CREATE TABLE hive.flyte_temporary_tables."%s" (LIKE hive.flyte_temporary_tables."%s") WITH (format = 'PARQUET', external_location = '%s')`, @@ -323,7 +330,7 @@ WITH (format = 'PARQUET', external_location = '%s')`, externalLocation, ) currentState.CurrentPrestoQuery.Statement = statement - currentState.CurrentPrestoQuery.ExternalLocation = externalLocation + currentState.CurrentPrestoQuery.ExternalLocation = externalLocation.String() return currentState.CurrentPrestoQuery, nil case 2: From 89c6e4c34517bb369acc8dba1750635623217c7f Mon Sep 17 00:00:00 2001 From: lu4nm3 Date: Thu, 2 Apr 2020 19:52:14 -0700 Subject: [PATCH 3/8] lint --- go/tasks/plugins/presto/execution_state.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/go/tasks/plugins/presto/execution_state.go b/go/tasks/plugins/presto/execution_state.go index 09cc69343..973899ed4 100644 --- a/go/tasks/plugins/presto/execution_state.go +++ b/go/tasks/plugins/presto/execution_state.go @@ -2,7 +2,6 @@ package presto import ( "context" - "strings" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" @@ -357,15 +356,6 @@ FROM hive.flyte_temporary_tables."%s"` } } -func getExternalLocation(shardFormatter string, shardLength int) string { - shardCount := strings.Count(shardFormatter, "{}") - for i := 0; i < shardCount; i++ { - shardFormatter = strings.Replace(shardFormatter, "{}", rand.String(shardLength), 1) - } - - return shardFormatter + rand.String(32) + "/" -} - func getUser(ctx context.Context, defaultUser string) string { principalContextUser := ctx.Value("principal") if principalContextUser != nil { From ca416390bef30de98092834e00d92e56a67b8b8c Mon Sep 17 00:00:00 2001 From: lu4nm3 Date: Fri, 3 Apr 2020 10:00:36 -0700 Subject: [PATCH 4/8] undo --- .../presto/client/prestostatus_enumer.go | 54 +++++++++++++++++++ go/tasks/plugins/presto/execution_state.go | 19 ++++--- 2 files changed, 66 insertions(+), 7 deletions(-) create mode 100644 go/tasks/plugins/presto/client/prestostatus_enumer.go diff --git a/go/tasks/plugins/presto/client/prestostatus_enumer.go b/go/tasks/plugins/presto/client/prestostatus_enumer.go new file mode 100644 index 000000000..c5d206c76 --- /dev/null +++ b/go/tasks/plugins/presto/client/prestostatus_enumer.go @@ -0,0 +1,54 @@ +// Code generated by "enumer --type=PrestoStatus"; DO NOT EDIT. + +// +package client + +import ( + "fmt" +) + +const _PrestoStatusName = "PrestoStatusUnknownPrestoStatusWaitingPrestoStatusRunningPrestoStatusFinishedPrestoStatusFailedPrestoStatusCancelled" + +var _PrestoStatusIndex = [...]uint8{0, 19, 38, 57, 77, 95, 116} + +func (i PrestoStatus) String() string { + if i < 0 || i >= PrestoStatus(len(_PrestoStatusIndex)-1) { + return fmt.Sprintf("PrestoStatus(%d)", i) + } + return _PrestoStatusName[_PrestoStatusIndex[i]:_PrestoStatusIndex[i+1]] +} + +var _PrestoStatusValues = []PrestoStatus{0, 1, 2, 3, 4, 5} + +var _PrestoStatusNameToValueMap = map[string]PrestoStatus{ + _PrestoStatusName[0:19]: 0, + _PrestoStatusName[19:38]: 1, + _PrestoStatusName[38:57]: 2, + _PrestoStatusName[57:77]: 3, + _PrestoStatusName[77:95]: 4, + _PrestoStatusName[95:116]: 5, +} + +// PrestoStatusString retrieves an enum value from the enum constants string name. +// Throws an error if the param is not part of the enum. +func PrestoStatusString(s string) (PrestoStatus, error) { + if val, ok := _PrestoStatusNameToValueMap[s]; ok { + return val, nil + } + return 0, fmt.Errorf("%s does not belong to PrestoStatus values", s) +} + +// PrestoStatusValues returns all values of the enum +func PrestoStatusValues() []PrestoStatus { + return _PrestoStatusValues +} + +// IsAPrestoStatus returns "true" if the value is listed in the enum definition. "false" otherwise +func (i PrestoStatus) IsAPrestoStatus() bool { + for _, v := range _PrestoStatusValues { + if i == v { + return true + } + } + return false +} diff --git a/go/tasks/plugins/presto/execution_state.go b/go/tasks/plugins/presto/execution_state.go index 973899ed4..d355f98a7 100644 --- a/go/tasks/plugins/presto/execution_state.go +++ b/go/tasks/plugins/presto/execution_state.go @@ -2,6 +2,7 @@ package presto import ( "context" + "strings" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" @@ -313,12 +314,7 @@ func GetNextQuery( case 1: // TODO - //externalLocation := getExternalLocation("s3://lyft-modelbuilder/{}/", 2) - externalLocation, err := tCtx.DataStore().ConstructReference(ctx, tCtx.OutputWriter().GetRawOutputPrefix(), "") - //println(dataStore.String()) - if err != nil { - return Query{}, err - } + externalLocation := getExternalLocation("s3://lyft-modelbuilder/{}/", 2) //externalLocation := tCtx.OutputWriter().GetRawOutputPrefix().String() statement := fmt.Sprintf(` @@ -329,7 +325,7 @@ WITH (format = 'PARQUET', external_location = '%s')`, externalLocation, ) currentState.CurrentPrestoQuery.Statement = statement - currentState.CurrentPrestoQuery.ExternalLocation = externalLocation.String() + currentState.CurrentPrestoQuery.ExternalLocation = externalLocation return currentState.CurrentPrestoQuery, nil case 2: @@ -356,6 +352,15 @@ FROM hive.flyte_temporary_tables."%s"` } } +func getExternalLocation(shardFormatter string, shardLength int) string { + shardCount := strings.Count(shardFormatter, "{}") + for i := 0; i < shardCount; i++ { + shardFormatter = strings.Replace(shardFormatter, "{}", rand.String(shardLength), 1) + } + + return shardFormatter + rand.String(32) + "/" +} + func getUser(ctx context.Context, defaultUser string) string { principalContextUser := ctx.Value("principal") if principalContextUser != nil { From c73a25e3b2817500a8ab133d941b9cdab9bc606a Mon Sep 17 00:00:00 2001 From: lu4nm3 Date: Fri, 3 Apr 2020 10:07:02 -0700 Subject: [PATCH 5/8] undo2 --- go/tasks/plugins/presto/execution_state.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/go/tasks/plugins/presto/execution_state.go b/go/tasks/plugins/presto/execution_state.go index d355f98a7..e9bcb4e00 100644 --- a/go/tasks/plugins/presto/execution_state.go +++ b/go/tasks/plugins/presto/execution_state.go @@ -315,8 +315,6 @@ func GetNextQuery( case 1: // TODO externalLocation := getExternalLocation("s3://lyft-modelbuilder/{}/", 2) - - //externalLocation := tCtx.OutputWriter().GetRawOutputPrefix().String() statement := fmt.Sprintf(` CREATE TABLE hive.flyte_temporary_tables."%s" (LIKE hive.flyte_temporary_tables."%s") WITH (format = 'PARQUET', external_location = '%s')`, From 3d36ebd0e3900dc2e67a99fb5d2451f4dcbceaa9 Mon Sep 17 00:00:00 2001 From: lu4nm3 Date: Fri, 3 Apr 2020 10:16:04 -0700 Subject: [PATCH 6/8] update field description --- go/tasks/plugins/presto/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/tasks/plugins/presto/config/config.go b/go/tasks/plugins/presto/config/config.go index 7c5f4304d..050f4d705 100644 --- a/go/tasks/plugins/presto/config/config.go +++ b/go/tasks/plugins/presto/config/config.go @@ -72,7 +72,7 @@ type Config struct { DefaultRoutingGroup string `json:"defaultRoutingGroup" pflag:",Default Presto routing group"` DefaultUser string `json:"defaultUser" pflag:",Default Presto user"` RoutingGroupConfigs []RoutingGroupConfig `json:"routingGroupConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"` - RefreshCacheConfig RefreshCacheConfig `json:"refreshCacheConfig" pflag:"Rate limiter config"` + RefreshCacheConfig RefreshCacheConfig `json:"refreshCacheConfig" pflag:"Refresh cache config"` RateLimiterConfig RateLimiterConfig `json:"rateLimiterConfig" pflag:"Rate limiter config"` } From e003060f1600bd22d352ede5a46eac13097b392b Mon Sep 17 00:00:00 2001 From: Luis Medina <3936213+lu4nm3@users.noreply.github.com> Date: Fri, 3 Apr 2020 10:23:02 -0700 Subject: [PATCH 7/8] Update go/tasks/plugins/presto/config/config.go Co-Authored-By: Yee Hing Tong --- go/tasks/plugins/presto/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/tasks/plugins/presto/config/config.go b/go/tasks/plugins/presto/config/config.go index 050f4d705..e3ac8c7e1 100644 --- a/go/tasks/plugins/presto/config/config.go +++ b/go/tasks/plugins/presto/config/config.go @@ -42,7 +42,7 @@ type RefreshCacheConfig struct { type RateLimiterConfig struct { Rate int64 `json:"rate" pflag:",Allowed rate of calls per second."` - Burst int `json:"burst" pflag:",Allowed burst rate of calls."` + Burst int `json:"burst" pflag:",Allowed burst rate of calls per second."` } var ( From b36245800d801db761b363010f283c748757185b Mon Sep 17 00:00:00 2001 From: lu4nm3 Date: Fri, 3 Apr 2020 10:29:06 -0700 Subject: [PATCH 8/8] add comments in field description for rate limiter --- go/tasks/plugins/presto/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/tasks/plugins/presto/config/config.go b/go/tasks/plugins/presto/config/config.go index e3ac8c7e1..bb075e818 100644 --- a/go/tasks/plugins/presto/config/config.go +++ b/go/tasks/plugins/presto/config/config.go @@ -73,7 +73,7 @@ type Config struct { DefaultUser string `json:"defaultUser" pflag:",Default Presto user"` RoutingGroupConfigs []RoutingGroupConfig `json:"routingGroupConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"` RefreshCacheConfig RefreshCacheConfig `json:"refreshCacheConfig" pflag:"Refresh cache config"` - RateLimiterConfig RateLimiterConfig `json:"rateLimiterConfig" pflag:"Rate limiter config"` + RateLimiterConfig RateLimiterConfig `json:"rateLimiterConfig" pflag:"Rate limiter config for ALL requests going to Presto"` } // Retrieves the current config value or default.