Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Presto plugin executor #69

Merged
merged 28 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/golang/protobuf v1.3.3
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/hashicorp/golang-lru v0.5.4
github.com/lyft/flyteidl v0.17.6
github.com/lyft/flyteidl v0.17.9
github.com/lyft/flytestdlib v0.3.2
github.com/magiconair/properties v1.8.1
github.com/mitchellh/mapstructure v1.1.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0 h1:NGL46+1RYcCXb3sShp0nQq
github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0/go.mod h1:/L5qH+AD540e7Cetbui1tuJeXdmNhO8jM6VkXeDdDhQ=
github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f h1:PGuAMDzAen0AulUfaEhNQMYmUpa41pAVo3zHI+GJsCM=
github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnznGEAqC3DcNm6yEj472xaFVfLM7hnYofMb12tQ=
github.com/lyft/flyteidl v0.17.6 h1:O0qpT6ya45e/92+E84uGOYa0ZsaFoE5ZfPoyJ6e1bEQ=
github.com/lyft/flyteidl v0.17.6/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes=
github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNzU=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw=
Expand Down
126 changes: 126 additions & 0 deletions go/tasks/plugins/presto/client/mocks/presto_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions go/tasks/plugins/presto/client/noop_presto_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package client

import (
"context"
"net/http"
"net/url"

"time"

"github.com/lyft/flyteplugins/go/tasks/plugins/presto/config"
)

// httpRequestTimeoutSecs is the HTTP request timeout expressed in seconds. The
// constant is untyped so that it can be scaled by a time.Duration unit.
const httpRequestTimeoutSecs = 30

// noopPrestoClient is a PrestoClient whose methods perform no work against a
// real Presto deployment. It only carries an HTTP client and an endpoint URL.
type noopPrestoClient struct {
	client      *http.Client // HTTP client created by the constructor
	environment *url.URL     // Presto endpoint taken from the plugin config
}

// ExecuteCommand does not submit anything. It always returns a zero-valued
// PrestoExecuteResponse together with a nil error, whatever the arguments are.
func (p noopPrestoClient) ExecuteCommand(
	ctx context.Context,
	queryStr string,
	executeArgs PrestoExecuteArgs) (PrestoExecuteResponse, error) {

	var emptyResponse PrestoExecuteResponse
	return emptyResponse, nil
}

// KillCommand is a no-op: no cancellation is attempted for commandID and the
// returned error is always nil.
func (p noopPrestoClient) KillCommand(ctx context.Context, commandID string) error {
	return nil
}

// GetCommandStatus performs no lookup for commandID. It returns the status
// that NewPrestoStatus yields for the literal state "UNKNOWN", and a nil error.
func (p noopPrestoClient) GetCommandStatus(ctx context.Context, commandID string) (PrestoStatus, error) {
	status := NewPrestoStatus(ctx, "UNKNOWN")
	return status, nil
}

// NewNoopPrestoClient builds the no-op PrestoClient.
//
// The HTTP client gets a timeout of httpRequestTimeoutSecs seconds, and the
// endpoint is cfg.Environment resolved against itself. cfg is dereferenced
// unconditionally, so it must not be nil.
func NewNoopPrestoClient(cfg *config.Config) PrestoClient {
	timeout := httpRequestTimeoutSecs * time.Second
	endpoint := cfg.Environment.ResolveReference(&cfg.Environment.URL)

	return &noopPrestoClient{
		client:      &http.Client{Timeout: timeout},
		environment: endpoint,
	}
}
35 changes: 35 additions & 0 deletions go/tasks/plugins/presto/client/presto_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package client
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a noop Presto client for the open-source version. There will be a separate commit for one based on Mozart

EngHabu marked this conversation as resolved.
Show resolved Hide resolved

import "context"

// PrestoStatus is the string-typed status of a Presto query.
type PrestoStatus string

// PrestoExecuteArgs bundles the information needed to execute a Presto query.
// Each field is omitted from the JSON encoding when it is empty.
type PrestoExecuteArgs struct {
	RoutingGroup string `json:"routingGroup,omitempty"`
	Catalog      string `json:"catalog,omitempty"`
	Schema       string `json:"schema,omitempty"`
	Source       string `json:"source,omitempty"`
	User         string `json:"user,omitempty"`
}

// Representation of a response after submitting a query to Presto
type PrestoExecuteResponse struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add json tags

ID string
Status PrestoStatus
NextURI string
}

//go:generate mockery -all -case=snake

// PrestoClient is the interface Presto tasks use to interact with Presto.
type PrestoClient interface {
	// ExecuteCommand submits a query to Presto.
	ExecuteCommand(ctx context.Context, commandStr string, executeArgs PrestoExecuteArgs) (PrestoExecuteResponse, error)

	// KillCommand cancels a currently running Presto query.
	KillCommand(ctx context.Context, commandID string) error

	// GetCommandStatus gets the status of a Presto query.
	GetCommandStatus(ctx context.Context, commandID string) (PrestoStatus, error)
}
40 changes: 40 additions & 0 deletions go/tasks/plugins/presto/client/presto_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package client

import (
"context"
"strings"

"github.com/lyft/flytestdlib/logger"
)

// Known PrestoStatus values.
//
// PrestoStatus is meant only to encapsulate the response coming from Presto as
// a type; it is not meant to be stored locally.
const (
	PrestoStatusUnknown   PrestoStatus = "UNKNOWN"
	PrestoStatusWaiting   PrestoStatus = "WAITING"
	PrestoStatusRunning   PrestoStatus = "RUNNING"
	PrestoStatusFinished  PrestoStatus = "FINISHED"
	PrestoStatusFailed    PrestoStatus = "FAILED"
	PrestoStatusCancelled PrestoStatus = "CANCELLED"
)

