diff --git a/packages/sdk/src/client/client.ts b/packages/sdk/src/client/client.ts index bae869c59..9a25cb7bc 100644 --- a/packages/sdk/src/client/client.ts +++ b/packages/sdk/src/client/client.ts @@ -43,7 +43,7 @@ import { OpSource } from '@yorkie-js-sdk/src/document/operation/operation'; import { createAuthInterceptor } from '@yorkie-js-sdk/src/client/auth_interceptor'; import { createMetricInterceptor } from '@yorkie-js-sdk/src/client/metric_interceptor'; import { validateSerializable } from '../util/validator'; -import { Json } from '@yorkie-js-sdk/src/document/document'; +import { Json, BroadcastOptions } from '@yorkie-js-sdk/src/document/document'; /** * `SyncMode` defines synchronization modes for the PushPullChanges API. @@ -161,6 +161,15 @@ const DefaultClientOptions = { reconnectStreamDelay: 1000, }; +/** + * `DefaultBroadcastOptions` is the default options for broadcast. + */ +const DefaultBroadcastOptions = { + maxRetries: Infinity, + initialRetryInterval: 1000, + maxBackoff: 20000, +}; + /** * `Client` is a normal client that can communicate with the server. * It has documents and sends changes of the documents in local @@ -307,12 +316,13 @@ export class Client { doc.update((_, p) => p.set(options.initialPresence || {})); const unsubscribeBroacastEvent = doc.subscribe( 'local-broadcast', - (event) => { + async (event) => { const { topic, payload } = event.value; - const errorFn = event.error; + const errorFn = event.options?.error; + const options = event.options; try { - this.broadcast(doc.getKey(), topic, payload); + await this.broadcast(doc.getKey(), topic, payload, options); } catch (error: unknown) { if (error instanceof Error) { errorFn?.(error); @@ -609,6 +619,7 @@ export class Client { docKey: DocumentKey, topic: string, payload: Json, + options?: BroadcastOptions, ): Promise { if (!this.isActive()) { throw new YorkieError( @@ -631,28 +642,63 @@ export class Client { ); } - return this.enqueueTask(async () => { - return this.rpcClient - .broadcast( - { - clientId: this.id!, - documentId: attachment.docID, - topic, - payload: new TextEncoder().encode(JSON.stringify(payload)), - }, - { headers: { 'x-shard-key': `${this.apiKey}/${docKey}` } }, - ) - .then(() => { - logger.info( - `[BC] c:"${this.getKey()}" broadcasts d:"${docKey}" t:"${topic}"`, - ); - }) - .catch((err) => { - logger.error(`[BC] c:"${this.getKey()}" err :`, err); - this.handleConnectError(err); - throw err; - }); - }); + const maxRetries = + options?.maxRetries ?? DefaultBroadcastOptions.maxRetries; + const maxBackoff = DefaultBroadcastOptions.maxBackoff; + + let retryCount = 0; + + const exponentialBackoff = (retryCount: number) => { + const retryInterval = Math.min( + DefaultBroadcastOptions.initialRetryInterval * 2 ** retryCount, + maxBackoff, + ); + return retryInterval; + }; + + const doLoop = async (): Promise => { + return this.enqueueTask(async () => { + return this.rpcClient + .broadcast( + { + clientId: this.id!, + documentId: attachment.docID, + topic, + payload: new TextEncoder().encode(JSON.stringify(payload)), + }, + { headers: { 'x-shard-key': `${this.apiKey}/${docKey}` } }, + ) + .then(() => { + logger.info( + `[BC] c:"${this.getKey()}" broadcasts d:"${docKey}" t:"${topic}"`, + ); + }) + .catch((err) => { + logger.error(`[BC] c:"${this.getKey()}" err:`, err); + if (this.handleConnectError(err)) { + if (retryCount < maxRetries) { + retryCount++; + setTimeout(() => doLoop(), exponentialBackoff(retryCount - 1)); + logger.info( + `[BC] c:"${this.getKey()}" retry attempt ${retryCount}/${maxRetries}`, + ); + } else { + logger.error( + `[BC] c:"${this.getKey()}" exceeded maximum retry attempts`, + ); + + // Stop retrying after maxRetries + throw err; + } + } else { + // Stop retrying if the error is not retryable + throw err; + } + }); + }); + }; + + return doLoop(); } /** diff --git a/packages/sdk/src/document/document.ts b/packages/sdk/src/document/document.ts index 311f91d3c..b0452c83e 100644 --- a/packages/sdk/src/document/document.ts +++ b/packages/sdk/src/document/document.ts @@ -79,6 +79,23 @@ import { History, HistoryOperation } from '@yorkie-js-sdk/src/document/history'; import { setupDevtools } from '@yorkie-js-sdk/src/devtools'; import * as Devtools from '@yorkie-js-sdk/src/devtools/types'; +/** + * `BroadcastOptions` are the options to create a new document. + * + * @public + */ +export interface BroadcastOptions { + /** + * `error` is called when an error occurs. + */ + error?: ErrorFn; + + /** + * `maxRetries` is the maximum number of retries. + */ + maxRetries?: number; +} + /** * `DocumentOptions` are the options to create a new document. * @@ -386,13 +403,13 @@ export interface PresenceChangedEvent

export interface BroadcastEvent extends BaseDocEvent { type: DocEventType.Broadcast; value: { clientID: ActorID; topic: string; payload: Json }; - error?: ErrorFn; + options?: BroadcastOptions; } export interface LocalBroadcastEvent extends BaseDocEvent { type: DocEventType.LocalBroadcast; value: { topic: string; payload: any }; - error?: ErrorFn; + options?: BroadcastOptions; } type DocEventCallbackMap

= { @@ -450,14 +467,14 @@ export type DocumentKey = string; type OperationInfoOfElement = TElement extends Text ? TextOperationInfo : TElement extends Counter - ? CounterOperationInfo - : TElement extends Tree - ? TreeOperationInfo - : TElement extends BaseArray - ? ArrayOperationInfo - : TElement extends BaseObject - ? ObjectOperationInfo - : OperationInfo; + ? CounterOperationInfo + : TElement extends Tree + ? TreeOperationInfo + : TElement extends BaseArray + ? ArrayOperationInfo + : TElement extends BaseObject + ? ObjectOperationInfo + : OperationInfo; /** * `OperationInfoOfInternal` represents the type of the operation info of the @@ -478,24 +495,24 @@ type OperationInfoOfInternal< > = TDepth extends 0 ? TElement : TKeyOrPath extends `${infer TFirst}.${infer TRest}` - ? TFirst extends keyof TElement - ? TElement[TFirst] extends BaseArray - ? OperationInfoOfInternal< - TElement[TFirst], - number, - DecreasedDepthOf - > - : OperationInfoOfInternal< - TElement[TFirst], - TRest, - DecreasedDepthOf - > - : OperationInfo - : TKeyOrPath extends keyof TElement - ? TElement[TKeyOrPath] extends BaseArray - ? ArrayOperationInfo - : OperationInfoOfElement - : OperationInfo; + ? TFirst extends keyof TElement + ? TElement[TFirst] extends BaseArray + ? OperationInfoOfInternal< + TElement[TFirst], + number, + DecreasedDepthOf + > + : OperationInfoOfInternal< + TElement[TFirst], + TRest, + DecreasedDepthOf + > + : OperationInfo + : TKeyOrPath extends keyof TElement + ? TElement[TKeyOrPath] extends BaseArray + ? ArrayOperationInfo + : OperationInfoOfElement + : OperationInfo; /** * `DecreasedDepthOf` represents the type of the decreased depth of the given depth. @@ -503,24 +520,24 @@ type OperationInfoOfInternal< type DecreasedDepthOf = Depth extends 10 ? 9 : Depth extends 9 - ? 8 - : Depth extends 8 - ? 7 - : Depth extends 7 - ? 6 - : Depth extends 6 - ? 5 - : Depth extends 5 - ? 4 - : Depth extends 4 - ? 3 - : Depth extends 3 - ? 2 - : Depth extends 2 - ? 1 - : Depth extends 1 - ? 0 - : -1; + ? 8 + : Depth extends 8 + ? 7 + : Depth extends 7 + ? 6 + : Depth extends 6 + ? 5 + : Depth extends 5 + ? 4 + : Depth extends 4 + ? 3 + : Depth extends 3 + ? 2 + : Depth extends 2 + ? 1 + : Depth extends 1 + ? 0 + : -1; /** * `PathOfInternal` represents the type of the path of the given element. @@ -532,29 +549,29 @@ type PathOfInternal< > = Depth extends 0 ? Prefix : TElement extends Record - ? { - [TKey in keyof TElement]: TElement[TKey] extends LeafElement - ? `${Prefix}${TKey & string}` - : TElement[TKey] extends BaseArray - ? - | `${Prefix}${TKey & string}` - | `${Prefix}${TKey & string}.${number}` - | PathOfInternal< - TArrayElement, - `${Prefix}${TKey & string}.${number}.`, - DecreasedDepthOf - > - : - | `${Prefix}${TKey & string}` - | PathOfInternal< - TElement[TKey], - `${Prefix}${TKey & string}.`, - DecreasedDepthOf - >; - }[keyof TElement] - : Prefix extends `${infer TRest}.` - ? TRest - : Prefix; + ? { + [TKey in keyof TElement]: TElement[TKey] extends LeafElement + ? `${Prefix}${TKey & string}` + : TElement[TKey] extends BaseArray + ? + | `${Prefix}${TKey & string}` + | `${Prefix}${TKey & string}.${number}` + | PathOfInternal< + TArrayElement, + `${Prefix}${TKey & string}.${number}.`, + DecreasedDepthOf + > + : + | `${Prefix}${TKey & string}` + | PathOfInternal< + TElement[TKey], + `${Prefix}${TKey & string}.`, + DecreasedDepthOf + >; + }[keyof TElement] + : Prefix extends `${infer TRest}.` + ? TRest + : Prefix; /** * `OperationInfoOf` represents the type of the operation info of the given @@ -2070,11 +2087,11 @@ export class Document { /** * `broadcast` the payload to the given topic. */ - public broadcast(topic: string, payload: Json, error?: ErrorFn) { + public broadcast(topic: string, payload: Json, options?: BroadcastOptions) { const broadcastEvent: LocalBroadcastEvent = { type: DocEventType.LocalBroadcast, value: { topic, payload }, - error, + options, }; this.publish([broadcastEvent]); diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index 2d485e386..cf325c647 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -33,6 +33,7 @@ import { } from '@yorkie-js-sdk/test/integration/integration_helper'; import { ConnectError, Code as ConnectCode } from '@connectrpc/connect'; import { Code, YorkieError } from '@yorkie-js-sdk/src/util/error'; +import { Json } from '@yorkie-js-sdk/src/document/document'; describe.sequential('Client', function () { afterEach(() => { @@ -902,7 +903,9 @@ describe.sequential('Client', function () { // @ts-ignore // Disable type checking for testing purposes - doc.broadcast(broadcastTopic, payload, errorHandler); + doc.broadcast(broadcastTopic, payload, { + error: errorHandler, + }); await eventCollector.waitAndVerifyNthEvent(1, broadcastErrMessage); @@ -914,7 +917,7 @@ describe.sequential('Client', function () { }) => { await withTwoClientsAndDocuments<{ t: Text }>( async (c1, d1, c2, d2) => { - const eventCollector = new EventCollector<[string, any]>(); + const eventCollector = new EventCollector<[string, Json]>(); const broadcastTopic = 'test'; const unsubscribe = d2.subscribe('broadcast', (event) => { const { topic, payload } = event.value; @@ -945,7 +948,7 @@ describe.sequential('Client', function () { }) => { await withTwoClientsAndDocuments<{ t: Text }>( async (c1, d1, c2, d2) => { - const eventCollector = new EventCollector<[string, any]>(); + const eventCollector = new EventCollector<[string, Json]>(); const broadcastTopic1 = 'test1'; const broadcastTopic2 = 'test2'; @@ -980,7 +983,7 @@ describe.sequential('Client', function () { }) => { await withTwoClientsAndDocuments<{ t: Text }>( async (c1, d1, c2, d2) => { - const eventCollector = new EventCollector<[string, any]>(); + const eventCollector = new EventCollector<[string, Json]>(); const broadcastTopic = 'test'; const unsubscribe = d2.subscribe('broadcast', (event) => { const { topic, payload } = event.value; @@ -1018,8 +1021,8 @@ describe.sequential('Client', function () { }) => { await withTwoClientsAndDocuments<{ t: Text }>( async (c1, d1, c2, d2) => { - const eventCollector1 = new EventCollector<[string, any]>(); - const eventCollector2 = new EventCollector<[string, any]>(); + const eventCollector1 = new EventCollector<[string, Json]>(); + const eventCollector2 = new EventCollector<[string, Json]>(); const broadcastTopic = 'test'; const payload = { a: 1, b: '2' }; @@ -1058,4 +1061,96 @@ describe.sequential('Client', function () { SyncMode.Realtime, ); }); + + it('Should retry broadcasting on network failure with retry option and succeeds when network is restored', async ({ + task, + }) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const eventCollector = new EventCollector<[string, Json]>(); + const broadcastTopic = 'test'; + const unsubscribe = d2.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + + if (topic === broadcastTopic) { + eventCollector.add([topic, payload]); + } + }); + + // 01. Simulate Unknown error. + vi.stubGlobal('fetch', async () => { + throw new ConnectError('Failed to fetch', ConnectCode.Unknown); + }); + await new Promise((res) => setTimeout(res, 30)); + + const payload = { a: 1, b: '2' }; + + d1.broadcast(broadcastTopic, payload); + + // Failed to broadcast due to network failure + await new Promise((res) => setTimeout(res, 3000)); + assert.equal(eventCollector.getLength(), 0); + + // 02. Back to normal condition + vi.unstubAllGlobals(); + + await eventCollector.waitAndVerifyNthEvent(1, [ + broadcastTopic, + payload, + ]); + + unsubscribe(); + }, + task.name, + SyncMode.Realtime, + ); + }); + + it('Should not retry broadcasting on network failure when maxRetries is set to zero', async ({ + task, + }) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const eventCollector = new EventCollector<[string, any]>(); + const eventCollector2 = new EventCollector(); + const broadcastTopic = 'test'; + const unsubscribe = d2.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + + if (topic === broadcastTopic) { + eventCollector.add([topic, payload]); + } + }); + + // 01. Simulate Unknown error. + vi.stubGlobal('fetch', async () => { + throw new ConnectError('Failed to fetch', ConnectCode.Unknown); + }); + + await new Promise((res) => setTimeout(res, 30)); + + const payload = { a: 1, b: '2' }; + + const errorHandler = (error: Error) => { + if (error instanceof ConnectError) { + eventCollector2.add(error.code); + } + }; + + d1.broadcast(broadcastTopic, payload, { + error: errorHandler, + maxRetries: 0, + }); + + // 02. Back to normal condition + vi.unstubAllGlobals(); + + await eventCollector2.waitAndVerifyNthEvent(1, ConnectCode.Unknown); + + unsubscribe(); + }, + task.name, + SyncMode.Realtime, + ); + }); });