From 0f69ed67f30edc02a86b40e113867b5fe66d2e0c Mon Sep 17 00:00:00 2001
From: Luis Medina <3936213+lu4nm3@users.noreply.github.com>
Date: Mon, 6 Apr 2020 14:34:31 -0700
Subject: [PATCH] Refactor rate limiter for read/write requests (#76)

* Refactor rate limiter for read/write requests

* update config comment

* update config comment and external path

* remove unused function
---
 .../go/tasks/plugins/presto/config/config.go | 51 +++++++++++++++----
 .../tasks/plugins/presto/execution_state.go  | 19 +++----
 2 files changed, 47 insertions(+), 23 deletions(-)

diff --git a/flyteplugins/go/tasks/plugins/presto/config/config.go b/flyteplugins/go/tasks/plugins/presto/config/config.go
index bb075e818..47fa6a840 100644
--- a/flyteplugins/go/tasks/plugins/presto/config/config.go
+++ b/flyteplugins/go/tasks/plugins/presto/config/config.go
@@ -40,6 +40,32 @@ type RefreshCacheConfig struct {
 	LruCacheSize int `json:"lruCacheSize" pflag:",Size of the cache"`
 }
 
+// To execute a single Presto query from a user's point of view, we actually need to send 5 different
+// requests to Presto. Together, these requests (i.e. queries) take care of retrieving the data, saving
+// it to an external table, and performing cleanup.
+//
+// The Presto plugin currently uses a single allocation token for each set of 5 requests, which
+// corresponds to a single user query. This means that, in total, Flyte is able to work on
+// 'PrestoConfig.RoutingGroups[routing_group_name].Limit' user queries at a time, as configured for the
+// Presto plugin. In other words, at most, Flyte will be working on this many user Presto queries at a
+// time for each of the configured Presto routing groups.
+//
+// In addition, these two rate limiters control the rate at which read and write requests are sent to
+// Presto from this client. This includes requests to execute queries (write), requests to get the
+// status of a query through the auto-refresh cache (read), and requests to cancel queries (write).
+// Together with allocation tokens, these rate limiters ensure that the rate of requests and the number
+// of concurrent requests going to Presto don't overload the cluster.
+//
+// There is one more important aspect to consider: how the resource manager (which issues allocation
+// tokens) interacts with the rate limiters. On the write side (e.g. executing a query), a low write
+// rate limit will block query execution until the request rate falls below the limit. Even if the rate
+// is below the limit, long-running queries will block at the resource manager level, which only allows
+// a certain number of concurrent queries to execute at any given time. Similarly, in more extreme
+// cases, if both the resource manager and the write limiter are configured to support a large number
+// of queries but the auto-refresh cache size is small, the cache will fill up and items will get
+// evicted (due to the cache's LRU nature) before the Flyte propeller workers get a chance to update
+// their status.
 type RateLimiterConfig struct {
 	Rate  int64 `json:"rate" pflag:",Allowed rate of calls per second."`
 	Burst int   `json:"burst" pflag:",Allowed burst rate of calls per second."`
@@ -50,16 +76,20 @@ var (
 		Environment:         URLMustParse(""),
 		DefaultRoutingGroup: "adhoc",
 		DefaultUser:         "flyte-default-user",
-		RoutingGroupConfigs: []RoutingGroupConfig{{Name: "adhoc", Limit: 250}, {Name: "etl", Limit: 100}},
+		RoutingGroupConfigs: []RoutingGroupConfig{{Name: "adhoc", Limit: 100}, {Name: "etl", Limit: 25}},
 		RefreshCacheConfig: RefreshCacheConfig{
 			Name:         "presto",
 			SyncPeriod:   config.Duration{Duration: 5 * time.Second},
 			Workers:      15,
 			LruCacheSize: 10000,
 		},
-		RateLimiterConfig: RateLimiterConfig{
-			Rate:  15,
-			Burst: 20,
+		ReadRateLimiterConfig: RateLimiterConfig{
+			Rate:  10,
+			Burst: 10,
+		},
+		WriteRateLimiterConfig: RateLimiterConfig{
+			Rate:  5,
+			Burst: 10,
 		},
 	}
 
@@ -68,12 +98,13 @@ var (
 
 // Presto plugin configs
 type Config struct {
-	Environment         config.URL           `json:"environment" pflag:",Environment endpoint for Presto to use"`
-	DefaultRoutingGroup string               `json:"defaultRoutingGroup" pflag:",Default Presto routing group"`
-	DefaultUser         string               `json:"defaultUser" pflag:",Default Presto user"`
-	RoutingGroupConfigs []RoutingGroupConfig `json:"routingGroupConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"`
-	RefreshCacheConfig  RefreshCacheConfig   `json:"refreshCacheConfig" pflag:"Refresh cache config"`
-	RateLimiterConfig   RateLimiterConfig    `json:"rateLimiterConfig" pflag:"Rate limiter config for ALL requests going to Presto"`
+	Environment            config.URL           `json:"environment" pflag:",Environment endpoint for Presto to use"`
+	DefaultRoutingGroup    string               `json:"defaultRoutingGroup" pflag:",Default Presto routing group"`
+	DefaultUser            string               `json:"defaultUser" pflag:",Default Presto user"`
+	RoutingGroupConfigs    []RoutingGroupConfig `json:"routingGroupConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"`
+	RefreshCacheConfig     RefreshCacheConfig   `json:"refreshCacheConfig" pflag:"Refresh cache config"`
+	ReadRateLimiterConfig  RateLimiterConfig    `json:"readRateLimiterConfig" pflag:"Rate limiter config for read requests going to Presto"`
+	WriteRateLimiterConfig RateLimiterConfig    `json:"writeRateLimiterConfig" pflag:"Rate limiter config for write requests going to Presto"`
 }
 
 // Retrieves the current config value or default.
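The comment block added above describes how the two rate limiters are meant to gate read and write traffic alongside allocation tokens. A minimal sketch of how limiters could be built from RateLimiterConfig using golang.org/x/time/rate (the helper newLimiter and the inline call sites are illustrative assumptions, not the plugin's actual wiring):

package main

import (
	"context"
	"fmt"

	"golang.org/x/time/rate"
)

// Mirrors the RateLimiterConfig type from config.go above.
type RateLimiterConfig struct {
	Rate  int64 // allowed calls per second
	Burst int   // maximum burst of calls
}

// newLimiter is a hypothetical helper: rate.Limit is events per second,
// Burst caps how many calls may be admitted at once.
func newLimiter(cfg RateLimiterConfig) *rate.Limiter {
	return rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Burst)
}

func main() {
	// Defaults from the patch: reads at 10/s, writes at 5/s, both with burst 10.
	readLimiter := newLimiter(RateLimiterConfig{Rate: 10, Burst: 10})
	writeLimiter := newLimiter(RateLimiterConfig{Rate: 5, Burst: 10})

	ctx := context.Background()

	// Write-side call (execute or cancel a query): block until a token is available.
	if err := writeLimiter.Wait(ctx); err != nil {
		fmt.Println("write request throttled:", err)
	}

	// Read-side call (status poll from the auto-refresh cache): use the read limiter.
	if err := readLimiter.Wait(ctx); err != nil {
		fmt.Println("read request throttled:", err)
	}
}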
diff --git a/flyteplugins/go/tasks/plugins/presto/execution_state.go b/flyteplugins/go/tasks/plugins/presto/execution_state.go
index e9bcb4e00..e349956da 100644
--- a/flyteplugins/go/tasks/plugins/presto/execution_state.go
+++ b/flyteplugins/go/tasks/plugins/presto/execution_state.go
@@ -2,7 +2,6 @@ package presto
 
 import (
 	"context"
-	"strings"
 
 	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils"
 
@@ -313,8 +312,11 @@ func GetNextQuery(
 		return prestoQuery, nil
 
 	case 1:
-		// TODO
-		externalLocation := getExternalLocation("s3://lyft-modelbuilder/{}/", 2)
+		externalLocation, err := tCtx.DataStore().ConstructReference(ctx, tCtx.OutputWriter().GetRawOutputPrefix(), "")
+		if err != nil {
+			return Query{}, err
+		}
+
 		statement := fmt.Sprintf(`
 CREATE TABLE hive.flyte_temporary_tables."%s" (LIKE hive.flyte_temporary_tables."%s")
 WITH (format = 'PARQUET', external_location = '%s')`,
@@ -323,7 +325,7 @@ WITH (format = 'PARQUET', external_location = '%s')`,
 			externalLocation,
 		)
 		currentState.CurrentPrestoQuery.Statement = statement
-		currentState.CurrentPrestoQuery.ExternalLocation = externalLocation
+		currentState.CurrentPrestoQuery.ExternalLocation = externalLocation.String()
 		return currentState.CurrentPrestoQuery, nil
 
 	case 2:
@@ -350,15 +352,6 @@ FROM hive.flyte_temporary_tables."%s"`
 	}
 }
 
-func getExternalLocation(shardFormatter string, shardLength int) string {
-	shardCount := strings.Count(shardFormatter, "{}")
-	for i := 0; i < shardCount; i++ {
-		shardFormatter = strings.Replace(shardFormatter, "{}", rand.String(shardLength), 1)
-	}
-
-	return shardFormatter + rand.String(32) + "/"
-}
-
 func getUser(ctx context.Context, defaultUser string) string {
 	principalContextUser := ctx.Value("principal")
 	if principalContextUser != nil {
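On the execution_state.go change, a small sketch of the new external-location logic: the location is now derived from the task's raw output prefix via the data store rather than from the removed random-shard helper. The function buildExternalLocation below is hypothetical and assumes the flytestdlib storage.DataStore and storage.DataReference types returned by tCtx.DataStore() and tCtx.OutputWriter().GetRawOutputPrefix():

package presto

import (
	"context"

	"github.com/lyft/flytestdlib/storage"
)

// buildExternalLocation is a hypothetical helper mirroring the change in GetNextQuery:
// instead of expanding a hard-coded "s3://lyft-modelbuilder/{}/" pattern with random
// shards, the external table location is constructed under the task's raw output prefix.
func buildExternalLocation(ctx context.Context, store *storage.DataStore, rawOutputPrefix storage.DataReference) (string, error) {
	externalLocation, err := store.ConstructReference(ctx, rawOutputPrefix, "")
	if err != nil {
		return "", err
	}
	// The resulting reference (an s3://-style URI) is what gets interpolated into the
	// CREATE TABLE ... WITH (external_location = '...') statement.
	return externalLocation.String(), nil
}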