From 7a319156c10fbd41d79534fea38e86c9f8403672 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 28 Sep 2016 07:19:16 -0700 Subject: [PATCH] wal: set writer offset for PageWriter --- wal/encoder.go | 4 ++-- wal/record_test.go | 2 +- wal/wal.go | 17 ++++++++++++----- wal/wal_test.go | 4 ++-- 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/wal/encoder.go b/wal/encoder.go index edbd1785ab8f..6b26bf972692 100644 --- a/wal/encoder.go +++ b/wal/encoder.go @@ -39,9 +39,9 @@ type encoder struct { uint64buf []byte } -func newEncoder(w io.Writer, prevCrc uint32) *encoder { +func newEncoder(w io.Writer, offset int, prevCrc uint32) *encoder { return &encoder{ - bw: ioutil.NewPageWriter(w, walPageBytes), + bw: ioutil.NewPageWriter(w, offset, walPageBytes), crc: crc.New(prevCrc, crcTable), // 1MB buffer buf: make([]byte, 1024*1024), diff --git a/wal/record_test.go b/wal/record_test.go index ddbc37d869e5..2a6904a81fce 100644 --- a/wal/record_test.go +++ b/wal/record_test.go @@ -69,7 +69,7 @@ func TestWriteRecord(t *testing.T) { typ := int64(0xABCD) d := []byte("Hello world!") buf := new(bytes.Buffer) - e := newEncoder(buf, 0) + e := newEncoder(buf, 0, 0) e.encode(&walpb.Record{Type: typ, Data: d}) e.flush() decoder := newDecoder(ioutil.NopCloser(buf)) diff --git a/wal/wal.go b/wal/wal.go index 0bd85c166123..92e70adef323 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -112,7 +112,9 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { if err != nil { return nil, err } - if _, err = f.Seek(0, os.SEEK_END); err != nil { + var offset int64 + offset, err = f.Seek(0, os.SEEK_END) + if err != nil { return nil, err } if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil { @@ -122,7 +124,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { w := &WAL{ dir: dirpath, metadata: metadata, - encoder: newEncoder(f, 0), + encoder: newEncoder(f, int(offset), 0), } w.locks = append(w.locks, f) if err = w.saveCrc(0); err != nil { @@ -342,8 +344,13 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. w.metadata = metadata if w.tail() != nil { + var offset int64 + offset, err = w.tail().Seek(0, os.SEEK_CUR) + if err != nil { + return + } // create encoder (chain crc with the decoder), enable appending - w.encoder = newEncoder(w.tail(), w.decoder.lastCRC()) + w.encoder = newEncoder(w.tail(), int(offset), w.decoder.lastCRC()) } w.decoder = nil @@ -377,7 +384,7 @@ func (w *WAL) cut() error { // update writer and save the previous crc w.locks = append(w.locks, newTail) prevCrc := w.encoder.crc.Sum32() - w.encoder = newEncoder(w.tail(), prevCrc) + w.encoder = newEncoder(w.tail(), int(off), prevCrc) if err = w.saveCrc(prevCrc); err != nil { return err } @@ -416,7 +423,7 @@ func (w *WAL) cut() error { w.locks[len(w.locks)-1] = newTail prevCrc = w.encoder.crc.Sum32() - w.encoder = newEncoder(w.tail(), prevCrc) + w.encoder = newEncoder(w.tail(), int(off), prevCrc) plog.Infof("segmented wal file %v is created", fpath) return nil diff --git a/wal/wal_test.go b/wal/wal_test.go index 6fc6f782ea59..a762ee255c60 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -61,7 +61,7 @@ func TestNew(t *testing.T) { } var wb bytes.Buffer - e := newEncoder(&wb, 0) + e := newEncoder(&wb, 0, 0) err = e.encode(&walpb.Record{Type: crcType, Crc: 0}) if err != nil { t.Fatalf("err = %v, want nil", err) @@ -528,7 +528,7 @@ func TestSaveEmpty(t *testing.T) { var buf bytes.Buffer var est raftpb.HardState w := WAL{ - encoder: newEncoder(&buf, 0), + encoder: newEncoder(&buf, 0, 0), } if err := w.saveState(&est); err != nil { t.Errorf("err = %v, want nil", err)