Update WAL watcher from Prometheus repo, pass metrics structs around for
the live reader, expose WriteTo interface.

Signed-off-by: Callum Styan <[email protected]>
cstyan committed Jul 18, 2019
1 parent 56eb528 commit f44209e
Showing 2 changed files with 420 additions and 391 deletions.
77 changes: 35 additions & 42 deletions wal/watcher.go
@@ -39,45 +39,34 @@ const (
consumer = "consumer"
)

type watcherMetrics struct {
recordsRead *prometheus.CounterVec
recordDecodeFails *prometheus.CounterVec
samplesSentPreTailing *prometheus.CounterVec
currentSegment *prometheus.GaugeVec
}

var (
lrMetrics = NewLiveReaderMetrics(prometheus.DefaultRegisterer)
)

// fromTime returns a new millisecond timestamp from a time.
// This function is copied from prometheus/prometheus/pkg/timestamp to avoid adding vendor to TSDB repo.

// FromTime returns a new millisecond timestamp from a time.
func FromTime(t time.Time) int64 {
func fromTime(t time.Time) int64 {
return t.Unix()*1000 + int64(t.Nanosecond())/int64(time.Millisecond)
}

// func init() {
// prometheus.MustRegister(watcherRecordsRead)
// prometheus.MustRegister(watcherRecordDecodeFails)
// prometheus.MustRegister(watcherSamplesSentPreTailing)
// prometheus.MustRegister(watcherCurrentSegment)
// }

type writeTo interface {
type WriteTo interface {
Append([]record.RefSample) bool
StoreSeries([]record.RefSeries, int)
SeriesReset(int)
}

type watcherMetrics struct {
recordsRead *prometheus.CounterVec
recordDecodeFails *prometheus.CounterVec
samplesSentPreTailing *prometheus.CounterVec
currentSegment *prometheus.GaugeVec
}

// Watcher watches the TSDB WAL for a given WriteTo.
type Watcher struct {
name string
writer writeTo
writer WriteTo
logger log.Logger
walDir string
lastCheckpoint string
metrics *watcherMetrics
readerMetrics *liveReaderMetrics

startTime int64

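Note: with writeTo exported as WriteTo, code outside the wal package can consume samples and series from the watcher. A minimal sketch of an implementer, as one file of a package main; the printer type is hypothetical and the record import path is assumed, not part of this commit:

package main

import (
	"fmt"

	"github.com/prometheus/tsdb/record" // assumed import path for the record types used above
)

// printer is a hypothetical WriteTo implementation that just prints what it
// receives; a real consumer (e.g. a remote-write queue) has the same shape.
type printer struct{}

// Append receives samples decoded from WAL records; returning false would
// tell the Watcher the consumer could not accept them.
func (p *printer) Append(samples []record.RefSample) bool {
	for _, s := range samples {
		fmt.Printf("ref=%d t=%d v=%g\n", s.Ref, s.T, s.V)
	}
	return true
}

// StoreSeries receives series records along with the segment they were read from.
func (p *printer) StoreSeries(series []record.RefSeries, segment int) {
	fmt.Printf("%d series from segment %d\n", len(series), segment)
}

// SeriesReset signals that series read up to the given segment can be
// forgotten; the Watcher calls this after processing a checkpoint.
func (p *printer) SeriesReset(segment int) {
	fmt.Printf("series reset, segment %d\n", segment)
}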
@@ -144,22 +133,24 @@ func NewWatcherMetrics(reg prometheus.Registerer) *watcherMetrics {
 }
 
 // NewWatcher creates a new WAL watcher for a given WriteTo.
-func NewWatcher(logger log.Logger, metrics *watcherMetrics, name string, writer writeTo, walDir string) *Watcher {
+func NewWatcher(reg prometheus.Registerer, logger log.Logger, name string, writer WriteTo, walDir string) *Watcher {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
 
-	return &Watcher{
-		logger:  logger,
-		metrics: metrics,
-		writer:  writer,
-		walDir:  path.Join(walDir, "wal"),
-		name:    name,
-		quit:    make(chan struct{}),
-		done:    make(chan struct{}),
+	w := Watcher{
+		logger:        logger,
+		writer:        writer,
+		metrics:       NewWatcherMetrics(reg),
+		readerMetrics: NewLiveReaderMetrics(reg),
+		walDir:        path.Join(walDir, "wal"),
+		name:          name,
+		quit:          make(chan struct{}),
+		done:          make(chan struct{}),
 
 		maxSegment: -1,
 	}
+	return &w
 }
 
 func (w *Watcher) setMetrics() {
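Note: NewWatcher now takes the prometheus.Registerer up front and builds both metrics structs from it, instead of receiving a *watcherMetrics and relying on the package-global reader metrics. A sketch of the new call shape, as a second file in the same package as the printer sketch above; import paths are assumed:

package main

import (
	"os"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/tsdb/wal" // assumed import path
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)

	// All watcher and live-reader metrics land on this registry instead of
	// prometheus.DefaultRegisterer, so tests or multiple watchers stay isolated.
	reg := prometheus.NewRegistry()

	// "data" is the TSDB directory; NewWatcher joins "wal" onto it itself.
	w := wal.NewWatcher(reg, logger, "example-consumer", &printer{}, "data")
	w.Start() // Start launches the tailing loop in its own goroutine

	time.Sleep(30 * time.Second)
	w.Stop() // Stop is defined elsewhere in this file
}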
@@ -175,7 +166,7 @@ func (w *Watcher) setMetrics() {
 // Start the Watcher.
 func (w *Watcher) Start() {
 	w.setMetrics()
-	level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.name)
+	level.Info(w.logger).Log("msg", "starting WAL watcher", "consumer", w.name)
 
 	go w.loop()
 }
@@ -200,7 +191,7 @@ func (w *Watcher) loop() {

 	// We may encounter failures processing the WAL; we should wait and retry.
 	for !isClosed(w.quit) {
-		w.startTime = FromTime(time.Now())
+		w.startTime = fromTime(time.Now())
 		if err := w.run(); err != nil {
 			level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
 		}
@@ -263,7 +254,7 @@ func (w *Watcher) run() error {
 func (w *Watcher) findSegmentForIndex(index int) (int, error) {
 	refs, err := w.segments(w.walDir)
 	if err != nil {
-		return -1, nil
+		return -1, err
 	}
 
 	for _, r := range refs {
@@ -278,7 +269,7 @@ func (w *Watcher) firstAndLast() (int, int, error) {
 func (w *Watcher) firstAndLast() (int, int, error) {
 	refs, err := w.segments(w.walDir)
 	if err != nil {
-		return -1, -1, nil
+		return -1, -1, err
 	}
 
 	if len(refs) == 0 {
@@ -323,7 +314,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 	}
 	defer segment.Close()
 
-	reader := NewLiveReader(w.logger, lrMetrics, segment)
+	reader := NewLiveReader(w.logger, w.readerMetrics, segment)
 
 	readTicker := time.NewTicker(readPeriod)
 	defer readTicker.Stop()
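Note: the reader now gets the per-watcher readerMetrics rather than the package-global lrMetrics tied to prometheus.DefaultRegisterer. A standalone LiveReader can be fed metrics the same way; a fragment for inside a function returning error, assuming this package's existing OpenReadSegment and SegmentName helpers:

reg := prometheus.NewRegistry()
metrics := wal.NewLiveReaderMetrics(reg)

segment, err := wal.OpenReadSegment(wal.SegmentName("data/wal", 0))
if err != nil {
	return err
}
defer segment.Close()

reader := wal.NewLiveReader(logger, metrics, segment)
for reader.Next() {
	rec := reader.Record()
	_ = rec // decode with a record.Decoder, as readSegment does below
}
return reader.Err()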
@@ -448,11 +439,12 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 		dec     record.Decoder
 		series  []record.RefSeries
 		samples []record.RefSample
+		send    []record.RefSample
 	)
 
 	for r.Next() && !isClosed(w.quit) {
 		rec := r.Record()
-		w.recordsReadMetric.WithLabelValues(Type(dec.Type(rec))).Inc()
+		w.recordsReadMetric.WithLabelValues(recordType(dec.Type(rec))).Inc()
 
 		switch dec.Type(rec) {
 		case record.Series:
@@ -474,7 +466,6 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 				w.recordDecodeFailsMetric.Inc()
 				return err
 			}
-			var send []record.RefSample
 			for _, s := range samples {
 				if s.T > w.startTime {
 					send = append(send, s)
@@ -483,6 +474,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 			if len(send) > 0 {
 				// Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks).
 				w.writer.Append(send)
+				send = send[:0]
 			}
 
 		case record.Tombstones:
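Note: with send hoisted out of the per-record declaration (see the hunk above) and truncated after each Append, the slice's backing array is reused across records, so steady-state tailing stops allocating a fresh slice per sample batch. The idiom in isolation:

package main

import "fmt"

func main() {
	buf := make([]int, 0, 4)
	for batch := 0; batch < 3; batch++ {
		for v := 0; v < 4; v++ {
			buf = append(buf, v) // no allocation after the first batch
		}
		fmt.Println(buf, cap(buf))
		buf = buf[:0] // length back to zero; capacity and backing array retained
	}
}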
@@ -498,7 +490,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 	return r.Err()
 }
 
-func Type(rt record.Type) string {
+func recordType(rt record.Type) string {
 	switch rt {
 	case record.Invalid:
 		return "invalid"
@@ -538,7 +530,7 @@ func (w *Watcher) readCheckpoint(checkpointDir string) error {
 	}
 	defer sr.Close()
 
-	r := NewLiveReader(w.logger, lrMetrics, sr)
+	r := NewLiveReader(w.logger, w.readerMetrics, sr)
 	if err := w.readSegment(r, index, false); err != io.EOF && err != nil {
 		return errors.Wrap(err, "readSegment")
 	}
@@ -554,7 +546,8 @@ func (w *Watcher) readCheckpoint(checkpointDir string) error {

 func checkpointNum(dir string) (int, error) {
 	// Checkpoint dir names are in the format checkpoint.000001
-	chunks := strings.Split(dir, ".")
+	// dir may contain a hidden directory, so only check the base directory
+	chunks := strings.Split(path.Base(dir), ".")
 	if len(chunks) != 2 {
 		return 0, errors.Errorf("invalid checkpoint dir string: %s", dir)
 	}
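Note: splitting the full path on "." breaks as soon as any parent directory name contains a dot (the hidden-directory case the new comment mentions), because more than two chunks come back and checkpointNum returns an error. path.Base confines the split to the final path element. A quick demonstration:

package main

import (
	"fmt"
	"path"
	"strings"
)

func main() {
	dir := "/data/.tmp-123/wal/checkpoint.000001"
	fmt.Println(len(strings.Split(dir, ".")))       // 3: the old code would reject this path
	fmt.Println(strings.Split(path.Base(dir), ".")) // [checkpoint 000001]: parses cleanly
}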
