Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elected summarizer logs #6106

Merged
merged 16 commits into from
May 27, 2021
2 changes: 1 addition & 1 deletion api-report/driver-utils.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ export function combineAppAndProtocolSummary(appSummary: ISummaryTree, protocolS
export function configurableUrlResolver(resolversList: IUrlResolver[], request: IRequest): Promise<IResolvedUrl | undefined>;

// @public (undocumented)
export function createGenericNetworkError(errorMessage: string, canRetry: boolean, retryAfterSeconds?: number, statusCode?: number): GenericNetworkError | ThrottlingError;
export function createGenericNetworkError(errorMessage: string, canRetry: boolean, retryAfterSeconds?: number, statusCode?: number): ThrottlingError | GenericNetworkError;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tylerbutler any idea why this keeps flip flopping? seen in it my changes as well

Copy link
Member

@tylerbutler tylerbutler May 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I've noticed it as well. No idea why yet. I'll open an issue. #6158


// @public (undocumented)
export const createWriteError: (errorMessage: string) => NonRetryableError<DriverErrorType>;
Expand Down
15 changes: 15 additions & 0 deletions lerna-package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 16 additions & 21 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ import {
metadataBlobName,
wrapSummaryInChannelsTree,
} from "./summaryFormat";
import { SummaryCollection, SummaryCollectionOpActions } from "./summaryCollection";
import { SummaryCollection } from "./summaryCollection";
import { getLocalStorageFeatureGate } from "./localStorageFeatureGates";

export enum ContainerMessageType {
Expand Down Expand Up @@ -912,32 +912,25 @@ export class ContainerRuntime extends TypedEventEmitter<IContainerRuntimeEvents>
this.emit("codeDetailsProposed", proposal.value, proposal);
}
});
const defaultAction = (op: ISequencedDocumentMessage,sc: SummaryCollection)=> {
if(sc.opsSinceLastAck > (this.runtimeOptions.summaryOptions.maxOpsSinceLastSummary ?? 3000)) {

this.summaryCollection = new SummaryCollection(this.deltaManager, this.logger);
const maxOpsSinceLastSummary = this.runtimeOptions.summaryOptions.maxOpsSinceLastSummary ?? 3000;
const defaultAction = () => {
if (this.summaryCollection.opsSinceLastAck > maxOpsSinceLastSummary) {
this.logger.sendErrorEvent({eventName: "SummaryStatus:Behind"});
// unregister default to no log on every op after falling behind
// and register summary ack handler to re-register this handler
// after successful summary
opActions.default = undefined;
opActions.summaryAck = summaryAckAction;
this.summaryCollection.once(MessageType.SummaryAck, () => {
this.logger.sendTelemetryEvent({eventName: "SummaryStatus:CaughtUp"});
// we've caught up, so re-register the default action to monitor for
// falling behind, and unregister ourself
this.summaryCollection.on("default", defaultAction);
});
this.summaryCollection.off("default", defaultAction);
}
};
const summaryAckAction = (op: ISequencedDocumentMessage,sc: SummaryCollection)=> {
this.logger.sendTelemetryEvent({eventName: "SummaryStatus:CaughtUp"});
// we've caught up, so re-register the default action to monitor for
// falling behind, and unregister ourself
opActions.default = defaultAction;
opActions.summaryAck = undefined;
};
const opActions: SummaryCollectionOpActions = {
default: defaultAction,
};

this.summaryCollection = new SummaryCollection(
this.deltaManager,
this.logger,
opActions,
);
this.summaryCollection.on("default", defaultAction);

// We always create the summarizer in the case that we are asked to generate summaries. But this may
// want to be on demand instead.
Expand All @@ -954,8 +947,10 @@ export class ContainerRuntime extends TypedEventEmitter<IContainerRuntimeEvents>
// Create the SummaryManager and mark the initial state
this.summaryManager = new SummaryManager(
context,
this.summaryCollection,
this.runtimeOptions.summaryOptions.generateSummaries !== false,
this.logger,
maxOpsSinceLastSummary,
this.runtimeOptions.summaryOptions.initialSummarizerDelayMs);

if (this.connected) {
Expand Down
31 changes: 18 additions & 13 deletions packages/runtime/container-runtime/src/summaryCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
* Licensed under the MIT License.
*/

import { IDisposable, ITelemetryLogger } from "@fluidframework/common-definitions";
import { Deferred, assert } from "@fluidframework/common-utils";
import { IDisposable, IEvent, ITelemetryLogger } from "@fluidframework/common-definitions";
import { Deferred, assert, TypedEventEmitter } from "@fluidframework/common-utils";
import { IDeltaManager } from "@fluidframework/container-definitions";
import {
IDocumentMessage,
Expand Down Expand Up @@ -192,17 +192,18 @@ class ClientSummaryWatcher implements IClientSummaryWatcher {
}
}

export type SummaryCollectionOpActions =
Partial<Record<
MessageType.Summarize | MessageType.SummaryAck | MessageType.SummaryNack | "default",
(op: ISequencedDocumentMessage, sc: SummaryCollection) => void
>>;
export type OpActionEventName = MessageType.Summarize | MessageType.SummaryAck | MessageType.SummaryNack | "default";
export type OpActionEventListener = (op: ISequencedDocumentMessage) => void;
export interface ISummaryCollectionOpEvents extends IEvent {
(event: OpActionEventName, listener: OpActionEventListener);
}

/**
* Data structure that looks at the op stream to track summaries as they
* are broadcast, acked and nacked.
* It provides functionality for watching specific summaries.
*/
export class SummaryCollection {
export class SummaryCollection extends TypedEventEmitter<ISummaryCollectionOpEvents> {
// key: clientId
private readonly summaryWatchers = new Map<string, ClientSummaryWatcher>();
// key: summarySeqNum
Expand All @@ -216,6 +217,10 @@ export class SummaryCollection {

public get latestAck(): IAckedSummary | undefined { return this.lastAck; }

public emit(event: OpActionEventName, ...args: Parameters<OpActionEventListener>): boolean {
return super.emit(event, ...args);
}

public get opsSinceLastAck() {
return this.deltaManager.lastSequenceNumber -
(this.lastAck?.summaryAck.sequenceNumber ?? this.deltaManager.initialSequenceNumber);
Expand All @@ -224,8 +229,8 @@ export class SummaryCollection {
public constructor(
private readonly deltaManager: IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>,
private readonly logger: ITelemetryLogger,
private readonly opActions: SummaryCollectionOpActions,
) {
super();
this.deltaManager.on(
"op",
(op) => this.handleOp(op));
Expand Down Expand Up @@ -311,7 +316,7 @@ export class SummaryCollection {
) {
this.pendingAckTimerTimeoutCallback?.();
}
this.opActions.default?.(op, this);
this.emit("default", op);

return;
}
Expand Down Expand Up @@ -339,7 +344,7 @@ export class SummaryCollection {
}
this.pendingSummaries.set(op.sequenceNumber, summary);
this.lastSummaryTimestamp = op.timestamp;
this.opActions.summarize?.(op, this);
this.emit(MessageType.Summarize, op);
}

private handleSummaryAck(op: ISummaryAckMessage) {
Expand Down Expand Up @@ -377,7 +382,7 @@ export class SummaryCollection {
};
this.refreshWaitNextAck.resolve();
this.refreshWaitNextAck = new Deferred<void>();
this.opActions.summaryAck?.(op, this);
this.emit(MessageType.SummaryAck, op);
}
}

Expand All @@ -387,7 +392,7 @@ export class SummaryCollection {
if (summary) {
summary.ackNack(op);
this.pendingSummaries.delete(seq);
this.opActions.summaryNack?.(op, this);
this.emit(MessageType.SummaryNack, op);
}
}
}
Loading