Skip to content

Commit

Permalink
Open up Buffered Sender Configurations (#12297)
Browse files Browse the repository at this point in the history
* Open up Buffered Sender Configurations

* Response to PR Comments

* Update API File

* Added Jitter value

* PR Comments II

* Add jitterValue to the correct place

* Format

* Sync Retry Logic with core-https

* Changed Name to initialBatchActionCount

* Update API
  • Loading branch information
sarangan12 authored Nov 6, 2020
1 parent 9190ef5 commit 61d3b59
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 9 deletions.
5 changes: 5 additions & 0 deletions sdk/search/search-documents/review/search-documents.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1275,6 +1275,11 @@ export type SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = Operatio
// @public
export interface SearchIndexingBufferedSenderOptions {
autoFlush?: boolean;
flushWindowInMs?: number;
initialBatchActionCount?: number;
maxRetries?: number;
maxRetryDelayInMs?: number;
retryDelayInMs?: number;
}

// @public
Expand Down
26 changes: 26 additions & 0 deletions sdk/search/search-documents/src/indexModels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,32 @@ export interface SearchIndexingBufferedSenderOptions {
* Indicates if autoFlush is enabled.
*/
autoFlush?: boolean;
/**
* Initial Batch Action Count.
*
* A batch request will be sent once the documents
* reach the initialBatchActionCount.
*/
initialBatchActionCount?: number;
/**
* Flush Window.
*
* A batch request will be sent after flushWindowInMs is
* reached.
*/
flushWindowInMs?: number;
/**
* Maximum number of Retries
*/
maxRetries?: number;
/**
* Delay between retries
*/
retryDelayInMs?: number;
/**
* Max Delay between retries
*/
maxRetryDelayInMs?: number;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,29 @@ import EventEmitter from "events";
import { createSpan } from "./tracing";
import { CanonicalCode } from "@opentelemetry/api";
import { SearchIndexingBufferedSender } from "./searchIndexingBufferedSender";
import { delay } from "@azure/core-http";
import { getRandomIntegerInclusive } from "./serviceUtils";

/**
* Default Batch Size
*/
export const DEFAULT_BATCH_SIZE: number = 1000;
export const DEFAULT_BATCH_SIZE: number = 512;
/**
* Default window flush interval
*/
export const DEFAULT_FLUSH_WINDOW: number = 60000;
/**
* Default number of times to retry
* Default number of times to retry.
*/
export const DEFAULT_RETRY_COUNT: number = 3;
/**
* Default retry delay.
*/
export const DEFAULT_RETRY_DELAY: number = 800;
/**
* Default Max Delay between retries.
*/
export const DEFAULT_MAX_RETRY_DELAY: number = 60000;

/**
* Class used to perform buffered operations against a search index,
Expand All @@ -49,10 +59,22 @@ class SearchIndexingBufferedSenderImpl<T> implements SearchIndexingBufferedSende
* Interval between flushes (in milliseconds).
*/
private flushWindowInMs: number;
/**
* Delay between retries
*/
private retryDelayInMs: number;
/**
* Maximum number of Retries
*/
private maxRetries: number;
/**
* Max Delay between retries
*/
private maxRetryDelayInMs: number;
/**
* Size of the batch.
*/
private batchSize: number;
private initialBatchActionCount: number;
/**
* Batch object used to complete the service call.
*/
Expand All @@ -75,9 +97,15 @@ class SearchIndexingBufferedSenderImpl<T> implements SearchIndexingBufferedSende
*/
constructor(client: SearchClient<T>, options: SearchIndexingBufferedSenderOptions = {}) {
this.client = client;
// General Configuration properties
this.autoFlush = options.autoFlush ?? false;
this.flushWindowInMs = DEFAULT_FLUSH_WINDOW;
this.batchSize = DEFAULT_BATCH_SIZE;
this.initialBatchActionCount = options.initialBatchActionCount ?? DEFAULT_BATCH_SIZE;
this.flushWindowInMs = options.flushWindowInMs ?? DEFAULT_FLUSH_WINDOW;
// Retry specific configuration properties
this.retryDelayInMs = options.retryDelayInMs ?? DEFAULT_FLUSH_WINDOW;
this.maxRetries = options.maxRetries ?? DEFAULT_RETRY_COUNT;
this.maxRetryDelayInMs = options.maxRetryDelayInMs ?? DEFAULT_MAX_RETRY_DELAY;

this.batchObject = new IndexDocumentsBatch<T>();
if (this.autoFlush) {
const interval = setInterval(() => this.flush(), this.flushWindowInMs);
Expand Down Expand Up @@ -323,7 +351,7 @@ class SearchIndexingBufferedSenderImpl<T> implements SearchIndexingBufferedSende
}

private isBatchReady(): boolean {
return this.batchObject.actions.length >= this.batchSize;
return this.batchObject.actions.length >= this.initialBatchActionCount;
}

private async internalFlush(force: boolean, options: OperationOptions = {}): Promise<void> {
Expand All @@ -332,7 +360,7 @@ class SearchIndexingBufferedSenderImpl<T> implements SearchIndexingBufferedSende
const actions: IndexDocumentsAction<T>[] = this.batchObject.actions;
this.batchObject = new IndexDocumentsBatch<T>();
while (actions.length > 0) {
const actionsToSend = actions.splice(0, this.batchSize);
const actionsToSend = actions.splice(0, this.initialBatchActionCount);
await this.submitDocuments(actionsToSend, options);
}
}
Expand All @@ -341,7 +369,7 @@ class SearchIndexingBufferedSenderImpl<T> implements SearchIndexingBufferedSende
private async submitDocuments(
actionsToSend: IndexDocumentsAction<T>[],
options: OperationOptions,
retryAttempt: number = 0
retryAttempt: number = 1
): Promise<void> {
try {
for (const action of actionsToSend) {
Expand All @@ -354,7 +382,16 @@ class SearchIndexingBufferedSenderImpl<T> implements SearchIndexingBufferedSende
// raise success event
this.emitter.emit("batchSucceeded", result);
} catch (e) {
if (this.isRetryAbleError(e) && retryAttempt < DEFAULT_RETRY_COUNT) {
if (this.isRetryAbleError(e) && retryAttempt <= this.maxRetries) {
// Exponentially increase the delay each time
const exponentialDelay = this.retryDelayInMs * Math.pow(2, retryAttempt);
// Don't let the delay exceed the maximum
const clampedExponentialDelay = Math.min(this.maxRetryDelayInMs, exponentialDelay);
// Allow the final value to have some "jitter" (within 50% of the delay size) so
// that retries across multiple clients don't occur simultaneously.
const delayWithJitter =
clampedExponentialDelay / 2 + getRandomIntegerInclusive(0, clampedExponentialDelay / 2);
await delay(delayWithJitter);
this.submitDocuments(actionsToSend, options, retryAttempt + 1);
} else {
this.emitter.emit("batchFailed", e);
Expand Down
11 changes: 11 additions & 0 deletions sdk/search/search-documents/src/serviceUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -574,3 +574,14 @@ export function convertDataDeletionDetectionPolicyToPublic(

return dataDeletionDetectionPolicy as SoftDeleteColumnDeletionDetectionPolicy;
}

export function getRandomIntegerInclusive(min: number, max: number): number {
// Make sure inputs are integers.
min = Math.ceil(min);
max = Math.floor(max);
// Pick a random offset from zero to the size of the range.
// Since Math.random() can never return 1, we have to make the range one larger
// in order to be inclusive of the maximum value after we take the floor.
const offset = Math.floor(Math.random() * (max - min + 1));
return offset + min;
}

0 comments on commit 61d3b59

Please sign in to comment.