Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Presto plugin executor #69

Merged
merged 28 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/golang/protobuf v1.3.3
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/hashicorp/golang-lru v0.5.4
github.com/lyft/flyteidl v0.17.6
github.com/lyft/flyteidl v0.17.9
github.com/lyft/flytestdlib v0.3.2
github.com/magiconair/properties v1.8.1
github.com/mitchellh/mapstructure v1.1.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0 h1:NGL46+1RYcCXb3sShp0nQq
github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0/go.mod h1:/L5qH+AD540e7Cetbui1tuJeXdmNhO8jM6VkXeDdDhQ=
github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f h1:PGuAMDzAen0AulUfaEhNQMYmUpa41pAVo3zHI+GJsCM=
github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnznGEAqC3DcNm6yEj472xaFVfLM7hnYofMb12tQ=
github.com/lyft/flyteidl v0.17.6 h1:O0qpT6ya45e/92+E84uGOYa0ZsaFoE5ZfPoyJ6e1bEQ=
github.com/lyft/flyteidl v0.17.6/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes=
github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNzU=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw=
Expand Down
66 changes: 66 additions & 0 deletions go/tasks/plugins/presto/client/presto_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package client
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a noop Presto client for the open-source version. There will be a separate commit for one based on Mozart

EngHabu marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"net/http"
"net/url"

"github.com/lyft/flyteplugins/go/tasks/plugins/svc"

"time"

"github.com/lyft/flyteplugins/go/tasks/plugins/presto/config"
)

const (
httpRequestTimeoutSecs = 30
//AcceptHeaderKey = "Accept"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completely missed this, will do!

//ContentTypeHeaderKey = "Content-Type"
//ContentTypeJSON = "application/json"
//ContentTypeTextPlain = "text/plain"
//PrestoCatalogHeader = "X-Presto-Catalog"
//PrestoRoutingGroupHeader = "X-Presto-Routing-Group"
//PrestoSchemaHeader = "X-Presto-Schema"
//PrestoSourceHeader = "X-Presto-Source"
//PrestoUserHeader = "X-Presto-User"
)

type prestoClient struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type prestoClient struct {
type NoopPrestoClient struct {

client *http.Client
environment *url.URL
}

type PrestoExecuteArgs struct {
RoutingGroup string `json:"routing_group,omitempty"`
Catalog string `json:"catalog,omitempty"`
Schema string `json:"schema,omitempty"`
Source string `json:"source,omitempty"`
}
type PrestoExecuteResponse struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add json tags

ID string
Status svc.CommandStatus
NextURI string
}

