Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: use the descs.Collection to access types during distributed flows #51865

Merged
merged 1 commit into from
Aug 3, 2020

Conversation

rohany
Copy link
Contributor

@rohany rohany commented Jul 23, 2020

This commit enables distributed queries to access user defined type
metadata during flow setup via the lease manager, so that accesses to
this metadata is cached and doesn't have to go through k/v on every
access.

This is achieved by giving the FlowContext a descs.Collection is
used to access the descriptors through the lease manager.

Release note: None

@rohany rohany added the do-not-merge bors won't merge a PR with this label. label Jul 23, 2020
@rohany rohany requested review from jordanlewis, ajwerner, thoszhang, yuzefovich, a team and adityamaru and removed request for a team July 23, 2020 21:15
@cockroach-teamcity
Copy link
Member

This change is Reviewable

@rohany
Copy link
Contributor Author

rohany commented Jul 23, 2020

Letting CI run to see the damage.

I'd like to get thoughts on the interface that I've set up here, and some ideas on how to test that we do indeed release leases when done.

I've left a few TODO's in here that are open questions, so looking for suggestions. Some of those are some inconsistencies where the FlowContext doesn't have a txn set, but the EvalContext does. @yuzefovich I'm hoping you can shed some light on when/why that is the case.

@rohany
Copy link
Contributor Author

rohany commented Jul 24, 2020

Hmm, it seems like we create FlowContexts in alot of test usages that don't have this new field set. Not sure if I should go in and update all of those or just make these methods handle when the TypeResolverFactory is nil.

Copy link
Member

@yuzefovich yuzefovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I should go in and update all of those or just make these methods handle when the TypeResolverFactory is nil.

I think the latter option is easier and better because we're likely to add more test cases that would create more FlowCtx objects without this field set.

Reviewed 34 of 38 files at r1.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @ajwerner, @jordanlewis, @lucy-zhang, @rohany, and @yuzefovich)


pkg/sql/distsql_running.go, line 172 at r1 (raw file):

				scheduledOnRemoteNode := scheduledOnNodeID != thisNodeID

				// TODO (rohany): Why does each flow need its own FlowCtx if we aren't

I don't think a separate FlowCtx is needed, the code was written this way (with creating a new object for each flow) out of convenience. I think we only need to update NodeID field. What happens if you share the same object among all flows?


pkg/sql/distsql_running.go, line 185 at r1 (raw file):

				}

				// TODO (rohany): This is unfortunate that this call to setup vectorize makes

Which context are you talking about as "the one that is made later"? The one that is created for the proper execution and not for SupportsVectorized check?


