Adds WAL support (experimental) #2981
Conversation
…nfig validation & derives chunk encoding.
done chan struct{}
}

func newIngesterSeriesIter(ing *Ingester) *ingesterSeriesIter {
I think you should abstract the ingester away behind an interface defined here. That way you clearly show what you need and reduce the coupling.
This also has the advantage of making testing easier, which I think you currently don't have for the Iter() part.
It might require refactoring the ingester code to properly expose what you need, so we can keep this for later.
The interface is defined as
type SeriesIter interface {
	Num() int
	Iter() <-chan *SeriesWithErr
	Stop()
}
here:
loki/pkg/ingester/checkpoint.go
Lines 114 to 118 in 3166183
Do you mean something else?
Yes, I was talking about the Ingester passed in as a parameter to the constructor here.
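For readers following along, a minimal sketch of the suggested decoupling, assuming a hypothetical seriesSource interface (the name and method set are illustrative, not this PR's actual code; SeriesWithErr is the type from the interface quoted above):

```go
// Hypothetical sketch: rather than depending on the concrete *Ingester,
// the constructor accepts a narrow interface describing only what the
// iterator actually needs. The dependency becomes explicit, and tests
// can substitute a fake without constructing a full ingester.
type seriesSource interface {
	// numSeries reports how many series will be iterated.
	numSeries() int
	// forAllSeries invokes fn for every series currently held.
	forAllSeries(fn func(s *SeriesWithErr) error) error
}

type ingesterSeriesIter struct {
	src  seriesSource
	done chan struct{}
}

func newIngesterSeriesIter(src seriesSource) *ingesterSeriesIter {
	return &ingesterSeriesIter{src: src, done: make(chan struct{})}
}
```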
for _, stream := range streams {
	// TODO(owen-d): use a pool
	chunks, err := toWireChunks(stream.chunks, nil)
You most likely have a race here:
1. the list of streams can change
2. the list of chunks can too

You should write a test that runs two goroutines, one pushing entries to the ingester and one reading from this iterator, then run it with go test -race.
This is definitely tricky because you want a snapshot of the chunks, but you're using a channel driven by the reader, so things can change in between reads.
I feel like you should either lock everything until iteration is over, which seems dangerous, or return an array of SeriesWithErr.
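A sketch of the kind of test being asked for, assuming hypothetical newTestIngester and pushTestEntries helpers (run it with go test -race):

```go
import (
	"sync"
	"testing"
)

func TestSeriesIterConcurrentPush(t *testing.T) {
	ing := newTestIngester(t) // hypothetical test constructor
	iter := newIngesterSeriesIter(ing)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Writer: keep mutating streams and chunks while the iterator
		// is drained on the main goroutine.
		for i := 0; i < 1000; i++ {
			pushTestEntries(t, ing, i) // hypothetical push helper
		}
	}()

	// Reader: drain the iterator concurrently. The race detector will
	// report any unsynchronized access to streams or chunks.
	for series := range iter.Iter() {
		_ = series
	}
	iter.Stop()
	wg.Wait()
}
```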
This is/will be covered by two things:
- Streams are buffered to an intermediate channel during checkpoint start. This helps synchronize the perSeriesDuration with the list of chunks and avoids the changing problem. The caveat is that an old stream may be kept around for up to the checkpoint interval (default 5m).
loki/pkg/ingester/checkpoint.go
Lines 149 to 155 in 3166183
// Need to buffer streams internally so the read lock isn't held trying
// to write to a blocked channel.
streams := make([]*stream, 0, len(inst.streams))
inst.streamsMtx.RUnlock()
_ = inst.forAllStreams(func(stream *stream) error {
	streams = append(streams, stream)
	return nil
})
loki/pkg/ingester/checkpoint.go
Lines 438 to 442 in 3166183
// Give a 10% buffer to the checkpoint duration in order to account for
// new series, slow writes, etc.
perSeriesDuration := (90 * c.dur) / (100 * time.Duration(n))
ticker := time.NewTicker(perSeriesDuration)

- The locking is mainly covered in a follow-up PR I'm preparing.
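To make the perSeriesDuration arithmetic above concrete: with the default 5m checkpoint interval and, say, 1,000 series, it works out to (90 × 5m) / (100 × 1,000) = 270ms per series.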
Please include concurrent access tests in your follow-up PR.
}

for _, ref := range r.RefEntries {
	recordPool.PutEntries(ref.Entries)
Nit, or for later: this should be a leaky bucket, like
Line 49 in 6500f82
BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) })
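For illustration, a sketch of what a bucketed pool for entries could look like, using the same pool package as BytesBufferPool (the entriesPool name, bucket sizes, and helpers are assumptions, not code from this PR):

```go
import (
	"github.com/grafana/loki/pkg/logproto"
	"github.com/prometheus/prometheus/pkg/pool"
)

// Hypothetical sketch: buckets hold entry slices with capacities from
// 128 to 4096, growing by a factor of 2. Get returns a slice from the
// smallest bucket that fits, so allocations are reused by size class
// rather than pooled at a single fixed size.
var entriesPool = pool.New(1<<7, 1<<12, 2, func(size int) interface{} {
	return make([]logproto.Entry, 0, size)
})

func getEntries(n int) []logproto.Entry {
	return entriesPool.Get(n).([]logproto.Entry)
}

func putEntries(es []logproto.Entry) {
	entriesPool.Put(es[:0]) // reset length before returning to the pool
}
```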
I'm pretty happy with this PR's current state; this is very good work @owen-d.
My only concern is the race raised in the Iter() code path. I'd like to see something that avoids it.
- uses entry pool in stream push/tailer
- removes unnecessary pool interaction
- checkpointbytes comment
- fillchunk helper, record resetting in tests via pool
- redundant comment
- defers wg done in recovery
- s/num/count/
- checkpoint wal uses a logger
- encodeWithTypeHeader now creates its own []byte
- removes pool from decodeEntries
- wal stop can error
Force-pushed from d4fcec5 to 51ff8d7.
LGTM
This broke your documentation: https://grafana.com/docs/loki/latest/installation/local/
Confirmed. Service came up fine with the following lines commented out of the config file:
This is intended as an intermediate PR before marking the WAL as GA. It exposes WAL configurations and defaults them to false. This is not expected to be included in a release yet as there are a few other things which need fixing:
Fortunately, these race conditions are (a) unlikely, and (b) any data loss they cause is nullified by the WAL. A follow-up PR will introduce concurrency controls around this.
In order to reduce cognitive load during review, I'm submitting this initial PR, which will be built upon by another.