func (p *prestoClient) ExecuteCommand(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a reason to use pointer receivers in this client...

Suggested change
func (p *prestoClient) ExecuteCommand(
func (p prestoClient) ExecuteCommand(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. I was following what the Hive client was doing here. I'm not too familiar with things in Go yet but, when would I use a reference vs a value for a method?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Hive code should probably be changed. by value here is fine, esp since the underlying http client is already a pointer.

ctx context.Context,
queryStr string,
extraArgs interface{}) (interface{}, error) {

return PrestoExecuteResponse{}, nil
}

func (p *prestoClient) KillCommand(ctx context.Context, commandID string) error {
return nil
}

func (p *prestoClient) GetCommandStatus(ctx context.Context, commandID string) (svc.CommandStatus, error) {
return NewPrestoStatus(ctx, "UNKNOWN"), nil
}

func NewPrestoClient(cfg *config.Config) svc.ServiceClient {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func NewPrestoClient(cfg *config.Config) svc.ServiceClient {
func NewNoopPrestoClient(cfg *config.Config) svc.ServiceClient {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we are not consistent on this but the more idiomatic way in go to using interfaces suggests we should return the actual type here not an interface. which means your type should also be public.

Copy link
Contributor Author

@lu4nm3 lu4nm3 Mar 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, that's interesting. Normally from my experience in other langs (java, scala, rust, etc), you typically tie yourself to an interface instead of a specific implementation. In our case, I feel this will help when trying to swap out the Mozart client for one based on just Presto. Otherwise, won't we need to make a copy of the calling code (ie. the executor) in the private repo? In general, I think this might minimize the number of code changes we will need to make in the long run

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, interfaces in Go are kind of the inverse in Java.
The idiomatic way is something like this:
func MyFunc(m SpecificInterface) SpecificType
So your function takes an interface of exactly the functions it needs and returns a specific type..

I do not understand what do you mean by copying calling code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So given that I made the client specific to Presto, my original comment is largely void now.

What I was trying to say is that if we made the function return a specific type:

func NewNoopPrestoClient(cfg *config.Config) PrestoClient {

Then in the calling code that expects this client, we would also need to specifically have it accept a PrestoClient:

func InitializePrestoExecutor(
	ctx context.Context,
	iCtx core.SetupContext,
	cfg *config.Config,
	resourceConfig map[string]int,
	prestoClient client.PrestoClient) (core.Plugin, error) {

Normally, I think this would be ok. But what I was trying to say (perhaps in a not so clear manner), is that if one day we wanted to abstract the client to be a more generic ServiceClient to use with not only Presto but also Hive, etc, then it's better if we rely on this interface (ServiceClient) as it would give us the flexibility on swapping out the client as we see fit. However, as I'm writing this, I realize that making this abstract is probably a larger refactoring effort that will likely change even this disregard this explanation haha. Sorry I think I'm rambling at this point

return &prestoClient{
client: &http.Client{Timeout: httpRequestTimeoutSecs * time.Second},
environment: cfg.Environment.ResolveReference(&cfg.Environment.URL),
}
}
41 changes: 41 additions & 0 deletions go/tasks/plugins/presto/client/presto_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package client

import (
"context"
"strings"

"github.com/lyft/flyteplugins/go/tasks/plugins/svc"
"github.com/lyft/flytestdlib/logger"
)

// This type is meant only to encapsulate the response coming from Presto as a type, it is
// not meant to be stored locally.
const (
PrestoStatusUnknown svc.CommandStatus = "UNKNOWN"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use enumer to generate the maps and conversion functions for you.
Example will generate this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried this but ran into a problem:

enumer: can't handle non-integer constant type CommandStatus
go/tasks/plugins/presto/client/presto_status.go:10: running "enumer": exit status 1

I think this might be why the qubole_status.go doesn't use the generator.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is an example that works:
https://play.golang.org/p/DOSciZ7joY1

I needed to implement UnmarshalJSON()...

Copy link
Contributor Author

@lu4nm3 lu4nm3 Mar 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@EngHabu Sorry, I'm still a little confused as I think this example you posted uses:

type TransitionType int

where the type is int. In my case, the status is a string and based on my attempts above, it doesn't seem to work on non-integer types. Am I missing something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it need to be a string though?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, this is not a big deal... if it doesn't seem obvious to change, that's fine...

PrestoStatusQueued svc.CommandStatus = "QUEUED"
PrestoStatusRunning svc.CommandStatus = "RUNNING"
PrestoStatusFinished svc.CommandStatus = "FINISHED"
PrestoStatusFailed svc.CommandStatus = "FAILED"
PrestoStatusCancelled svc.CommandStatus = "CANCELLED"
)

var PrestoStatuses = map[svc.CommandStatus]struct{}{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use sets.String{}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I do this:

var PrestoStatuses = sets.String{
	PrestoStatusUnknown:   {},
	PrestoStatusQueued:    {},
	PrestoStatusRunning:   {},
	PrestoStatusFinished:  {},
	PrestoStatusFailed:    {},
	PrestoStatusCancelled: {},
}

I get the error messages about:

Cannot use 'PrestoStatusUnknown' (type svc.CommandStatus) as type string

Am I doing this correctly?

PrestoStatusUnknown: {},
PrestoStatusQueued: {},
PrestoStatusRunning: {},
PrestoStatusFinished: {},
PrestoStatusFailed: {},
PrestoStatusCancelled: {},
}

func NewPrestoStatus(ctx context.Context, state string) svc.CommandStatus {
upperCased := strings.ToUpper(state)
if strings.Contains(upperCased, "FAILED") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is failed singled out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely sure, I sort of copied this from the existing qubole_status. Probably not necessary to single this out.

Copy link
Contributor Author

@lu4nm3 lu4nm3 Mar 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think it was needed because Hive/Qubole has different error/failure modes and I think this maps them all to a single Failure/Error on the Flyte side. Going to leave this in for now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this as a comment?

return PrestoStatusFailed
} else if _, ok := PrestoStatuses[svc.CommandStatus(upperCased)]; ok {
return svc.CommandStatus(upperCased)
} else {
logger.Warnf(ctx, "Invalid Presto Status found: %v", state)
return PrestoStatusUnknown
}
}
67 changes: 67 additions & 0 deletions go/tasks/plugins/presto/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package config

//go:generate pflags Config --default-var=defaultConfig

import (
"context"
"net/url"

"github.com/lyft/flytestdlib/config"
"github.com/lyft/flytestdlib/logger"

pluginsConfig "github.com/lyft/flyteplugins/go/tasks/config"
)

const prestoConfigSectionKey = "presto"

func URLMustParse(s string) config.URL {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lower case?

Suggested change
func URLMustParse(s string) config.URL {
func urlMustParse(s string) config.URL {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it public because I call the method in one of the unit test

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like URL.

r, err := url.Parse(s)
if err != nil {
logger.Panicf(context.TODO(), "Bad Presto URL Specified as default, error: %s", err)
}
if r == nil {
logger.Panicf(context.TODO(), "Nil Presto URL specified.", err)
}
return config.URL{URL: *r}
}

type RoutingGroupConfig struct {
Name string `json:"primaryLabel" pflag:",The name of a given Presto routing group"`
Limit int `json:"limit" pflag:",Resource quota (in the number of outstanding requests) of the routing group"`
ProjectScopeQuotaProportionCap float64 `json:"projectScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a project in the routing group"`
NamespaceScopeQuotaProportionCap float64 `json:"namespaceScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a namespace in the routing group"`
}

var (
defaultConfig = Config{
Environment: URLMustParse("https://prestoproxy-internal.lyft.net:443"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's try not to put internal URLs in the public repo

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@EngHabu I had a larger question with regards to configs, namely, where would I create the actual config file for the Presto plugin? I couldn't find any in this repo

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I'll create separate PRs to add configs to these repos

DefaultRoutingGroup: "adhoc",
Workers: 15,
LruCacheSize: 2000,
AwsS3ShardFormatter: "s3://lyft-modelbuilder/{}/",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave this out too

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leave it out, but we should put in canonical examples that work with the sandbox (docker desktop/minikube) deployment.

AwsS3ShardCount: 2,
RoutingGroupConfigs: []RoutingGroupConfig{{Name: "adhoc", Limit: 250}},
}

prestoConfigSection = pluginsConfig.MustRegisterSubSection(prestoConfigSectionKey, &defaultConfig)
)

// Presto plugin configs
type Config struct {
Environment config.URL `json:"endpoint" pflag:",Endpoint for Presto to use"`
DefaultRoutingGroup string `json:"defaultRoutingGroup" pflag:",Default Presto routing group"`
Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"`
LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"`
AwsS3ShardFormatter string `json:"awsS3ShardFormatter" pflag:", S3 bucket prefix where Presto results will be stored"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we limited to S3? Flyte in general is storage agnostic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, we would be limited by whatever Presto supports in terms of external tables. I'm not very familiar with Flyte's generic storage model. Do you have any ideas of how I might abstract this? Currently, this logic of generating random paths is handled by flytekit but I'm not sure if the logic there is very generic as it appeared to be tied to S3.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you can.
This is very hacky IMO. Ideally this path should be passed down to the plugin through TaskContext() baking shards.. etc. here will only lead to more problems...
We should remove these two fields from here, add a function to TaskExecutionContext... that gets you a path to write to (e.g. DefaultDataReference() storage.DataReference)
Propeller (which implements this interface) should pass this path down to the plugin. It has all the logic to build the right path based on the current configuration in the system.
It doesn't however do sharding (hence why I was suggesting you move all of that out)... it just doesn't belong to this plugin (or any plugin).

AwsS3ShardCount int `json:"awsS3ShardStringLength" pflag:", Number of characters for the S3 bucket shard prefix"`
RoutingGroupConfigs []RoutingGroupConfig `json:"clusterConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"`
}

// Retrieves the current config value or default.
func GetPrestoConfig() *Config {
return prestoConfigSection.GetConfig().(*Config)
}

func SetPrestoConfig(cfg *Config) error {
return prestoConfigSection.SetConfig(cfg)
}
51 changes: 51 additions & 0 deletions go/tasks/plugins/presto/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading