Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Presto plugin executor #69

Merged
merged 28 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/golang/protobuf v1.3.3
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/hashicorp/golang-lru v0.5.4
github.com/lyft/flyteidl v0.17.6
github.com/lyft/flyteidl v0.17.9
github.com/lyft/flytestdlib v0.3.3
github.com/magiconair/properties v1.8.1
github.com/mitchellh/mapstructure v1.1.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0 h1:NGL46+1RYcCXb3sShp0nQq
github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0/go.mod h1:/L5qH+AD540e7Cetbui1tuJeXdmNhO8jM6VkXeDdDhQ=
github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f h1:PGuAMDzAen0AulUfaEhNQMYmUpa41pAVo3zHI+GJsCM=
github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnznGEAqC3DcNm6yEj472xaFVfLM7hnYofMb12tQ=
github.com/lyft/flyteidl v0.17.6 h1:O0qpT6ya45e/92+E84uGOYa0ZsaFoE5ZfPoyJ6e1bEQ=
github.com/lyft/flyteidl v0.17.6/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes=
github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNzU=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw=
Expand Down
126 changes: 126 additions & 0 deletions go/tasks/plugins/presto/client/mocks/presto_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions go/tasks/plugins/presto/client/noop_presto_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package client

import (
"context"
"net/http"
"net/url"

"time"

"github.com/lyft/flyteplugins/go/tasks/plugins/presto/config"
)

const (
httpRequestTimeoutSecs = 30
)

type noopPrestoClient struct {
client *http.Client
environment *url.URL
}

func (p noopPrestoClient) ExecuteCommand(
ctx context.Context,
queryStr string,
executeArgs PrestoExecuteArgs) (PrestoExecuteResponse, error) {

return PrestoExecuteResponse{}, nil
}

func (p noopPrestoClient) KillCommand(ctx context.Context, commandID string) error {
return nil
}

func (p noopPrestoClient) GetCommandStatus(ctx context.Context, commandID string) (PrestoStatus, error) {
return NewPrestoStatus(ctx, "UNKNOWN"), nil
}

func NewNoopPrestoClient(cfg *config.Config) PrestoClient {
return &noopPrestoClient{
client: &http.Client{Timeout: httpRequestTimeoutSecs * time.Second},
environment: cfg.Environment.ResolveReference(&cfg.Environment.URL),
}
}
35 changes: 35 additions & 0 deletions go/tasks/plugins/presto/client/presto_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package client
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a noop Presto client for the open-source version. There will be a separate commit for one based on Mozart

EngHabu marked this conversation as resolved.
Show resolved Hide resolved

import "context"

type PrestoStatus string

// Contains information needed to execute a Presto query
type PrestoExecuteArgs struct {
RoutingGroup string `json:"routingGroup,omitempty"`
Catalog string `json:"catalog,omitempty"`
Schema string `json:"schema,omitempty"`
Source string `json:"source,omitempty"`
User string `json:"user,omitempty"`
}

// Representation of a response after submitting a query to Presto
type PrestoExecuteResponse struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add json tags

ID string `json:"id,omitempty"`
Status PrestoStatus `json:"status,omitempty"`
NextURI string `json:"nextUri,omitempty"`
}

//go:generate mockery -all -case=snake

// Interface to interact with PrestoClient for Presto tasks
type PrestoClient interface {
// Submits a query to Presto
ExecuteCommand(ctx context.Context, commandStr string, executeArgs PrestoExecuteArgs) (PrestoExecuteResponse, error)

// Cancels a currently running Presto query
KillCommand(ctx context.Context, commandID string) error

// Gets the status of a Presto query
GetCommandStatus(ctx context.Context, commandID string) (PrestoStatus, error)
}
43 changes: 43 additions & 0 deletions go/tasks/plugins/presto/client/presto_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package client

import (
"context"
"strings"

"github.com/lyft/flytestdlib/logger"
)

// This type is meant only to encapsulate the response coming from Presto as a type, it is
// not meant to be stored locally.
const (
PrestoStatusUnknown PrestoStatus = "UNKNOWN"
PrestoStatusWaiting PrestoStatus = "WAITING"
PrestoStatusRunning PrestoStatus = "RUNNING"
PrestoStatusFinished PrestoStatus = "FINISHED"
PrestoStatusFailed PrestoStatus = "FAILED"
PrestoStatusCancelled PrestoStatus = "CANCELLED"
)

