diff --git a/db.go b/db.go index fdc62e8..554bc64 100644 --- a/db.go +++ b/db.go @@ -196,8 +196,7 @@ func (db *DB) Run(ctx context.Context) error { pointerHashReaders = append(pointerHashReaders, nextReader) prevHashReader = nextReader } - walListReader := pipeline.NewReader(prevHashReader) - storeWALReader := pipeline.NewReader(walListReader) + storeWALReader := pipeline.NewReader(prevHashReader) spawn("supervisor", parallel.Exit, func(ctx context.Context) error { var lastSyncCh chan<- struct{} @@ -264,9 +263,6 @@ func (db *DB) Run(ctx context.Context) error { return db.updatePointerHashes(ctx, reader, uint64(i)) }) } - spawn("walList", parallel.Fail, func(ctx context.Context) error { - return db.buildWALList(ctx, db.config.Store, walListReader) - }) spawn("storeWAL", parallel.Fail, func(ctx context.Context) error { return db.storeWAL(ctx, db.config.Store, storeWALReader) }) @@ -1052,7 +1048,7 @@ func (db *DB) updatePointerHashes( } } -func (db *DB) buildWALList(ctx context.Context, store persistent.Store, pipeReader *pipeline.Reader) error { +func (db *DB) storeWAL(ctx context.Context, store persistent.Store, pipeReader *pipeline.Reader) error { allocator := db.config.State.NewAllocator() for processedCount := uint64(0); ; processedCount++ { @@ -1062,6 +1058,9 @@ func (db *DB) buildWALList(ctx context.Context, store persistent.Store, pipeRead } for wr := req.WALRequest; wr != nil; wr = wr.Next { + if err := store.Write(wr.NodeAddress, db.config.State.Bytes(wr.NodeAddress)); err != nil { + return err + } newTail, err := wallist.StoreAddress(&db.singularityNode.WALListTail, wr.NodeAddress, db.config.State, allocator) if err != nil { @@ -1075,23 +1074,6 @@ func (db *DB) buildWALList(ctx context.Context, store persistent.Store, pipeRead } } - pipeReader.Acknowledge(processedCount+1, req) - } -} - -func (db *DB) storeWAL(ctx context.Context, store persistent.Store, pipeReader *pipeline.Reader) error { - for processedCount := uint64(0); ; processedCount++ { - req, err := pipeReader.Read(ctx) - if err != nil { - return err - } - - for wr := req.WALRequest; wr != nil; wr = wr.Next { - if err := store.Write(wr.NodeAddress, db.config.State.Bytes(wr.NodeAddress)); err != nil { - return err - } - } - if req.Type == pipeline.Commit { err := store.Sync() req.CommitCh <- err diff --git a/go.mod b/go.mod index 3164e0c..2006293 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.23 require ( github.com/cespare/xxhash v1.1.0 + github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5 github.com/mmcloughlin/avo v0.6.0 github.com/outofforest/logger v0.5.5 github.com/outofforest/mass v0.2.1 @@ -20,6 +21,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect + github.com/libp2p/go-sockaddr v0.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spf13/pflag v1.0.5 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/go.sum b/go.sum index 89b7c03..5ea5766 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5 h1:5zELAgnSz0gqmr4Q5DWCoOzNHoeBAxVUXB7LS1eG+sw= +github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5/go.mod h1:ermjEDUoT/fS+3Ona5Vd6t6mZkw1eHp99ILO5jGRBkM= github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= @@ -14,6 +16,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/libp2p/go-sockaddr v0.1.1 h1:yD80l2ZOdGksnOyHrhxDdTDFrf7Oy+v3FMVArIRgZxQ= +github.com/libp2p/go-sockaddr v0.1.1/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k= github.com/mmcloughlin/avo v0.6.0 h1:QH6FU8SKoTLaVs80GA8TJuLNkUYl4VokHKlPhVDg4YY= github.com/mmcloughlin/avo v0.6.0/go.mod h1:8CoAGaCSYXtCPR+8y18Y9aB/kxb8JSS6FRI7mSkvD+8= github.com/outofforest/logger v0.3.3/go.mod h1:+M5sO17Va9V33t28Qs9VqRQ8bFV501Uhq2PtQY+R3Ms= @@ -80,6 +84,7 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/persistent/file.go b/persistent/file.go index d7175c1..28959d3 100644 --- a/persistent/file.go +++ b/persistent/file.go @@ -1,38 +1,68 @@ package persistent import ( - "io" "os" + "github.com/godzie44/go-uring/uring" "github.com/pkg/errors" "github.com/outofforest/quantum/types" ) +const ringCapacity = 50 + // NewFileStore creates new file-based store. func NewFileStore(file *os.File) (*FileStore, func(), error) { + ring, err := uring.New(ringCapacity) + if err != nil { + return nil, nil, errors.WithStack(err) + } return &FileStore{ + ring: ring, + fd: file.Fd(), file: file, }, func() { - _ = file.Close() + _ = ring.Close() }, nil } // FileStore defines persistent file-based store. type FileStore struct { - file *os.File + ring *uring.Ring + fd uintptr + file *os.File + counter uint32 } // Write writes data to the store. func (s *FileStore) Write(address types.NodeAddress, data []byte) error { - if _, err := s.file.Seek(int64(address*types.NodeLength), io.SeekStart); err != nil { + if err := s.ring.QueueSQE(uring.Write(s.fd, data, uint64(address)*types.NodeLength), 0, 0); err != nil { return errors.WithStack(err) } - _, err := s.file.Write(data) - return errors.WithStack(err) + s.counter++ + if s.counter < ringCapacity { + return nil + } + + cqe, err := s.ring.SubmitAndWaitCQEvents(s.counter) + s.counter = 0 + if err != nil { + return errors.WithStack(err) + } + return errors.WithStack(cqe.Error()) } // Sync syncs pending writes. func (s *FileStore) Sync() error { + if s.counter > 0 { + cqe, err := s.ring.SubmitAndWaitCQEvents(s.counter) + s.counter = 0 + if err != nil { + return errors.WithStack(err) + } + if err := cqe.Error(); err != nil { + return errors.WithStack(err) + } + } return errors.WithStack(s.file.Sync()) }