var PrestoStatuses = map[PrestoStatus]struct{}{
EngHabu marked this conversation as resolved.
Show resolved Hide resolved
PrestoStatusUnknown: {},
PrestoStatusWaiting: {},
PrestoStatusRunning: {},
PrestoStatusFinished: {},
PrestoStatusFailed: {},
PrestoStatusCancelled: {},
}

func NewPrestoStatus(ctx context.Context, state string) PrestoStatus {
upperCased := strings.ToUpper(state)
if strings.Contains(upperCased, "FAILED") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is failed singled out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely sure, I sort of copied this from the existing qubole_status. Probably not necessary to single this out.

Copy link
Contributor Author

@lu4nm3 lu4nm3 Mar 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think it was needed because Hive/Qubole has different error/failure modes and I think this maps them all to a single Failure/Error on the Flyte side. Going to leave this in for now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this as a comment?

return PrestoStatusFailed
} else if _, ok := PrestoStatuses[PrestoStatus(upperCased)]; ok {
return PrestoStatus(upperCased)
} else {
logger.Warnf(ctx, "Invalid Presto Status found: %v", state)
return PrestoStatusUnknown
}
}
78 changes: 78 additions & 0 deletions go/tasks/plugins/presto/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package config

//go:generate pflags Config --default-var=defaultConfig

import (
"context"
"net/url"
"time"

"github.com/lyft/flytestdlib/config"
"github.com/lyft/flytestdlib/logger"

pluginsConfig "github.com/lyft/flyteplugins/go/tasks/config"
)

// prestoConfigSectionKey is the key under which the Presto plugin registers
// its configuration sub-section.
const prestoConfigSectionKey = "presto"

func URLMustParse(s string) config.URL {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lower case?

Suggested change
func URLMustParse(s string) config.URL {
func urlMustParse(s string) config.URL {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it public because I call the method in one of the unit test

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like URL.

r, err := url.Parse(s)
if err != nil {
logger.Panicf(context.TODO(), "Bad Presto URL Specified as default, error: %s", err)
}
if r == nil {
logger.Panicf(context.TODO(), "Nil Presto URL specified.", err)
}
return config.URL{URL: *r}
}

// RoutingGroupConfig describes one Presto routing group and its quota: the
// number of outstanding requests it allows and the share of that quota a
// single project or namespace may take.
type RoutingGroupConfig struct {
	Name                             string  `json:"name" pflag:",The name of a given Presto routing group"`
	Limit                            int     `json:"limit" pflag:",Resource quota (in the number of outstanding requests) of the routing group"`
	ProjectScopeQuotaProportionCap   float64 `json:"projectScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a project in the routing group"`
	NamespaceScopeQuotaProportionCap float64 `json:"namespaceScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a namespace in the routing group"`
}

type RateLimiter struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In light of the other conversations we've had, should we rename this struct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. How does CacheConfig sound?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

Name string `json:"name" pflag:",The name of the rate limiter"`
SyncPeriod config.Duration `json:"syncPeriod" pflag:",The duration to wait before the cache is refreshed again"`
Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"`
LruCacheSize int `json:"lruCacheSize" pflag:",Size of the cache"`
MetricScope string `json:"metricScope" pflag:",The prefix in Prometheus used to track metrics related to Presto"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need this... you just create it as a const in your code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is part of the change for one of your earlier comments where we should make the cache/rate limiter configurable. Are there things that should be configurable and others that shouldnt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I would say you should drop Name and MetricScope from here...

}

var (
defaultConfig = Config{
Environment: URLMustParse(""),
DefaultRoutingGroup: "adhoc",
DefaultUser: "[email protected]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should, generally, only fill in defaults with values that give you a good default behavior. This user, even if it works in Lyft, won't work outside...

RoutingGroupConfigs: []RoutingGroupConfig{{Name: "adhoc", Limit: 250}, {Name: "etl", Limit: 100}},
RateLimiter: RateLimiter{
Name: "presto",
SyncPeriod: config.Duration{Duration: 5 * time.Second},
Workers: 15,
LruCacheSize: 2000,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LruCacheSize: 2000,
LruCacheSize: 10000,

MetricScope: "presto",
},
}

prestoConfigSection = pluginsConfig.MustRegisterSubSection(prestoConfigSectionKey, &defaultConfig)
)

// Config holds the Presto plugin configuration.
//
// Every pflag tag uses the "<first slot>,<description>" layout; a "-" in the
// first slot (RoutingGroupConfigs) is the form used to keep a field out of the
// generated flags. The RateLimiter tag previously lacked the leading comma,
// which put its description in the first slot instead of the description slot.
type Config struct {
	Environment         config.URL           `json:"environment" pflag:",Environment endpoint for Presto to use"`
	DefaultRoutingGroup string               `json:"defaultRoutingGroup" pflag:",Default Presto routing group"`
	DefaultUser         string               `json:"defaultUser" pflag:",Default Presto user"`
	RoutingGroupConfigs []RoutingGroupConfig `json:"routingGroupConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"`
	RateLimiter         RateLimiter          `json:"rateLimiter" pflag:",Rate limiter config"`
}

// GetPrestoConfig returns the configuration currently held by the Presto
// config section (the current value or the default). The unchecked type
// assertion panics if the section holds anything other than a *Config.
func GetPrestoConfig() *Config {
	current := prestoConfigSection.GetConfig()
	return current.(*Config)
}

// SetPrestoConfig stores cfg as the configuration of the Presto config section
// and returns whatever error the section reports.
func SetPrestoConfig(cfg *Config) error {
	err := prestoConfigSection.SetConfig(cfg)
	return err
}
Loading