diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 1cc0a7786444..09b54fd094ef 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -129,7 +129,7 @@ import { metadataBlobName, wrapSummaryInChannelsTree, } from "./summaryFormat"; -import { SummaryCollection, SummaryCollectionOpActions } from "./summaryCollection"; +import { SummaryCollection } from "./summaryCollection"; import { getLocalStorageFeatureGate } from "./localStorageFeatureGates"; export enum ContainerMessageType { @@ -899,32 +899,25 @@ export class ContainerRuntime extends TypedEventEmitter this.emit("codeDetailsProposed", proposal.value, proposal); } }); - const defaultAction = (op: ISequencedDocumentMessage,sc: SummaryCollection)=> { - if(sc.opsSinceLastAck > (this.runtimeOptions.summaryOptions.maxOpsSinceLastSummary ?? 3000)) { + + this.summaryCollection = new SummaryCollection(this.deltaManager, this.logger); + const maxOpsSinceLastSummary = this.runtimeOptions.summaryOptions.maxOpsSinceLastSummary ?? 
3000; + const defaultAction = () => { + if (this.summaryCollection.opsSinceLastAck > maxOpsSinceLastSummary) { this.logger.sendErrorEvent({eventName: "SummaryStatus:Behind"}); // unregister default to no log on every op after falling behind // and register summary ack handler to re-register this handler // after successful summary - opActions.default = undefined; - opActions.summaryAck = summaryAckAction; + this.summaryCollection.once(MessageType.SummaryAck, () => { + this.logger.sendTelemetryEvent({eventName: "SummaryStatus:CaughtUp"}); + // we've caught up, so re-register the default action to monitor for + // falling behind, and unregister ourself + this.summaryCollection.on("default", defaultAction); + }); + this.summaryCollection.off("default", defaultAction); } }; - const summaryAckAction = (op: ISequencedDocumentMessage,sc: SummaryCollection)=> { - this.logger.sendTelemetryEvent({eventName: "SummaryStatus:CaughtUp"}); - // we've caught up, so re-register the default action to monitor for - // falling behind, and unregister ourself - opActions.default = defaultAction; - opActions.summaryAck = undefined; - }; - const opActions: SummaryCollectionOpActions = { - default: defaultAction, - }; - - this.summaryCollection = new SummaryCollection( - this.deltaManager, - this.logger, - opActions, - ); + this.summaryCollection.on("default", defaultAction); // We always create the summarizer in the case that we are asked to generate summaries. But this may // want to be on demand instead. 
@@ -941,8 +934,10 @@ export class ContainerRuntime extends TypedEventEmitter // Create the SummaryManager and mark the initial state this.summaryManager = new SummaryManager( context, + this.summaryCollection, this.runtimeOptions.summaryOptions.generateSummaries !== false, this.logger, + maxOpsSinceLastSummary, this.runtimeOptions.summaryOptions.initialSummarizerDelayMs); if (this.connected) { diff --git a/packages/runtime/container-runtime/src/orderedClientElection.ts b/packages/runtime/container-runtime/src/orderedClientElection.ts new file mode 100644 index 000000000000..64455cabd2d9 --- /dev/null +++ b/packages/runtime/container-runtime/src/orderedClientElection.ts @@ -0,0 +1,263 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { IEvent } from "@fluidframework/common-definitions"; +import { assert, TypedEventEmitter } from "@fluidframework/common-utils"; +import { IQuorum, ISequencedClient } from "@fluidframework/protocol-definitions"; + +export const summarizerClientType = "summarizer"; + +/** Minimum information for a client tracked for election consideration. */ +export interface ITrackedClient { + readonly clientId: string; + readonly sequenceNumber: number; +} + +/** Additional information required for internal tracking of ineligible clients. */ +interface IIneligibleClient extends ITrackedClient { + readonly eligible: false; + readonly isSummarizer: boolean; +} + +/** Additional information required to keep track of the client within the doubly-linked list. */ +interface IEligibleClient extends ITrackedClient { + readonly eligible: true; + olderClient: IEligibleClient; + youngerClient: IEligibleClient | undefined; +} + +type TrackedClient = IIneligibleClient | IEligibleClient; + +interface IOrderedClientElectionEvents extends IEvent { + /** Event fires when the currently elected client changes. 
*/ + (event: "electedChange", listener: (client: ITrackedClient | undefined) => void); + /** Event fires when the number of summarizers changes. */ + (event: "summarizerChange", listener: (summarizerCount: number) => void); +} + +/** + * Tracks clients in the Quorum. It maintains their order using their join op + * sequence numbers. The purpose is to deterministically maintain a currently + * elected client, excluding non-interactive clients, in a distributed fashion. + * This can be true as long as incrementElectedClient and resetElectedClient calls + * are called at the same time for all clients. + * Internally, the collection of eligible (interactive, non-summarizer) clients is + * maintained in a doubly-linked list, with pointers to both the first and last nodes. + * The first node is a placeholder to simplify logic. + */ +export class OrderedClientElection extends TypedEventEmitter<IOrderedClientElectionEvents> { + /** Collection of ALL clients currently in the quorum, with client ids as keys. */ + private readonly clientMap = new Map<string, TrackedClient>(); + /** Placeholder head node of linked list, for simplified null checking. */ + private readonly rootClient: IEligibleClient = { + sequenceNumber: -1, + clientId: "", + eligible: true, + get olderClient(): IEligibleClient { + throw Error("Root client in OrderedClientElection should not have olderClient getter called."); + }, + set olderClient(_: IEligibleClient) { + throw Error("Root client in OrderedClientElection should not have olderClient setter called."); + }, + youngerClient: undefined, + }; + /** Pointer to end of linked list, for optimized client adds. */ + private youngestClient: IEligibleClient = this.rootClient; + /** Count of clients eligible for election. */ + private eligibleCount = 0; + /** Count of summarizer clients. */ + private summarizerCount = 0; + /** Currently elected client (within linked list). 
*/ + private electedClient: IEligibleClient | undefined; + + constructor(quorum: Pick<IQuorum, "getMembers" | "on">) { + super(); + const members = quorum.getMembers(); + for (const [clientId, client] of members) { + this.addClient(clientId, client); + } + + quorum.on("addMember", this.addClient); + quorum.on("removeMember", this.removeClient); + + this.resetElectedClient(); + } + + private insertEligibleClient(clientId: string, sequenceNumber: number): IEligibleClient { + // Normal case is adding the latest client, which will bypass loop. + // Find where it belongs otherwise (this shouldn't happen, assert?). + assert(sequenceNumber > -1, "Negative client sequence number not allowed"); + let currClient = this.youngestClient; + while (currClient.sequenceNumber > sequenceNumber) { + assert(currClient.olderClient !== undefined, "Previous client should always be defined"); + // what to do if currClient === this.electedClient + currClient = currClient.olderClient; + } + + // Now currClient is the node right before where the new client node should be. + const newClient: IEligibleClient = { + clientId, + sequenceNumber, + eligible: true, + olderClient: currClient, + youngerClient: currClient.youngerClient, + }; + + // Update prev node to point to this new node. + newClient.olderClient.youngerClient = newClient; + + if (newClient.youngerClient === undefined) { + // Update linked list end pointer to youngest client. + this.youngestClient = newClient; + } else { + // Update next node to point back to this new node. + newClient.youngerClient.olderClient = newClient; + } + + this.eligibleCount++; + return newClient; + } + + private deleteEligibleClient(removeClient: Readonly<IEligibleClient>) { + // Update prev node to point to next node. + removeClient.olderClient.youngerClient = removeClient.youngerClient; + + if (removeClient.youngerClient === undefined) { + // Update linked list end pointer to youngest client. 
+ this.youngestClient = removeClient.olderClient; + } else { + // Update next node to point back to previous node. + removeClient.youngerClient.olderClient = removeClient.olderClient; + } + + this.eligibleCount--; + } + + private readonly addClient = (clientId: string, client: ISequencedClient) => { + const isSummarizer = client.client.details?.type === summarizerClientType; + const eligible = !isSummarizer && (client.client.details?.capabilities.interactive ?? true); + const newClient: TrackedClient = eligible ? this.insertEligibleClient(clientId, client.sequenceNumber) : { + clientId, + sequenceNumber: client.sequenceNumber, + eligible, + isSummarizer, + }; + this.clientMap.set(clientId, newClient); + + // Emit change events if necessary + if (newClient.eligible) { + if (this.electedClient === undefined && newClient.youngerClient === undefined) { + this.electedClient = newClient; + this.emit("electedChange", this.getElectedClient()); + } + } else { + if (newClient.isSummarizer) { + this.summarizerCount++; + this.emit("summarizerChange", this.summarizerCount); + } + } + }; + + private readonly removeClient = (clientId: string) => { + const removeClient = this.clientMap.get(clientId); + if (removeClient !== undefined) { + this.clientMap.delete(clientId); + if (!removeClient.eligible) { + if (removeClient.isSummarizer) { + this.summarizerCount--; + this.emit("summarizerChange", this.summarizerCount); + } + return; + } + + this.deleteEligibleClient(removeClient); + if (removeClient === this.electedClient) { + this.electedClient = this.electedClient.youngerClient; + this.emit("electedChange", this.getElectedClient()); + } + } + }; + + /** Returns the currently elected client. */ + public getElectedClient(): ITrackedClient | undefined { + return this.electedClient; + } + + /** Resets the currently elected client back to its original value: the oldest eligible client. 
*/ + public resetElectedClient(): void { + const prevId = this.electedClient?.clientId; + this.electedClient = this.rootClient.youngerClient; + if (prevId !== this.electedClient?.clientId) { + this.emit("electedChange", this.getElectedClient()); + } + } + + /** Increments the currently elected client to the next oldest eligible client. */ + public incrementElectedClient(): void { + const prevId = this.electedClient?.clientId; + this.electedClient = this.electedClient?.youngerClient; + if (prevId !== this.electedClient?.clientId) { + this.emit("electedChange", this.getElectedClient()); + } + } + + /** Returns the count of eligible clients tracked. Eligible clients must be interactive, non-summarizers. */ + public getEligibleCount(): number { + return this.eligibleCount; + } + + /** Returns the count of summarizer clients tracked. */ + public getSummarizerCount(): number { + return this.summarizerCount; + } + + /** Returns the total count of clients tracked. */ + public getTotalCount(): number { + return this.clientMap.size; + } + + /** Returns an array of all eligible client ids being tracked in order from oldest to newest. */ + public getOrderedEligibleClientIds(): string[] { + const result: string[] = []; + let currClient = this.rootClient; + while (currClient.youngerClient !== undefined) { + result.push(currClient.youngerClient.clientId); + currClient = currClient.youngerClient; + } + return result; + } +} + +/** + * Used to give increasing delay times for throttling a single functionality. + * Delay is based on previous attempts within specified time window, ignoring actual delay time. 
+ */ + export class Throttler { + private startTimes: number[] = []; + constructor( + private readonly delayWindowMs: number, + private readonly maxDelayMs: number, + private readonly delayFunction: (n: number) => number, + ) { } + + public get attempts() { + return this.startTimes.length; + } + + public getDelay() { + const now = Date.now(); + this.startTimes = this.startTimes.filter((t) => now - t < this.delayWindowMs); + const delayMs = Math.min(this.delayFunction(this.startTimes.length), this.maxDelayMs); + this.startTimes.push(now); + this.startTimes = this.startTimes.map((t) => t + delayMs); // account for delay time + if (delayMs === this.maxDelayMs) { + // we hit max delay so adding more won't affect anything + // shift off oldest time to stop this array from growing forever + this.startTimes.shift(); + } + + return delayMs; + } +} diff --git a/packages/runtime/container-runtime/src/summaryCollection.ts b/packages/runtime/container-runtime/src/summaryCollection.ts index 859668c7cea3..8f09068e563e 100644 --- a/packages/runtime/container-runtime/src/summaryCollection.ts +++ b/packages/runtime/container-runtime/src/summaryCollection.ts @@ -3,8 +3,8 @@ * Licensed under the MIT License. 
*/ -import { IDisposable, ITelemetryLogger } from "@fluidframework/common-definitions"; -import { Deferred, assert } from "@fluidframework/common-utils"; +import { IDisposable, IEvent, ITelemetryLogger } from "@fluidframework/common-definitions"; +import { Deferred, assert, TypedEventEmitter } from "@fluidframework/common-utils"; import { IDeltaManager } from "@fluidframework/container-definitions"; import { IDocumentMessage, @@ -192,17 +192,18 @@ class ClientSummaryWatcher implements IClientSummaryWatcher { } } -export type SummaryCollectionOpActions = - Partial void - >>; +export type OpActionEventName = MessageType.Summarize | MessageType.SummaryAck | MessageType.SummaryNack | "default"; +export type OpActionEventListener = (op: ISequencedDocumentMessage) => void; +export interface ISummaryCollectionOpEvents extends IEvent { + (event: OpActionEventName, listener: OpActionEventListener); +} + /** * Data structure that looks at the op stream to track summaries as they * are broadcast, acked and nacked. * It provides functionality for watching specific summaries. */ -export class SummaryCollection { +export class SummaryCollection extends TypedEventEmitter { // key: clientId private readonly summaryWatchers = new Map(); // key: summarySeqNum @@ -216,6 +217,10 @@ export class SummaryCollection { public get latestAck(): IAckedSummary | undefined { return this.lastAck; } + public emit(event: OpActionEventName, ...args: Parameters): boolean { + return super.emit(event, ...args); + } + public get opsSinceLastAck() { return this.deltaManager.lastSequenceNumber - (this.lastAck?.summaryAck.sequenceNumber ?? 
this.deltaManager.initialSequenceNumber); @@ -224,8 +229,8 @@ export class SummaryCollection { public constructor( private readonly deltaManager: IDeltaManager, private readonly logger: ITelemetryLogger, - private readonly opActions: SummaryCollectionOpActions, ) { + super(); this.deltaManager.on( "op", (op) => this.handleOp(op)); @@ -311,7 +316,7 @@ export class SummaryCollection { ) { this.pendingAckTimerTimeoutCallback?.(); } - this.opActions.default?.(op, this); + this.emit("default", op); return; } @@ -339,7 +344,7 @@ export class SummaryCollection { } this.pendingSummaries.set(op.sequenceNumber, summary); this.lastSummaryTimestamp = op.timestamp; - this.opActions.summarize?.(op, this); + this.emit(MessageType.Summarize, op); } private handleSummaryAck(op: ISummaryAckMessage) { @@ -377,7 +382,7 @@ export class SummaryCollection { }; this.refreshWaitNextAck.resolve(); this.refreshWaitNextAck = new Deferred(); - this.opActions.summaryAck?.(op, this); + this.emit(MessageType.SummaryAck, op); } } @@ -387,7 +392,7 @@ export class SummaryCollection { if (summary) { summary.ackNack(op); this.pendingSummaries.delete(seq); - this.opActions.summaryNack?.(op, this); + this.emit(MessageType.SummaryNack, op); } } } diff --git a/packages/runtime/container-runtime/src/summaryManager.ts b/packages/runtime/container-runtime/src/summaryManager.ts index c4faa972f8ac..46e952de693b 100644 --- a/packages/runtime/container-runtime/src/summaryManager.ts +++ b/packages/runtime/container-runtime/src/summaryManager.ts @@ -6,16 +6,11 @@ import { EventEmitter } from "events"; import { IDisposable, - IEvent, ITelemetryLogger, } from "@fluidframework/common-definitions"; import { - Heap, - IComparer, - IHeapNode, IPromiseTimerResult, PromiseTimer, - TypedEventEmitter, } from "@fluidframework/common-utils"; import { ChildLogger, PerformanceEvent } from "@fluidframework/telemetry-utils"; import { IFluidObject, IRequest } from "@fluidframework/core-interfaces"; @@ -23,81 +18,19 @@ import { 
IContainerContext, LoaderHeader, } from "@fluidframework/container-definitions"; -import { IQuorum, ISequencedClient } from "@fluidframework/protocol-definitions"; +import { ISequencedClient, MessageType } from "@fluidframework/protocol-definitions"; import { DriverHeader } from "@fluidframework/driver-definitions"; import { ISummarizer, createSummarizingWarning, ISummarizingWarning } from "./summarizer"; +import { SummaryCollection } from "./summaryCollection"; +import { ITrackedClient, OrderedClientElection, summarizerClientType, Throttler } from "./orderedClientElection"; -export const summarizerClientType = "summarizer"; - -interface ITrackedClient { - clientId: string; - sequenceNumber: number; - isSummarizer: boolean; -} - -class ClientComparer implements IComparer { - public readonly min: ITrackedClient = { - clientId: "", - sequenceNumber: -1, - isSummarizer: false, - }; - - public compare(a: ITrackedClient, b: ITrackedClient): number { - return a.sequenceNumber - b.sequenceNumber; - } -} - -interface IQuorumHeapEvents extends IEvent { - (event: "heapChange", listener: () => void); -} - -class QuorumHeap extends TypedEventEmitter { - private readonly heap = new Heap((new ClientComparer())); - private readonly heapMembers = new Map>(); - private summarizerCount = 0; - - constructor(quorum: IQuorum) { - super(); - const members = quorum.getMembers(); - for (const [clientId, client] of members) { - this.addClient(clientId, client); - } - - quorum.on("addMember", this.addClient); - quorum.on("removeMember", this.removeClient); - } - - private readonly addClient = (clientId: string, client: ISequencedClient) => { - // Have to undefined-check client.details for backwards compatibility - const isSummarizer = client.client.details?.type === summarizerClientType; - const heapNode = this.heap.add({ clientId, sequenceNumber: client.sequenceNumber, isSummarizer }); - this.heapMembers.set(clientId, heapNode); - if (isSummarizer) { - this.summarizerCount++; - } - 
this.emit("heapChange"); - }; - - private readonly removeClient = (clientId: string) => { - const member = this.heapMembers.get(clientId); - if (member) { - this.heap.remove(member); - this.heapMembers.delete(clientId); - if (member.value.isSummarizer) { - this.summarizerCount--; - } - this.emit("heapChange"); - } - }; - - public getFirstClientId(): string | undefined { - return this.heap.count() > 0 ? this.heap.peek().value.clientId : undefined; - } +const defaultInitialDelayMs = 5000; +const opsToBypassInitialDelay = 4000; - public getSummarizerCount(): number { - return this.summarizerCount; - } -} +const defaultThrottleDelayWindowMs = 60 * 1000; +const defaultThrottleMaxDelayMs = 30 * 1000; +// default throttling function increases exponentially (0ms, 20ms, 60ms, 140ms, etc) +const defaultThrottleDelayFunction = (n: number) => 20 * (Math.pow(2, n) - 1); enum SummaryManagerState { Off = 0, @@ -107,64 +40,20 @@ enum SummaryManagerState { Disabled = -1, } -const defaultInitialDelayMs = 5000; -const opsToBypassInitialDelay = 4000; - -// Please note that all reasons in this list are not errors, +// Please note that all reasons in this list are not errors, // and thus they are not raised today to parent container as error. // If this needs to be changed in future, we should re-evaluate what and how we raise to summarizer type StopReason = "parentNotConnected" | "parentShouldNotSummarize" | "disposed"; -type ShouldSummarizeState = { - shouldSummarize: true; - shouldStart: boolean; -} | { - shouldSummarize: false; - stopReason: StopReason; -}; - -const defaultThrottleDelayWindowMs = 60 * 1000; -const defaultThrottleMaxDelayMs = 30 * 1000; -// default throttling function increases exponentially (0ms, 20ms, 60ms, 140ms, etc) -const defaultThrottleDelayFunction = (n: number) => 20 * (Math.pow(2, n) - 1); - -/** - * Used to give increasing delay times for throttling a single functionality. 
- * Delay is based on previous attempts within specified time window, ignoring actual delay time. - */ -class Throttler { - private startTimes: number[] = []; - constructor( - private readonly delayWindowMs, - private readonly maxDelayMs, - private readonly delayFunction, - ) { } - - public get attempts() { - return this.startTimes.length; - } - - public getDelay() { - const now = Date.now(); - this.startTimes = this.startTimes.filter((t) => now - t < this.delayWindowMs); - const delayMs = Math.min(this.delayFunction(this.startTimes.length), this.maxDelayMs); - this.startTimes.push(now); - this.startTimes = this.startTimes.map((t) => t + delayMs); // account for delay time - if (delayMs === this.maxDelayMs) { - // we hit max delay so adding more won't affect anything - // shift off oldest time to stop this array from growing forever - this.startTimes.shift(); - } - - return delayMs; - } -} +type ShouldSummarizeState = + | { shouldSummarize: true; shouldStart: boolean; } + | { shouldSummarize: false; stopReason: StopReason; }; export class SummaryManager extends EventEmitter implements IDisposable { private readonly logger: ITelemetryLogger; - private readonly quorumHeap: QuorumHeap; + private readonly orderedClients: OrderedClientElection; private readonly initialDelayP: Promise; private readonly initialDelayTimer?: PromiseTimer; - private summarizerClientId?: string; + private electedClientId?: string; private clientId?: string; private latestClientId?: string; private connected = false; @@ -179,17 +68,24 @@ export class SummaryManager extends EventEmitter implements IDisposable { private opsUntilFirstConnect = -1; public get summarizer() { - return this.summarizerClientId; + return this.electedClientId; } public get disposed() { return this._disposed; } + /** Used to calculate number of ops since last summary ack for the current elected client */ + private lastSummaryAckSeqForClient = 0; + private hasSummarizersInQuorum: boolean; + private hasLoggedTelemetry = 
false; + constructor( private readonly context: IContainerContext, + private readonly summaryCollection: SummaryCollection, private readonly summariesEnabled: boolean, parentLogger: ITelemetryLogger, + private readonly maxOpsSinceLastSummary: number, initialDelayMs: number = defaultInitialDelayMs, ) { super(); @@ -204,14 +100,57 @@ export class SummaryManager extends EventEmitter implements IDisposable { this.setClientId(context.clientId); } - context.quorum.on("addMember", (clientId: string, details: ISequencedClient) => { + // Track ops until first (write) connect + const opsUntilFirstConnectHandler = (clientId: string, details: ISequencedClient) => { if (this.opsUntilFirstConnect === -1 && clientId === this.clientId) { + context.quorum.off("addMember", opsUntilFirstConnectHandler); this.opsUntilFirstConnect = details.sequenceNumber - this.context.deltaManager.initialSequenceNumber; } + }; + context.quorum.on("addMember", opsUntilFirstConnectHandler); + + this.summaryCollection.on("default", (op) => { + const opsSinceLastAckForClient = op.sequenceNumber - this.lastSummaryAckSeqForClient; + if ( + opsSinceLastAckForClient > this.maxOpsSinceLastSummary + && !this.hasLoggedTelemetry + && this.electedClientId !== undefined + ) { + // Limit telemetry to only next client? + this.logger.sendErrorEvent({ + eventName: "ElectedClientNotSummarizing", + thisClientId: this.clientId, + electedClientId: this.electedClientId, + sequenceNumber: op.sequenceNumber, + lastSummaryAckSeqForClient: this.lastSummaryAckSeqForClient, + }); + this.hasLoggedTelemetry = true; // log once; reset on SummaryAck/electedChange + // In future we will change the elected client. 
+ // this.orderedClients.incrementElectedClient(); + } + }); + this.summaryCollection.on(MessageType.SummaryAck, (op) => { + this.hasLoggedTelemetry = false; + this.lastSummaryAckSeqForClient = op.sequenceNumber; }); - this.quorumHeap = new QuorumHeap(context.quorum); - this.quorumHeap.on("heapChange", () => { this.refreshSummarizer(); }); + this.orderedClients = new OrderedClientElection(context.quorum); + this.orderedClients.on("summarizerChange", (summarizerCount) => { + const prev = this.hasSummarizersInQuorum; + this.hasSummarizersInQuorum = summarizerCount > 0; + if (prev !== this.hasSummarizersInQuorum) { + this.refreshSummarizer(); + } + }); + this.orderedClients.on("electedChange", (client: ITrackedClient | undefined) => { + this.hasLoggedTelemetry = false; + if (client !== undefined) { + // set to join seq + this.lastSummaryAckSeqForClient = client.sequenceNumber; + } + this.refreshSummarizer(); + }); + this.hasSummarizersInQuorum = this.orderedClients.getSummarizerCount() > 0; this.initialDelayTimer = new PromiseTimer(initialDelayMs, () => { }); this.initialDelayP = this.initialDelayTimer?.start() ?? Promise.resolve(); @@ -259,9 +198,11 @@ export class SummaryManager extends EventEmitter implements IDisposable { return { shouldSummarize: false, stopReason: "parentShouldNotSummarize" }; } else if (this.disposed) { return { shouldSummarize: false, stopReason: "disposed" }; - } else if (this.quorumHeap.getSummarizerCount() > 0) { + } else if (this.orderedClients.getSummarizerCount() > 0) { // Need to wait for any other existing summarizer clients to close, // because they can live longer than their parent container. + // TODO: We will need to remove this check when we allow elected summarizer + // to change, because they could get stuck in quorum. 
return { shouldSummarize: true, shouldStart: false }; } else { return { shouldSummarize: true, shouldStart: true }; @@ -270,9 +211,9 @@ export class SummaryManager extends EventEmitter implements IDisposable { private refreshSummarizer() { // Compute summarizer - const newSummarizerClientId = this.quorumHeap.getFirstClientId(); - if (newSummarizerClientId !== this.summarizerClientId) { - this.summarizerClientId = newSummarizerClientId; + const newSummarizerClientId = this.orderedClients.getElectedClient()?.clientId; + if (newSummarizerClientId !== this.electedClientId) { + this.electedClientId = newSummarizerClientId; this.emit("summarizer", newSummarizerClientId); } diff --git a/packages/runtime/container-runtime/src/test/orderedClientElection.spec.ts b/packages/runtime/container-runtime/src/test/orderedClientElection.spec.ts new file mode 100644 index 000000000000..4b5a224dfdb6 --- /dev/null +++ b/packages/runtime/container-runtime/src/test/orderedClientElection.spec.ts @@ -0,0 +1,388 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "assert"; +import { EventEmitter } from "events"; +import { IQuorum, ISequencedClient } from "@fluidframework/protocol-definitions"; +import { OrderedClientElection, summarizerClientType } from "../orderedClientElection"; + +describe("Ordered Client Election", () => { + const quorumMembers = new Map(); + const emitter = new EventEmitter(); + const mockQuorum: Pick = { + getMembers: () => quorumMembers, + on(event: string, handler: (...args: any[]) => void) { + emitter.on(event, handler); + return this as IQuorum; + }, + }; + let electedChangeEventCount = 0; + let summarizerChangeEventCount = 0; + + function addClient(clientId: string, sequenceNumber: number, isSummarizer = false) { + const details: ISequencedClient["client"]["details"] = { + capabilities: { interactive: !isSummarizer }, + type: isSummarizer ? 
summarizerClientType : "", + }; + const c: Partial = { details }; + const client: ISequencedClient = { client: c as ISequencedClient["client"], sequenceNumber }; + quorumMembers.set(clientId, client); + emitter.emit("addMember", clientId, client); + } + function removeClient(clientId: string) { + quorumMembers.delete(clientId); + emitter.emit("removeMember", clientId); + } + function createOrderedClients( + initialClients: [id: string, seq: number, sum: boolean][] = [], + ): OrderedClientElection { + for (const [id, seq, sum] of initialClients) { + addClient(id, seq, sum); + } + const orderedClients = new OrderedClientElection(mockQuorum); + orderedClients.on("electedChange", () => electedChangeEventCount++); + orderedClients.on("summarizerChange", () => summarizerChangeEventCount++); + return orderedClients; + } + function assertState( + orderedClients: OrderedClientElection, + eligibleCount: number, + summarizerCount: number, + electedClientId: string | undefined, + message = "", + ) { + const prefix = message ? `${message} - ` : ""; + // Assume no non-interactive, non-summarizer clients for these tests only. 
+ const totalCount = eligibleCount + summarizerCount; + assert.strictEqual( + orderedClients.getEligibleCount(), eligibleCount, `${prefix}Invalid eligible count`); + assert.strictEqual( + orderedClients.getSummarizerCount(), summarizerCount, `${prefix}Invalid summarizer count`); + assert.strictEqual( + orderedClients.getTotalCount(), totalCount, `${prefix}Invalid total count`); + assert.strictEqual( + orderedClients.getElectedClient()?.clientId, electedClientId, `${prefix}Invalid elected client id`); + } + function assertEvents(expectedElectedChangeCount: number, expectedSummarizerChangeCount: number) { + assert.strictEqual( + electedChangeEventCount, expectedElectedChangeCount, "Unexpected electedChange event count"); + assert.strictEqual( + summarizerChangeEventCount, expectedSummarizerChangeCount, "Unexpected summarizerChange event count"); + } + function assertOrderedClientIds(orderedClients: OrderedClientElection, ...expectedIds: string[]) { + const orderedIds = orderedClients.getOrderedEligibleClientIds(); + assert.strictEqual(orderedIds.length, expectedIds.length, "Unexpected number of ordered client ids"); + for (let i = 0; i < orderedIds.length; i++) { + assert.strictEqual(orderedIds[i], expectedIds[i], `Unexpected ordered client id at index ${i}`); + } + } + + afterEach(() => { + quorumMembers.clear(); + electedChangeEventCount = 0; + summarizerChangeEventCount = 0; + emitter.removeAllListeners(); + }); + + describe("Initialize", () => { + it("Should initialize with empty quorum", () => { + const oc = createOrderedClients(); + assertState(oc, 0, 0, undefined); + assertOrderedClientIds(oc); + }); + + it("Should initialize with correct client counts and current client", () => { + const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + assertState(oc, 3, 1, "a"); + assertOrderedClientIds(oc, "a", "b", "c"); + }); + }); + + describe("Add Client", () => { + it("Should add summarizer client without 
impacting nonSummarizer clients", () => { + const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + addClient("n", 100, true); + assertState(oc, 3, 2, "a"); + assertEvents(0, 1); + assertOrderedClientIds(oc, "a", "b", "c"); + }); + + it("Should add summarizer client to empty quorum without impacting nonSummarizer clients", () => { + const oc = createOrderedClients(); + addClient("n", 100, true); + assertState(oc, 0, 1, undefined); + assertEvents(0, 1); + assertOrderedClientIds(oc); + }); + + it("Should add nonSummarizer client to empty quorum", () => { + const oc = createOrderedClients(); + addClient("n", 100); + assertState(oc, 1, 0, "n"); + assertEvents(1, 0); + assertOrderedClientIds(oc, "n"); + }); + + it("Should add nonSummarizer client to end", () => { + const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + addClient("n", 100); + assertState(oc, 4, 1, "a"); + assertEvents(0, 0); + assertOrderedClientIds(oc, "a", "b", "c", "n"); + }); + + it("Should add nonSummarizer client to middle", () => { + // Questionable test, since this shouldn't really happen. + const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + addClient("n", 3); + assertState(oc, 4, 1, "a"); + assertEvents(0, 0); + assertOrderedClientIds(oc, "a", "b", "n", "c"); + }); + + it("Should add nonSummarizer client to front", () => { + // Questionable test, since this shouldn't really happen. 
+ const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + addClient("n", 0); + assertState(oc, 4, 1, "a"); + assertEvents(0, 0); + assertOrderedClientIds(oc, "n", "a", "b", "c"); + }); + }); + + describe("Remove Client", () => { + it("Should do nothing when removing a client from empty quorum", () => { + const oc = createOrderedClients(); + removeClient("x"); + assertState(oc, 0, 0, undefined); + assertEvents(0, 0); + assertOrderedClientIds(oc); + }); + + it("Should do nothing when removing a client that doesn't exist", () => { + const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + removeClient("x"); + assertState(oc, 3, 1, "a"); + assertEvents(0, 0); + assertOrderedClientIds(oc, "a", "b", "c"); + }); + + it("Should remove summarizer client", () => { + const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + removeClient("s"); + assertState(oc, 3, 0, "a"); + assertEvents(0, 1); + assertOrderedClientIds(oc, "a", "b", "c"); + }); + + it("Should remove nonSummarizer client from end", () => { + const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + removeClient("c"); + assertState(oc, 2, 1, "a"); + assertEvents(0, 0); + assertOrderedClientIds(oc, "a", "b"); + }); + + it("Should remove nonSummarizer client from middle", () => { + const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + removeClient("b"); + assertState(oc, 2, 1, "a"); + assertEvents(0, 0); + assertOrderedClientIds(oc, "a", "c"); + }); + + it("Should remove nonSummarizer client from front", () => { + const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + removeClient("a"); + assertState(oc, 2, 1, "b"); + assertEvents(1, 0); + 
assertOrderedClientIds(oc, "b", "c"); + }); + }); + + describe("Increment Current Client", () => { + it("Should do nothing in empty quorum", () => { + const oc = createOrderedClients(); + oc.incrementElectedClient(); + assertState(oc, 0, 0, undefined); + assertEvents(0, 0); + }); + + it("Should go to next client from first", () => { + const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + oc.incrementElectedClient(); + assertState(oc, 3, 1, "b"); + assertEvents(1, 0); + }); + + it("Should go to next client from middle", () => { + const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + oc.incrementElectedClient(); + oc.incrementElectedClient(); + assertState(oc, 3, 1, "c"); + assertEvents(2, 0); + }); + + it("Should go to undefined from last", () => { + const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + oc.incrementElectedClient(); + oc.incrementElectedClient(); + oc.incrementElectedClient(); + assertState(oc, 3, 1, undefined); + assertEvents(3, 0); + }); + + it("Should stay unchanged from end", () => { + const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + oc.incrementElectedClient(); + oc.incrementElectedClient(); + oc.incrementElectedClient(); + oc.incrementElectedClient(); + assertState(oc, 3, 1, undefined); + assertEvents(3, 0); + }); + + it("Should increment to new nodes", () => { + const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + oc.incrementElectedClient(); + oc.incrementElectedClient(); + oc.incrementElectedClient(); + oc.incrementElectedClient(); + addClient("d", 100); + addClient("e", 101); + assertState(oc, 5, 1, "d"); + oc.incrementElectedClient(); + assertState(oc, 5, 1, "e"); + addClient("f", 200); + oc.incrementElectedClient(); + 
assertState(oc, 6, 1, "f"); + }); + }); + + describe("Reset Current Client", () => { + it("Should do nothing in empty quorum", () => { + const oc = createOrderedClients(); + oc.resetElectedClient(); + assertState(oc, 0, 0, undefined); + assertEvents(0, 0); + }); + + it("Should do nothing when already first", () => { + const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + oc.resetElectedClient(); + assertState(oc, 3, 1, "a"); + assertEvents(0, 0); + }); + + it("Should reset to first when not first", () => { + const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + oc.incrementElectedClient(); + oc.incrementElectedClient(); + oc.resetElectedClient(); + assertState(oc, 3, 1, "a"); + assertEvents(3, 0); + }); + + it("Should reset to first when undefined at end", () => { + const oc = createOrderedClients([ + ["a", 1, false], + ["b", 2, false], + ["s", 5, true], + ["c", 9, false], + ]); + oc.incrementElectedClient(); + oc.incrementElectedClient(); + oc.incrementElectedClient(); + oc.resetElectedClient(); + assertState(oc, 3, 1, "a"); + assertEvents(4, 0); + }); + }); +}); diff --git a/packages/runtime/container-runtime/src/test/summarizer.spec.ts b/packages/runtime/container-runtime/src/test/summarizer.spec.ts index 60d4c556b3ec..d1fbe054132b 100644 --- a/packages/runtime/container-runtime/src/test/summarizer.spec.ts +++ b/packages/runtime/container-runtime/src/test/summarizer.spec.ts @@ -138,10 +138,7 @@ describe("Runtime", () => { lastRefSeq = 0; mockLogger = new MockLogger(); mockDeltaManager = new MockDeltaManager(); - summaryCollection = new SummaryCollection( - mockDeltaManager, - mockLogger, - {}); + summaryCollection = new SummaryCollection(mockDeltaManager, mockLogger); }); describe("Summary Schedule", () => { @@ -314,6 +311,7 @@ describe("Runtime", () => { "RunningSummarizer should still be starting since timestamp is within 
maxAckWaitTime"); // Emit next op after maxAckWaitTime + // clock.tick(summaryConfig.maxAckWaitTime + 1000); await emitNextOp(1, summaryTimestamp + summaryConfig.maxAckWaitTime); assert(mockLogger.matchEvents([ { eventName: "Running:MissingSummaryAckFoundByOps" }, diff --git a/packages/runtime/container-runtime/src/test/summaryCollection.spec.ts b/packages/runtime/container-runtime/src/test/summaryCollection.spec.ts index 854116ea58d3..32cb65cb5573 100644 --- a/packages/runtime/container-runtime/src/test/summaryCollection.spec.ts +++ b/packages/runtime/container-runtime/src/test/summaryCollection.spec.ts @@ -5,7 +5,8 @@ import { strict as assert } from "assert"; import { MockDeltaManager, MockLogger } from "@fluidframework/test-runtime-utils"; -import { MessageType } from "@fluidframework/protocol-definitions"; +import { IDocumentMessage, ISequencedDocumentMessage, MessageType } from "@fluidframework/protocol-definitions"; +import { IDeltaManager } from "@fluidframework/container-definitions"; import { ISummaryAckMessage, ISummaryNackMessage, ISummaryOpMessage, SummaryCollection } from "../summaryCollection"; const summaryOp: ISummaryOpMessage = { @@ -59,11 +60,7 @@ describe("Summary Collection", () => { describe("latestAck",()=>{ it("Ack with op",()=>{ const dm = new MockDeltaManager(); - const sc = new SummaryCollection( - dm, - new MockLogger(), - {}, - ); + const sc = new SummaryCollection(dm, new MockLogger()); assert.strictEqual(sc.latestAck, undefined, "last ack undefined"); dm.emit("op", summaryOp); dm.emit("op", summaryAck); @@ -80,11 +77,7 @@ describe("Summary Collection", () => { it("Ack without op",()=>{ const dm = new MockDeltaManager(); - const sc = new SummaryCollection( - dm, - new MockLogger(), - {}, - ); + const sc = new SummaryCollection(dm, new MockLogger()); assert.strictEqual(sc.latestAck, undefined, "last ack undefined"); dm.emit("op", summaryAck); assert.strictEqual(sc.latestAck, undefined, "last ack undefined"); @@ -92,11 +85,7 @@ 
describe("Summary Collection", () => { it("Nack with op",()=>{ const dm = new MockDeltaManager(); - const sc = new SummaryCollection( - dm, - new MockLogger(), - {}, - ); + const sc = new SummaryCollection(dm, new MockLogger()); assert.strictEqual(sc.latestAck, undefined, "last ack undefined"); dm.emit("op", summaryAck); dm.emit("op", summaryNack); @@ -108,11 +97,7 @@ describe("Summary Collection", () => { const dm = new MockDeltaManager(); dm.on("op", (op)=>{dm.lastSequenceNumber = op.sequenceNumber;}); - const sc = new SummaryCollection( - dm, - new MockLogger(), - {}, - ); + const sc = new SummaryCollection(dm, new MockLogger()); assert.strictEqual(sc.opsSinceLastAck, 0); dm.emit("op", summaryOp); assert.strictEqual(sc.opsSinceLastAck, summaryOp.sequenceNumber); @@ -123,11 +108,7 @@ describe("Summary Collection", () => { const dm = new MockDeltaManager(); dm.on("op", (op)=>{dm.lastSequenceNumber = op.sequenceNumber;}); - const sc = new SummaryCollection( - dm, - new MockLogger(), - {}, - ); + const sc = new SummaryCollection(dm, new MockLogger()); assert.strictEqual(sc.opsSinceLastAck, 0); dm.emit("op", summaryOp); assert.strictEqual(sc.opsSinceLastAck, summaryOp.sequenceNumber); @@ -138,11 +119,7 @@ describe("Summary Collection", () => { const dm = new MockDeltaManager(); dm.on("op", (op)=>{dm.lastSequenceNumber = op.sequenceNumber;}); - const sc = new SummaryCollection( - dm, - new MockLogger(), - {}, - ); + const sc = new SummaryCollection(dm, new MockLogger()); assert.strictEqual(sc.opsSinceLastAck, 0); dm.emit("op", summaryOp); dm.emit("op", summaryNack); @@ -152,124 +129,95 @@ describe("Summary Collection", () => { }); describe("opActions",()=>{ + interface ISummaryCollectionWithCounters { + summaryCollection: SummaryCollection; + callCounts: { + default: number; + summarize: number; + summaryAck: number; + summaryNack: number; + }; + } + function createSummaryCollection( + deltaManager: IDeltaManager, + ): ISummaryCollectionWithCounters { + const 
summaryCollection = new SummaryCollection(deltaManager, new MockLogger()); + const callCounts: ISummaryCollectionWithCounters["callCounts"] = { + default: 0, + summarize: 0, + summaryAck: 0, + summaryNack: 0, + }; + summaryCollection.on("default", () => callCounts.default++); + summaryCollection.on(MessageType.Summarize, () => callCounts.summarize++); + summaryCollection.on(MessageType.SummaryAck, () => callCounts.summaryAck++); + summaryCollection.on(MessageType.SummaryNack, () => callCounts.summaryNack++); + return { summaryCollection, callCounts }; + } it("Summary op",()=>{ const dm = new MockDeltaManager(); - let called = 0; - new SummaryCollection( - dm, - new MockLogger(), - { - summarize:()=>called++, - }, - ); + const { callCounts } = createSummaryCollection(dm); dm.emit("op", summaryOp); - assert.strictEqual(called, 1); + assert.strictEqual(callCounts.summarize, 1); }); it("Summary Ack without op",()=>{ const dm = new MockDeltaManager(); - let called = 0; - new SummaryCollection( - dm, - new MockLogger(), - { - summaryAck:()=>called++, - }, - ); + const { callCounts } = createSummaryCollection(dm); dm.emit("op", summaryAck); - assert.strictEqual(called, 0); + assert.strictEqual(callCounts.summaryAck, 0); }); it("Summary Ack with op",()=>{ const dm = new MockDeltaManager(); - let called = 0; - new SummaryCollection( - dm, - new MockLogger(), - { - summaryAck:()=>called++, - }, - ); + const { callCounts } = createSummaryCollection(dm); dm.emit("op", summaryOp); dm.emit("op", summaryAck); - assert.strictEqual(called, 1); + assert.strictEqual(callCounts.summarize, 1); + assert.strictEqual(callCounts.summaryAck, 1); }); it("Double Summary Ack with op",()=>{ const dm = new MockDeltaManager(); - let called = 0; - new SummaryCollection( - dm, - new MockLogger(), - { - summaryAck:()=>called++, - }, - ); + const { callCounts } = createSummaryCollection(dm); dm.emit("op", summaryOp); dm.emit("op", summaryAck); dm.emit("op", summaryAck); - assert.strictEqual(called, 
1); + assert.strictEqual(callCounts.summarize, 1); + assert.strictEqual(callCounts.summaryAck, 1); }); it("Summary Nack without op",()=>{ const dm = new MockDeltaManager(); - let called = 0; - new SummaryCollection( - dm, - new MockLogger(), - { - summaryNack:()=>called++, - }, - ); + const { callCounts } = createSummaryCollection(dm); dm.emit("op", summaryNack); - assert.strictEqual(called, 0); + assert.strictEqual(callCounts.summaryNack, 0); }); it("Summary Nack with op",()=>{ const dm = new MockDeltaManager(); - let called = 0; - new SummaryCollection( - dm, - new MockLogger(), - { - summaryNack:()=>called++, - }, - ); + const { callCounts } = createSummaryCollection(dm); dm.emit("op", summaryOp); dm.emit("op", summaryNack); - - assert.strictEqual(called, 1); + assert.strictEqual(callCounts.summarize, 1); + assert.strictEqual(callCounts.summaryNack, 1); }); it("Double Summary Nack with op",()=>{ const dm = new MockDeltaManager(); - let called = 0; - new SummaryCollection( - dm, - new MockLogger(), - { - summaryNack:()=>called++, - }, - ); + const { callCounts } = createSummaryCollection(dm); dm.emit("op", summaryOp); dm.emit("op", summaryNack); dm.emit("op", summaryNack); - - assert.strictEqual(called, 1); + assert.strictEqual(callCounts.summarize, 1); + assert.strictEqual(callCounts.summaryNack, 1); }); it("default",()=>{ const dm = new MockDeltaManager(); - let called = 0; - new SummaryCollection( - dm, - new MockLogger(), - { - default:()=>called++, - }, - ); + const { callCounts } = createSummaryCollection(dm); dm.emit("op", {}); - assert.strictEqual(called, 1); + assert.strictEqual(callCounts.default, 1); }); }); }); diff --git a/packages/test/test-end-to-end-tests/src/test/gcDataStoreRequests.spec.ts b/packages/test/test-end-to-end-tests/src/test/gcDataStoreRequests.spec.ts index 13109bfc94fd..3f72acb05c9e 100644 --- a/packages/test/test-end-to-end-tests/src/test/gcDataStoreRequests.spec.ts +++ 
b/packages/test/test-end-to-end-tests/src/test/gcDataStoreRequests.spec.ts @@ -106,7 +106,7 @@ describeNoCompat("GC Data Store Requests", (getTestObjectProvider) => { await provider.ensureSynchronized(); // Create and setup a summary collection that will be used to track and wait for summaries. - summaryCollection = new SummaryCollection(mainContainer.deltaManager, new TelemetryNullLogger(), {}); + summaryCollection = new SummaryCollection(mainContainer.deltaManager, new TelemetryNullLogger()); }); it("should fail requests with externalRequest flag for unreferenced data stores", async () => { diff --git a/packages/test/test-end-to-end-tests/src/test/gcReferenceUpdatesInSummarizer.spec.ts b/packages/test/test-end-to-end-tests/src/test/gcReferenceUpdatesInSummarizer.spec.ts index 9170db3d8042..459cfbc54f56 100644 --- a/packages/test/test-end-to-end-tests/src/test/gcReferenceUpdatesInSummarizer.spec.ts +++ b/packages/test/test-end-to-end-tests/src/test/gcReferenceUpdatesInSummarizer.spec.ts @@ -136,7 +136,7 @@ describeNoCompat("GC reference updates in summarizer", (getTestObjectProvider) = await provider.ensureSynchronized(); // Create and setup a summary collection that will be used to track and wait for summaries. - summaryCollection = new SummaryCollection(mainContainer.deltaManager, new TelemetryNullLogger(), {}); + summaryCollection = new SummaryCollection(mainContainer.deltaManager, new TelemetryNullLogger()); }); describe("SharedMatrix", () => {