Skip to content

Commit

Permalink
Create limiting store
Browse files Browse the repository at this point in the history
  • Loading branch information
bstrausser committed Mar 5, 2024
1 parent 80627b2 commit 4e9f68e
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 76 deletions.
41 changes: 5 additions & 36 deletions copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,15 @@ package desync

import (
"context"
"time"

"golang.org/x/sync/errgroup"
)

type TimeThrottle struct {
lastExecutionTime time.Time
minimumTimeBetweenEachExecution time.Duration
}

func (timeThrottle *TimeThrottle) reset() {
timeThrottle.lastExecutionTime = time.Now()
}

func (timeThrottle *TimeThrottle) calculateThrottle() (bool, time.Duration) {
r := -(time.Since(timeThrottle.lastExecutionTime) - timeThrottle.minimumTimeBetweenEachExecution)
return r > 0, r
}

func (timeThrottle *TimeThrottle) waitIfRequired() {

wait, duration := timeThrottle.calculateThrottle()
if wait {
time.Sleep(duration)
}
}

func buildThrottle(waitPeriodMillis int) TimeThrottle {

d := time.Millisecond * time.Duration(waitPeriodMillis)
return TimeThrottle{time.Now().Add(-d), time.Duration(d)}
}

// Copy reads a list of chunks from the provided src store, and copies the ones
// not already present in the dst store. The goal is to load chunks from remote
// store to populate a cache. If progress is provided, it'll be called when a
// chunk has been processed. Used to draw a progress bar, can be nil.
func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, pb ProgressBar, shouldThrottle bool, waitPeriodMillis int) error {
func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, pb ProgressBar) error {

in := make(chan ChunkID)
g, ctx := errgroup.WithContext(ctx)
Expand All @@ -53,8 +24,8 @@ func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int,
// Start the workers
for i := 0; i < n; i++ {
g.Go(func() error {
waitPeriodMillis := 200
throttle := buildThrottle(waitPeriodMillis)



for id := range in {
pb.Increment()
Expand All @@ -65,12 +36,10 @@ func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int,
if hasChunk {
continue
}
if shouldThrottle {
throttle.waitIfRequired()
}


chunk, err := src.GetChunk(id)
throttle.reset()

if err != nil {
return err
}
Expand Down
41 changes: 1 addition & 40 deletions copy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package desync
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand All @@ -29,47 +28,9 @@ func TestCopy(t *testing.T) {
chunks := make([]ChunkID, 1)
chunks[0] = first_chunk_id

Copy(context.Background(), chunks, src_store, dst_store, 1, NewProgressBar(""),false,100)
Copy(context.Background(), chunks, src_store, dst_store, 1, NewProgressBar(""))
require.NoError(t, err)
has_the_chunk, _ := dst_store.HasChunk(first_chunk_id)

require.True(t, has_the_chunk)
}

func TestTimeThrottle(t *testing.T) {

// If the wait time is zero, we never wait

wait := time.Duration(time.Millisecond * 0)
throttle := TimeThrottle{time.Now(), wait}
w, _ := throttle.calculateThrottle()
require.False(t, w)

past := time.Now().Add(-time.Hour * 1)
throttle = TimeThrottle{past, wait}
w, _ = throttle.calculateThrottle()
require.False(t, w)

wait = time.Duration(time.Hour * 1)
throttle = TimeThrottle{time.Now(), wait}
w, d := throttle.calculateThrottle()
require.True(t, w)
require.True(t, d > time.Duration(time.Minute*59))

// Assuming out last exection was in the past, we don't wait
past = time.Now().Add(-time.Hour * 1)
wait = time.Duration(time.Second * 60)
throttle = TimeThrottle{past, wait}
w, _ = throttle.calculateThrottle()
require.False(t, w)

wait = time.Duration(time.Second * 60)
throttle = TimeThrottle{time.Now(), wait}
present := throttle.lastExecutionTime
// Without the sleep this can fail. At least on windows
// https://github.com/folbricht/desync/actions/runs/8131384060/job/22220648517?pr=258
time.Sleep(time.Duration(time.Millisecond*100))
throttle.reset()
future := throttle.lastExecutionTime
require.True(t, present.Before(future))
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ require (
golang.org/x/oauth2 v0.7.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down
63 changes: 63 additions & 0 deletions ratelimitstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package desync

import (
"context"
"fmt"
"time"

"golang.org/x/time/rate"
)

type ThrottleOptions struct {
eventRate float64
burstRate int
timeout time.Duration
}

type RateLimitedLocalStore struct {

wrappedStore WriteStore

limiter *rate.Limiter

options ThrottleOptions

}

func NewRateLimitedLocalStore(s WriteStore, options ThrottleOptions) *RateLimitedLocalStore {

limiter := rate.NewLimiter(rate.Limit(options.eventRate), options.burstRate)
return &RateLimitedLocalStore{wrappedStore: s,limiter: limiter }
}

func (s RateLimitedLocalStore) GetChunk(id ChunkID) (*Chunk, error) {

return s.wrappedStore.GetChunk(id)
}

func (s RateLimitedLocalStore) HasChunk(id ChunkID) (bool, error) {


return s.wrappedStore.HasChunk(id)
}


func (s RateLimitedLocalStore) StoreChunk(chunk *Chunk) error {
ctx, cancel := context.WithTimeout(context.Background(), s.options.timeout)
defer cancel()
// This isn't ideal because what I'm really interested is in size over the wire.
_, err := chunk.Data()
if err != nil {
return err
}

//size := len(b)
err = s.limiter.WaitN(ctx,1)
if err != nil {

fmt.Println("Rate limit context error:", err)
}

return s.wrappedStore.StoreChunk(chunk)

}
58 changes: 58 additions & 0 deletions ratelimitstore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package desync

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)

func TestLimiter(t *testing.T){

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*1000)
defer cancel()

limiter := rate.NewLimiter(rate.Limit(50), 50)
err := limiter.WaitN(ctx,1)
require.Nil(t, err)

limiter = rate.NewLimiter(rate.Limit(2), 1)
_ = limiter.WaitN(ctx,1)
require.Nil(t, err)
err = limiter.WaitN(ctx,1)
require.Nil(t, err)

limiter = rate.NewLimiter(rate.Limit(1), 1)
_ = limiter.WaitN(ctx,1)
require.Nil(t, err)
err = limiter.WaitN(ctx,1)
require.NotNil(t, err)




}

func TestCopyWithNoLimit(t *testing.T) {

src_store_dir := t.TempDir()

src_store, err := NewLocalStore(src_store_dir, StoreOptions{})
require.NoError(t, err)
throttleOptions := ThrottleOptions{100,100,time.Millisecond*10000}
throttledStore := NewRateLimitedLocalStore(src_store, throttleOptions)

chunk_data := []byte("some data")
chunk := NewChunk(chunk_data)
chunk_id := chunk.ID()

err = throttledStore.StoreChunk(chunk)
require.NotNil(t,err)
hasChunk, err := throttledStore.HasChunk(chunk_id)
require.NotNil(t,err)
require.True(t,hasChunk)


}

0 comments on commit 4e9f68e

Please sign in to comment.