pkg/sql/distsql_running.go, line 203 at r1 (raw file):

					flowCtx.TypeResolverFactory = &descs.DistSQLTypeResolverFactory{
						Descriptors:  collection,
						NeedsCleanup: true,

nit: rather than setting NeedsCleanup = true you could insert defer func() ... here, right?


pkg/sql/colflow/vectorized_flow.go, line 717 at r1 (raw file):

	// their types from InputSyncSpec, so this is a convenient place to do the
	// hydration so that all operators get the valid types.
	// TODO (rohany): This is nil in some explain contexts?

I'm guessing newFlowCtxForExplainPurposes needs to be updated.


pkg/sql/distsql/server.go, line 353 at r1 (raw file):

	}

	if localState.IsLocal && localState.Collection != nil {

I think it'd be nice to create a constructor method for new FlowCtx object that would contain this if block so that we could use the newly-added method here and in SupportsVectorized check.


pkg/sql/execinfra/processorsbase.go, line 804 at r1 (raw file):

	pb.inputsToDrain = opts.InputsToDrain

	// TODO (rohany): This txn is nil sometimes, which really confuses me. But

The problem here is that Flow.SetTxn (which sets txn on the flow context) is called after Flow.Setup (which creates all the processors). FlowCtx and "main" EvalCtx have different lifetimes - the former is created during Flow.Setup whereas the eval context of the connExecutor is reused and reset in connExecutor.resetEvalCtx and all other eval contexts are derived from it.

Overall, this ctx business is unfortunate and seems to be overdue for cleaning up, and I'm not sure what you should do in this PR, I guess the current way is ok though.


pkg/sql/rowexec/project_set.go, line 108 at r1 (raw file):

	ctx = ps.StartInternal(ctx, projectSetProcName)

	// TODO (rohany): Why is this initialization done in Start and not in newProjectSetProcessor?

I don't know, but I think the initialization can be moved into the constructor.


pkg/sql/rowflow/row_based_flow.go, line 275 at r1 (raw file):

			// this is a convenient place to do the hydration. Processors that scan
			// over tables will have their hydration performed in ProcessorBase.Init.
			// TODO (rohany): Why can't I use f.Txn here??? It's nil in some contexts

Replied above.


pkg/sql/sem/tree/type_name.go, line 136 at r1 (raw file):

	case *IDTypeReference:
		if resolver == nil {
			debug.PrintStack()

Just pointing this out.

@rohany rohany force-pushed the types-in-dist-flow branch from 92f78aa to 882c9b2 Compare July 28, 2020 16:15
Copy link
Contributor Author

@rohany rohany left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @ajwerner, @jordanlewis, @lucy-zhang, and @yuzefovich)


pkg/sql/distsql_running.go, line 172 at r1 (raw file):

Previously, yuzefovich wrote…

I don't think a separate FlowCtx is needed, the code was written this way (with creating a new object for each flow) out of convenience. I think we only need to update NodeID field. What happens if you share the same object among all flows?

I don't think the nodeID changes across node iterations either. Nothing goes wrong if we share it across all the flows. In fact, it's better if it is shared across all of those flow setup calls, because then we can share the cached data among each call to set up.


pkg/sql/distsql_running.go, line 185 at r1 (raw file):

Previously, yuzefovich wrote…

Which context are you talking about as "the one that is made later"? The one that is created for the proper execution and not for SupportsVectorized check?

Yeah exactly. I would prefer to only create one flow context.


pkg/sql/distsql_running.go, line 203 at r1 (raw file):

Previously, yuzefovich wrote…

nit: rather than setting NeedsCleanup = true you could insert defer func() ... here, right?

Done


pkg/sql/colflow/vectorized_flow.go, line 717 at r1 (raw file):

Previously, yuzefovich wrote…

I'm guessing newFlowCtxForExplainPurposes needs to be updated.

Done.


pkg/sql/distsql/server.go, line 353 at r1 (raw file):

Previously, yuzefovich wrote…

I think it'd be nice to create a constructor method for new FlowCtx object that would contain this if block so that we could use the newly-added method here and in SupportsVectorized check.

Done.


pkg/sql/execinfra/processorsbase.go, line 804 at r1 (raw file):

Previously, yuzefovich wrote…

The problem here is that Flow.SetTxn (which sets txn on the flow context) is called after Flow.Setup (which creates all the processors). FlowCtx and "main" EvalCtx have different lifetimes - the former is created during Flow.Setup whereas the eval context of the connExecutor is reused and reset in connExecutor.resetEvalCtx and all other eval contexts are derived from it.

Overall, this ctx business is unfortunate and seems to be overdue for cleaning up, and I'm not sure what you should do in this PR, I guess the current way is ok though.

I see.


pkg/sql/rowexec/project_set.go, line 108 at r1 (raw file):

Previously, yuzefovich wrote…

I don't know, but I think the initialization can be moved into the constructor.

Done.


pkg/sql/rowflow/row_based_flow.go, line 275 at r1 (raw file):

Previously, yuzefovich wrote…

Replied above.

Done.


pkg/sql/sem/tree/type_name.go, line 136 at r1 (raw file):

Previously, yuzefovich wrote…

Just pointing this out.

Done.

Copy link
Member

@yuzefovich yuzefovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 19 of 24 files at r2.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @ajwerner, @jordanlewis, @lucy-zhang, @rohany, and @yuzefovich)


pkg/sql/distsql_running.go, line 172 at r1 (raw file):

Previously, rohany (Rohan Yadav) wrote…

