Skip to content

Commit

Permalink
Merge pull request #1944 from murgatroid99/grpc-js_disable_channelz
Browse files Browse the repository at this point in the history
grpc-js: Allow users to disable channelz
  • Loading branch information
murgatroid99 authored Oct 26, 2021
2 parents 7c9ded0 + 891a918 commit c61b969
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 59 deletions.
2 changes: 2 additions & 0 deletions packages/grpc-js/src/channel-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export interface ChannelOptions {
'grpc.enable_http_proxy'?: number;
'grpc.http_connect_target'?: string;
'grpc.http_connect_creds'?: string;
'grpc.enable_channelz'?: number;
'grpc-node.max_session_memory'?: number;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[key: string]: any;
Expand All @@ -61,6 +62,7 @@ export const recognizedOptions = {
'grpc.max_send_message_length': true,
'grpc.max_receive_message_length': true,
'grpc.enable_http_proxy': true,
'grpc.enable_channelz': true,
'grpc-node.max_session_memory': true,
};

Expand Down
69 changes: 52 additions & 17 deletions packages/grpc-js/src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ export class ChannelImplementation implements Channel {
private configSelector: ConfigSelector | null = null;

// Channelz info
private readonly channelzEnabled: boolean = true;
private originalTarget: string;
private channelzRef: ChannelRef;
private channelzTrace: ChannelzTrace;
Expand Down Expand Up @@ -213,9 +214,22 @@ export class ChannelImplementation implements Channel {
this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME);
this.callRefTimer.unref?.();

this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo());
if (this.options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}

this.channelzTrace = new ChannelzTrace();
this.channelzTrace.addTrace('CT_INFO', 'Channel created');
if (this.channelzEnabled) {
this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo());
this.channelzTrace.addTrace('CT_INFO', 'Channel created');
} else {
// Dummy channelz ref that will never be used
this.channelzRef = {
kind: 'channel',
id: -1,
name: ''
};
}

if (this.options['grpc.default_authority']) {
this.defaultAuthority = this.options['grpc.default_authority'] as string;
Expand All @@ -242,7 +256,9 @@ export class ChannelImplementation implements Channel {
Object.assign({}, this.options, subchannelArgs),
this.credentials
);
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
}
return subchannel;
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
Expand All @@ -262,18 +278,24 @@ export class ChannelImplementation implements Channel {
);
},
addChannelzChild: (child: ChannelRef | SubchannelRef) => {
this.childrenTracker.refChild(child);
if (this.channelzEnabled) {
this.childrenTracker.refChild(child);
}
},
removeChannelzChild: (child: ChannelRef | SubchannelRef) => {
this.childrenTracker.unrefChild(child);
if (this.channelzEnabled) {
this.childrenTracker.unrefChild(child);
}
}
};
this.resolvingLoadBalancer = new ResolvingLoadBalancer(
this.target,
channelControlHelper,
options,
(configSelector) => {
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
}
this.configSelector = configSelector;
/* We process the queue asynchronously to ensure that the corresponding
* load balancer update has completed. */
Expand All @@ -288,7 +310,9 @@ export class ChannelImplementation implements Channel {
});
},
(status) => {
this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"');
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"');
}
if (this.configSelectionQueue.length > 0) {
this.trace('Name resolution failed with calls queued for config selection');
}
Expand Down Expand Up @@ -553,7 +577,9 @@ export class ChannelImplementation implements Channel {
' -> ' +
ConnectivityState[newState]
);
this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
}
this.connectivityState = newState;
const watchersCopy = this.connectivityStateWatchers.slice();
for (const watcherObject of watchersCopy) {
Expand Down Expand Up @@ -638,7 +664,9 @@ export class ChannelImplementation implements Channel {
this.resolvingLoadBalancer.destroy();
this.updateState(ConnectivityState.SHUTDOWN);
clearInterval(this.callRefTimer);
unregisterChannelzRef(this.channelzRef);
if (this.channelzEnabled) {
unregisterChannelzRef(this.channelzRef);
}

this.subchannelPool.unrefUnusedSubchannels();
}
Expand Down Expand Up @@ -690,6 +718,11 @@ export class ChannelImplementation implements Channel {
this.connectivityStateWatchers.push(watcherObject);
}

/**
* Get the channelz reference object for this channel. The returned value is
* garbage if channelz is disabled for this channel.
* @returns
*/
getChannelzRef() {
return this.channelzRef;
}
Expand Down Expand Up @@ -735,14 +768,16 @@ export class ChannelImplementation implements Channel {
this.credentials._getCallCredentials(),
callNumber
);
this.callTracker.addCallStarted();
stream.addStatusWatcher(status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
});
if (this.channelzEnabled) {
this.callTracker.addCallStarted();
stream.addStatusWatcher(status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
});
}
return stream;
}
}
41 changes: 32 additions & 9 deletions packages/grpc-js/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ export class Server {
private options: ChannelOptions;

// Channelz Info
private readonly channelzEnabled: boolean = true;
private channelzRef: ServerRef;
private channelzTrace = new ChannelzTrace();
private callTracker = new ChannelzCallTracker();
Expand All @@ -157,9 +158,20 @@ export class Server {

constructor(options?: ChannelOptions) {
this.options = options ?? {};
this.channelzRef = registerChannelzServer(() => this.getChannelzInfo());
this.channelzTrace.addTrace('CT_INFO', 'Server created');
this.trace('Server constructed');
if (this.options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}
if (this.channelzEnabled) {
this.channelzRef = registerChannelzServer(() => this.getChannelzInfo());
this.channelzTrace.addTrace('CT_INFO', 'Server created');
this.trace('Server constructed');
} else {
// Dummy channelz ref that will never be used
this.channelzRef = {
kind: 'server',
id: -1
};
}
}

private getChannelzInfo(): ServerInfo {
Expand Down Expand Up @@ -638,7 +650,9 @@ export class Server {
if (this.started === true) {
throw new Error('server is already started');
}
this.channelzTrace.addTrace('CT_INFO', 'Starting');
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Starting');
}
this.started = true;
}

Expand Down Expand Up @@ -686,6 +700,11 @@ export class Server {
throw new Error('Not yet implemented');
}

/**
* Get the channelz reference object for this server. The returned value is
* garbage if channelz is disabled for this server.
* @returns
*/
getChannelzRef() {
return this.channelzRef;
}
Expand Down Expand Up @@ -841,12 +860,16 @@ export class Server {

this.sessions.set(session, channelzSessionInfo);
const clientAddress = session.socket.remoteAddress;
this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress);
this.sessionChildrenTracker.refChild(channelzRef);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress);
this.sessionChildrenTracker.refChild(channelzRef);
}
session.on('close', () => {
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
this.sessionChildrenTracker.unrefChild(channelzRef);
unregisterChannelzRef(channelzRef);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
this.sessionChildrenTracker.unrefChild(channelzRef);
unregisterChannelzRef(channelzRef);
}
this.sessions.delete(session);
});
});
Expand Down
94 changes: 61 additions & 33 deletions packages/grpc-js/src/subchannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ export class Subchannel {
private subchannelAddressString: string;

// Channelz info
private readonly channelzEnabled: boolean = true;
private channelzRef: SubchannelRef;
private channelzTrace: ChannelzTrace;
private callTracker = new ChannelzCallTracker();
Expand Down Expand Up @@ -226,9 +227,21 @@ export class Subchannel {
}, backoffOptions);
this.subchannelAddressString = subchannelAddressToString(subchannelAddress);

