Skip to content

Commit

Permalink
Add WAL segment versioning; add flag (only v1 allowed).
Browse files Browse the repository at this point in the history
Implementation for prometheus/proposals#40

Signed-off-by: bwplotka <[email protected]>
  • Loading branch information
bwplotka committed Dec 11, 2024
1 parent 664177b commit 2d3e8d4
Show file tree
Hide file tree
Showing 16 changed files with 1,517 additions and 1,201 deletions.
54 changes: 38 additions & 16 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,21 @@ func init() {
// serverOnlyFlag creates a kingpin flag that is only valid in server mode.
// If the user actually provides the flag, the PreAction records it in
// serverOnlyFlags so startup validation can reject it in other modes.
func serverOnlyFlag(app *kingpin.Application, name, help string) *kingpin.FlagClause {
	return app.Flag(name, fmt.Sprintf("%s Use with server mode only.", help)).
		PreAction(func(parseContext *kingpin.ParseContext) error {
			// This will be invoked only if flag is actually provided by user.
			serverOnlyFlags = append(serverOnlyFlags, "--"+name)
			return nil
		})
}

// agentOnlyFlag creates a kingpin flag that is only valid in agent mode.
// If the user actually provides the flag, the PreAction records it in
// agentOnlyFlags so startup validation can reject it in other modes.
func agentOnlyFlag(app *kingpin.Application, name, help string) *kingpin.FlagClause {
	return app.Flag(name, fmt.Sprintf("%s Use with agent mode only.", help)).
		PreAction(func(parseContext *kingpin.ParseContext) error {
			// This will be invoked only if flag is actually provided by user.
			agentOnlyFlags = append(agentOnlyFlags, "--"+name)
			return nil
		})
}

type flagConfig struct {
Expand Down Expand Up @@ -427,6 +427,9 @@ func main() {
serverOnlyFlag(a, "storage.tsdb.wal-compression-type", "Compression algorithm for the tsdb WAL.").
Hidden().Default(string(wlog.CompressionSnappy)).EnumVar(&cfg.tsdb.WALCompressionType, string(wlog.CompressionSnappy), string(wlog.CompressionZstd))

serverOnlyFlag(a, "storage.tsdb.wal-version", fmt.Sprintf("Version for the new WAL segments. Supported versions: %v", wlog.ReleasedSupportedSegmentVersions())).
Default(fmt.Sprintf("%v", wlog.DefaultSegmentVersion)).Uint32Var(&cfg.tsdb.WALSegmentVersion)

serverOnlyFlag(a, "storage.tsdb.head-chunks-write-queue-size", "Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental.").
Default("0").IntVar(&cfg.tsdb.HeadChunksWriteQueueSize)

Expand All @@ -443,12 +446,15 @@ func main() {
"Size at which to split WAL segment files. Example: 100MB").
Hidden().PlaceHolder("<bytes>").BytesVar(&cfg.agent.WALSegmentSize)

agentOnlyFlag(a, "storage.agent.wal-compression", "Compress the agent WAL.").
agentOnlyFlag(a, "storage.agent.wal-compression", "Compress the new WAL segments.").
Default("true").BoolVar(&cfg.agent.WALCompression)

agentOnlyFlag(a, "storage.agent.wal-compression-type", "Compression algorithm for the agent WAL.").
Hidden().Default(string(wlog.CompressionSnappy)).EnumVar(&cfg.agent.WALCompressionType, string(wlog.CompressionSnappy), string(wlog.CompressionZstd))

agentOnlyFlag(a, "storage.agent.wal-version", fmt.Sprintf("Version for the new WAL segments. Supported versions: %v", wlog.ReleasedSupportedSegmentVersions())).
Default(fmt.Sprintf("%v", wlog.DefaultSegmentVersion)).Uint32Var(&cfg.agent.WALSegmentVersion)

agentOnlyFlag(a, "storage.agent.wal-truncate-frequency",
"The frequency at which to truncate the WAL and remove old data.").
Hidden().PlaceHolder("<duration>").SetValue(&cfg.agent.TruncateFrequency)
Expand Down Expand Up @@ -1229,6 +1235,10 @@ func main() {
g.Add(
func() error {
logger.Info("Starting TSDB ...")
if !wlog.SegmentVersion(cfg.tsdb.WALSegmentVersion).IsReleased() {
return fmt.Errorf("flag 'storage.tsdb.wal-version' was set to unsupported WAL segment version %v; supported versions %v", cfg.tsdb.WALSegmentVersion, wlog.ReleasedSupportedSegmentVersions())
}

if cfg.tsdb.WALSegmentSize != 0 {
if cfg.tsdb.WALSegmentSize < 10*1024*1024 || cfg.tsdb.WALSegmentSize > 256*1024*1024 {
return errors.New("flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB")
Expand Down Expand Up @@ -1285,6 +1295,10 @@ func main() {
g.Add(
func() error {
logger.Info("Starting WAL storage ...")
if !wlog.SegmentVersion(cfg.agent.WALSegmentVersion).IsReleased() {
return fmt.Errorf("flag 'storage.agent.wal-version' was set to unsupported WAL segment version %v; supported versions %v", cfg.agent.WALSegmentVersion, wlog.ReleasedSupportedSegmentVersions())
}

if cfg.agent.WALSegmentSize != 0 {
if cfg.agent.WALSegmentSize < 10*1024*1024 || cfg.agent.WALSegmentSize > 256*1024*1024 {
return errors.New("flag 'storage.agent.wal-segment-size' must be set between 10MB and 256MB")
Expand Down Expand Up @@ -1492,7 +1506,7 @@ func reloadConfig(filename string, enableExemplarStorage bool, logger *slog.Logg

// startsOrEndsWithQuote reports whether s begins or ends with a single or
// double quote character.
func startsOrEndsWithQuote(s string) bool {
	return strings.HasPrefix(s, "\"") || strings.HasPrefix(s, "'") ||
		strings.HasSuffix(s, "\"") || strings.HasSuffix(s, "'")
}

// compileCORSRegexString compiles given string and adds anchors.
Expand Down Expand Up @@ -1780,6 +1794,7 @@ func (rm *readyScrapeManager) Get() (*scrape.Manager, error) {
// This is required as tsdb.Option fields are unit agnostic (time).
type tsdbOptions struct {
WALSegmentSize units.Base2Bytes
WALSegmentVersion uint32
MaxBlockChunkSegmentSize units.Base2Bytes
RetentionDuration model.Duration
MaxBytes units.Base2Bytes
Expand All @@ -1804,12 +1819,15 @@ type tsdbOptions struct {

func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
return tsdb.Options{
WALSegmentSize: int(opts.WALSegmentSize),
WALSegment: wlog.SegmentOptions{
Version: wlog.SegmentVersion(opts.WALSegmentVersion),
Compression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType),
Size: int(opts.WALSegmentSize),
},
MaxBlockChunkSegmentSize: int64(opts.MaxBlockChunkSegmentSize),
RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond),
MaxBytes: int64(opts.MaxBytes),
NoLockfile: opts.NoLockfile,
WALCompression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType),
HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize,
SamplesPerChunk: opts.SamplesPerChunk,
StripeSize: opts.StripeSize,
Expand All @@ -1833,6 +1851,7 @@ type agentOptions struct {
WALSegmentSize units.Base2Bytes
WALCompression bool
WALCompressionType string
WALSegmentVersion uint32
StripeSize int
TruncateFrequency model.Duration
MinWALTime, MaxWALTime model.Duration
Expand All @@ -1845,8 +1864,11 @@ func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Option
outOfOrderTimeWindow = 0
}
return agent.Options{
WALSegmentSize: int(opts.WALSegmentSize),
WALCompression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType),
WALSegment: wlog.SegmentOptions{
Version: wlog.SegmentVersion(opts.WALSegmentVersion),
Compression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType),
Size: int(opts.WALSegmentSize),
},
StripeSize: opts.StripeSize,
TruncateFrequency: time.Duration(opts.TruncateFrequency),
MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)),
Expand Down
70 changes: 33 additions & 37 deletions tsdb/agent/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,8 @@ var (

// Options of the WAL storage.
type Options struct {
// Segments (wal files) max size.
// WALSegmentSize <= 0, segment size is default size.
// WALSegmentSize > 0, segment size is WALSegmentSize.
WALSegmentSize int

// WALCompression configures the compression type to use on records in the WAL.
WALCompression wlog.CompressionType
// WALSegment represents WAL segment options.
WALSegment wlog.SegmentOptions

// StripeSize is the size (power of 2) in entries of the series hash map. Reducing the size will save memory but impact performance.
StripeSize int
Expand All @@ -89,8 +84,7 @@ type Options struct {
// millisecond-precision timestamps.
func DefaultOptions() *Options {
return &Options{
WALSegmentSize: wlog.DefaultSegmentSize,
WALCompression: wlog.CompressionNone,
WALSegment: wlog.SegmentOptions{}, // wlog.New will set the correct defaults
StripeSize: tsdb.DefaultStripeSize,
TruncateFrequency: DefaultTruncateFrequency,
MinWALTime: DefaultMinWALTime,
Expand Down Expand Up @@ -266,7 +260,7 @@ func Open(l *slog.Logger, reg prometheus.Registerer, rs *remote.Storage, dir str
// remote_write expects WAL to be stored in a "wal" subdirectory of the main storage.
dir = filepath.Join(dir, "wal")

w, err := wlog.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression)
w, err := wlog.New(l, reg, dir, opts.WALSegment)
if err != nil {
return nil, fmt.Errorf("creating WAL: %w", err)
}
Expand Down Expand Up @@ -326,14 +320,6 @@ func validateOptions(opts *Options) *Options {
if opts == nil {
opts = DefaultOptions()
}
if opts.WALSegmentSize <= 0 {
opts.WALSegmentSize = wlog.DefaultSegmentSize
}

if opts.WALCompression == "" {
opts.WALCompression = wlog.CompressionNone
}

// Revert StripeSize to DefaultStripeSize if StripeSize is either 0 or not a power of 2.
if opts.StripeSize <= 0 || ((opts.StripeSize & (opts.StripeSize - 1)) != 0) {
opts.StripeSize = tsdb.DefaultStripeSize
Expand Down Expand Up @@ -389,14 +375,14 @@ func (db *DB) replayWAL() error {
}

// Find the last segment.
_, last, err := wlog.Segments(db.wal.Dir())
_, last, err := wlog.SegmentsRange(db.wal.Dir())
if err != nil {
return fmt.Errorf("finding WAL segments: %w", err)
}

// Backfill segments from the most recent checkpoint onwards.
for i := startFrom; i <= last; i++ {
seg, err := wlog.OpenReadSegment(wlog.SegmentName(db.wal.Dir(), i))
seg, err := wlog.OpenReadSegmentByIndex(db.wal.Dir(), i)
if err != nil {
return fmt.Errorf("open WAL segment: %d: %w", i, err)
}
Expand Down Expand Up @@ -443,10 +429,12 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
series := seriesPool.Get()[:0]
series, err = dec.Series(rec, series)
if err != nil {
i, v := r.Segment()
errCh <- &wlog.CorruptionErr{
Err: fmt.Errorf("decode series: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
Err: fmt.Errorf("decode series: %w", err),
SegmentIndex: i,
SegmentVersion: v,
Offset: r.Offset(),
}
return
}
Expand All @@ -455,10 +443,12 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
samples := samplesPool.Get()[:0]
samples, err = dec.Samples(rec, samples)
if err != nil {
i, v := r.Segment()
errCh <- &wlog.CorruptionErr{
Err: fmt.Errorf("decode samples: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
Err: fmt.Errorf("decode samples: %w", err),
SegmentIndex: i,
SegmentVersion: v,
Offset: r.Offset(),
}
return
}
Expand All @@ -467,10 +457,12 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
histograms := histogramsPool.Get()[:0]
histograms, err = dec.HistogramSamples(rec, histograms)
if err != nil {
i, v := r.Segment()
errCh <- &wlog.CorruptionErr{
Err: fmt.Errorf("decode histogram samples: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
Err: fmt.Errorf("decode histogram samples: %w", err),
SegmentIndex: i,
SegmentVersion: v,
Offset: r.Offset(),
}
return
}
Expand All @@ -479,10 +471,12 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
floatHistograms := floatHistogramsPool.Get()[:0]
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
if err != nil {
i, v := r.Segment()
errCh <- &wlog.CorruptionErr{
Err: fmt.Errorf("decode float histogram samples: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
Err: fmt.Errorf("decode float histogram samples: %w", err),
SegmentIndex: i,
SegmentVersion: v,
Offset: r.Offset(),
}
return
}
Expand All @@ -493,10 +487,12 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
// stripeSeries.exemplars in the next block by using setLatestExemplar.
continue
default:
i, v := r.Segment()
errCh <- &wlog.CorruptionErr{
Err: fmt.Errorf("invalid record type %v", dec.Type(rec)),
Segment: r.Segment(),
Offset: r.Offset(),
Err: fmt.Errorf("invalid record type %v", dec.Type(rec)),
SegmentIndex: i,
SegmentVersion: v,
Offset: r.Offset(),
}
}
}
Expand Down Expand Up @@ -632,7 +628,7 @@ func (db *DB) truncate(mint int64) error {
db.gc(mint)
db.logger.Info("series GC completed", "duration", time.Since(start))

first, last, err := wlog.Segments(db.wal.Dir())
first, last, err := wlog.SegmentsRange(db.wal.Dir())
if err != nil {
return fmt.Errorf("get segment range: %w", err)
}
Expand Down Expand Up @@ -711,7 +707,7 @@ func (db *DB) gc(mint int64) {
deleted := db.series.GC(mint)
db.metrics.numActiveSeries.Sub(float64(len(deleted)))

_, last, _ := wlog.Segments(db.wal.Dir())
_, last, _ := wlog.SegmentsRange(db.wal.Dir())

// We want to keep series records for any newly deleted series
// until we've passed the last recorded segment. This prevents
Expand Down
32 changes: 11 additions & 21 deletions tsdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var ErrNotReady = errors.New("TSDB not ready")
// millisecond precision timestamps.
func DefaultOptions() *Options {
return &Options{
WALSegmentSize: wlog.DefaultSegmentSize,
WALSegment: wlog.SegmentOptions{}, // wlog.New sets the correct defaults.
MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize,
RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond),
MinBlockDuration: DefaultBlockDuration,
Expand All @@ -97,11 +97,10 @@ func DefaultOptions() *Options {

// Options of the DB storage.
type Options struct {
// Segments (wal files) max size.
// WALSegmentSize = 0, segment size is default size.
// WALSegmentSize > 0, segment size is WALSegmentSize.
// WALSegmentSize < 0, wal is disabled.
WALSegmentSize int
// WALSegment represents segment options.
WALSegment wlog.SegmentOptions
// DisableWAL disables WAL.
DisableWAL bool

// MaxBlockChunkSegmentSize is the max size of block chunk segment files.
// MaxBlockChunkSegmentSize = 0, chunk segment size is default size.
Expand Down Expand Up @@ -929,14 +928,10 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
}

var wal, wbl *wlog.WL
segmentSize := wlog.DefaultSegmentSize

// Wal is enabled.
if opts.WALSegmentSize >= 0 {
// Wal is set to a custom size.
if opts.WALSegmentSize > 0 {
segmentSize = opts.WALSegmentSize
}
wal, err = wlog.NewSize(l, r, walDir, segmentSize, opts.WALCompression)
if !opts.DisableWAL {
wal, err = wlog.New(l, r, walDir, opts.WALSegment)
if err != nil {
return nil, err
}
Expand All @@ -946,7 +941,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
return nil, err
}
if opts.OutOfOrderTimeWindow > 0 || wblSize > 0 {
wbl, err = wlog.NewSize(l, r, wblDir, segmentSize, opts.WALCompression)
wbl, err = wlog.New(l, r, wblDir, opts.WALSegment)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1159,14 +1154,9 @@ func (db *DB) ApplyConfig(conf *config.Config) error {
case db.head.wbl != nil:
// The existing WBL from the disk might have been replayed while OOO was disabled.
wblog = db.head.wbl
case !db.oooWasEnabled.Load() && oooTimeWindow > 0 && db.opts.WALSegmentSize >= 0:
segmentSize := wlog.DefaultSegmentSize
// Wal is set to a custom size.
if db.opts.WALSegmentSize > 0 {
segmentSize = db.opts.WALSegmentSize
}
case !db.oooWasEnabled.Load() && oooTimeWindow > 0 && !db.opts.DisableWAL:
oooWalDir := filepath.Join(db.dir, wlog.WblDirName)
wblog, err = wlog.NewSize(db.logger, db.registerer, oooWalDir, segmentSize, db.opts.WALCompression)
wblog, err = wlog.New(db.logger, db.registerer, oooWalDir, db.opts.WALSegment)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 2d3e8d4

Please sign in to comment.