Skip to content

Commit

Permalink
Problem: memiavl wal writing is not batched (#1145)
Browse files Browse the repository at this point in the history
* Problem: memiavl wal writing is not batched

* Update memiavl/db.go

Co-authored-by: mmsqe <[email protected]>
Signed-off-by: yihuang <[email protected]>

* Update memiavl/db.go

Signed-off-by: yihuang <[email protected]>

---------

Signed-off-by: yihuang <[email protected]>
Co-authored-by: mmsqe <[email protected]>
  • Loading branch information
yihuang and mmsqe authored Aug 28, 2023
1 parent 54fea4d commit 57adff9
Showing 1 changed file with 36 additions and 6 deletions.
42 changes: 36 additions & 6 deletions memiavl/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,16 +499,28 @@ func (db *DB) initAsyncCommit() {
go func() {
defer close(walQuit)

for entry := range walChan {
bz, err := entry.data.Marshal()
if err != nil {
walQuit <- err
return
batch := wal.Batch{}
for {
entries := channelBatchRecv(walChan)
if len(entries) == 0 {
// channel is closed
break
}
if err := db.wal.Write(entry.index, bz); err != nil {

for _, entry := range entries {
bz, err := entry.data.Marshal()
if err != nil {
walQuit <- err
return
}
batch.Write(entry.index, bz)
}

if err := db.wal.WriteBatch(&batch); err != nil {
walQuit <- err
return
}
batch.Clear()
}
}()

Expand Down Expand Up @@ -953,3 +965,21 @@ func GetLatestVersion(dir string) (int64, error) {
}
return walVersion(lastIndex, uint32(metadata.InitialVersion)), nil
}

func channelBatchRecv[T any](ch <-chan *T) []*T {
// block if channel is empty
item := <-ch
if item == nil {
// channel is closed
return nil
}

remaining := len(ch)
result := make([]*T, 0, remaining+1)
result = append(result, item)
for i := 0; i < remaining; i++ {
result = append(result, <-ch)
}

return result
}

0 comments on commit 57adff9

Please sign in to comment.