From 7a283574f917334feb7447bec3edede2672f927f Mon Sep 17 00:00:00 2001 From: Wojciech Malota-Wojcik Date: Fri, 29 Nov 2024 07:31:39 +0100 Subject: [PATCH] Placeholder for WAL processor --- db.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/db.go b/db.go index 52ce5f6..13212c7 100644 --- a/db.go +++ b/db.go @@ -197,7 +197,8 @@ func (db *DB) Run(ctx context.Context) error { pointerHashReaders = append(pointerHashReaders, nextReader) prevHashReader = nextReader } - commitSyncReader := pipeline.NewReader(prevHashReader) + applyWALReader := pipeline.NewReader(prevHashReader) + commitSyncReader := pipeline.NewReader(applyWALReader) spawn("supervisor", parallel.Exit, func(ctx context.Context) error { var lastSyncCh chan<- struct{} @@ -256,6 +257,9 @@ func (db *DB) Run(ctx context.Context) error { return db.updatePointerHashes(ctx, reader, uint64(i)) }) } + spawn("applyWAL", parallel.Fail, func(ctx context.Context) error { + return db.applyWALChanges(ctx, applyWALReader) + }) spawn("commitSync", parallel.Fail, func(ctx context.Context) error { return db.syncOnCommit(ctx, commitSyncReader) }) @@ -923,6 +927,17 @@ func (db *DB) updatePointerHashes( } } +func (db *DB) applyWALChanges(ctx context.Context, pipeReader *pipeline.Reader) error { + for processedCount := uint64(0); ; processedCount++ { + req, err := pipeReader.Read(ctx) + if err != nil { + return err + } + + pipeReader.Acknowledge(processedCount+1, req) + } +} + func (db *DB) syncOnCommit(ctx context.Context, pipeReader *pipeline.Reader) error { for processedCount := uint64(0); ; processedCount++ { req, err := pipeReader.Read(ctx)