Skip to content

Commit

Permalink
Handle EINTR in uring (#265)
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest authored Dec 3, 2024
1 parent 9013a4e commit 3415dba
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 33 deletions.
15 changes: 7 additions & 8 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func BenchmarkBalanceTransfer(b *testing.B) {
const (
spaceID = 0x00
numOfAddresses = 100_000_000
numOfAddresses = 5_000_000
txsPerCommit = 20_000
balance = 100_000
)
Expand All @@ -58,7 +58,7 @@ func BenchmarkBalanceTransfer(b *testing.B) {
func() {
_, _ = rand.Read(accountBytes)

var size uint64 = 99 * 1024 * 1024 * 1024
var size uint64 = 20 * 1024 * 1024 * 1024
state, stateDeallocFunc, err := alloc.NewState(
size,
100,
Expand All @@ -70,14 +70,13 @@ func BenchmarkBalanceTransfer(b *testing.B) {
defer stateDeallocFunc()

//nolint:ineffassign,wastedassign,staticcheck
store, storeCloseFunc, err := fileStore(
store, err := fileStore(
// "/tmp/d0/wojciech/db.quantum",
"db0.quantum",
size)
if err != nil {
panic(err)
}
defer storeCloseFunc()

store = persistent.NewDummyStore()

Expand Down Expand Up @@ -184,7 +183,7 @@ func BenchmarkBalanceTransfer(b *testing.B) {
}
}

func fileStore(path string, size uint64) (persistent.Store, func(), error) {
func fileStore(path string, size uint64) (persistent.Store, error) {
expectedNumOfNodes := size / types.NodeLength
expectedCapacity := expectedNumOfNodes * types.NodeLength
seekTo := int64(expectedCapacity - types.NodeLength)
Expand All @@ -195,14 +194,14 @@ func fileStore(path string, size uint64) (persistent.Store, func(), error) {

file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0o600)
if err != nil {
return nil, nil, err
return nil, err
}

if _, err := file.Seek(seekTo, io.SeekEnd); err != nil {
return nil, nil, errors.WithStack(err)
return nil, errors.WithStack(err)
}
if _, err := file.Write(data); err != nil {
return nil, nil, errors.WithStack(err)
return nil, errors.WithStack(err)
}

return persistent.NewFileStore(file)
Expand Down
1 change: 1 addition & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func (db *DB) Run(ctx context.Context) error {
spawn("state", parallel.Fail, db.config.State.Run)
spawn("pipeline", parallel.Exit, func(ctx context.Context) error {
defer db.config.State.Close()
defer db.config.Store.Close()

return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
supervisorReader := db.queueReader
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/outofforest/quantum

go 1.23
go 1.23.3

replace github.com/godzie44/go-uring => github.com/gohryt/go-uring v0.0.0-20240825103418-6afda088f948

require (
github.com/cespare/xxhash v1.1.0
Expand All @@ -21,7 +23,7 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/libp2p/go-sockaddr v0.1.1 // indirect
github.com/libp2p/go-sockaddr v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5 h1:5zELAgnSz0gqmr4Q5DWCoOzNHoeBAxVUXB7LS1eG+sw=
github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5/go.mod h1:ermjEDUoT/fS+3Ona5Vd6t6mZkw1eHp99ILO5jGRBkM=
github.com/gohryt/go-uring v0.0.0-20240825103418-6afda088f948 h1:xDgy/WxYHRqhVRxfUlH+iKzFDe5qiLz4/ZTs/58uoXg=
github.com/gohryt/go-uring v0.0.0-20240825103418-6afda088f948/go.mod h1:ct8+i1XkPS1LQSX8i2XzBgwPSd5e///gqX7LSfrDSkM=
github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
Expand All @@ -16,8 +16,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/libp2p/go-sockaddr v0.1.1 h1:yD80l2ZOdGksnOyHrhxDdTDFrf7Oy+v3FMVArIRgZxQ=
github.com/libp2p/go-sockaddr v0.1.1/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k=
github.com/libp2p/go-sockaddr v0.2.0 h1:Alhhj6lGxVAon9O32tOO89T601EugSx6YiGjy5BVjWk=
github.com/libp2p/go-sockaddr v0.2.0/go.mod h1:5NxulaB17yJ07IpzRIleys4un0PJ7WLWgMDLBBWrGw8=
github.com/mmcloughlin/avo v0.6.0 h1:QH6FU8SKoTLaVs80GA8TJuLNkUYl4VokHKlPhVDg4YY=
github.com/mmcloughlin/avo v0.6.0/go.mod h1:8CoAGaCSYXtCPR+8y18Y9aB/kxb8JSS6FRI7mSkvD+8=
github.com/outofforest/logger v0.3.3/go.mod h1:+M5sO17Va9V33t28Qs9VqRQ8bFV501Uhq2PtQY+R3Ms=
Expand Down
3 changes: 3 additions & 0 deletions persistent/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ func (s *DummyStore) Write(_ types.NodeAddress, _ []byte) error {
func (s *DummyStore) Sync() error {
return nil
}

// Close closes the store.
func (s *DummyStore) Close() {}
62 changes: 43 additions & 19 deletions persistent/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package persistent

import (
"os"
"syscall"

"github.com/godzie44/go-uring/uring"
"github.com/pkg/errors"
Expand All @@ -12,18 +13,16 @@ import (
const ringCapacity = 50

// NewFileStore creates new file-based store.
func NewFileStore(file *os.File) (*FileStore, func(), error) {
func NewFileStore(file *os.File) (*FileStore, error) {
ring, err := uring.New(ringCapacity)
if err != nil {
return nil, nil, errors.WithStack(err)
return nil, errors.WithStack(err)
}
return &FileStore{
ring: ring,
fd: file.Fd(),
file: file,
}, func() {
_ = ring.Close()
}, nil
ring: ring,
fd: file.Fd(),
file: file,
}, nil
}

// FileStore defines persistent file-based store.
Expand All @@ -44,25 +43,50 @@ func (s *FileStore) Write(address types.NodeAddress, data []byte) error {
return nil
}

cqe, err := s.ring.SubmitAndWaitCQEvents(s.counter)
s.counter = 0
if err != nil {
return errors.WithStack(err)
}
return errors.WithStack(cqe.Error())
return s.submit()
}

// Sync syncs pending writes.
func (s *FileStore) Sync() error {
if s.counter > 0 {
cqe, err := s.ring.SubmitAndWaitCQEvents(s.counter)
s.counter = 0
if err != nil {
if err := s.submit(); err != nil {
return errors.WithStack(err)
}
if err := cqe.Error(); err != nil {
}

// FIXME (wojciech): Fsync is implemented by io uring but not implemented in the library.
// Fork that library or write our own, possibly a better one?
// return errors.WithStack(s.file.Sync())

return nil
}

// Close closes the store.
func (s *FileStore) Close() {
_ = s.ring.Close()
_ = s.file.Close()
}

func (s *FileStore) submit() error {
// Due to asynchronous preemption
// https://go.dev/doc/go1.14#runtime
// https://unskilled.blog/posts/preemption-in-go-an-introduction/
// Go runtime sends SIGURG to the thread to stop executing currently running goroutine.
// Signal causes the currently long-running syscall to exit with
// "interrupted system call" error. In that case operation must be repeated.
// In case of urings, the affected part is the syscall awaiting events.
// Looks like that when entire thread is dedicated to one goroutine (using runtime.LockOSThread),
// that mechanism is turned off on that thread. But it is a rare case so we decided to not incorporate this for now.

for {
cqe, err := s.ring.SubmitAndWaitCQEvents(s.counter)
switch {
case err == nil:
s.counter = 0
return errors.WithStack(cqe.Error())
case errors.Is(err, syscall.EINTR):
default:
return errors.WithStack(err)
}
}
return errors.WithStack(s.file.Sync())
}
3 changes: 3 additions & 0 deletions persistent/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,6 @@ func (s *MemoryStore) Write(address types.NodeAddress, data []byte) error {
func (s *MemoryStore) Sync() error {
return nil
}

// Close closes the store.
func (s *MemoryStore) Close() {}
1 change: 1 addition & 0 deletions persistent/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ import "github.com/outofforest/quantum/types"
type Store interface {
Write(address types.NodeAddress, data []byte) error
Sync() error
Close()
}

0 comments on commit 3415dba

Please sign in to comment.