Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Fix filehandling for windows #392

Merged
merged 3 commits into from
Sep 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
stats := &CheckpointStats{}

var sr io.Reader
// We close everything explicitly because Windows needs files to be
// closed before being deleted. But we also have defer so that we close
// files if there is an error somewhere.
var closers []io.Closer
{
lastFn, k, err := LastCheckpoint(w.Dir())
if err != nil && err != ErrNotFound {
Expand All @@ -126,6 +130,7 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
return nil, errors.Wrap(err, "open last checkpoint")
}
defer last.Close()
closers = append(closers, last)
sr = last
}

Expand All @@ -134,6 +139,7 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
return nil, errors.Wrap(err, "create segment reader")
}
defer segsr.Close()
closers = append(closers, segsr)

if sr != nil {
sr = io.MultiReader(sr, segsr)
Expand Down Expand Up @@ -263,6 +269,9 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
return nil, errors.Wrap(err, "rename checkpoint directory")
}
if err := closeAll(closers...); err != nil {
return stats, errors.Wrap(err, "close opened files")
}
if err := w.Truncate(n + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint
Expand Down
6 changes: 6 additions & 0 deletions compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
if err != nil {
return errors.Wrap(err, "open chunk writer")
}
defer chunkw.Close()
// Record written chunk sizes on level 1 compactions.
if meta.Compaction.Level == 1 {
chunkw = &instrumentedChunkWriter{
Expand All @@ -466,6 +467,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
if err != nil {
return errors.Wrap(err, "open index writer")
}
defer indexw.Close()

if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
return errors.Wrap(err, "write compaction")
Expand All @@ -475,6 +477,10 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
return errors.Wrap(err, "write merged meta")
}

// We are explicitly closing them here to check for error even
// though these are covered under defer. This is because in Windows,
// you cannot delete these unless they are closed and the defer is to
// make sure they are closed if the function exits due to an error above.
if err = chunkw.Close(); err != nil {
return errors.Wrap(err, "close chunk writer")
}
Expand Down
1 change: 1 addition & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6
}

block := createEmptyBlock(c.t, filepath.Join(dest, meta.ULID.String()), meta)
testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere.
c.blocks = append(c.blocks, block)

// Now check that all expected blocks are actually persisted on disk.
Expand Down
2 changes: 1 addition & 1 deletion fileutil/fileutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func Rename(from, to string) error {
// It is not atomic.
func Replace(from, to string) error {
if err := os.RemoveAll(to); err != nil {
return nil
return err
}
if err := os.Rename(from, to); err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/testutil"

"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
Expand Down Expand Up @@ -74,6 +75,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
if p.Err() != nil {
t.Fatal(err)
}
testutil.Ok(t, r.Close())

// On DB opening all blocks in the base dir should be repaired.
db, err := Open("testdata/repair_index_version", nil, nil, nil)
Expand Down
20 changes: 18 additions & 2 deletions wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,13 @@ func (w *SegmentWAL) run(interval time.Duration) {

// Close syncs all data and closes the underlying resources.
func (w *SegmentWAL) Close() error {
// Make sure you can call Close() multiple times.
select {
case <-w.stopc:
return nil // Already closed.
default:
}

close(w.stopc)
<-w.donec

Expand All @@ -735,10 +742,12 @@ func (w *SegmentWAL) Close() error {
// On opening, a WAL must be fully consumed once. Afterwards
// only the current segment will still be open.
if hf := w.head(); hf != nil {
return errors.Wrapf(hf.Close(), "closing WAL head %s", hf.Name())
if err := hf.Close(); err != nil {
return errors.Wrapf(err, "closing WAL head %s", hf.Name())
}
}

return w.dirFile.Close()
return errors.Wrapf(w.dirFile.Close(), "closing WAL dir %s", w.dirFile.Name())
}

const (
Expand Down Expand Up @@ -1260,6 +1269,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) {
if err != nil {
return errors.Wrap(err, "open new WAL")
}

// It should've already been closed as part of the previous finalization.
// Do it once again in case of prior errors.
defer func() {
Expand Down Expand Up @@ -1306,6 +1316,12 @@ func MigrateWAL(logger log.Logger, dir string) (err error) {
if err != nil {
return errors.Wrap(err, "write new entries")
}
// We explicitly close even when there is a defer for Windows to be
// able to delete it. The defer is in place to close it in-case there
// are errors above.
if err := w.Close(); err != nil {
return errors.Wrap(err, "close old WAL")
}
if err := repl.Close(); err != nil {
return errors.Wrap(err, "close new WAL")
}
Expand Down
18 changes: 17 additions & 1 deletion wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,15 @@ func (w *WAL) Repair(origErr error) error {
if s.n <= cerr.Segment {
continue
}
if w.segment.i == s.n {
// The active segment needs to be removed,
// close it first (Windows!). Can be closed safely
// as we set the current segment to repaired file
// below.
if err := w.segment.Close(); err != nil {
return errors.Wrap(err, "close active segment")
}
}
if err := os.Remove(filepath.Join(w.dir, s.s)); err != nil {
return errors.Wrap(err, "delete segment")
}
Expand Down Expand Up @@ -312,15 +321,22 @@ func (w *WAL) Repair(origErr error) error {
return errors.Wrap(err, "open segment")
}
defer f.Close()

r := NewReader(bufio.NewReader(f))

for r.Next() {
if err := w.Log(r.Record()); err != nil {
return errors.Wrap(err, "insert record")
}
}
// We expect an error here, so nothing to handle.
// We expect an error here from r.Err(), so nothing to handle.

// We explicitly close even when there is a defer for Windows to be
// able to delete it. The defer is in place to close it in-case there
// are errors above.
if err := f.Close(); err != nil {
return errors.Wrap(err, "close corrupted file")
}
if err := os.Remove(tmpfn); err != nil {
return errors.Wrap(err, "delete corrupted segment")
}
Expand Down
8 changes: 7 additions & 1 deletion wal/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,14 @@ func TestWAL_Repair(t *testing.T) {
for r.Next() {
}
testutil.NotOk(t, r.Err())
testutil.Ok(t, sr.Close())
testutil.Ok(t, w.Repair(r.Err()))
testutil.Ok(t, w.Repair(errors.Wrap(r.Err(), "err"))) // See https://github.com/prometheus/prometheus/issues/4603

// See https://github.com/prometheus/prometheus/issues/4603
// We need to close w.segment because it needs to be deleted.
// But this is to mainly artificially test Repair() again.
testutil.Ok(t, w.segment.Close())
testutil.Ok(t, w.Repair(errors.Wrap(r.Err(), "err")))

sr, err = NewSegmentsReader(dir)
testutil.Ok(t, err)
Expand Down