From dd03b8ccdcb8a0f4d889543ff961ca639d8c829f Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Wed, 14 Nov 2018 18:24:44 -0500 Subject: [PATCH] distsqlrun: restructure a snippet of gnarly code Release note: None --- pkg/sql/distsqlrun/api.pb.go | 7 +++--- pkg/sql/distsqlrun/api.proto | 7 +++--- pkg/sql/distsqlrun/server.go | 41 +++++++++++++++++++++++++----------- 3 files changed, 35 insertions(+), 20 deletions(-) diff --git a/pkg/sql/distsqlrun/api.pb.go b/pkg/sql/distsqlrun/api.pb.go index fa876961def5..019eef2f3a01 100644 --- a/pkg/sql/distsqlrun/api.pb.go +++ b/pkg/sql/distsqlrun/api.pb.go @@ -143,10 +143,9 @@ func (BytesEncodeFormat) EnumDescriptor() ([]byte, []int) { return fileDescripto type SetupFlowRequest struct { // TxnCoordMeta is the TxnCoordMeta for the transaction in which the flow - // will run. If nil, the flow will not run in a transaction higher-level - // transaction (i.e. it is responsible for managing its own transactions, - // if any). Most flows expect to run in a txn, but some, like backfills, - // don't. + // will run. If nil, the flow will not run in a higher-level transaction + // (i.e. it is responsible for managing its own transactions, if any). Most + // flows expect to run in a txn, but some, like backfills, don't. TxnCoordMeta *cockroach_roachpb1.TxnCoordMeta `protobuf:"bytes,7,opt,name=txn_coord_meta,json=txnCoordMeta" json:"txn_coord_meta,omitempty"` // deprecated_txn used to play the role that TxnCoordMeta now plays. It // can be removed in v2.2. diff --git a/pkg/sql/distsqlrun/api.proto b/pkg/sql/distsqlrun/api.proto index 860f7802324a..fbb5d0332b16 100644 --- a/pkg/sql/distsqlrun/api.proto +++ b/pkg/sql/distsqlrun/api.proto @@ -29,10 +29,9 @@ message SetupFlowRequest { reserved 2; // TxnCoordMeta is the TxnCoordMeta for the transaction in which the flow - // will run. If nil, the flow will not run in a transaction higher-level - // transaction (i.e. it is responsible for managing its own transactions, - // if any). Most flows expect to run in a txn, but some, like backfills, - // don't. + // will run. If nil, the flow will not run in a higher-level transaction + // (i.e. it is responsible for managing its own transactions, if any). Most + // flows expect to run in a txn, but some, like backfills, don't. optional roachpb.TxnCoordMeta txn_coord_meta = 7; // deprecated_txn used to play the role that TxnCoordMeta now plays. It // can be removed in v2.2. diff --git a/pkg/sql/distsqlrun/server.go b/pkg/sql/distsqlrun/server.go index 820daaf1038f..b027a9a64c87 100644 --- a/pkg/sql/distsqlrun/server.go +++ b/pkg/sql/distsqlrun/server.go @@ -271,6 +271,12 @@ func FlowVerIsCompatible(flowVer, minAcceptedVersion, serverVersion DistSQLVersi return flowVer >= minAcceptedVersion && flowVer <= serverVersion } +// setupFlow creates a Flow. +// +// Args: +// localState: Specifies if the flow runs entirely on this node and, if it does, +// specifies the txn and other attributes. +// // Note: unless an error is returned, the returned context contains a span that // must be finished through Flow.Cleanup. func (ds *ServerImpl) setupFlow( @@ -326,16 +332,19 @@ func (ds *ServerImpl) setupFlow( monitor.Start(ctx, parentMonitor, mon.BoundAccount{}) acc := monitor.MakeBoundAccount() - txn := localState.Txn - if txn := req.DeprecatedTxn; txn != nil { - if req.TxnCoordMeta != nil { - return nil, nil, errors.Errorf("provided both Txn and TxnCoordMeta") + // Figure out what txn the flow needs to run in, if any. + // For local flows, the txn comes from localState.Txn. For non-local ones, we + // create a txn based on the request's TxnCoordMeta. + var txn *client.Txn + if !localState.IsLocal { + if depTxn := req.DeprecatedTxn; depTxn != nil { + if req.TxnCoordMeta != nil { + return nil, nil, errors.Errorf("provided both Txn and TxnCoordMeta") + } + meta := roachpb.MakeTxnCoordMeta(*depTxn) + req.TxnCoordMeta = &meta } - meta := roachpb.MakeTxnCoordMeta(*txn) - req.TxnCoordMeta = &meta - } - if meta := req.TxnCoordMeta; meta != nil { - if !localState.IsLocal { + if meta := req.TxnCoordMeta; meta != nil { if meta.Txn.Status != roachpb.PENDING { return nil, nil, errors.Errorf("cannot create flow in non-PENDING txn: %s", meta.Txn) @@ -344,6 +353,8 @@ func (ds *ServerImpl) setupFlow( // Txn to heartbeat the transaction. txn = client.NewTxnWithCoordMeta(ctx, ds.FlowDB, req.Flow.Gateway, client.LeafTxn, *meta) } + } else { + txn = localState.Txn } var evalCtx *tree.EvalContext @@ -475,13 +486,19 @@ func (ds *ServerImpl) SetupSyncFlow( // LocalState carries information that is required to set up a flow with wrapped // planNodes. type LocalState struct { + EvalContext *tree.EvalContext + // IsLocal is true if the flow is being run locally in the first place. IsLocal bool + + ///////////////////////////////////////////// + // Fields below are empty if IsLocal == false + ///////////////////////////////////////////// + // LocalProcs is an array of planNodeToRowSource processors. It's in order and // will be indexed into by the RowSourceIdx field in LocalPlanNodeSpec. - LocalProcs []LocalProcessor - EvalContext *tree.EvalContext - Txn *client.Txn + LocalProcs []LocalProcessor + Txn *client.Txn } // SetupLocalSyncFlow sets up a synchronous flow on the current (planning) node.