diff --git a/sdk/eventhub/event-hubs/src/impl/partitionAssigner.ts b/sdk/eventhub/event-hubs/src/impl/partitionAssigner.ts index f2f0297c1ef4..7e4d1100eeae 100644 --- a/sdk/eventhub/event-hubs/src/impl/partitionAssigner.ts +++ b/sdk/eventhub/event-hubs/src/impl/partitionAssigner.ts @@ -2,6 +2,7 @@ // Licensed under the MIT license. import { isDefined } from "../util/typeGuards"; +import { mapPartitionKeyToId } from "./patitionKeyToIdMapper"; /** * @internal @@ -52,17 +53,12 @@ export class PartitionAssigner { } if (isDefined(partitionKey)) { - return this._assignPartitionForPartitionKey(partitionKey); + return mapPartitionKeyToId(partitionKey, this._partitions.length).toString(); } return this._assignRoundRobinPartition(); } - private _assignPartitionForPartitionKey(partitionKey: string): string { - // TODO: Implement hashing function - return partitionKey ? this._partitions[0] : this._partitions[0]; - } - private _assignRoundRobinPartition(): string { const maxPartitionIndex = this._partitions.length - 1; const proposedPartitionIndex = this._lastRoundRobinPartitionIndex + 1; diff --git a/sdk/eventhub/event-hubs/src/impl/patitionKeyToIdMapper.ts b/sdk/eventhub/event-hubs/src/impl/patitionKeyToIdMapper.ts new file mode 100644 index 000000000000..4e85f1e1a00a --- /dev/null +++ b/sdk/eventhub/event-hubs/src/impl/patitionKeyToIdMapper.ts @@ -0,0 +1,130 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +/* eslint-disable no-fallthrough */ + +import os from "os"; + +export function mapPartitionKeyToId(partitionKey: string, partitionCount: number): number { + const hash = computeHash(Buffer.from(partitionKey, "utf8")); + const hashedParitionKey = castToInt16(hash.c ^ hash.b); + return Math.abs(hashedParitionKey % partitionCount); +} + +function readUInt32(data: Buffer, offset: number): number { + return os.endianness() === "BE" ? data.readUInt32BE(offset) : data.readUInt32LE(offset); +} + +function castToInt16(n: number): number { + return new Int16Array([n])[0]; +} + +function computeHash(data: Buffer, seed1: number = 0, seed2: number = 0): any { + let a: number, b: number, c: number; + + a = b = c = 0xdeadbeef + data.length + seed1; + c += seed2; + + let index = 0, + size = data.length; + while (size > 12) { + a += readUInt32(data, index); + b += readUInt32(data, index + 4); + c += readUInt32(data, index + 8); + + a -= c; + a ^= (c << 4) | (c >>> 28); + c += b; + + b -= a; + b ^= (a << 6) | (a >>> 26); + a += c; + + c -= b; + c ^= (b << 8) | (b >>> 24); + b += a; + + a -= c; + a ^= (c << 16) | (c >>> 16); + c += b; + + b -= a; + b ^= (a << 19) | (a >>> 13); + a += c; + + c -= b; + c ^= (b << 4) | (b >>> 28); + b += a; + + index += 12; + size -= 12; + } + + let curr = size; + switch (curr) { + case 12: + a += readUInt32(data, index); + b += readUInt32(data, index + 4); + c += readUInt32(data, index + 8); + break; + case 11: + c += data[index + 10] << 16; + curr = 10; + case 10: + c += data[index + 9] << 8; + curr = 9; + case 9: + c += data[index + 8]; + curr = 8; + case 8: + b += readUInt32(data, index + 4); + a += readUInt32(data, index); + break; + case 7: + b += data[index + 6] << 16; + curr = 6; + case 6: + b += data[index + 5] << 8; + curr = 5; + case 5: + b += data[index + 4]; + curr = 4; + case 4: + a += readUInt32(data, index); + break; + case 3: + a += data[index + 2] << 16; + curr = 2; + case 2: + a += data[index + 1] << 8; + curr = 1; + case 1: + a += data[index]; + break; + case 0: + return { b: b >>> 0, c: c >>> 0 }; + } + + c ^= b; + c -= (b << 14) | (b >>> 18); + + a ^= c; + a -= (c << 11) | (c >>> 21); + + b ^= a; + b -= (a << 25) | (a >>> 7); + + c ^= b; + c -= (b << 16) | (b >>> 16); + + a ^= c; + a -= (c << 4) | (c >>> 28); + + b ^= a; + b -= (a << 14) | (a >>> 18); + + c ^= b; + c -= (b << 24) | (b >>> 8); + + return { b: b >>> 0, c: c >>> 0 }; +} diff --git a/sdk/eventhub/event-hubs/test/internal/impl/mapPartitionKeyToId.spec.ts b/sdk/eventhub/event-hubs/test/internal/impl/mapPartitionKeyToId.spec.ts new file mode 100644 index 000000000000..e076173a9eae --- /dev/null +++ b/sdk/eventhub/event-hubs/test/internal/impl/mapPartitionKeyToId.spec.ts @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { assert } from "chai"; +import { mapPartitionKeyToId } from "../../../src/impl/patitionKeyToIdMapper"; + +/** + * These unit tests have been created from outputs received from the C# implementation + * of Jenkins lookup3 that the Event Hubs service uses. + */ +describe("mapPartitionKeyToId", () => { + it("short key, small partitions count", () => { + assert.equal(mapPartitionKeyToId("alphabet", 3), 0); + }); + + it("short key, large partitions count", () => { + assert.equal(mapPartitionKeyToId("alphabet", 11), 4); + }); + + it("long key, small partitions count", () => { + assert.equal(mapPartitionKeyToId("TheBestParitionEver", 4), 2); + }); + + it("long key, large partitions count", () => { + assert.equal(mapPartitionKeyToId("TheWorstParitionEver", 15), 1); + }); +}); diff --git a/sdk/eventhub/event-hubs/tsconfig.json b/sdk/eventhub/event-hubs/tsconfig.json index 2570c3f0d26e..4a2b540ed8ba 100644 --- a/sdk/eventhub/event-hubs/tsconfig.json +++ b/sdk/eventhub/event-hubs/tsconfig.json @@ -4,6 +4,7 @@ "declarationDir": "./types", "outDir": "./dist-esm", "downlevelIteration": true, + "noFallthroughCasesInSwitch": false, "paths": { "@azure/event-hubs": ["./src/index"] }