Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Buffered Event Hubs Producer] Implements Parition Key to Partition ID mapping #18331

Merged
merged 7 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions sdk/eventhub/event-hubs/src/impl/partitionAssigner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

import { isDefined } from "../util/typeGuards";
import { mapPartitionKeyToId } from "./patitionKeyToIdMapper";

/**
* @internal
Expand Down Expand Up @@ -52,17 +53,12 @@ export class PartitionAssigner {
}

if (isDefined(partitionKey)) {
return this._assignPartitionForPartitionKey(partitionKey);
return mapPartitionKeyToId(partitionKey, this._partitions.length).toString();
}

return this._assignRoundRobinPartition();
}

private _assignPartitionForPartitionKey(partitionKey: string): string {
// TODO: Implement hashing function
return partitionKey ? this._partitions[0] : this._partitions[0];
}

private _assignRoundRobinPartition(): string {
const maxPartitionIndex = this._partitions.length - 1;
const proposedPartitionIndex = this._lastRoundRobinPartitionIndex + 1;
Expand Down
130 changes: 130 additions & 0 deletions sdk/eventhub/event-hubs/src/impl/patitionKeyToIdMapper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

/* eslint-disable no-fallthrough */

import os from "os";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

os might not work in browser?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah many things here are node specific such as os and Buffer but the tests passed so I assumed there is polyfilling going on. I will revisit the browser support.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rollup shims perhaps


export function mapPartitionKeyToId(partitionKey: string, partitionCount: number): number {
const hash = computeHash(Buffer.from(partitionKey, "utf8"));
const hashedParitionKey = castToInt16(hash.c ^ hash.b);
return Math.abs(hashedParitionKey % partitionCount);
}

function readUInt32(data: Buffer, offset: number): number {
return os.endianness() === "BE" ? data.readUInt32BE(offset) : data.readUInt32LE(offset);
}

function castToInt16(n: number): number {
return new Int16Array([n])[0];
}

function computeHash(data: Buffer, seed1: number = 0, seed2: number = 0): any {
let a: number, b: number, c: number;

a = b = c = 0xdeadbeef + data.length + seed1;
c += seed2;

let index = 0,
size = data.length;
while (size > 12) {
a += readUInt32(data, index);
b += readUInt32(data, index + 4);
c += readUInt32(data, index + 8);

a -= c;
a ^= (c << 4) | (c >>> 28);
c += b;

b -= a;
b ^= (a << 6) | (a >>> 26);
a += c;

c -= b;
c ^= (b << 8) | (b >>> 24);
b += a;

a -= c;
a ^= (c << 16) | (c >>> 16);
c += b;

b -= a;
b ^= (a << 19) | (a >>> 13);
a += c;

c -= b;
c ^= (b << 4) | (b >>> 28);
b += a;

index += 12;
size -= 12;
}

let curr = size;
switch (curr) {
case 12:
a += readUInt32(data, index);
b += readUInt32(data, index + 4);
c += readUInt32(data, index + 8);
break;
case 11:
c += data[index + 10] << 16;
curr = 10;
case 10:
c += data[index + 9] << 8;
curr = 9;
case 9:
c += data[index + 8];
curr = 8;
case 8:
b += readUInt32(data, index + 4);
a += readUInt32(data, index);
break;
case 7:
b += data[index + 6] << 16;
curr = 6;
case 6:
b += data[index + 5] << 8;
curr = 5;
case 5:
b += data[index + 4];
curr = 4;
case 4:
a += readUInt32(data, index);
break;
case 3:
a += data[index + 2] << 16;
curr = 2;
case 2:
a += data[index + 1] << 8;
curr = 1;
case 1:
a += data[index];
break;
case 0:
return { b: b >>> 0, c: c >>> 0 };
}

c ^= b;
c -= (b << 14) | (b >>> 18);

a ^= c;
a -= (c << 11) | (c >>> 21);

b ^= a;
b -= (a << 25) | (a >>> 7);

c ^= b;
c -= (b << 16) | (b >>> 16);

a ^= c;
a -= (c << 4) | (c >>> 28);

b ^= a;
b -= (a << 14) | (a >>> 18);

c ^= b;
c -= (b << 24) | (b >>> 8);

return { b: b >>> 0, c: c >>> 0 };
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { assert } from "chai";
import { mapPartitionKeyToId } from "../../../src/impl/patitionKeyToIdMapper";

/**
* These unit tests have been created from outputs received from the C# implementation
* of Jenkins lookup3 that the Event Hubs service uses.
*/
describe("mapPartitionKeyToId", () => {
it("short key, small partitions count", () => {
assert.equal(mapPartitionKeyToId("alphabet", 3), 0);
});

it("short key, large partitions count", () => {
assert.equal(mapPartitionKeyToId("alphabet", 11), 4);
});

it("long key, small partitions count", () => {
assert.equal(mapPartitionKeyToId("TheBestParitionEver", 4), 2);
});

it("long key, large partitions count", () => {
assert.equal(mapPartitionKeyToId("TheWorstParitionEver", 15), 1);
});
});
1 change: 1 addition & 0 deletions sdk/eventhub/event-hubs/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"declarationDir": "./types",
"outDir": "./dist-esm",
"downlevelIteration": true,
"noFallthroughCasesInSwitch": false,
"paths": {
"@azure/event-hubs": ["./src/index"]
}
Expand Down