Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Implement PartitionManager methods #4538

Merged
merged 6 commits into from
Jul 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 22 additions & 21 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ export interface BatchOptions {
partitionKey?: string;
}

// @public
export interface Checkpoint {
consumerGroupName: string;
eventHubName: string;
instanceId: string;
offset: number;
partitionId: string;
sequenceNumber: number;
}

// @public
export class CheckpointManager {
// (undocumented)
Expand All @@ -40,7 +50,8 @@ export class CheckpointManager {
// @public
export enum CloseReason {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this something all languages are exposing? I don't remember seeing it in our API design, that close() takes a reason. 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proposed API for javascript, here it takes CloseReason.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@conniey We brought that up on Monday too, because we were curious how Java would know why the subscription was closed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I also wonder what the user could do differently with that reason (ie. user shutdown vs stolen)? I mean, we log it in the EventConsumer but it is not propagated out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory if it was a shutdown they could checkpoint one last time, whereas if it were stolen they wouldn’t be able to update the checkpoint.

OwnershipLost = "OwnershipLost",
Shutdown = "Shutdown"
Shutdown = "Shutdown",
Unknown = "Unknown"
}

export { DataTransformer }
Expand Down Expand Up @@ -178,6 +189,13 @@ export interface EventProcessorOptions {
maxWaitTimeInSeconds?: number;
}

// @public
export class InMemoryPartitionManager implements PartitionManager {
claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
}

export { MessagingError }

// @public
Expand All @@ -188,45 +206,28 @@ export type OnMessage = (eventData: ReceivedEventData) => void;

// @public
export interface PartitionContext {
// (undocumented)
readonly consumerGroupName: string;
// (undocumented)
readonly eventHubName: string;
// (undocumented)
readonly partitionId: string;
}

// @public
export interface PartitionManager {
// (undocumented)
claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
// Warning: (ae-forgotten-export) The symbol "Checkpoint" needs to be exported by the entry point index.d.ts
//
// (undocumented)
createCheckpoint(checkpoint: Checkpoint): Promise<void>;
// (undocumented)
listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
}

// @public
export interface PartitionOwnership {
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
// (undocumented)
consumerGroupName: string;
// (undocumented)
ETag?: string;
// (undocumented)
eTag?: string;
eventHubName: string;
// (undocumented)
instanceId: string;
// (undocumented)
lastModifiedTime?: number;
// (undocumented)
lastModifiedTimeInMS?: number;
offset?: number;
// (undocumented)
ownerLevel: number;
// (undocumented)
partitionId: string;
// (undocumented)
sequenceNumber?: number;
}

Expand Down
18 changes: 18 additions & 0 deletions sdk/eventhub/event-hubs/src/checkpointManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,29 @@ import { EventData } from "./eventData";
* Used by createCheckpoint in PartitionManager
**/
export interface Checkpoint {
/**
* @property The event hub name
*/
eventHubName: string;
/**
* @property The consumer group name
*/
consumerGroupName: string;
/**
* @property The unique instance identifier
*/
instanceId: string;
/**
* @property The identifier of the Event Hub partition
*/
partitionId: string;
/**
* @property The sequence number of the event.
*/
sequenceNumber: number;
/**
* @property The offset of the event.
*/
offset: number;
}

Expand Down
55 changes: 52 additions & 3 deletions sdk/eventhub/event-hubs/src/eventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,43 @@ export interface PartitionProcessor {
* returned by listOwnerships
*/
export interface PartitionOwnership {
/**
* @property The event hub name
*/
eventHubName: string;
/**
* @property The consumer group name
*/
consumerGroupName: string;
/**
* @property The unique instance identifier
*/
instanceId: string;
/**
* @property The identifier of the Event Hub partition
*/
partitionId: string;
/**
* @property
* The owner level
*/
ownerLevel: number;
/**
* @property The offset of the event.
*/
offset?: number;
/**
* @property The sequence number of the event.
*/
sequenceNumber?: number;
lastModifiedTime?: number;
ETag?: string;
/**
* @property The last modified time.
*/
lastModifiedTimeInMS?: number;
/**
* @property The unique identifier for the operation.
*/
eTag?: string;
}

/**
Expand All @@ -80,9 +108,30 @@ export interface PartitionProcessorFactory {
* Deals mainly with read/write to the chosen storage service
*/
export interface PartitionManager {
/**
* Called to get the list of all existing partition ownership from the underlying data store. Could return empty
* results if there are is no existing ownership information.
*
* @param eventHubName The event hub name.
* @param consumerGroupName The consumer group name.
* @return A list of partition ownership details of all the partitions that have/had an owner.
*/
listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
/**
* Called to claim ownership of a list of partitions. This will return the list of partitions that were owned
* successfully.
*
* @param partitionOwnerships The list of partition ownerships this instance is claiming to own.
* @return A list of partitions this instance successfully claimed ownership.
*/
claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
createCheckpoint(checkpoint: Checkpoint): Promise<void>;
/**
* Updates the checkpoint in the data store for a partition.
*
* @param checkpoint The checkpoint.
* @return The new eTag on successful update.
*/
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
}

// Options passed when creating EventProcessor, everything is optional
Expand Down
61 changes: 61 additions & 0 deletions sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import { PartitionManager, PartitionOwnership } from "./eventProcessor";
import { Checkpoint } from "./checkpointManager";
import { generate_uuid } from "rhea-promise";

/**
* A simple in-memory implementation of a `PartitionManager`
* @class
*/
export class InMemoryPartitionManager implements PartitionManager {
private _partitionOwnershipMap: Map<string, PartitionOwnership> = new Map();

/**
* Get the list of all existing partition ownership from the underlying data store. Could return empty
* results if there are is no existing ownership information.
*
* @param eventHubName The event hub name.
* @param consumerGroupName The consumer group name.
* @return Partition ownership details of all the partitions that have/had an owner..
*/
async listOwnerships(
eventHubName: string,
consumerGroupName: string
): Promise<PartitionOwnership[]> {
return Array.from(this._partitionOwnershipMap.values());
}

/**
* Claim ownership of a list of partitions. This will return the list of partitions that were owned
* successfully.
*
* @param partitionOwnerships The list of partition ownerships this instance is claiming to own.
* @return A list partitions this instance successfully claimed ownership.
*/
async claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]> {
for (const partitionOwnership of partitionOwnerships) {
if (!this._partitionOwnershipMap.has(partitionOwnership.partitionId)) {
partitionOwnership.eTag = generate_uuid();
this._partitionOwnershipMap.set(partitionOwnership.partitionId, partitionOwnership);
}
}
return partitionOwnerships;
}

/**
* Updates the checkpoint in the data store for a partition.
*
* @param checkpoint The checkpoint.
* @return Promise<void>
*/
async updateCheckpoint(checkpoint: Checkpoint): Promise<void> {
const partitionOwnership = this._partitionOwnershipMap.get(checkpoint.partitionId);
if (partitionOwnership) {
partitionOwnership.sequenceNumber = checkpoint.sequenceNumber;
partitionOwnership.offset = checkpoint.offset;
partitionOwnership.eTag = generate_uuid();
}
}
}
2 changes: 2 additions & 0 deletions sdk/eventhub/event-hubs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ export {
PartitionOwnership
} from "./eventProcessor";
export { PartitionContext } from "./partitionContext";
export { InMemoryPartitionManager} from "./inMemoryPartitionManager"
export { Checkpoint } from "./checkpointManager";
export {
MessagingError,
DataTransformer,
Expand Down
12 changes: 12 additions & 0 deletions sdk/eventhub/event-hubs/src/partitionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,19 @@
* about the partition, the EventProcessor will be processing events from.
*/
export interface PartitionContext {
/**
* @property The identifier of the Event Hub partition
* @readonly
*/
readonly partitionId: string;
/**
* @property The event hub name
* @readonly
*/
readonly eventHubName: string;
/**
* @property The consumer group name
* @readonly
*/
readonly consumerGroupName: string;
}
53 changes: 53 additions & 0 deletions sdk/eventhub/event-hubs/test/eventProcessor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ import {
EventProcessor,
PartitionContext,
delay,
InMemoryPartitionManager,
PartitionOwnership,
Checkpoint,
PartitionProcessorFactory,
CloseReason
} from "../src";
import { EnvVarKeys, getEnvVars } from "./utils/testUtils";
import { generate_uuid } from "rhea-promise";
const env = getEnvVars();

describe("Event Processor", function(): void {
Expand Down Expand Up @@ -431,4 +435,53 @@ describe("Event Processor", function(): void {
isCloseCalled.should.equal(true);
});
});

describe("InMemory Partition Manager", function(): void {
it("should claim ownership, get a list of ownership and update checkpoint", async function(): Promise<void> {
const inMemoryPartitionManager = new InMemoryPartitionManager();
const partitionOwnership1: PartitionOwnership = {
eventHubName: "myEventHub",
consumerGroupName: EventHubClient.defaultConsumerGroupName,
instanceId: generate_uuid(),
partitionId: "0",
ownerLevel: 10
};
const partitionOwnership2: PartitionOwnership = {
eventHubName: "myEventHub",
consumerGroupName: EventHubClient.defaultConsumerGroupName,
instanceId: generate_uuid(),
partitionId: "1",
ownerLevel: 10
};
const partitionOwnership = await inMemoryPartitionManager.claimOwnerships([
partitionOwnership1,
partitionOwnership2
]);
partitionOwnership.length.should.equals(2);

const ownershipslist = await inMemoryPartitionManager.listOwnerships(
"myEventHub",
EventHubClient.defaultConsumerGroupName
);
ownershipslist.length.should.equals(2);

const checkpoint: Checkpoint = {
eventHubName: "myEventHub",
consumerGroupName: EventHubClient.defaultConsumerGroupName,
instanceId: generate_uuid(),
partitionId: "0",
sequenceNumber: 10,
offset: 50
};

await inMemoryPartitionManager.updateCheckpoint(checkpoint);
const partitionOwnershipList = await inMemoryPartitionManager.listOwnerships(
"myEventHub",
EventHubClient.defaultConsumerGroupName
);
partitionOwnershipList[0].partitionId.should.equals(checkpoint.partitionId);
partitionOwnershipList[0].sequenceNumber!.should.equals(checkpoint.sequenceNumber);
partitionOwnershipList[0].offset!.should.equals(checkpoint.offset);
});
});
}).timeout(90000);