colexec, colflow: add network latency stat to streams #55705
20455e1
to
a81be2d
Compare
#54769 was closed in favour of this PR, as most of the feedback no longer applied.
Very cool! I only have a few nits, but I'll defer to Alfonso for approval.
IO time was renamed to deserialization time for inboxes. I got this value by subtracting the network latency from the total time. Since the network latency is just an approximate stat, I'm not sure if it's okay/a good idea to base another stat on this value. Thoughts?
I think the math is not quite right: the total time includes the latency for every received batch, but we're subtracting it only once. Looks like we should use something like deserialization time = total time - # batches x latency. I like the idea of having this stat even if it's not perfectly accurate.
Reviewed 17 of 17 files at r1.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @cathymw)
pkg/sql/colexec/stats.go, line 37 at r1 (raw file):
// VectorizedStatsCollector collects VectorizedStats on Operators.
//
// If two Operators are connected (i.e. one is an input to another), the
nit: just noticed that this comment is no longer true (not related to this PR), so feel free to remove this paragraph.
pkg/sql/colexec/stats.go, line 41 at r1 (raw file):
// StopWatch.
// TODO(cathymw): refactor this class into a base and specialized stats
// collectors
nit: missing period.
pkg/sql/colexec/execpb/stats.go, line 104 at r1 (raw file):
fmt.Sprintf("%s: %v", networkLatencyQueryPlanSuffix, time.Duration(vs.NetworkLatency).Round(time.Microsecond))) } else { stats = append(stats, fmt.Sprintf("%s: %d", tuplesOutputQueryPlanSuffix, vs.NumTuples),
super nit: I think it'd be nice to have this Sprintf on a new line for consistency with above.
pkg/sql/colexec/execpb/stats.proto, line 42 at r1 (raw file):
// if network latency should be shown or not.
bool on_stream = 10;
// network_latency is the latency time between outbox and inbox. It is set
nit: it might be worth calling out what is the unit of time here (I think it is nanoseconds).
4c84287
to
a695871
Compare
Thanks for the quick review! :) Fixed the nits and deserialization time calculation.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)
pkg/sql/colexec/stats.go, line 37 at r1 (raw file):
Previously, yuzefovich wrote…
nit: just noticed that this comment is no longer true (not related to this PR), so feel free to remove this paragraph.
Done.
pkg/sql/colexec/stats.go, line 41 at r1 (raw file):
Previously, yuzefovich wrote…
nit: missing period.
Done.
pkg/sql/colexec/execpb/stats.go, line 104 at r1 (raw file):
Previously, yuzefovich wrote…
super nit: I think it'd be nice to have this Sprintf on a new line for consistency with above.
Done.
pkg/sql/colexec/execpb/stats.proto, line 42 at r1 (raw file):
Previously, yuzefovich wrote…
nit: it might be worth calling out what is the unit of time here (I think it is nanoseconds).
Done.
a695871
to
aed4a8e
Compare
Nice!
IO time was renamed to deserialization time for inboxes. I got this value by subtracting the network latency from the total time. Since the network latency is just an approximate stat, I'm not sure if it's okay/a good idea to base another stat on this value. Thoughts?
Didn't we already calculate deserialization time using the timer you added or is that something we only did in the last version? I think that's my preferred way to calculate it.
Reviewed 4 of 4 files at r2.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @cathymw and @yuzefovich)
pkg/sql/colexec/execpb/stats.go, line 43 at r2 (raw file):
if vs.IO { if vs.OnStream { timeSuffix = deserializationTimeTagSuffix
I would leave this if block alone and set the timeSuffix in the below check of vs.OnStream so you only have to check it once.
pkg/sql/colexec/execpb/stats.go, line 101 at r2 (raw file):
if vs.OnStream { stats = append(stats, fmt.Sprintf("%s: %v", timeSuffix, (vs.Time-time.Duration(vs.NetworkLatency*vs.NumBatches)).Round(time.Microsecond)),
I think this is a bit leaky because the stats object shouldn't know how to calculate deserialization time. Maybe there's a way to have the inbox return deserialization time through vs.Time? Also, doesn't having this specific case mean that we don't output the number of tuples output? Rather than having vs.OnStream be a specific case, how about:
stats = []string{batches output, tuples output, time /* in inbox, this will be deserialization time */}
if vs.OnStream {
stats = append(stats, network latency)
}
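The suggestion above could look roughly like the following Go sketch. It is an illustration under assumptions, not the PR's code: streamStats is a hypothetical stand-in for the stats message, and the label strings are made up for the example. The point is only the shape: the common stats are always emitted, and network latency is appended for streams.

```go
package main

import (
	"fmt"
	"time"
)

// streamStats is a hypothetical stand-in for the stats message under review.
type streamStats struct {
	NumBatches     int64
	NumTuples      int64
	Time           time.Duration
	NetworkLatency time.Duration
	OnStream       bool
}

// describe always emits the common stats and appends the network latency
// only when the stats belong to a stream. The labels are illustrative.
func describe(vs streamStats) []string {
	stats := []string{
		fmt.Sprintf("batches output: %d", vs.NumBatches),
		fmt.Sprintf("tuples output: %d", vs.NumTuples),
		fmt.Sprintf("time: %v", vs.Time.Round(time.Microsecond)),
	}
	if vs.OnStream {
		stats = append(stats,
			fmt.Sprintf("network latency: %v", vs.NetworkLatency.Round(time.Microsecond)))
	}
	return stats
}

func main() {
	vs := streamStats{
		NumBatches:     4,
		NumTuples:      4096,
		Time:           3 * time.Millisecond,
		NetworkLatency: 250 * time.Microsecond,
		OnStream:       true,
	}
	for _, s := range describe(vs) {
		fmt.Println(s)
	}
}
```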
pkg/sql/colexec/execpb/stats.proto, line 41 at r2 (raw file):
// on_stream indicates whether the stats are shown on a stream. This decides
// if network latency should be shown or not.
bool on_stream = 10;
If you're going to do a refactor for this, I would get it in before this PR (or an earlier commit as we discussed). It would be unfortunate to add a new field to a protobuf that we'd then have to deprecate.
pkg/sql/colflow/vectorized_flow.go, line 591 at r2 (raw file):
var latency int64 ss, err := flowCtx.Cfg.NodesStatusServer.OptionalNodesStatusServer(errorutil.FeatureNotAvailableToNonSystemTenantsIssue)
What happens in case this is not a system tenant? latency will be 0, does that mean that no stats will be displayed? It would be great to just remove it from the plan if we can't get it.
pkg/sql/colflow/vectorized_flow.go, line 593 at r2 (raw file):
ss, err := flowCtx.Cfg.NodesStatusServer.OptionalNodesStatusServer(errorutil.FeatureNotAvailableToNonSystemTenantsIssue) if err == nil { response, _ := ss.Nodes(ctx, &serverpb.NodesRequest{})
This performs the RPC every time we set up a remote output stream (many times per query). As we discussed, I think we need to cache this map in a level above the vectorizedFlowCreator. There might be a better place but we would:
- Create some sort of helper latencyGetter struct { cache latencyMap; server StatusServer } that the distsql server or planner creates with a pointer to the status server.
- This helper would have some way to get latency:
func (lg latencyGetter) getLatency(from, to NodeID) {
if time.Since(lastUpdate) > someTime {
// The cache seems old, update it.
lg.cache = convertToMap(lg.server.Nodes())
lastUpdate = time.Now()
}
return lg.cache[from][to]
}
And we'd use the latencyGetter here (maybe there's a better name). This is a lot cheaper, since we don't expect latencies to vary much.
pkg/sql/execinfrapb/data.proto, line 246 at r2 (raw file):
// and an Inbox. message NetworkStats { optional int64 latency = 1 [(gogoproto.nullable) = false];
I think this message deserves a more extensive comment about how this latency is calculated and why we're using the latency on the origin node and then sending it over rather than using the latency on the inbox. In fact, can't we get the same information from the inbox (we have access to the whole cluster matrix, right?)
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @cathymw, and @yuzefovich)
pkg/sql/colexec/execpb/stats.go, line 101 at r2 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
I think this is a bit leaky because the stats object shouldn't know how to calculate deserialization time. Maybe there's a way to have the inbox return deserialization time through vs.Time? Also, doesn't having this specific case mean that we don't output the number of tuples output? Rather than having vs.OnStream be a specific case, how about:
stats = []string{batches output, tuples output, time /* in inbox, this will be deserialization time */}
if vs.OnStream {
stats = append(stats, network latency)
}
tuples output is removed from the stream because it is duplicated with rowsRead, see #54769 (review)
aed4a8e
to
264d418
Compare
Added a second commit that refactors the VectorizedStatsCollector and VectorizedStats types. Also, added a LatencyGetter struct to hold a map of all the node latencies.
Didn't we already calculate deserialization time using the timer you added or is that something we only did in the last version? I think that's my preferred way to calculate it.
Having the inbox collect its own deserialization time was proposed in the last version. While I was testing the second commit, I found that deserialization time = total time - # batches x latency sometimes displays negative time values for the deserialization time. Since this PR is already getting quite large, I left the time displayed as ioTime and added a TODO to have the inbox collect its own deserialization time.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)
pkg/sql/colexec/execpb/stats.go, line 43 at r2 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
I would leave this if block alone and set the timeSuffix in the below check of vs.OnStream so you only have to check it once.
Done. vs.OnStream was removed completely in the refactor (2nd commit).
pkg/sql/colexec/execpb/stats.go, line 101 at r2 (raw file):
I think this is a bit leaky because the stats object shouldn't know how to calculate deserialization time. Maybe there's a way to have the inbox return deserialization time through vs.Time?
I've removed this calculation for now, since it was showing negative time values for some queries. For now, I added a TODO to have inbox collect its own deserialization time, since it's a little out of scope for this PR.
Also, doesn't having this specific case mean that we don't output the number of tuples output?
+1 on what Yahor commented. Also, with the refactor in the second commit, I added a separate implementation of the SpanStats interface called VectorizedInboxStats that displays the stats we want for Inbox.
pkg/sql/colexec/execpb/stats.proto, line 41 at r2 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
If you're going to do a refactor for this, I would get it in before this PR (or an earlier commit as we discussed). It would be unfortunate to add a new field to a protobuf that we'd then have to deprecate.
Done.
pkg/sql/colflow/vectorized_flow.go, line 591 at r2 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
What happens in case this is not a system tenant? latency will be 0, does that mean that no stats will be displayed? It would be great to just remove it from the plan if we can't get it.
Done.
pkg/sql/colflow/vectorized_flow.go, line 593 at r2 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
This performs the RPC every time we set up a remote output stream (many times per query). As we discussed, I think we need to cache this map in a level above the vectorizedFlowCreator. There might be a better place but we would:
- Create some sort of helper latencyGetter struct { cache latencyMap; server StatusServer } that the distsql server or planner creates with a pointer to the status server.
- This helper would have some way to get latency:
func (lg latencyGetter) getLatency(from, to NodeID) {
if time.Since(lastUpdate) > someTime {
// The cache seems old, update it.
lg.cache = convertToMap(lg.server.Nodes())
lastUpdate = time.Now()
}
return lg.cache[from][to]
}
And we'd use the latencyGetter here (maybe there's a better name). This is a lot cheaper, since we don't expect latencies to vary much.
Done.
pkg/sql/execinfrapb/data.proto, line 246 at r2 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
I think this message deserves a more extensive comment about how this latency is calculated and why we're using the latency on the origin node and then sending it over rather than using the latency on the inbox. In fact, can't we get the same information from the inbox (we have access to the whole cluster matrix, right?)
I updated the comment to be more extensive, but I'm not sure I understand the second part of your comment. The latency is retrieved at the outbox and sent to the inbox because retrieving the latency requires both the sending node ID and receiving node ID, but the inbox doesn't have access to which node ID data is coming from
While I was testing the second commit, I found that deserialization time = total time - # batches x latency sometimes displays negative time values for the deserialization time. Since this PR is already getting quite large, I left the time displayed as ioTime and added a TODO to have the inbox collect its own deserialization time.
SGTM
Looking good, although I think you should reorder the commits (using git rebase -i HEAD~2 and swapping the order of the commits) since the stats commit adds some code that is made unnecessary by the refactor commit.
Reviewed 6 of 17 files at r1, 7 of 7 files at r3, 13 of 13 files at r4.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @cathymw, and @yuzefovich)
pkg/server/server_sql.go, line 67 at r4 (raw file):
"github.com/marusama/semaphore" "google.golang.org/grpc" "math"
I think these imports should stay where they were. Have you enabled crlfmt in goland? That should take care of it for you automatically on save.
pkg/server/serverpb/status.go, line 81 at r4 (raw file):
// the Inbox. func (lg *LatencyGetter) GetLatency( outboxNodeID roachpb.NodeID,
Instead of outbox/inbox, you should make this more general, with for example originNodeID and targetNodeID.
pkg/server/serverpb/status.go, line 83 at r4 (raw file):
outboxNodeID roachpb.NodeID, inboxNodeID roachpb.NodeID, ctx context.Context,
nit: context is usually the first argument.
pkg/server/serverpb/status.go, line 85 at r4 (raw file):
ctx context.Context, ) int64 { if time.Since(lg.lastUpdatedTime) > time.Second {
extract this time.Second into a constant and refer to it in the comment so it doesn't become stale if it is changed. Also, maybe we can make this 5*time.Second, not sure we get anything from pinging these nodes every second.
pkg/server/serverpb/status.go, line 86 at r4 (raw file):
) int64 { if time.Since(lg.lastUpdatedTime) > time.Second { // update latencies in latencyMap
nit: capitalize and period.
pkg/server/serverpb/status.go, line 88 at r4 (raw file):
// update latencies in latencyMap ss, err := lg.NodesStatusServer.OptionalNodesStatusServer(errorutil.FeatureNotAvailableToNonSystemTenantsIssue) if err == nil {
Can you avoid excessive indentation by doing if err != nil { do something? } so that your err == nil is implicit and the block doesn't need to be indented. In general, it's nice to indent only the smallest blocks. For example, you can change your code to be:
if time.Since(lastUpdated) <= time.Second {
return lg.latencyMap
}
// Implicit else, update with result from Nodes
lg.latencyMap.update()
if err != nil {
return something
}
return updated lg.latencyMap
pkg/server/serverpb/status.go, line 106 at r4 (raw file):
} } fmt.Println(lg.latencyMap)
leftover
pkg/sql/distsql_running.go, line 47 at r4 (raw file):
"github.com/cockroachdb/errors" opentracing "github.com/opentracing/opentracing-go" "math"
same thing re imports
pkg/sql/colexec/stats.go, line 122 at r4 (raw file):
idTagKey string, inputWatch *timeutil.StopWatch, memMonitors []*mon.BytesMonitor,
All the nil monitors seem to indicate that it'd be nice to also have a memory/disk focused stat collector as well as one for IO operators (i.e. wherever we need to write if vsc.IO) but we can leave that for another PR.
pkg/sql/colexec/stats.go, line 201 at r4 (raw file):
} func createSpan(
Could you make this a private method on the vectorized stats collector base since this needs a couple of inputs that are already fields of the struct?
pkg/sql/colflow/vectorized_flow.go, line 593 at r2 (raw file):
Previously, cathymw wrote…
Done.
It seems like the new LatencyGetter is part of the refactor commit when it should be part of the stats commit.
Can you also add a comment about what happens if we can't get the node ID?
pkg/sql/execinfrapb/data.proto, line 246 at r2 (raw file):
but the inbox doesn't have access to which node ID data is coming from
Got it, thanks. I had forgotten about this. BTW, I think the new comment is in the wrong commit. To undo this you can use git restore to undo the changes and stage them and then commit in the other commit. If it's too much work to separate the changes into their respective commits, I think I'm fine with squashing the two commits into one even though it's not ideal.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @cathymw, and @yuzefovich)
pkg/sql/colexec/stats.go, line 65 at r4 (raw file):
} // NetworkVectorizedStatsCollector collects InbocVectorizedStats on Inbox.
nit: s/InbocVectorizedStats/VectorizedInboxStats/.
pkg/sql/colexec/stats.go, line 76 at r4 (raw file):
// initVectorizedStatsCollectorBase initializes the common fields
// of VectorizedStatsCollectorBase for all VectorizedStatsCollectors
nit: missing period.
pkg/sql/colexec/stats.go, line 194 at r4 (raw file):
// finalizeStats records the stats for the VectorizedStatsCollectorBase and
// network latency. It also adjusts the time recorded to be the Inbox
How does the "adjustment" work? I don't think I see it. I guess it is a TODO item?
pkg/sql/colflow/vectorized_flow.go, line 376 at r4 (raw file):
// colexec.NetworkVectorizedStatsCollector that wraps op. func (s *vectorizedFlowCreator) wrapWithNetworkVectorizedStatsCollector( op colexecbase.Operator,
nit: I think op and networkReader are currently always the same colrpc.Inbox, right? We could probably remove one of the arguments then.
264d418
to
6aca241
Compare
Squashed commits because reordering the commits resulted in a bunch of merge conflicts and separating the changes was quite complicated.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)
pkg/server/server_sql.go, line 67 at r4 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
I think these imports should stay where they were. Have you enabled crlfmt in goland? That should take care of it for you automatically on save.
Done.
pkg/server/serverpb/status.go, line 81 at r4 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
Instead of outbox/inbox, you should make this more general, with for example originNodeID and targetNodeID.
Done.
pkg/server/serverpb/status.go, line 83 at r4 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
nit: context is usually the first argument.
Done.
pkg/server/serverpb/status.go, line 85 at r4 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
extract this time.Second into a constant and refer to it in the comment so it doesn't become stale if it is changed. Also, maybe we can make this 5*time.Second, not sure we get anything from pinging these nodes every second.
Done.
pkg/server/serverpb/status.go, line 86 at r4 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
nit: capitalize and period.
Done.
pkg/server/serverpb/status.go, line 88 at r4 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
Can you avoid excessive indentation by doing if err != nil { do something? } so that your err == nil is implicit and the block doesn't need to be indented. In general, it's nice to indent only the smallest blocks. For example, you can change your code to be:
if time.Since(lastUpdated) <= time.Second {
return lg.latencyMap
}
// Implicit else, update with result from Nodes
lg.latencyMap.update()
if err != nil {
return something
}
return updated lg.latencyMap
Done.
pkg/server/serverpb/status.go, line 106 at r4 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
leftover
Done.
pkg/sql/distsql_running.go, line 47 at r4 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
same thing re imports
Done.
pkg/sql/colexec/stats.go, line 65 at r4 (raw file):
Previously, yuzefovich wrote…
nit: s/InbocVectorizedStats/VectorizedInboxStats/.
Done.
pkg/sql/colexec/stats.go, line 76 at r4 (raw file):
Previously, yuzefovich wrote…
nit: missing period.
Done.
pkg/sql/colexec/stats.go, line 122 at r4 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
All the nil monitors seem to indicate that it'd be nice to also have a memory/disk focused stat collector as well as one for IO operators (i.e. wherever we need to write if vsc.IO) but we can leave that for another PR.
Done. Left a TODO.
pkg/sql/colexec/stats.go, line 194 at r4 (raw file):
Previously, yuzefovich wrote…
How does the "adjustment" work? I don't think I see it. I guess it is a TODO item?
Done. Oops, I forgot to take out this comment when I removed deserialization time. Left a TODO in execpb/stats.go for inbox to collect its own deserialization time.
pkg/sql/colexec/stats.go, line 201 at r4 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
Could you make this a private method on the vectorized stats collector base since this needs a couple of inputs that are already fields of the struct?
Done.
pkg/sql/colflow/vectorized_flow.go, line 593 at r2 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
It seems like the new LatencyGetter is part of the refactor commit when it should be part of the stats commit.
Can you also add a comment about what happens if we can't get the node ID?
Done.
pkg/sql/colflow/vectorized_flow.go, line 376 at r4 (raw file):
Previously, yuzefovich wrote…
nit: I think op and networkReader are currently always the same colrpc.Inbox, right? We could probably remove one of the arguments then.
Done.
pkg/sql/execinfrapb/data.proto, line 246 at r2 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
but the inbox doesn't have access to which node ID data is coming from
Got it, thanks. I had forgotten about this. BTW, I think the new comment is in the wrong commit. To undo this you can use git restore to undo the changes and stage them and then commit in the other commit. If it's too much work to separate the changes into their respective commits, I think I'm fine with squashing the two commits into one even though it's not ideal.
Done. Commits were squashed because it would have been quite complicated to separate the changes
Reviewed 6 of 7 files at r5.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @cathymw, and @yuzefovich)
pkg/server/serverpb/status.go, line 84 at r5 (raw file):
ctx context.Context, originNodeID roachpb.NodeID, targetNodeID roachpb.NodeID, ) int64 { if time.Since(lg.lastUpdatedTime) > updateThreshold {
As mentioned in the other comment, I would flip this conditional to avoid indenting the block inside the if:
if time.Since(lg.lastUpdatedTime) < updateThreshold {
return lg.latencyMap[originNodeID][targetNodeID]
}
<other code>
pkg/sql/execinfrapb/data.proto, line 246 at r2 (raw file):
Previously, cathymw wrote…
Done. Commits were squashed because it would have been quite complicated to separate the changes
I've been thinking about this and I wonder: maybe we should just change the StreamEndpointSpec protobuf that the inbox has access to, so that it includes the node ID it's receiving data from? We could avoid modifying the outbox to send this special message. I believe all it would take is changing PopulateEndpoints in pkg/sql/physicalplan/physical_plan.go to populate the new protobuf field with p1.Node, which is the node the outbox will be on.
306dc47
to
bccd736
Compare
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)
pkg/server/serverpb/status.go, line 84 at r5 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
As mentioned in the other comment, I would flip this conditional to avoid indenting the block inside the if:
if time.Since(lg.lastUpdatedTime) < updateThreshold {
return lg.latencyMap[originNodeID][targetNodeID]
}
<other code>
Done. Oops, fixed it!
pkg/sql/execinfrapb/data.proto, line 246 at r2 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
I've been thinking about this and I wonder: maybe we should just change the StreamEndpointSpec protobuf that the inbox has access to, so that it includes the node ID it's receiving data from? We could avoid modifying the outbox to send this special message. I believe all it would take is changing PopulateEndpoints in pkg/sql/physicalplan/physical_plan.go to populate the new protobuf field with p1.Node, which is the node the outbox will be on.
Added some WIP changes as a second commit that I will squash/delete later. I tried to have the inbox access both the origin node ID and target node ID, like you suggested, and to pass in the latency when an inbox is created. But, when I tested it in roachprod, the node IDs seem to not get set properly (i.e. accessing the IDs through StreamEndpointSpec always gives 0 as both values). I double-checked that in the PopulateEndpoints function they were getting set to non-zero values. Not really sure right now what/where the missing link is.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @cathymw, and @yuzefovich)
pkg/sql/execinfrapb/data.proto, line 246 at r2 (raw file):
But, when I tested it in roachprod
What tests are you running? And where are you accessing the StreamEndpointSpec? When setting up the inputs?
Hmm, taking a quick look at the code it looks like this:
endpoint := execinfrapb.StreamEndpointSpec{StreamID: execinfrapb.StreamID(sIdx)}
if p1.Node == p2.Node {
endpoint.Type = execinfrapb.StreamEndpointSpec_LOCAL
} else {
endpoint.Type = execinfrapb.StreamEndpointSpec_REMOTE
}
p2.Spec.Input[s.DestInput].Streams = append(p2.Spec.Input[s.DestInput].Streams, endpoint)
if endpoint.Type == execinfrapb.StreamEndpointSpec_REMOTE {
if !p.remotePlan {
p.remotePlan = true
}
endpoint.TargetNodeID = p2.Node
}
So what we're doing is appending to the p2 input spec before setting the TargetNodeID. I think what's most likely happening is that since the endpoint spec is a value, appending to the streams causes a copy when the node IDs are unset, which results in a node ID of zero even if they're set later on (because you're setting it on the original copy). I would just populate the node IDs before appending and see if that makes a difference.
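The copy-on-append behaviour described here can be reproduced in isolation. The following Go sketch uses a simplified, hypothetical endpointSpec struct rather than the real StreamEndpointSpec, and the two helper functions are illustrative only. It shows that a field set on the local variable after append never reaches the element stored in the slice.

```go
package main

import "fmt"

// endpointSpec is a simplified, hypothetical stand-in for StreamEndpointSpec.
type endpointSpec struct {
	StreamID     int
	TargetNodeID int
}

// appendThenSet mirrors the ordering in the snippet above: the struct is
// appended first, so the slice holds a copy taken before the node ID is set.
func appendThenSet(streams []endpointSpec, node int) []endpointSpec {
	endpoint := endpointSpec{StreamID: 1}
	streams = append(streams, endpoint) // copies endpoint by value
	endpoint.TargetNodeID = node        // only changes the local variable
	return streams
}

// setThenAppend populates the node ID first, so the copy stored in the slice
// already carries it.
func setThenAppend(streams []endpointSpec, node int) []endpointSpec {
	endpoint := endpointSpec{StreamID: 1}
	endpoint.TargetNodeID = node
	return append(streams, endpoint)
}

func main() {
	fmt.Println(appendThenSet(nil, 7)[0].TargetNodeID) // the node ID is lost
	fmt.Println(setThenAppend(nil, 7)[0].TargetNodeID) // the node ID is kept
}
```

Storing pointers in the slice would also avoid the problem, but reordering the two statements is the smaller change.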
4972f77
to
78c6a2b
Compare
Thanks for the reviews so far! Ready for another look! Following Alfonso's suggestion, I updated the commit so that the inbox now has access to the latency directly, rather than having the outbox access the latency and sending that information to the inbox.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)
pkg/sql/execinfrapb/data.proto, line 246 at r2 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
But, when I tested it in roachprod
What tests are you running? And where are you accessing the StreamEndpointSpec? When setting up the inputs?
Hmm, taking a quick look at the code it looks like this:
endpoint := execinfrapb.StreamEndpointSpec{StreamID: execinfrapb.StreamID(sIdx)}
if p1.Node == p2.Node {
endpoint.Type = execinfrapb.StreamEndpointSpec_LOCAL
} else {
endpoint.Type = execinfrapb.StreamEndpointSpec_REMOTE
}
p2.Spec.Input[s.DestInput].Streams = append(p2.Spec.Input[s.DestInput].Streams, endpoint)
if endpoint.Type == execinfrapb.StreamEndpointSpec_REMOTE {
if !p.remotePlan {
p.remotePlan = true
}
endpoint.TargetNodeID = p2.Node
}
So what we're doing is appending to the p2 input spec before setting the TargetNodeID. I think what's most likely happening is that since the endpoint spec is a value, appending to the streams causes a copy when the node IDs are unset, which results in a node ID of zero even if they're set later on (because you're setting it on the original copy). I would just populate the node IDs before appending and see if that makes a difference.
OH, you are so right. Putting the append line after the node IDs are set fixed it.
Looking good, I think this is nearly ready to merge.
Reviewed 21 of 21 files at r6.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @cathymw, and @yuzefovich)
pkg/sql/colexec/stats.go, line 41 at r6 (raw file):
type VectorizedStatsCollector interface { OutputStats(ctx context.Context, flowID string, deterministicStats bool) getElapsedTime() time.Duration
I think it would be cleaner to put this into a separate interface.
pkg/sql/colflow/vectorized_flow.go, line 447 at r6 (raw file):
toClose []colexecbase.Closer, ) (*colrpc.Outbox, error) newInbox(ctx context.Context, allocator *colmem.Allocator, typs []*types.T, streamID execinfrapb.StreamID, latency int64) (*colrpc.Inbox, error)
It feels wrong to add latency to the constructor, given all the callsites that need to be changed to pass in a 0 latency because they're not interested. How about adding a method to the inbox that does something like:
func (i *Inbox) enableLatencyReport(latency int64) {
i.latency = latency
}
Or something (maybe you can find a better name). This will avoid having to add a parameter to the constructor that is only used in a small number of cases.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @cathymw, and @yuzefovich)
pkg/sql/colflow/vectorized_flow.go, line 447 at r6 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
It feels wrong to add latency to the constructor, given all the callsites that need to be changed to pass in a 0 latency because they're not interested. How about adding a method to the inbox that does something like:
func (i *Inbox) enableLatencyReport(latency int64) {
i.latency = latency
}
Or something (maybe you can find a better name). This will avoid having to add a parameter to the constructor that is only used in a small number of cases.
I was actually thinking this morning that maybe it's a little overkill to set the latency in the inbox at all. Currently, the latency is set in the inbox, and then the inbox is passed as the `networkReader` into the `NetworkVectorizedStatsCollector` just so the stats collector can later get the latency from the inbox. I was thinking that since the vectorized flow has access to the latency, it can just directly set the latency stat when it creates the vectorized stats collector. What do you think?
Sounds like a great idea.
f404a77
to
feaa1ca
Compare
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @yuzefovich)
pkg/sql/colexec/stats.go, line 41 at r6 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
I think it would be cleaner to put this into a separate interface.
Done.
pkg/sql/colflow/vectorized_flow.go, line 447 at r6 (raw file):
Previously, cathymw wrote…
I was actually thinking this morning that maybe it's a little overkill to set the latency in the inbox at all. Currently, the latency is set in the inbox, and then the inbox is passed as the `networkReader` into the `NetworkVectorizedStatsCollector` just so the stats collector can later get the latency from the inbox. I was thinking that since the vectorized flow has access to the latency, it can just directly set the latency stat when it creates the vectorized stats collector. What do you think?
Done.
d60b4c1
to
e6fddb9
Compare
Reviewed 13 of 13 files at r7.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @asubiotto, @cathymw, and @yuzefovich)
pkg/sql/colexec/stats.go, line 39 at r7 (raw file):
// ChildStatsCollector gives access to the stopwatches of a
// VectorizedStatsCollector's childStatsCollectors.
type ChildStatsCollector interface {
nit: can this be a private interface?
This commit adds the information about network latency from an outbox to an inbox on EXPLAIN ANALYZE diagrams.
This commit also adds the LatencyGetter helper to store node latencies in a map. Rather than updating the latency map multiple times per query, the latency map is updated if it hasn't been updated in over one second.
This commit refactors the VectorizedStatsCollector type into a base struct and a specialized stats collector, NetworkVectorizedStatsCollector, for collecting network latency on streams. Since the networkReader is only used for the case where stats are collected for streams, this refactor avoids the need to pass in nils for other instances where stats collectors are used.
This commit also adds a separate implementation of the SpanStats interface called VectorizedInboxStats. This change allows us to choose which stats to display on EXPLAIN ANALYZE diagrams for streams.
Release note (sql change): EXPLAIN ANALYZE diagrams now contain "network latency" information on streams.
e6fddb9
to
4ea339a
Compare
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @asubiotto and @yuzefovich)
pkg/sql/colexec/stats.go, line 39 at r7 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
nit: can this be a private interface?
I think this has to be public so that it can be used in `vectorized_flow.go` when the vectorized stats collectors are created.
TFTRs!
bors r=asubiotto
Build succeeded:
When the node latency map (introduced in cockroachdb#55705) is updated, it makes a NodesRequest and the returned NodesResponse is used to obtain the node latencies. This commit adds an error check for this response so that a nil pointer dereference doesn't occur. Release note: None.
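The guard described in this follow-up can be sketched roughly as below. This is only an illustration of checking the error before touching the response: `nodesResponse`, `fetchNodes`, and `updateLatencies` are hypothetical names, not the actual CockroachDB API, and latencies are simplified to nanoseconds keyed by an integer node ID.

```go
package main

import (
	"errors"
	"fmt"
)

// nodesResponse is a hypothetical stand-in for the response type; the real
// NodesResponse carries far more than a latency map.
type nodesResponse struct {
	latencies map[int]int64 // node ID -> latency in nanoseconds (assumed shape)
}

// fetchNodes simulates the request. On failure it returns a nil response,
// which is exactly the case the error check has to guard against.
func fetchNodes(fail bool) (*nodesResponse, error) {
	if fail {
		return nil, errors.New("nodes request failed")
	}
	return &nodesResponse{latencies: map[int]int64{2: 1500}}, nil
}

// updateLatencies copies fresh latencies into dst. It returns early on error
// so that the nil response is never dereferenced and dst is left untouched.
func updateLatencies(dst map[int]int64, fail bool) error {
	resp, err := fetchNodes(fail)
	if err != nil {
		return err
	}
	for id, latency := range resp.latencies {
		dst[id] = latency
	}
	return nil
}

func main() {
	latencies := map[int]int64{}
	if err := updateLatencies(latencies, true); err != nil {
		fmt.Println("skipped update:", err)
	}
	if err := updateLatencies(latencies, false); err == nil {
		fmt.Println("latencies:", latencies)
	}
}
```

Returning the error rather than logging and continuing is a choice made for this sketch; the source does not say how the real code handles the failure beyond avoiding the nil dereference.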
56213: sql: ban alter column type on view-depended cols r=yuzefovich a=jordanlewis
Previously, it was legal to edit the column type of columns that were depended on by views. This is not a good idea for various reasons, and is not Postgres compatible to boot. Now, it is illegal to alter the column type of columns that are depended on by views.
Closes #56166.
Release note (sql change): prevent column type modification of columns that are depended on by views.
56220: flowinfra: fix and extend flow setup benchmark r=yuzefovich a=yuzefovich
Previously, the flow setup benchmark would fail because we reused the same internal planner object multiple times, and this is not supported (it would fail due to memory budget error). This is now fixed by creating a new internal planner on every iteration. Additionally, this commit extends the benchmark to try both the vectorized and row-based flow setups.
Release note: None
56265: colflow, serverpb: add check for NodesResponse error r=RaduBerinde a=cathymw
When the node latency map (introduced in #55705) is updated, it makes a NodesRequest and the returned NodesResponse is used to obtain the node latencies. This commit adds an error check for this response so that a nil pointer dereference doesn't occur.
Ran `make stressrace TESTS=TestUncertaintyErrorIsReturned PKG=./pkg/sql/rowexec TESTTIMEOUT=5m STRESSFLAGS='-timeout 5m' 2>&1` for a couple of minutes with no failures.
Fixes: #56255
Release note: None.
Co-authored-by: Jordan Lewis <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Cathy <[email protected]>
This commit adds the information about network latency from an outbox to inbox
on EXPLAIN ANALYZE diagrams.
The network latencies are obtained from the `node.Activity` map from the `NodesStatusServer`.
This commit also adds the LatencyGetter helper to store node latencies
in a map. Rather than updating the latency map multiple times per query,
the latency map is updated if it hasn't been updated in over one second.
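To make the refresh rule concrete, here is a minimal sketch of a cache that refetches only when its contents are more than one second old. It is not the LatencyGetter from this PR: the `latencyCache` type, its fields, the injectable clock, and keying by a single integer node ID are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// latencyCache is a hypothetical stand-in for the helper described above.
type latencyCache struct {
	mu          sync.Mutex
	latencies   map[int]time.Duration // keyed by an assumed integer node ID
	lastUpdated time.Time
	maxAge      time.Duration                // staleness threshold (one second here)
	now         func() time.Time             // injectable clock, for testing
	fetch       func() map[int]time.Duration // stand-in for querying node status
	fetches     int                          // counts how many refreshes happened
}

// get returns the cached latency for nodeID, refreshing the whole map first
// if it has never been filled or is older than maxAge.
func (c *latencyCache) get(nodeID int) (time.Duration, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.latencies == nil || c.now().Sub(c.lastUpdated) > c.maxAge {
		c.latencies = c.fetch()
		c.lastUpdated = c.now()
		c.fetches++
	}
	d, ok := c.latencies[nodeID]
	return d, ok
}

// runDemo performs three lookups against a fake clock and reports how many
// refreshes they triggered.
func runDemo() int {
	clock := time.Unix(0, 0)
	c := &latencyCache{
		maxAge: time.Second,
		now:    func() time.Time { return clock },
		fetch: func() map[int]time.Duration {
			return map[int]time.Duration{2: 3 * time.Millisecond}
		},
	}
	c.get(2)                                  // first lookup fills the map
	clock = clock.Add(500 * time.Millisecond) // 0.5s later: still fresh
	c.get(2)
	clock = clock.Add(2 * time.Second) // 2.5s after the fill: stale
	c.get(2)
	return c.fetches
}

func main() {
	fmt.Println("refreshes:", runDemo())
}
```

With this rule, several lookups made by the same query within a second share one fetch, which is the saving the paragraph above describes.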
This commit refactors the VectorizedStatsCollector type into a base struct
and a specialized stats collector, NetworkVectorizedStatsCollector, for
collecting network latency on streams. Since the networkReader is only
used for the case where stats are collected for streams, this refactor
avoids the need to pass in nils for other instances where stats collectors
are used.
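The shape of that refactor can be sketched with Go struct embedding. The snippet below is a simplified illustration, not the code from this PR: `baseCollector`, `networkCollector`, their fields, and the `describe` method are hypothetical. It also follows the approach settled on in review, where the flow sets the latency on the collector when creating it instead of routing it through the inbox.

```go
package main

import (
	"fmt"
	"time"
)

// baseCollector holds the state every stats collector needs (assumed fields).
type baseCollector struct {
	id      int
	elapsed time.Duration
}

func (b *baseCollector) describe() string {
	return fmt.Sprintf("op %d: elapsed=%s", b.id, b.elapsed)
}

// networkCollector embeds baseCollector and adds the stream-only stat, so
// non-stream collectors never carry (or get passed) an unused field.
type networkCollector struct {
	baseCollector
	latency time.Duration
}

// describe extends the base output with the network latency.
func (n *networkCollector) describe() string {
	return n.baseCollector.describe() + fmt.Sprintf(" network latency=%s", n.latency)
}

// newNetworkCollector takes the latency directly from its caller, which in
// this sketch plays the role of the vectorized flow.
func newNetworkCollector(id int, latency time.Duration) *networkCollector {
	return &networkCollector{baseCollector: baseCollector{id: id}, latency: latency}
}

// demo builds one plain and one stream collector and returns their output.
func demo() (string, string) {
	plain := &baseCollector{id: 0, elapsed: 2 * time.Millisecond}
	stream := newNetworkCollector(1, 5*time.Millisecond)
	stream.elapsed = 7 * time.Millisecond // promoted field from the embedded base
	return plain.describe(), stream.describe()
}

func main() {
	plain, stream := demo()
	fmt.Println(plain)
	fmt.Println(stream)
}
```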
This commit also adds a separate implementation of the SpanStats interface
called VectorizedInboxStats. This change allows us to choose which stats
to display on EXPLAIN ANALYZE diagrams for streams.
Closes: #54555
Closes: #43859
The following screenshot shows that EXPLAIN ANALYZE now shows network latency on streams:
Release note (sql change): EXPLAIN ANALYZE diagrams now contain "network
latency" information on streams.