Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Presto plugin executor #69

Merged
merged 28 commits into from
Mar 24, 2020
Merged

Presto plugin executor #69

merged 28 commits into from
Mar 24, 2020

Conversation

lu4nm3
Copy link
Contributor

@lu4nm3 lu4nm3 commented Mar 10, 2020

TL;DR

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

This is the executor implementation to support a new Presto task

Tracking Issue

flyteorg/flyte#203

Follow-up issue

NA
OR
https://github.com/lyft/flyte/issues/

@@ -0,0 +1,66 @@
package client
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a noop Presto client for the open-source version. There will be a separate commit for one based on Mozart

Copy link
Contributor

@EngHabu EngHabu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dude, this is awesome! thank you for going through all of this. And please don't be discouraged by the number of comments, they are mostly nits... awesome work! and thank you for adding unit tests!


const (
httpRequestTimeoutSecs = 30
//AcceptHeaderKey = "Accept"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completely missed this, will do!

return NewPrestoStatus(ctx, "UNKNOWN"), nil
}

func NewPrestoClient(cfg *config.Config) svc.ServiceClient {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func NewPrestoClient(cfg *config.Config) svc.ServiceClient {
func NewNoopPrestoClient(cfg *config.Config) svc.ServiceClient {

return NewPrestoStatus(ctx, "UNKNOWN"), nil
}

func NewPrestoClient(cfg *config.Config) svc.ServiceClient {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we are not consistent on this but the more idiomatic way in go to using interfaces suggests we should return the actual type here not an interface. which means your type should also be public.

Copy link
Contributor Author

@lu4nm3 lu4nm3 Mar 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, that's interesting. Normally from my experience in other langs (java, scala, rust, etc), you typically tie yourself to an interface instead of a specific implementation. In our case, I feel this will help when trying to swap out the Mozart client for one based on just Presto. Otherwise, won't we need to make a copy of the calling code (ie. the executor) in the private repo? In general, I think this might minimize the number of code changes we will need to make in the long run

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, interfaces in Go are kind of the inverse in Java.
The idiomatic way is something like this:
func MyFunc(m SpecificInterface) SpecificType
So your function takes an interface of exactly the functions it needs and returns a specific type..

I do not understand what do you mean by copying calling code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So given that I made the client specific to Presto, my original comment is largely void now.

What I was trying to say is that if we made the function return a specific type:

func NewNoopPrestoClient(cfg *config.Config) PrestoClient {

Then in the calling code that expects this client, we would also need to specifically have it accept a PrestoClient:

func InitializePrestoExecutor(
	ctx context.Context,
	iCtx core.SetupContext,
	cfg *config.Config,
	resourceConfig map[string]int,
	prestoClient client.PrestoClient) (core.Plugin, error) {

Normally, I think this would be ok. But what I was trying to say (perhaps in a not so clear manner), is that if one day we wanted to abstract the client to be a more generic ServiceClient to use with not only Presto but also Hive, etc, then it's better if we rely on this interface (ServiceClient) as it would give us the flexibility on swapping out the client as we see fit. However, as I'm writing this, I realize that making this abstract is probably a larger refactoring effort that will likely change even this disregard this explanation haha. Sorry I think I'm rambling at this point

NextURI string
}

func (p *prestoClient) ExecuteCommand(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a reason to use pointer receivers in this client...

Suggested change
func (p *prestoClient) ExecuteCommand(
func (p prestoClient) ExecuteCommand(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. I was following what the Hive client was doing here. I'm not too familiar with things in Go yet but, when would I use a reference vs a value for a method?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Hive code should probably be changed. by value here is fine, esp since the underlying http client is already a pointer.

//PrestoUserHeader = "X-Presto-User"
)

type prestoClient struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type prestoClient struct {
type NoopPrestoClient struct {

go/tasks/plugins/presto/execution_state.go Show resolved Hide resolved
scope: scope,
cfg: cfg,
}
autoRefreshCache, err := cache.NewAutoRefreshCache("presto", q.SyncPrestoQuery, workqueue.DefaultControllerRateLimiter(), ResyncDuration, cfg.Workers, cfg.LruCacheSize, scope)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should probably add rate limiter params to the config and construct a rate limiter here with those params...

@@ -0,0 +1,112 @@
package presto
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you rename the file to helpers_test.go or add the annotation to only build it for Test? I know go is pretty good at trimming what's not used but why put strain on the compiler if we don't need to..


//go:generate mockery -all -case=snake

type ServiceClient interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, can you clarify what you mean by docs? Do you mean to add comments to this file which will serve as docs? Or are there other docs I should be updating?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I meant godocs so yeah comments on the public types, fields and funcs... ideally also a package level comment to explain what this is

@@ -0,0 +1,15 @@
package svc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are not doing service plugin implementation for now, let's move this inside Presto... I think this interface needs some rework to make it generic.... @wild-endeavor what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @wild-endeavor suggested this yesterday. I'm fine moving it.


func NewPrestoStatus(ctx context.Context, state string) svc.CommandStatus {
upperCased := strings.ToUpper(state)
if strings.Contains(upperCased, "FAILED") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is failed singled out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely sure, I sort of copied this from the existing qubole_status. Probably not necessary to single this out.

Copy link
Contributor Author

@lu4nm3 lu4nm3 Mar 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think it was needed because Hive/Qubole has different error/failure modes and I think this maps them all to a single Failure/Error on the Flyte side. Going to leave this in for now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this as a comment?

NextURI string
}

func (p *prestoClient) ExecuteCommand(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Hive code should probably be changed. by value here is fine, esp since the underlying http client is already a pointer.

DefaultRoutingGroup: "adhoc",
Workers: 15,
LruCacheSize: 2000,
AwsS3ShardFormatter: "s3://lyft-modelbuilder/{}/",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leave it out, but we should put in canonical examples that work with the sandbox (docker desktop/minikube) deployment.


var (
defaultConfig = Config{
Environment: URLMustParse("https://prestoproxy-internal.lyft.net:443"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Phase ExecutionPhase

// This will store the command ID from Presto
CommandID string `json:"command_id,omitempty"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

camel I'm pretty sure.

return cachedExecutionState.ExecutionState, nil
}

func MapExecutionStateToPhaseInfo(state ExecutionState) core.PhaseInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this function still work? I feel like we need to add more phase versions.

The Hive state machine is a one pass waterfall. it just goes through each state linearly. This state machine goes back and revisits the states because there's the query count. We should make each time around a new phase version.

@@ -180,6 +180,10 @@ func PhaseInfoSuccess(info *TaskInfo) PhaseInfo {
return phaseInfo(PhaseSuccess, DefaultPhaseVersion, nil, info)
}

func PhaseInfoSuccessWithVersion(version uint32, info *TaskInfo) PhaseInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we consider the version on success.. success phase is terminal, so once you hit that, there should be no expectation the system will call your plugin ever again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, so the reason why I wanted to add a version to success is that this is a status that will occur up to 5 times for every Presto query due to how the mini-state machine is implemented where it executes a total of 5 Presto queries for each Presto task. After each of these 5 queries finishes successfully, its status will be successful and is handled here: https://github.com/lyft/flyteplugins/pull/69/files?file-filters%5B%5D=.go#diff-6a65d420579b33b62766e71e98eafdf7R112

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As each query finishes, even though the internal state might move to PhaseQuerySucceeded, the overall phase that's sent to Admin should still be running. Only when everything is truly done, should the plugin send a success. Does this make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. So we should keep things as Running and only use Success once at the very end once everything is done?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wild-endeavor I went ahead and updated things so that the success phase is only returned at the very end once all 5 queries have executed

return NewPrestoStatus(ctx, "UNKNOWN"), nil
}

func NewPrestoClient(cfg *config.Config) svc.ServiceClient {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, interfaces in Go are kind of the inverse in Java.
The idiomatic way is something like this:
func MyFunc(m SpecificInterface) SpecificType
So your function takes an interface of exactly the functions it needs and returns a specific type..

I do not understand what do you mean by copying calling code?

DefaultRoutingGroup string `json:"defaultRoutingGroup" pflag:",Default Presto routing group"`
Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"`
LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"`
AwsS3ShardFormatter string `json:"awsS3ShardFormatter" pflag:", S3 bucket prefix where Presto results will be stored"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you can.
This is very hacky IMO. Ideally this path should be passed down to the plugin through TaskContext() baking shards.. etc. here will only lead to more problems...
We should remove these two fields from here, add a function to TaskExecutionContext... that gets you a path to write to (e.g. DefaultDataReference() storage.DataReference)
Propeller (which implements this interface) should pass this path down to the plugin. It has all the logic to build the right path based on the current configuration in the system.
It doesn't however do sharding (hence why I was suggesting you move all of that out)... it just doesn't belong to this plugin (or any plugin).

return "", "", "", "", err
}

statement, routingGroup, catalog, schema, err = presto.InterpolateInputs(ctx, *inputs, statement, routingGroup, catalog, schema)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We interpolate the statement, routing_config, catalog, schema with inputs from users. See the comments I wrote in the utils file. I tested this locally and it works

return "", "", "", "", err
}

routingGroup, catalog, schema, statement, err = presto.InterpolateInputs(ctx, *inputs, routingGroup, catalog, schema, statement)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where user inputs get interpolated for the query, routing group, catalog, and schema. See comments in the interpolator file for more details

NamespaceScopeQuotaProportionCap float64 `json:"namespaceScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a namespace in the routing group"`
}

type RateLimiter struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In light of the other conversations we've had, should we rename this struct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. How does CacheConfig sound?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

delete(inputsAsStrings, "__implicit_schema")

statement = interpolate(inputsAsStrings, statement)
routingGroup = interpolate(inputsAsStrings, routingGroup)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, @EngHabu can you chime in here? We only want to do statement right? Everything else we just leave as is?

Copy link
Contributor Author

@lu4nm3 lu4nm3 Mar 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, this approach would provide a bit more flexibility to users in terms of how much they are able to customize their Presto tasks. For example, if they want their query to only include the table name:

SELECT * FROM table_name

Then they could set the catalog and schema as inputs whose values are perhaps made up of different user-supplied, non-implicit, input values and thus you wouldn't limit users to only be able to interpolate the query statement itself. Not sure if interpolating only the query will be enough for users but if it is, then I can definitely simplify this. Your call.

"Allocation request granted for Presto", scope),
AllocationNotGranted: labeled.NewCounter("presto_allocation_not_granted",
"Allocation request did not fail but not granted for Presto", scope),
ResourceWaitTime: scope.MustNewSummaryWithOptions("presto_resource_wait_time",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to add "Seconds"/"_seconds" to this metric name? Not sure if we have a convention. Haytham?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure. I copied this largely from the Hive version. Not sure how units were handled in that case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like we are using them though...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it's used somewhere in the executor

)

// Matches any pair of open/close mustaches that contains a variable inside (e.g. `{{ abc }}`)
var inputMustacheRegex = regexp.MustCompile(`{{\s*[^\s]+\s*}}`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not have our own matching and our own format here... please use the one that already exist here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wild-endeavor and I discussed this last week and came to the conclusion that we can't just use this out-of-the-box as inputs in the Presto task (as well as how interpolation is done on the query plus routing_group, catalog, and schema) are being handled a little differently

}

// Representation of a response after submitting a query to Presto
type PrestoExecuteResponse struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add json tags

// This type is meant only to encapsulate the response coming from Presto as a type, it is
// not meant to be stored locally.
const (
PrestoStatusUnknown svc.CommandStatus = "UNKNOWN"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is an example that works:
https://play.golang.org/p/DOSciZ7joY1

I needed to implement UnmarshalJSON()...


const prestoConfigSectionKey = "presto"

func URLMustParse(s string) config.URL {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lower case?

Suggested change
func URLMustParse(s string) config.URL {
func urlMustParse(s string) config.URL {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it public because I call the method in one of the unit test

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like URL.

SyncPeriod config.Duration `json:"syncPeriod" pflag:",The duration to wait before the cache is refreshed again"`
Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"`
LruCacheSize int `json:"lruCacheSize" pflag:",Size of the cache"`
MetricScope string `json:"metricScope" pflag:",The prefix in Prometheus used to track metrics related to Presto"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need this... you just create it as a const in your code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is part of the change for one of your earlier comments where we should make the cache/rate limiter configurable. Are there things that should be configurable and others that shouldnt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I would say you should drop Name and MetricScope from here...

defaultConfig = Config{
Environment: URLMustParse(""),
DefaultRoutingGroup: "adhoc",
DefaultUser: "[email protected]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should, generally, only fill in defaults with values that give you a good default behavior. This user, even if it works in Lyft, won't work outside...

Name: "presto",
SyncPeriod: config.Duration{Duration: 5 * time.Second},
Workers: 15,
LruCacheSize: 2000,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LruCacheSize: 2000,
LruCacheSize: 10000,

"Allocation request granted for Presto", scope),
AllocationNotGranted: labeled.NewCounter("presto_allocation_not_granted",
"Allocation request did not fail but not granted for Presto", scope),
ResourceWaitTime: scope.MustNewSummaryWithOptions("presto_resource_wait_time",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like we are using them though...

@lu4nm3
Copy link
Contributor Author

lu4nm3 commented Mar 24, 2020

Ready for another look 🙏

wild-endeavor
wild-endeavor previously approved these changes Mar 24, 2020
@lu4nm3 lu4nm3 merged commit 3050380 into master Mar 24, 2020
eapolinario pushed a commit that referenced this pull request Sep 6, 2023
* Presto plugin executor

* mockery

* add unit tests

* more unit tests

* update to correct import

* fix lint

* linting

* last linting?

* minor changes

* expanded comment on state machine logic

* PR feedback changes

* PR feedback changes 2

* PR feedback changes 3

* PR feedback changes 4

* add user to execute args

* resource reg

* update status

* e2e chages

* update sync period

* add input interpolator

* changes

* changes 2

* prefix implicit inputs with __

* comments

* more feedback

* edit metrics
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants