diff --git a/storage/dynamic_delay.go b/storage/dynamic_delay.go
index 5d4c42fb82bf..5944f515d39c 100644
--- a/storage/dynamic_delay.go
+++ b/storage/dynamic_delay.go
@@ -36,6 +36,23 @@ type dynamicDelay struct {
 	mu *sync.RWMutex
 }
 
+// validateDynamicDelayParams ensures that:
+//   - targetPercentile is a valid fraction (between 0 and 1),
+//   - increaseRate is a positive number,
+//   - minDelay is less than maxDelay.
+func validateDynamicDelayParams(targetPercentile, increaseRate float64, minDelay, maxDelay time.Duration) error {
+	if targetPercentile < 0 || targetPercentile > 1 {
+		return fmt.Errorf("invalid targetPercentile (%v): must be within [0, 1]", targetPercentile)
+	}
+	if increaseRate <= 0 {
+		return fmt.Errorf("invalid increaseRate (%v): must be > 0", increaseRate)
+	}
+	if minDelay >= maxDelay {
+		return fmt.Errorf("invalid minDelay (%v) and maxDelay (%v) combination: minDelay must be smaller than maxDelay", minDelay, maxDelay)
+	}
+	return nil
+}
+
 // NewDynamicDelay returns a dynamicDelay.
 //
 // targetPercentile is the desired percentile to be computed. For example, a
@@ -49,16 +66,7 @@ type dynamicDelay struct {
 //
 // decrease can never lower the delay past minDelay, increase can never raise
 // the delay past maxDelay.
-func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) (*dynamicDelay, error) {
-	if targetPercentile < 0 || targetPercentile > 1 {
-		return nil, fmt.Errorf("invalid targetPercentile (%v): must be within [0, 1]", targetPercentile)
-	}
-	if increaseRate <= 0 {
-		return nil, fmt.Errorf("invalid increaseRate (%v): must be > 0", increaseRate)
-	}
-	if minDelay >= maxDelay {
-		return nil, fmt.Errorf("invalid minDelay (%v) and maxDelay (%v) combination: minDelay must be smaller than maxDelay", minDelay, maxDelay)
-	}
+func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) *dynamicDelay {
 	if initialDelay < minDelay {
 		initialDelay = minDelay
 	}
@@ -84,7 +92,7 @@ func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDela
 		maxDelay:       maxDelay,
 		value:          initialDelay,
 		mu:             &sync.RWMutex{},
-	}, nil
+	}
 }
 
 func (d *dynamicDelay) unsafeIncrease() {
@@ -141,7 +149,7 @@ func (d *dynamicDelay) getValue() time.Duration {
 	return d.value
 }
 
-// PrintDelay prints the state of delay, helpful in debugging.
+// printDelay prints the state of delay, helpful in debugging.
 func (d *dynamicDelay) printDelay() {
 	d.mu.RLock()
 	defer d.mu.RUnlock()
@@ -152,3 +160,78 @@ func (d *dynamicDelay) printDelay() {
 	fmt.Println("MaxDelay: ", d.maxDelay)
 	fmt.Println("Value: ", d.value)
 }
+
+// bucketDelayManager wraps dynamicDelay to provide bucket-specific delays.
+type bucketDelayManager struct {
+	targetPercentile float64
+	increaseRate     float64
+	initialDelay     time.Duration
+	minDelay         time.Duration
+	maxDelay         time.Duration
+
+	// delays maps bucket names to their dynamic delay instance.
+	delays map[string]*dynamicDelay
+
+	// mu guards delays.
+	mu *sync.RWMutex
+}
+
+// newBucketDelayManager returns a new bucketDelayManager instance.
+func newBucketDelayManager(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) (*bucketDelayManager, error) {
+	err := validateDynamicDelayParams(targetPercentile, increaseRate, minDelay, maxDelay)
+	if err != nil {
+		return nil, err
+	}
+
+	return &bucketDelayManager{
+		targetPercentile: targetPercentile,
+		increaseRate:     increaseRate,
+		initialDelay:     initialDelay,
+		minDelay:         minDelay,
+		maxDelay:         maxDelay,
+		delays:           make(map[string]*dynamicDelay),
+		mu:               &sync.RWMutex{},
+	}, nil
+}
+
+// getDelay retrieves the dynamicDelay instance for the given bucket name. If no delay
+// exists for the bucket, a new one is created with the configured parameters.
+func (b *bucketDelayManager) getDelay(bucketName string) *dynamicDelay {
+	b.mu.RLock()
+	delay, ok := b.delays[bucketName]
+	b.mu.RUnlock()
+
+	if !ok {
+		b.mu.Lock()
+		defer b.mu.Unlock()
+
+		// Check again, as another goroutine might have created it between mu.RUnlock() and mu.Lock().
+		delay, ok = b.delays[bucketName]
+		if !ok {
+			// Create a new dynamicDelay for the bucket if it doesn't exist.
+			delay = newDynamicDelay(b.targetPercentile, b.increaseRate, b.initialDelay, b.minDelay, b.maxDelay)
+			b.delays[bucketName] = delay
+		}
+	}
+	return delay
+}
+
+// increase notes that the operation took longer than the delay for the given bucket.
+func (b *bucketDelayManager) increase(bucketName string) {
+	b.getDelay(bucketName).increase()
+}
+
+// decrease notes that the operation completed before the delay for the given bucket.
+func (b *bucketDelayManager) decrease(bucketName string) {
+	b.getDelay(bucketName).decrease()
+}
+
+// update updates the delay value for the bucket depending on the specified latency.
+func (b *bucketDelayManager) update(bucketName string, latency time.Duration) {
+	b.getDelay(bucketName).update(latency)
+}
+
+// getValue returns the desired delay to wait before retrying the operation for the given bucket.
+func (b *bucketDelayManager) getValue(bucketName string) time.Duration {
+	return b.getDelay(bucketName).getValue()
+}
diff --git a/storage/dynamic_delay_test.go b/storage/dynamic_delay_test.go
index 8247d7527496..57318580c454 100644
--- a/storage/dynamic_delay_test.go
+++ b/storage/dynamic_delay_test.go
@@ -13,8 +13,10 @@
 package storage
 
 import (
+	"fmt"
 	"math"
 	"math/rand"
+	"sync"
 	"testing"
 	"time"
 
@@ -44,10 +46,7 @@ func applySamplesWithUpdate(numSamples int, expectedValue float64, rnd *rand.Ran
 }
 
 func TestNewDelay(t *testing.T) {
-	d, err := newDynamicDelay(1-0.01, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour)
-	if err != nil {
-		t.Fatal(err)
-	}
+	d := newDynamicDelay(1-0.01, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour)
 
 	want := &dynamicDelay{
 		increaseFactor: 1.047294,
@@ -84,10 +83,7 @@ func TestNewDelay(t *testing.T) {
 
 func TestConvergence99(t *testing.T) {
 	// d should converge to the 99-percentile value.
-	d, err := newDynamicDelay(1-0.01, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour)
-	if err != nil {
-		t.Fatal(err)
-	}
+	d := newDynamicDelay(1-0.01, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour)
 
 	rnd := rand.New(rand.NewSource(1))
 
@@ -122,10 +118,7 @@ func TestConvergence99(t *testing.T) {
 
 func TestConvergence90(t *testing.T) {
 	// d should converge to the 90-percentile value.
-	d, err := newDynamicDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour)
-	if err != nil {
-		t.Fatal(err)
-	}
+	d := newDynamicDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour)
 
 	rnd := rand.New(rand.NewSource(1))
 
@@ -145,16 +138,18 @@ func TestConvergence90(t *testing.T) {
 }
 
 func TestOverflow(t *testing.T) {
-	d, err := newDynamicDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour)
-	if err != nil {
-		t.Fatal(err)
-	}
+	d := newDynamicDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour)
 
 	n := 10000
+	// Should converge to maxDelay.
 	for i := 0; i < n; i++ {
 		d.increase()
 	}
-	t.Log(d.getValue())
+	if got, want := d.getValue(), 1*time.Hour; got != want {
+		t.Fatalf("unexpected d.Value: got %v, want %v", got, want)
+	}
+
+	// Should converge to minDelay.
 	for i := 0; i < 100*n; i++ {
 		d.decrease()
 	}
@@ -163,14 +158,228 @@ func TestOverflow(t *testing.T) {
 	}
 }
 
-func TestInvalidArgument(t *testing.T) {
-	_, err := newDynamicDelay(1-0.1, 15, 1*time.Millisecond, 2*time.Hour, 1*time.Hour)
+func TestValidateDynamicDelayParams(t *testing.T) {
+	testCases := []struct {
+		name             string
+		targetPercentile float64
+		increaseRate     float64
+		minDelay         time.Duration
+		maxDelay         time.Duration
+		expectErr        bool
+	}{
+		// Valid parameters
+		{"valid", 0.5, 0.1, 1 * time.Second, 10 * time.Second, false},
+
+		// Invalid targetPercentile
+		{"invalid targetPercentile (< 0)", -0.1, 0.1, 1 * time.Second, 10 * time.Second, true},
+		{"invalid targetPercentile (> 1)", 1.1, 0.1, 1 * time.Second, 10 * time.Second, true},
+
+		// Invalid increaseRate
+		{"invalid increaseRate (<= 0)", 0.5, 0, 1 * time.Second, 10 * time.Second, true},
+
+		// Invalid delay combination
+		{"invalid delay combination (minDelay >= maxDelay)", 0.5, 0.1, 10 * time.Second, 1 * time.Second, true},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			err := validateDynamicDelayParams(tc.targetPercentile, tc.increaseRate, tc.minDelay, tc.maxDelay)
+			if tc.expectErr && err == nil {
+				t.Errorf("Expected an error, but got none")
+			}
+			if !tc.expectErr && err != nil {
+				t.Errorf("Unexpected error: %v", err)
+			}
+		})
+	}
+}
+
+func applySamplesBucket(numSamples int, expectedValue float64, rnd *rand.Rand, b *bucketDelayManager, bucketName string) int {
+	var samplesOverThreshold int
+	for i := 0; i < numSamples; i++ {
+		randomDelay := time.Duration(-math.Log(rnd.Float64()) * expectedValue * float64(time.Second))
+		if randomDelay > b.getValue(bucketName) {
+			samplesOverThreshold++
+			b.increase(bucketName)
+		} else {
+			b.decrease(bucketName)
+		}
+	}
+	return samplesOverThreshold
+}
+
+func applySamplesWithUpdateBucket(numSamples int, expectedValue float64, rnd *rand.Rand, b *bucketDelayManager, bucketName string) {
+	for i := 0; i < numSamples; i++ {
+		randomDelay := time.Duration(-math.Log(rnd.Float64()) * expectedValue * float64(time.Second))
+		b.update(bucketName, randomDelay)
+	}
+}
+
+func TestBucketDelayManager(t *testing.T) {
+	b, err := newBucketDelayManager(0.99, 1.5, 100*time.Millisecond, 100*time.Millisecond, 10*time.Second)
+	if err != nil {
+		t.Errorf("while creating bucketDelayManager: %v", err)
+	}
+
+	t.Logf("testing")
+
+	// Test increase and getValue
+	b.increase("bucket1")
+	delay1 := b.getValue("bucket1")
+	if delay1 <= 100*time.Millisecond {
+		t.Errorf("Expected delay for bucket1 to be > 100ms after increase, got %v", delay1)
+	}
+
+	// Test decrease and getValue
+	b.decrease("bucket1")
+	delay2 := b.getValue("bucket1")
+	if delay2 >= delay1 {
t.Errorf("Expected delay for bucket1 to be < %v after decrease, got %v", delay1, delay2) + } + + // Test update with latency > current delay + b.update("bucket2", 200*time.Millisecond) + delay3 := b.getValue("bucket2") + if delay3 <= 100*time.Millisecond { + t.Errorf("Expected delay for bucket2 to be > 100ms after update with higher latency, got %v", delay3) + } + + // Test update with latency < current delay + b.update("bucket2", 50*time.Millisecond) + delay4 := b.getValue("bucket2") + if delay4 >= delay3 { + t.Errorf("Expected delay for bucket2 to be < %v after update with lower latency, got %v", delay3, delay4) + } +} + +func TestBucketDelayManagerConcurrentAccess(t *testing.T) { + b, err := newBucketDelayManager(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) + if err != nil { + t.Errorf("while creating bucketDelayManager: %v", err) + } + + // Test concurrent access + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + bucketName := fmt.Sprintf("bucket%d", i%3) // 3 buckets + b.increase(bucketName) + b.decrease(bucketName) + b.update(bucketName, time.Duration(i)*time.Millisecond) + }(i) + } + wg.Wait() + + // Check if the map size is as expected + b.mu.Lock() // Lock to access the map safely + defer b.mu.Unlock() + if len(b.delays) != 3 { + t.Errorf("Expected %d buckets in the map, but got %d", 3, len(b.delays)) + } +} + +func TestBucketDelayManagerInvalidArgument(t *testing.T) { + // Test with invalid targetPercentile + _, err := newBucketDelayManager(1.1, 15, 1*time.Millisecond, 1*time.Hour, 2*time.Hour) if err == nil { - t.Fatal("unexpected, should throw error as minDelay is greater than maxDelay") + t.Fatal("unexpected, should throw error as targetPercentile is greater than 1") + } + + // Test with invalid increaseRate + _, err = newBucketDelayManager(0.9, -1, 1*time.Millisecond, 1*time.Hour, 2*time.Hour) + if err == nil { + t.Fatal("unexpected, should throw error as increaseRate can't be negative") } - _, err = newDynamicDelay(1-0.1, 0, 1*time.Millisecond, 2*time.Hour, 1*time.Hour) + // Test with invalid minDelay and maxDelay combination + _, err = newBucketDelayManager(0.9, 15, 1*time.Millisecond, 2*time.Hour, 1*time.Hour) if err == nil { - t.Fatal("unexpected, should throw error as increaseRate can't be zero") + t.Fatal("unexpected, should throw error as minDelay is greater than maxDelay") + } +} + +func TestBucketDelayManagerOverflow(t *testing.T) { + b, err := newBucketDelayManager(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) + if err != nil { + t.Errorf("while creating bucketDelayManager: %v", err) + } + + bucketName := "testBucket" + n := 10000 + + // Should converge to maxDelay. + for i := 0; i < n; i++ { + b.increase(bucketName) + } + + if got, want := b.getValue(bucketName), 1*time.Hour; got != want { + t.Fatalf("unexpected delay value: got %v, want %v", got, want) + } + + // Should converge to minDelay. 
+ for i := 0; i < 100*n; i++ { + b.decrease(bucketName) + } + if got, want := b.getValue(bucketName), 1*time.Millisecond; got != want { + t.Fatalf("unexpected delay value: got %v, want %v", got, want) + } +} + +func TestBucketDelayManagerConvergence90(t *testing.T) { + b, err := newBucketDelayManager(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) + if err != nil { + t.Errorf("while creating bucketDelayManager: %v", err) + } + bucket1 := "bucket1" + bucket2 := "bucket2" + + rnd := rand.New(rand.NewSource(1)) + + // Warm up both buckets + applySamplesWithUpdateBucket(1000, 0.005, rnd, b, bucket1) + applySamplesWithUpdateBucket(1000, 0.005, rnd, b, bucket2) + + // Check convergence for bucket1 + { + samplesOverThreshold := applySamplesBucket(1000, 0.005, rnd, b, bucket1) + if samplesOverThreshold < (1000 * 0.05) { + t.Errorf("bucket1: samplesOverThreshold = %d < 1000*0.05", samplesOverThreshold) + } + if samplesOverThreshold > (1000 * 0.2) { + t.Errorf("bucket1: samplesOverThreshold = %d > 1000*0.2", samplesOverThreshold) + } + } + + // Check convergence for bucket2 + { + samplesOverThreshold := applySamplesBucket(1000, 0.005, rnd, b, bucket2) + if samplesOverThreshold < (1000 * 0.05) { + t.Errorf("bucket2: samplesOverThreshold = %d < 1000*0.05", samplesOverThreshold) + } + if samplesOverThreshold > (1000 * 0.2) { + t.Errorf("bucket2: samplesOverThreshold = %d > 1000*0.2", samplesOverThreshold) + } + } +} + +func TestBucketDelayManagerMapSize(t *testing.T) { + b, err := newBucketDelayManager(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) + if err != nil { + t.Errorf("while creating bucketDelayManager: %v", err) + } + // Add delays for multiple buckets + numBuckets := 10 + for i := 0; i < numBuckets; i++ { + bucketName := fmt.Sprintf("bucket%d", i) + b.increase(bucketName) + } + + // Check if the map size is as expected + b.mu.Lock() // Lock to access the map safely + defer b.mu.Unlock() + if len(b.delays) != numBuckets { + t.Errorf("Expected %d buckets in the map, but got %d", numBuckets, len(b.delays)) } } diff --git a/storage/experimental/experimental.go b/storage/experimental/experimental.go index 4e908712ba13..c83d1fa0935a 100644 --- a/storage/experimental/experimental.go +++ b/storage/experimental/experimental.go @@ -33,18 +33,13 @@ import ( // Cloud Storage. If the timeout elapses with no response from the server, the request // is automatically retried. // The timeout is initially set to ReadStallTimeoutConfig.Min. The client tracks -// latency across all read requests from the client, and can adjust the timeout higher -// to the target percentile when latency from the server is high. +// latency across all read requests from the client for each bucket accessed, and can +// adjust the timeout higher to the target percentile when latency for request to that +// bucket is high. // Currently, this is supported only for downloads ([storage.NewReader] and // [storage.NewRangeReader] calls) and only for the XML API. Other read APIs (gRPC & JSON) // will be supported soon. func WithReadStallTimeout(rstc *ReadStallTimeoutConfig) option.ClientOption { - // TODO (raj-prince): To keep separate dynamicDelay instance for different BucketHandle. - // Currently, dynamicTimeout is kept at the client and hence shared across all the - // BucketHandle, which is not the ideal state. As latency depends on location of VM - // and Bucket, and read latency of different buckets may lie in different range. 
-	// Hence having a separate dynamicTimeout instance at BucketHandle level will
-	// be better.
 	return internal.WithReadStallTimeout.(func(config *ReadStallTimeoutConfig) option.ClientOption)(rstc)
 }
 
diff --git a/storage/http_client.go b/storage/http_client.go
index bf4af85c5fd4..6baf90547356 100644
--- a/storage/http_client.go
+++ b/storage/http_client.go
@@ -55,7 +55,7 @@ type httpStorageClient struct {
 	scheme                     string
 	settings                   *settings
 	config                     *storageConfig
-	dynamicReadReqStallTimeout *dynamicDelay
+	dynamicReadReqStallTimeout *bucketDelayManager
 }
 
 // newHTTPStorageClient initializes a new storageClient that uses the HTTP-JSON
@@ -130,10 +130,10 @@ func newHTTPStorageClient(ctx context.Context, opts ...storageOption) (storageCl
 		return nil, fmt.Errorf("supplied endpoint %q is not valid: %w", ep, err)
 	}
 
-	var dd *dynamicDelay
+	var bd *bucketDelayManager
 	if config.readStallTimeoutConfig != nil {
 		drrstConfig := config.readStallTimeoutConfig
-		dd, err = newDynamicDelay(
+		bd, err = newBucketDelayManager(
 			drrstConfig.TargetPercentile,
 			getDynamicReadReqIncreaseRateFromEnv(),
 			getDynamicReadReqInitialTimeoutSecFromEnv(drrstConfig.Min),
@@ -152,7 +152,7 @@ func newHTTPStorageClient(ctx context.Context, opts ...storageOption) (storageCl
 		scheme:                     u.Scheme,
 		settings:                   s,
 		config:                     &config,
-		dynamicReadReqStallTimeout: dd,
+		dynamicReadReqStallTimeout: bd,
 	}, nil
 }
 
@@ -892,20 +892,20 @@ func (c *httpStorageClient) newRangeReaderXML(ctx context.Context, params *newRa
 		res, err = c.hc.Do(req.WithContext(cancelCtx))
 		if err == nil {
 			reqLatency := time.Since(reqStartTime)
-			c.dynamicReadReqStallTimeout.update(reqLatency)
+			c.dynamicReadReqStallTimeout.update(params.bucket, reqLatency)
 		} else if errors.Is(err, context.Canceled) {
 			// context.Canceled means operation took more than current dynamicTimeout,
 			// hence should be increased.
-			c.dynamicReadReqStallTimeout.increase()
+			c.dynamicReadReqStallTimeout.increase(params.bucket)
 		}
 		done <- true
 	}()
 
 	// Wait until timeout or request is successful.
-	timer := time.After(c.dynamicReadReqStallTimeout.getValue())
+	timer := time.After(c.dynamicReadReqStallTimeout.getValue(params.bucket))
 	select {
 	case <-timer:
-		log.Printf("stalled read-req cancelled after %fs", c.dynamicReadReqStallTimeout.getValue().Seconds())
+		log.Printf("stalled read-req cancelled after %fs", c.dynamicReadReqStallTimeout.getValue(params.bucket).Seconds())
 		cancel()
 		err = context.DeadlineExceeded
 		if res != nil && res.Body != nil {
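Editor's note: the sketch below is not part of the patch. It illustrates, under stated assumptions, how the per-bucket manager is meant to be driven, mirroring the newRangeReaderXML change in storage/http_client.go: getValue(bucket) bounds the wait, update(bucket, latency) feeds a successful request's latency back into that bucket's percentile estimate, and increase(bucket) is called when a stalled request is cancelled. The helper name sendWithStallTimeout and the injected do callback are hypothetical; the rest uses only the methods added in this diff.

// sendWithStallTimeout is an illustrative sketch of the stall-detection pattern,
// assuming it lives in package storage next to bucketDelayManager.
func sendWithStallTimeout(ctx context.Context, bd *bucketDelayManager, bucket string, do func(ctx context.Context) error) error {
	cancelCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	done := make(chan error, 1)
	start := time.Now()
	go func() {
		err := do(cancelCtx)
		if err == nil {
			// Fold the observed latency into this bucket's delay estimate.
			bd.update(bucket, time.Since(start))
		} else if errors.Is(err, context.Canceled) {
			// The request outlived the current timeout for this bucket.
			bd.increase(bucket)
		}
		done <- err
	}()

	select {
	case <-time.After(bd.getValue(bucket)): // per-bucket stall timeout
		cancel()
		return context.DeadlineExceeded
	case err := <-done:
		return err
	}
}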
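Editor's note: a minimal user-facing configuration sketch (also not part of the patch), assuming the surface shown in storage/experimental/experimental.go. WithReadStallTimeout and the Min and TargetPercentile fields of ReadStallTimeoutConfig are taken from this diff; the concrete values and program structure are arbitrary examples.

package main

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/experimental"
)

func main() {
	ctx := context.Background()
	client, err := storage.NewClient(ctx, experimental.WithReadStallTimeout(&experimental.ReadStallTimeoutConfig{
		Min:              500 * time.Millisecond, // lower bound for the dynamic stall timeout
		TargetPercentile: 0.99,                   // cancel and retry only the slowest ~1% of reads
	}))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Downloads via the XML API (NewReader / NewRangeReader) now use a stall
	// timeout tracked separately for each bucket they touch.
}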