Skip to content

Commit

Permalink
Adaptive sleep duration in pipeline (#242)
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest authored Nov 26, 2024
1 parent 411c171 commit 74e7907
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (db *DB) Run(ctx context.Context) error {
}
executeTxReader := pipeline.NewReader(prepareTxReaders...)
deallocateReader := pipeline.NewReader(executeTxReader)
dataHashReaders := make([]*pipeline.Reader, 0, 4)
dataHashReaders := make([]*pipeline.Reader, 0, 3)
pointerHashReaders := make([]*pipeline.Reader, 0, cap(dataHashReaders))
for range cap(dataHashReaders) {
dataHashReader := pipeline.NewReader(deallocateReader)
Expand Down
26 changes: 23 additions & 3 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ const (
// StoreCapacity is the maximum capacity of store array in store request.
StoreCapacity = 10

sleepDuration = 5 * time.Microsecond
atomicDivider = 100
sleepDuration = 5 * time.Microsecond
maxSleepDuration = 100 * time.Microsecond
atomicDivider = 10
)

// NewTransactionRequestFactory creates transaction request factory.
Expand Down Expand Up @@ -109,6 +110,7 @@ func New() (*Pipeline, *Reader) {
head: &head,
availableCounts: []*uint64{availableCount},
processedCount: lo.ToPtr[uint64](0),
sleepDuration: sleepDuration,
}
}

Expand Down Expand Up @@ -139,15 +141,19 @@ type Reader struct {

currentAvailableCount uint64
currentReadCount uint64
sleepDuration time.Duration
}

// Read reads next request from the pipeline.
func (qr *Reader) Read(ctx context.Context) (*TransactionRequest, error) {
var h *TransactionRequest
var err error

//nolint:nestif
if qr.currentAvailableCount > qr.currentReadCount {
h = *qr.head
} else {
var slept bool
// It is done once per epoch to save time on calling ctx.Err().
err = errors.WithStack(ctx.Err())
for {
Expand All @@ -161,10 +167,22 @@ func (qr *Reader) Read(ctx context.Context) (*TransactionRequest, error) {
qr.currentAvailableCount = minAvailableCount
if qr.currentAvailableCount > qr.currentReadCount {
h = *qr.head
if qr.sleepDuration > 0 {
qr.sleepDuration -= time.Microsecond
}
break
}

time.Sleep(sleepDuration)
if slept {
if qr.sleepDuration < maxSleepDuration {
qr.sleepDuration += time.Microsecond
}
time.Sleep(sleepDuration)
} else if qr.sleepDuration > 0 {
time.Sleep(qr.sleepDuration)
}

slept = true

if ctx.Err() != nil {
return nil, errors.WithStack(ctx.Err())
Expand All @@ -190,6 +208,7 @@ func NewReader(parentReaders ...*Reader) *Reader {
head: parentReaders[0].head,
availableCounts: make([]*uint64, 0, len(parentReaders)),
processedCount: lo.ToPtr[uint64](0),
sleepDuration: sleepDuration,
}
for _, pr := range parentReaders {
r.availableCounts = append(r.availableCounts, pr.processedCount)
Expand All @@ -204,5 +223,6 @@ func CloneReader(reader *Reader) *Reader {
head: reader.head,
availableCounts: reader.availableCounts,
processedCount: lo.ToPtr[uint64](0),
sleepDuration: reader.sleepDuration,
}
}

0 comments on commit 74e7907

Please sign in to comment.