88414: util/tracing: trim trace recordings in a smarter way  r=andreimatei a=andreimatei

Before this patch, when the recording of a child span was being added to
the parent, if the number of spans in the child recording + the number
of spans in the parent's recording was greater than the span limit
(1000), then the child's recording was completely dropped (apart from
the structured events, which were still retained). So, for example, if
the parent had a recording of 1 span and the child had a recording of
1000 spans, all 1000 spans were dropped.
This patch improves things by always combining the parent trace and the
child trace, and then trimming the result according to the following
arbitrary algorithm:
- start at the root of the trace and sort its children by size, desc
- drop the fattest children (including their descendants) until the
  remaining number of spans to drop becomes smaller than the size of the
  fattest non-dropped child
- recurse into that child, with an adjusted number of spans to drop

So, the idea is that, recursively, we drop parts of the largest child -
including dropping the whole child if needed.
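
For illustration, here is a minimal, self-contained sketch of that trimming strategy. The `span` type, `size`, and `trim` helpers below are hypothetical stand-ins, not the actual util/tracing code:

```go
package tracetrim

import "sort"

// span is a hypothetical stand-in for a recorded trace span.
type span struct {
	children []*span
}

// size returns the number of spans in the subtree rooted at s, including s.
func (s *span) size() int {
	n := 1
	for _, c := range s.children {
		n += c.size()
	}
	return n
}

// trim drops toDrop spans from the subtree rooted at s: it drops the fattest
// children whole while the budget allows, then recurses into the fattest
// surviving child with the adjusted budget.
func (s *span) trim(toDrop int) {
	if toDrop <= 0 || len(s.children) == 0 {
		return
	}
	// Sort the children by subtree size, descending.
	sort.Slice(s.children, func(i, j int) bool {
		return s.children[i].size() > s.children[j].size()
	})
	// Drop the fattest children (with their descendants) until the remaining
	// budget is smaller than the fattest non-dropped child.
	for len(s.children) > 0 && toDrop >= s.children[0].size() {
		toDrop -= s.children[0].size()
		s.children = s.children[1:]
	}
	// Recurse into the fattest surviving child. Since toDrop is now smaller
	// than that child's subtree size, the child itself survives.
	if len(s.children) > 0 && toDrop > 0 {
		s.children[0].trim(toDrop)
	}
}
```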

Fixes #87536

Release note: None

88707: ui: reduce frequent Metrics page re-rendering r=koorosh a=koorosh

Before, many components on the Metrics page relied on the
`nodesSummarySelector` selector, which in turn relies on `NodeStatus` objects that change with every request (and we request them periodically, every 10 seconds). `NodeStatus` includes lots of data that the Metrics page doesn't need (i.e. the node's and stores' latest metrics, which aren't used on charts), but those changes force the React components to re-render.

This patch refactors the selectors used by the Metrics page to provide only the relevant subset of `NodeStatus` info
to the Graph components, and reduces the propagation of `props` from parent to child components. Instead, child
components use selectors directly where necessary.

Release note: None

Resolves #65030


Video showing how often components were re-rendered **before** this fix - regardless of the time window, they always
update every 10 seconds:

https://user-images.githubusercontent.com/3106437/192295619-9b2da8bd-2fdb-4f5e-96db-688048fc54cf.mov

and here is after the fix: components re-render every 10 seconds for the 10-minute interval and are not re-rendered
for time windows like 2 weeks or 1 month:

https://user-images.githubusercontent.com/3106437/192296089-13103781-6632-46a5-85aa-80ad8b20dd02.mov




88973: sql: pool some of the processor allocations r=yuzefovich a=yuzefovich

**sql: clarify closing contract around plan node and row source adapters**

This commit cleans up the `rowSourceToPlanNode` adapter (from the
DistSQL processor to the planNode object) in the following manner:
- it removes the incorrect call to `ConsumerClosed` on the wrapped
input. This call was redundant (because the other side of the adapter,
`planNodeToRowSource`, performs the close too) but was also incorrect since
it could access a row source that had already been put back into the sync.Pool
(and maybe even picked up by another query). See issue 88964 for more details,
and the sketch after this list for an illustration of the hazard.
- it removes the checks around a non-nil "metadata forwarder". These were
needed when the local planNode and DistSQL processor engines were merged
into one about four years ago, but nowadays the adapter always gets
a non-nil forwarder.
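
A minimal sketch of why closing an object that may already be back in a `sync.Pool` is unsafe. The `processor` type here is a hypothetical stand-in, not the real execinfra code:

```go
package main

import (
	"fmt"
	"sync"
)

// processor is a hypothetical stand-in for a pooled DistSQL processor.
type processor struct {
	query string // per-query state that gets reused across pool cycles
}

var procPool = sync.Pool{New: func() interface{} { return &processor{} }}

// ConsumerClosed resets the processor's state. Only safe if the processor has
// not been returned to the pool in the meantime.
func (p *processor) ConsumerClosed() {
	p.query = ""
}

func main() {
	p := procPool.Get().(*processor)
	p.query = "SELECT 1"

	// The first (legitimate) closer releases the processor back to the pool.
	procPool.Put(p)

	// Another query may pick up the very same object from the pool...
	q := procPool.Get().(*processor)
	q.query = "SELECT 2"

	// ...so a second, redundant ConsumerClosed through the stale reference
	// can clobber the other query's state.
	p.ConsumerClosed()
	fmt.Println(q.query) // likely prints "" instead of "SELECT 2"
}
```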

Fixes: #88964.

Release note: None

**sql: pool some of the processor allocations**

This commit pools the allocations of the noop, planNodeToRowSource,
and columnarizer processors. Additionally, the trailing meta callbacks
for these three, as well as for materializers, are changed from
anonymous functions to named methods to further reduce the
allocations.
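
A simplified sketch of the pooling pattern, with a hypothetical `proc` type (the real changes are in the columnarizer and materializer diffs below):

```go
package procpool

import "sync"

// producerMeta and rowSource are hypothetical, simplified stand-ins for the
// real execinfrapb/execinfra types.
type producerMeta struct{}

type rowSource interface {
	Next() (row interface{}, meta *producerMeta)
}

type proc struct {
	input        rowSource
	trailingMeta func() []producerMeta
}

var procPool = sync.Pool{New: func() interface{} { return &proc{} }}

// newProc grabs a processor from the pool instead of allocating a fresh one.
func newProc(input rowSource) *proc {
	p := procPool.Get().(*proc)
	*p = proc{input: input}
	// Named method rather than an anonymous closure, mirroring the change
	// described above.
	p.trailingMeta = p.trailingMetaCallback
	return p
}

func (p *proc) trailingMetaCallback() []producerMeta {
	// No trailing metadata in this sketch.
	return nil
}

// Release zeroes the processor and returns it to the pool for reuse by a
// later query.
func (p *proc) Release() {
	*p = proc{}
	procPool.Put(p)
}
```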

Fixes: #88525.

Release note: None

89059: cli: fix use of canceled connection in `init` r=erikgrinaker a=tbg

The implementation of `./cockroach init` first dials up the node (in a
retry loop) and issues a `Health` RPC, to reduce the likelihood of
accidental double-init of the cluster in case of network issues.

However, this was trying to be too clever: if it managed to dial the
node and check its health (using a short-lived context), it would
then return that same connection to the caller for use in the Bootstrap
RPC. Unfortunately, that connection would sit on top of an `rpc.Context`
whose life was tied to a context[^1] that descended from one wrapped by
`RunWithTimeout`. In other words, the context would be cancelled by the
time the health check method returned.
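
A minimal illustration of the pitfall, assuming only the standard library. The hypothetical `runWithTimeout` below mimics `contextutil.RunWithTimeout` in that the derived context is canceled as soon as the callback returns:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runWithTimeout mimics contextutil.RunWithTimeout: the derived context is
// canceled as soon as fn returns, not when the parent context is canceled.
func runWithTimeout(parent context.Context, d time.Duration, fn func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, d)
	defer cancel()
	return fn(ctx)
}

func main() {
	var leaked context.Context
	_ = runWithTimeout(context.Background(), 5*time.Second, func(ctx context.Context) error {
		// Anything constructed here that holds on to ctx (like the gRPC
		// connection returned by the old init helper) outlives the callback...
		leaked = ctx
		return nil
	})
	// ...but its context is already canceled by the time the caller gets to
	// use it.
	fmt.Println(leaked.Err()) // prints "context canceled"
}
```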

This does not seem to cause issues today, since the `rpc.Context` seems to
ignore this context cancellation. But in #88625 this suddenly became
a problem, and now that I've looked at this code, we might as well fix it
regardless of whether #88625 is ever going to land.

No release note since today's code happens to work.

[^1]: https://github.com/cockroachdb/cockroach/blob/aecc58f125ac611f5793101cbd0323df569369db/pkg/cli/rpc_client.go#L46

Release note: None


89064: jobs: make the registry logging less chatty r=dt,stevendanna a=knz

The "toggling idleness" log message was the 4th most voluminous log event source in CC, logged 4x more frequently than the next most voluminous event source.

This commit makes it less verbose.

Release note: None

Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Andrii Vorobiov <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
6 people committed Sep 30, 2022
6 parents 3a9f5b8 + b03a24d + e333650 + 0ff5952 + 90aadd6 + 0b8409a commit aaca5ce
Showing 47 changed files with 1,277 additions and 715 deletions.
84 changes: 36 additions & 48 deletions pkg/cli/init.go
@@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
"github.com/spf13/cobra"
"google.golang.org/grpc"
)

var initCmd = &cobra.Command{
@@ -44,7 +43,10 @@ func runInit(cmd *cobra.Command, args []string) error {
defer cancel()

// Wait for the node to be ready for initialization.
conn, finish, err := waitForClientReadinessAndGetClientGRPCConn(ctx)
if err := dialAndCheckHealth(ctx); err != nil {
return err
}
conn, _, finish, err := getClientGRPCConn(ctx, serverCfg)
if err != nil {
return err
}
@@ -60,61 +62,47 @@ func runInit(cmd *cobra.Command, args []string) error {
return nil
}

// waitForClientReadinessAndGetClientGRPCConn waits for the node to
// be ready for initialization. This check ensures that the `init`
// command is less likely to fail because it was issued too
// early. In general, retrying the `init` command is dangerous [0],
// so we make a best effort at minimizing chances for users to
// arrive in an uncomfortable situation.
// dialAndCheckHealth waits for the node to be ready for initialization. This
// check ensures that the `init` command is less likely to fail because it was
// issued too early. In general, retrying the `init` command is dangerous [0],
// so we make a best effort at minimizing chances for users to arrive in an
// uncomfortable situation.
//
// [0]: https://github.com/cockroachdb/cockroach/pull/19753#issuecomment-341561452
func waitForClientReadinessAndGetClientGRPCConn(
ctx context.Context,
) (conn *grpc.ClientConn, finish func(), err error) {
defer func() {
// If we're returning with an error, tear down the gRPC connection
// that's been established, if any.
if finish != nil && err != nil {
finish()
func dialAndCheckHealth(ctx context.Context) error {
retryOpts := retry.Options{InitialBackoff: time.Second, MaxBackoff: time.Second}

tryConnect := func(ctx context.Context) error {
// (Attempt to) establish the gRPC connection. If that fails,
// it may be that the server hasn't started to listen yet, in
// which case we'll retry.
conn, _, finish, err := getClientGRPCConn(ctx, serverCfg)
if err != nil {
return err
}
}()
defer finish()

retryOpts := retry.Options{InitialBackoff: time.Second, MaxBackoff: time.Second}
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
if err = contextutil.RunWithTimeout(ctx, "init-open-conn", 5*time.Second,
func(ctx context.Context) error {
// (Attempt to) establish the gRPC connection. If that fails,
// it may be that the server hasn't started to listen yet, in
// which case we'll retry.
conn, _, finish, err = getClientGRPCConn(ctx, serverCfg)
if err != nil {
return err
}
// Access the /health endpoint. Until/unless this succeeds, the
// node is not yet fully initialized and ready to accept
// Bootstrap requests.
ac := serverpb.NewAdminClient(conn)
_, err = ac.Health(ctx, &serverpb.HealthRequest{})
return err
}

// Access the /health endpoint. Until/unless this succeeds, the
// node is not yet fully initialized and ready to accept
// Bootstrap requests.
ac := serverpb.NewAdminClient(conn)
_, err := ac.Health(ctx, &serverpb.HealthRequest{})
return err
}); err != nil {
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
if err := contextutil.RunWithTimeout(
ctx, "init-open-conn", 5*time.Second, tryConnect,
); err != nil {
err = errors.Wrapf(err, "node not ready to perform cluster initialization")
fmt.Fprintln(stderr, "warning:", err, "(retrying)")

// We're going to retry; first cancel the connection that's
// been established, if any.
if finish != nil {
finish()
finish = nil
}
// Then retry.
continue
}

// No error - connection was established and health endpoint is
// ready.
return conn, finish, err
// We managed to connect and run a sanity check. Note however that `conn` in
// `tryConnect` was established using a context that is now canceled, so we
// let the caller do its own dial.
return nil
}
err = errors.New("maximum number of retries exceeded")
return
return errors.New("maximum number of retries exceeded")
}
9 changes: 6 additions & 3 deletions pkg/jobs/registry.go
@@ -960,15 +960,18 @@ func (r *Registry) cleanupOldJobsPage(

log.VEventf(ctx, 2, "read potentially expired jobs: %d", numRows)
if len(toDelete.Array) > 0 {
log.Infof(ctx, "attempting to clean up %d expired job records", len(toDelete.Array))
log.VEventf(ctx, 2, "attempting to clean up %d expired job records", len(toDelete.Array))
const stmt = `DELETE FROM system.jobs WHERE id = ANY($1)`
var nDeleted int
if nDeleted, err = r.ex.Exec(
ctx, "gc-jobs", nil /* txn */, stmt, toDelete,
); err != nil {
log.Warningf(ctx, "error cleaning up %d jobs: %v", len(toDelete.Array), err)
return false, 0, errors.Wrap(err, "deleting old jobs")
}
log.Infof(ctx, "cleaned up %d expired job records", nDeleted)
if nDeleted > 0 {
log.Infof(ctx, "cleaned up %d expired job records", nDeleted)
}
}
// If we got as many rows as we asked for, there might be more.
morePages := numRows == pageSize
@@ -1419,7 +1422,7 @@ func (r *Registry) MarkIdle(job *Job, isIdle bool) {
jobType := payload.Type()
jm := r.metrics.JobMetrics[jobType]
if aj.isIdle != isIdle {
log.Infof(r.serverCtx, "%s job %d: toggling idleness to %+v", jobType, job.ID(), isIdle)
log.VEventf(r.serverCtx, 2, "%s job %d: toggling idleness to %+v", jobType, job.ID(), isIdle)
if isIdle {
r.metrics.RunningNonIdleJobs.Dec(1)
jm.CurrentlyIdle.Inc(1)
3 changes: 2 additions & 1 deletion pkg/server/admin.go
@@ -3906,7 +3906,8 @@ func (s *adminServer) GetTrace(
}
traceStillExists = true
if recording == nil {
recording = sp.GetFullRecording(tracingpb.RecordingVerbose)
trace := sp.GetFullRecording(tracingpb.RecordingVerbose)
recording = trace.Flatten()
}
return iterutil.StopIteration()
}); err != nil {
1 change: 0 additions & 1 deletion pkg/server/node_tenant.go
@@ -28,7 +28,6 @@ import "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
func redactRecording(rec tracingpb.Recording) {
for i := range rec {
sp := &rec[i]
sp.Tags = nil // TODO(benbardin): Remove for 23.1.
sp.TagGroups = nil
for j := range sp.Logs {
record := &sp.Logs[j]
2 changes: 0 additions & 2 deletions pkg/server/node_tenant_test.go
@@ -57,7 +57,6 @@ func TestRedactRecordingForTenant(t *testing.T) {

rec := mkRec()
redactRecording(rec)
require.Zero(t, rec[0].Tags)
require.Zero(t, rec[0].TagGroups)
require.Len(t, rec[0].Logs, 1)
msg := rec[0].Logs[0].Msg().StripMarkers()
@@ -79,7 +78,6 @@ func TestNewSpanFields(t *testing.T) {
SpanID tracingpb.SpanID
ParentSpanID tracingpb.SpanID
Operation string
Tags map[string]string
TagGroups []tracingpb.TagGroup
StartTime time.Time
Duration time.Duration
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
@@ -357,6 +357,7 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfra/execagg",
"//pkg/sql/execinfra/execopnode",
"//pkg/sql/execinfra/execreleasable",
"//pkg/sql/execinfrapb",
"//pkg/sql/execstats",
"//pkg/sql/faketreeeval",
4 changes: 4 additions & 0 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -576,6 +576,9 @@ func (r opResult) createAndWrapRowSource(
)
}
r.ColumnTypes = rs.OutputTypes()
if releasable, ok := rs.(execreleasable.Releasable); ok {
r.Releasables = append(r.Releasables, releasable)
}
return rs, nil
},
materializerSafeToRelease,
@@ -593,6 +596,7 @@ func (r opResult) createAndWrapRowSource(
r.MetadataSources = append(r.MetadataSources, r.Root.(colexecop.MetadataSource))
r.ToClose = append(r.ToClose, r.Root.(colexecop.Closer))
r.Releasables = append(r.Releasables, releasables...)
r.Releasables = append(r.Releasables, c)
return nil
}

56 changes: 38 additions & 18 deletions pkg/sql/colexec/columnarizer.go
@@ -12,6 +12,7 @@ package colexec

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
@@ -20,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execreleasable"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
@@ -78,8 +80,9 @@ type Columnarizer struct {
removedFromFlow bool
}

var _ colexecop.Operator = &Columnarizer{}
var _ colexecop.DrainableClosableOperator = &Columnarizer{}
var _ colexecop.VectorizedStatsCollector = &Columnarizer{}
var _ execreleasable.Releasable = &Columnarizer{}

// NewBufferingColumnarizer returns a new Columnarizer that will be buffering up
// rows before emitting them as output batches.
@@ -121,6 +124,12 @@ func NewStreamingColumnarizer(
return newColumnarizer(batchAllocator, metadataAllocator, flowCtx, processorID, input, columnarizerStreamingMode)
}

var columnarizerPool = sync.Pool{
New: func() interface{} {
return &Columnarizer{}
},
}

// newColumnarizer returns a new Columnarizer.
func newColumnarizer(
batchAllocator *colmem.Allocator,
@@ -135,10 +144,12 @@ func newColumnarizer(
default:
colexecerror.InternalError(errors.AssertionFailedf("unexpected columnarizerMode %d", mode))
}
c := &Columnarizer{
metadataAllocator: metadataAllocator,
input: input,
mode: mode,
c := columnarizerPool.Get().(*Columnarizer)
*c = Columnarizer{
ProcessorBaseNoHelper: c.ProcessorBaseNoHelper,
metadataAllocator: metadataAllocator,
input: input,
mode: mode,
}
c.ProcessorBaseNoHelper.Init(
nil, /* self */
@@ -153,18 +164,12 @@
processorID,
nil, /* output */
execinfra.ProcStateOpts{
InputsToDrain: []execinfra.RowSource{input},
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
// Close will call InternalClose(). Note that we don't return
// any trailing metadata here because the columnarizers
// propagate it in DrainMeta.
if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil {
// Close never returns an error.
colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close"))
}
return nil
}},
// We append input to inputs to drain below in order to reuse the same
// underlying slice from the pooled columnarizer.
TrailingMetaCallback: c.trailingMetaCallback,
},
)
c.AddInputToDrain(input)
c.typs = c.input.OutputTypes()
c.helper.Init(batchAllocator, execinfra.GetWorkMemLimit(flowCtx), c.typs)
return c
@@ -256,8 +261,6 @@ func (c *Columnarizer) Next() coldata.Batch {
return c.batch
}

var _ colexecop.DrainableClosableOperator = &Columnarizer{}

// DrainMeta is part of the colexecop.MetadataSource interface.
func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata {
if c.removedFromFlow {
@@ -301,6 +304,23 @@ func (c *Columnarizer) Close(context.Context) error {
return nil
}

func (c *Columnarizer) trailingMetaCallback() []execinfrapb.ProducerMetadata {
// Close will call InternalClose(). Note that we don't return any trailing
// metadata here because the columnarizers propagate it in DrainMeta.
if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil {
// Close never returns an error.
colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close"))
}
return nil
}

// Release releases this Columnarizer back to the pool.
func (c *Columnarizer) Release() {
c.ProcessorBaseNoHelper.Reset()
*c = Columnarizer{ProcessorBaseNoHelper: c.ProcessorBaseNoHelper}
columnarizerPool.Put(c)
}

// ChildCount is part of the execopnode.OpNode interface.
func (c *Columnarizer) ChildCount(verbose bool) int {
if _, ok := c.input.(execopnode.OpNode); ok {
14 changes: 8 additions & 6 deletions pkg/sql/colexec/materializer.go
@@ -227,12 +227,7 @@ func newMaterializerInternal(
execinfra.ProcStateOpts{
// We append drainHelper to inputs to drain below in order to reuse
// the same underlying slice from the pooled materializer.
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
// Note that we delegate draining all of the metadata sources
// to drainHelper which is added as an input to drain below.
m.close()
return nil
},
TrailingMetaCallback: m.trailingMetaCallback,
},
)
m.AddInputToDrain(&m.drainHelper)
@@ -334,6 +329,13 @@ func (m *Materializer) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata
return nil, m.DrainHelper()
}

func (m *Materializer) trailingMetaCallback() []execinfrapb.ProducerMetadata {
// Note that we delegate draining all of the metadata sources to drainHelper
// which is added as an input to drain.
m.close()
return nil
}

func (m *Materializer) close() {
if m.Closed {
return
3 changes: 2 additions & 1 deletion pkg/sql/crdb_internal.go
@@ -1520,7 +1520,8 @@ CREATE TABLE crdb_internal.node_inflight_trace_spans (
"only users with the admin role are allowed to read crdb_internal.node_inflight_trace_spans")
}
return p.ExecCfg().AmbientCtx.Tracer.VisitSpans(func(span tracing.RegistrySpan) error {
for _, rec := range span.GetFullRecording(tracingpb.RecordingVerbose) {
trace := span.GetFullRecording(tracingpb.RecordingVerbose)
for _, rec := range trace.Flatten() {
traceID := rec.TraceID
parentSpanID := rec.ParentSpanID
spanID := rec.SpanID
8 changes: 3 additions & 5 deletions pkg/sql/distsql_physical_planner.go
@@ -3339,17 +3339,15 @@ func (dsp *DistSQLPlanner) wrapPlan(
// expecting is in fact RowsAffected. RowsAffected statements return a single
// row with the number of rows affected by the statement, and are the only
// types of statement where it's valid to invoke a plan's fast path.
wrapper, err := makePlanNodeToRowSource(n,
wrapper := newPlanNodeToRowSource(
n,
runParams{
extendedEvalCtx: &evalCtx,
p: planCtx.planner,
},
useFastPath,
firstNotWrapped,
)
if err != nil {
return nil, err
}
wrapper.firstNotWrapped = firstNotWrapped

localProcIdx := p.AddLocalProcessor(wrapper)
var input []execinfrapb.InputSyncSpec
