Skip to content

Commit

Permalink
Merge #32368
Browse files Browse the repository at this point in the history
32368: distsqlrun: restructure a snippet of gnarly code r=andreimatei a=andreimatei

Release note: None

Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
craig[bot] and andreimatei committed Nov 15, 2018
2 parents e8ebf3a + dd03b8c commit 60fc55f
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 20 deletions.
7 changes: 3 additions & 4 deletions pkg/sql/distsqlrun/api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions pkg/sql/distsqlrun/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ message SetupFlowRequest {
reserved 2;

// TxnCoordMeta is the TxnCoordMeta for the transaction in which the flow
// will run. If nil, the flow will not run in a transaction higher-level
// transaction (i.e. it is responsible for managing its own transactions,
// if any). Most flows expect to run in a txn, but some, like backfills,
// don't.
// will run. If nil, the flow will not run in a higher-level transaction
// (i.e. it is responsible for managing its own transactions, if any). Most
// flows expect to run in a txn, but some, like backfills, don't.
optional roachpb.TxnCoordMeta txn_coord_meta = 7;
// deprecated_txn used to play the role that TxnCoordMeta now plays. It
// can be removed in v2.2.
Expand Down
41 changes: 29 additions & 12 deletions pkg/sql/distsqlrun/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,12 @@ func FlowVerIsCompatible(flowVer, minAcceptedVersion, serverVersion DistSQLVersi
return flowVer >= minAcceptedVersion && flowVer <= serverVersion
}

// setupFlow creates a Flow.
//
// Args:
// localState: Specifies if the flow runs entirely on this node and, if it does,
// specifies the txn and other attributes.
//
// Note: unless an error is returned, the returned context contains a span that
// must be finished through Flow.Cleanup.
func (ds *ServerImpl) setupFlow(
Expand Down Expand Up @@ -326,16 +332,19 @@ func (ds *ServerImpl) setupFlow(
monitor.Start(ctx, parentMonitor, mon.BoundAccount{})
acc := monitor.MakeBoundAccount()

txn := localState.Txn
if txn := req.DeprecatedTxn; txn != nil {
if req.TxnCoordMeta != nil {
return nil, nil, errors.Errorf("provided both Txn and TxnCoordMeta")
// Figure out what txn the flow needs to run in, if any.
// For local flows, the txn comes from localState.Txn. For non-local ones, we
// create a txn based on the request's TxnCoordMeta.
var txn *client.Txn
if !localState.IsLocal {
if depTxn := req.DeprecatedTxn; depTxn != nil {
if req.TxnCoordMeta != nil {
return nil, nil, errors.Errorf("provided both Txn and TxnCoordMeta")
}
meta := roachpb.MakeTxnCoordMeta(*depTxn)
req.TxnCoordMeta = &meta
}
meta := roachpb.MakeTxnCoordMeta(*txn)
req.TxnCoordMeta = &meta
}
if meta := req.TxnCoordMeta; meta != nil {
if !localState.IsLocal {
if meta := req.TxnCoordMeta; meta != nil {
if meta.Txn.Status != roachpb.PENDING {
return nil, nil, errors.Errorf("cannot create flow in non-PENDING txn: %s",
meta.Txn)
Expand All @@ -344,6 +353,8 @@ func (ds *ServerImpl) setupFlow(
// Txn to heartbeat the transaction.
txn = client.NewTxnWithCoordMeta(ctx, ds.FlowDB, req.Flow.Gateway, client.LeafTxn, *meta)
}
} else {
txn = localState.Txn
}

var evalCtx *tree.EvalContext
Expand Down Expand Up @@ -475,13 +486,19 @@ func (ds *ServerImpl) SetupSyncFlow(
// LocalState carries information that is required to set up a flow with wrapped
// planNodes.
type LocalState struct {
EvalContext *tree.EvalContext

// IsLocal is true if the flow is being run locally in the first place.
IsLocal bool

/////////////////////////////////////////////
// Fields below are empty if IsLocal == false
/////////////////////////////////////////////

// LocalProcs is an array of planNodeToRowSource processors. It's in order and
// will be indexed into by the RowSourceIdx field in LocalPlanNodeSpec.
LocalProcs []LocalProcessor
EvalContext *tree.EvalContext
Txn *client.Txn
LocalProcs []LocalProcessor
Txn *client.Txn
}

// SetupLocalSyncFlow sets up a synchronous flow on the current (planning) node.
Expand Down

0 comments on commit 60fc55f

Please sign in to comment.