var PrestoStatuses = map[PrestoStatus]struct{}{
EngHabu marked this conversation as resolved.
Show resolved Hide resolved
PrestoStatusUnknown: {},
PrestoStatusWaiting: {},
PrestoStatusRunning: {},
PrestoStatusFinished: {},
PrestoStatusFailed: {},
PrestoStatusCancelled: {},
}

func NewPrestoStatus(ctx context.Context, state string) PrestoStatus {
upperCased := strings.ToUpper(state)

// Presto has different failure modes so this maps them all to a single Failure on the
// Flyte side
if strings.Contains(upperCased, "FAILED") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is failed singled out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely sure, I sort of copied this from the existing qubole_status. Probably not necessary to single this out.

Copy link
Contributor Author

@lu4nm3 lu4nm3 Mar 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think it was needed because Hive/Qubole has different error/failure modes and I think this maps them all to a single Failure/Error on the Flyte side. Going to leave this in for now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this as a comment?

return PrestoStatusFailed
} else if _, ok := PrestoStatuses[PrestoStatus(upperCased)]; ok {
return PrestoStatus(upperCased)
} else {
logger.Warnf(ctx, "Invalid Presto Status found: %v", state)
return PrestoStatusUnknown
}
}
76 changes: 76 additions & 0 deletions go/tasks/plugins/presto/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package config

//go:generate pflags Config --default-var=defaultConfig

import (
"context"
"net/url"
"time"

"github.com/lyft/flytestdlib/config"
"github.com/lyft/flytestdlib/logger"

pluginsConfig "github.com/lyft/flyteplugins/go/tasks/config"
)

const prestoConfigSectionKey = "presto"

func URLMustParse(s string) config.URL {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lower case?

Suggested change
func URLMustParse(s string) config.URL {
func urlMustParse(s string) config.URL {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it public because I call the method in one of the unit test

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like URL.

r, err := url.Parse(s)
if err != nil {
logger.Panicf(context.TODO(), "Bad Presto URL Specified as default, error: %s", err)
}
if r == nil {
logger.Panicf(context.TODO(), "Nil Presto URL specified.", err)
}
return config.URL{URL: *r}
}

type RoutingGroupConfig struct {
Name string `json:"name" pflag:",The name of a given Presto routing group"`
Limit int `json:"limit" pflag:",Resource quota (in the number of outstanding requests) of the routing group"`
ProjectScopeQuotaProportionCap float64 `json:"projectScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a project in the routing group"`
NamespaceScopeQuotaProportionCap float64 `json:"namespaceScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a namespace in the routing group"`
}

type RefreshCacheConfig struct {
Name string `json:"name" pflag:",The name of the rate limiter"`
SyncPeriod config.Duration `json:"syncPeriod" pflag:",The duration to wait before the cache is refreshed again"`
Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"`
LruCacheSize int `json:"lruCacheSize" pflag:",Size of the cache"`
}

var (
defaultConfig = Config{
Environment: URLMustParse(""),
DefaultRoutingGroup: "adhoc",
DefaultUser: "flyte-default-user",
RoutingGroupConfigs: []RoutingGroupConfig{{Name: "adhoc", Limit: 250}, {Name: "etl", Limit: 100}},
RefreshCacheConfig: RefreshCacheConfig{
Name: "presto",
SyncPeriod: config.Duration{Duration: 5 * time.Second},
Workers: 15,
LruCacheSize: 10000,
},
}

prestoConfigSection = pluginsConfig.MustRegisterSubSection(prestoConfigSectionKey, &defaultConfig)
)

// Presto plugin configs
type Config struct {
Environment config.URL `json:"environment" pflag:",Environment endpoint for Presto to use"`
DefaultRoutingGroup string `json:"defaultRoutingGroup" pflag:",Default Presto routing group"`
DefaultUser string `json:"defaultUser" pflag:",Default Presto user"`
RoutingGroupConfigs []RoutingGroupConfig `json:"routingGroupConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"`
RefreshCacheConfig RefreshCacheConfig `json:"refreshCacheConfig" pflag:"Rate limiter config"`
}

// Retrieves the current config value or default.
func GetPrestoConfig() *Config {
return prestoConfigSection.GetConfig().(*Config)
}

func SetPrestoConfig(cfg *Config) error {
return prestoConfigSection.SetConfig(cfg)
}
Loading