Skip to content

Commit

Permalink
[EPH] implement CheckpointManager methods (#4583)
Browse files Browse the repository at this point in the history
* [EPH] implement CheckpointManager methods
  • Loading branch information
ShivangiReja authored Aug 1, 2019
1 parent 97b684e commit f1c472c
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 89 deletions.
34 changes: 17 additions & 17 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export interface BatchOptions {
// @public
export interface Checkpoint {
consumerGroupName: string;
eTag: string;
eventHubName: string;
instanceId: string;
offset: number;
Expand All @@ -41,17 +42,16 @@ export interface Checkpoint {

// @public
export class CheckpointManager {
// (undocumented)
updateCheckpoint(eventData: EventData): Promise<void>;
// (undocumented)
updateCheckpoint(offset: string, sequenceNumber: number): Promise<void>;
constructor(partitionContext: PartitionContext, partitionManager: PartitionManager, instanceId: string);
updateCheckpoint(eventData: ReceivedEventData): Promise<void>;
updateCheckpoint(sequenceNumber: number, offset: number): Promise<void>;
}

// @public
export enum CloseReason {
EventHubException = "EventHubException",
OwnershipLost = "OwnershipLost",
Shutdown = "Shutdown",
Unknown = "Unknown"
Shutdown = "Shutdown"
}

export { DataTransformer }
Expand Down Expand Up @@ -164,11 +164,11 @@ export class EventPosition {
static earliest(): EventPosition;
enqueuedTime?: Date | number;
static fromEnqueuedTime(enqueuedTime: Date | number): EventPosition;
static fromOffset(offset: string, isInclusive?: boolean): EventPosition;
static fromOffset(offset: number, isInclusive?: boolean): EventPosition;
static fromSequenceNumber(sequenceNumber: number, isInclusive?: boolean): EventPosition;
isInclusive: boolean;
static latest(): EventPosition;
offset?: string;
offset?: number | "@latest";
sequenceNumber?: number;
}

Expand All @@ -191,9 +191,9 @@ export interface EventProcessorOptions {

// @public
export class InMemoryPartitionManager implements PartitionManager {
claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]>;
listOwnership(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<string>;
}

export { MessagingError }
Expand All @@ -213,9 +213,9 @@ export interface PartitionContext {

// @public
export interface PartitionManager {
claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]>;
listOwnership(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<string>;
}

// @public
Expand All @@ -236,7 +236,7 @@ export interface PartitionProcessor {
close?(reason: CloseReason): Promise<void>;
initialize?(): Promise<void>;
processError(error: Error): Promise<void>;
processEvents(events: EventData[]): Promise<void>;
processEvents(events: ReceivedEventData[]): Promise<void>;
}

// @public
Expand All @@ -249,7 +249,7 @@ export interface PartitionProcessorFactory {
export interface PartitionProperties {
beginningSequenceNumber: number;
eventHubPath: string;
lastEnqueuedOffset: string;
lastEnqueuedOffset: number;
lastEnqueuedSequenceNumber: number;
lastEnqueuedTimeUtc: Date;
partitionId: string;
Expand All @@ -259,7 +259,7 @@ export interface PartitionProperties {
export interface ReceivedEventData {
body: any;
enqueuedTimeUtc: Date;
offset: string;
offset: number;
partitionKey: string | null;
properties?: {
[key: string]: any;
Expand Down
79 changes: 57 additions & 22 deletions sdk/eventhub/event-hubs/src/checkpointManager.ts
Original file line number Diff line number Diff line change
@@ -1,58 +1,93 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

// import { PartitionContext } from "./partitionContext";
import { EventData } from "./eventData";
// import { PartitionManager } from "./eventProcessor";
import { PartitionContext } from "./partitionContext";
import { ReceivedEventData } from "./eventData";
import { PartitionManager } from "./eventProcessor";

/**
* Used by createCheckpoint in PartitionManager
**/
export interface Checkpoint {
/**
/**
* @property The event hub name
*/
eventHubName: string;
/**
/**
* @property The consumer group name
*/
consumerGroupName: string;
/**
/**
* @property The unique instance identifier
*/
instanceId: string;
/**
/**
* @property The identifier of the Event Hub partition
*/
partitionId: string;
/**
/**
* @property The sequence number of the event.
*/
sequenceNumber: number;
/**
/**
* @property The offset of the event.
*/
offset: number;
/**
* @property The unique identifier for the operation.
*/
eTag: string;
}

/**
* CheckPointManager is created by the library & passed to user's code to let them create a checkpoint
*/
export class CheckpointManager {
// private _partitionContext: PartitionContext; // for internal use by createCheckpoint
// private _partitionManager: PartitionManager; // for internal use by createCheckpoint

// constructor(partitionContext: PartitionContext, partitionManager: PartitionManager) {
// this._partitionContext = partitionContext;
// this._partitionManager = partitionManager;
// }

public async updateCheckpoint(eventData: EventData): Promise<void>;
private _partitionContext: PartitionContext;
private _partitionManager: PartitionManager;
private _instanceId: string;
private _eTag: string;

public async updateCheckpoint(offset: string, sequenceNumber: number): Promise<void>;
constructor(partitionContext: PartitionContext, partitionManager: PartitionManager, instanceId: string) {
this._partitionContext = partitionContext;
this._partitionManager = partitionManager;
this._instanceId = instanceId;
this._eTag = "";
}
/**
* Updates a checkpoint using the event data.
*
* @param eventData The event data to use for updating the checkpoint.
* @return Promise<void>
*/
public async updateCheckpoint(eventData: ReceivedEventData): Promise<void>;
/**
* Updates a checkpoint using the given offset and sequence number.
*
* @param sequenceNumber The sequence number to update the checkpoint.
* @param offset The offset to update the checkpoint.
* @return Promise<void>.
*/
public async updateCheckpoint(sequenceNumber: number, offset: number): Promise<void>;

public async updateCheckpoint(
eventDataOrOffset: EventData | string,
sequenceNumber?: number
): Promise<void> {}
eventDataOrSequenceNumber: ReceivedEventData | number,
offset?: number
): Promise<void> {
const checkpoint: Checkpoint = {
eventHubName: this._partitionContext.eventHubName,
consumerGroupName: this._partitionContext.consumerGroupName,
instanceId: this._instanceId,
partitionId: this._partitionContext.partitionId,
sequenceNumber:
typeof eventDataOrSequenceNumber === "number"
? eventDataOrSequenceNumber
: eventDataOrSequenceNumber.sequenceNumber,
offset:
typeof eventDataOrSequenceNumber === "number" ? offset! : eventDataOrSequenceNumber.offset,
eTag: this._eTag
};

this._eTag = await this._partitionManager.updateCheckpoint(checkpoint);
}
}
4 changes: 2 additions & 2 deletions sdk/eventhub/event-hubs/src/eventData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export interface EventDataInternal {
/**
* @property [offset] The offset of the event.
*/
offset?: string;
offset?: number;
/**
* @property [sequenceNumber] The sequence number of the event.
*/
Expand Down Expand Up @@ -216,7 +216,7 @@ export interface ReceivedEventData {
/**
* @property The offset of the event.
*/
offset: string;
offset: number;
/**
* @property The sequence number of the event.
*/
Expand Down
16 changes: 8 additions & 8 deletions sdk/eventhub/event-hubs/src/eventPosition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export interface EventPositionOptions {
* @property The offset of the event at the position. It can be undefined
* if the position is just created from a sequence number or an enqueued time.
*/
offset?: string;
offset?: number | "@latest";
/**
* @property Indicates if the current event at the specified offset is
* included or not. It is only applicable if offset is set. Default value: false.
Expand All @@ -40,20 +40,20 @@ export interface EventPositionOptions {
*/
export class EventPosition {
/**
* @property The token that represents the beginning event in the stream of a partition: `"-1"`.
* @property The token that represents the beginning event in the stream of a partition: `-1`.
* @static
* @readonly
* @ignore
*/
private static readonly startOfStream: string = "-1";
private static readonly startOfStream: number = -1;

/**
* @property The token that represents the last event in the stream of a partition: `"@latest"`.
* @static
* @readonly
* @ignore
*/
private static readonly endOfStream: string = "@latest";
private static readonly endOfStream = "@latest";
/**
* @property The offset of the event identified by this position.
* Expected to be undefined if the position is just created from a sequence number or an enqueued time.
Expand All @@ -63,7 +63,7 @@ export class EventPosition {
* The same offset may refer to a different event as events reach the age limit for
* retention and are no longer visible within the partition.
*/
offset?: string;
offset?: number | "@latest";
/**
* @property Indicates if the specified offset is inclusive of the event which it identifies.
* This information is only relevent if the event position was identified by an offset or sequence number.
Expand Down Expand Up @@ -106,11 +106,11 @@ export class EventPosition {
* Default: `false`.
* @returns EventPosition
*/
static fromOffset(offset: string, isInclusive?: boolean): EventPosition {
static fromOffset(offset: number, isInclusive?: boolean): EventPosition {
if (offset == undefined) {
throw new Error('Missing parameter "offset"');
}
return new EventPosition({ offset: String(offset), isInclusive: isInclusive });
return new EventPosition({ offset: offset, isInclusive: isInclusive });
}

/**
Expand Down Expand Up @@ -165,7 +165,7 @@ export class EventPosition {
*/

static latest(): EventPosition {
return EventPosition.fromOffset(EventPosition.endOfStream);
return new EventPosition({ offset: EventPosition.endOfStream });
}
}

Expand Down
31 changes: 23 additions & 8 deletions sdk/eventhub/event-hubs/src/eventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { EventHubClient } from "./eventHubClient";
import { EventPosition } from "./eventPosition";
import { PartitionContext } from "./partitionContext";
import { CheckpointManager, Checkpoint } from "./checkpointManager";
import { EventData } from "./eventData";
import { ReceivedEventData } from "./eventData";
import { PumpManager } from "./pumpManager";
import { AbortSignalLike, AbortController } from "@azure/abort-controller";
import * as log from "./log";
Expand Down Expand Up @@ -44,7 +44,7 @@ export interface PartitionProcessor {
/**
* Called when a batch of events have been received.
*/
processEvents(events: EventData[]): Promise<void>;
processEvents(events: ReceivedEventData[]): Promise<void>;
/**
* Called when the underlying client experiences an error while receiving.
*/
Expand All @@ -53,7 +53,7 @@ export interface PartitionProcessor {

/**
* used by PartitionManager to claim ownership.
* returned by listOwnerships
* returned by listOwnership
*/
export interface PartitionOwnership {
/**
Expand Down Expand Up @@ -116,22 +116,22 @@ export interface PartitionManager {
* @param consumerGroupName The consumer group name.
* @return A list of partition ownership details of all the partitions that have/had an owner.
*/
listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
listOwnership(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
/**
* Called to claim ownership of a list of partitions. This will return the list of partitions that were owned
* successfully.
*
* @param partitionOwnerships The list of partition ownerships this instance is claiming to own.
* @param partitionOwnership The list of partition ownership this instance is claiming to own.
* @return A list of partitions this instance successfully claimed ownership.
*/
claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]>;
/**
* Updates the checkpoint in the data store for a partition.
*
* @param checkpoint The checkpoint.
* @return The new eTag on successful update.
*/
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
updateCheckpoint(checkpoint: Checkpoint): Promise<string>;
}

// Options passed when creating EventProcessor, everything is optional
Expand All @@ -155,6 +155,7 @@ export class EventProcessor {
private _isRunning: boolean = false;
private _loopTask?: PromiseLike<void>;
private _abortController?: AbortController;
private _partitionManager: PartitionManager;

constructor(
consumerGroupName: string,
Expand All @@ -168,6 +169,7 @@ export class EventProcessor {
this._consumerGroupName = consumerGroupName;
this._eventHubClient = eventHubClient;
this._partitionProcessorFactory = partitionProcessorFactory;
this._partitionManager = partitionManager;
this._processorOptions = options;
this._pumpManager = new PumpManager(this._id, options);
}
Expand Down Expand Up @@ -216,7 +218,20 @@ export class EventProcessor {
partitionId: partitionId
};

const checkpointManager = new CheckpointManager();
const partitionOwnership: PartitionOwnership = {
eventHubName: this._eventHubClient.eventHubName,
consumerGroupName: this._consumerGroupName,
instanceId: this._id,
partitionId: partitionId,
ownerLevel: 0
};
await this._partitionManager.claimOwnership([partitionOwnership]);

const checkpointManager = new CheckpointManager(
partitionContext,
this._partitionManager,
this._id
);

log.eventProcessor(
`[${this._id}] [${partitionId}] Calling user-provided PartitionProcessorFactory.`
Expand Down
Loading

0 comments on commit f1c472c

Please sign in to comment.