-
Notifications
You must be signed in to change notification settings - Fork 655
/
Copy pathdeadline-filter.ts
117 lines (110 loc) · 3.58 KB
/
deadline-filter.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { Call } from './call-stream';
import { ConnectivityState, Http2Channel } from './channel';
import { Status } from './constants';
import { BaseFilter, Filter, FilterFactory } from './filter';
import { Metadata } from './metadata';
/**
 * Unit suffixes used when formatting the `grpc-timeout` value, ordered from
 * finest to coarsest, each paired with its length in milliseconds:
 * `m` = millisecond, `S` = second, `M` = minute, `H` = hour.
 */
const units: Array<[string, number]> = [
  ['m', 1],
  ['S', 1000],
  ['M', 60 * 1000],
  ['H', 60 * 60 * 1000],
];

/**
 * Formats the time remaining until `deadline` as a timeout string made of an
 * integer amount followed by a one-letter unit (for example `"5000m"`).
 *
 * The finest unit whose rounded-up amount stays below 1e8 (i.e. fits in at
 * most eight digits) is chosen. A deadline that is already in the past
 * yields `"0m"`.
 *
 * @param deadline Absolute deadline in milliseconds since the Unix epoch.
 * @returns The remaining time, rounded up to a whole number of the chosen unit.
 * @throws Error if the remaining time cannot be expressed in fewer than 1e8
 *     hours (this includes an infinite or NaN deadline).
 */
function getDeadline(deadline: number): string {
  const now = new Date().getTime();
  const timeoutMs = Math.max(deadline - now, 0);
  for (const [unit, factor] of units) {
    // Round up BEFORE the range check: checking the fractional value first
    // would let e.g. 99999999.5 pass and then be emitted as the nine-digit
    // amount 100000000.
    const amount = Math.ceil(timeoutMs / factor);
    if (amount < 1e8) {
      return String(amount) + unit;
    }
  }
  throw new Error('Deadline is too far in the future');
}
/**
 * Call filter that enforces the call's deadline on the client side and
 * advertises the remaining time through the `grpc-timeout` metadata entry.
 *
 * On construction it reads the deadline from the call and, when the deadline
 * is finite, schedules a timer that cancels the call with
 * `Status.DEADLINE_EXCEEDED`. The pending timer is cleared once the call
 * emits `status`.
 */
export class DeadlineFilter extends BaseFilter implements Filter {
  /**
   * Largest delay Node's timers honour. A larger delay passed to
   * `setTimeout` is coerced to 1 ms, which would make a far-future deadline
   * fire almost immediately.
   */
  private static readonly MAX_TIMER_DELAY_MS = 2147483647;

  /** Handle of the currently pending deadline timer, if any. */
  private timer: NodeJS.Timer | null = null;
  /** Absolute deadline in epoch milliseconds; `Infinity` means no deadline. */
  private deadline: number;

  /**
   * @param channel Channel whose connectivity state gates `sendMetadata`.
   * @param callStream Call whose deadline is enforced by this filter.
   */
  constructor(
    private readonly channel: Http2Channel,
    private readonly callStream: Call
  ) {
    super();
    const callDeadline = callStream.getDeadline();
    if (callDeadline instanceof Date) {
      this.deadline = callDeadline.getTime();
    } else {
      this.deadline = callDeadline;
    }
    if (this.deadline !== Infinity) {
      this.armDeadlineTimer();
      // Once the call has a status there is nothing left to cancel.
      callStream.on('status', () => {
        if (this.timer !== null) {
          clearTimeout(this.timer);
          this.timer = null;
        }
      });
    }
  }

  /**
   * Schedules the deadline timer. When the remaining time exceeds what a
   * single timer can represent, a maximum-length timer is scheduled instead
   * and this method re-arms itself when that timer fires, so the call is
   * cancelled at the real deadline rather than after 1 ms.
   */
  private armDeadlineTimer(): void {
    const remainingMs = Math.max(this.deadline - new Date().getTime(), 0);
    if (remainingMs > DeadlineFilter.MAX_TIMER_DELAY_MS) {
      this.timer = setTimeout(() => {
        this.armDeadlineTimer();
      }, DeadlineFilter.MAX_TIMER_DELAY_MS);
      return;
    }
    this.timer = setTimeout(() => {
      this.callStream.cancelWithStatus(
        Status.DEADLINE_EXCEEDED,
        'Deadline exceeded'
      );
    }, remainingMs);
  }

  /**
   * Adds the `grpc-timeout` entry to the outgoing metadata.
   *
   * With no deadline (`Infinity`) the metadata is passed through untouched.
   * Otherwise the returned promise waits until the channel reports
   * `ConnectivityState.READY` and only then computes the timeout string.
   * NOTE(review): presumably the wait exists so the advertised timeout
   * reflects the time left when the request is actually sent — confirm.
   *
   * @param metadata Promise for the metadata produced by earlier filters.
   * @returns A promise for the metadata with `grpc-timeout` set. It rejects
   *     with `Error('Call ended')` if the call emits `status` before the
   *     channel becomes ready, and with the error thrown by `getDeadline`
   *     if the deadline is too far in the future to encode.
   */
  sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
    if (this.deadline === Infinity) {
      return metadata;
    }
    return new Promise<Metadata>((resolve, reject) => {
      if (
        this.channel.getConnectivityState(false) === ConnectivityState.READY
      ) {
        resolve(metadata);
      } else {
        // Whichever of the two events happens first settles the promise and
        // unregisters the other listener.
        const handleStateChange = (newState: ConnectivityState) => {
          if (newState === ConnectivityState.READY) {
            resolve(metadata);
            this.channel.removeListener(
              'connectivityStateChanged',
              handleStateChange
            );
            this.callStream.removeListener('status', handleStatus);
          }
        };
        const handleStatus = () => {
          reject(new Error('Call ended'));
          this.channel.removeListener(
            'connectivityStateChanged',
            handleStateChange
          );
        };
        this.channel.on('connectivityStateChanged', handleStateChange);
        this.callStream.once('status', handleStatus);
      }
    }).then((finalMetadata: Metadata) => {
      const timeoutString = getDeadline(this.deadline);
      finalMetadata.set('grpc-timeout', timeoutString);
      return finalMetadata;
    });
  }
}
/**
 * Factory producing one `DeadlineFilter` per call. Every filter it creates
 * is bound to the channel handed to the factory's constructor.
 */
export class DeadlineFilterFactory implements FilterFactory<DeadlineFilter> {
  /** Channel passed on to each filter this factory creates. */
  private readonly channel: Http2Channel;

  /**
   * @param channel Channel to associate with the created filters.
   */
  constructor(channel: Http2Channel) {
    this.channel = channel;
  }

  /**
   * Builds the deadline filter for a single call.
   *
   * @param callStream The call the new filter is attached to.
   * @returns A new `DeadlineFilter` for `callStream` on this factory's channel.
   */
  createFilter(callStream: Call): DeadlineFilter {
    const filter = new DeadlineFilter(this.channel, callStream);
    return filter;
  }
}