Skip to content

Commit

Permalink
Merge #59293
Browse files Browse the repository at this point in the history
59293: sql: fix emitting maximum memory usage ComponentStat r=yuzefovich a=asubiotto

While working on some frontend changes, I realized that we weren't displaying max memory usage correctly. This PR fixes that. Please see individual commits for details.

Release note: None (no user-visible change)

Co-authored-by: Alfonso Subiotto Marques <[email protected]>
  • Loading branch information
craig[bot] and asubiotto committed Jan 26, 2021
2 parents d86781c + cb7ee1c commit 97d14bf
Show file tree
Hide file tree
Showing 18 changed files with 247 additions and 159 deletions.
13 changes: 6 additions & 7 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (f *vectorizedFlow) Setup(
helper,
vectorizedRemoteComponentCreator{},
recordingStats,
f.Gateway,
f.GetWaitGroup(),
f.GetSyncFlowConsumer(),
flowCtx.Cfg.NodeDialer,
Expand Down Expand Up @@ -438,6 +439,7 @@ type vectorizedFlowCreator struct {
streamIDToInputOp map[execinfrapb.StreamID]opDAGWithMetaSources
streamIDToSpecIdx map[execinfrapb.StreamID]int
recordingStats bool
isGatewayNode bool
vectorizedStatsCollectorsQueue []colexec.VectorizedStatsCollector
waitGroup *sync.WaitGroup
syncFlowConsumer execinfra.RowReceiver
Expand Down Expand Up @@ -505,6 +507,7 @@ func newVectorizedFlowCreator(
helper flowCreatorHelper,
componentCreator remoteComponentCreator,
recordingStats bool,
isGatewayNode bool,
waitGroup *sync.WaitGroup,
syncFlowConsumer execinfra.RowReceiver,
nodeDialer *nodedialer.Dialer,
Expand Down Expand Up @@ -948,17 +951,13 @@ func (s *vectorizedFlowCreator) setupOutput(
// Start a separate recording so that GetRecording will return
// the recordings for only the child spans containing stats.
ctx, span := tracing.ChildSpanRemote(ctx, "")
if atomic.AddInt32(&s.numOutboxesDrained, 1) == atomic.LoadInt32(&s.numOutboxes) {
if atomic.AddInt32(&s.numOutboxesDrained, 1) == atomic.LoadInt32(&s.numOutboxes) && !s.isGatewayNode {
// At the last outbox, we can accurately retrieve stats for the
// whole flow from parent monitors. These stats are added to a
// flow-level span.
span.SetTag(execinfrapb.FlowIDTagKey, flowCtx.ID)
span.SetSpanStats(&execinfrapb.ComponentStats{
Component: execinfrapb.ComponentID{
Type: execinfrapb.ComponentID_FLOW,
FlowID: flowCtx.ID,
// TODO(radu): the node ID should be part of the ComponentID.
},
Component: execinfrapb.FlowComponentID(outputStream.OriginNodeID, flowCtx.ID),
FlowStats: execinfrapb.FlowStats{
MaxMemUsage: optional.MakeUint(uint64(flowCtx.EvalCtx.Mon.MaximumBytes())),
},
Expand Down Expand Up @@ -1369,7 +1368,7 @@ func ConvertToVecTree(
fuseOpt = flowinfra.FuseAggressively
}
creator := newVectorizedFlowCreator(
newNoopFlowCreatorHelper(), vectorizedRemoteComponentCreator{}, false,
newNoopFlowCreatorHelper(), vectorizedRemoteComponentCreator{}, false, false,
nil, &execinfra.RowChannel{}, nil, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{},
flowCtx.Cfg.VecFDSemaphore, flowCtx.TypeResolverFactory.NewTypeResolver(flowCtx.EvalCtx.Txn),
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func TestDrainOnlyInputDAG(t *testing.T) {
}
var wg sync.WaitGroup
vfc := newVectorizedFlowCreator(
&vectorizedFlowCreatorHelper{f: f}, componentCreator, false, &wg, &execinfra.RowChannel{},
&vectorizedFlowCreatorHelper{f: f}, componentCreator, false, false, &wg, &execinfra.RowChannel{},
nil /* nodeDialer */, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{},
nil /* fdSemaphore */, descs.DistSQLTypeResolver{},
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql/inbound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestOutboxInboundStreamIntegration(t *testing.T) {
}

streamID := execinfrapb.StreamID(1)
outbox := flowinfra.NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */)
outbox := flowinfra.NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */)
outbox.Init(rowenc.OneIntCol)

// WaitGroup for the outbox and inbound stream. If the WaitGroup is done, no
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,9 @@ func (ds *ServerImpl) setupFlow(
}

// Create the FlowCtx for the flow.
flowCtx := ds.NewFlowContext(ctx, req.Flow.FlowID, evalCtx, req.TraceKV, localState)
flowCtx := ds.NewFlowContext(
ctx, req.Flow.FlowID, evalCtx, req.TraceKV, localState, req.Flow.Gateway == roachpb.NodeID(ds.NodeID.SQLInstanceID()),
)

// req always contains the desired vectorize mode, regardless of whether we
// have non-nil localState.EvalContext. We don't want to update EvalContext
Expand Down Expand Up @@ -392,6 +394,7 @@ func (ds *ServerImpl) NewFlowContext(
evalCtx *tree.EvalContext,
traceKV bool,
localState LocalState,
isGatewayNode bool,
) execinfra.FlowCtx {
// TODO(radu): we should sanity check some of these fields.
flowCtx := execinfra.FlowCtx{
Expand All @@ -402,6 +405,7 @@ func (ds *ServerImpl) NewFlowContext(
NodeID: ds.ServerConfig.NodeID,
TraceKV: traceKV,
Local: localState.IsLocal,
Gateway: isGatewayNode,
}

if localState.IsLocal && localState.Collection != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/execinfra/flow_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type FlowCtx struct {
// Local is true if this flow is being run as part of a local-only query.
Local bool

// Gateway is true if this flow is being run on the gateway node.
Gateway bool

// TypeResolverFactory is used to construct transaction bound TypeResolvers
// to resolve type references during flow setup. It is not safe for concurrent
// use and is intended to be used only during flow setup and initialization.
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/execinfrapb/component_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strconv"
"strings"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/optional"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -42,6 +43,15 @@ func StreamComponentID(flowID FlowID, streamID StreamID) ComponentID {
}
}

// FlowComponentID returns a ComponentID for the given flow.
func FlowComponentID(nodeID roachpb.NodeID, flowID FlowID) ComponentID {
return ComponentID{
FlowID: flowID,
Type: ComponentID_FLOW,
NodeID: nodeID,
}
}

// FlowIDTagKey is the key used for flow id tags in tracing spans.
const (
FlowIDTagKey = tracing.TagPrefix + "flowid"
Expand Down
Loading

0 comments on commit 97d14bf

Please sign in to comment.