From 165d83cddad7dcff105a0c0444fde4c9da2afed4 Mon Sep 17 00:00:00 2001 From: Wojciech Malota-Wojcik Date: Tue, 3 Dec 2024 14:50:37 +0100 Subject: [PATCH] Handle EINTR in uring --- benchmark_test.go | 15 +++++------ db.go | 1 + go.mod | 6 +++-- go.sum | 8 +++--- persistent/dummy.go | 3 +++ persistent/file.go | 62 ++++++++++++++++++++++++++++++-------------- persistent/memory.go | 3 +++ persistent/types.go | 1 + 8 files changed, 66 insertions(+), 33 deletions(-) diff --git a/benchmark_test.go b/benchmark_test.go index d4c729f..dae7167 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -31,7 +31,7 @@ import ( func BenchmarkBalanceTransfer(b *testing.B) { const ( spaceID = 0x00 - numOfAddresses = 100_000_000 + numOfAddresses = 5_000_000 txsPerCommit = 20_000 balance = 100_000 ) @@ -58,7 +58,7 @@ func BenchmarkBalanceTransfer(b *testing.B) { func() { _, _ = rand.Read(accountBytes) - var size uint64 = 99 * 1024 * 1024 * 1024 + var size uint64 = 20 * 1024 * 1024 * 1024 state, stateDeallocFunc, err := alloc.NewState( size, 100, @@ -70,14 +70,13 @@ func BenchmarkBalanceTransfer(b *testing.B) { defer stateDeallocFunc() //nolint:ineffassign,wastedassign,staticcheck - store, storeCloseFunc, err := fileStore( + store, err := fileStore( // "/tmp/d0/wojciech/db.quantum", "db0.quantum", size) if err != nil { panic(err) } - defer storeCloseFunc() store = persistent.NewDummyStore() @@ -184,7 +183,7 @@ func BenchmarkBalanceTransfer(b *testing.B) { } } -func fileStore(path string, size uint64) (persistent.Store, func(), error) { +func fileStore(path string, size uint64) (persistent.Store, error) { expectedNumOfNodes := size / types.NodeLength expectedCapacity := expectedNumOfNodes * types.NodeLength seekTo := int64(expectedCapacity - types.NodeLength) @@ -195,14 +194,14 @@ func fileStore(path string, size uint64) (persistent.Store, func(), error) { file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0o600) if err != nil { - return nil, nil, err + return nil, err } if _, err := file.Seek(seekTo, io.SeekEnd); err != nil { - return nil, nil, errors.WithStack(err) + return nil, errors.WithStack(err) } if _, err := file.Write(data); err != nil { - return nil, nil, errors.WithStack(err) + return nil, errors.WithStack(err) } return persistent.NewFileStore(file) diff --git a/db.go b/db.go index 554bc64..c815465 100644 --- a/db.go +++ b/db.go @@ -167,6 +167,7 @@ func (db *DB) Run(ctx context.Context) error { spawn("state", parallel.Fail, db.config.State.Run) spawn("pipeline", parallel.Exit, func(ctx context.Context) error { defer db.config.State.Close() + defer db.config.Store.Close() return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error { supervisorReader := db.queueReader diff --git a/go.mod b/go.mod index 2006293..7485275 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/outofforest/quantum -go 1.23 +go 1.23.3 + +replace github.com/godzie44/go-uring => github.com/gohryt/go-uring v0.0.0-20240825103418-6afda088f948 require ( github.com/cespare/xxhash v1.1.0 @@ -21,7 +23,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect - github.com/libp2p/go-sockaddr v0.1.1 // indirect + github.com/libp2p/go-sockaddr v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spf13/pflag v1.0.5 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/go.sum b/go.sum index 5ea5766..76b2caf 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5 h1:5zELAgnSz0gqmr4Q5DWCoOzNHoeBAxVUXB7LS1eG+sw= -github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5/go.mod h1:ermjEDUoT/fS+3Ona5Vd6t6mZkw1eHp99ILO5jGRBkM= +github.com/gohryt/go-uring v0.0.0-20240825103418-6afda088f948 h1:xDgy/WxYHRqhVRxfUlH+iKzFDe5qiLz4/ZTs/58uoXg= +github.com/gohryt/go-uring v0.0.0-20240825103418-6afda088f948/go.mod h1:ct8+i1XkPS1LQSX8i2XzBgwPSd5e///gqX7LSfrDSkM= github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= @@ -16,8 +16,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/libp2p/go-sockaddr v0.1.1 h1:yD80l2ZOdGksnOyHrhxDdTDFrf7Oy+v3FMVArIRgZxQ= -github.com/libp2p/go-sockaddr v0.1.1/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k= +github.com/libp2p/go-sockaddr v0.2.0 h1:Alhhj6lGxVAon9O32tOO89T601EugSx6YiGjy5BVjWk= +github.com/libp2p/go-sockaddr v0.2.0/go.mod h1:5NxulaB17yJ07IpzRIleys4un0PJ7WLWgMDLBBWrGw8= github.com/mmcloughlin/avo v0.6.0 h1:QH6FU8SKoTLaVs80GA8TJuLNkUYl4VokHKlPhVDg4YY= github.com/mmcloughlin/avo v0.6.0/go.mod h1:8CoAGaCSYXtCPR+8y18Y9aB/kxb8JSS6FRI7mSkvD+8= github.com/outofforest/logger v0.3.3/go.mod h1:+M5sO17Va9V33t28Qs9VqRQ8bFV501Uhq2PtQY+R3Ms= diff --git a/persistent/dummy.go b/persistent/dummy.go index 2434fe1..e0729fe 100644 --- a/persistent/dummy.go +++ b/persistent/dummy.go @@ -22,3 +22,6 @@ func (s *DummyStore) Write(_ types.NodeAddress, _ []byte) error { func (s *DummyStore) Sync() error { return nil } + +// Close closes the store. +func (s *DummyStore) Close() {} diff --git a/persistent/file.go b/persistent/file.go index 28959d3..841e026 100644 --- a/persistent/file.go +++ b/persistent/file.go @@ -2,6 +2,7 @@ package persistent import ( "os" + "syscall" "github.com/godzie44/go-uring/uring" "github.com/pkg/errors" @@ -12,18 +13,16 @@ import ( const ringCapacity = 50 // NewFileStore creates new file-based store. -func NewFileStore(file *os.File) (*FileStore, func(), error) { +func NewFileStore(file *os.File) (*FileStore, error) { ring, err := uring.New(ringCapacity) if err != nil { - return nil, nil, errors.WithStack(err) + return nil, errors.WithStack(err) } return &FileStore{ - ring: ring, - fd: file.Fd(), - file: file, - }, func() { - _ = ring.Close() - }, nil + ring: ring, + fd: file.Fd(), + file: file, + }, nil } // FileStore defines persistent file-based store. @@ -44,25 +43,50 @@ func (s *FileStore) Write(address types.NodeAddress, data []byte) error { return nil } - cqe, err := s.ring.SubmitAndWaitCQEvents(s.counter) - s.counter = 0 - if err != nil { - return errors.WithStack(err) - } - return errors.WithStack(cqe.Error()) + return s.submit() } // Sync syncs pending writes. func (s *FileStore) Sync() error { if s.counter > 0 { - cqe, err := s.ring.SubmitAndWaitCQEvents(s.counter) - s.counter = 0 - if err != nil { + if err := s.submit(); err != nil { return errors.WithStack(err) } - if err := cqe.Error(); err != nil { + } + + // FIXME (wojciech): Fsync is implemented by io uring but not implemented in the library. + // Fork that library or write our own, possibly a better one? + // return errors.WithStack(s.file.Sync()) + + return nil +} + +// Close closes the store. +func (s *FileStore) Close() { + _ = s.ring.Close() + _ = s.file.Close() +} + +func (s *FileStore) submit() error { + // Due to asynchronous preemption + // https://go.dev/doc/go1.14#runtime + // https://unskilled.blog/posts/preemption-in-go-an-introduction/ + // Go runtime sends SIGURG to the thread to stop executing currently running goroutine. + // Signal causes the currently long-running syscall to exit with + // "interrupted system call" error. In that case operation must be repeated. + // In case of urings, the affected part is the syscall awaiting events. + // Looks like that when entire thread is dedicated to one goroutine (using runtime.LockOSThread), + // that mechanism is turned off on that thread. But it is a rare case so we decided to not incorporate this for now. + + for { + cqe, err := s.ring.SubmitAndWaitCQEvents(s.counter) + switch { + case err == nil: + s.counter = 0 + return errors.WithStack(cqe.Error()) + case errors.Is(err, syscall.EINTR): + default: return errors.WithStack(err) } } - return errors.WithStack(s.file.Sync()) } diff --git a/persistent/memory.go b/persistent/memory.go index 197febf..bcfdc93 100644 --- a/persistent/memory.go +++ b/persistent/memory.go @@ -41,3 +41,6 @@ func (s *MemoryStore) Write(address types.NodeAddress, data []byte) error { func (s *MemoryStore) Sync() error { return nil } + +// Close closes the store. +func (s *MemoryStore) Close() {} diff --git a/persistent/types.go b/persistent/types.go index 8ff58ac..fdb807a 100644 --- a/persistent/types.go +++ b/persistent/types.go @@ -6,4 +6,5 @@ import "github.com/outofforest/quantum/types" type Store interface { Write(address types.NodeAddress, data []byte) error Sync() error + Close() }