Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implementation of useAbortSignal option for grpc-web #777

Merged
merged 7 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions integration/grpc-web-no-streaming-observable/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,11 @@ export const Empty = {
* but with the streaming method removed.
*/
export interface DashState {
UserSettings(request: DeepPartial<Empty>, metadata?: grpc.Metadata): Observable<DashUserSettingsState>;
UserSettings(
request: DeepPartial<Empty>,
abortSignal?: AbortSignal,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hersentino this seems fine, but it's probably a breaking API change to current codes that are passing foo.UserSettings(request, metadata)?

Can we push the abortSignal last so that it's not a breaking change?

Copy link
Contributor Author

@hersentino hersentino Feb 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stephenh Yep no problem :)

I initially put it in second argument because it was set as 2nd argument in this PR

So I had to also push abortSignal to last argument for the function generateService inside src/generate-services.ts. Not sur if this modification break something somewhere else.

metadata?: grpc.Metadata,
): Observable<DashUserSettingsState>;
}

export class DashStateClientImpl implements DashState {
Expand All @@ -332,8 +336,12 @@ export class DashStateClientImpl implements DashState {
this.UserSettings = this.UserSettings.bind(this);
}

UserSettings(request: DeepPartial<Empty>, metadata?: grpc.Metadata): Observable<DashUserSettingsState> {
return this.rpc.unary(DashStateUserSettingsDesc, Empty.fromPartial(request), metadata);
UserSettings(
request: DeepPartial<Empty>,
abortSignal: AbortSignal | undefined,
metadata?: grpc.Metadata,
): Observable<DashUserSettingsState> {
return this.rpc.unary(DashStateUserSettingsDesc, Empty.fromPartial(request), abortSignal, metadata);
}
}

Expand Down Expand Up @@ -373,6 +381,7 @@ interface Rpc {
unary<T extends UnaryMethodDefinitionish>(
methodDesc: T,
request: any,
abortSignal: AbortSignal | undefined,
metadata: grpc.Metadata | undefined,
): Observable<any>;
}
Expand Down Expand Up @@ -404,14 +413,15 @@ export class GrpcWebImpl {
unary<T extends UnaryMethodDefinitionish>(
methodDesc: T,
_request: any,
abortSignal: AbortSignal | undefined,
metadata: grpc.Metadata | undefined,
): Observable<any> {
const request = { ..._request, ...methodDesc.requestType };
const maybeCombinedMetadata = metadata && this.options.metadata
? new BrowserHeaders({ ...this.options?.metadata.headersMap, ...metadata?.headersMap })
: metadata || this.options.metadata;
return new Observable((observer) => {
grpc.unary(methodDesc, {
const client = grpc.unary(methodDesc, {
request,
host: this.host,
metadata: maybeCombinedMetadata,
Expand All @@ -427,6 +437,14 @@ export class GrpcWebImpl {
}
},
});

const abortHandler = () => {
observer.error("Aborted");
client.close();
};
if (abortSignal) {
abortSignal.addEventListener("abort", abortHandler);
}
}).pipe(take(1));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
outputClientImpl=grpc-web,returnObservable=true
outputClientImpl=grpc-web,returnObservable=true,useAbortSignal=true
6 changes: 5 additions & 1 deletion integration/grpc-web/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,11 @@ export class GrpcWebImpl {
}
},
});
observer.add(() => client.close());
observer.add(() => {
if (!observer.closed) {
return client.close();
}
});
});
upStream();
}).pipe(share());
Expand Down
64 changes: 61 additions & 3 deletions src/generate-grpc-web.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export function generateGrpcClientImpl(
/** Creates the RPC methods that client code actually calls. */
function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, methodDesc: MethodDescriptorProto) {
assertInstanceOf(methodDesc, FormattedMethodDescriptor);
const { options } = ctx;
const { useAbortSignal } = options;
const requestMessage = rawRequestType(ctx, methodDesc);
const inputType = requestType(ctx, methodDesc, true);
const returns = responsePromiseOrObservable(ctx, methodDesc);
Expand All @@ -56,6 +58,7 @@ function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, me
return code`
${methodDesc.formattedName}(
request: ${inputType},
${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""}
metadata?: grpc.Metadata,
): ${returns} {
throw new Error('ts-proto does not yet support client streaming!');
Expand All @@ -67,11 +70,13 @@ function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, me
return code`
${methodDesc.formattedName}(
request: ${inputType},
${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""}
metadata?: grpc.Metadata,
): ${returns} {
return this.rpc.${method}(
${methodDescName(serviceDesc, methodDesc)},
${requestMessage}.fromPartial(request),
${useAbortSignal ? "abortSignal," : ""}
metadata,
);
}
Expand Down Expand Up @@ -165,6 +170,8 @@ export function addGrpcWebMisc(ctx: Context, hasStreamingMethods: boolean): Code
/** Makes an `Rpc` interface to decouple from the low-level grpc-web `grpc.invoke and grpc.unary`/etc. methods. */
function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStreamingMethods: boolean): Code {
const chunks: Code[] = [];
const { options } = ctx;
const { useAbortSignal } = options;

chunks.push(code`interface Rpc {`);

Expand All @@ -173,6 +180,7 @@ function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStre
unary<T extends UnaryMethodDefinitionish>(
methodDesc: T,
request: any,
${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""}
metadata: grpc.Metadata | undefined,
): ${wrapper}<any>;
`);
Expand All @@ -182,6 +190,7 @@ function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStre
invoke<T extends UnaryMethodDefinitionish>(
methodDesc: T,
request: any,
${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""}
metadata: grpc.Metadata | undefined,
): ${observableType(ctx)}<any>;
`);
Expand Down Expand Up @@ -230,10 +239,24 @@ function generateGrpcWebImpl(ctx: Context, returnObservable: boolean, hasStreami
}

