Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Presto plugin executor #69

Merged
merged 28 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/golang/protobuf v1.3.3
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/hashicorp/golang-lru v0.5.4
github.com/lyft/flyteidl v0.17.6
github.com/lyft/flyteidl v0.17.9
github.com/lyft/flytestdlib v0.3.2
github.com/magiconair/properties v1.8.1
github.com/mitchellh/mapstructure v1.1.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0 h1:NGL46+1RYcCXb3sShp0nQq
github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0/go.mod h1:/L5qH+AD540e7Cetbui1tuJeXdmNhO8jM6VkXeDdDhQ=
github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f h1:PGuAMDzAen0AulUfaEhNQMYmUpa41pAVo3zHI+GJsCM=
github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnznGEAqC3DcNm6yEj472xaFVfLM7hnYofMb12tQ=
github.com/lyft/flyteidl v0.17.6 h1:O0qpT6ya45e/92+E84uGOYa0ZsaFoE5ZfPoyJ6e1bEQ=
github.com/lyft/flyteidl v0.17.6/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes=
github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNzU=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw=
Expand Down
4 changes: 4 additions & 0 deletions go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ func PhaseInfoSuccess(info *TaskInfo) PhaseInfo {
return phaseInfo(PhaseSuccess, DefaultPhaseVersion, nil, info)
}

func PhaseInfoSuccessWithVersion(version uint32, info *TaskInfo) PhaseInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we consider the version on success.. success phase is terminal, so once you hit that, there should be no expectation the system will call your plugin ever again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, so the reason why I wanted to add a version to success is that this is a status that will occur up to 5 times for every Presto query due to how the mini-state machine is implemented where it executes a total of 5 Presto queries for each Presto task. After each of these 5 queries finishes successfully, its status will be successful and is handled here: https://github.com/lyft/flyteplugins/pull/69/files?file-filters%5B%5D=.go#diff-6a65d420579b33b62766e71e98eafdf7R112

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As each query finishes, even though the internal state might move to PhaseQuerySucceeded, the overall phase that's sent to Admin should still be running. Only when everything is truly done, should the plugin send a success. Does this make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. So we should keep things as Running and only use Success once at the very end once everything is done?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wild-endeavor I went ahead and updated things so that the success phase is only returned at the very end once all 5 queries have executed

return phaseInfo(PhaseSuccess, version, nil, info)
}

func PhaseInfoFailure(code, reason string, info *TaskInfo) PhaseInfo {
return PhaseInfoFailed(PhasePermanentFailure, &core.ExecutionError{Code: code, Message: reason}, info)
}
Expand Down
128 changes: 128 additions & 0 deletions go/tasks/plugins/presto/client/mocks/presto_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 63 additions & 0 deletions go/tasks/plugins/presto/client/presto_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package client
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a noop Presto client for the open-source version. There will be a separate commit for one based on Mozart

EngHabu marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"net/http"
"net/url"

"time"

"github.com/lyft/flyteplugins/go/tasks/plugins/presto/config"
)

const (
httpRequestTimeoutSecs = 30
)

type noopPrestoClient struct {
client *http.Client
environment *url.URL
}

type PrestoExecuteArgs struct {
RoutingGroup string `json:"routingGroup,omitempty"`
Catalog string `json:"catalog,omitempty"`
Schema string `json:"schema,omitempty"`
Source string `json:"source,omitempty"`
}
type PrestoExecuteResponse struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add json tags

ID string
Status PrestoStatus
NextURI string
}

//go:generate mockery -all -case=snake

type PrestoClient interface {
ExecuteCommand(ctx context.Context, commandStr string, extraArgs interface{}) (interface{}, error)
KillCommand(ctx context.Context, commandID string) error
GetCommandStatus(ctx context.Context, commandID string) (PrestoStatus, error)
}

func (p noopPrestoClient) ExecuteCommand(
ctx context.Context,
queryStr string,
extraArgs interface{}) (interface{}, error) {

return PrestoExecuteResponse{}, nil
}

func (p noopPrestoClient) KillCommand(ctx context.Context, commandID string) error {
return nil
}

func (p noopPrestoClient) GetCommandStatus(ctx context.Context, commandID string) (PrestoStatus, error) {
return NewPrestoStatus(ctx, "UNKNOWN"), nil
}

func NewNoopPrestoClient(cfg *config.Config) PrestoClient {
return &noopPrestoClient{
client: &http.Client{Timeout: httpRequestTimeoutSecs * time.Second},
environment: cfg.Environment.ResolveReference(&cfg.Environment.URL),
}
}
42 changes: 42 additions & 0 deletions go/tasks/plugins/presto/client/presto_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package client

import (
"context"
"strings"

"github.com/lyft/flytestdlib/logger"
)

type PrestoStatus string

// This type is meant only to encapsulate the response coming from Presto as a type, it is
// not meant to be stored locally.
const (
PrestoStatusUnknown PrestoStatus = "UNKNOWN"
PrestoStatusQueued PrestoStatus = "QUEUED"
PrestoStatusRunning PrestoStatus = "RUNNING"
PrestoStatusFinished PrestoStatus = "FINISHED"
PrestoStatusFailed PrestoStatus = "FAILED"
PrestoStatusCancelled PrestoStatus = "CANCELLED"
)

var PrestoStatuses = map[PrestoStatus]struct{}{
EngHabu marked this conversation as resolved.
Show resolved Hide resolved
PrestoStatusUnknown: {},
PrestoStatusQueued: {},
PrestoStatusRunning: {},
PrestoStatusFinished: {},
PrestoStatusFailed: {},
PrestoStatusCancelled: {},
}

