Skip to content

Commit

Permalink
Placeholder for WAL processor (#249)
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest authored Nov 29, 2024
1 parent 9ae39ab commit 60cb7b6
Showing 1 changed file with 16 additions and 1 deletion.
17 changes: 16 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ func (db *DB) Run(ctx context.Context) error {
pointerHashReaders = append(pointerHashReaders, nextReader)
prevHashReader = nextReader
}
commitSyncReader := pipeline.NewReader(prevHashReader)
applyWALReader := pipeline.NewReader(prevHashReader)
commitSyncReader := pipeline.NewReader(applyWALReader)

spawn("supervisor", parallel.Exit, func(ctx context.Context) error {
var lastSyncCh chan<- struct{}
Expand Down Expand Up @@ -256,6 +257,9 @@ func (db *DB) Run(ctx context.Context) error {
return db.updatePointerHashes(ctx, reader, uint64(i))
})
}
spawn("applyWAL", parallel.Fail, func(ctx context.Context) error {
return db.applyWALChanges(ctx, applyWALReader)
})
spawn("commitSync", parallel.Fail, func(ctx context.Context) error {
return db.syncOnCommit(ctx, commitSyncReader)
})
Expand Down Expand Up @@ -923,6 +927,17 @@ func (db *DB) updatePointerHashes(
}
}

func (db *DB) applyWALChanges(ctx context.Context, pipeReader *pipeline.Reader) error {
for processedCount := uint64(0); ; processedCount++ {
req, err := pipeReader.Read(ctx)
if err != nil {
return err
}

pipeReader.Acknowledge(processedCount+1, req)
}
}

func (db *DB) syncOnCommit(ctx context.Context, pipeReader *pipeline.Reader) error {
for processedCount := uint64(0); ; processedCount++ {
req, err := pipeReader.Read(ctx)
Expand Down

0 comments on commit 60cb7b6

Please sign in to comment.