Skip to content

Commit

Permalink
Revert back PR #21785. Process ops by batches in remoteMessageProcess…
Browse files Browse the repository at this point in the history
…or and pendingStateManager (#22509)

Reverting back #21785
given discoveries described here
[AB#15212](https://dev.azure.com/fluidframework/235294da-091d-4c29-84fc-cdfc3d90890b/_workitems/edit/15212)
  • Loading branch information
dannimad authored Sep 16, 2024
1 parent 41e63a1 commit 1571e0c
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 103 deletions.
77 changes: 42 additions & 35 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ export const makeLegacySendBatchFn =
type MessageWithContext = {
local: boolean;
savedOp?: boolean;
localOpMetadata?: unknown;
batchStartCsn: number;
} & (
| {
message: InboundSequencedContainerRuntimeMessage;
Expand Down Expand Up @@ -2634,38 +2634,34 @@ export class ContainerRuntime
const messageCopy = { ...messageArg };
// We expect runtime messages to have JSON contents - deserialize it in place.
ensureContentsDeserialized(messageCopy, modernRuntimeMessage, logLegacyCase);
if (modernRuntimeMessage) {
const processResult = this.remoteMessageProcessor.process(messageCopy, logLegacyCase);
if (processResult === undefined) {
// This means the incoming message is an incomplete part of a message or batch
// and we need to process more messages before the rest of the system can understand it.
return;
}
const batchStartCsn = processResult.batchStartCsn;
const batch = processResult.messages;
const messages: {
message: InboundSequencedContainerRuntimeMessage;
localOpMetadata: unknown;
}[] = local
? this.pendingStateManager.processPendingLocalBatch(batch, batchStartCsn)
: batch.map((message) => ({ message, localOpMetadata: undefined }));
messages.forEach(({ message, localOpMetadata }) => {
const msg: MessageWithContext = {
message,
local,
modernRuntimeMessage,
savedOp,
localOpMetadata,
};
this.ensureNoDataModelChanges(() => this.processCore(msg));
});
} else {
const msg: MessageWithContext = {
message: messageCopy as InboundSequencedContainerRuntimeMessageOrSystemMessage,
local,
modernRuntimeMessage,
savedOp,
};
const processResult = this.remoteMessageProcessor.process(messageCopy);
if (processResult === undefined) {
// This means the incoming message is an incomplete part of a message or batch
// and we need to process more messages before the rest of the system can understand it.
return;
}
for (const message of processResult.messages) {
const msg: MessageWithContext = modernRuntimeMessage
? {
// Cast it since we expect it to be this based on modernRuntimeMessage computation above.
// There is nothing really ensuring that anytime original message.type is Operation that
// the result messages will be so. In the end modern bool being true only directs to
// throw error if ultimately unrecognized without compat details saying otherwise.
message: message as InboundSequencedContainerRuntimeMessage,
local,
modernRuntimeMessage,
batchStartCsn: processResult.batchStartCsn,
}
: // Unrecognized message will be ignored.
{
message,
local,
modernRuntimeMessage,
batchStartCsn: processResult.batchStartCsn,
};
msg.savedOp = savedOp;

// ensure that we observe any re-entrancy, and if needed, rebase ops
this.ensureNoDataModelChanges(() => this.processCore(msg));
}
}
Expand All @@ -2676,7 +2672,7 @@ export class ContainerRuntime
* Direct the message to the correct subsystem for processing, and implement other side effects
*/
private processCore(messageWithContext: MessageWithContext) {
const { message, local, localOpMetadata } = messageWithContext;
const { message, local } = messageWithContext;

// Intercept to reduce minimum sequence number to the delta manager's minimum sequence number.
// Sequence numbers are not guaranteed to follow any sort of order. Re-entrancy is one of those situations
Expand All @@ -2696,12 +2692,23 @@ export class ContainerRuntime
this._processedClientSequenceNumber = message.clientSequenceNumber;

try {
// RemoteMessageProcessor would have already reconstituted Chunked Ops into the original op type
// See commit that added this assert for more details.
// These calls should be made for all but chunked ops:
// 1) this.pendingStateManager.processPendingLocalMessage() below
// 2) this.resetReconnectCount() below
assert(
message.type !== ContainerMessageType.ChunkedOp,
0x93b /* we should never get here with chunked ops */,
);

let localOpMetadata: unknown;
if (local && messageWithContext.modernRuntimeMessage) {
localOpMetadata = this.pendingStateManager.processPendingLocalMessage(
messageWithContext.message,
messageWithContext.batchStartCsn,
);
}

// If there are no more pending messages after processing a local message,
// the document is no longer dirty.
if (!this.hasPendingMessages()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
ContainerMessageType,
type InboundContainerRuntimeMessage,
type InboundSequencedContainerRuntimeMessage,
type InboundSequencedContainerRuntimeMessageOrSystemMessage,
type InboundSequencedRecentlyAddedContainerRuntimeMessage,
} from "../messageTypes.js";
import { asBatchMetadata } from "../metadata.js";
Expand All @@ -35,7 +36,6 @@ export class RemoteMessageProcessor {
* @remarks For chunked batches, this is the CSN of the "representative" chunk (the final chunk)
*/
private batchStartCsn: number | undefined;
private readonly processorBatch: InboundSequencedContainerRuntimeMessage[] = [];

constructor(
private readonly opSplitter: OpSplitter,
Expand All @@ -52,7 +52,7 @@ export class RemoteMessageProcessor {
}

/**
* Ungroups and Unchunks the runtime ops of a batch received over the wire
* Ungroups and Unchunks the runtime ops encapsulated by the single remoteMessage received over the wire
* @param remoteMessageCopy - A shallow copy of a message from another client, possibly virtualized
* (grouped, compressed, and/or chunked).
* Being a shallow copy, it's considered mutable, meaning no other Container or other parallel procedure
Expand All @@ -67,15 +67,13 @@ export class RemoteMessageProcessor {
* 3. If grouped, ungroup the message
* For more details, see https://github.com/microsoft/FluidFramework/blob/main/packages/runtime/container-runtime/src/opLifecycle/README.md#inbound
*
* @returns all the unchunked, decompressed, ungrouped, unpacked InboundSequencedContainerRuntimeMessage from a single batch
* or undefined if the batch is not yet complete.
* @returns the unchunked, decompressed, ungrouped, unpacked SequencedContainerRuntimeMessages encapsulated in the remote message.
* For ops that weren't virtualized (e.g. System ops that the ContainerRuntime will ultimately ignore),
* a singleton array [remoteMessageCopy] is returned
*/
public process(
remoteMessageCopy: ISequencedDocumentMessage,
logLegacyCase: (codePath: string) => void,
):
public process(remoteMessageCopy: ISequencedDocumentMessage):
| {
messages: InboundSequencedContainerRuntimeMessage[];
messages: InboundSequencedContainerRuntimeMessageOrSystemMessage[];
batchStartCsn: number;
}
| undefined {
Expand Down Expand Up @@ -109,10 +107,6 @@ export class RemoteMessageProcessor {
this.batchStartCsn === undefined,
0x9d3 /* Grouped batch interrupting another batch */,
);
assert(
this.processorBatch.length === 0,
0x9d4 /* Processor batch should be empty on grouped batch */,
);
return {
messages: this.opGroupingManager.ungroupOp(message).map(unpack),
batchStartCsn: message.clientSequenceNumber,
Expand All @@ -122,21 +116,9 @@ export class RemoteMessageProcessor {
const batchStartCsn = this.getAndUpdateBatchStartCsn(message);

// Do a final unpack of runtime messages in case the message was not grouped, compressed, or chunked
unpackRuntimeMessage(message, logLegacyCase);
this.processorBatch.push(message as InboundSequencedContainerRuntimeMessage);

// this.batchStartCsn is undefined only if we have processed all messages in the batch.
// If it's still defined, we're still in the middle of a batch, so we return nothing, letting
// containerRuntime know that we're waiting for more messages to complete the batch.
if (this.batchStartCsn !== undefined) {
// batch not yet complete
return undefined;
}

const messages = [...this.processorBatch];
this.processorBatch.length = 0;
unpackRuntimeMessage(message);
return {
messages,
messages: [message as InboundSequencedContainerRuntimeMessageOrSystemMessage],
batchStartCsn,
};
}
Expand Down
23 changes: 2 additions & 21 deletions packages/runtime/container-runtime/src/pendingStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
import Deque from "double-ended-queue";
import { v4 as uuid } from "uuid";

import { type InboundSequencedContainerRuntimeMessage } from "./messageTypes.js";
import { InboundSequencedContainerRuntimeMessage } from "./messageTypes.js";
import { asBatchMetadata, IBatchMetadata } from "./metadata.js";
import { BatchId, BatchMessage, generateBatchId } from "./opLifecycle/index.js";
import { pkgVersion } from "./packageVersion.js";
Expand Down Expand Up @@ -293,33 +293,14 @@ export class PendingStateManager implements IDisposable {
}
}

/**
* Processes the incoming batch from the server. It verifies that messages are received in the right order and
* that the batch information is correct.
* @param batch - The batch that is being processed.
* @param batchStartCsn - The clientSequenceNumber of the start of this message's batch
*/
public processPendingLocalBatch(
batch: InboundSequencedContainerRuntimeMessage[],
batchStartCsn: number,
): {
message: InboundSequencedContainerRuntimeMessage;
localOpMetadata: unknown;
}[] {
return batch.map((message) => ({
message,
localOpMetadata: this.processPendingLocalMessage(message, batchStartCsn),
}));
}

/**
* Processes a local message once its ack'd by the server. It verifies that there was no data corruption and that
* the batch information was preserved for batch messages.
* @param message - The message that got ack'd and needs to be processed.
* @param batchStartCsn - The clientSequenceNumber of the start of this message's batch (assigned during submit)
* (not to be confused with message.clientSequenceNumber - the overwritten value in case of grouped batching)
*/
private processPendingLocalMessage(
public processPendingLocalMessage(
message: InboundSequencedContainerRuntimeMessage,
batchStartCsn: number,
): unknown {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -743,11 +743,8 @@ describe("Runtime", () => {
processMessage: (_message: ISequencedDocumentMessage, _local: boolean) => {
return { localAck: false, localOpMetadata: undefined };
},
processPendingLocalBatch: (_messages: ISequencedDocumentMessage[]) => {
return _messages.map((message) => ({
message,
localOpMetadata: undefined,
}));
processPendingLocalMessage: (_message: ISequencedDocumentMessage) => {
return undefined;
},
get pendingMessagesCount() {
return pendingMessages;
Expand Down Expand Up @@ -924,7 +921,6 @@ describe("Runtime", () => {
contents: {
address: "address",
},
clientSequenceNumber: 0,
} as any as ISequencedDocumentMessage,
true /* local */,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ describe("RemoteMessageProcessor", () => {
const actual: ISequencedDocumentMessage[] = [];
let seqNum = 1;
let actualBatchStartCsn: number | undefined;
let emptyProcessResultCount = 0;
for (const message of outboundMessages) {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const inboundMessage = {
Expand All @@ -174,10 +175,11 @@ describe("RemoteMessageProcessor", () => {
} as ISequencedDocumentMessage;

ensureContentsDeserialized(inboundMessage, true, () => {});
const processResult = messageProcessor.process(inboundMessage, () => {});
const processResult = messageProcessor.process(inboundMessage);

// It'll be undefined for the first n-1 chunks if chunking is enabled
if (processResult === undefined) {
++emptyProcessResultCount;
continue;
}

Expand All @@ -192,7 +194,11 @@ describe("RemoteMessageProcessor", () => {
);
}
}

assert.equal(
emptyProcessResultCount,
leadingChunkCount,
"expected empty result to be 1-1 with leading chunks",
);
const expected = option.grouping
? [
getProcessedMessage("a", startSeqNum, 1, true),
Expand Down Expand Up @@ -228,7 +234,7 @@ describe("RemoteMessageProcessor", () => {
};
const documentMessage = message as ISequencedDocumentMessage;
ensureContentsDeserialized(documentMessage, true, () => {});
const processResult = messageProcessor.process(documentMessage, () => {})?.messages ?? [];
const processResult = messageProcessor.process(documentMessage)?.messages ?? [];

assert.strictEqual(processResult.length, 1, "only expected a single processed message");
const result = processResult[0];
Expand All @@ -246,7 +252,7 @@ describe("RemoteMessageProcessor", () => {
metadata: { meta: "data" },
};
const documentMessage = message as ISequencedDocumentMessage;
const processResult = messageProcessor.process(documentMessage, () => {})?.messages ?? [];
const processResult = messageProcessor.process(documentMessage)?.messages ?? [];

assert.strictEqual(processResult.length, 1, "only expected a single processed message");
const result = processResult[0];
Expand Down Expand Up @@ -283,10 +289,7 @@ describe("RemoteMessageProcessor", () => {
},
};
const messageProcessor = getMessageProcessor();
const result = messageProcessor.process(
groupedBatch as ISequencedDocumentMessage,
() => {},
);
const result = messageProcessor.process(groupedBatch as ISequencedDocumentMessage);

const expected = [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,12 @@ describe("Pending State Manager", () => {
};

const process = (messages: Partial<ISequencedDocumentMessage>[], batchStartCsn: number) =>
pendingStateManager.processPendingLocalBatch(
messages as InboundSequencedContainerRuntimeMessage[],
batchStartCsn,
);
messages.forEach((message) => {
pendingStateManager.processPendingLocalMessage(
message as InboundSequencedContainerRuntimeMessage,
batchStartCsn,
);
});

it("proper batch is processed correctly", () => {
const messages: Partial<ISequencedDocumentMessage>[] = [
Expand Down Expand Up @@ -420,8 +422,8 @@ describe("Pending State Manager", () => {
],
1,
);
pendingStateManager.processPendingLocalBatch(
[futureRuntimeMessage as ISequencedDocumentMessage & UnknownContainerRuntimeMessage],
pendingStateManager.processPendingLocalMessage(
futureRuntimeMessage as ISequencedDocumentMessage & UnknownContainerRuntimeMessage,
1 /* batchStartCsn */,
);
});
Expand Down

0 comments on commit 1571e0c

Please sign in to comment.