func NewPrestoStatus(ctx context.Context, state string) PrestoStatus {
upperCased := strings.ToUpper(state)
if strings.Contains(upperCased, "FAILED") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is failed singled out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely sure, I sort of copied this from the existing qubole_status. Probably not necessary to single this out.

Copy link
Contributor Author

@lu4nm3 lu4nm3 Mar 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think it was needed because Hive/Qubole has different error/failure modes and I think this maps them all to a single Failure/Error on the Flyte side. Going to leave this in for now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this as a comment?

return PrestoStatusFailed
} else if _, ok := PrestoStatuses[PrestoStatus(upperCased)]; ok {
return PrestoStatus(upperCased)
} else {
logger.Warnf(ctx, "Invalid Presto Status found: %v", state)
return PrestoStatusUnknown
}
}
80 changes: 80 additions & 0 deletions go/tasks/plugins/presto/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package config

//go:generate pflags Config --default-var=defaultConfig

import (
"context"
"net/url"
"time"

"github.com/lyft/flytestdlib/config"
"github.com/lyft/flytestdlib/logger"

pluginsConfig "github.com/lyft/flyteplugins/go/tasks/config"
)

const prestoConfigSectionKey = "presto"

func URLMustParse(s string) config.URL {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lower case?

Suggested change
func URLMustParse(s string) config.URL {
func urlMustParse(s string) config.URL {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it public because I call the method in one of the unit test

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like URL.

r, err := url.Parse(s)
if err != nil {
logger.Panicf(context.TODO(), "Bad Presto URL Specified as default, error: %s", err)
}
if r == nil {
logger.Panicf(context.TODO(), "Nil Presto URL specified.", err)
}
return config.URL{URL: *r}
}

type RoutingGroupConfig struct {
Name string `json:"name" pflag:",The name of a given Presto routing group"`
Limit int `json:"limit" pflag:",Resource quota (in the number of outstanding requests) of the routing group"`
ProjectScopeQuotaProportionCap float64 `json:"projectScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a project in the routing group"`
NamespaceScopeQuotaProportionCap float64 `json:"namespaceScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a namespace in the routing group"`
}

type RateLimiter struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In light of the other conversations we've had, should we rename this struct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. How does CacheConfig sound?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

Name string `json:"name" pflag:",The name of the rate limiter"`
SyncPeriod config.Duration `json:"syncPeriod" pflag:",The duration to wait before the cache is refreshed again"`
Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"`
LruCacheSize int `json:"lruCacheSize" pflag:",Size of the cache"`
MetricScope string `json:"metricScope" pflag:",The prefix in Prometheus used to track metrics related to Presto"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need this... you just create it as a const in your code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is part of the change for one of your earlier comments where we should make the cache/rate limiter configurable. Are there things that should be configurable and others that shouldnt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I would say you should drop Name and MetricScope from here...

}

var (
defaultConfig = Config{
Environment: URLMustParse(""),
DefaultRoutingGroup: "adhoc",
AwsS3ShardFormatter: "",
AwsS3ShardCount: 2,
RoutingGroupConfigs: []RoutingGroupConfig{{Name: "adhoc", Limit: 250}, {Name: "etl", Limit: 100}},
RateLimiter: RateLimiter{
Name: "presto",
SyncPeriod: config.Duration{Duration: 3 * time.Second},
Workers: 15,
LruCacheSize: 2000,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LruCacheSize: 2000,
LruCacheSize: 10000,

MetricScope: "presto",
},
}

prestoConfigSection = pluginsConfig.MustRegisterSubSection(prestoConfigSectionKey, &defaultConfig)
)

// Presto plugin configs
type Config struct {
Environment config.URL `json:"environment" pflag:",Environment endpoint for Presto to use"`
DefaultRoutingGroup string `json:"defaultRoutingGroup" pflag:",Default Presto routing group"`
AwsS3ShardFormatter string `json:"awsS3ShardFormatter" pflag:", S3 bucket prefix where Presto results will be stored"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we limited to S3? Flyte in general is storage agnostic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, we would be limited by whatever Presto supports in terms of external tables. I'm not very familiar with Flyte's generic storage model. Do you have any ideas of how I might abstract this? Currently, this logic of generating random paths is handled by flytekit but I'm not sure if the logic there is very generic as it appeared to be tied to S3.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you can.
This is very hacky IMO. Ideally this path should be passed down to the plugin through TaskContext() baking shards.. etc. here will only lead to more problems...
We should remove these two fields from here, add a function to TaskExecutionContext... that gets you a path to write to (e.g. DefaultDataReference() storage.DataReference)
Propeller (which implements this interface) should pass this path down to the plugin. It has all the logic to build the right path based on the current configuration in the system.
It doesn't however do sharding (hence why I was suggesting you move all of that out)... it just doesn't belong to this plugin (or any plugin).

AwsS3ShardCount int `json:"awsS3ShardCount" pflag:", Number of characters for the S3 bucket shard prefix"`
RoutingGroupConfigs []RoutingGroupConfig `json:"routingGroupConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"`
RateLimiter RateLimiter `json:"rateLimiter" pflag:"Rate limiter config"`
}

// Retrieves the current config value or default.
func GetPrestoConfig() *Config {
return prestoConfigSection.GetConfig().(*Config)
}

func SetPrestoConfig(cfg *Config) error {
return prestoConfigSection.SetConfig(cfg)
}
Loading