Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Refactor rate limiter for read/write requests #76

Merged
merged 4 commits into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 41 additions & 10 deletions go/tasks/plugins/presto/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,32 @@ type RefreshCacheConfig struct {
LruCacheSize int `json:"lruCacheSize" pflag:",Size of the cache"`
}

// To execute a single Presto query from a user's point of view, we actually need to send 5 different
// requests to Presto. Together these requests (i.e. queries) take care of retrieving the data, saving
// it to an external table, and performing cleanup.
//
// The Presto plugin currently uses a single allocation token for each set of 5 requests which
// correspond to a single user query. These means that in total, Flyte is able to work on
// 'PrestoConfig.RoutingGroups[routing_group_name].Limit' user queries at a time as configured in the
// configurations for the Presto plugin. This means means that at most, Flyte will be working on this
// number of user Presto queries at a time for each of the configured Presto routing groups.
//
// In addition, these 2 rate limiters control the rate at which requests are sent to Presto for
// read and write requests coming from this client. This includes requests to execute queries (write),
// requests to get the status of a query through the auto-refresh cache (read), and requests to cancel
// queries (write). Together with allocation tokens, these rate limiters will ensure that the rate of
// requests and the number of concurrent requests going to Presto don't overload the cluster.
//
// There is also another important aspect to consider in terms of how the resource manager (where
// allocation tokens get created from) interplays with the rate limiters. From the write side of things
// (e.g. executing a query), if the write rate limiter is low then it will block executing queries until
// the rate falls below the limit. Even if the rate is below the limit, if queries take a long time to
// execute, then you will be blocked at the resource manager level which only allows a certain number
// of concurrent queries to execute at any given time. Similarly, in more extreme cases, if both the
// resource manager and the write limiter are configured to support a large number of queries but the
// auto refresh cache size is small, then the cache will fill up and items will gets evicted due to the
// cache's LRU nature before the Flyte propeller workers get a chance to update the status of these
// items.
type RateLimiterConfig struct {
Rate int64 `json:"rate" pflag:",Allowed rate of calls per second."`
Burst int `json:"burst" pflag:",Allowed burst rate of calls per second."`
Expand All @@ -50,16 +76,20 @@ var (
Environment: URLMustParse(""),
DefaultRoutingGroup: "adhoc",
DefaultUser: "flyte-default-user",
RoutingGroupConfigs: []RoutingGroupConfig{{Name: "adhoc", Limit: 250}, {Name: "etl", Limit: 100}},
RoutingGroupConfigs: []RoutingGroupConfig{{Name: "adhoc", Limit: 100}, {Name: "etl", Limit: 25}},
RefreshCacheConfig: RefreshCacheConfig{
Name: "presto",
SyncPeriod: config.Duration{Duration: 5 * time.Second},
Workers: 15,
LruCacheSize: 10000,
},
RateLimiterConfig: RateLimiterConfig{
Rate: 15,
Burst: 20,
ReadRateLimiterConfig: RateLimiterConfig{
Rate: 10,
Burst: 10,
},
WriteRateLimiterConfig: RateLimiterConfig{
Rate: 5,
Burst: 10,
},
}

Expand All @@ -68,12 +98,13 @@ var (

// Presto plugin configs
type Config struct {
Environment config.URL `json:"environment" pflag:",Environment endpoint for Presto to use"`
DefaultRoutingGroup string `json:"defaultRoutingGroup" pflag:",Default Presto routing group"`
DefaultUser string `json:"defaultUser" pflag:",Default Presto user"`
RoutingGroupConfigs []RoutingGroupConfig `json:"routingGroupConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"`
RefreshCacheConfig RefreshCacheConfig `json:"refreshCacheConfig" pflag:"Refresh cache config"`
RateLimiterConfig RateLimiterConfig `json:"rateLimiterConfig" pflag:"Rate limiter config for ALL requests going to Presto"`
Environment config.URL `json:"environment" pflag:",Environment endpoint for Presto to use"`
DefaultRoutingGroup string `json:"defaultRoutingGroup" pflag:",Default Presto routing group"`
DefaultUser string `json:"defaultUser" pflag:",Default Presto user"`
RoutingGroupConfigs []RoutingGroupConfig `json:"routingGroupConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"`
RefreshCacheConfig RefreshCacheConfig `json:"refreshCacheConfig" pflag:"Refresh cache config"`
ReadRateLimiterConfig RateLimiterConfig `json:"readRateLimiterConfig" pflag:"Rate limiter config for read requests going to Presto"`
WriteRateLimiterConfig RateLimiterConfig `json:"writeRateLimiterConfig" pflag:"Rate limiter config for write requests going to Presto"`
}

// Retrieves the current config value or default.
Expand Down
19 changes: 6 additions & 13 deletions go/tasks/plugins/presto/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package presto

import (
"context"
"strings"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils"

Expand Down Expand Up @@ -313,8 +312,11 @@ func GetNextQuery(
return prestoQuery, nil

case 1:
// TODO
externalLocation := getExternalLocation("s3://lyft-modelbuilder/{}/", 2)
externalLocation, err := tCtx.DataStore().ConstructReference(ctx, tCtx.OutputWriter().GetRawOutputPrefix(), "")
if err != nil {
return Query{}, err
}

statement := fmt.Sprintf(`
CREATE TABLE hive.flyte_temporary_tables."%s" (LIKE hive.flyte_temporary_tables."%s")
WITH (format = 'PARQUET', external_location = '%s')`,
Expand All @@ -323,7 +325,7 @@ WITH (format = 'PARQUET', external_location = '%s')`,
externalLocation,
)
currentState.CurrentPrestoQuery.Statement = statement
currentState.CurrentPrestoQuery.ExternalLocation = externalLocation
currentState.CurrentPrestoQuery.ExternalLocation = externalLocation.String()
return currentState.CurrentPrestoQuery, nil

case 2:
Expand All @@ -350,15 +352,6 @@ FROM hive.flyte_temporary_tables."%s"`
}
}

func getExternalLocation(shardFormatter string, shardLength int) string {
shardCount := strings.Count(shardFormatter, "{}")
for i := 0; i < shardCount; i++ {
shardFormatter = strings.Replace(shardFormatter, "{}", rand.String(shardLength), 1)
}

return shardFormatter + rand.String(32) + "/"
}

func getUser(ctx context.Context, defaultUser string) string {
principalContextUser := ctx.Value("principal")
if principalContextUser != nil {
Expand Down