From 3f3218db661db9dce4cd58afc02c112b71b018f8 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Thu, 15 Dec 2022 14:24:39 -0500 Subject: [PATCH] util/ring: generic-ify ring.Buffer Epic: none Release note: None --- pkg/sql/conn_io.go | 4 +- pkg/sql/distsql_running.go | 6 +- pkg/sql/distsql_running_test.go | 2 +- pkg/sql/pgwire/conn.go | 61 +++---------------- pkg/sql/rowcontainer/row_container.go | 6 +- pkg/sql/sem/builtins/window_frame_builtins.go | 10 +-- pkg/sql/sem/eval/window_funcs_util.go | 8 +-- pkg/util/ring/ring_buffer.go | 54 ++++++++-------- pkg/util/ring/ring_buffer_test.go | 11 ++-- pkg/util/schedulerlatency/sampler.go | 6 +- pkg/util/tracing/crdbspan.go | 38 +++++------- pkg/util/tracing/span.go | 4 +- pkg/util/tracing/span_test.go | 2 +- pkg/util/tracing/tracer.go | 6 +- pkg/util/tracing/tracer_snapshots.go | 10 +-- 15 files changed, 90 insertions(+), 138 deletions(-) diff --git a/pkg/sql/conn_io.go b/pkg/sql/conn_io.go index f0583982cb3c..f66196a6e021 100644 --- a/pkg/sql/conn_io.go +++ b/pkg/sql/conn_io.go @@ -97,7 +97,7 @@ type StmtBuf struct { cond *sync.Cond // data contains the elements of the buffer. - data ring.Buffer // []Command + data ring.Buffer[Command] // startPos indicates the index of the first command currently in data // relative to the start of the connection. @@ -459,7 +459,7 @@ func (buf *StmtBuf) CurCmd() (Command, CmdPos, error) { } len := buf.mu.data.Len() if cmdIdx < len { - return buf.mu.data.Get(cmdIdx).(Command), curPos, nil + return buf.mu.data.Get(cmdIdx), curPos, nil } if cmdIdx != len { return nil, 0, errors.AssertionFailedf( diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 82d3d7775073..1892a8313943 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -282,7 +282,7 @@ type cancelFlowsCoordinator struct { mu struct { syncutil.Mutex // deadFlowsByNode is a ring of pointers to deadFlowsOnNode objects. - deadFlowsByNode ring.Buffer + deadFlowsByNode ring.Buffer[*deadFlowsOnNode] } // workerWait should be used by canceling workers to block until there are // some dead flows to cancel. @@ -301,7 +301,7 @@ func (c *cancelFlowsCoordinator) getFlowsToCancel() ( if c.mu.deadFlowsByNode.Len() == 0 { return nil, base.SQLInstanceID(0) } - deadFlows := c.mu.deadFlowsByNode.GetFirst().(*deadFlowsOnNode) + deadFlows := c.mu.deadFlowsByNode.GetFirst() c.mu.deadFlowsByNode.RemoveFirst() req := &execinfrapb.CancelDeadFlowsRequest{ FlowIDs: deadFlows.ids, @@ -322,7 +322,7 @@ func (c *cancelFlowsCoordinator) addFlowsToCancel( // sufficiently fast. found := false for j := 0; j < c.mu.deadFlowsByNode.Len(); j++ { - deadFlows := c.mu.deadFlowsByNode.Get(j).(*deadFlowsOnNode) + deadFlows := c.mu.deadFlowsByNode.Get(j) if sqlInstanceID == deadFlows.sqlInstanceID { deadFlows.ids = append(deadFlows.ids, f.FlowID) found = true diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go index 29b512baa514..9b4606f807b9 100644 --- a/pkg/sql/distsql_running_test.go +++ b/pkg/sql/distsql_running_test.go @@ -483,7 +483,7 @@ func TestCancelFlowsCoordinator(t *testing.T) { require.GreaterOrEqual(t, numNodes-1, c.mu.deadFlowsByNode.Len()) seen := make(map[base.SQLInstanceID]struct{}) for i := 0; i < c.mu.deadFlowsByNode.Len(); i++ { - deadFlows := c.mu.deadFlowsByNode.Get(i).(*deadFlowsOnNode) + deadFlows := c.mu.deadFlowsByNode.Get(i) require.NotEqual(t, gatewaySQLInstanceID, deadFlows.sqlInstanceID) _, ok := seen[deadFlows.sqlInstanceID] require.False(t, ok) diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go index 016131aea48b..3a316f321c70 100644 --- a/pkg/sql/pgwire/conn.go +++ b/pkg/sql/pgwire/conn.go @@ -1247,7 +1247,7 @@ type flushInfo struct { // cmdStarts maintains the state about where the results for the respective // positions begin. We utilize the invariant that positions are // monotonically increasing sequences. - cmdStarts cmdIdxBuffer + cmdStarts ring.Buffer[cmdIdx] } type cmdIdx struct { @@ -1255,61 +1255,14 @@ type cmdIdx struct { idx int } -var cmdIdxPool = sync.Pool{ - New: func() interface{} { - return &cmdIdx{} - }, -} - -func (c *cmdIdx) release() { - *c = cmdIdx{} - cmdIdxPool.Put(c) -} - -type cmdIdxBuffer struct { - // We intentionally do not just embed ring.Buffer in order to restrict the - // methods that can be called on cmdIdxBuffer. - buffer ring.Buffer -} - -func (b *cmdIdxBuffer) empty() bool { - return b.buffer.Len() == 0 -} - -func (b *cmdIdxBuffer) addLast(pos sql.CmdPos, idx int) { - cmdIdx := cmdIdxPool.Get().(*cmdIdx) - cmdIdx.pos = pos - cmdIdx.idx = idx - b.buffer.AddLast(cmdIdx) -} - -// removeLast removes the last cmdIdx from the buffer and will panic if the -// buffer is empty. -func (b *cmdIdxBuffer) removeLast() { - b.getLast().release() - b.buffer.RemoveLast() -} - -// getLast returns the last cmdIdx in the buffer and will panic if the buffer is -// empty. -func (b *cmdIdxBuffer) getLast() *cmdIdx { - return b.buffer.GetLast().(*cmdIdx) -} - -func (b *cmdIdxBuffer) clear() { - for !b.empty() { - b.removeLast() - } -} - // registerCmd updates cmdStarts buffer when the first result for a new command // is received. func (fi *flushInfo) registerCmd(pos sql.CmdPos) { - if !fi.cmdStarts.empty() && fi.cmdStarts.getLast().pos >= pos { + if fi.cmdStarts.Len() > 0 && fi.cmdStarts.GetLast().pos >= pos { // Not a new command, nothing to do. return } - fi.cmdStarts.addLast(pos, fi.buf.Len()) + fi.cmdStarts.AddLast(cmdIdx{pos: pos, idx: fi.buf.Len()}) } func cookTag( @@ -1682,7 +1635,7 @@ func (c *conn) Flush(pos sql.CmdPos) error { c.writerState.fi.lastFlushed = pos // Make sure that the entire cmdStarts buffer is drained. - c.writerState.fi.cmdStarts.clear() + c.writerState.fi.cmdStarts.Discard() _ /* n */, err := c.writerState.buf.WriteTo(c.conn) if err != nil { @@ -1756,13 +1709,13 @@ func (cl *clientConnLock) RTrim(ctx context.Context, pos sql.CmdPos) { truncateIdx := cl.buf.Len() // Update cmdStarts buffer: delete commands that were trimmed from the back // of the cmdStarts buffer. - for !cl.cmdStarts.empty() { - cmdStart := cl.cmdStarts.getLast() + for cl.cmdStarts.Len() > 0 { + cmdStart := cl.cmdStarts.GetLast() if cmdStart.pos < pos { break } truncateIdx = cmdStart.idx - cl.cmdStarts.removeLast() + cl.cmdStarts.RemoveLast() } cl.buf.Truncate(truncateIdx) } diff --git a/pkg/sql/rowcontainer/row_container.go b/pkg/sql/rowcontainer/row_container.go index 2d16b21f5f07..8cb62adb84ad 100644 --- a/pkg/sql/rowcontainer/row_container.go +++ b/pkg/sql/rowcontainer/row_container.go @@ -650,7 +650,7 @@ type DiskBackedIndexedRowContainer struct { firstCachedRowPos int nextPosToCache int // indexedRowsCache is the cache of up to maxCacheSize contiguous rows. - indexedRowsCache ring.Buffer + indexedRowsCache ring.Buffer[eval.IndexedRow] // maxCacheSize indicates the maximum number of rows to be cached. It is // initialized to maxIndexedRowsCacheSize and dynamically adjusted if OOM // error is encountered. @@ -783,7 +783,7 @@ func (f *DiskBackedIndexedRowContainer) GetRow( if pos >= f.firstCachedRowPos && pos < f.nextPosToCache { requestedRowCachePos := pos - f.firstCachedRowPos f.hitCount++ - return f.indexedRowsCache.Get(requestedRowCachePos).(eval.IndexedRow), nil + return f.indexedRowsCache.Get(requestedRowCachePos), nil } f.missCount++ if f.diskRowIter == nil { @@ -860,7 +860,7 @@ func (f *DiskBackedIndexedRowContainer) GetRow( return nil, errors.Errorf("unexpected last column type: should be DInt but found %T", idx) } if f.idxRowIter == pos { - return f.indexedRowsCache.GetLast().(eval.IndexedRow), nil + return f.indexedRowsCache.GetLast(), nil } } f.idxRowIter++ diff --git a/pkg/sql/sem/builtins/window_frame_builtins.go b/pkg/sql/sem/builtins/window_frame_builtins.go index ef10306130e6..37c4f085dbff 100644 --- a/pkg/sql/sem/builtins/window_frame_builtins.go +++ b/pkg/sql/sem/builtins/window_frame_builtins.go @@ -37,7 +37,7 @@ type indexedValue struct { // It assumes that the frame bounds will never go back, i.e. non-decreasing // sequences of frame start and frame end indices. type slidingWindow struct { - values ring.Buffer + values ring.Buffer[*indexedValue] evalCtx *eval.Context cmp func(*eval.Context, tree.Datum, tree.Datum) int } @@ -58,7 +58,7 @@ func makeSlidingWindow( // largest idx). func (sw *slidingWindow) add(iv *indexedValue) { for i := sw.values.Len() - 1; i >= 0; i-- { - if sw.cmp(sw.evalCtx, sw.values.Get(i).(*indexedValue).value, iv.value) > 0 { + if sw.cmp(sw.evalCtx, sw.values.Get(i).value, iv.value) > 0 { break } sw.values.RemoveLast() @@ -70,7 +70,7 @@ func (sw *slidingWindow) add(iv *indexedValue) { // indices smaller than given 'idx'. This operation corresponds to shifting the // start of the frame up to 'idx'. func (sw *slidingWindow) removeAllBefore(idx int) { - for sw.values.Len() > 0 && sw.values.Get(0).(*indexedValue).idx < idx { + for sw.values.Len() > 0 && sw.values.Get(0).idx < idx { sw.values.RemoveFirst() } } @@ -78,7 +78,7 @@ func (sw *slidingWindow) removeAllBefore(idx int) { func (sw *slidingWindow) string() string { var builder strings.Builder for i := 0; i < sw.values.Len(); i++ { - builder.WriteString(fmt.Sprintf("(%v, %v)\t", sw.values.Get(i).(*indexedValue).value, sw.values.Get(i).(*indexedValue).idx)) + builder.WriteString(fmt.Sprintf("(%v, %v)\t", sw.values.Get(i).value, sw.values.Get(i).idx)) } return builder.String() } @@ -175,7 +175,7 @@ func (w *slidingWindowFunc) Compute( // The datum with "highest priority" within the frame is at the very front // of the deque. - return w.sw.values.GetFirst().(*indexedValue).value, nil + return w.sw.values.GetFirst().value, nil } func max(a, b int) int { diff --git a/pkg/sql/sem/eval/window_funcs_util.go b/pkg/sql/sem/eval/window_funcs_util.go index c15250777325..4474ff6ed13e 100644 --- a/pkg/sql/sem/eval/window_funcs_util.go +++ b/pkg/sql/sem/eval/window_funcs_util.go @@ -38,7 +38,7 @@ type peerGroup struct { // offsets if we have OFFSET_FOLLOWING type of bound (both F and O are // upper-bounded by total number of peer groups). type PeerGroupsIndicesHelper struct { - groups ring.Buffer // queue of peer groups + groups ring.Buffer[*peerGroup] peerGrouper PeerGroupChecker headPeerGroupNum int // number of the peer group at the head of the queue allPeerGroupsSkipped bool // in GROUP mode, indicates whether all peer groups were skipped during Init @@ -161,7 +161,7 @@ func (p *PeerGroupsIndicesHelper) Update(wfr *WindowFrameRun) error { // nextPeerGroupStartIdx is the index of the first row that we haven't // computed peer group for. - lastPeerGroup := p.groups.GetLast().(*peerGroup) + lastPeerGroup := p.groups.GetLast() nextPeerGroupStartIdx := lastPeerGroup.firstPeerIdx + lastPeerGroup.rowCount if (wfr.Frame == nil || wfr.Frame.Mode == treewindow.ROWS || wfr.Frame.Mode == treewindow.RANGE) || @@ -211,7 +211,7 @@ func (p *PeerGroupsIndicesHelper) GetFirstPeerIdx(peerGroupNum int) int { if posInBuffer < 0 || p.groups.Len() < posInBuffer { panic("peerGroupNum out of bounds") } - return p.groups.Get(posInBuffer).(*peerGroup).firstPeerIdx + return p.groups.Get(posInBuffer).firstPeerIdx } // GetRowCount returns the number of rows within peer group of number @@ -221,7 +221,7 @@ func (p *PeerGroupsIndicesHelper) GetRowCount(peerGroupNum int) int { if posInBuffer < 0 || p.groups.Len() < posInBuffer { panic("peerGroupNum out of bounds") } - return p.groups.Get(posInBuffer).(*peerGroup).rowCount + return p.groups.Get(posInBuffer).rowCount } // GetLastPeerGroupNum returns the number of the last peer group in the queue. diff --git a/pkg/util/ring/ring_buffer.go b/pkg/util/ring/ring_buffer.go index e026e6674218..78b14b2b5987 100644 --- a/pkg/util/ring/ring_buffer.go +++ b/pkg/util/ring/ring_buffer.go @@ -17,8 +17,8 @@ package ring // // Note: it is backed by a slice (unlike container/ring which is backed by a // linked list). -type Buffer struct { - buffer []interface{} +type Buffer[T any] struct { + buffer []T head int // the index of the front of the buffer tail int // the index of the first position after the end of the buffer @@ -32,12 +32,12 @@ type Buffer struct { // scratch, if not nil, represents pre-allocated space that the Buffer takes // ownership of. The whole backing array of the provided slice is taken over, // included elements and available capacity. -func MakeBuffer(scratch []interface{}) Buffer { - return Buffer{buffer: scratch} +func MakeBuffer[T any](scratch []T) Buffer[T] { + return Buffer[T]{buffer: scratch} } // Len returns the number of elements in the Buffer. -func (r *Buffer) Len() int { +func (r *Buffer[T]) Len() int { if !r.nonEmpty { return 0 } @@ -51,12 +51,12 @@ func (r *Buffer) Len() int { } // Cap returns the capacity of the Buffer. -func (r *Buffer) Cap() int { +func (r *Buffer[T]) Cap() int { return cap(r.buffer) } // Get returns an element at position pos in the Buffer (zero-based). -func (r *Buffer) Get(pos int) interface{} { +func (r *Buffer[T]) Get(pos int) T { if !r.nonEmpty || pos < 0 || pos >= r.Len() { panic("index out of bounds") } @@ -64,7 +64,7 @@ func (r *Buffer) Get(pos int) interface{} { } // GetFirst returns an element at the front of the Buffer. -func (r *Buffer) GetFirst() interface{} { +func (r *Buffer[T]) GetFirst() T { if !r.nonEmpty { panic("getting first from empty ring buffer") } @@ -72,14 +72,14 @@ func (r *Buffer) GetFirst() interface{} { } // GetLast returns an element at the front of the Buffer. -func (r *Buffer) GetLast() interface{} { +func (r *Buffer[T]) GetLast() T { if !r.nonEmpty { panic("getting last from empty ring buffer") } return r.buffer[(cap(r.buffer)+r.tail-1)%cap(r.buffer)] } -func (r *Buffer) resize(n int) { +func (r *Buffer[T]) resize(n int) { if n < r.Len() { panic("resizing to fewer elements than current length") } @@ -89,7 +89,7 @@ func (r *Buffer) resize(n int) { return } - newBuffer := make([]interface{}, n) + newBuffer := make([]T, n) r.copyTo(newBuffer) r.tail = r.Len() % cap(newBuffer) r.head = 0 @@ -98,7 +98,7 @@ func (r *Buffer) resize(n int) { // copyTo copies elements from r to dst. If len(dst) < r.Len(), only the first // len(dst) elements are copied. -func (r *Buffer) copyTo(dst []interface{}) { +func (r *Buffer[T]) copyTo(dst []T) { if !r.nonEmpty { return } @@ -115,7 +115,7 @@ func (r *Buffer) copyTo(dst []interface{}) { } } -func (r *Buffer) maybeGrow() { +func (r *Buffer[T]) maybeGrow() { if r.Len() != cap(r.buffer) { return } @@ -128,7 +128,7 @@ func (r *Buffer) maybeGrow() { // AddFirst add element to the front of the Buffer and doubles it's underlying // slice if necessary. -func (r *Buffer) AddFirst(element interface{}) { +func (r *Buffer[T]) AddFirst(element T) { r.maybeGrow() r.head = (cap(r.buffer) + r.head - 1) % cap(r.buffer) r.buffer[r.head] = element @@ -137,7 +137,7 @@ func (r *Buffer) AddFirst(element interface{}) { // AddLast adds element to the end of the Buffer and doubles it's underlying // slice if necessary. -func (r *Buffer) AddLast(element interface{}) { +func (r *Buffer[T]) AddLast(element T) { r.maybeGrow() r.buffer[r.tail] = element r.tail = (r.tail + 1) % cap(r.buffer) @@ -145,11 +145,12 @@ func (r *Buffer) AddLast(element interface{}) { } // RemoveFirst removes a single element from the front of the Buffer. -func (r *Buffer) RemoveFirst() { +func (r *Buffer[T]) RemoveFirst() { if r.Len() == 0 { panic("removing first from empty ring buffer") } - r.buffer[r.head] = nil + var zero T + r.buffer[r.head] = zero r.head = (r.head + 1) % cap(r.buffer) if r.head == r.tail { r.nonEmpty = false @@ -157,12 +158,13 @@ func (r *Buffer) RemoveFirst() { } // RemoveLast removes a single element from the end of the Buffer. -func (r *Buffer) RemoveLast() { +func (r *Buffer[T]) RemoveLast() { if r.Len() == 0 { panic("removing last from empty ring buffer") } lastPos := (cap(r.buffer) + r.tail - 1) % cap(r.buffer) - r.buffer[lastPos] = nil + var zero T + r.buffer[lastPos] = zero r.tail = lastPos if r.tail == r.head { r.nonEmpty = false @@ -173,7 +175,7 @@ func (r *Buffer) RemoveLast() { // to reserve a size less than the r.Len(). // // If the Buffer already has a capacity of n or larger, this is a no-op. -func (r *Buffer) Reserve(n int) { +func (r *Buffer[T]) Reserve(n int) { if n < r.Len() { panic("reserving fewer elements than current length") } @@ -191,7 +193,7 @@ func (r *Buffer) Reserve(n int) { // Note that, if n != r.Len(), Resize always allocates new storage, even when n // is less than the current capacity. This can be useful to make the storage for // a buffer that used to be large available for GC, but it can also be wasteful. -func (r *Buffer) Resize(n int) { +func (r *Buffer[T]) Resize(n int) { if n < r.Len() { panic("resizing to fewer elements than current length") } @@ -207,7 +209,7 @@ func (r *Buffer) Resize(n int) { // available to GC. // // See also Discard. -func (r *Buffer) Reset() { +func (r *Buffer[T]) Reset() { r.head = 0 r.tail = 0 r.nonEmpty = false @@ -219,13 +221,13 @@ func (r *Buffer) Reset() { // but *r will be reassigned. // // See also Reset and Resize. -func (r *Buffer) Discard() { - *r = Buffer{} +func (r *Buffer[T]) Discard() { + *r = Buffer[T]{} } // all a slice with returns all the elements in the buffer. -func (r *Buffer) all() []interface{} { - buf := make([]interface{}, r.Len()) +func (r *Buffer[T]) all() []T { + buf := make([]T, r.Len()) r.copyTo(buf) return buf } diff --git a/pkg/util/ring/ring_buffer_test.go b/pkg/util/ring/ring_buffer_test.go index 555bbc8d57d7..8868e89d7444 100644 --- a/pkg/util/ring/ring_buffer_test.go +++ b/pkg/util/ring/ring_buffer_test.go @@ -21,7 +21,7 @@ import ( func TestRingBuffer(t *testing.T) { defer leaktest.AfterTest(t)() const operationCount = 100 - var buffer Buffer + var buffer Buffer[int] naiveBuffer := make([]interface{}, 0, operationCount) for i := 0; i < operationCount; i++ { switch rand.Intn(5) { @@ -53,14 +53,17 @@ func TestRingBuffer(t *testing.T) { default: t.Fatal("unexpected") } - - require.Equal(t, naiveBuffer, buffer.all()) + contents := make([]interface{}, 0, buffer.Len()) + for _, v := range buffer.all() { + contents = append(contents, v) + } + require.Equal(t, naiveBuffer, contents) } } func TestRingBufferCapacity(t *testing.T) { defer leaktest.AfterTest(t)() - var b Buffer + var b Buffer[string] require.Panics(t, func() { b.Reserve(-1) }) require.Equal(t, 0, b.Len()) diff --git a/pkg/util/schedulerlatency/sampler.go b/pkg/util/schedulerlatency/sampler.go index 9bbe426ac53e..d45e52f2bc46 100644 --- a/pkg/util/schedulerlatency/sampler.go +++ b/pkg/util/schedulerlatency/sampler.go @@ -159,14 +159,14 @@ func StartSampler( type sampler struct { mu struct { syncutil.Mutex - ringBuffer ring.Buffer // contains *metrics.Float64Histogram + ringBuffer ring.Buffer[*metrics.Float64Histogram] lastIntervalHistogram *metrics.Float64Histogram } } func newSampler(period, duration time.Duration) *sampler { s := &sampler{} - s.mu.ringBuffer = ring.MakeBuffer(nil) + s.mu.ringBuffer = ring.MakeBuffer(([]*metrics.Float64Histogram)(nil)) s.setPeriodAndDuration(period, duration) return s } @@ -209,7 +209,7 @@ func (s *sampler) recordLocked( sample *metrics.Float64Histogram, ) (oldest *metrics.Float64Histogram, ok bool) { if s.mu.ringBuffer.Len() == s.mu.ringBuffer.Cap() { // no more room, clear out the oldest - oldest = s.mu.ringBuffer.GetLast().(*metrics.Float64Histogram) + oldest = s.mu.ringBuffer.GetLast() s.mu.ringBuffer.RemoveLast() } s.mu.ringBuffer.AddFirst(sample) diff --git a/pkg/util/tracing/crdbspan.go b/pkg/util/tracing/crdbspan.go index 84bcc3368554..20c27903d1b8 100644 --- a/pkg/util/tracing/crdbspan.go +++ b/pkg/util/tracing/crdbspan.go @@ -146,7 +146,7 @@ type recordingState struct { // but its 'swap' method requires the mutex. recordingType atomicRecordingType - logs sizeLimitedBuffer // of *tracingpb.LogRecords + logs sizeLimitedBuffer[*tracingpb.LogRecord] // structured accumulates StructuredRecord's. // // Note that structured events that originally belonged to child spans but @@ -154,7 +154,7 @@ type recordingState struct { // this span was not recording verbosely, or children that were dropped from a // verbose recording because of the span limit) are not part of this buffer; // they're in finishedChildren.Root.StructuredRecords. - structured sizeLimitedBuffer + structured sizeLimitedBuffer[*tracingpb.StructuredRecord] // notifyParentOnStructuredEvent is true if the span's parent has asked to be // notified of every StructuredEvent recording on this span. @@ -200,8 +200,8 @@ type recordingState struct { // scratch, if not nil, represents pre-allocated space that the Buffer takes // ownership of. The whole backing array of the provided slice is taken over, // included elements and available capacity. -func makeSizeLimitedBuffer(limit int64, scratch []interface{}) sizeLimitedBuffer { - return sizeLimitedBuffer{ +func makeSizeLimitedBuffer[T any](limit int64, scratch []T) sizeLimitedBuffer[T] { + return sizeLimitedBuffer[T]{ bytesLimit: limit, Buffer: ring.MakeBuffer(scratch), } @@ -209,8 +209,8 @@ func makeSizeLimitedBuffer(limit int64, scratch []interface{}) sizeLimitedBuffer // sizeLimitedBuffer is a wrapper on top of ring.Buffer that keeps track of the // memory size of its elements. -type sizeLimitedBuffer struct { - ring.Buffer +type sizeLimitedBuffer[T any] struct { + ring.Buffer[T] bytesSize int64 bytesLimit int64 } @@ -539,8 +539,8 @@ func (t *Trace) PartialClone() Trace { // Note that Discard does not modify the backing storage (i.e. it does not nil // out the elements). So, if anyone still has a reference to the storage, then // the elements cannot be GCed. -func (buf *sizeLimitedBuffer) Discard() { - *buf = sizeLimitedBuffer{} +func (buf *sizeLimitedBuffer[T]) Discard() { + *buf = sizeLimitedBuffer[T]{} } // finish marks the span as finished. Further operations on the span are not @@ -994,12 +994,10 @@ func (s *crdbSpan) record(msg redact.RedactableString) { return } - logRecord := &tracingpb.LogRecord{ + recordInternal(s, &tracingpb.LogRecord{ Time: s.tracer.now(), Message: msg, - } - - s.recordInternal(logRecord, &s.mu.recording.logs) + }, &s.mu.recording.logs) } // recordStructured includes a structured event in s' recording. @@ -1019,7 +1017,7 @@ func (s *crdbSpan) recordStructured(item Structured) { Time: s.tracer.now(), Payload: p, } - s.recordInternal(sr, &s.mu.recording.structured) + recordInternal(s, sr, &s.mu.recording.structured) // If there are any listener's registered with this span, notify them of the // Structured event being recorded. @@ -1037,13 +1035,9 @@ type memorySizable interface { MemorySize() int } -func (s *crdbSpan) recordInternal(payload memorySizable, buffer *sizeLimitedBuffer) { +func recordInternal[PL memorySizable](s *crdbSpan, payload PL, buffer *sizeLimitedBuffer[PL]) { s.mu.Lock() defer s.mu.Unlock() - s.recordInternalLocked(payload, buffer) -} - -func (s *crdbSpan) recordInternalLocked(payload memorySizable, buffer *sizeLimitedBuffer) { size := int64(payload.MemorySize()) if size > buffer.bytesLimit { // The incoming payload alone blows past the memory limit. Let's just @@ -1057,7 +1051,7 @@ func (s *crdbSpan) recordInternalLocked(payload memorySizable, buffer *sizeLimit s.mu.recording.droppedLogs = true } for buffer.bytesSize > buffer.bytesLimit { - first := buffer.GetFirst().(memorySizable) + first := buffer.GetFirst() buffer.RemoveFirst() buffer.bytesSize -= int64(first.MemorySize()) } @@ -1125,7 +1119,7 @@ func (s *crdbSpan) appendStructuredEventsLocked( ) []tracingpb.StructuredRecord { numEvents := s.mu.recording.structured.Len() for i := 0; i < numEvents; i++ { - event := s.mu.recording.structured.Get(i).(*tracingpb.StructuredRecord) + event := s.mu.recording.structured.Get(i) buffer = append(buffer, *event) } return buffer @@ -1194,7 +1188,7 @@ func (s *crdbSpan) getRecordingNoChildrenLocked( if numEvents := s.mu.recording.structured.Len(); numEvents != 0 { rs.StructuredRecords = make([]tracingpb.StructuredRecord, 0, numEvents) for i := 0; i < numEvents; i++ { - event := s.mu.recording.structured.Get(i).(*tracingpb.StructuredRecord) + event := s.mu.recording.structured.Get(i) rs.AddStructuredRecord(*event) } } @@ -1230,7 +1224,7 @@ func (s *crdbSpan) getRecordingNoChildrenLocked( if numLogs := s.mu.recording.logs.Len(); numLogs != 0 { rs.Logs = make([]tracingpb.LogRecord, numLogs) for i := 0; i < numLogs; i++ { - lr := s.mu.recording.logs.Get(i).(*tracingpb.LogRecord) + lr := s.mu.recording.logs.Get(i) rs.Logs[i] = *lr } } diff --git a/pkg/util/tracing/span.go b/pkg/util/tracing/span.go index cba11f3c46c3..e0387b9850ae 100644 --- a/pkg/util/tracing/span.go +++ b/pkg/util/tracing/span.go @@ -698,8 +698,8 @@ func (sp *Span) reset( openChildren: h.childrenAlloc[:0], goroutineID: goroutineID, recording: recordingState{ - logs: makeSizeLimitedBuffer(maxLogBytesPerSpan, nil /* scratch */), - structured: makeSizeLimitedBuffer(maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]), + logs: makeSizeLimitedBuffer[*tracingpb.LogRecord](maxLogBytesPerSpan, nil /* scratch */), + structured: makeSizeLimitedBuffer[*tracingpb.StructuredRecord](maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]), childrenMetadata: h.childrenMetadataAlloc, finishedChildren: MakeTrace(tracingpb.RecordedSpan{}), }, diff --git a/pkg/util/tracing/span_test.go b/pkg/util/tracing/span_test.go index 2865b714450f..25cf7f4b7d28 100644 --- a/pkg/util/tracing/span_test.go +++ b/pkg/util/tracing/span_test.go @@ -56,7 +56,7 @@ func TestRecordingString(t *testing.T) { root.Record("root 1") { // Hackily fix the timing on the first log message, so that we can check it later. - r := root.i.crdb.mu.recording.logs.GetFirst().(*tracingpb.LogRecord) + r := root.i.crdb.mu.recording.logs.GetFirst() r.Time = root.i.crdb.startTime.Add(time.Millisecond) root.i.crdb.mu.recording.logs.RemoveFirst() root.i.crdb.mu.recording.logs.AddFirst(r) diff --git a/pkg/util/tracing/tracer.go b/pkg/util/tracing/tracer.go index 57a77a6de061..8b26dc977009 100644 --- a/pkg/util/tracing/tracer.go +++ b/pkg/util/tracing/tracer.go @@ -313,7 +313,7 @@ type Tracer struct { // snapshots stores the activeSpansRegistry snapshots taken during the // Tracer's lifetime. The ring buffer will contain snapshots with contiguous // IDs, from the oldest one to + maxSnapshots - 1. - snapshots ring.Buffer // snapshotWithID + snapshots ring.Buffer[snapshotWithID] } testingMu syncutil.Mutex // protects testingRecordAsyncSpans @@ -967,7 +967,7 @@ type spanAllocHelper struct { // Pre-allocated buffers for the span. tagsAlloc [3]attribute.KeyValue childrenAlloc [4]childRef - structuredEventsAlloc [3]interface{} + structuredEventsAlloc [3]*tracingpb.StructuredRecord childrenMetadataAlloc map[string]tracingpb.OperationMetadata } @@ -1039,7 +1039,7 @@ func (t *Tracer) releaseSpanToPool(sp *Span) { h := sp.helper h.tagsAlloc = [3]attribute.KeyValue{} h.childrenAlloc = [4]childRef{} - h.structuredEventsAlloc = [3]interface{}{} + h.structuredEventsAlloc = [3]*tracingpb.StructuredRecord{} for op := range h.childrenMetadataAlloc { delete(h.childrenMetadataAlloc, op) } diff --git a/pkg/util/tracing/tracer_snapshots.go b/pkg/util/tracing/tracer_snapshots.go index 1a1ad45da139..62f05a146a0d 100644 --- a/pkg/util/tracing/tracer_snapshots.go +++ b/pkg/util/tracing/tracer_snapshots.go @@ -68,7 +68,7 @@ func (t *Tracer) SaveSnapshot() SnapshotInfo { if snapshots.Len() == 0 { id = 1 } else { - id = snapshots.GetLast().(snapshotWithID).ID + 1 + id = snapshots.GetLast().ID + 1 } snapshots.AddLast(snapshotWithID{ ID: id, @@ -101,16 +101,16 @@ func (t *Tracer) GetSnapshot(id SnapshotID) (SpansSnapshot, error) { if snapshots.Len() == 0 { return SpansSnapshot{}, errSnapshotDoesntExist } - minID := snapshots.GetFirst().(snapshotWithID).ID + minID := snapshots.GetFirst().ID if id < minID { return SpansSnapshot{}, errSnapshotTooOld } - maxID := snapshots.GetLast().(snapshotWithID).ID + maxID := snapshots.GetLast().ID if id > maxID { return SpansSnapshot{}, errSnapshotDoesntExist } - return snapshots.Get(int(id - minID)).(snapshotWithID).SpansSnapshot, nil + return snapshots.Get(int(id - minID)).SpansSnapshot, nil } // SnapshotInfo represents minimal info about a stored snapshot, as returned by @@ -128,7 +128,7 @@ func (t *Tracer) GetSnapshots() []SnapshotInfo { res := make([]SnapshotInfo, snapshots.Len()) for i := 0; i < snapshots.Len(); i++ { - s := snapshots.Get(i).(snapshotWithID) + s := snapshots.Get(i) res[i] = SnapshotInfo{ ID: s.ID, CapturedAt: s.CapturedAt,