diff --git a/record/log_writer.go b/record/log_writer.go index f31aca5214..fe9a0d907d 100644 --- a/record/log_writer.go +++ b/record/log_writer.go @@ -538,7 +538,7 @@ func (w *LogWriter) flushBlock(b *block) error { if _, err := w.w.Write(b.buf[b.flushed:]); err != nil { return err } - b.written = 0 + atomic.StoreInt32(&b.written, 0) b.flushed = 0 w.free.Lock() w.free.blocks = append(w.free.blocks, b) @@ -657,25 +657,25 @@ func (w *LogWriter) SyncRecord(p []byte, wg *sync.WaitGroup, err *error) (int64, f.ready.Signal() } - offset := w.blockNum*blockSize + int64(w.block.written) // Note that we don't return w.err here as a concurrent call to Close would // race with our read. That's ok because the only error we could be seeing is // one to syncing for which the caller can receive notification of by passing // in a non-nil err argument. - return offset, nil + return w.Size(), nil } // Size returns the current size of the file. // External synchronisation provided by commitPipeline.mu. func (w *LogWriter) Size() int64 { - return w.blockNum*blockSize + int64(w.block.written) + written := atomic.LoadInt32(&w.block.written) + return w.blockNum*blockSize + int64(written) } func (w *LogWriter) emitEOFTrailer() { // Write a recyclable chunk header with a different log number. Readers // will treat the header as EOF when the log number does not match. b := w.block - i := b.written + i := atomic.LoadInt32(&w.block.written) binary.LittleEndian.PutUint32(b.buf[i+0:i+4], 0) // CRC binary.LittleEndian.PutUint16(b.buf[i+4:i+6], 0) // Size b.buf[i+6] = recyclableFullChunkType @@ -685,7 +685,7 @@ func (w *LogWriter) emitEOFTrailer() { func (w *LogWriter) emitFragment(n int, p []byte) []byte { b := w.block - i := b.written + i := atomic.LoadInt32(&w.block.written) first := n == 0 last := blockSize-i-recyclableHeaderSize >= int32(len(p)) @@ -711,10 +711,11 @@ func (w *LogWriter) emitFragment(n int, p []byte) []byte { binary.LittleEndian.PutUint16(b.buf[i+4:i+6], uint16(r)) atomic.StoreInt32(&b.written, j) - if blockSize-b.written < recyclableHeaderSize { + // NB: j = b.written + if blockSize-j < recyclableHeaderSize { // There is no room for another fragment in the block, so fill the // remaining bytes with zeros and queue the block for flushing. - for i := b.written; i < blockSize; i++ { + for i := j; i < blockSize; i++ { b.buf[i] = 0 } w.queueBlock()