Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use uring for persistent store #263

Merged
merged 1 commit into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 5 additions & 23 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,7 @@ func (db *DB) Run(ctx context.Context) error {
pointerHashReaders = append(pointerHashReaders, nextReader)
prevHashReader = nextReader
}
walListReader := pipeline.NewReader(prevHashReader)
storeWALReader := pipeline.NewReader(walListReader)
storeWALReader := pipeline.NewReader(prevHashReader)

spawn("supervisor", parallel.Exit, func(ctx context.Context) error {
var lastSyncCh chan<- struct{}
Expand Down Expand Up @@ -264,9 +263,6 @@ func (db *DB) Run(ctx context.Context) error {
return db.updatePointerHashes(ctx, reader, uint64(i))
})
}
spawn("walList", parallel.Fail, func(ctx context.Context) error {
return db.buildWALList(ctx, db.config.Store, walListReader)
})
spawn("storeWAL", parallel.Fail, func(ctx context.Context) error {
return db.storeWAL(ctx, db.config.Store, storeWALReader)
})
Expand Down Expand Up @@ -1052,7 +1048,7 @@ func (db *DB) updatePointerHashes(
}
}

func (db *DB) buildWALList(ctx context.Context, store persistent.Store, pipeReader *pipeline.Reader) error {
func (db *DB) storeWAL(ctx context.Context, store persistent.Store, pipeReader *pipeline.Reader) error {
allocator := db.config.State.NewAllocator()

for processedCount := uint64(0); ; processedCount++ {
Expand All @@ -1062,6 +1058,9 @@ func (db *DB) buildWALList(ctx context.Context, store persistent.Store, pipeRead
}

for wr := req.WALRequest; wr != nil; wr = wr.Next {
if err := store.Write(wr.NodeAddress, db.config.State.Bytes(wr.NodeAddress)); err != nil {
return err
}
newTail, err := wallist.StoreAddress(&db.singularityNode.WALListTail, wr.NodeAddress, db.config.State,
allocator)
if err != nil {
Expand All @@ -1075,23 +1074,6 @@ func (db *DB) buildWALList(ctx context.Context, store persistent.Store, pipeRead
}
}

pipeReader.Acknowledge(processedCount+1, req)
}
}

func (db *DB) storeWAL(ctx context.Context, store persistent.Store, pipeReader *pipeline.Reader) error {
for processedCount := uint64(0); ; processedCount++ {
req, err := pipeReader.Read(ctx)
if err != nil {
return err
}

for wr := req.WALRequest; wr != nil; wr = wr.Next {
if err := store.Write(wr.NodeAddress, db.config.State.Bytes(wr.NodeAddress)); err != nil {
return err
}
}

if req.Type == pipeline.Commit {
err := store.Sync()
req.CommitCh <- err
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.23

require (
github.com/cespare/xxhash v1.1.0
github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5
github.com/mmcloughlin/avo v0.6.0
github.com/outofforest/logger v0.5.5
github.com/outofforest/mass v0.2.1
Expand All @@ -20,6 +21,7 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/libp2p/go-sockaddr v0.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5 h1:5zELAgnSz0gqmr4Q5DWCoOzNHoeBAxVUXB7LS1eG+sw=
github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5/go.mod h1:ermjEDUoT/fS+3Ona5Vd6t6mZkw1eHp99ILO5jGRBkM=
github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
Expand All @@ -14,6 +16,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/libp2p/go-sockaddr v0.1.1 h1:yD80l2ZOdGksnOyHrhxDdTDFrf7Oy+v3FMVArIRgZxQ=
github.com/libp2p/go-sockaddr v0.1.1/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k=
github.com/mmcloughlin/avo v0.6.0 h1:QH6FU8SKoTLaVs80GA8TJuLNkUYl4VokHKlPhVDg4YY=
github.com/mmcloughlin/avo v0.6.0/go.mod h1:8CoAGaCSYXtCPR+8y18Y9aB/kxb8JSS6FRI7mSkvD+8=
github.com/outofforest/logger v0.3.3/go.mod h1:+M5sO17Va9V33t28Qs9VqRQ8bFV501Uhq2PtQY+R3Ms=
Expand Down Expand Up @@ -80,6 +84,7 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
42 changes: 36 additions & 6 deletions persistent/file.go
Original file line number Diff line number Diff line change
@@ -1,38 +1,68 @@
package persistent

import (
"io"
"os"

"github.com/godzie44/go-uring/uring"
"github.com/pkg/errors"

"github.com/outofforest/quantum/types"
)

const ringCapacity = 50

// NewFileStore creates new file-based store.
func NewFileStore(file *os.File) (*FileStore, func(), error) {
ring, err := uring.New(ringCapacity)
if err != nil {
return nil, nil, errors.WithStack(err)
}
return &FileStore{
ring: ring,
fd: file.Fd(),
file: file,
}, func() {
_ = file.Close()
_ = ring.Close()
}, nil
}

// FileStore defines persistent file-based store.
type FileStore struct {
file *os.File
ring *uring.Ring
fd uintptr
file *os.File
counter uint32
}

// Write writes data to the store.
func (s *FileStore) Write(address types.NodeAddress, data []byte) error {
if _, err := s.file.Seek(int64(address*types.NodeLength), io.SeekStart); err != nil {
if err := s.ring.QueueSQE(uring.Write(s.fd, data, uint64(address)*types.NodeLength), 0, 0); err != nil {
return errors.WithStack(err)
}
_, err := s.file.Write(data)
return errors.WithStack(err)
s.counter++
if s.counter < ringCapacity {
return nil
}

cqe, err := s.ring.SubmitAndWaitCQEvents(s.counter)
s.counter = 0
if err != nil {
return errors.WithStack(err)
}
return errors.WithStack(cqe.Error())
}

// Sync syncs pending writes.
func (s *FileStore) Sync() error {
if s.counter > 0 {
cqe, err := s.ring.SubmitAndWaitCQEvents(s.counter)
s.counter = 0
if err != nil {
return errors.WithStack(err)
}
if err := cqe.Error(); err != nil {
return errors.WithStack(err)
}
}
return errors.WithStack(s.file.Sync())
}
Loading