Skip to content

Commit

Permalink
feat: Calculate the age of a WAL segment (#13637)
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana authored Jul 24, 2024
1 parent 2d92fff commit 4abb5a4
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 62 deletions.
93 changes: 55 additions & 38 deletions pkg/storage/wal/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,30 +103,42 @@ type Manager struct {
metrics *ManagerMetrics
available *list.List
pending *list.List
// firstAppend is the time of the first append to the segment at the
// front of the available list. It is used to know when the segment has
// exceeded the maximum age and should be moved to the pending list.
// It is reset each time this happens.
firstAppend time.Time
closed bool
mu sync.Mutex
closed bool
mu sync.Mutex
// Used in tests.
clock quartz.Clock
}

// item is similar to PendingSegment, but it is an internal struct used in the
// available and pending lists. It contains a single-use result that is returned
// to callers appending to the WAL and a re-usable segment that is reset after
// each flush.
type item struct {
// segment is similar to PendingSegment, however it is an internal struct used
// in the available and pending lists. It contains a single-use result that is
// returned to callers appending to the WAL and a re-usable segment that is reset
// after each flush.
type segment struct {
r *AppendResult
w *SegmentWriter

// firstAppend is the time of the first append to the segment. It is used to
// know when the segment has exceeded the maximum age and should be moved to
// the pending list.
firstAppend time.Time

// moved is the time the segment was moved to the pending list. It is used
// to calculate the age of the segment. A segment is moved when it has
// exceeded the maximum age or the maximum size.
moved time.Time
}

// PendingSegment contains a result and the segment to be flushed.
type PendingSegment struct {
Result *AppendResult
Writer *SegmentWriter
Result *AppendResult
Writer *SegmentWriter
FirstAppend time.Time
Moved time.Time
}

// Age reports the time elapsed between the first append to the segment
// (FirstAppend) and the moment it was moved to the pending list (Moved).
func (p *PendingSegment) Age() time.Duration {
	first, moved := p.FirstAppend, p.Moved
	return moved.Sub(first)
}

func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
Expand All @@ -145,7 +157,7 @@ func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
if err != nil {
return nil, err
}
m.available.PushBack(&item{
m.available.PushBack(&segment{
r: &AppendResult{done: make(chan struct{})},
w: w,
})
Expand All @@ -164,29 +176,29 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
if el == nil {
return nil, ErrFull
}
it := el.Value.(*item)
if m.firstAppend.IsZero() {
s := el.Value.(*segment)
if s.firstAppend.IsZero() {
// This is the first append to the segment. This time will be used to
// know when the segment has exceeded its maximum age and should be
// moved to the pending list.
m.firstAppend = m.clock.Now()
s.firstAppend = m.clock.Now()
}
it.w.Append(r.TenantID, r.LabelsStr, r.Labels, r.Entries)
// If the segment exceeded the maximum age or the maximum size, move it to
s.w.Append(r.TenantID, r.LabelsStr, r.Labels, r.Entries)
// If the segment exceeded the maximum age or the maximum size, move s to
// the pending list to be flushed.
if m.clock.Since(m.firstAppend) >= m.cfg.MaxAge || it.w.InputSize() >= m.cfg.MaxSegmentSize {
m.move(el, it)
if m.clock.Since(s.firstAppend) >= m.cfg.MaxAge || s.w.InputSize() >= m.cfg.MaxSegmentSize {
m.move(el, s)
}
return it.r, nil
return s.r, nil
}

func (m *Manager) Close() {
m.mu.Lock()
defer m.mu.Unlock()
if el := m.available.Front(); el != nil {
it := el.Value.(*item)
if it.w.InputSize() > 0 {
m.move(el, it)
s := el.Value.(*segment)
if s.w.InputSize() > 0 {
m.move(el, s)
}
}
m.closed = true
Expand All @@ -206,45 +218,50 @@ func (m *Manager) NextPending() (*PendingSegment, error) {
return nil, nil
}
el := m.pending.Front()
it := el.Value.(*item)
s := el.Value.(*segment)
m.pending.Remove(el)
m.metrics.NumPending.Dec()
m.metrics.NumFlushing.Inc()
return &PendingSegment{Result: it.r, Writer: it.w}, nil
return &PendingSegment{
Result: s.r,
Writer: s.w,
FirstAppend: s.firstAppend,
Moved: s.moved,
}, nil
}

// Put resets the segment and puts it back in the available list to accept
// writes. A PendingSegment should not be put back until it has been flushed.
func (m *Manager) Put(it *PendingSegment) {
it.Writer.Reset()
func (m *Manager) Put(s *PendingSegment) {
s.Writer.Reset()
m.mu.Lock()
defer m.mu.Unlock()
m.metrics.NumFlushing.Dec()
m.metrics.NumAvailable.Inc()
m.available.PushBack(&item{
m.available.PushBack(&segment{
r: &AppendResult{done: make(chan struct{})},
w: it.Writer,
w: s.Writer,
})
}

// move moves the element from the available list to the pending list and sets
// the relevant metrics.
func (m *Manager) move(el *list.Element, it *item) {
m.pending.PushBack(it)
func (m *Manager) move(el *list.Element, s *segment) {
s.moved = m.clock.Now()
m.pending.PushBack(s)
m.metrics.NumPending.Inc()
m.available.Remove(el)
m.metrics.NumAvailable.Dec()
m.firstAppend = time.Time{}
}

// moveFrontIfExpired moves the element from the front of the available list to
// the pending list if the segment has exceeded its maximum age and sets the
// relevant metrics.
func (m *Manager) moveFrontIfExpired() bool {
if el := m.available.Front(); el != nil {
it := el.Value.(*item)
if !m.firstAppend.IsZero() && m.clock.Since(m.firstAppend) >= m.cfg.MaxAge {
m.move(el, it)
s := el.Value.(*segment)
if !s.firstAppend.IsZero() && m.clock.Since(s.firstAppend) >= m.cfg.MaxAge {
m.move(el, s)
return true
}
}
Expand Down
112 changes: 88 additions & 24 deletions pkg/storage/wal/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ func TestManager_NextPending(t *testing.T) {

// There should be no segments waiting to be flushed as no data has been
// written.
it, err := m.NextPending()
s, err := m.NextPending()
require.NoError(t, err)
require.Nil(t, it)
require.Nil(t, s)

// Append 1KB of data.
lbs := labels.Labels{{Name: "a", Value: "b"}}
Expand All @@ -271,14 +271,78 @@ func TestManager_NextPending(t *testing.T) {
require.NoError(t, err)

// There should be a segment waiting to be flushed.
it, err = m.NextPending()
s, err = m.NextPending()
require.NoError(t, err)
require.NotNil(t, it)
require.NotNil(t, s)

// There should be no more segments waiting to be flushed.
it, err = m.NextPending()
s, err = m.NextPending()
require.NoError(t, err)
require.Nil(t, it)
require.Nil(t, s)
}

func TestManager_NextPendingAge(t *testing.T) {
m, err := NewManager(Config{
MaxAge: 100 * time.Millisecond,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
require.NoError(t, err)

// Create a mock clock.
clock := quartz.NewMock(t)
m.clock = clock

// Append 1B of data.
lbs := labels.Labels{{Name: "a", Value: "b"}}
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}}
res, err := m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
LabelsStr: lbs.String(),
Entries: entries,
})
require.NoError(t, err)
require.NotNil(t, res)

// Wait 100ms. The segment that was just appended to should have reached
// the maximum age.
clock.Advance(100 * time.Millisecond)
s, err := m.NextPending()
require.NoError(t, err)
require.NotNil(t, s)
require.Equal(t, 100*time.Millisecond, s.Age())
m.Put(s)

// Append 1KB of data using two separate append requests, 1ms apart.
entries = []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}}
res, err = m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
LabelsStr: lbs.String(),
Entries: entries,
})
require.NoError(t, err)
require.NotNil(t, res)

// Wait 1ms and then append the rest of the data.
clock.Advance(time.Millisecond)
entries = []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}}
res, err = m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
LabelsStr: lbs.String(),
Entries: entries,
})
require.NoError(t, err)
require.NotNil(t, res)

// The segment that was just appended to should have reached the maximum
// size.
s, err = m.NextPending()
require.NoError(t, err)
require.NotNil(t, s)
require.Equal(t, time.Millisecond, s.Age())
}

func TestManager_NextPendingMaxAgeExceeded(t *testing.T) {
Expand Down Expand Up @@ -307,18 +371,18 @@ func TestManager_NextPendingMaxAgeExceeded(t *testing.T) {

// The segment that was just appended to has neither reached the maximum
// age nor maximum size to be flushed.
it, err := m.NextPending()
s, err := m.NextPending()
require.NoError(t, err)
require.Nil(t, it)
require.Nil(t, s)
require.Equal(t, 1, m.available.Len())
require.Equal(t, 0, m.pending.Len())

// Wait 100ms. The segment that was just appended to should have reached
// the maximum age.
clock.Advance(100 * time.Millisecond)
it, err = m.NextPending()
s, err = m.NextPending()
require.NoError(t, err)
require.NotNil(t, it)
require.NotNil(t, s)
require.Equal(t, 0, m.available.Len())
require.Equal(t, 0, m.pending.Len())
}
Expand All @@ -345,24 +409,24 @@ func TestManager_NextPendingWALClosed(t *testing.T) {

// There should be no segments waiting to be flushed as neither the maximum
// age nor maximum size has been exceeded.
it, err := m.NextPending()
s, err := m.NextPending()
require.NoError(t, err)
require.Nil(t, it)
require.Nil(t, s)

// Close the WAL.
m.Close()

// There should be one segment waiting to be flushed.
it, err = m.NextPending()
s, err = m.NextPending()
require.NoError(t, err)
require.NotNil(t, it)
require.NotNil(t, s)

// There are no more segments waiting to be flushed, and since the WAL is
// closed, successive calls should return ErrClosed.
for i := 0; i < 10; i++ {
it, err = m.NextPending()
s, err = m.NextPending()
require.ErrorIs(t, err, ErrClosed)
require.Nil(t, it)
require.Nil(t, s)
}
}

Expand Down Expand Up @@ -395,22 +459,22 @@ func TestManager_Put(t *testing.T) {
require.Equal(t, 1, m.pending.Len())

// Getting the pending segment should remove it from the list.
it, err := m.NextPending()
s, err := m.NextPending()
require.NoError(t, err)
require.NotNil(t, it)
require.NotNil(t, s)
require.Equal(t, 0, m.available.Len())
require.Equal(t, 0, m.pending.Len())

// The segment should contain 1KB of data.
require.Equal(t, int64(1024), it.Writer.InputSize())
require.Equal(t, int64(1024), s.Writer.InputSize())

// Putting it back should add it to the available list.
m.Put(it)
m.Put(s)
require.Equal(t, 1, m.available.Len())
require.Equal(t, 0, m.pending.Len())

// The segment should be reset.
require.Equal(t, int64(0), it.Writer.InputSize())
require.Equal(t, int64(0), s.Writer.InputSize())
}

func TestManager_Metrics(t *testing.T) {
Expand Down Expand Up @@ -465,9 +529,9 @@ wal_segments_pending 1
require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...))

// Get the segment from the pending list.
it, err := m.NextPending()
s, err := m.NextPending()
require.NoError(t, err)
require.NotNil(t, it)
require.NotNil(t, s)
expected = `
# HELP wal_segments_available The number of WAL segments accepting writes.
# TYPE wal_segments_available gauge
Expand All @@ -482,7 +546,7 @@ wal_segments_pending 0
require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...))

// Reset the segment and put it back in the available list.
m.Put(it)
m.Put(s)
expected = `
# HELP wal_segments_available The number of WAL segments accepting writes.
# TYPE wal_segments_available gauge
Expand Down

0 comments on commit 4abb5a4

Please sign in to comment.