Skip to content

Commit

Permalink
refactor(NODE-5696): add async-iterator based socket helpers (#3896)
Browse files Browse the repository at this point in the history
Co-authored-by: Neal Beeken <[email protected]>
  • Loading branch information
baileympearson and nbbeeken authored Nov 8, 2023
1 parent 89cb092 commit b602162
Show file tree
Hide file tree
Showing 13 changed files with 1,292 additions and 95 deletions.
10 changes: 5 additions & 5 deletions src/cmap/command_monitoring_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
LEGACY_HELLO_COMMAND_CAMEL_CASE
} from '../constants';
import { calculateDurationInMs, deepCopy } from '../utils';
import { Msg, type Query, type WriteProtocolMessageType } from './commands';
import { OpMsgRequest, type OpQueryRequest, type WriteProtocolMessageType } from './commands';
import type { Connection } from './connection';

/**
Expand Down Expand Up @@ -181,8 +181,8 @@ const HELLO_COMMANDS = new Set(['hello', LEGACY_HELLO_COMMAND, LEGACY_HELLO_COMM

// helper methods
const extractCommandName = (commandDoc: Document) => Object.keys(commandDoc)[0];
const namespace = (command: Query) => command.ns;
const collectionName = (command: Query) => command.ns.split('.')[1];
const namespace = (command: OpQueryRequest) => command.ns;
const collectionName = (command: OpQueryRequest) => command.ns.split('.')[1];
const maybeRedact = (commandName: string, commandDoc: Document, result: Error | Document) =>
SENSITIVE_COMMANDS.has(commandName) ||
(HELLO_COMMANDS.has(commandName) && commandDoc.speculativeAuthenticate)
Expand Down Expand Up @@ -220,7 +220,7 @@ const OP_QUERY_KEYS = [

/** Extract the actual command from the query, possibly up-converting if it's a legacy format */
function extractCommand(command: WriteProtocolMessageType): Document {
if (command instanceof Msg) {
if (command instanceof OpMsgRequest) {
return deepCopy(command.command);
}

Expand Down Expand Up @@ -283,7 +283,7 @@ function extractReply(command: WriteProtocolMessageType, reply?: Document) {
return reply;
}

if (command instanceof Msg) {
if (command instanceof OpMsgRequest) {
return deepCopy(reply.result ? reply.result : reply);
}

Expand Down
73 changes: 65 additions & 8 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ import { MongoInvalidArgumentError, MongoRuntimeError } from '../error';
import { ReadPreference } from '../read_preference';
import type { ClientSession } from '../sessions';
import type { CommandOptions } from './connection';
import { OP_MSG, OP_QUERY } from './wire_protocol/constants';
import {
compress,
Compressor,
type CompressorName,
uncompressibleCommands
} from './wire_protocol/compression';
import { OP_COMPRESSED, OP_MSG, OP_QUERY } from './wire_protocol/constants';

// Incrementing request id
let _requestId = 0;
Expand All @@ -25,7 +31,7 @@ const SHARD_CONFIG_STALE = 4;
const AWAIT_CAPABLE = 8;

/** @internal */
export type WriteProtocolMessageType = Query | Msg;
export type WriteProtocolMessageType = OpQueryRequest | OpMsgRequest;

/** @internal */
export interface OpQueryOptions extends CommandOptions {
Expand All @@ -52,7 +58,7 @@ export interface OpQueryOptions extends CommandOptions {
* QUERY
**************************************************************/
/** @internal */
export class Query {
export class OpQueryRequest {
ns: string;
numberToSkip: number;
numberToReturn: number;
Expand Down Expand Up @@ -96,7 +102,7 @@ export class Query {
this.numberToSkip = options.numberToSkip || 0;
this.numberToReturn = options.numberToReturn || 0;
this.returnFieldSelector = options.returnFieldSelector || undefined;
this.requestId = Query.getRequestId();
this.requestId = options.requestId ?? OpQueryRequest.getRequestId();

// special case for pre-3.2 find commands, delete ASAP
this.pre32Limit = options.pre32Limit;
Expand Down Expand Up @@ -285,7 +291,7 @@ export interface OpResponseOptions extends BSONSerializeOptions {
}

/** @internal */
export class Response {
export class OpQueryResponse {
parsed: boolean;
raw: Buffer;
data: Buffer;
Expand Down Expand Up @@ -472,7 +478,7 @@ export interface OpMsgOptions {
}

/** @internal */
export class Msg {
export class OpMsgRequest {
requestId: number;
serializeFunctions: boolean;
ignoreUndefined: boolean;
Expand Down Expand Up @@ -502,7 +508,7 @@ export class Msg {
this.options = options ?? {};

// Additional options
this.requestId = options.requestId ? options.requestId : Msg.getRequestId();
this.requestId = options.requestId ? options.requestId : OpMsgRequest.getRequestId();

// Serialization option
this.serializeFunctions =
Expand Down Expand Up @@ -580,7 +586,7 @@ export class Msg {
}

/** @internal */
export class BinMsg {
export class OpMsgResponse {
parsed: boolean;
raw: Buffer;
data: Buffer;
Expand Down Expand Up @@ -709,3 +715,54 @@ export class BinMsg {
return { utf8: { writeErrors: false } };
}
}

const MESSAGE_HEADER_SIZE = 16;
const COMPRESSION_DETAILS_SIZE = 9; // originalOpcode + uncompressedSize, compressorID

/**
* @internal
*
* An OP_COMPRESSED request wraps either an OP_QUERY or OP_MSG message.
*/
export class OpCompressedRequest {
constructor(
private command: WriteProtocolMessageType,
private options: { zlibCompressionLevel: number; agreedCompressor: CompressorName }
) {}

// Return whether a command contains an uncompressible command term
// Will return true if command contains no uncompressible command terms
static canCompress(command: WriteProtocolMessageType) {
const commandDoc = command instanceof OpMsgRequest ? command.command : command.query;
const commandName = Object.keys(commandDoc)[0];
return !uncompressibleCommands.has(commandName);
}

async toBin(): Promise<Buffer[]> {
const concatenatedOriginalCommandBuffer = Buffer.concat(this.command.toBin());
// otherwise, compress the message
const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);

// Extract information needed for OP_COMPRESSED from the uncompressed message
const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);

// Compress the message body
const compressedMessage = await compress(this.options, messageToBeCompressed);
// Create the msgHeader of OP_COMPRESSED
const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
msgHeader.writeInt32LE(
MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length,
0
); // messageLength
msgHeader.writeInt32LE(this.command.requestId, 4); // requestID
msgHeader.writeInt32LE(0, 8); // responseTo (zero)
msgHeader.writeInt32LE(OP_COMPRESSED, 12); // opCode

// Create the compression details of OP_COMPRESSED
const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
compressionDetails.writeUInt8(Compressor[this.options.agreedCompressor], 8); // compressorID
return [msgHeader, compressionDetails, compressedMessage];
}
}
Loading

0 comments on commit b602162

Please sign in to comment.