Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disables the stream limiter until wal has recovered #3114

Merged
merged 2 commits into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions pkg/ingester/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,103 @@ func TestIngesterWAL(t *testing.T) {

}

func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)

ingesterConfig := defaultIngesterTestConfig(t)
ingesterConfig.MaxTransferRetries = 0
ingesterConfig.WAL = WALConfig{
Enabled: true,
Dir: walDir,
Recover: true,
CheckpointDuration: time.Second,
}
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

newStore := func() *mockStore {
return &mockStore{
chunks: map[string][]chunk.Chunk{},
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

req := logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{foo="bar",bar="baz1"}`,
},
{
Labels: `{foo="bar",bar="baz2"}`,
},
},
}

start := time.Now()
steps := 10
end := start.Add(time.Second * time.Duration(steps))

for i := 0; i < steps; i++ {
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: start.Add(time.Duration(i) * time.Second),
Line: fmt.Sprintf("line %d", i),
})
req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{
Timestamp: start.Add(time.Duration(i) * time.Second),
Line: fmt.Sprintf("line %d", i),
})
}

ctx := user.InjectOrgID(context.Background(), "test")
_, err = i.Push(ctx, &req)
require.NoError(t, err)

ensureIngesterData(ctx, t, start, end, i)

require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// Limit all streams except those written during WAL recovery.
limitCfg := defaultLimitsTestConfig()
limitCfg.MaxLocalStreamsPerUser = -1
limits, err = validation.NewOverrides(limitCfg, nil)
require.NoError(t, err)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))

// ensure we've recovered data from wal segments
ensureIngesterData(ctx, t, start, end, i)

req = logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{foo="new"}`,
Entries: []logproto.Entry{
{
Timestamp: start,
Line: "hi",
},
},
},
},
}

ctx = user.InjectOrgID(context.Background(), "test")
_, err = i.Push(ctx, &req)
// Ensure regular pushes error due to stream limits.
require.Error(t, err)

}

func expectCheckpoint(t *testing.T, walDir string, shouldExist bool) {
fs, err := ioutil.ReadDir(walDir)
require.Nil(t, err)
Expand Down
4 changes: 4 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid

func (i *Ingester) starting(ctx context.Context) error {
if i.cfg.WAL.Recover {
// Disable the in process stream limit checks while replaying the WAL
i.limiter.Disable()
defer i.limiter.Enable()

recoverer := newIngesterRecoverer(i)
defer recoverer.Close()

Expand Down
24 changes: 24 additions & 0 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ingester
import (
"fmt"
"math"
"sync"

"github.com/grafana/loki/pkg/util/validation"
)
Expand All @@ -23,6 +24,21 @@ type Limiter struct {
limits *validation.Overrides
ring RingCount
replicationFactor int

mtx sync.RWMutex
disabled bool
}

func (l *Limiter) Disable() {
l.mtx.Lock()
defer l.mtx.Unlock()
l.disabled = true
}

func (l *Limiter) Enable() {
l.mtx.Lock()
defer l.mtx.Unlock()
l.disabled = false
}

// NewLimiter makes a new limiter
Expand All @@ -37,6 +53,14 @@ func NewLimiter(limits *validation.Overrides, ring RingCount, replicationFactor
// AssertMaxStreamsPerUser ensures limit has not been reached compared to the current
// number of streams in input and returns an error if so.
func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {
// Until the limiter actually starts, all accesses are successful.
// This is used to disable limits while recovering from the WAL.
l.mtx.RLock()
defer l.mtx.RUnlock()
if l.disabled {
return nil
}

// Start by setting the local limit either from override or default
localLimit := l.limits.MaxLocalStreamsPerUser(userID)

Expand Down