Skip to content

Commit

Permalink
Commit WAL on sync (#259)
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest authored Dec 1, 2024
1 parent 2f9f1bd commit f7b2ce5
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,10 @@ func (db *DB) prepareTransactions(
}
}

if req.Type == pipeline.Sync {
walRecorder.Commit(req)
}

pipeReader.Acknowledge(processedCount+1, req)
}
}
Expand Down Expand Up @@ -648,14 +652,17 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read
}
}

if req.Type == pipeline.Sync {
walRecorder.Commit(req)
}

pipeReader.Acknowledge(processedCount+1, req)
}
}

func (db *DB) processDeallocations(ctx context.Context, pipeReader *pipeline.Reader) error {
allocator := db.config.State.NewAllocator()
deallocator := db.config.State.NewDeallocator()
walRecorder := wal.NewRecorder(db.config.State, allocator)

var revisionCounter uint32

Expand Down Expand Up @@ -703,11 +710,8 @@ func (db *DB) processDeallocations(ctx context.Context, pipeReader *pipeline.Rea
}
}

switch req.Type {
case pipeline.Sync:
if req.Type == pipeline.Sync {
req.SyncCh <- struct{}{}
case pipeline.Commit:
walRecorder.Commit(req)
}

pipeReader.Acknowledge(processedCount+1, req)
Expand Down Expand Up @@ -789,7 +793,7 @@ func (db *DB) applyWALChanges(ctx context.Context, pipeReader *pipeline.Reader)
offset := *(*uint64)(unsafe.Pointer(&walNode[wrIndex+1]))
size := *(*uint16)(unsafe.Pointer(&walNode[wrIndex+9]))
copy(unsafe.Slice((*byte)(unsafe.Add(origin, offset)), size), walNode[wrIndex+11:])
wrIndex += size + 2
wrIndex += size + 11
case wal.RecordImmediateDeallocation, wal.RecordDelayedDeallocation:
wrIndex += 25
default:
Expand Down

0 comments on commit f7b2ce5

Please sign in to comment.