Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Refactor rate limiter for read/write requests (#76)
Browse files Browse the repository at this point in the history
* Refactor rate limiter for read/write requests

* update config comment

* update config comment and external path

* remove unused function
  • Loading branch information
lu4nm3 authored Apr 6, 2020
1 parent 7998312 commit 0f69ed6
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 23 deletions.
51 changes: 41 additions & 10 deletions flyteplugins/go/tasks/plugins/presto/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,32 @@ type RefreshCacheConfig struct {
LruCacheSize int `json:"lruCacheSize" pflag:",Size of the cache"`
}

// To execute a single Presto query from a user's point of view, we actually need to send 5 different
// requests to Presto. Together these requests (i.e. queries) take care of retrieving the data, saving
// it to an external table, and performing cleanup.
//
// The Presto plugin currently uses a single allocation token for each set of 5 requests which
// correspond to a single user query. These means that in total, Flyte is able to work on
// 'PrestoConfig.RoutingGroups[routing_group_name].Limit' user queries at a time as configured in the
// configurations for the Presto plugin. This means means that at most, Flyte will be working on this
// number of user Presto queries at a time for each of the configured Presto routing groups.
//
// In addition, these 2 rate limiters control the rate at which requests are sent to Presto for
// read and write requests coming from this client. This includes requests to execute queries (write),
// requests to get the status of a query through the auto-refresh cache (read), and requests to cancel
// queries (write). Together with allocation tokens, these rate limiters will ensure that the rate of
// requests and the number of concurrent requests going to Presto don't overload the cluster.
//
// There is also another important aspect to consider in terms of how the resource manager (where
// allocation tokens get created from) interplays with the rate limiters. From the write side of things
// (e.g. executing a query), if the write rate limiter is low then it will block executing queries until
// the rate falls below the limit. Even if the rate is below the limit, if queries take a long time to
// execute, then you will be blocked at the resource manager level which only allows a certain number
// of concurrent queries to execute at any given time. Similarly, in more extreme cases, if both the
// resource manager and the write limiter are configured to support a large number of queries but the
// auto refresh cache size is small, then the cache will fill up and items will gets evicted due to the
// cache's LRU nature before the Flyte propeller workers get a chance to update the status of these
// items.
type RateLimiterConfig struct {
Rate int64 `json:"rate" pflag:",Allowed rate of calls per second."`
Burst int `json:"burst" pflag:",Allowed burst rate of calls per second."`
Expand All @@ -50,16 +76,20 @@ var (
Environment: URLMustParse(""),
DefaultRoutingGroup: "adhoc",
DefaultUser: "flyte-default-user",
RoutingGroupConfigs: []RoutingGroupConfig{{Name: "adhoc", Limit: 250}, {Name: "etl", Limit: 100}},
RoutingGroupConfigs: []RoutingGroupConfig{{Name: "adhoc", Limit: 100}, {Name: "etl", Limit: 25}},
RefreshCacheConfig: RefreshCacheConfig{
Name: "presto",
SyncPeriod: config.Duration{Duration: 5 * time.Second},
Workers: 15,
LruCacheSize: 10000,
},
RateLimiterConfig: RateLimiterConfig{
Rate: 15,
Burst: 20,
ReadRateLimiterConfig: RateLimiterConfig{
Rate: 10,
Burst: 10,
},
WriteRateLimiterConfig: RateLimiterConfig{
Rate: 5,
Burst: 10,
},
}

Expand All @@ -68,12 +98,13 @@ var (

// Presto plugin configs
type Config struct {
Environment config.URL `json:"environment" pflag:",Environment endpoint for Presto to use"`
DefaultRoutingGroup string `json:"defaultRoutingGroup" pflag:",Default Presto routing group"`
DefaultUser string `json:"defaultUser" pflag:",Default Presto user"`
RoutingGroupConfigs []RoutingGroupConfig `json:"routingGroupConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"`
RefreshCacheConfig RefreshCacheConfig `json:"refreshCacheConfig" pflag:"Refresh cache config"`
RateLimiterConfig RateLimiterConfig `json:"rateLimiterConfig" pflag:"Rate limiter config for ALL requests going to Presto"`
Environment config.URL `json:"environment" pflag:",Environment endpoint for Presto to use"`
DefaultRoutingGroup string `json:"defaultRoutingGroup" pflag:",Default Presto routing group"`
DefaultUser string `json:"defaultUser" pflag:",Default Presto user"`
RoutingGroupConfigs []RoutingGroupConfig `json:"routingGroupConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"`
RefreshCacheConfig RefreshCacheConfig `json:"refreshCacheConfig" pflag:"Refresh cache config"`
ReadRateLimiterConfig RateLimiterConfig `json:"readRateLimiterConfig" pflag:"Rate limiter config for read requests going to Presto"`
WriteRateLimiterConfig RateLimiterConfig `json:"writeRateLimiterConfig" pflag:"Rate limiter config for write requests going to Presto"`
}

// Retrieves the current config value or default.
Expand Down
19 changes: 6 additions & 13 deletions flyteplugins/go/tasks/plugins/presto/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package presto

import (
"context"
"strings"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils"

Expand Down Expand Up @@ -313,8 +312,11 @@ func GetNextQuery(
return prestoQuery, nil

case 1:
// TODO
externalLocation := getExternalLocation("s3://lyft-modelbuilder/{}/", 2)
externalLocation, err := tCtx.DataStore().ConstructReference(ctx, tCtx.OutputWriter().GetRawOutputPrefix(), "")
if err != nil {
return Query{}, err
}

statement := fmt.Sprintf(`
CREATE TABLE hive.flyte_temporary_tables."%s" (LIKE hive.flyte_temporary_tables."%s")
WITH (format = 'PARQUET', external_location = '%s')`,
Expand All @@ -323,7 +325,7 @@ WITH (format = 'PARQUET', external_location = '%s')`,
externalLocation,
)
currentState.CurrentPrestoQuery.Statement = statement
currentState.CurrentPrestoQuery.ExternalLocation = externalLocation
currentState.CurrentPrestoQuery.ExternalLocation = externalLocation.String()
return currentState.CurrentPrestoQuery, nil

case 2:
Expand All @@ -350,15 +352,6 @@ FROM hive.flyte_temporary_tables."%s"`
}
}

func getExternalLocation(shardFormatter string, shardLength int) string {
shardCount := strings.Count(shardFormatter, "{}")
for i := 0; i < shardCount; i++ {
shardFormatter = strings.Replace(shardFormatter, "{}", rand.String(shardLength), 1)
}

return shardFormatter + rand.String(32) + "/"
}

func getUser(ctx context.Context, defaultUser string) string {
principalContextUser := ctx.Value("principal")
if principalContextUser != nil {
Expand Down

0 comments on commit 0f69ed6

Please sign in to comment.