this.channelzRef = registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo());
if (options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}
this.channelzTrace = new ChannelzTrace();
this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
if (this.channelzEnabled) {
this.channelzRef = registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo());
this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
} else {
// Dummy channelz ref that will never be used
this.channelzRef = {
kind: 'subchannel',
id: -1,
name: ''
};
}
this.trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2));
}

Expand Down Expand Up @@ -286,6 +299,9 @@ export class Subchannel {
}

private resetChannelzSocketInfo() {
if (!this.channelzEnabled) {
return;
}
if (this.channelzSocketRef) {
unregisterChannelzRef(this.channelzSocketRef);
this.childrenTracker.unrefChild(this.channelzSocketRef);
Expand Down Expand Up @@ -335,7 +351,9 @@ export class Subchannel {
}

private sendPing() {
this.keepalivesSent += 1;
if (this.channelzEnabled) {
this.keepalivesSent += 1;
}
logging.trace(
LogVerbosity.DEBUG,
'keepalive',
Expand Down Expand Up @@ -462,8 +480,10 @@ export class Subchannel {
connectionOptions
);
this.session = session;
this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!);
this.childrenTracker.refChild(this.channelzSocketRef);
if (this.channelzEnabled) {
this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!);
this.childrenTracker.refChild(this.channelzSocketRef);
}
session.unref();
/* For all of these events, check if the session at the time of the event
* is the same one currently attached to this subchannel, to ensure that
Expand Down Expand Up @@ -615,7 +635,9 @@ export class Subchannel {
' -> ' +
ConnectivityState[newState]
);
this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
}
const previousState = this.connectivityState;
this.connectivityState = newState;
switch (newState) {
Expand Down Expand Up @@ -678,12 +700,16 @@ export class Subchannel {
/* If no calls, channels, or subchannel pools have any more references to
* this subchannel, we can be sure it will never be used again. */
if (this.callRefcount === 0 && this.refcount === 0) {
this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
}
this.transitionToState(
[ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE
);
unregisterChannelzRef(this.channelzRef);
if (this.channelzEnabled) {
unregisterChannelzRef(this.channelzRef);
}
}
}

Expand Down Expand Up @@ -805,34 +831,36 @@ export class Subchannel {
' with headers\n' +
headersString
);
this.callTracker.addCallStarted();
callStream.addStatusWatcher(status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
});
const streamSession = this.session;
this.streamTracker.addCallStarted();
callStream.addStreamEndWatcher(success => {
if (streamSession === this.session) {
if (success) {
this.streamTracker.addCallSucceeded();
if (this.channelzEnabled) {
this.callTracker.addCallStarted();
callStream.addStatusWatcher(status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.streamTracker.addCallFailed();
this.callTracker.addCallFailed();
}
}
});
callStream.attachHttp2Stream(http2Stream, this, extraFilters, {
addMessageSent: () => {
this.messagesSent += 1;
this.lastMessageSentTimestamp = new Date();
},
addMessageReceived: () => {
this.messagesReceived += 1;
}
});
});
this.streamTracker.addCallStarted();
callStream.addStreamEndWatcher(success => {
if (streamSession === this.session) {
if (success) {
this.streamTracker.addCallSucceeded();
} else {
this.streamTracker.addCallFailed();
}
}
});
callStream.attachHttp2Stream(http2Stream, this, extraFilters, {
addMessageSent: () => {
this.messagesSent += 1;
this.lastMessageSentTimestamp = new Date();
},
addMessageReceived: () => {
this.messagesReceived += 1;
}
});
}
}

/**
Expand Down

0 comments on commit c61b969

Please sign in to comment.