I don't think the nodeID changes across node iterations either. Nothing goes wrong if we share it across all the flows. In fact, it's better if it is shared across all of those flow setup calls, because then we can share the cached data among each call to set up.

You're right, everything is the same.


pkg/sql/distsql_running.go, line 185 at r1 (raw file):

Previously, rohany (Rohan Yadav) wrote…

Yeah exactly. I would prefer to only create one flow context.

I don't think we can get away easily from having to create separate FlowCtx here, in SupportsVectorized check and in ServerImpl.setupFlow - the former is done on the gateway whereas the latter is performed on every node participating in the flow.


pkg/sql/distsql/server.go, line 413 at r2 (raw file):

func (ds *ServerImpl) NewFlowContext(
	id execinfrapb.FlowID, evalCtx *tree.EvalContext, traceKV bool, localState LocalState,
) execinfra.FlowCtx {

What do you think about returning a cleanup function that would be releasing things? I think it might be safer this way, since the caller of that method would see right away that some cleanup is needed instead of having to remember to release the descriptors.


pkg/sql/flowinfra/flow.go, line 422 at r2 (raw file):

	}

	// Release any descriptors accessed by this flow.q

nit: s/q//.


pkg/sql/rowexec/project_set.go, line 120 at r2 (raw file):

// Start is part of the RowSource interface.
func (ps *projectSetProcessor) Start(ctx context.Context) context.Context {
	ps.input.Start(ctx)

This returns ctx which we should use.

@rohany rohany force-pushed the types-in-dist-flow branch from 882c9b2 to a361bfa Compare July 28, 2020 17:52
Copy link
Contributor Author

@rohany rohany left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @ajwerner, @jordanlewis, @lucy-zhang, @rohany, and @yuzefovich)


pkg/sql/distsql_running.go, line 185 at r1 (raw file):

Previously, yuzefovich wrote…

I don't think we can get away easily from having to create separate FlowCtx here, in SupportsVectorized check and in ServerImpl.setupFlow - the former is done on the gateway whereas the latter is performed on every node participating in the flow.

That's unfortunate. I think there might be a slight performance penalty here for vectorized flows that use user defined types, but we'll have to measure it to see the actual impact.


pkg/sql/distsql/server.go, line 413 at r2 (raw file):

Previously, yuzefovich wrote…

What do you think about returning a cleanup function that would be releasing things? I think it might be safer this way, since the caller of that method would see right away that some cleanup is needed instead of having to remember to release the descriptors.

I'm not sure that is necessary -- the cleanup logic is already in flow.Cleanup. I think we have to manually do the cleanup just in the setupVectorized case where we don't actually have a flow.


pkg/sql/rowexec/project_set.go, line 120 at r2 (raw file):

Previously, yuzefovich wrote…

This returns ctx which we should use.

Done.

Copy link
Member

@yuzefovich yuzefovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DistSQL stuff LGTM.

Reviewed 1 of 3 files at r3.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @ajwerner, @jordanlewis, @lucy-zhang, @rohany, and @yuzefovich)


pkg/sql/distsql_running.go, line 185 at r1 (raw file):

Previously, rohany (Rohan Yadav) wrote…

That's unfortunate. I think there might be a slight performance penalty here for vectorized flows that use user defined types, but we'll have to measure it to see the actual impact.

We could create a "local" flow context before SupportsVectorized check that would be used in that function as well as in SetupLocalSyncFlow, but I'm worried that the lifetime of flow context will get even worse, so I'd rather see a performance hit first to justify the refactoring.

That said, hopefully some time soon we'll be able to remove SupportsVectorized check entirely which would be my preferred solution here.


pkg/sql/distsql/server.go, line 413 at r2 (raw file):

Previously, rohany (Rohan Yadav) wrote…

I'm not sure that is necessary -- the cleanup logic is already in flow.Cleanup. I think we have to manually do the cleanup just in the setupVectorized case where we don't actually have a flow.

Yeah, I understand. I'm mostly worried about a possibility of another call to NewFlowContext (apart from the two we currently have) and forgetting to do the release. However, I doubt we'll be adding such another call, so up to you.