function createPromiseUnaryMethod(ctx: Context): Code {
const { options } = ctx;
const { useAbortSignal } = options;

const maybeAbortSignal = useAbortSignal
? `
const abortHandler = () => {
client.close();
reject(new Error("Aborted"));
}

if (abortSignal) abortSignal.addEventListener("abort", abortHandler);`
: "";

return code`
unary<T extends UnaryMethodDefinitionish>(
methodDesc: T,
_request: any,
${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""}
metadata: grpc.Metadata | undefined
): Promise<any> {
const request = { ..._request, ...methodDesc.requestType };
Expand All @@ -242,7 +265,7 @@ function createPromiseUnaryMethod(ctx: Context): Code {
? new ${BrowserHeaders}({ ...this.options?.metadata.headersMap, ...metadata?.headersMap })
: metadata || this.options.metadata;
return new Promise((resolve, reject) => {
${grpc}.unary(methodDesc, {
${useAbortSignal ? `const client =` : ""} ${grpc}.unary(methodDesc, {
request,
host: this.host,
metadata: maybeCombinedMetadata,
Expand All @@ -257,16 +280,30 @@ function createPromiseUnaryMethod(ctx: Context): Code {
}
},
});

${maybeAbortSignal}
});
}
`;
}

function createObservableUnaryMethod(ctx: Context): Code {
const { options } = ctx;
const { useAbortSignal } = options;

const maybeAbortSignal = useAbortSignal
? `
const abortHandler = () => {
observer.error("Aborted");
client.close();
};
if (abortSignal) abortSignal.addEventListener("abort", abortHandler);`
: "";
return code`
unary<T extends UnaryMethodDefinitionish>(
methodDesc: T,
_request: any,
${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""}
metadata: grpc.Metadata | undefined
): ${observableType(ctx)}<any> {
const request = { ..._request, ...methodDesc.requestType };
Expand All @@ -275,7 +312,7 @@ function createObservableUnaryMethod(ctx: Context): Code {
? new ${BrowserHeaders}({ ...this.options?.metadata.headersMap, ...metadata?.headersMap })
: metadata || this.options.metadata;
return new Observable(observer => {
${grpc}.unary(methodDesc, {
${useAbortSignal ? `const client =` : ""} ${grpc}.unary(methodDesc, {
request,
host: this.host,
metadata: maybeCombinedMetadata,
Expand All @@ -291,16 +328,33 @@ function createObservableUnaryMethod(ctx: Context): Code {
}
},
});


${maybeAbortSignal}

}).pipe(${take}(1));
}
`;
}

function createInvokeMethod(ctx: Context) {
const { options } = ctx;
const { useAbortSignal } = options;

const maybeAbortSignal = useAbortSignal
? `
const abortHandler = () => {
observer.error("Aborted");
client.close();
};
if (abortSignal) abortSignal.addEventListener("abort", abortHandler);`
: "";

return code`
invoke<T extends UnaryMethodDefinitionish>(
methodDesc: T,
_request: any,
${useAbortSignal ? "abortSignal: AbortSignal | undefined," : ""}
metadata: grpc.Metadata | undefined
): ${observableType(ctx)}<any> {
const upStreamCodes = this.options.upStreamRetryCodes || [];
Expand Down Expand Up @@ -332,7 +386,11 @@ function createInvokeMethod(ctx: Context) {
}
},
});
observer.add(() => client.close());
observer.add(() => {
if (!observer.closed) return client.close()
});

${maybeAbortSignal}
});
upStream();
}).pipe(${share}());
Expand Down