@rohany rohany removed the do-not-merge bors won't merge a PR with this label. label Jul 28, 2020
@rohany
Copy link
Contributor Author

rohany commented Jul 30, 2020

pinging @ajwerner and @lucy-zhang for a review

Copy link
Contributor

@ajwerner ajwerner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the plumbing here super interesting. I've got a WIP lifting the abstraction around the descriptors and the biggest stumbling block for me right now is that we serialize the descriptors directly into the spec. That means that I end up needing to still wrap (i.e. turn a descpb.TableDescriptor into an *ImmutableTableDescriptor) them when constructing the nodes in rowexec or colexec which is a bummer. Ideally eventually we'll not only pull the types out of the descs collection but pull the hydrated, strongly typed descriptor and make this all just better.

I think I'm good on this PR but want to take another pass later today. Curious if you have thoughts on what I'm saying above.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @ajwerner, @jordanlewis, @lucy-zhang, @rohany, and @yuzefovich)

Copy link
Contributor

@thoszhang thoszhang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 15 of 38 files at r1, 22 of 24 files at r2, 3 of 3 files at r3.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @jordanlewis, and @rohany)


pkg/sql/logictest/testdata/logic_test/enums, line 1101 at r3 (raw file):

ROLLBACK

statement ok

Did this not work before? Either way, I think a comment to clarify what this is testing might be useful.

@rohany
Copy link
Contributor Author

rohany commented Aug 3, 2020

Did this not work before? Either way, I think a comment to clarify what this is testing might be useful.

It did work before, but it's now an important case to ensure that we don't try and lease types once we've modified them in the same txn. I'll update it with a comment.

I find the plumbing here super interesting. I've got a WIP lifting the abstraction around the descriptors and the biggest stumbling block for me right now is that we serialize the descriptors directly into the spec. That means that I end up needing to still wrap (i.e. turn a descpb.TableDescriptor into an *ImmutableTableDescriptor) them when constructing the nodes in rowexec or colexec which is a bummer. Ideally eventually we'll not only pull the types out of the descs collection but pull the hydrated, strongly typed descriptor and make this all just better.

I agree that this could be done similarly to how I'm getting types out from the collection. It would be interesting to measure and see if just sending over a table ID in these flows and then using the collection to get the object is faster or not. It might be, given that these table descriptors are kinda beefy. Another option is to collect all the wrapped descriptors at the top of the flow so that they only have to be created once, and then accessed whenever needed.

This commit enables distributed queries to access user defined type
metadata during flow setup via the lease manager, so that accesses to
this metadata is cached and doesn't have to go through k/v on every
access.

This is achieved by giving the `FlowContext` a `descs.Collection` is
used to access the descriptors through the lease manager.

Release note: None
@rohany rohany force-pushed the types-in-dist-flow branch from a361bfa to bf9ffe9 Compare August 3, 2020 04:44
@rohany
Copy link
Contributor Author

rohany commented Aug 3, 2020

@ajwerner are you still looking, or can I merge this?

Copy link
Contributor

@ajwerner ajwerner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm: - I'm not sure I get all of the complexities in flow setup but I guess I will soon enough...

I agree that this could be done similarly to how I'm getting types out from the collection. It would be interesting to measure and see if just sending over a table ID in these flows and then using the collection to get the object is faster or not. It might be, given that these table descriptors are kinda beefy. Another option is to collect all the wrapped descriptors at the top of the flow so that they only have to be created once, and then accessed whenever needed.

Interesting, would love to talk more about this. You'll see in #52238 I end up wrapping descriptors out of specs a lot... Merge this and I'll rebase it

Reviewed 12 of 12 files at r4.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @adityamaru, @jordanlewis, and @rohany)

@rohany
Copy link
Contributor Author

rohany commented Aug 3, 2020

bors r+

1 similar comment
@rohany
Copy link
Contributor Author

rohany commented Aug 3, 2020

bors r+

@craig
Copy link
Contributor

craig bot commented Aug 3, 2020

Already running a review

@craig
Copy link
Contributor

craig bot commented Aug 3, 2020

Build succeeded:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants